10分鐘掌握RocketMQ的核心知識(shí)
Spring Boot 作為主流微服務(wù)框架,擁有成熟的社區(qū)生態(tài)。市場應(yīng)用廣泛,為了方便大家,整理了一個(gè)基于spring boot的常用中間件快速集成入門系列手冊(cè),涉及RPC、緩存、消息隊(duì)列、分庫分表、注冊(cè)中心、分布式配置等常用開源組件,大概有幾十篇文章,陸續(xù)會(huì)開放出來,感興趣同學(xué)請(qǐng)?zhí)崆瓣P(guān)注&收藏
前言
Apache RocketMQ 是阿里開源的一款高性能、高吞吐量的分布式消息中間件。
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負(fù)責(zé)生產(chǎn)消息,Consumer 負(fù)責(zé)消費(fèi)消息,Broker 負(fù)責(zé)存儲(chǔ)消息。每個(gè) Broker 可以存儲(chǔ)多個(gè)Topic的消息,每個(gè)Topic的消息也可以分片存儲(chǔ)于集群中的不同的Broker Group。
快速安裝:https://rocketmq.apache.org/docs/quick-start/
源代碼:https://github.com/apache/rocketmq-spring
主要功能:
1、業(yè)務(wù)解耦。采用發(fā)布訂閱模式,生產(chǎn)端發(fā)送消息到MQ Server,下游的消費(fèi)端訂閱接收消息。異步形式,系統(tǒng)解耦,提升系統(tǒng)擴(kuò)展性
2、削峰限流。由于消息中間件的吞吐量很高,過量的請(qǐng)求會(huì)暫時(shí)放在 MQ server,下游慢慢消費(fèi),避免過量請(qǐng)求沖垮系統(tǒng)
3、億級(jí)消息的堆積能力,單個(gè)隊(duì)列中的百萬級(jí)消息的累積容量。
4、高可用性:Broker服務(wù)器支持多Master多Slave的同步雙寫以及Master多Slave的異步復(fù)制模式,其中同步雙寫可保證消息不丟失。
5、高可靠性:生產(chǎn)者將消息發(fā)送到Broker端有三種方式,同步、異步和單向。Broker在對(duì)于消息刷盤有兩種策略:同步刷盤和異步刷盤,其中同步刷盤可以保證消息成功的存儲(chǔ)到磁盤中。消費(fèi)者的消費(fèi)模式也有集群消費(fèi)和廣播消費(fèi)兩種,默認(rèn)集群消費(fèi),如果集群模式中消費(fèi)者掛了,一個(gè)組里的其他消費(fèi)者會(huì)接替其消費(fèi)。
6、分布式事務(wù)消息:這里是采用半消息確認(rèn)和消息回查機(jī)制來保證分布式事務(wù)消息。
7、支持消息過濾:建議采用消費(fèi)者業(yè)務(wù)端的tag過濾
8、支持順序消息:消息在Broker中是采用隊(duì)列的FIFO模式存儲(chǔ)的,也就是發(fā)送是順序的,只要保證消費(fèi)的順序性即可。
9、支持定時(shí)消息和延遲消息:Broker中由定時(shí)消息的機(jī)制,消息發(fā)送到Broker中,不會(huì)立即被Consumer消費(fèi),會(huì)等到一定的時(shí)間才被消費(fèi)。延遲消息也是一樣,延遲一定時(shí)間之后才會(huì)被Consumer消費(fèi)。
核心組件:
1、Namesrv
Namesrv充當(dāng)路由消息的提供者。Namesrv是一個(gè)幾乎無狀態(tài)節(jié)點(diǎn),多個(gè)Namesrv實(shí)例組成集群,但相互獨(dú)立,沒有信息交換。Namesrv主要作用是:為producer和consumer提供關(guān)于topic的路由信息。管理broker節(jié)點(diǎn):監(jiān)控更新broker的實(shí)時(shí)狀態(tài)。路由注冊(cè)、路由刪除(故障剔除)。
2、Broker
負(fù)責(zé)存儲(chǔ)消息、轉(zhuǎn)發(fā)消息。Broker是以group為單位提供服務(wù)。一個(gè)group里面分Master和Slave。Master和Slave存儲(chǔ)的數(shù)據(jù)一樣,slave從master同步數(shù)據(jù)(同步雙寫或異步復(fù)制看配置)。一個(gè)Master可以對(duì)應(yīng)多個(gè)Slave,一個(gè)Slave只能對(duì)應(yīng)一個(gè)Master。Master與Slave的對(duì)應(yīng)關(guān)系通過指定相同的BrokerName、不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。
基本概念:
1、主題(Topic) 表示一類消息的集合,每個(gè)主題包含若干條消息,每條消息只能屬于一個(gè)主題,是RocketMQ進(jìn)行消息訂閱的基本單位。每個(gè)topic可分為若干個(gè)分區(qū)(queue)
2、生產(chǎn)者組(Producer Group) 同一類Producer的集合,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致。如果發(fā)送的是事務(wù)消息且原始生產(chǎn)者在發(fā)送之后崩潰,則Broker服務(wù)器會(huì)聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實(shí)例以提交或回溯消費(fèi)。
3、消費(fèi)者組(Consumer Group) 同一類Consumer的集合,這類Consumer通常消費(fèi)同一類消息且消費(fèi)邏輯一致。消費(fèi)者組使得在消息消費(fèi)方面,實(shí)現(xiàn)負(fù)載均衡和容錯(cuò)的目標(biāo)變得非常容易。要注意的是,消費(fèi)者組的消費(fèi)者實(shí)例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。
4、普通順序消息(Normal Ordered Message) 普通順序消費(fèi)模式下,消費(fèi)者通過同一個(gè)消費(fèi)隊(duì)列收到的消息是有順序的,不同消息隊(duì)列收到的消息則可能是無順序的。
5、嚴(yán)格順序消息(Strictly Ordered Message) 嚴(yán)格順序消息模式下,消費(fèi)者收到的所有消息均是有序的。
6、消息(Message) 消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位,每條消息必須屬于一個(gè)主題。RocketMQ中每個(gè)消息擁有唯一的Message ID,且可以攜帶具有業(yè)務(wù)標(biāo)識(shí)的Key。系統(tǒng)提供了通過Message ID和Key查詢消息的功能。
7、標(biāo)簽(Tag) 為消息設(shè)置的標(biāo)志,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。
RocketMQ 特性:
- 同步發(fā)送
- 異步發(fā)送
- 單向方式發(fā)送
- 發(fā)送有序消息
- 發(fā)送批量消息
- 發(fā)送事務(wù)信息
- 發(fā)送延遲消息
- 消費(fèi)有序消息
- 使用標(biāo)簽或sql92表達(dá)式過濾消息
- 支持消息跟蹤
- 支持身份驗(yàn)證和授權(quán)
- 支持請(qǐng)求-回復(fù)消息交換模式
- 消費(fèi)消息支持推、拉模式
代碼演示
外部依賴:
spring boot 已經(jīng)為RocketMQ 封裝了starter組件,只需在 pom.xml 文件中添加jar版本依賴即可:
<dependency>
????<groupId>org.apache.rocketmq</groupId>
????<artifactId>rocketmq-spring-boot-starter</artifactId>
????<version>2.0.3</version>
</dependency>
配置文件:
在配置文件 application.yaml 中配置 RocketMQ 的相關(guān)參數(shù),具體內(nèi)容如下:
rocketmq:
??name-server:?localhost:9876
??consumer:
????topic:?maker-order-topic
????group:?my-group1
??producer:
????group:?p-my-group1
消息生產(chǎn)端:
@Resource
private?RocketMQTemplate?rocketMQTemplate;
private?static?String?makerOrderTopic?=?"maker-order-topic";
@GetMapping("/send_make_order_message")
public?Object?send_make_order_message()?{
????try?{
????????Long?orderId?=?Long.valueOf(new?Random().nextInt(1000000));
????????OrderModel?orderModel?=?OrderModel.builder().orderId(orderId).buyerUid(200000L).amount(26.8).shippingAddress("上海").build();
????????SendResult?sendResult?=?rocketMQTemplate.syncSend(makerOrderTopic,?orderModel);
????????System.out.printf("Send?message?to?topic?%s?,?sendResult=%s?%n",?makerOrderTopic,?sendResult);
????????return?"消息發(fā)送成功";
????}?catch?(Exception?e)?{
????????e.printStackTrace();
????????return?"消息發(fā)送失敗";
????}
}
消息消費(fèi)端:
@Service
@RocketMQMessageListener(nameServer?=?"${rocketmq.name-server}",?topic?=?"${rocketmq.consumer.topic}",?consumerGroup?=?"${rocketmq.consumer.group}")
public?class?OrderConsumer?implements?RocketMQListener<OrderModel>?{
????@Override
????public?void?onMessage(OrderModel?orderModel)?{
????????System.out.printf("consumer?received?message:?%s?\n",?JSON.toJSONString(orderModel));
????}
}
操作演示
瀏覽器訪問:http://localhost:9071/send_make_order_message,模擬生產(chǎn)端發(fā)送消息到MQ Server中。
消費(fèi)端接收消息日志:
Send?message?to?topic?maker-order-topic?,?sendResult=SendResult?[sendStatus=SEND_OK,?msgId=C0A80069816F14DAD5DC73A75B9F0014,?offsetMsgId=C0A8006900002A9F0000000000058841,?messageQueue=MessageQueue?[topic=maker-order-topic,?brokerName=192.168.0.105,?queueId=2],?queueOffset=0]?
consumer?received?message:?{"amount":26.8,"buyerUid":200000,"orderId":895586,"shippingAddress":"上海"}?
其他消息類型如何發(fā)送
1、同步發(fā)送
同步發(fā)送是指消息發(fā)送方發(fā)出一條消息后,在收到服務(wù)端返回響應(yīng)后,線程才會(huì)執(zhí)行后續(xù)代碼
OrderModel?orderModel?=?mockOrderModel();
Message?message?=?new?Message(makerOrderTopic,?"TageA",?JSON.toJSONString(orderModel).getBytes());
SendResult?sendResult?=?rocketMQTemplate.getProducer().send(message);
2、異步發(fā)送
異步發(fā)送是指發(fā)送方發(fā)出一條消息后,不需要等服務(wù)端返回響應(yīng)。異步發(fā)送,需要實(shí)現(xiàn)異步發(fā)送回調(diào)接口(SendCallback),通過回調(diào)接口接收服務(wù)端響應(yīng),并處理結(jié)果
OrderModel?orderModel?=?mockOrderModel();
rocketMQTemplate.asyncSend(makerOrderTopic,?orderModel,?new?SendCallback()?{
????@Override
????public?void?onSuccess(SendResult?sendResult)?{
????????System.out.println("消息發(fā)送成功,msgId="?+?sendResult.getMsgId());
????}
????@Override
????public?void?onException(Throwable?throwable)?{
????????System.out.println("發(fā)送失敗,"?+?throwable);
????}
});
3、順序消息
對(duì)于指定的一個(gè)Topic,所有消息根據(jù)Sharding Key分區(qū)。同一個(gè)分區(qū)內(nèi)的消息按照嚴(yán)格的FIFO順序進(jìn)行發(fā)布和消費(fèi)。Sharding Key是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的Key是完全不同的概念。
比如:電商的訂單創(chuàng)建,以訂單ID作為Sharding Key,那么同一個(gè)訂單相關(guān)的消息,如創(chuàng)建訂單、付款、發(fā)貨、訂單退款消息、訂單物流消息都會(huì)按照發(fā)布的先后順序來消費(fèi)。
for?(long?orderId?=?0;?orderId?<?20;?orderId++)?{
????String?shardingKey?=?String.valueOf(orderId?%?5);
????OrderModel?orderModel?=?OrderModel.builder().orderId(orderId).build();
????SendResult?sendResult?=?rocketMQTemplate.syncSendOrderly(makerOrderTopic,?orderModel,?shardingKey);
????if?(sendResult?!=?null)?{
????????System.out.println(orderId?+?"?,發(fā)送成功");
????}
}
4、延時(shí)消息
Producer將消息發(fā)送到消息隊(duì)列RocketMQ服務(wù)端,但并不期望立馬投遞這條消息,而是延遲一定時(shí)間后才投遞到Consumer進(jìn)行消費(fèi),該消息稱為延時(shí)消息。
OrderModel?orderModel?=?mockOrderModel();
org.springframework.messaging.Message?message?=?MessageBuilder.withPayload(JSON.toJSONString(orderModel).getBytes()).build();
//延時(shí)等級(jí)?3,?這個(gè)消息將在10s之后發(fā)送,現(xiàn)在只支持固定的幾個(gè)時(shí)間值
//delayTimeLevel?=?"1s?5s?10s?30s?1m?2m?3m?4m?5m?6m?7m?8m?9m?10m?20m?30m?1h?2h";
SendResult?sendResult?=?rocketMQTemplate.syncSend(makerOrderTopic,?message,?8000,?3);
5、事務(wù)消息
RocketMQ提供類似X/Open XA的分布式事務(wù)功能,通過消息隊(duì)列RocketMQ事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。
由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,消息隊(duì)列RocketMQ服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時(shí),主動(dòng)向生產(chǎn)者查詢?cè)撓⒌淖罱K狀態(tài)(Commit或Rollback),該過程稱之為消息回查。
典型場景:在電商購物車下單時(shí),涉及到購物車系統(tǒng)和交易系統(tǒng),這兩個(gè)系統(tǒng)之間的數(shù)據(jù)最終一致性可以通過分布式事務(wù)消息的異步處理實(shí)現(xiàn)。在這種場景下,交易系統(tǒng)是最為核心的系統(tǒng),需要最大限度地保證下單成功。而購物車系統(tǒng)只需要訂閱消息隊(duì)列RocketMQ的交易訂單消息,做相應(yīng)的業(yè)務(wù)處理,即可保證最終的數(shù)據(jù)一致性。
發(fā)送步驟:
- 發(fā)送方將半事務(wù)消息發(fā)送至MQ Server。
- MQ服務(wù)端將消息持久化成功之后,向發(fā)送方返回Ack確認(rèn)消息已經(jīng)發(fā)送成功,此時(shí)消息為半事務(wù)消息。
- 發(fā)送方開始執(zhí)行本地事務(wù)邏輯
- 發(fā)送方根據(jù)本地事務(wù)執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn)(Commit或Rollback),服務(wù)端收到
Commit狀態(tài)則將半事務(wù)消息標(biāo)記為可投遞,訂閱方將收到該消息;服務(wù)端收到Rollback狀態(tài)則刪除半事務(wù)消息,訂閱方不會(huì)收到該消息。
回查步驟:
- 在斷網(wǎng)或者應(yīng)用重啟的特殊情況下,上述步驟4提交的二次確認(rèn)最終未到達(dá)服務(wù)端,經(jīng)過固定時(shí)間后服務(wù)端將對(duì)該消息發(fā)起消息回查。
- 發(fā)送方收到消息回查后,需要檢查對(duì)應(yīng)消息的本地事務(wù)執(zhí)行的最終結(jié)果。
- 發(fā)送方根據(jù)檢查得到的本地事務(wù)的最終狀態(tài)再次提交二次確認(rèn),服務(wù)端仍按照步驟4對(duì)半事務(wù)消息進(jìn)行操作。
發(fā)送半事務(wù)消息,示例代碼如下:
OrderModel?orderModel?=?mockOrderModel();
org.springframework.messaging.Message?message?=?MessageBuilder.withPayload(JSON.toJSONString(orderModel)).build();
TransactionSendResult?transactionSendResult?=?rocketMQTemplate.sendMessageInTransaction("tx_order_message",?makerOrderTopic,?message,?null);
SendStatus?sendStatus?=?transactionSendResult.getSendStatus();
LocalTransactionState?localTransactionState?=?transactionSendResult.getLocalTransactionState();
System.out.println("send message status:?"?+?sendStatus?+?" , localTransactionState:?"?+?localTransactionState);
編寫RocketMQLocalTransactionListener接口實(shí)現(xiàn)類,實(shí)現(xiàn)執(zhí)行本地事務(wù)和事務(wù)回查兩個(gè)方法。
@Component
@RocketMQTransactionListener(txProducerGroup?=?"tx_order_message")
public?class?TXProducerListener?implements?RocketMQLocalTransactionListener?{
????@Override
????public?RocketMQLocalTransactionState?executeLocalTransaction(Message?message,?Object?arg)?{
????????//?執(zhí)行本地事務(wù)
????????System.out.println("TXProducerListener 開始執(zhí)行本地事務(wù)。。。");
????????RocketMQLocalTransactionState?result;
????????try?{
????????????//?模擬業(yè)務(wù)處理(?如:創(chuàng)建訂單?)
????????????//?int?i?=?1?/?0;??//模擬異常
????????????result?=?RocketMQLocalTransactionState.COMMIT;??//?成功
????????}?catch?(Exception?e)?{
????????????System.out.println("本地事務(wù)執(zhí)行失敗。。。");
????????????result?=?RocketMQLocalTransactionState.ROLLBACK;
????????}
????????return?result;
????}
????@Override
????public?RocketMQLocalTransactionState?checkLocalTransaction(Message?msg)?{
????????//?檢查本地事務(wù)(?例如檢查下訂單是否成功?)
????????System.out.println("檢查本地事務(wù)。。。");
????????RocketMQLocalTransactionState?result;
????????try?{
????????????//模擬業(yè)務(wù)處理(?根據(jù)檢查結(jié)果,決定是COMMIT或ROLLBACK?)
????????????result?=?RocketMQLocalTransactionState.COMMIT;
????????}?catch?(Exception?e)?{
????????????//?異常就回滾
????????????System.out.println("檢查本地事務(wù)?error");
????????????result?=?RocketMQLocalTransactionState.ROLLBACK;
????????}
????????return?result;
????}
}
演示代碼地址
https://github.com/aalansehaiyang/spring-boot-bulking??
模塊:spring-boot-bulking-rocketmq
面試官一般喜歡考察哪些知識(shí)點(diǎn)
1、如何保證順序消息?
順序由producer發(fā)送到broker的消息隊(duì)列是滿足FIFO的,所以發(fā)送是順序的,單個(gè)queue里的消息是順序的。多個(gè)Queue同時(shí)消費(fèi)是無法絕對(duì)保證消息的有序性的。所以,同一個(gè)topic,同一個(gè)queue,發(fā)消息的時(shí)候一個(gè)線程發(fā)送消息,消費(fèi)的時(shí)候一個(gè)線程去消費(fèi)一個(gè)queue里的消息。
2、怎么保證消息發(fā)到同一個(gè)queue里?
RocketMQ給我們提供了MessageQueueSelector接口,可以重寫里面的接口,實(shí)現(xiàn)自己的算法,比如判斷i%2==0,那就發(fā)送消息到queue1否則發(fā)送到queue2。
3、如何實(shí)現(xiàn)消息過濾?
有兩種方案,一種是在broker端按照Consumer的去重邏輯進(jìn)行過濾,這樣做的好處是避免了無用的消息傳輸?shù)紺onsumer端,缺點(diǎn)是加重了Broker的負(fù)擔(dān),實(shí)現(xiàn)起來相對(duì)復(fù)雜。另一種是在Consumer端過濾,比如按照消息設(shè)置的tag去重,這樣的好處是實(shí)現(xiàn)起來簡單,缺點(diǎn)是有大量無用的消息到達(dá)了Consumer端只能丟棄不處理。
4、如果由于網(wǎng)絡(luò)等原因,多條重復(fù)消息投遞到了Consumer端,你怎么進(jìn)行消息去重?
這個(gè)得先說下消息的冪等性原則:就是用戶對(duì)于同一種操作發(fā)起的多次請(qǐng)求的結(jié)果是一樣的,不會(huì)因?yàn)椴僮髁硕啻尉彤a(chǎn)生不一樣的結(jié)果。只要保持冪等性,不管來多少條消息,最后處理結(jié)果都一樣,需要Consumer端自行實(shí)現(xiàn)。
去重的方案:因?yàn)槊總€(gè)消息都有一個(gè)MessageId, 保證每個(gè)消息都有一個(gè)唯一鍵,可以是數(shù)據(jù)庫的主鍵或者唯一約束,也可以是Redis緩存中的鍵,當(dāng)消費(fèi)一條消息前,先檢查數(shù)據(jù)庫或緩存中是否存在這個(gè)唯一鍵,如果存在就不再處理這條消息,如果消費(fèi)成功,要保證這個(gè)唯一鍵插入到去重表中。
5、RocketMQ是怎么實(shí)現(xiàn)分布式事務(wù)消息的?
- Producer向broker發(fā)送半消息
- Producer端收到響應(yīng),消息發(fā)送成功,此時(shí)消息是半消息,標(biāo)記為“不可投遞”狀態(tài),Consumer消費(fèi)不了。
- Producer端執(zhí)行本地事務(wù)。
- 正常情況本地事務(wù)執(zhí)行完成,Producer向Broker發(fā)送Commit/Rollback,如果是Commit,Broker端將半消息標(biāo)記為正常消息,Consumer可以消費(fèi),如果是Rollback,Broker丟棄此消息。
- 異常情況,Broker端遲遲等不到二次確認(rèn)。在一定時(shí)間后,會(huì)查詢所有的半消息,然后到Producer端查詢半消息的執(zhí)行情況。
- Producer 端查詢本地事務(wù)的狀態(tài)
- 根據(jù)事務(wù)的狀態(tài)提交commit/rollback到broker端。
6、從Producer角度分析,如何確保消息成功發(fā)送到了Broker?
采用同步發(fā)送,即發(fā)送一條數(shù)據(jù)等到接受者返回響應(yīng)之后再發(fā)送下一個(gè)數(shù)據(jù)包。如果返回響應(yīng)OK,表示消息成功發(fā)送到了broker,狀態(tài)超時(shí)或者失敗都會(huì)觸發(fā)二次重試。MQ Server端會(huì)有冪等控制。
可以采用分布式事務(wù)消息的投遞方式。
如果一條消息發(fā)送之后超時(shí),也可以通過查詢?nèi)罩镜腁PI,來檢查是否在Broker存儲(chǔ)成功。總的來說,Producer還是采用同步發(fā)送來保證的。
7、從Broker角度分析,如何確保消息持久化?
- 消息只要持久化到CommitLog(日志文件)中,即使Broker宕機(jī),未消費(fèi)的消息也能重新恢復(fù)再消費(fèi)。
- Broker的刷盤機(jī)制:同步刷盤和異步刷盤,不管哪種刷盤都可以保證消息一定存儲(chǔ)在page cache,但是同步刷盤更可靠,它是Producer發(fā)送消息后等數(shù)據(jù)持久化到磁盤之后再返回響應(yīng)給Producer。
- Broker支持多Master多Slave同步雙寫和多Master多Slave異步復(fù)制模式,消息都是發(fā)送給Master主機(jī),但是消費(fèi)既可以從Master消費(fèi),也可以從Slave消費(fèi)。同步雙寫模式可以保證即使Master宕機(jī),消息肯定在Slave中有備份,保證了消息不會(huì)丟失。
8、從Consumer角度分析,如何保證消息被成功消費(fèi)?
Consumer自身維護(hù)了個(gè)持久化的offset(對(duì)應(yīng)Message Queue里的min offset),用來標(biāo)記已經(jīng)成功消費(fèi)且已經(jīng)成功發(fā)回Broker的消息下標(biāo)。如果Consumer消費(fèi)失敗,它會(huì)向Broker發(fā)回消費(fèi)失敗的狀態(tài),發(fā)回成功才會(huì)更新自己的offset。如果發(fā)回給broker時(shí)broker掛掉了,Consumer會(huì)定時(shí)重試,如果Consumer和Broker一起掛掉了,消息還在Broker端存儲(chǔ)著,Consumer端的offset也是持久化的,重啟之后繼續(xù)拉取offset之前的消息進(jìn)行消費(fèi)。
Spring Boot 集成 Kafka
歡迎關(guān)注微信公眾號(hào):互聯(lián)網(wǎng)全棧架構(gòu),收取更多有價(jià)值的信息。
