<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          消息隊(duì)列:消息可靠性、重復(fù)消息、消息積壓、利用消息實(shí)現(xiàn)分布式事務(wù)

          共 7683字,需瀏覽 16分鐘

           ·

          2021-07-17 11:08



          -     如何確保消息不丟失    - 


          1、檢測消息丟失的方法

          可以利用消息隊(duì)列的有序性來驗(yàn)證是否有消息丟失。在Producer端給每個(gè)發(fā)出的消息附加一個(gè)連續(xù)遞增的序號,然后在Consumer端來檢查這個(gè)序號的連續(xù)性。如果沒有消息丟失,Consumer收到消息的序號必然是連續(xù)遞增的,如果檢測到序號不連續(xù),那就是丟消息了。還可以通過缺失的序號來確定丟失的是哪條消息,方便進(jìn)一步排查原因。

          大多數(shù)消息隊(duì)列的 客戶端都支持?jǐn)r截器機(jī)制,可以利用這個(gè)攔截器機(jī)制,在Producer發(fā)送消息之前的攔截器中將序號注入到消息中,在Consumer收到消息的攔截器中檢測序號的連續(xù)性。

          如果是在一個(gè)分布式系統(tǒng)中實(shí)現(xiàn)這個(gè)檢測方法,有幾個(gè)問題需要注意:


          首先,像Kafka和RocketMQ這樣的消息隊(duì)列,是不保證Topic上的嚴(yán)格順序的,只能保證分區(qū)上的消息是有序的,所以在發(fā)消息的時(shí)候必須指定分區(qū),并且,在每個(gè)分區(qū)單獨(dú)檢測消息序號的連續(xù)性。

          如果系統(tǒng)中Producer是多實(shí)例的,由于并不好協(xié)調(diào)多個(gè)Producer之間的發(fā)送順序,所以也需要每個(gè)Producer分別生成各自的消息序號,并且需要附加上Producer的標(biāo)識,在Consumer端按照每個(gè)Producer分別來檢測序號的連續(xù)性Consumer實(shí)例的數(shù)量最好和分區(qū)數(shù)量一致,做到Consumer和分區(qū)一一對應(yīng),這樣會(huì)比較方便地在Consumer內(nèi)檢測消息序號的連續(xù)性。

          2、確保消息可靠傳遞


          一條消息從生產(chǎn)到消費(fèi)完成這個(gè)過程,可以劃分為三個(gè)階段:


          (1)生產(chǎn)階段:在這個(gè)階段,從消息在Producer創(chuàng)建出來,經(jīng)過網(wǎng)絡(luò)傳輸發(fā)送到Broker端;
          (2)存儲階段:在這個(gè)階段,消息在Broker端存儲,如果是集群,消息會(huì)在這個(gè)階段被復(fù)制到其他的副本上;
          (3)消費(fèi)階段:在這個(gè)階段,Consumer從Broker上拉取消息,經(jīng)過網(wǎng)絡(luò)傳輸發(fā)送到Consumer上。

          2.1、生產(chǎn)階段

          在生產(chǎn)階段,消息隊(duì)列通過最常用的請求確認(rèn)機(jī)制,來保證消息的可靠傳遞:當(dāng)在代碼中調(diào)用發(fā)送消息方法時(shí),消息隊(duì)列的客戶端會(huì)把消息發(fā)送到Broker,Broker收到消息后,會(huì)給客戶端返回一個(gè)確認(rèn)響應(yīng),表明消息已經(jīng)收到了。客戶端收到響應(yīng)后,完成了一次正常消息的發(fā)送。

          只要Producer收到了Broker的確認(rèn)響應(yīng)就可以保證消息在生產(chǎn)階段不會(huì)丟失。有些消息隊(duì)列在長時(shí)間沒收到發(fā)送確認(rèn)響應(yīng)后,會(huì)自動(dòng)重試,如果重試再失敗,就會(huì)以返回值或者異常的方式告知用戶。

          在編寫發(fā)送消息代碼時(shí),需要注意,正確處理返回值或者捕獲異常,就可以保證這個(gè)階段的消息不會(huì)丟失。

          以 Kafka 為例:

          同步發(fā)送時(shí),只要注意捕獲異常即可。

          1. try{

          2. producer.send(record).get();

          3. System.out.println("消息發(fā)送成功");

          4. } catch(Exception e) {

          5. System.out.println("消息發(fā)送失敗");

          6. System.out.println(e);

          7. }



          異步發(fā)送時(shí),則需要在回調(diào)方法里進(jìn)行檢查:


          1. producer.send(record, newCallback() {

          2. @Override

          3. publicvoid onCompletion(RecordMetadata metadata, Exception exception) {

          4. if(metadata != null) {

          5. System.out.println("消息發(fā)送成功");

          6. } else{

          7. System.out.println("消息發(fā)送失敗");

          8. System.out.println(exception);

          9. }

          10. }

          11. });

          1. producer.send(record, (metadata, exception) -> {

          2. if(metadata != null) {

          3. System.out.println("消息發(fā)送成功");

          4. } else{

          5. System.out.println("消息發(fā)送失敗");

          6. System.out.println(exception);

          7. }

          8. });




          2.2、存儲階段

          在存儲階段正常情況下,只要Broker在正常運(yùn)行,就不會(huì)出現(xiàn)丟失消息的問題,但是如果Broker出現(xiàn)了故障,比如進(jìn)程死掉了或者服務(wù)器宕機(jī)了,還是可能會(huì)丟失消息的。

          如果對消息的可靠性要求非常高,可以通過配置Broker參數(shù)來避免因?yàn)殄礄C(jī)丟消息。

          對于單個(gè)節(jié)點(diǎn)的Broker,需要配置Broker參數(shù),在收到消息后,將消息寫入磁盤后再給Producer返回確認(rèn)響應(yīng),這樣即使發(fā)生宕機(jī),由于消息已經(jīng)被寫入磁盤,就不會(huì)丟失消息,恢復(fù)后還可以繼續(xù)消費(fèi)。例如,在RocketMQ中,需要將刷盤方式flushDiskType配置為SYNC_FLUSH同步刷盤。

          如果Broker是由多個(gè)節(jié)點(diǎn)組成的集群,需要將Broker集群配置成:至少將消息發(fā)送到2個(gè)以上的節(jié)點(diǎn),再給客戶端回復(fù)發(fā)送確認(rèn)響應(yīng)。這樣當(dāng)某個(gè)Broker宕機(jī)后,其他的Broker可以替代宕機(jī)的Broker,也不會(huì)發(fā)生消息丟失。

          2.3、消費(fèi)階段

          消費(fèi)階段采用和生產(chǎn)階段類似的確認(rèn)機(jī)制來保證消息的可靠傳遞,客戶端從Broker拉取消息后,執(zhí)行用戶的消費(fèi)業(yè)務(wù)邏輯。

          成功后,才會(huì)給Broker發(fā)送消費(fèi)確認(rèn)響應(yīng)。如果Broker沒有收到消費(fèi)確認(rèn)響應(yīng),下次拉消息的時(shí)候還會(huì)返回同一條消息,確認(rèn)消息不會(huì)在網(wǎng)絡(luò)傳輸過程中丟失,也不會(huì)因?yàn)榭蛻舳嗽趫?zhí)行消費(fèi)邏輯中出錯(cuò)導(dǎo)致丟失。

          在編寫消費(fèi)代碼時(shí)需要注意的是,不要在收到消息后就立即發(fā)送消費(fèi)確認(rèn),而是應(yīng)該在執(zhí)行完所有消費(fèi)業(yè)務(wù)邏輯之后,再發(fā)送消費(fèi)確認(rèn)。

          以 SpringBoot 整合 RabbitMQ 為例:



          -     小結(jié)    - 


          在生產(chǎn)階段,需要捕獲消息發(fā)送的錯(cuò)誤,并重發(fā)消息2、在存儲階段,可以通過配置刷盤和復(fù)制相關(guān)的參數(shù),讓消息寫入到多個(gè)副本的磁盤上,來確保消息不會(huì)因?yàn)槟硞€(gè)Broker宕機(jī)或者磁盤損壞而丟失3、在消費(fèi)階段,需要在處理完全部消費(fèi)業(yè)務(wù)邏輯之后,再發(fā)送消費(fèi)確認(rèn)。


          -     如何處理消費(fèi)過程中的重復(fù)消息    - 


          1、消息重復(fù)的情況必然存在


          在MQTT協(xié)議中,給出了三種傳遞消息時(shí)能夠提供的服務(wù)質(zhì)量標(biāo)準(zhǔn),這三種服務(wù)質(zhì)量從低到高依次是:

          At most once:至多一次。消息在傳遞時(shí),最多會(huì)被送達(dá)一次。也就是說,沒什么消息可靠性保證,允許丟消息。一般都是一些對消息可靠性要求不太高的監(jiān)控場景使用,比如每分鐘上報(bào)一次機(jī)房溫度數(shù)據(jù),可以接受數(shù)據(jù)少量丟失。

          At least once:至少一次。消息在傳遞時(shí),至少會(huì)被送達(dá)一次。也就是說,不允許丟消息,但是允許有少量重復(fù)消息出現(xiàn)。

          Exactly once:恰好一次。消息在傳遞時(shí),只會(huì)被送達(dá)一次,不允許丟失也不允許重復(fù),這個(gè)是最高的等級 這個(gè)服務(wù)質(zhì)量標(biāo)準(zhǔn)不僅適用于MQTT,對所有的消息隊(duì)列都是適用的。現(xiàn)在常用的絕大部分消息隊(duì)列提供的服務(wù)質(zhì)量都是 At east once,包括RocketMQ、RabbitMQ和Kafka都是這樣。也就是說,消息隊(duì)列很難保證消息不重復(fù)。

          2、用冪等性解決重復(fù)消息問題


          一般解決重復(fù)消息的辦法是,在消費(fèi)端,讓我們消費(fèi)消息的操作具備冪等性。一個(gè)冪等操作的特點(diǎn)是,其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。一個(gè)冪等的方法使用同樣的參數(shù),對它進(jìn)行多次調(diào)用和一次調(diào)用,對系統(tǒng)產(chǎn)生的影響是一樣的。所以,對于冪等的方法,不用擔(dān)心重復(fù)執(zhí)行會(huì)對系統(tǒng)造成任何改變。

          從對系統(tǒng)的影響結(jié)果來說:At least once+冪等消費(fèi)=Exactly once

          幾種常用的設(shè)計(jì)冪等操作的方法


          1、利用數(shù)據(jù)庫的唯一約束實(shí)現(xiàn)冪等
          舉個(gè)例子:將賬戶X的余額加100元。可以通過改造業(yè)務(wù)邏輯,讓它具備冪等性
          首先,可以限定對于每個(gè)轉(zhuǎn)賬單每個(gè)賬戶只可以執(zhí)行一次變更操作,最簡單的是在數(shù)據(jù)庫建一張轉(zhuǎn)賬流水表,這個(gè)表有三個(gè)字段:轉(zhuǎn)賬單ID、賬戶ID和變更金額,然后給轉(zhuǎn)賬單ID和賬戶ID這兩個(gè)字段聯(lián)合起來創(chuàng)建一個(gè)唯一約束,這樣對于相同的轉(zhuǎn)賬單ID和賬戶ID,表里至多只能存在一條記錄。

          這樣,消費(fèi)消息的邏輯可以變?yōu)椋涸谵D(zhuǎn)賬流水表中增加一條轉(zhuǎn)賬記錄,然后再根據(jù)轉(zhuǎn)賬記錄,異步操作更新用戶余額即可。在轉(zhuǎn)賬流水表增加一條轉(zhuǎn)賬記錄這個(gè)操作中,由于在這個(gè)表中預(yù)先定義了賬戶ID轉(zhuǎn)賬單ID的唯一索引,對于同一個(gè)轉(zhuǎn)賬單同一個(gè)賬戶只能插入一條記錄,后續(xù)重復(fù)的插入操作都會(huì)失敗,這樣就實(shí)現(xiàn)了一個(gè)冪等的操作。

          只要是支持類似INSERT IF NOT EXIST語義的存儲類系統(tǒng)都可以用于實(shí)現(xiàn)冪等,比如,可以用Redis的SETNX命令來替代數(shù)據(jù)庫中的唯一約束,來實(shí)現(xiàn)冪等消費(fèi)。

          2、為更新的數(shù)據(jù)設(shè)置前置條件
          另外一種實(shí)現(xiàn)冪等的思路是,給數(shù)據(jù)變更設(shè)置一個(gè)前置條件,如果滿足條件就更新數(shù)據(jù),否則拒絕更新數(shù)據(jù),在更新數(shù)據(jù)的時(shí)候,同時(shí)變更前置條件中需要判斷的數(shù)據(jù)。這樣,重復(fù)執(zhí)行這個(gè)操作時(shí),由于第一次更新數(shù)據(jù)的時(shí)候已經(jīng)變更了前置條件中需要判斷的數(shù)據(jù),不滿足前置條件,則不會(huì)重復(fù)執(zhí)行更新數(shù)據(jù)操作。

          比如,將賬戶X的余額增加100元這個(gè)操作并不滿足冪等性,可以把這個(gè)操作加上一個(gè)前置條件,變?yōu)椋喝绻~戶X當(dāng)前的月為500元,將余額加100元,這個(gè)操作就具備了冪等性。對應(yīng)到消息隊(duì)列中的使用時(shí),可以在發(fā)消息時(shí)在消息體中帶上當(dāng)前的余額,在消費(fèi)的時(shí)候判斷數(shù)據(jù)庫中當(dāng)前余額是否與消息中的余額相等,只有相等才執(zhí)行變更操作。

          更加通用的方法是,給數(shù)據(jù)增加一個(gè)版本號屬性,每次更新數(shù)據(jù)前,比較當(dāng)前數(shù)據(jù)的版本號是否和消息中的版本號一直,如果不一致就拒絕更新數(shù)據(jù),更新數(shù)據(jù)的同時(shí)將版本號+1,一樣可以實(shí)現(xiàn)冪等更新。

          3、記錄并檢查操作
          還有一種通用性最強(qiáng)的實(shí)現(xiàn)冪等性方法:記錄并檢查操作,也稱為Token機(jī)制或者GUID(全局唯一ID)機(jī)制,實(shí)現(xiàn)思路:在執(zhí)行數(shù)據(jù)更新操作之前,先檢查一下是否執(zhí)行過這個(gè)更新操作。

          具體的實(shí)現(xiàn)方法是,在發(fā)送消息時(shí),給每條消息指定一個(gè)全局唯一的ID,消費(fèi)時(shí),先根據(jù)這個(gè)ID檢查這條消息是否有被消費(fèi)過,如果沒有消費(fèi)過,才更新數(shù)據(jù),然后將消費(fèi)狀態(tài)置為已消費(fèi)。

          但在分布式系統(tǒng)中,這個(gè)方法非常難以實(shí)現(xiàn)。首先,給每個(gè)消息指定一個(gè)全局唯一的ID就是一件不那么簡單的事情,方法有很多,但都不太好同時(shí)滿足簡單、高可用和高性能,或多或少都要有些犧牲。更加麻煩的是,檢查消費(fèi)狀態(tài),然后更新數(shù)據(jù)并且設(shè)置消費(fèi)狀態(tài)這三個(gè)操作必須作為一組操作保證原子性,才能真正實(shí)現(xiàn)冪等,否則就會(huì)出現(xiàn)Bug。


          -     如何處理消息積壓?    - 


          消息積壓的直接原因一定是系統(tǒng)中的某個(gè)部分出現(xiàn)了性能問題,來不及處理上游發(fā)送的消息,才會(huì)導(dǎo)致消息積壓。

          優(yōu)化性能來避免消息積壓


          1、發(fā)送端性能優(yōu)化
          對于發(fā)送消息的業(yè)務(wù)邏輯,只需要設(shè)置合適的并發(fā)和批量大小,就可以達(dá)到很多的發(fā)送性能。

          Producer發(fā)送消息的過程包括:Producer發(fā)送消息給Broker,Broker收到消息返回確認(rèn)響應(yīng)。假設(shè)這一次交互的平均時(shí)延是1ms,這1ms包括了下面這些步驟的耗時(shí):
          • 發(fā)送端準(zhǔn)備數(shù)據(jù)、序列化消息、構(gòu)造請求等邏輯的時(shí)間,也就是發(fā)送端在網(wǎng)絡(luò)請求之前的耗時(shí);

          • 發(fā)送消息和返回響應(yīng)在網(wǎng)絡(luò)傳輸中的耗時(shí);

          • Broker處理消息的時(shí)延。


          如果是單線程發(fā)送,每次只發(fā)送1條消息,那么每秒只能發(fā)送1000ms/1ms*1條/ms=1000條消息。無論是增加每次發(fā)送消息的批量大小,還是增加并發(fā)都能成倍地提升發(fā)送性能。

          比如說,消息發(fā)送端主要接收RPC請求處理在線業(yè)務(wù),因?yàn)樗蠷PC框架都是多線程支持多并發(fā)的,自然就實(shí)現(xiàn)了并行發(fā)送消息。并且在線業(yè)務(wù)比較在意的是請求響應(yīng)時(shí)延,選擇批量發(fā)送會(huì)影響RPC服務(wù)的時(shí)延。

          如果是一個(gè)離線系統(tǒng),它在性能上更注重整個(gè)系統(tǒng)的吞吐量,發(fā)送端的數(shù)據(jù)都是來自于數(shù)據(jù)庫,這種情況就更適合批量發(fā)送。可以批量從數(shù)據(jù)庫讀取數(shù)據(jù),然后批量來發(fā)送消息,同樣用少量的并發(fā)就可以獲得非常高的吞吐量。

          2、消費(fèi)端性能優(yōu)化
          使用消息隊(duì)列的時(shí)候,大部分的性能問題都出現(xiàn)在消費(fèi)端,如果消費(fèi)的速度跟不上發(fā)送生產(chǎn)消息的速度,就會(huì)造成消息積壓。如果這種性能倒掛的問題只是暫時(shí)的,只要消費(fèi)單的性能恢復(fù)之后,超過發(fā)送端的性能,那積壓的消息是可以逐漸被消化掉的。

          要是消費(fèi)速度一直比生產(chǎn)速度慢,時(shí)間長了,整個(gè)系統(tǒng)就會(huì)出現(xiàn)問題,要么,消息隊(duì)列的存儲被填滿無法提供服務(wù),要么消息丟失,這對于整個(gè)系統(tǒng)來說都是嚴(yán)重故障。

          在設(shè)計(jì)系統(tǒng)的時(shí)候,一定要保證消費(fèi)端的消費(fèi)性能要高于生產(chǎn)端的發(fā)送性能。

          消費(fèi)端的性能優(yōu)化除了優(yōu)化消費(fèi)業(yè)務(wù)邏輯之外,也可以通過水平擴(kuò)容,增加消費(fèi)端的并發(fā)數(shù)來提升總體的消費(fèi)性能。在擴(kuò)容Consumer的實(shí)例數(shù)量的同時(shí),必須同步擴(kuò)容主題中的分區(qū)數(shù)量,確保Consumer的實(shí)例數(shù)和分區(qū)數(shù)量是相等的。如果Consumer的實(shí)例數(shù)量超過分區(qū)數(shù)量,這樣的擴(kuò)容是無效的。

          消息積壓了該如何處理?


          還有一種消息積壓的情況是,日常系統(tǒng)正常運(yùn)轉(zhuǎn)的時(shí)候,沒有積壓或者只有少量積壓很快就消費(fèi)掉了,但是某一時(shí)刻,突然就開始積壓消息并且積壓持續(xù)上漲。這種情況下需要在短時(shí)間內(nèi)找到消息積壓的原因,迅速解決問題。

          能導(dǎo)致積壓突然增加,最粗粒度的原因,只有兩種:要么是發(fā)送變快了,要么是消費(fèi)變慢了。

          大部分消息隊(duì)列都內(nèi)置了監(jiān)控的功能,只要通過監(jiān)控?cái)?shù)據(jù),很容易確定是哪種原因。如果是單位事件發(fā)送的消息增多,比如說是趕上大促或者搶購,短時(shí)間內(nèi)不太可能優(yōu)化消費(fèi)端的代碼來提升消費(fèi)性能,唯一的方法是通過擴(kuò)容消費(fèi)端的實(shí)例來提升總體的消費(fèi)能力。

          如果短時(shí)間內(nèi)沒有足夠的服務(wù)器資源進(jìn)行擴(kuò)容,沒辦法的辦法是將系統(tǒng)降級,通過關(guān)閉一些不重要的業(yè)務(wù),減少發(fā)送方發(fā)送的數(shù)據(jù)量,最低限度讓系統(tǒng)還能正常運(yùn)轉(zhuǎn),服務(wù)一些重要業(yè)務(wù)。

          還有一種不太常見的情況,通過監(jiān)控發(fā)現(xiàn),無論是發(fā)送消息的速度還是消費(fèi)消息的速度和原來都沒什么變化,這時(shí)候需要檢查一下消費(fèi)端是不是消費(fèi)失敗導(dǎo)致的一條消息發(fā)福消費(fèi)這種情況比較多,這種情況也會(huì)拖垮整個(gè)系統(tǒng)的消費(fèi)速度。


          -     利用事務(wù)消息實(shí)現(xiàn)分布式事務(wù)    - 


          消息隊(duì)列中的事務(wù),主要解決的是:消息生產(chǎn)者和消息消費(fèi)者的數(shù)據(jù)一致性問題

          拿電商來舉個(gè)例子,一般來說,用戶在電商APP上購物時(shí),先把商品加到購物車?yán)铮缓髱准唐芬黄鹣聠危詈笾Ц叮瓿少徫锪鞒蹋涂梢缘却肇浟恕?/span>

          這個(gè)過程中有一個(gè)需要用到消息隊(duì)列的步驟,訂單系統(tǒng)創(chuàng)建訂單后,發(fā)消息給購物車系統(tǒng),將已下單的商品從購物車中刪除。因?yàn)閺馁徫镘噭h除已下單商品這個(gè)步驟,并不是用戶下單支付這個(gè)主要流程中必需的步驟,使用消息隊(duì)里來異步清理購物車是更加合理的設(shè)計(jì)。


          對于訂單系統(tǒng)來說,它創(chuàng)建訂單的過程中實(shí)際上執(zhí)行了2個(gè)步驟的操作:

          1、在訂單庫中插入一條訂單數(shù)據(jù),創(chuàng)建訂單;
          2、發(fā)消息給消息隊(duì)列,消息的內(nèi)容就是剛剛創(chuàng)建的訂單。

          購物車系統(tǒng)訂閱相應(yīng)的主題,接收訂單創(chuàng)建的消息,然后清理購物車,在購物車中刪除訂單中的商品。

          問題的關(guān)鍵點(diǎn)集中在訂單系統(tǒng),創(chuàng)建訂單和發(fā)送消息這兩個(gè)步驟要么都操作成功,要么都操作失敗,不允許一個(gè)成功而另一個(gè)失敗的情況出現(xiàn)。

          什么是分布式事務(wù)?


          事務(wù)的4個(gè)特性(ACID):

          • 原子性:指一個(gè)事務(wù)操作不可分割,要么成功,要么失敗,不能有一半成功一半失敗的情況。


          • 一致性:指這些數(shù)據(jù)在事務(wù)執(zhí)行完成這個(gè)時(shí)間點(diǎn)之前,讀到的一定是更新前的數(shù)據(jù),之后讀到的一定是更新后的數(shù)據(jù),不應(yīng)該存在一個(gè)時(shí)刻,讓用戶讀到更新過程中的數(shù)據(jù)。


          • 隔離性:指一個(gè)事務(wù)的執(zhí)行不能被其他事務(wù)干擾。即一個(gè)事務(wù)內(nèi)部的操作及使用的數(shù)據(jù)對正在進(jìn)行的其他事務(wù)是隔離的,并發(fā)執(zhí)行的各個(gè)事務(wù)之間不能互相干擾。


          • 持久性:指一個(gè)事務(wù)一旦完成提交,后續(xù)的其他操作和故障都不會(huì)對事務(wù)的結(jié)果產(chǎn)生任何影響 事務(wù)消息適用的場景主要是那些需要異步更新數(shù)據(jù),并且對數(shù)據(jù)實(shí)時(shí)性要求不太高的場景。比如訂單系統(tǒng)的例子,在創(chuàng)建訂單后,如果出現(xiàn)短暫的幾秒,購物車?yán)锏纳唐窙]有及時(shí)情況,也不是完全不可接受的,只要最終購物車的數(shù)據(jù)和訂單數(shù)據(jù)保持一致就可以了。


          2、消息隊(duì)列是如何實(shí)現(xiàn)分布式事務(wù)的?


          回到訂單和購物車這個(gè)例子,來看下如何用消息隊(duì)列來實(shí)現(xiàn)分布式事務(wù):


          首先,訂單系統(tǒng)在消息隊(duì)列上開啟了一個(gè)事務(wù)。然后訂單系統(tǒng)給消息服務(wù)器發(fā)送一個(gè)半消息,這個(gè)半消息包含的內(nèi)容是完整的消息內(nèi)容,和普通消息的唯一區(qū)別是,在事務(wù)提交之前,對于消費(fèi)者來說,這個(gè)消息是不可見的。

          半消息發(fā)送成功后,訂單系統(tǒng)就可以執(zhí)行本地事務(wù)了,在訂單庫中創(chuàng)建一條訂單記錄,并提交訂單庫的數(shù)據(jù)庫事務(wù)。然后根據(jù)本地事務(wù)的執(zhí)行結(jié)果決定提交或者回滾事務(wù)消息。

          如果訂單創(chuàng)建成功,那就提交事務(wù)消息,購物車系統(tǒng)就可以消費(fèi)到這條消息繼續(xù)后續(xù)的流程。如果訂單創(chuàng)建失敗,那就回滾事務(wù)消息,購物車系統(tǒng)就不會(huì)收到這條消息。這樣就基本實(shí)現(xiàn)了要么都成功,要么都失敗的一致性要求。

          如果在第四步提交事務(wù)消息時(shí)失敗了,Kafka會(huì)直接拋出異常,讓用戶自行處理,可以在業(yè)務(wù)代碼中反復(fù)重試提交,直到提交成功,或者刪除之前創(chuàng)建的訂單進(jìn)行補(bǔ)償。

          3、RocketMQ中的分布式事務(wù)實(shí)現(xiàn)


          在RocketMQ中的事務(wù)實(shí)現(xiàn)中,增加了事務(wù)反查的機(jī)制來解決事務(wù)消息提交失敗的問題。

          如果Producer也就是訂單系統(tǒng),在提交或者回滾事務(wù)消息時(shí)發(fā)生網(wǎng)絡(luò)異常,RocketMQ的Broker沒有收到提交或者回滾的請求,Broker會(huì)定期去Producer上反查這個(gè)事務(wù)對應(yīng)的本地事務(wù)的狀態(tài),然后根據(jù)反查結(jié)果決定提交或者回滾這個(gè)事務(wù)。

          為了支撐這個(gè)事務(wù)反查機(jī)制,業(yè)務(wù)代碼中需要實(shí)現(xiàn)一個(gè)反查本地事務(wù)狀態(tài)的接口,告知RocketMQ本地事務(wù)是成功還是失敗。

          在訂單系統(tǒng)的例子中,反查本地事務(wù)的邏輯只要根據(jù)消息中的訂單ID,在訂單庫中查詢這個(gè)訂單是否存在即可,如果訂單存在則返回成功,否則返回失敗。

          RocketMQ會(huì)自動(dòng)根據(jù)事務(wù)反查的結(jié)果,提交或者回滾事務(wù)消息。

          這個(gè)反查本地事務(wù)的實(shí)現(xiàn),并不依賴消息的發(fā)送方,也就是訂單服務(wù)的某個(gè)實(shí)例節(jié)點(diǎn)上的任何數(shù)據(jù)。

          這種情況下,即使是發(fā)送事務(wù)消息的那個(gè)訂單服務(wù)節(jié)點(diǎn)宕機(jī)了,RocketMQ依然可以通過其他訂單服務(wù)的節(jié)點(diǎn)來執(zhí)行反查,確保事務(wù)的完整性。

          使用RocketMQ事務(wù)消息功能實(shí)現(xiàn)分布式事務(wù)的流程如下圖:


          者:邋遢的流浪劍客

          來源:

          https://blog.csdn.net/qq_40378034/article/details/98790433

          瀏覽 62
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄 片 免费 在 线 观 看 s | 天天射天天射天天射 | 人人爱人人操人人摸 | 成人H动漫精品一区二区无码 | 欧美性爱在线视频播放 |