<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          消息冪等(去重)通用解決方案

          共 7705字,需瀏覽 16分鐘

           ·

          2021-08-13 22:36


          消息中間件是分布式系統(tǒng)常用的組件,無論是異步化、解耦、削峰等都有廣泛的應(yīng)用價值。我們通常會認為,消息中間件是一個可靠的組件——這里所謂的可靠是指,只要我把消息成功投遞到了消息中間件,消息就不會丟失,即消息肯定會至少保證消息能被消費者成功消費一次,這是消息中間件最基本的特性之一,也就是我們常說的“AT LEAST ONCE”,即消息至少會被“成功消費一遍”。

          舉個例子,一個消息M發(fā)送到了消息中間件,消息投遞到了消費程序A,A接受到了消息,然后進行消費,但在消費到一半的時候程序重啟了,這時候這個消息并沒有標(biāo)記為消費成功,這個消息還會繼續(xù)投遞給這個消費者,直到其消費成功了,消息中間件才會停止投遞。

          然而這種可靠的特性導(dǎo)致,消息可能被多次地投遞。舉個例子,還是剛剛這個例子,程序A接受到這個消息M并完成消費邏輯之后,正想通知消息中間件“我已經(jīng)消費成功了”的時候,程序就重啟了,那么對于消息中間件來說,這個消息并沒有成功消費過,所以他還會繼續(xù)投遞。這時候?qū)τ趹?yīng)用程序A來說,看起來就是這個消息明明消費成功了,但是消息中間件還在重復(fù)投遞。

          這在RockectMQ的場景來看,就是同一個messageId的消息重復(fù)投遞下來了。

          基于消息的投遞可靠(消息不丟)是優(yōu)先級更高的,所以消息不重的任務(wù)就會轉(zhuǎn)移到應(yīng)用程序自我實現(xiàn),這也是為什么RocketMQ的文檔里強調(diào)的,消費邏輯需要自我實現(xiàn)冪等。背后的邏輯其實就是:不丟和不重是矛盾的(在分布式場景下),但消息重復(fù)是有解決方案的,而消息丟失是很麻煩的。

          簡單的消息去重解決方案

          例如:假設(shè)我們業(yè)務(wù)的消息消費邏輯是:插入某張訂單表的數(shù)據(jù),然后更新庫存:

          insert into t_order values .....
          update t_inv set count = count-1 where good_id = 'good123';

          要實現(xiàn)消息的冪等,我們可能會采取這樣的方案:

          select * from t_order where order_no = 'order123'

          if(order  != null) {

              return ;//消息重復(fù),直接返回

          }

          這對于很多情況下,的確能起到不錯的效果,但是在并發(fā)場景下,還是會有問題。

          并發(fā)重復(fù)消息

          假設(shè)這個消費的所有代碼加起來需要1秒,有重復(fù)的消息在這1秒內(nèi)(假設(shè)100毫秒)內(nèi)到達(例如生產(chǎn)者快速重發(fā),Broker重啟等),那么很可能,上面去重代碼里面會發(fā)現(xiàn),數(shù)據(jù)依然是空的(因為上一條消息還沒消費完,還沒成功更新訂單狀態(tài)),

          那么就會穿透掉檢查的擋板,最后導(dǎo)致重復(fù)的消息消費邏輯進入到非冪等安全的業(yè)務(wù)代碼中,從而引發(fā)重復(fù)消費的問題(如主鍵沖突拋出異常、庫存被重復(fù)扣減而沒釋放等)

          并發(fā)去重的解決方案之一

          要解決上面并發(fā)場景下的消息冪等問題,一個可取的方案是開啟事務(wù)把select 改成 select for update語句,把記錄進行鎖定。

          select * from t_order where order_no = 'THIS_ORDER_NO' for update  //開啟事務(wù)
          if(order.status != null) {
              return ;//消息重復(fù),直接返回
          }

          但這樣消費的邏輯會因為引入了事務(wù)包裹而導(dǎo)致整個消息消費可能變長,并發(fā)度下降。

          當(dāng)然還有其他更高級的解決方案,例如更新訂單狀態(tài)采取樂觀鎖,更新失敗則消息重新消費之類的。但這需要針對具體業(yè)務(wù)場景做更復(fù)雜和細致的代碼開發(fā)、庫表設(shè)計,不在本文討論的范圍。

          但無論是select for update, 還是樂觀鎖這種解決方案,實際上都是基于業(yè)務(wù)表本身做去重,這無疑增加了業(yè)務(wù)開發(fā)的復(fù)雜度, 一個業(yè)務(wù)系統(tǒng)里面很大部分的請求處理都是依賴MQ的,如果每個消費邏輯本身都需要基于業(yè)務(wù)本身而做去重/冪等的開發(fā)的話,這是繁瑣的工作量。本文希望探索出一個通用的消息冪等處理的方法,從而抽象出一定的工具類用以適用各個業(yè)務(wù)場景。

          Exactly Once

          在消息中間件里,有一個投遞語義的概念,而這個語義里有一個叫”Exactly Once”,即消息肯定會被成功消費,并且只會被消費一次。以下是阿里云里對Exactly Once的解釋:

          Exactly-Once 是指發(fā)送到消息系統(tǒng)的消息只能被消費端處理且僅處理一次,即使生產(chǎn)端重試消息發(fā)送導(dǎo)致某消息重復(fù)投遞,該消息在消費端也只被消費一次。

          在我們業(yè)務(wù)消息冪等處理的領(lǐng)域內(nèi),可以認為業(yè)務(wù)消息的代碼肯定會被執(zhí)行,并且只被執(zhí)行一次,那么我們可以認為是Exactly Once。

          但這在分布式的場景下想找一個通用的方案幾乎是不可能的。不過如果是針對基于數(shù)據(jù)庫事務(wù)的消費邏輯,實際上是可行的。

          基于關(guān)系數(shù)據(jù)庫事務(wù)插入消息表

          假設(shè)我們業(yè)務(wù)的消息消費邏輯是:更新MySQL數(shù)據(jù)庫的某張訂單表的狀態(tài):

          update t_order set status = 'SUCCESS' where order_no= 'order123';

          要實現(xiàn)Exaclty Once即這個消息只被消費一次(并且肯定要保證能消費一次),我們可以這樣做:在這個數(shù)據(jù)庫中增加一個消息消費記錄表,把消息插入到這個表,并且把原來的訂單更新和這個插入的動作放到同一個事務(wù)中一起提交,就能保證消息只會被消費一遍了。

          1、開啟事務(wù)
          2、插入消息表(處理好主鍵沖突的問題)
          3、更新訂單表(原消費邏輯)
          4、提交事務(wù)

          說明:

          1、這時候如果消息消費成功并且事務(wù)提交了,那么消息表就插入成功了,這時候就算RocketMQ還沒有收到消費位點的更新再次投遞,也會插入消息失敗而視為已經(jīng)消費過,后續(xù)就直接更新消費位點了。這保證我們消費代碼只會執(zhí)行一次。2、如果事務(wù)提交之前服務(wù)掛了(例如重啟),對于本地事務(wù)并沒有執(zhí)行所以訂單沒有更新,消息表也沒插入成功;而對于RocketMQ服務(wù)端來說,消費位點也沒更新,所以消息還會繼續(xù)投遞下來,投遞下來發(fā)現(xiàn)這個消息插入消息表也是成功的,所以可以繼續(xù)消費。這保證了消息不丟失。

          事實上,阿里云ONS的EXACTLY-ONCE語義的實現(xiàn)上,就是類似這個方案基于數(shù)據(jù)庫的事務(wù)特性實現(xiàn)的。更多詳情可參考:https://help.aliyun.com/document_detail/102777.html

          基于這種方式,的確這是有能力拓展到不同的應(yīng)用場景,因為他的實現(xiàn)方案與具體業(yè)務(wù)本身無關(guān)——而是依賴一個消息表。

          但是這里有它的局限性

          1、消息的消費邏輯必須是依賴于關(guān)系型數(shù)據(jù)庫事務(wù)。如果消費的消費過程中還涉及其他數(shù)據(jù)的修改,例如Redis這種不支持事務(wù)特性的數(shù)據(jù)源,則這些數(shù)據(jù)是不可回滾的。
          2、數(shù)據(jù)庫的數(shù)據(jù)必須是在一個庫,跨庫無法解決

          注:業(yè)務(wù)上,消息表的設(shè)計不應(yīng)該以消息ID作為標(biāo)識,而應(yīng)該以業(yè)務(wù)的業(yè)務(wù)主鍵作為標(biāo)識更為合理,以應(yīng)對生產(chǎn)者的重發(fā)。阿里云上的消息去重只是RocketMQ的messageId,在生產(chǎn)者因為某些原因手動重發(fā)(例如上游針對一個交易重復(fù)請求了)的場景下起不到去重/冪等的效果(因消息id不同)。

          更復(fù)雜的業(yè)務(wù)場景

          如上所述,這種方式Exactly Once語義的實現(xiàn),實際上有很多局限性,這種局限性使得這個方案基本不具備廣泛應(yīng)用的價值。并且由于基于事務(wù),可能導(dǎo)致鎖表時間過長等性能問題。

          例如我們以一個比較常見的一個訂單申請的消息來舉例,可能有以下幾步(以下統(tǒng)稱為步驟X):

          1、 檢查庫存(RPC)
          2、 鎖庫存(RPC)
          3、 開啟事務(wù),插入訂單表(MySQL)
          4、 調(diào)用某些其他下游服務(wù)(RPC)
          5、 更新訂單狀態(tài)
          6、 commit 事務(wù)(MySQL)

          這種情況下,我們?nèi)绻扇∠⒈?本地事務(wù)的實現(xiàn)方式,消息消費過程中很多子過程是不支持回滾的,也就是說就算我們加了事務(wù),實際上這背后的操作并不是原子性的。怎么說呢,就是說有可能第一條小在經(jīng)歷了第二步鎖庫存的時候,服務(wù)重啟了,這時候?qū)嶋H上庫存是已經(jīng)在另外的服務(wù)里被鎖定了,這并不能被回滾。當(dāng)然消息還會再次投遞下來,要保證消息能至少消費一遍,換句話說,鎖庫存的這個RPC接口本身依舊要支持“冪等”。

          再者,如果在這個比較耗時的長鏈條場景下加入事務(wù)的包裹,將大大的降低系統(tǒng)的并發(fā)。所以通常情況下,我們處理這種場景的消息去重的方法還是會使用一開始說的業(yè)務(wù)自己實現(xiàn)去重邏輯的方式,如前面加select for update,或者使用樂觀鎖。

          那我們有沒有方法抽取出一個公共的解決方案,能兼顧去重、通用、高性能呢?

          拆解消息執(zhí)行過程

          其中一個思路是把上面的幾步,拆解成幾個不同的子消息,例如:

          1、庫存系統(tǒng)消費A:檢查庫存并做鎖庫存,發(fā)送消息B給訂單服務(wù)
          2、訂單系統(tǒng)消費消息B:插入訂單表(MySQL),發(fā)送消息C給自己(下游系統(tǒng))消費
          3、下游系統(tǒng)消費消息C:處理部分邏輯,發(fā)送消息D給訂單系統(tǒng)
          4、訂單系統(tǒng)消費消息D:更新訂單狀態(tài)

          注:上述步驟需要保證本地事務(wù)和消息是一個事務(wù)的(至少是最終一致性的),這其中涉及到分布式事務(wù)消息相關(guān)的話題,不在本文論述。

          可以看到這樣的處理方法會使得每一步的操作都比較原子,而原子則意味著是小事務(wù),小事務(wù)則意味著使用消息表+事務(wù)的方案顯得可行。

          然而,這太復(fù)雜了!這把一個本來連續(xù)的代碼邏輯割裂成多個系統(tǒng)多次消息交互!那還不如業(yè)務(wù)代碼層面上加鎖實現(xiàn)呢。

          更通用的解決方案

          上面消息表+本地事務(wù)的方案之所以有其局限性和并發(fā)的短板,究其根本是因為它依賴于關(guān)系型數(shù)據(jù)庫的事務(wù),且必須要把事務(wù)包裹于整個消息消費的環(huán)節(jié)。

          如果我們能不依賴事務(wù)而實現(xiàn)消息的去重,那么方案就能推廣到更復(fù)雜的場景例如:RPC、跨庫等。

          例如,我們依舊使用消息表,但是不依賴事務(wù),而是針對消息表增加消費狀態(tài),是否可以解決問題呢?

          基于消息冪等表的非事務(wù)方案

          67_1.png

          以上是去事務(wù)化后的消息冪等方案的流程,可以看到,此方案是無事務(wù)的,而是針對消息表本身做了狀態(tài)的區(qū)分:消費中、消費完成。只有消費完成的消息才會被冪等處理掉。而對于已有消費中的消息,后面重復(fù)的消息會觸發(fā)延遲消費(在RocketMQ的場景下即發(fā)送到RETRY TOPIC),之所以觸發(fā)延遲消費是為了控制并發(fā)場景下,第二條消息在第一條消息沒完成的過程中,去控制消息不丟(如果直接冪等,那么會丟失消息(同一個消息id的話),因為上一條消息如果沒有消費完成的時候,第二條消息你已經(jīng)告訴broker成功了,那么第一條消息這時候失敗broker也不會重新投遞了)

          上面的流程不再細說,后文有g(shù)ithub源碼的地址,讀者可以參考源碼的實現(xiàn),這里我們回頭看看我們一開始想解決的問題是否解決了:

          1、 消息已經(jīng)消費成功了,第二條消息將被直接冪等處理掉(消費成功)。
          2、 并發(fā)場景下的消息,依舊能滿足不會出現(xiàn)消息重復(fù),即穿透冪等擋板的問題。
          3、 支持上游業(yè)務(wù)生產(chǎn)者重發(fā)的業(yè)務(wù)重復(fù)的消息冪等問題。

          關(guān)于第一個問題已經(jīng)很明顯已經(jīng)解決了,在此就不討論了。

          關(guān)于第二個問題是如何解決的?主要是依靠插入消息表的這個動作做控制的,假設(shè)我們用MySQL作為消息表的存儲媒介(設(shè)置消息的唯一ID為主鍵),那么插入的動作只有一條消息會成功,后面的消息插入會由于主鍵沖突而失敗,走向延遲消費的分支,然后后面延遲消費的時候就會變成上面第一個場景的問題。

          關(guān)于第三個問題,只要我們設(shè)計去重的消息鍵讓其支持業(yè)務(wù)的主鍵(例如訂單號、請求流水號等),而不僅僅是messageId即可。所以也不是問題。

          此方案是否有消息丟失的風(fēng)險?

          如果細心的讀者可能會發(fā)現(xiàn)這里實際上是有邏輯漏洞的,問題出在上面聊到的個三問題中的第2個問題(并發(fā)場景),在并發(fā)場景下我們依賴于消息狀態(tài)是做并發(fā)控制使得第2條消息重復(fù)的消息會不斷延遲消費(重試)。但如果這時候第1條消息也由于一些異常原因(例如機器重啟了、外部異常導(dǎo)致消費失?。]有成功消費成功呢?也就是說這時候延遲消費實際上每次下來看到的都是消費中的狀態(tài),最后消費就會被視為消費失敗而被投遞到死信Topic中(RocketMQ默認可以重復(fù)消費16次)。

          有這種顧慮是正確的!對于此,我們解決的方法是,插入的消息表必須要帶一個最長消費過期時間,例如10分鐘,意思是如果一個消息處于消費中超過10分鐘,就需要從消息表中刪除(需要程序自行實現(xiàn))。所以最后這個消息的流程會是這樣的:

          67_2.png

          更靈活的消息表存儲媒介

          我們這個方案實際上沒有事務(wù)的,只需要一個存儲的中心媒介,那么自然我們可以選擇更靈活的存儲媒介,例如Redis。使用Redis有兩個好處:

          1、性能上損耗更低
          2、上面我們講到的超時時間可以直接利用Redis本身的ttl實現(xiàn)

          當(dāng)然Redis存儲的數(shù)據(jù)可靠性、一致性等方面是不如MySQL的,需要用戶自己取舍。

          源碼:RocketMQDedupListener

          以上方案針對RocketMQ的Java實現(xiàn)已經(jīng)開源放到Github中,具體的使用文檔可以參考https://github.com/Jaskey/RocketMQDedupListener ,

          以下僅貼一個Readme中利用Redis去重的使用樣例,用以意業(yè)務(wù)中如果使用此工具加入消息去重冪等的是多么簡單:

          //利用Redis做冪等表
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TEST-APP1");
          consumer.subscribe("TEST-TOPIC""*");

          String appName = consumer.getConsumerGroup();// 大部分情況下可直接使用consumer group名
          StringRedisTemplate stringRedisTemplate = null;// 這里省略獲取StringRedisTemplate的過程
          DedupConfig dedupConfig = DedupConfig.enableDedupConsumeConfig(appName, stringRedisTemplate);
          DedupConcurrentListener messageListener = new SampleListener(dedupConfig);

          consumer.registerMessageListener(messageListener);
          consumer.start();


          以上代碼大部分是原始RocketMQ的必須代碼,唯一需要修改的僅僅是創(chuàng)建一個DedupConcurrentListener示例,在這個示例中指明你的消費邏輯和去重的業(yè)務(wù)鍵(默認是messageId)。

          更多使用詳情請參考Github上的說明。

          這種實現(xiàn)是否一勞永逸?

          實現(xiàn)到這里,似乎方案挺完美的,所有的消息都能快速的接入去重,且與具體業(yè)務(wù)實現(xiàn)也完全解耦。那么這樣是否就完美的完成去重的所有任務(wù)呢?

          很可惜,其實不是的。原因很簡單:因為要保證消息至少被成功消費一遍,那么消息就有機會消費到一半的時候失敗觸發(fā)消息重試的可能。還是以上面的訂單流程X:

          1、 檢查庫存(RPC)
          2、 鎖庫存(RPC)
          3、 開啟事務(wù),插入訂單表(MySQL)
          4、 調(diào)用某些其他下游服務(wù)(RPC)
          5、 更新訂單狀態(tài)
          6、 commit 事務(wù)(MySQL)

          當(dāng)消息消費到步驟3的時候,我們假設(shè)MySQL異常導(dǎo)致失敗了,觸發(fā)消息重試。因為在重試前我們會刪除冪等表的記錄,所以消息重試的時候就會重新進入消費代碼,那么步驟1和步驟2就會重新再執(zhí)行一遍。如果步驟2本身不是冪等的,那么這個業(yè)務(wù)消息消費依舊沒有做好完整的冪等處理。

          本實現(xiàn)方式的價值?

          那么既然這個并不能完整的完成消息冪等,還有什么價值呢?價值可就大了!雖然這不是解決消息冪等的銀彈(事實上,軟件工程領(lǐng)域里基本沒有銀彈),但是他能以便捷的手段解決:

          1、各種由于Broker、負載均衡等原因?qū)е碌南⒅赝哆f的重復(fù)問題

          2、各種上游生產(chǎn)者導(dǎo)致的業(yè)務(wù)級別消息重復(fù)問題

          3、重復(fù)消息并發(fā)消費的控制窗口問題,就算重復(fù),重復(fù)也不可能同一時間進入消費邏輯

          一些其他的消息去重的建議

          也就是說,使用這個方法能保證正常的消費邏輯場景下(無異常,無異常退出),消息的冪等工作全部都能解決,無論是業(yè)務(wù)重復(fù),還是rocketmq特性帶來的重復(fù)。

          事實上,這已經(jīng)能解決99%的消息重復(fù)問題了,畢竟異常的場景肯定是少數(shù)的。那么如果希望異常場景下也能處理好冪等的問題,可以做以下工作降低問題率:

          1、消息消費失敗做好回滾處理。如果消息消費失敗本身是帶回滾機制的,那么消息重試自然就沒有副作用了。
          2、消費者做好優(yōu)雅退出處理。這是為了盡可能避免消息消費到一半程序退出導(dǎo)致的消息重試。
          3、一些無法做到冪等的操作,至少要做到終止消費并告警。例如鎖庫存的操作,如果統(tǒng)一的業(yè)務(wù)流水鎖成功了一次庫存,再觸發(fā)鎖庫存,如果做不到冪等的處理,至少要做到消息消費觸發(fā)異常(例如主鍵沖突導(dǎo)致消費異常等)
          4、在#3做好的前提下,做好消息的消費監(jiān)控,發(fā)現(xiàn)消息重試不斷失敗的時候,手動做好#1的回滾,使得下次重試消費成功。

          轉(zhuǎn)自:Jaskey Lam

          鏈接:jaskey.github.io/blog/2020/06/08/rocketmq-message-dedup/

          推薦閱讀:

          世界的真實格局分析,地球人類社會底層運行原理

          不是你需要中臺,而是一名合格的架構(gòu)師(附各大廠中臺建設(shè)PPT)

          企業(yè)IT技術(shù)架構(gòu)規(guī)劃方案

          論數(shù)字化轉(zhuǎn)型——轉(zhuǎn)什么,如何轉(zhuǎn)?

          華為干部與人才發(fā)展手冊(附PPT)

          企業(yè)10大管理流程圖,數(shù)字化轉(zhuǎn)型從業(yè)者必備!

          【中臺實踐】華為大數(shù)據(jù)中臺架構(gòu)分享.pdf

          華為的數(shù)字化轉(zhuǎn)型方法論

          華為如何實施數(shù)字化轉(zhuǎn)型(附PPT)

          超詳細280頁Docker實戰(zhàn)文檔!開放下載

          華為大數(shù)據(jù)解決方案(PPT)

          瀏覽 43
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产午夜精品久久 | 五月天婷婷综合久久 | 在线的成人网站 | 青娱乐青青草视频在线观看 | 国产操操逼图片 |