<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringCloud微服務(wù)架構(gòu)中分布式事務(wù)解決方案,一次性給你說到爛

          共 13577字,需瀏覽 28分鐘

           ·

          2021-02-09 23:51

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          76套java從入門到精通實(shí)戰(zhàn)課程分享

          說明

          Java生鮮電商平臺中由于采用了微服務(wù)架構(gòu)進(jìn)行業(yè)務(wù)的處理,買家,賣家,配送,銷售,供應(yīng)商等進(jìn)行服務(wù)化,但是不可避免存在分布式事務(wù)的問題。

          業(yè)界有很多的解決方案,對此我相信大家都百度一下子就有很多,但是我巨人大哥想說的是:微服務(wù)架構(gòu)中應(yīng)當(dāng)盡量避免分布式事務(wù)。

          下面就是來討論下,分布式事務(wù)中主要聚焦于強(qiáng)一致性和最終一致性的解決方案。

          微服務(wù)的發(fā)展

          微服務(wù)倡導(dǎo)將復(fù)雜的單體應(yīng)用拆分為若干個(gè)功能簡單、松耦合的服務(wù),這樣可以降低開發(fā)難度、增強(qiáng)擴(kuò)展性、便于敏捷開發(fā)。當(dāng)前被越來越多的開發(fā)者推崇,很多互聯(lián)網(wǎng)行業(yè)巨頭、開源社區(qū)等都開始了微服務(wù)的討論和實(shí)踐。

          微服務(wù)落地存在的問題

          雖然微服務(wù)現(xiàn)在如火如荼,但對其實(shí)踐其實(shí)仍處于探索階段。很多中小型互聯(lián)網(wǎng)公司,鑒于經(jīng)驗(yàn)、技術(shù)實(shí)力等問題,微服務(wù)落地比較困難。

          如著名架構(gòu)師Chris Richardson所言,目前存在的主要困難有如下幾方面:

          • 單體應(yīng)用拆分為分布式系統(tǒng)后,進(jìn)程間的通訊機(jī)制和故障處理措施變的更加復(fù)雜。

          • 系統(tǒng)微服務(wù)化后,一個(gè)看似簡單的功能,內(nèi)部可能需要調(diào)用多個(gè)服務(wù)并操作多個(gè)數(shù)據(jù)庫實(shí)現(xiàn),服務(wù)調(diào)用的分布式事務(wù)問題變的非常突出。

          • 微服務(wù)數(shù)量眾多,其測試、部署、監(jiān)控等都變的更加困難。

          隨著RPC框架的成熟,第一個(gè)問題已經(jīng)逐漸得到解決。例如springcloud可以非常好的支持restful調(diào)用,dubbo可以支持多種通訊協(xié)議。關(guān)注公眾號碼猿技術(shù)專欄獲取更多面試資源。

          對于第三個(gè)問題,隨著docker、devops技術(shù)的發(fā)展以及各公有云paas平臺自動化運(yùn)維工具的推出,微服務(wù)的測試、部署與運(yùn)維會變得越來越容易。

          而對于第二個(gè)問題,現(xiàn)在還沒有通用方案很好的解決微服務(wù)產(chǎn)生的事務(wù)問題。分布式事務(wù)已經(jīng)成為微服務(wù)落地最大的阻礙,也是最具挑戰(zhàn)性的一個(gè)技術(shù)難題。

          ACID

          • 原子性(Atomicity):?一個(gè)事務(wù)的所有系列操作步驟被看成是一個(gè)動作,所有的步驟要么全部完成要么一個(gè)也不會完成,如果事務(wù)過程中任何一點(diǎn)失敗,將要被改變的數(shù)據(jù)庫記錄就不會被真正被改變。

          • 一致性(Consistency):?數(shù)據(jù)庫的約束 級聯(lián)和觸發(fā)機(jī)制Trigger都必須滿足事務(wù)的一致性。也就是說,通過各種途徑包括外鍵約束等任何寫入數(shù)據(jù)庫的數(shù)據(jù)都是有效的,不能發(fā)生表與表之間存在外鍵約束,但是有數(shù)據(jù)卻違背這種約束性。所有改變數(shù)據(jù)庫數(shù)據(jù)的動作事務(wù)必須完成,沒有事務(wù)會創(chuàng)建一個(gè)無效數(shù)據(jù)狀態(tài),這是不同于CAP理論的一致性"consistency".

          • 隔離性(Isolation):?主要用于實(shí)現(xiàn)并發(fā)控制, 隔離能夠確保并發(fā)執(zhí)行的事務(wù)能夠順序一個(gè)接一個(gè)執(zhí)行,通過隔離,一個(gè)未完成事務(wù)不會影響另外一個(gè)未完成事務(wù)。

          • 持久性(Durability):?一旦一個(gè)事務(wù)被提交,它應(yīng)該持久保存,不會因?yàn)楹推渌僮鳑_突而取消這個(gè)事務(wù)。很多人認(rèn)為這意味著事務(wù)是持久在磁盤上,但是規(guī)范沒有特別定義這點(diǎn)。

          一致性理論

          分布式事務(wù)的目的是保障分庫數(shù)據(jù)一致性,而跨庫事務(wù)會遇到各種不可控制的問題,如個(gè)別節(jié)點(diǎn)永久性宕機(jī),像單機(jī)事務(wù)一樣的 ACID 是無法奢望的。

          另外,業(yè)界著名的 CAP 理論也告訴我們,對分布式系統(tǒng),需要將數(shù)據(jù)一致性和系統(tǒng)可用性、分區(qū)容忍性放在天平上一起考慮。

          兩階段提交協(xié)議(簡稱2PC)是實(shí)現(xiàn)分布式事務(wù)較為經(jīng)典的方案,但 2PC 的可擴(kuò)展性很差,在分布式架構(gòu)下應(yīng)用代價(jià)較大,eBay 架構(gòu)師 Dan Pritchett 提出了 BASE 理論,用于解決大規(guī)模分布式系統(tǒng)下的數(shù)據(jù)一致性問題。關(guān)注公眾號碼猿技術(shù)專欄獲取更多面試資源。

          BASE 理論告訴我們:可以通過放棄系統(tǒng)在每個(gè)時(shí)刻的強(qiáng)一致性來換取系統(tǒng)的可擴(kuò)展性。

          CAP 理論

          在分布式系統(tǒng)中,一致性(Consistency)、可用性(Availability)和分區(qū)容忍性(Partition Tolerance)3 個(gè)要素最多只能同時(shí)滿足兩個(gè),不可兼得。其中,分區(qū)容忍性又是不可或缺的。

          • 一致性:分布式環(huán)境下,多個(gè)節(jié)點(diǎn)的數(shù)據(jù)是否強(qiáng)一致。

          • 可用性:分布式服務(wù)能一直保證可用狀態(tài)。當(dāng)用戶發(fā)出一個(gè)請求后,服務(wù)能在有限時(shí)間內(nèi)返回結(jié)果。

          • 分區(qū)容忍性:特指對網(wǎng)絡(luò)分區(qū)的容忍性。

          舉例:Cassandra、Dynamo 等,默認(rèn)優(yōu)先選擇 AP,弱化 C;HBase、MongoDB 等,默認(rèn)優(yōu)先選擇 CP,弱化 A。

          BASE 理論

          核心思想:

          • 基本可用(?Basically?Available):指分布式系統(tǒng)在出現(xiàn)故障時(shí),允許損失部分的可用性來保證核心可用;

          • 軟狀態(tài)(?Soft state):指允許分布式系統(tǒng)存在中間狀態(tài),該中間狀態(tài)不會影響到系統(tǒng)的整體可用性;

          • 最終一致性(?Eventual consistency):指分布式系統(tǒng)中的所有副本數(shù)據(jù)經(jīng)過一定時(shí)間后,最終能夠達(dá)到一致的狀態(tài);

          • 原子性(A)與持久性(D)必須根本保障;

          • 為了可用性、性能與降級服務(wù)的需要,只有降低一致性( C ) 與 隔離性( I ) 的要求;

          • 酸堿平衡(ACID-BASE Balance);

          BASE 是對 CAP 中 AP 的一個(gè)擴(kuò)展

          一致性模型

          數(shù)據(jù)的一致性模型可以分成以下三類:

          • 強(qiáng)一致性:數(shù)據(jù)更新成功后,任意時(shí)刻所有副本中的數(shù)據(jù)都是一致的,一般采用同步的方式實(shí)現(xiàn)。

          • 弱一致性:數(shù)據(jù)更新成功后,系統(tǒng)不承諾立即可以讀到最新寫入的值,也不承諾具體多久之后可以讀到。

          • 最終一致性:弱一致性的一種形式,數(shù)據(jù)更新成功后,系統(tǒng)不承諾立即可以返回最新寫入的值,但是保證最終會返回上一次更新操作的值。

          分布式系統(tǒng)數(shù)據(jù)的強(qiáng)一致性、弱一致性和最終一致性可以通過 Quorum NRW 算法分析。

          本地事務(wù)

          • 在單個(gè)數(shù)據(jù)庫的本地并且限制在單個(gè)進(jìn)程內(nèi)的事務(wù)

          • 本地事務(wù)不涉及多個(gè)數(shù)據(jù)來源

          分布式事務(wù)典型方案

          • 兩階段提交(2PC, Two Phase Commit)方案;

          • 本地消息表 (eBay 事件隊(duì)列方案);

          • TCC 補(bǔ)償模式;

          分類:

          • 階段型

          • 補(bǔ)償型

          • 異步確保型

          • 最大努力通知型

          服務(wù)模式:

          • 可查詢操作

          • 冪等操作

          • TCC操作

          • 可補(bǔ)償操作

          兩階段提交2PC(強(qiáng)一致性)

          基于XA協(xié)議的兩階段提交:

          • 第一階段是表決階段,所有參與者都將本事務(wù)能否成功的信息反饋發(fā)給協(xié)調(diào)者;

          • 第二階段是執(zhí)行階段,協(xié)調(diào)者根據(jù)所有參與者的反饋,通知所有參與者,步調(diào)一致地在所有分支上提交或者回滾;

          缺點(diǎn):

          • 單點(diǎn)問題:事務(wù)管理器在整個(gè)流程中扮演的角色很關(guān)鍵,如果其宕機(jī),比如在第一階段已經(jīng)完成,在第二階段正準(zhǔn)備提交的時(shí)候事務(wù)管理器宕機(jī),資源管理器就會一直阻塞,導(dǎo)致數(shù)據(jù)庫無法使用。

          • 同步阻塞:在準(zhǔn)備就緒之后,資源管理器中的資源一直處于阻塞,直到提交完成,釋放資源。

          • 數(shù)據(jù)不一致:兩階段提交協(xié)議雖然為分布式數(shù)據(jù)強(qiáng)一致性所設(shè)計(jì),但仍然存在數(shù)據(jù)不一致性的可能。比如:在第二階段中,假設(shè)協(xié)調(diào)者發(fā)出了事務(wù) Commit 的通知,但是因?yàn)榫W(wǎng)絡(luò)問題該通知僅被一部分參與者所收到并執(zhí)行了 Commit 操作,其余的參與者則因?yàn)闆]有收到通知一直處于阻塞狀態(tài),這時(shí)候就產(chǎn)生了數(shù)據(jù)的不一致性。

          總的來說,XA 協(xié)議比較簡單,成本較低,但是其單點(diǎn)問題,以及不能支持高并發(fā)(由于同步阻塞)依然是其最大的弱點(diǎn)。

          本地消息表(最終一致性)

          eBay 的架構(gòu)師 Dan Pritchett,曾在一篇解釋 BASE 原理的論文《Base:An Acid Alternative》中提到一個(gè) eBay 分布式系統(tǒng)一致性問題的解決方案。

          它的核心思想是將需要分布式處理的任務(wù)通過消息或者日志的方式來異步執(zhí)行,消息或日志可以存到本地文件、數(shù)據(jù)庫或消息隊(duì)列,再通過業(yè)務(wù)規(guī)則進(jìn)行失敗重試,它要求各服務(wù)的接口是冪等的。

          本地消息表與業(yè)務(wù)數(shù)據(jù)表處于同一個(gè)數(shù)據(jù)庫中,這樣就能利用本地事務(wù)來保證在對這兩個(gè)表的操作滿足事務(wù)特性,并且使用了消息隊(duì)列來保證最終一致性。

          • 在分布式事務(wù)操作的一方完成寫業(yè)務(wù)數(shù)據(jù)的操作之后向本地消息表發(fā)送一個(gè)消息,本地事務(wù)能保證這個(gè)消息一定會被寫入本地消息表中;

          • 之后將本地消息表中的消息轉(zhuǎn)發(fā)到 Kafka 等消息隊(duì)列中,如果轉(zhuǎn)發(fā)成功則將消息從本地消息表中刪除,否則繼續(xù)重新轉(zhuǎn)發(fā);

          • 消息消費(fèi)方處理這個(gè)消息,并完成自己的業(yè)務(wù)邏輯。此時(shí)如果本地事務(wù)處理成功,表明已經(jīng)處理成功了,如果處理失敗,那么就會重試執(zhí)行。如果是業(yè)務(wù)上面的失敗,可以給生產(chǎn)方發(fā)送一個(gè)業(yè)務(wù)補(bǔ)償消息,通知生產(chǎn)方進(jìn)行回滾等操作;

          優(yōu)點(diǎn):?一種非常經(jīng)典的實(shí)現(xiàn),避免了分布式事務(wù),實(shí)現(xiàn)了最終一致性。

          缺點(diǎn):?消息表會耦合到業(yè)務(wù)系統(tǒng)中,如果沒有封裝好的解決方案,會有很多雜活需要處理。

          這個(gè)方案的核心在于第二階段的重試和冪等執(zhí)行。失敗后重試,這是一種補(bǔ)償機(jī)制,它是能保證系統(tǒng)最終一致的關(guān)鍵流程。

          可靠消息的最終一致性代碼示例

          表結(jié)構(gòu)

          DROP?TABLE?IF?EXISTS?`rp_transaction_message`;
          ?
          CREATE?TABLE?`rp_transaction_message`?(
          ????`id`?VARCHAR?(50)?NOT?NULL?DEFAULT?''?COMMENT?'主鍵ID',
          ????`version`?INT?(11)?NOT?NULL?DEFAULT?'0'?COMMENT?'版本號',
          ????`editor`?VARCHAR?(100)?DEFAULT?NULL?COMMENT?'修改者',
          ????`creater`?VARCHAR?(100)?DEFAULT?NULL?COMMENT?'創(chuàng)建者',
          ????`edit_time`?datetime?DEFAULT?NULL?COMMENT?'最后修改時(shí)間',
          ????`create_time`?datetime?NOT?NULL?DEFAULT?'0000-00-00?00:00:00'?COMMENT?'創(chuàng)建時(shí)間',
          ????`message_id`?VARCHAR?(50)?NOT?NULL?DEFAULT?''?COMMENT?'消息ID',
          ????`message_body`?LONGTEXT?NOT?NULL?COMMENT?'消息內(nèi)容',
          ????`message_data_type`?VARCHAR?(50)?DEFAULT?NULL?COMMENT?'消息數(shù)據(jù)類型',
          ????`consumer_queue`?VARCHAR?(100)?NOT?NULL?DEFAULT?''?COMMENT?'消費(fèi)隊(duì)列',
          ????`message_send_times`?SMALLINT?(6)?NOT?NULL?DEFAULT?'0'?COMMENT?'消息重發(fā)次數(shù)',
          ????`areadly_dead`?VARCHAR?(20)?NOT?NULL?DEFAULT?''?COMMENT?'是否死亡',
          ????`status`?VARCHAR?(20)?NOT?NULL?DEFAULT?''?COMMENT?'狀態(tài)',
          ????`remark`?VARCHAR?(200)?DEFAULT?NULL?COMMENT?'備注',
          ????`field1`?VARCHAR?(200)?DEFAULT?NULL?COMMENT?'擴(kuò)展字段1',
          ????`field2`?VARCHAR?(200)?DEFAULT?NULL?COMMENT?'擴(kuò)展字段2',
          ????`field3`?VARCHAR?(200)?DEFAULT?NULL?COMMENT?'擴(kuò)展字段3',
          ????PRIMARY?KEY?(`id`),
          ????KEY?`AK_Key_2`?(`message_id`)
          )?ENGINE?=?INNODB?DEFAULT?CHARSET?=?utf8;
          ?
          public?interface?RpTransactionMessageService?{
          ?
          ????/**
          ?????*?預(yù)存儲消息.
          ?????*/
          ????public?int?saveMessageWaitingConfirm(RpTransactionMessage?rpTransactionMessage)?throws?MessageBizException;
          ?
          ????/**
          ?????*?確認(rèn)并發(fā)送消息.
          ?????*/
          ????public?void?confirmAndSendMessage(String?messageId)?throws?MessageBizException;
          ?
          ????/**
          ?????*?存儲并發(fā)送消息.
          ?????*/
          ????public?int?saveAndSendMessage(RpTransactionMessage?rpTransactionMessage)?throws?MessageBizException;
          ?
          ????/**
          ?????*?直接發(fā)送消息.
          ?????*/
          ????public?void?directSendMessage(RpTransactionMessage?rpTransactionMessage)?throws?MessageBizException;
          ?
          ????/**
          ?????*?重發(fā)消息.
          ?????*/
          ????public?void?reSendMessage(RpTransactionMessage?rpTransactionMessage)?throws?MessageBizException;
          ?
          ????/**
          ?????*?根據(jù)messageId重發(fā)某條消息.
          ?????*/
          ????public?void?reSendMessageByMessageId(String?messageId)?throws?MessageBizException;
          ?
          ????/**
          ?????*?將消息標(biāo)記為死亡消息.
          ?????*/
          ????public?void?setMessageToAreadlyDead(String?messageId)?throws?MessageBizException;
          ?
          ????/**
          ?????*?根據(jù)消息ID獲取消息
          ?????*/
          ????public?RpTransactionMessage?getMessageByMessageId(String?messageId)?throws?MessageBizException;
          ?
          ????/**
          ?????*?根據(jù)消息ID刪除消息
          ?????*/
          ????public?void?deleteMessageByMessageId(String?messageId)?throws?MessageBizException;
          ?
          ????/**
          ?????*?重發(fā)某個(gè)消息隊(duì)列中的全部已死亡的消息.
          ?????*/
          ????public?void?reSendAllDeadMessageByQueueName(String?queueName,?int?batchSize)?throws?MessageBizException;
          ?
          ????/**
          ?????*?獲取分頁數(shù)據(jù)
          ?????*/
          ????PageBean?listPage(PageParam?pageParam,?Map?paramMap)?throws?MessageBizException;
          ?
          }
          @Service("rpTransactionMessageService")
          public?class?RpTransactionMessageServiceImpl?implements?RpTransactionMessageService?{
          ?
          ????private?static?final?Log?log?=?LogFactory.getLog(RpTransactionMessageServiceImpl.class);
          ?
          ????@Autowired
          ????private?RpTransactionMessageDao?rpTransactionMessageDao;
          ?
          ????@Autowired
          ????private?JmsTemplate?notifyJmsTemplate;
          ?
          ????public?int?saveMessageWaitingConfirm(RpTransactionMessage?message)?{
          ????????if?(message?==?null)?{
          ????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"保存的消息為空");
          ????????}
          ????????if?(StringUtil.isEmpty(message.getConsumerQueue()))?{
          ????????????throw?new?MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL,?"消息的消費(fèi)隊(duì)列不能為空?");
          ????????}
          ????????message.setEditTime(new?Date());
          ????????message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
          ????????message.setAreadlyDead(PublicEnum.NO.name());
          ????????message.setMessageSendTimes(0);
          ????????return?rpTransactionMessageDao.insert(message);
          ????}
          ?
          ????public?void?confirmAndSendMessage(String?messageId)?{
          ????????final?RpTransactionMessage?message?=?getMessageByMessageId(messageId);
          ????????if?(message?==?null)?{
          ????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"根據(jù)消息id查找的消息為空");
          ????????}
          ????????message.setStatus(MessageStatusEnum.SENDING.name());
          ????????message.setEditTime(new?Date());
          ????????rpTransactionMessageDao.update(message);
          ????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
          ????????notifyJmsTemplate.send(new?MessageCreator()?{
          ????????????public?Message?createMessage(Session?session)?throws?JMSException?{
          ????????????????return?session.createTextMessage(message.getMessageBody());
          ????????????}
          ????????});
          ????}
          ?
          ????public?int?saveAndSendMessage(final?RpTransactionMessage?message)?{
          ????????if?(message?==?null)?{
          ????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"保存的消息為空");
          ????????}
          ????????if?(StringUtil.isEmpty(message.getConsumerQueue()))?{
          ????????????throw?new?MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL,?"消息的消費(fèi)隊(duì)列不能為空?");
          ????????}
          ????????message.setStatus(MessageStatusEnum.SENDING.name());
          ????????message.setAreadlyDead(PublicEnum.NO.name());
          ????????message.setMessageSendTimes(0);
          ????????message.setEditTime(new?Date());
          ????????int?result?=?rpTransactionMessageDao.insert(message);
          ????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
          ????????notifyJmsTemplate.send(new?MessageCreator()?{
          ????????????public?Message?createMessage(Session?session)?throws?JMSException?{
          ????????????????return?session.createTextMessage(message.getMessageBody());
          ????????????}
          ????????});
          ????????return?result;
          ????}
          ?
          ????public?void?directSendMessage(final?RpTransactionMessage?message)?{
          ????????if?(message?==?null)?{
          ????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"保存的消息為空");
          ????????}
          ????????if?(StringUtil.isEmpty(message.getConsumerQueue()))?{
          ????????????throw?new?MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL,?"消息的消費(fèi)隊(duì)列不能為空?");
          ????????}
          ????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
          ????????notifyJmsTemplate.send(new?MessageCreator()?{
          ????????????public?Message?createMessage(Session?session)?throws?JMSException?{
          ????????????????return?session.createTextMessage(message.getMessageBody());
          ????????????}
          ????????});
          ????}
          ?
          ????public?void?reSendMessage(final?RpTransactionMessage?message)?{
          ????????if?(message?==?null)?{
          ????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"保存的消息為空");
          ????????}
          ????????if?(StringUtil.isEmpty(message.getConsumerQueue()))?{
          ????????????throw?new?MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL,?"消息的消費(fèi)隊(duì)列不能為空?");
          ????????}
          ????????message.addSendTimes();
          ????????message.setEditTime(new?Date());
          ????????rpTransactionMessageDao.update(message);
          ????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
          ????????notifyJmsTemplate.send(new?MessageCreator()?{
          ????????????public?Message?createMessage(Session?session)?throws?JMSException?{
          ????????????????return?session.createTextMessage(message.getMessageBody());
          ????????????}
          ????????});
          ????}
          ?
          ????public?void?reSendMessageByMessageId(String?messageId)?{
          ????????final?RpTransactionMessage?message?=?getMessageByMessageId(messageId);
          ????????if?(message?==?null)?{
          ????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"根據(jù)消息id查找的消息為空");
          ????????}
          ????????int?maxTimes?=?Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
          ????????if?(message.getMessageSendTimes()?>=?maxTimes)?{
          ????????????message.setAreadlyDead(PublicEnum.YES.name());
          ????????}
          ????????message.setEditTime(new?Date());
          ????????message.setMessageSendTimes(message.getMessageSendTimes()?+?1);
          ????????rpTransactionMessageDao.update(message);
          ????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
          ????????notifyJmsTemplate.send(new?MessageCreator()?{
          ????????????public?Message?createMessage(Session?session)?throws?JMSException?{
          ????????????????return?session.createTextMessage(message.getMessageBody());
          ????????????}
          ????????});
          ????}
          ?
          ????public?void?setMessageToAreadlyDead(String?messageId)?{
          ????????RpTransactionMessage?message?=?getMessageByMessageId(messageId);
          ????????if?(message?==?null)?{
          ????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"根據(jù)消息id查找的消息為空");
          ????????}
          ????????message.setAreadlyDead(PublicEnum.YES.name());
          ????????message.setEditTime(new?Date());
          ????????rpTransactionMessageDao.update(message);
          ????}
          ?
          ????public?RpTransactionMessage?getMessageByMessageId(String?messageId)?{
          ????????Map?paramMap?=?new?HashMap();
          ????????paramMap.put("messageId",?messageId);
          ????????return?rpTransactionMessageDao.getBy(paramMap);
          ????}
          ?
          ????public?void?deleteMessageByMessageId(String?messageId)?{
          ????????Map?paramMap?=?new?HashMap();
          ????????paramMap.put("messageId",?messageId);
          ????????rpTransactionMessageDao.delete(paramMap);
          ????}
          ?
          ????@SuppressWarnings("unchecked")
          ????public?void?reSendAllDeadMessageByQueueName(String?queueName,?int?batchSize)?{
          ????????log.info("==>reSendAllDeadMessageByQueueName");
          ????????int?numPerPage?=?1000;
          ????????if?(batchSize?>?0?&&?batchSize?????????????numPerPage?=?100;
          ????????}?else?if?(batchSize?>?100?&&?batchSize?????????????numPerPage?=?batchSize;
          ????????}?else?if?(batchSize?>?5000)?{
          ????????????numPerPage?=?5000;
          ????????}?else?{
          ????????????numPerPage?=?1000;
          ????????}
          ????????int?pageNum?=?1;
          ????????Map?paramMap?=?new?HashMap();
          ????????paramMap.put("consumerQueue",?queueName);
          ????????paramMap.put("areadlyDead",?PublicEnum.YES.name());
          ????????paramMap.put("listPageSortType",?"ASC");
          ????????Map?messageMap?=?new?HashMap();
          ????????List?recordList?=?new?ArrayList();
          ????????int?pageCount?=?1;
          ????????PageBean?pageBean?=?rpTransactionMessageDao.listPage(new?PageParam(pageNum,?numPerPage),?paramMap);
          ????????recordList?=?pageBean.getRecordList();
          ????????if?(recordList?==?null?||?recordList.isEmpty())?{
          ????????????log.info("==>recordList?is?empty");
          ????????????return;
          ????????}
          ????????pageCount?=?pageBean.getTotalPage();
          ????????for?(final?Object?obj?:?recordList)?{
          ????????????final?RpTransactionMessage?message?=?(RpTransactionMessage)?obj;
          ????????????messageMap.put(message.getMessageId(),?message);
          ????????}
          ????????for?(pageNum?=?2;?pageNum?<=?pageCount;?pageNum++)?{
          ????????????pageBean?=?rpTransactionMessageDao.listPage(new?PageParam(pageNum,?numPerPage),?paramMap);
          ????????????recordList?=?pageBean.getRecordList();
          ????????????if?(recordList?==?null?||?recordList.isEmpty())?{
          ????????????????break;
          ????????????}
          ????????????for?(final?Object?obj?:?recordList)?{
          ????????????????final?RpTransactionMessage?message?=?(RpTransactionMessage)?obj;
          ????????????????messageMap.put(message.getMessageId(),?message);
          ????????????}
          ????????}
          ????????recordList?=?null;
          ????????pageBean?=?null;
          ????????for?(Map.Entry?entry?:?messageMap.entrySet())?{
          ????????????final?RpTransactionMessage?message?=?entry.getValue();
          ????????????message.setEditTime(new?Date());
          ????????????message.setMessageSendTimes(message.getMessageSendTimes()?+?1);
          ????????????rpTransactionMessageDao.update(message);
          ????????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
          ????????????notifyJmsTemplate.send(new?MessageCreator()?{
          ????????????????public?Message?createMessage(Session?session)?throws?JMSException?{
          ????????????????????return?session.createTextMessage(message.getMessageBody());
          ????????????????}
          ????????????});
          ????????}
          ????}
          ?
          ????@SuppressWarnings("unchecked")
          ????public?PageBean?listPage(PageParam?pageParam,?Map?paramMap)?{
          ????????return?rpTransactionMessageDao.listPage(pageParam,?paramMap);
          ????}
          ?
          }
          @Component("messageBiz")
          public?class?MessageBiz?{
          ?
          ????private?static?final?Log?log?=?LogFactory.getLog(MessageBiz.class);
          ?
          ????@Autowired
          ????private?RpTradePaymentQueryService?rpTradePaymentQueryService;
          ?
          ????@Autowired
          ????private?RpTransactionMessageService?rpTransactionMessageService;
          ?
          ????/**
          ?????*?處理[waiting_confirm]狀態(tài)的消息
          ?????*?@param?messages
          ?????*/
          ????public?void?handleWaitingConfirmTimeOutMessages(Map?messageMap)?{
          ????????log.debug("開始處理[waiting_confirm]狀態(tài)的消息,總條數(shù)["?+?messageMap.size()?+?"]");
          ????????//?單條消息處理(目前該狀態(tài)的消息,消費(fèi)隊(duì)列全部是accounting,如果后期有業(yè)務(wù)擴(kuò)充,需做隊(duì)列判斷,做對應(yīng)的業(yè)務(wù)處理。)
          ????????for?(Map.Entry?entry?:?messageMap.entrySet())?{
          ????????????RpTransactionMessage?message?=?entry.getValue();
          ????????????try?{
          ????????????????log.debug("開始處理[waiting_confirm]消息ID為["?+?message.getMessageId()?+?"]的消息");
          ????????????????String?bankOrderNo?=?message.getField1();
          ????????????????RpTradePaymentRecord?record?=?rpTradePaymentQueryService.getRecordByBankOrderNo(bankOrderNo);
          ????????????????//?如果訂單成功,把消息改為待處理,并發(fā)送消息
          ????????????????if?(TradeStatusEnum.SUCCESS.name().equals(record.getStatus()))?{
          ????????????????????//?確認(rèn)并發(fā)送消息
          ????????????????????rpTransactionMessageService.confirmAndSendMessage(message.getMessageId());
          ????????????????}?else?if?(TradeStatusEnum.WAITING_PAYMENT.name().equals(record.getStatus()))?{
          ????????????????????//?訂單狀態(tài)是等到支付,可以直接刪除數(shù)據(jù)
          ????????????????????log.debug("訂單沒有支付成功,刪除[waiting_confirm]消息id["?+?message.getMessageId()?+?"]的消息");
          ????????????????????rpTransactionMessageService.deleteMessageByMessageId(message.getMessageId());
          ????????????????}
          ????????????????log.debug("結(jié)束處理[waiting_confirm]消息ID為["?+?message.getMessageId()?+?"]的消息");
          ????????????}?catch?(Exception?e)?{
          ????????????????log.error("處理[waiting_confirm]消息ID為["?+?message.getMessageId()?+?"]的消息異常:",?e);
          ????????????}
          ????????}
          ????}
          ?
          ????/**
          ?????*?處理[SENDING]狀態(tài)的消息
          ?????*?@param?messages
          ?????*/
          ????public?void?handleSendingTimeOutMessage(Map?messageMap)?{
          ????????SimpleDateFormat?sdf?=?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss");
          ????????log.debug("開始處理[SENDING]狀態(tài)的消息,總條數(shù)["?+?messageMap.size()?+?"]");
          ????????//?根據(jù)配置獲取通知間隔時(shí)間
          ????????Map?notifyParam?=?getSendTime();
          ????????//?單條消息處理
          ????????for?(Map.Entry?entry?:?messageMap.entrySet())?{
          ????????????RpTransactionMessage?message?=?entry.getValue();
          ????????????try?{
          ????????????????log.debug("開始處理[SENDING]消息ID為["?+?message.getMessageId()?+?"]的消息");
          ????????????????//?判斷發(fā)送次數(shù)
          ????????????????int?maxTimes?=?Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
          ????????????????log.debug("[SENDING]消息ID為["?+?message.getMessageId()?+?"]的消息,已經(jīng)重新發(fā)送的次數(shù)["
          ????????????????????????+?message.getMessageSendTimes()?+?"]");
          ????????????????//?如果超過最大發(fā)送次數(shù)直接退出
          ????????????????if?(maxTimes?????????????????????//?標(biāo)記為死亡
          ????????????????????rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId());
          ????????????????????continue;
          ????????????????}
          ????????????????//?判斷是否達(dá)到發(fā)送消息的時(shí)間間隔條件
          ????????????????int?reSendTimes?=?message.getMessageSendTimes();
          ????????????????int?times?=?notifyParam.get(reSendTimes?==?0???1?:?reSendTimes);
          ????????????????long?currentTimeInMillis?=?Calendar.getInstance().getTimeInMillis();
          ????????????????long?needTime?=?currentTimeInMillis?-?times?*?60?*?1000;
          ????????????????long?hasTime?=?message.getEditTime().getTime();
          ????????????????//?判斷是否達(dá)到了可以再次發(fā)送的時(shí)間條件
          ????????????????if?(hasTime?>?needTime)?{
          ????????????????????log.debug("currentTime["?+?sdf.format(new?Date())?+?"],[SENDING]消息上次發(fā)送時(shí)間["
          ????????????????????????????+?sdf.format(message.getEditTime())?+?"],必須過了["?+?times?+?"]分鐘才可以再發(fā)送。");
          ????????????????????continue;
          ????????????????}
          ????????????????//?重新發(fā)送消息
          ????????????????rpTransactionMessageService.reSendMessage(message);
          ????????????????log.debug("結(jié)束處理[SENDING]消息ID為["?+?message.getMessageId()?+?"]的消息");
          ????????????}?catch?(Exception?e)?{
          ????????????????log.error("處理[SENDING]消息ID為["?+?message.getMessageId()?+?"]的消息異常:",?e);
          ????????????}
          ????????}
          ????}
          ?
          ????/**
          ?????*?根據(jù)配置獲取通知間隔時(shí)間
          ?????*?@return
          ?????*/
          ????private?Map?getSendTime()?{
          ????????Map?notifyParam?=?new?HashMap();
          ????????notifyParam.put(1,?Integer.valueOf(PublicConfigUtil.readConfig("message.send.1.time")));
          ????????notifyParam.put(2,?Integer.valueOf(PublicConfigUtil.readConfig("message.send.2.time")));
          ????????notifyParam.put(3,?Integer.valueOf(PublicConfigUtil.readConfig("message.send.3.time")));
          ????????notifyParam.put(4,?Integer.valueOf(PublicConfigUtil.readConfig("message.send.4.time")));
          ????????notifyParam.put(5,?Integer.valueOf(PublicConfigUtil.readConfig("message.send.5.time")));
          ????????return?notifyParam;
          ????}
          ?
          }
          public?class?AccountingMessageListener?implements?SessionAwareMessageListener?{
          ?
          ????private?static?final?Log?LOG?=?LogFactory.getLog(AccountingMessageListener.class);
          ?
          ????/**
          ?????*?會計(jì)隊(duì)列模板(由Spring創(chuàng)建并注入進(jìn)來)
          ?????*/
          ????@Autowired
          ????private?JmsTemplate?notifyJmsTemplate;
          ?
          ????@Autowired
          ????private?RpAccountingVoucherService?rpAccountingVoucherService;
          ?
          ????@Autowired
          ????private?RpTransactionMessageService?rpTransactionMessageService;
          ?
          ????public?synchronized?void?onMessage(Message?message,?Session?session)?{
          ????????RpAccountingVoucher?param?=?null;
          ????????String?strMessage?=?null;
          ????????try?{
          ????????????ActiveMQTextMessage?objectMessage?=?(ActiveMQTextMessage)?message;
          ????????????strMessage?=?objectMessage.getText();
          ????????????LOG.info("strMessage1?accounting:"?+?strMessage);
          ????????????param?=?JSONObject.parseObject(strMessage,?RpAccountingVoucher.class);
          ????????????//?這里轉(zhuǎn)換成相應(yīng)的對象還有問題
          ????????????if?(param?==?null)?{
          ????????????????LOG.info("param參數(shù)為空");
          ????????????????return;
          ????????????}
          ????????????int?entryType?=?param.getEntryType();
          ????????????double?payerChangeAmount?=?param.getPayerChangeAmount();
          ????????????String?voucherNo?=?param.getVoucherNo();
          ????????????String?payerAccountNo?=?param.getPayerAccountNo();
          ????????????int?fromSystem?=?param.getFromSystem();
          ????????????int?payerAccountType?=?0;
          ????????????if?(param.getPayerAccountType()?!=?null?&&?!param.getPayerAccountType().equals(""))?{
          ????????????????payerAccountType?=?param.getPayerAccountType();
          ????????????}
          ????????????double?payerFee?=?param.getPayerFee();
          ????????????String?requestNo?=?param.getRequestNo();
          ????????????double?bankChangeAmount?=?param.getBankChangeAmount();
          ????????????double?receiverChangeAmount?=?param.getReceiverChangeAmount();
          ????????????String?receiverAccountNo?=?param.getReceiverAccountNo();
          ????????????String?bankAccount?=?param.getBankAccount();
          ????????????String?bankChannelCode?=?param.getBankChannelCode();
          ????????????double?profit?=?param.getProfit();
          ????????????double?income?=?param.getIncome();
          ????????????double?cost?=?param.getCost();
          ????????????String?bankOrderNo?=?param.getBankOrderNo();
          ????????????int?receiverAccountType?=?0;
          ????????????double?payAmount?=?param.getPayAmount();
          ????????????if?(param.getReceiverAccountType()?!=?null?&&?!param.getReceiverAccountType().equals(""))?{
          ????????????????receiverAccountType?=?param.getReceiverAccountType();
          ????????????}
          ????????????double?receiverFee?=?param.getReceiverFee();
          ????????????String?remark?=?param.getRemark();
          ????????????rpAccountingVoucherService.createAccountingVoucher(entryType,?voucherNo,?payerAccountNo,?receiverAccountNo,
          ????????????????????payerChangeAmount,?receiverChangeAmount,?income,?cost,?profit,?bankChangeAmount,?requestNo,
          ????????????????????bankChannelCode,?bankAccount,?fromSystem,?remark,?bankOrderNo,?payerAccountType,?payAmount,
          ????????????????????receiverAccountType,?payerFee,?receiverFee);
          ????????????//刪除消息
          ????????????rpTransactionMessageService.deleteMessageByMessageId(param.getMessageId());
          ????????}?catch?(BizException?e)?{
          ????????????//?業(yè)務(wù)異常,不再寫會隊(duì)列
          ????????????LOG.error("==>BizException",?e);
          ????????}?catch?(Exception?e)?{
          ????????????//?不明異常不再寫會隊(duì)列
          ????????????LOG.error("==>Exception",?e);
          ????????}
          ????}
          ?
          ????public?JmsTemplate?getNotifyJmsTemplate()?{
          ????????return?notifyJmsTemplate;
          ????}
          ?
          ????public?void?setNotifyJmsTemplate(JmsTemplate?notifyJmsTemplate)?{
          ????????this.notifyJmsTemplate?=?notifyJmsTemplate;
          ????}
          ?
          ????public?RpAccountingVoucherService?getRpAccountingVoucherService()?{
          ????????return?rpAccountingVoucherService;
          ????}
          ?
          ????public?void?setRpAccountingVoucherService(RpAccountingVoucherService?rpAccountingVoucherService)?{
          ????????this.rpAccountingVoucherService?=?rpAccountingVoucherService;
          ????}
          ?
          }
          ?

          與常規(guī)MQ的ACK機(jī)制對比

          常規(guī)MQ確認(rèn)機(jī)制:

          • Producer生成消息并發(fā)送給MQ(同步、異步);

          • MQ接收消息并將消息數(shù)據(jù)持久化到消息存儲(持久化操作為可選配置);

          • MQ向Producer返回消息的接收結(jié)果(返回值、異常);

          • Consumer監(jiān)聽并消費(fèi)MQ中的消息;

          • Consumer獲取到消息后執(zhí)行業(yè)務(wù)處理;

          • Consumer對已成功消費(fèi)的消息向MQ進(jìn)行ACK確認(rèn)(確認(rèn)后的消息將從MQ中刪除);

          常規(guī)MQ隊(duì)列消息的處理流程無法實(shí)現(xiàn)?消息發(fā)送一致性, 因此直接使用現(xiàn)成的MQ中間件產(chǎn)品無法實(shí)現(xiàn)可靠消息最終一致性的分布式事務(wù)解決方案

          消息發(fā)送一致性:是指產(chǎn)生消息的業(yè)務(wù)動作與消息發(fā)送的一致。也就是說,如果業(yè)務(wù)操作成功,那么由這個(gè)業(yè)務(wù)操作所產(chǎn)生的消息一定要成功投遞出去(一般是發(fā)送到kafka、rocketmq、rabbitmq等消息中間件中),否則就丟消息。

          下面用偽代碼進(jìn)行演示消息發(fā)送和投遞的不可靠性:

          先進(jìn)行數(shù)據(jù)庫操作,再發(fā)送消息:

          public?void?test1(){
          ????//1?數(shù)據(jù)庫操作
          ????//2?發(fā)送MQ消息
          }
          ?


          這種情況下無法保證數(shù)據(jù)庫操作與發(fā)送消息的一致性,因?yàn)榭赡軘?shù)據(jù)庫操作成功,發(fā)送消息失敗。

          先發(fā)送消息,再操作數(shù)據(jù)庫:

          public?void?test1(){
          ????//1?發(fā)送MQ消息
          ????//2?數(shù)據(jù)庫操作
          }
          ?

          這種情況下無法保證數(shù)據(jù)庫操作與發(fā)送消息的一致性,因?yàn)榭赡馨l(fā)送消息成功,數(shù)據(jù)庫操作失敗。

          在數(shù)據(jù)庫事務(wù)中,先發(fā)送消息,后操作數(shù)據(jù)庫:

          @Transactional
          public?void?test1(){
          ????//1?發(fā)送MQ消息
          ????//2?數(shù)據(jù)庫操作
          }
          ?

          這里使用spring 的@Transactional注解,方法里面的操作都在一個(gè)事務(wù)中。同樣無法保證一致性,因?yàn)榘l(fā)送消息成功了,數(shù)據(jù)庫操作失敗的情況下,數(shù)據(jù)庫操作是回滾了,但是MQ消息沒法進(jìn)行回滾。

          在數(shù)據(jù)庫事務(wù)中,先操作數(shù)據(jù)庫,后發(fā)送消息:

          @Transactional
          public?void?test1(){
          ????//1?數(shù)據(jù)庫操作
          ????//2?發(fā)送MQ消息
          }
          ?


          這種情況下,貌似沒有問題,如果發(fā)送MQ消息失敗,拋出異常,事務(wù)一定會回滾(加上了@Transactional注解后,spring方法拋出異常后,會自動進(jìn)行回滾)。

          這只是一個(gè)假象,因?yàn)榘l(fā)送MQ消息可能事實(shí)上已經(jīng)成功,如果是響應(yīng)超時(shí)導(dǎo)致的異常。這個(gè)時(shí)候,數(shù)據(jù)庫操作依然回滾,但是MQ消息實(shí)際上已經(jīng)發(fā)送成功,導(dǎo)致不一致。

          與消息發(fā)送一致性流程的對比:

          • 常規(guī)MQ隊(duì)列消息的處理流程無法實(shí)現(xiàn)消息發(fā)送一致性;

          • 投遞消息的流程其實(shí)就是消息的消費(fèi)流程,可細(xì)化;

          TCC (Try-Confirm-Cancel)補(bǔ)償模式(最終一致性)

          TCC 其實(shí)就是采用的補(bǔ)償機(jī)制,其核心思想是:針對每個(gè)操作,都要注冊一個(gè)與其對應(yīng)的確認(rèn)和補(bǔ)償(撤銷)操作。

          它分為三個(gè)階段:

          • Try 階段主要是對業(yè)務(wù)系統(tǒng)做檢測及資源預(yù)留

          • Confirm 階段主要是對業(yè)務(wù)系統(tǒng)做確認(rèn)提交,Try階段執(zhí)行成功并開始執(zhí)行 Confirm階段時(shí),默認(rèn) Confirm階段是不會出錯(cuò)的。即:只要Try成功,Confirm一定成功。

          • Cancel 階段主要是在業(yè)務(wù)執(zhí)行錯(cuò)誤,需要回滾的狀態(tài)下執(zhí)行的業(yè)務(wù)取消,預(yù)留資源釋放。

          舉例(Bob 要向 Smith 轉(zhuǎn)賬):

          • 首先在 Try 階段,要先調(diào)用遠(yuǎn)程接口把 Smith 和 Bob 的錢凍結(jié)起來。

          • 在 Confirm 階段,執(zhí)行遠(yuǎn)程調(diào)用的轉(zhuǎn)賬的操作,轉(zhuǎn)賬成功進(jìn)行解凍。

          • 如果第2步執(zhí)行成功,那么轉(zhuǎn)賬成功,如果第二步執(zhí)行失敗,則調(diào)用遠(yuǎn)程凍結(jié)接口對應(yīng)的解凍方法 (Cancel)。

          優(yōu)點(diǎn):跟2PC比起來,實(shí)現(xiàn)以及流程相對簡單了一些,但數(shù)據(jù)的一致性比2PC也要差一些

          缺點(diǎn):缺點(diǎn)還是比較明顯的,在2,3步中都有可能失敗。TCC屬于應(yīng)用層的一種補(bǔ)償方式,所以需要程序員在實(shí)現(xiàn)的時(shí)候多寫很多補(bǔ)償?shù)拇a,在一些場景中,一些業(yè)務(wù)流程可能用TCC不太好定義及處理。

          可靠消息最終一致(常用)

          不要用本地的消息表了,直接基于MQ來實(shí)現(xiàn)事務(wù)。比如阿里的RocketMQ就支持消息事務(wù)。

          可靠消息最終一致性方案

          大概流程:

          • A系統(tǒng)先發(fā)送一個(gè)prepared消息到mq,如果這個(gè)prepared消息發(fā)送失敗那么就直接取消操作別執(zhí)行了

          • 如果這個(gè)消息發(fā)送成功過了,那么接著執(zhí)行本地事務(wù),如果成功就告訴mq發(fā)送確認(rèn)消息,如果失敗就告訴mq回滾消息

          • 如果發(fā)送了確認(rèn)消息,那么此時(shí)B系統(tǒng)會接收到確認(rèn)消息,然后執(zhí)行本地的事務(wù)

          • mq會自動定時(shí)輪詢所有prepared消息回調(diào)你的接口,問你,這個(gè)消息是不是本地事務(wù)處理失敗了,所有沒發(fā)送確認(rèn)消息?那是繼續(xù)重試還是回滾?一般來說這里你就可以查下數(shù)據(jù)庫看之前本地事務(wù)是否執(zhí)行,如果回滾了,那么這里也回滾吧。這個(gè)就是避免可能本地事務(wù)執(zhí)行成功了,別確認(rèn)消息發(fā)送失敗了。

          這個(gè)方案里,要是系統(tǒng)B的事務(wù)失敗了咋辦?重試咯,自動不斷重試直到成功,如果實(shí)在是不行,要么就是針對重要的資金類業(yè)務(wù)進(jìn)行回滾,比如B系統(tǒng)本地回滾后,想辦法通知系統(tǒng)A也回滾;或者是發(fā)送報(bào)警由人工來手工回滾和補(bǔ)償

          目前國內(nèi)互聯(lián)網(wǎng)公司大都是這么玩兒的,要不你使用RocketMQ支持的,要不你就基于其他MQ中間件自己封裝一套類似的邏輯,總之思路就是這樣的。

          最大努力通知

          業(yè)務(wù)發(fā)起方將協(xié)調(diào)服務(wù)的消息發(fā)送到MQ,下游服務(wù)接收此消息,如果處理失敗,將進(jìn)行重試,重試N次后依然失敗,將不進(jìn)行重試,放棄處理,這個(gè)應(yīng)用場景要求對事物性要求不高的地方。



          版權(quán)聲明:本文為博主原創(chuàng)文章,遵循?CC 4.0 BY-SA?版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接和本聲明。

          本文鏈接:

          https://blog.csdn.net/QAQFyl/article/details/113727579




          鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布

          ??????

          ??長按上方微信二維碼?2 秒






          感謝點(diǎn)贊支持下哈?

          瀏覽 39
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    综合久久综合 | 婷婷内射视频在线观看 | 国产精品伦子伦露脸 | 国产中文字幕在线观看 | 女人操女人·视频网站 |