<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          10分鐘掌握RocketMQ的核心知識(shí)

          共 10791字,需瀏覽 22分鐘

           ·

          2021-06-27 13:12

          Spring Boot 作為主流微服務(wù)框架,擁有成熟的社區(qū)生態(tài)。市場應(yīng)用廣泛,為了方便大家,整理了一個(gè)基于spring boot的常用中間件快速集成入門系列手冊(cè),涉及RPC、緩存、消息隊(duì)列、分庫分表、注冊(cè)中心、分布式配置等常用開源組件,大概有幾十篇文章,陸續(xù)會(huì)開放出來,感興趣同學(xué)請(qǐng)?zhí)崆瓣P(guān)注&收藏

          前言

          Apache RocketMQ 是阿里開源的一款高性能、高吞吐量的分布式消息中間件。

          RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負(fù)責(zé)生產(chǎn)消息,Consumer 負(fù)責(zé)消費(fèi)消息,Broker 負(fù)責(zé)存儲(chǔ)消息。每個(gè) Broker 可以存儲(chǔ)多個(gè)Topic的消息,每個(gè)Topic的消息也可以分片存儲(chǔ)于集群中的不同的Broker Group。

          快速安裝:https://rocketmq.apache.org/docs/quick-start/

          源代碼:https://github.com/apache/rocketmq-spring

          主要功能:

          1、業(yè)務(wù)解耦。采用發(fā)布訂閱模式,生產(chǎn)端發(fā)送消息到MQ Server,下游的消費(fèi)端訂閱接收消息。異步形式,系統(tǒng)解耦,提升系統(tǒng)擴(kuò)展性

          2、削峰限流。由于消息中間件的吞吐量很高,過量的請(qǐng)求會(huì)暫時(shí)放在 MQ server,下游慢慢消費(fèi),避免過量請(qǐng)求沖垮系統(tǒng)

          3、億級(jí)消息的堆積能力,單個(gè)隊(duì)列中的百萬級(jí)消息的累積容量。

          4、高可用性:Broker服務(wù)器支持多Master多Slave的同步雙寫以及Master多Slave的異步復(fù)制模式,其中同步雙寫可保證消息不丟失。

          5、高可靠性:生產(chǎn)者將消息發(fā)送到Broker端有三種方式,同步、異步和單向。Broker在對(duì)于消息刷盤有兩種策略:同步刷盤和異步刷盤,其中同步刷盤可以保證消息成功的存儲(chǔ)到磁盤中。消費(fèi)者的消費(fèi)模式也有集群消費(fèi)和廣播消費(fèi)兩種,默認(rèn)集群消費(fèi),如果集群模式中消費(fèi)者掛了,一個(gè)組里的其他消費(fèi)者會(huì)接替其消費(fèi)。

          6、分布式事務(wù)消息:這里是采用半消息確認(rèn)和消息回查機(jī)制來保證分布式事務(wù)消息。

          7、支持消息過濾:建議采用消費(fèi)者業(yè)務(wù)端的tag過濾

          8、支持順序消息:消息在Broker中是采用隊(duì)列的FIFO模式存儲(chǔ)的,也就是發(fā)送是順序的,只要保證消費(fèi)的順序性即可。

          9、支持定時(shí)消息和延遲消息:Broker中由定時(shí)消息的機(jī)制,消息發(fā)送到Broker中,不會(huì)立即被Consumer消費(fèi),會(huì)等到一定的時(shí)間才被消費(fèi)。延遲消息也是一樣,延遲一定時(shí)間之后才會(huì)被Consumer消費(fèi)。

          核心組件:

          1、Namesrv

          Namesrv充當(dāng)路由消息的提供者。Namesrv是一個(gè)幾乎無狀態(tài)節(jié)點(diǎn),多個(gè)Namesrv實(shí)例組成集群,但相互獨(dú)立,沒有信息交換。Namesrv主要作用是:為producer和consumer提供關(guān)于topic的路由信息。管理broker節(jié)點(diǎn):監(jiān)控更新broker的實(shí)時(shí)狀態(tài)。路由注冊(cè)、路由刪除(故障剔除)。

          2、Broker

          負(fù)責(zé)存儲(chǔ)消息、轉(zhuǎn)發(fā)消息。Broker是以group為單位提供服務(wù)。一個(gè)group里面分Master和Slave。Master和Slave存儲(chǔ)的數(shù)據(jù)一樣,slave從master同步數(shù)據(jù)(同步雙寫或異步復(fù)制看配置)。一個(gè)Master可以對(duì)應(yīng)多個(gè)Slave,一個(gè)Slave只能對(duì)應(yīng)一個(gè)Master。Master與Slave的對(duì)應(yīng)關(guān)系通過指定相同的BrokerName、不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。

          基本概念:

          1、主題(Topic) 表示一類消息的集合,每個(gè)主題包含若干條消息,每條消息只能屬于一個(gè)主題,是RocketMQ進(jìn)行消息訂閱的基本單位。每個(gè)topic可分為若干個(gè)分區(qū)(queue)

          2、生產(chǎn)者組(Producer Group) 同一類Producer的集合,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致。如果發(fā)送的是事務(wù)消息且原始生產(chǎn)者在發(fā)送之后崩潰,則Broker服務(wù)器會(huì)聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實(shí)例以提交或回溯消費(fèi)。

          3、消費(fèi)者組(Consumer Group) 同一類Consumer的集合,這類Consumer通常消費(fèi)同一類消息且消費(fèi)邏輯一致。消費(fèi)者組使得在消息消費(fèi)方面,實(shí)現(xiàn)負(fù)載均衡和容錯(cuò)的目標(biāo)變得非常容易。要注意的是,消費(fèi)者組的消費(fèi)者實(shí)例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。

          4、普通順序消息(Normal Ordered Message) 普通順序消費(fèi)模式下,消費(fèi)者通過同一個(gè)消費(fèi)隊(duì)列收到的消息是有順序的,不同消息隊(duì)列收到的消息則可能是無順序的。

          5、嚴(yán)格順序消息(Strictly Ordered Message) 嚴(yán)格順序消息模式下,消費(fèi)者收到的所有消息均是有序的。

          6、消息(Message) 消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位,每條消息必須屬于一個(gè)主題。RocketMQ中每個(gè)消息擁有唯一的Message ID,且可以攜帶具有業(yè)務(wù)標(biāo)識(shí)的Key。系統(tǒng)提供了通過Message ID和Key查詢消息的功能。

          7、標(biāo)簽(Tag) 為消息設(shè)置的標(biāo)志,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。

          RocketMQ 特性:

          • 同步發(fā)送
          • 異步發(fā)送
          • 單向方式發(fā)送
          • 發(fā)送有序消息
          • 發(fā)送批量消息
          • 發(fā)送事務(wù)信息
          • 發(fā)送延遲消息
          • 消費(fèi)有序消息
          • 使用標(biāo)簽或sql92表達(dá)式過濾消息
          • 支持消息跟蹤
          • 支持身份驗(yàn)證和授權(quán)
          • 支持請(qǐng)求-回復(fù)消息交換模式
          • 消費(fèi)消息支持推、拉模式

          代碼演示

          外部依賴:

          spring boot 已經(jīng)為RocketMQ 封裝了starter組件,只需在 pom.xml 文件中添加jar版本依賴即可:

          <dependency>
          ????<groupId>org.apache.rocketmq</groupId>
          ????<artifactId>rocketmq-spring-boot-starter</artifactId>
          ????<version>2.0.3</version>
          </dependency>

          配置文件:

          在配置文件 application.yaml 中配置 RocketMQ 的相關(guān)參數(shù),具體內(nèi)容如下:

          rocketmq:
          ??name-server:?localhost:9876
          ??consumer:
          ????topic:?maker-order-topic
          ????group:?my-group1
          ??producer:
          ????group:?p-my-group1

          消息生產(chǎn)端:

          @Resource
          private?RocketMQTemplate?rocketMQTemplate;
          private?static?String?makerOrderTopic?=?"maker-order-topic";

          @GetMapping("/send_make_order_message")
          public?Object?send_make_order_message()?{
          ????try?{
          ????????Long?orderId?=?Long.valueOf(new?Random().nextInt(1000000));
          ????????OrderModel?orderModel?=?OrderModel.builder().orderId(orderId).buyerUid(200000L).amount(26.8).shippingAddress("上海").build();
          ????????SendResult?sendResult?=?rocketMQTemplate.syncSend(makerOrderTopic,?orderModel);
          ????????System.out.printf("Send?message?to?topic?%s?,?sendResult=%s?%n",?makerOrderTopic,?sendResult);
          ????????return?"消息發(fā)送成功";
          ????}?catch?(Exception?e)?{
          ????????e.printStackTrace();
          ????????return?"消息發(fā)送失敗";
          ????}
          }

          消息消費(fèi)端:

          @Service
          @RocketMQMessageListener(nameServer?=?"${rocketmq.name-server}",?topic?=?"${rocketmq.consumer.topic}",?consumerGroup?=?"${rocketmq.consumer.group}")
          public?class?OrderConsumer?implements?RocketMQListener<OrderModel>?{

          ????@Override
          ????public?void?onMessage(OrderModel?orderModel)?{
          ????????System.out.printf("consumer?received?message:?%s?\n",?JSON.toJSONString(orderModel));
          ????}
          }

          操作演示

          瀏覽器訪問:http://localhost:9071/send_make_order_message,模擬生產(chǎn)端發(fā)送消息到MQ Server中。

          消費(fèi)端接收消息日志:

          Send?message?to?topic?maker-order-topic?,?sendResult=SendResult?[sendStatus=SEND_OK,?msgId=C0A80069816F14DAD5DC73A75B9F0014,?offsetMsgId=C0A8006900002A9F0000000000058841,?messageQueue=MessageQueue?[topic=maker-order-topic,?brokerName=192.168.0.105,?queueId=2],?queueOffset=0]?
          consumer?received?message:?{"amount":26.8,"buyerUid":200000,"orderId":895586,"shippingAddress":"上海"}?

          其他消息類型如何發(fā)送

          1、同步發(fā)送

          同步發(fā)送是指消息發(fā)送方發(fā)出一條消息后,在收到服務(wù)端返回響應(yīng)后,線程才會(huì)執(zhí)行后續(xù)代碼

          OrderModel?orderModel?=?mockOrderModel();
          Message?message?=?new?Message(makerOrderTopic,?"TageA",?JSON.toJSONString(orderModel).getBytes());
          SendResult?sendResult?=?rocketMQTemplate.getProducer().send(message);

          2、異步發(fā)送

          異步發(fā)送是指發(fā)送方發(fā)出一條消息后,不需要等服務(wù)端返回響應(yīng)。異步發(fā)送,需要實(shí)現(xiàn)異步發(fā)送回調(diào)接口(SendCallback),通過回調(diào)接口接收服務(wù)端響應(yīng),并處理結(jié)果

          OrderModel?orderModel?=?mockOrderModel();
          rocketMQTemplate.asyncSend(makerOrderTopic,?orderModel,?new?SendCallback()?{
          ????@Override
          ????public?void?onSuccess(SendResult?sendResult)?{
          ????????System.out.println("消息發(fā)送成功,msgId="?+?sendResult.getMsgId());
          ????}

          ????@Override
          ????public?void?onException(Throwable?throwable)?{
          ????????System.out.println("發(fā)送失敗,"?+?throwable);
          ????}
          });

          3、順序消息

          對(duì)于指定的一個(gè)Topic,所有消息根據(jù)Sharding Key分區(qū)。同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的FIFO順序進(jìn)行發(fā)布和消費(fèi)。Sharding Key是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的Key是完全不同的概念。

          比如:電商的訂單創(chuàng)建,以訂單ID作為Sharding Key,那么同一個(gè)訂單相關(guān)的消息,如創(chuàng)建訂單、付款、發(fā)貨、訂單退款消息、訂單物流消息都會(huì)按照發(fā)布的先后順序來消費(fèi)。

          for?(long?orderId?=?0;?orderId?<?20;?orderId++)?{
          ????String?shardingKey?=?String.valueOf(orderId?%?5);
          ????OrderModel?orderModel?=?OrderModel.builder().orderId(orderId).build();
          ????SendResult?sendResult?=?rocketMQTemplate.syncSendOrderly(makerOrderTopic,?orderModel,?shardingKey);
          ????if?(sendResult?!=?null)?{
          ????????System.out.println(orderId?+?"?,發(fā)送成功");
          ????}
          }

          4、延時(shí)消息

          Producer將消息發(fā)送到消息隊(duì)列RocketMQ服務(wù)端,但并不期望立馬投遞這條消息,而是延遲一定時(shí)間后才投遞到Consumer進(jìn)行消費(fèi),該消息稱為延時(shí)消息。

          OrderModel?orderModel?=?mockOrderModel();
          org.springframework.messaging.Message?message?=?MessageBuilder.withPayload(JSON.toJSONString(orderModel).getBytes()).build();
          //延時(shí)等級(jí)?3,?這個(gè)消息將在10s之后發(fā)送,現(xiàn)在只支持固定的幾個(gè)時(shí)間值
          //delayTimeLevel?=?"1s?5s?10s?30s?1m?2m?3m?4m?5m?6m?7m?8m?9m?10m?20m?30m?1h?2h";
          SendResult?sendResult?=?rocketMQTemplate.syncSend(makerOrderTopic,?message,?8000,?3);

          5、事務(wù)消息

          RocketMQ提供類似X/Open XA的分布式事務(wù)功能,通過消息隊(duì)列RocketMQ事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。

          由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,消息隊(duì)列RocketMQ服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時(shí),主動(dòng)向生產(chǎn)者查詢?cè)撓⒌淖罱K狀態(tài)(Commit或Rollback),該過程稱之為消息回查。

          典型場景:在電商購物車下單時(shí),涉及到購物車系統(tǒng)和交易系統(tǒng),這兩個(gè)系統(tǒng)之間的數(shù)據(jù)最終一致性可以通過分布式事務(wù)消息的異步處理實(shí)現(xiàn)。在這種場景下,交易系統(tǒng)是最為核心的系統(tǒng),需要最大限度地保證下單成功。而購物車系統(tǒng)只需要訂閱消息隊(duì)列RocketMQ的交易訂單消息,做相應(yīng)的業(yè)務(wù)處理,即可保證最終的數(shù)據(jù)一致性。

          發(fā)送步驟:

          • 發(fā)送方將半事務(wù)消息發(fā)送至MQ Server。
          • MQ服務(wù)端將消息持久化成功之后,向發(fā)送方返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半事務(wù)消息。
          • 發(fā)送方開始執(zhí)行本地事務(wù)邏輯
          • 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)(Commit或Rollback),服務(wù)端收到Commit狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方將收到該消息;服務(wù)端收到Rollback狀態(tài)則刪除半事務(wù)消息,訂閱方不會(huì)收到該消息。

          回查步驟:

          • 在斷網(wǎng)或者應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá)服務(wù)端,經(jīng)過固定時(shí)間后服務(wù)端將對(duì)該消息發(fā)起消息回查。
          • 發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
          • 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對(duì)半事務(wù)消息進(jìn)行操作。

          發(fā)送半事務(wù)消息,示例代碼如下:

          OrderModel?orderModel?=?mockOrderModel();
          org.springframework.messaging.Message?message?=?MessageBuilder.withPayload(JSON.toJSONString(orderModel)).build();
          TransactionSendResult?transactionSendResult?=?rocketMQTemplate.sendMessageInTransaction("tx_order_message",?makerOrderTopic,?message,?null);
          SendStatus?sendStatus?=?transactionSendResult.getSendStatus();
          LocalTransactionState?localTransactionState?=?transactionSendResult.getLocalTransactionState();
          System.out.println("send message status:?"?+?sendStatus?+?" , localTransactionState:?"?+?localTransactionState);

          編寫RocketMQLocalTransactionListener接口實(shí)現(xiàn)類,實(shí)現(xiàn)執(zhí)行本地事務(wù)和事務(wù)回查兩個(gè)方法。

          @Component
          @RocketMQTransactionListener(txProducerGroup?=?"tx_order_message")
          public?class?TXProducerListener?implements?RocketMQLocalTransactionListener?{

          ????@Override
          ????public?RocketMQLocalTransactionState?executeLocalTransaction(Message?message,?Object?arg)?{
          ????????//?執(zhí)行本地事務(wù)
          ????????System.out.println("TXProducerListener 開始執(zhí)行本地事務(wù)。。。");
          ????????RocketMQLocalTransactionState?result;
          ????????try?{
          ????????????//?模擬業(yè)務(wù)處理(?如:創(chuàng)建訂單?)
          ????????????//?int?i?=?1?/?0;??//模擬異常
          ????????????result?=?RocketMQLocalTransactionState.COMMIT;??//?成功
          ????????}?catch?(Exception?e)?{
          ????????????System.out.println("本地事務(wù)執(zhí)行失敗。。。");
          ????????????result?=?RocketMQLocalTransactionState.ROLLBACK;
          ????????}
          ????????return?result;
          ????}

          ????@Override
          ????public?RocketMQLocalTransactionState?checkLocalTransaction(Message?msg)?{
          ????????//?檢查本地事務(wù)(?例如檢查下訂單是否成功?)
          ????????System.out.println("檢查本地事務(wù)。。。");
          ????????RocketMQLocalTransactionState?result;
          ????????try?{
          ????????????//模擬業(yè)務(wù)處理(?根據(jù)檢查結(jié)果,決定是COMMIT或ROLLBACK?)
          ????????????result?=?RocketMQLocalTransactionState.COMMIT;
          ????????}?catch?(Exception?e)?{
          ????????????//?異常就回滾
          ????????????System.out.println("檢查本地事務(wù)?error");
          ????????????result?=?RocketMQLocalTransactionState.ROLLBACK;
          ????????}
          ????????return?result;
          ????}

          }

          演示代碼地址

          https://github.com/aalansehaiyang/spring-boot-bulking??

          模塊:spring-boot-bulking-rocketmq

          面試官一般喜歡考察哪些知識(shí)點(diǎn)

          1、如何保證順序消息?

          順序由producer發(fā)送到broker的消息隊(duì)列是滿足FIFO的,所以發(fā)送是順序的,單個(gè)queue里的消息是順序的。多個(gè)Queue同時(shí)消費(fèi)是無法絕對(duì)保證消息的有序性的。所以,同一個(gè)topic,同一個(gè)queue,發(fā)消息的時(shí)候一個(gè)線程發(fā)送消息,消費(fèi)的時(shí)候一個(gè)線程去消費(fèi)一個(gè)queue里的消息。

          2、怎么保證消息發(fā)到同一個(gè)queue里?

          RocketMQ給我們提供了MessageQueueSelector接口,可以重寫里面的接口,實(shí)現(xiàn)自己的算法,比如判斷i%2==0,那就發(fā)送消息到queue1否則發(fā)送到queue2。

          3、如何實(shí)現(xiàn)消息過濾?

          有兩種方案,一種是在broker端按照Consumer的去重邏輯進(jìn)行過濾,這樣做的好處是避免了無用的消息傳輸?shù)紺onsumer端,缺點(diǎn)是加重了Broker的負(fù)擔(dān),實(shí)現(xiàn)起來相對(duì)復(fù)雜。另一種是在Consumer端過濾,比如按照消息設(shè)置的tag去重,這樣的好處是實(shí)現(xiàn)起來簡單,缺點(diǎn)是有大量無用的消息到達(dá)了Consumer端只能丟棄不處理。

          4、如果由于網(wǎng)絡(luò)等原因,多條重復(fù)消息投遞到了Consumer端,你怎么進(jìn)行消息去重?

          這個(gè)得先說下消息的冪等性原則:就是用戶對(duì)于同一種操作發(fā)起的多次請(qǐng)求的結(jié)果是一樣的,不會(huì)因?yàn)椴僮髁硕啻尉彤a(chǎn)生不一樣的結(jié)果。只要保持冪等性,不管來多少條消息,最后處理結(jié)果都一樣,需要Consumer端自行實(shí)現(xiàn)。

          去重的方案:因?yàn)槊總€(gè)消息都有一個(gè)MessageId, 保證每個(gè)消息都有一個(gè)唯一鍵,可以是數(shù)據(jù)庫的主鍵或者唯一約束,也可以是Redis緩存中的鍵,當(dāng)消費(fèi)一條消息前,先檢查數(shù)據(jù)庫或緩存中是否存在這個(gè)唯一鍵,如果存在就不再處理這條消息,如果消費(fèi)成功,要保證這個(gè)唯一鍵插入到去重表中。

          5、RocketMQ是怎么實(shí)現(xiàn)分布式事務(wù)消息的?

          • Producer向broker發(fā)送半消息
          • Producer端收到響應(yīng),消息發(fā)送成功,此時(shí)消息是半消息,標(biāo)記為“不可投遞”狀態(tài),Consumer消費(fèi)不了。
          • Producer端執(zhí)行本地事務(wù)。
          • 正常情況本地事務(wù)執(zhí)行完成,Producer向Broker發(fā)送Commit/Rollback,如果是Commit,Broker端將半消息標(biāo)記為正常消息,Consumer可以消費(fèi),如果是Rollback,Broker丟棄此消息。
          • 異常情況,Broker端遲遲等不到二次確認(rèn)。在一定時(shí)間后,會(huì)查詢所有的半消息,然后到Producer端查詢半消息的執(zhí)行情況。
          • Producer 端查詢本地事務(wù)的狀態(tài)
          • 根據(jù)事務(wù)的狀態(tài)提交commit/rollback到broker端。

          6、從Producer角度分析,如何確保消息成功發(fā)送到了Broker?

          • 采用同步發(fā)送,即發(fā)送一條數(shù)據(jù)等到接受者返回響應(yīng)之后再發(fā)送下一個(gè)數(shù)據(jù)包。如果返回響應(yīng)OK,表示消息成功發(fā)送到了broker,狀態(tài)超時(shí)或者失敗都會(huì)觸發(fā)二次重試。MQ Server端會(huì)有冪等控制。

          • 可以采用分布式事務(wù)消息的投遞方式。

          • 如果一條消息發(fā)送之后超時(shí),也可以通過查詢?nèi)罩镜腁PI,來檢查是否在Broker存儲(chǔ)成功。總的來說,Producer還是采用同步發(fā)送來保證的。

          7、從Broker角度分析,如何確保消息持久化?

          • 消息只要持久化到CommitLog(日志文件)中,即使Broker宕機(jī),未消費(fèi)的消息也能重新恢復(fù)再消費(fèi)。
          • Broker的刷盤機(jī)制:同步刷盤和異步刷盤,不管哪種刷盤都可以保證消息一定存儲(chǔ)在page cache,但是同步刷盤更可靠,它是Producer發(fā)送消息后等數(shù)據(jù)持久化到磁盤之后再返回響應(yīng)給Producer。
          • Broker支持多Master多Slave同步雙寫和多Master多Slave異步復(fù)制模式,消息都是發(fā)送給Master主機(jī),但是消費(fèi)既可以從Master消費(fèi),也可以從Slave消費(fèi)。同步雙寫模式可以保證即使Master宕機(jī),消息肯定在Slave中有備份,保證了消息不會(huì)丟失。

          8、從Consumer角度分析,如何保證消息被成功消費(fèi)?

          Consumer自身維護(hù)了個(gè)持久化的offset(對(duì)應(yīng)Message Queue里的min offset),用來標(biāo)記已經(jīng)成功消費(fèi)且已經(jīng)成功發(fā)回Broker的消息下標(biāo)。如果Consumer消費(fèi)失敗,它會(huì)向Broker發(fā)回消費(fèi)失敗的狀態(tài),發(fā)回成功才會(huì)更新自己的offset。如果發(fā)回給broker時(shí)broker掛掉了,Consumer會(huì)定時(shí)重試,如果Consumer和Broker一起掛掉了,消息還在Broker端存儲(chǔ)著,Consumer端的offset也是持久化的,重啟之后繼續(xù)拉取offset之前的消息進(jìn)行消費(fèi)。


          推薦閱讀:億級(jí)系統(tǒng)的Redis緩存如何設(shè)計(jì)學(xué)會(huì)這10個(gè)設(shè)計(jì)原則,離架構(gòu)師又進(jìn)了一步
          Spring Boot 集成 Kafka

          關(guān)號(hào)互聯(lián)網(wǎng)全棧架構(gòu)價(jià)。

          瀏覽 82
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日日A片| 精品成人无码麻豆 | 亚洲国产在人播放首页 | 欧美成人在线视频观看 | 九一成人电影。 |