MQ->消費者,因此在這三個步驟中都有可能造成消息丟失。一 消息生產(chǎn)者沒有把消息成功發(fā)送到MQ1.1 事務(wù)機制 AMQP協(xié)議提供了事務(wù)機制,在投遞消息時開啟事務(wù)支持,如果消息投遞失敗,..." />
<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          給你1分鐘,回答下RabbitMQ如何保證消息不丟?

          共 5138字,需瀏覽 11分鐘

           ·

          2020-09-04 23:50

          一條消費成功被消費經(jīng)歷了生產(chǎn)者->MQ->消費者,因此在這三個步驟中都有可能造成消息丟失。

          一 消息生產(chǎn)者沒有把消息成功發(fā)送到MQ

          1.1 事務(wù)機制

          AMQP協(xié)議提供了事務(wù)機制,在投遞消息時開啟事務(wù)支持,如果消息投遞失敗,則回滾事務(wù)。

          自定義事務(wù)管理器

          @Configuration
          public?class?RabbitTranscation?{

          ????@Bean
          ????public?RabbitTransactionManager?rabbitTransactionManager(ConnectionFactory?connectionFactory){
          ????????return?new?RabbitTransactionManager(connectionFactory);
          ????}

          ????@Bean
          ????public?RabbitTemplate?rabbitTemplate(ConnectionFactory?connectionFactory){
          ????????return?new?RabbitTemplate(connectionFactory);
          ????}
          }

          修改yml

          spring:
          ??rabbitmq:
          ????#?消息在未被隊列收到的情況下返回
          ????publisher-returns:?true

          開啟事務(wù)支持

          rabbitTemplate.setChannelTransacted(true);

          消息未接收時調(diào)用ReturnCallback

          rabbitTemplate.setMandatory(true);

          生產(chǎn)者投遞消息

          @Service
          public?class?ProviderTranscation?implements?RabbitTemplate.ReturnCallback?{

          ????@Autowired
          ????RabbitTemplate?rabbitTemplate;

          ????@PostConstruct
          ????public?void?init(){
          ????????//?設(shè)置channel開啟事務(wù)
          ????????rabbitTemplate.setChannelTransacted(true);
          ????????rabbitTemplate.setReturnCallback(this);
          ????}

          ????@Override
          ????public?void?returnedMessage(Message?message,?int?replyCode,?String?replyText,?String?exchange,?String?routingKey)?{
          ????????System.out.println("這條消息發(fā)送失敗了"+message+",請?zhí)幚?);
          ????}

          ????@Transactional(rollbackFor?=?Exception.class,transactionManager?=?"rabbitTransactionManager")
          ????public?void?publishMessage(String?message)?throws?Exception?{
          ????????rabbitTemplate.setMandatory(true);
          ????????rabbitTemplate.convertAndSend("javatrip",message);
          ????}
          }

          但是,很少有人這么干,因為這是同步操作,一條消息發(fā)送之后會使發(fā)送端阻塞,以等待RabbitMQ-Server的回應(yīng),之后才能繼續(xù)發(fā)送下一條消息,生產(chǎn)者生產(chǎn)消息的吞吐量和性能都會大大降低。

          1.2 發(fā)送方確認機制

          發(fā)送消息時將信道設(shè)置為confirm模式,消息進入該信道后,都會被指派給一個唯一ID,一旦消息被投遞到所匹配的隊列后,RabbitMQ就會發(fā)送給生產(chǎn)者一個確認。

          開啟消息確認機制

          spring:
          ??rabbitmq:
          ????#?消息在未被隊列收到的情況下返回
          ????publisher-returns:?true
          ????#?開啟消息確認機制
          ????publisher-confirm-type:?correlated

          消息未接收時調(diào)用ReturnCallback

          rabbitTemplate.setMandatory(true);

          生產(chǎn)者投遞消息

          @Service
          public?class?ConfirmProvider?implements?RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback?{

          ????@Autowired
          ????RabbitTemplate?rabbitTemplate;

          ????@PostConstruct
          ????public?void?init()?{
          ????????rabbitTemplate.setReturnCallback(this);
          ????????rabbitTemplate.setConfirmCallback(this);
          ????}

          ????@Override
          ????public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{
          ????????if(ack){
          ????????????System.out.println("確認了這條消息:"+correlationData);
          ????????}else{
          ????????????System.out.println("確認失敗了:"+correlationData+";出現(xiàn)異常:"+cause);
          ????????}
          ????}

          ????@Override
          ????public?void?returnedMessage(Message?message,?int?replyCode,?String?replyText,?String?exchange,?String?routingKey)?{
          ????????System.out.println("這條消息發(fā)送失敗了"+message+",請?zhí)幚?);
          ????}

          ????public?void?publisMessage(String?message){
          ????????rabbitTemplate.setMandatory(true);
          ????????rabbitTemplate.convertAndSend("javatrip",message);
          ????}
          }

          如果消息確認失敗后,我們可以進行消息補償,也就是消息的重試機制。當(dāng)未收到確認信息時進行消息的重新投遞。設(shè)置如下配置即可完成。

          spring:
          ??rabbitmq:
          ????#?支持消息發(fā)送失敗后重返隊列
          ????publisher-returns:?true
          ????#?開啟消息確認機制
          ????publisher-confirm-type:?correlated
          ????listener:
          ??????simple:
          ????????retry:
          ??????????#?開啟重試
          ??????????enabled:?true
          ??????????#?最大重試次數(shù)
          ??????????max-attempts:?5
          ??????????#?重試時間間隔
          ??????????initial-interval:?3000

          二 消息發(fā)送到MQ后,MQ宕機導(dǎo)致內(nèi)存中的消息丟失

          消息在MQ中有可能發(fā)生丟失,這時候我們就需要將隊列和消息都進行持久化。

          @Queue注解為我們提供了隊列相關(guān)的一些屬性,具體如下:

          1. name: 隊列的名稱;
          2. durable: 是否持久化;
          3. exclusive: 是否獨享、排外的;
          4. autoDelete: 是否自動刪除;
          5. arguments:隊列的其他屬性參數(shù),有如下可選項,可參看圖2的arguments:
            • x-message-ttl:消息的過期時間,單位:毫秒;
            • x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除,單位:毫秒;
            • x-max-length:隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;
            • x-max-length-bytes:隊列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過該閾值則從隊列頭部開始刪除消息;
            • x-overflow:設(shè)置隊列溢出行為。這決定了當(dāng)達到隊列的最大長度時消息會發(fā)生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁隊列類型僅支持drop-head;
            • x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中;
            • x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設(shè)置,則使用消息的原來的路由鍵值
            • x-single-active-consumer:表示隊列是否是單一活動消費者,true時,注冊的消費組內(nèi)只有一個消費者消費消息,其他被忽略,false時消息循環(huán)分發(fā)給所有消費者(默認false)
            • x-max-priority:隊列要支持的最大優(yōu)先級數(shù);如果未設(shè)置,隊列將不支持消息優(yōu)先級;
            • x-queue-mode(Lazy mode):將隊列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使用;如果未設(shè)置,隊列將保留內(nèi)存緩存以盡可能快地傳遞消息;
            • x-queue-master-locator:在集群模式下設(shè)置鏡像隊列的主節(jié)點信息。
          持久化隊列
          創(chuàng)建隊列的時候?qū)⒊志没瘜傩詃urable設(shè)置為true,同時要將autoDelete設(shè)置為false
          @Queue(value?=?"javatrip",durable?=?"true",autoDelete?=?"false")
          持久化消息
          發(fā)送消息的時候?qū)⑾⒌膁eliveryMode設(shè)置為2,在Spring Boot中消息默認就是持久化的。

          三 消費者消費消息的時候,未消費完畢就出現(xiàn)了異常

          消費者剛消費了消息,還沒有處理業(yè)務(wù),結(jié)果發(fā)生異常。這時候就需要關(guān)閉自動確認,改為手動確認消息。
          修改yml為手動簽收模式

          spring:
          ??rabbitmq:
          ????listener:
          ??????simple:
          ????????#?手動簽收模式
          ????????acknowledge-mode:?manual
          ????????#?每次簽收一條消息
          ????????prefetch:?1

          消費者手動簽收

          @Component
          @RabbitListener(queuesToDeclare?=?@Queue(value?=?"javatrip",?durable?=?"true"))
          public?class?Consumer?{

          ????@RabbitHandler
          ????public?void?receive(String?message,?@Headers?Map?headers,?Channel?channel)?throws?Exception{

          ????????System.out.println(message);
          ????????//?唯一的消息ID
          ????????Long?deliverTag?=?(Long)?headers.get(AmqpHeaders.DELIVERY_TAG);
          ????????//?確認該條消息
          ????????if(...){
          ????????????channel.basicAck(deliverTag,false);
          ????????}else{
          ????????????//?消費失敗,消息重返隊列
          ????????????channel.basicNack(deliverTag,false,true);
          ????????}

          ????}
          }

          四 總結(jié)

          消息丟失的原因?

          生產(chǎn)者、MQ、消費者都有可能造成消息丟失

          如何保證消息的可靠性?

          • 發(fā)送方采取發(fā)送者確認模式
          • MQ進行隊列及消息的持久化
          • 消費者消費成功后手動確認消息

          有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)

          歡迎大家關(guān)注Java之道公眾號


          好文章,我在看??

          瀏覽 56
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩成人无码专区 | 体内射精一区二区三区在线视频 | 国产欧美视频在线 | 操逼大战| 婷婷三级片 |