<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          四種策略確保 RabbitMQ 消息發(fā)送可靠性!你用哪種?

          共 6102字,需瀏覽 13分鐘

           ·

          2021-08-11 23:12

          微服務(wù)可以設(shè)計成消息驅(qū)動的微服務(wù),響應(yīng)式系統(tǒng)也可以基于消息中間件來做,從這個角度來說,在互聯(lián)網(wǎng)應(yīng)用開發(fā)中,消息中間件真的是太重要了。

          今天,以 RabbitMQ 為例,松哥來和大家聊一聊消息中間消息發(fā)送可靠性的問題。

          注意,以下內(nèi)容我主要和大家討論如何確保消息生產(chǎn)者將消息發(fā)送成功,并不涉及消息消費的問題。

          ?

          1. RabbitMQ 消息發(fā)送機制

          大家知道,RabbitMQ 中的消息發(fā)送引入了 Exchange(交換機)的概念,消息的發(fā)送首先到達交換機上,然后再根據(jù)既定的路由規(guī)則,由交換機將消息路由到不同的 Queue(隊列)中,再由不同的消費者去消費。


          9d9d755fed322e68fed66f7e6633578d.webp


          大致的流程就是這樣,所以要確保消息發(fā)送的可靠性,主要從兩方面去確認:

          1. 消息成功到達 Exchange

          2. 消息成功到達 Queue

          如果能確認這兩步,那么我們就可以認為消息發(fā)送成功了。

          如果這兩步中任一步驟出現(xiàn)問題,那么消息就沒有成功送達,此時我們可能要通過重試等方式去重新發(fā)送消息,多次重試之后,如果消息還是不能到達,則可能就需要人工介入了。

          經(jīng)過上面的分析,我們可以確認,要確保消息成功發(fā)送,我們只需要做好三件事就可以了:

          1. 確認消息到達 Exchange。

          2. 確認消息到達 Queue。

          3. 開啟定時任務(wù),定時投遞那些發(fā)送失敗的消息。

          ?

          2. RabbitMQ 的努力

          上面提出的三個步驟,第三步需要我們自己實現(xiàn),前兩步 RabbitMQ 則有現(xiàn)成的解決方案。

          如何確保消息成功到達 RabbitMQ?RabbitMQ 給出了兩種方案:

          1. 開啟事務(wù)機制

          2. 發(fā)送方確認機制

          這是兩種不同的方案,不可以同時開啟,只能選擇其中之一,如果兩者同時開啟,則會報如下錯誤:


          64323254bcc7608e6073cc1c84fb1493.webp


          我們分別來看。以下所有案例都在 Spring Boot 中展開,文末可以下載相關(guān)源碼。

          2.1 開啟事務(wù)機制

          Spring Boot 中開啟 RabbitMQ 事務(wù)機制的方式如下:

          首先需要先提供一個事務(wù)管理器,如下:

          @Bean
          RabbitTransactionManager?transactionManager(ConnectionFactory?connectionFactory)?{
          ????return?new?RabbitTransactionManager(connectionFactory);
          }

          接下來,在消息生產(chǎn)者上面做兩件事:添加事務(wù)注解并設(shè)置通信信道為事務(wù)模式:

          @Service
          public?class?MsgService?{
          ????@Autowired
          ????RabbitTemplate?rabbitTemplate;

          ????@Transactional
          ????public?void?send()?{
          ????????rabbitTemplate.setChannelTransacted(true);
          ????????rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello?rabbitmq!".getBytes());
          ????????int?i?=?1?/?0;
          ????}
          }

          這里注意兩點:

          1. 發(fā)送消息的方法上添加 @Transactional 注解標記事務(wù)。

          2. 調(diào)用 setChannelTransacted 方法設(shè)置為 true 開啟事務(wù)模式。

          這就 OK 了。

          在上面的案例中,我們在結(jié)尾來了個 1/0 ,這在運行時必然拋出異常,我們可以嘗試運行該方法,發(fā)現(xiàn)消息并未發(fā)送成功。

          當我們開啟事務(wù)模式之后,RabbitMQ 生產(chǎn)者發(fā)送消息會多出四個步驟:

          1. 客戶端發(fā)出請求,將信道設(shè)置為事務(wù)模式。

          2. 服務(wù)端給出回復,同意將信道設(shè)置為事務(wù)模式。

          3. 客戶端發(fā)送消息。

          4. 客戶端提交事務(wù)。

          5. 服務(wù)端給出響應(yīng),確認事務(wù)提交。

          上面的步驟,除了第三步是本來就有的,其他幾個步驟都是平白無故多出來的。所以大家看到,事務(wù)模式其實效率有點低,這并非一個最佳解決方案。我們可以想想,什么項目會用到消息中間件?一般來說都是一些高并發(fā)的項目,這個時候并發(fā)性能尤為重要。

          所以,RabbitMQ 還提供了發(fā)送方確認機制(publisher confirm)來確保消息發(fā)送成功,這種方式,性能要遠遠高于事務(wù)模式,一起來看下。

          2.2 發(fā)送方確認機制

          2.2.1 單條消息處理

          首先我們移除剛剛關(guān)于事務(wù)的代碼,然后在 application.properties 中配置開啟消息發(fā)送方確認機制,如下:

          spring.rabbitmq.publisher-confirm-type=correlated
          spring.rabbitmq.publisher-returns=true

          第一行是配置消息到達交換器的確認回調(diào),第二行則是配置消息到達隊列的回調(diào)。

          第一行屬性的配置有三個取值:

          1. none:表示禁用發(fā)布確認模式,默認即此。

          2. correlated:表示成功發(fā)布消息到交換器后會觸發(fā)的回調(diào)方法。

          3. simple:類似 correlated,并且支持 waitForConfirms()waitForConfirmsOrDie() 方法的調(diào)用。

          接下來我們要開啟兩個監(jiān)聽,具體配置如下:

          @Configuration
          public?class?RabbitConfig?implements?RabbitTemplate.ConfirmCallback,?RabbitTemplate.ReturnsCallback?{
          ????public?static?final?String?JAVABOY_EXCHANGE_NAME?=?"javaboy_exchange_name";
          ????public?static?final?String?JAVABOY_QUEUE_NAME?=?"javaboy_queue_name";
          ????private?static?final?Logger?logger?=?LoggerFactory.getLogger(RabbitConfig.class);
          ????@Autowired
          ????RabbitTemplate?rabbitTemplate;
          ????@Bean
          ????Queue?queue()?{
          ????????return?new?Queue(JAVABOY_QUEUE_NAME);
          ????}
          ????@Bean
          ????DirectExchange?directExchange()?{
          ????????return?new?DirectExchange(JAVABOY_EXCHANGE_NAME);
          ????}
          ????@Bean
          ????Binding?binding()?{
          ????????return?BindingBuilder.bind(queue())
          ????????????????.to(directExchange())
          ????????????????.with(JAVABOY_QUEUE_NAME);
          ????}

          ????@PostConstruct
          ????public?void?initRabbitTemplate()?{
          ????????rabbitTemplate.setConfirmCallback(this);
          ????????rabbitTemplate.setReturnsCallback(this);
          ????}

          ????@Override
          ????public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{
          ????????if?(ack)?{
          ????????????logger.info("{}:消息成功到達交換器",correlationData.getId());
          ????????}else{
          ????????????logger.error("{}:消息發(fā)送失敗",?correlationData.getId());
          ????????}
          ????}

          ????@Override
          ????public?void?returnedMessage(ReturnedMessage?returned)?{
          ????????logger.error("{}:消息未成功路由到隊列",returned.getMessage().getMessageProperties().getMessageId());
          ????}
          }

          關(guān)于這個配置類,我說如下幾點:

          1. 定義配置類,實現(xiàn) RabbitTemplate.ConfirmCallbackRabbitTemplate.ReturnsCallback 兩個接口,這兩個接口,前者的回調(diào)用來確定消息到達交換器,后者則會在消息路由到隊列失敗時被調(diào)用。

          2. 定義 initRabbitTemplate 方法并添加 @PostConstruct 注解,在該方法中為 rabbitTemplate 分別配置這兩個 Callback。

          這就可以了。

          接下來我們對消息發(fā)送進行測試。

          首先我們嘗試將消息發(fā)送到一個不存在的交換機中,像下面這樣:

          rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello?rabbitmq!".getBytes(),new?CorrelationData(UUID.randomUUID().toString()));

          注意第一個參數(shù)是一個字符串,不是變量,這個交換器并不存在,此時控制臺會報如下錯誤:


          2c89a02becfba16acf94a3008254b8c2.webp


          接下來我們給定一個真實存在的交換器,但是給一個不存在的隊列,像下面這樣:

          rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello?rabbitmq!".getBytes(),new?CorrelationData(UUID.randomUUID().toString()));

          注意此時第二個參數(shù)是一個字符串,不是變量。


          5c017b046a7c76b00a3807d00897139c.webp


          可以看到,消息雖然成功達到交換器了,但是沒有成功路由到隊列(因為隊列不存在)。

          這是一條消息的發(fā)送,我們再來看看消息的批量發(fā)送。

          2.2.2 消息批量處理

          如果是消息批量處理,那么發(fā)送成功的回調(diào)監(jiān)聽是一樣的,這里不再贅述。

          這就是 publisher-confirm 模式。

          相比于事務(wù),這種模式下的消息吞吐量會得到極大的提升。

          ?

          3. 失敗重試

          失敗重試分兩種情況,一種是壓根沒找到 MQ 導致的失敗重試,另一種是找到 MQ 了,但是消息發(fā)送失敗了。

          兩種重試我們分別來看。

          3.1 自帶重試機制

          前面所說的事務(wù)機制和發(fā)送方確認機制,都是發(fā)送方確認消息發(fā)送成功的辦法。如果發(fā)送方一開始就連不上 MQ,那么 Spring Boot 中也有相應(yīng)的重試機制,但是這個重試機制就和 MQ 本身沒有關(guān)系了,這是利用 Spring 中的 retry 機制來完成的,具體配置如下:

          spring.rabbitmq.template.retry.enabled=true
          spring.rabbitmq.template.retry.initial-interval=1000ms
          spring.rabbitmq.template.retry.max-attempts=10
          spring.rabbitmq.template.retry.max-interval=10000ms
          spring.rabbitmq.template.retry.multiplier=2

          從上往下配置含義依次是:

          • 開啟重試機制。

          • 重試起始間隔時間。

          • 最大重試次數(shù)。

          • 最大重試間隔時間。

          • 間隔時間乘數(shù)。(這里配置間隔時間乘數(shù)為 2,則第一次間隔時間 1 秒,第二次重試間隔時間 2 秒,第三次 4 秒,以此類推)

          配置完成后,再次啟動 Spring Boot 項目,然后關(guān)掉 MQ,此時嘗試發(fā)送消息,就會發(fā)送失敗,進而導致自動重試。


          bb66c98cbf77c06e517dbd889a2da26f.webp


          3.2 業(yè)務(wù)重試

          業(yè)務(wù)重試主要是針對消息沒有到達交換器的情況。

          如果消息沒有成功到達交換器,根據(jù)我們第二小節(jié)的講解,此時就會觸發(fā)消息發(fā)送失敗回調(diào),在這個回調(diào)中,我們就可以做文章了!

          整體思路是這樣:

          1. 首先創(chuàng)建一張表,用來記錄發(fā)送到中間件上的消息,像下面這樣:


          e7f94a49f7b0cbff915929fcde7498b5.webp


          每次發(fā)送消息的時候,就往數(shù)據(jù)庫中添加一條記錄。這里的字段都很好理解,有三個我額外說下:

          • status:表示消息的狀態(tài),有三個取值,0,1,2 分別表示消息發(fā)送中、消息發(fā)送成功以及消息發(fā)送失敗。

          • tryTime:表示消息的第一次重試時間(消息發(fā)出去之后,在 tryTime 這個時間點還未顯示發(fā)送成功,此時就可以開始重試了)。

          • count:表示消息重試次數(shù)。

          其他字段都很好理解,我就不一一啰嗦了。

          1. 在消息發(fā)送的時候,我們就往該表中保存一條消息發(fā)送記錄,并設(shè)置狀態(tài) status 為 0,tryTime 為 1 分鐘之后。

          2. 在 confirm 回調(diào)方法中,如果收到消息發(fā)送成功的回調(diào),就將該條消息的 status 設(shè)置為1(在消息發(fā)送時為消息設(shè)置 msgId,在消息發(fā)送成功回調(diào)時,通過 msgId 來唯一鎖定該條消息)。

          3. 另外開啟一個定時任務(wù),定時任務(wù)每隔 10s 就去數(shù)據(jù)庫中撈一次消息,專門去撈那些 status 為 0 并且已經(jīng)過了 tryTime 時間記錄,把這些消息拎出來后,首先判斷其重試次數(shù)是否已超過 3 次,如果超過 3 次,則修改該條消息的 status 為 2,表示這條消息發(fā)送失敗,并且不再重試。對于重試次數(shù)沒有超過 3 次的記錄,則重新去發(fā)送消息,并且為其 count 的值+1。

          大致的思路就是上面這樣,松哥這里就不給出代碼了,松哥的 vhr 里邊郵件發(fā)送就是這樣的思路來處理的,完整代碼大家可以參考 vhr 項目(https://github.com/lenve/vhr)。

          當然這種思路有兩個弊端:

          1. 去數(shù)據(jù)庫走一遭,可能拖慢 MQ 的 Qos,不過有的時候我們并不需要 MQ 有很高的 Qos,所以這個應(yīng)用時要看具體情況。

          2. 按照上面的思路,可能會出現(xiàn)同一條消息重復發(fā)送的情況,不過這都不是事,我們在消息消費時,解決好冪等性問題就行了。

          當然,大家也要注意,消息是否要確保 100% 發(fā)送成功,也要看具體情況。

          4. 小結(jié)

          好啦,這就是關(guān)于消息生產(chǎn)者的一些常見問題以及對應(yīng)的解決方案,下篇文章松哥和大家探討如果保證消息消費成功并解決冪等性問題。

          本文涉及到的相關(guān)源代碼大家可以在這里下載:https://github.com/lenve/javaboy-code-samples。


          有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)

          歡迎大家關(guān)注Java之道公眾號


          好文章,我在看??

          瀏覽 24
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  最新国产精品视频豆花 | 国产精品一区人妻精品阁在线 | 欧美A一级片 | 色婷婷在线综合 | 日本三级片天天干 |