推與拉,RocketMQ消息消費(fèi)的那些姿勢(shì)
消息消費(fèi)方式,一般來說有兩種姿勢(shì),我們往往稱之“推”模式(Push)以及“拉”模式(Pull),如圖所示。

“推”模式,從模型上來說,消費(fèi)者訂閱了消息中間件中的Topic(主題),當(dāng)該主題有接收到生產(chǎn)者發(fā)送的消息之后,消息中間件會(huì)主動(dòng)將消息推送(push)至訂閱了該主題的消費(fèi)者。這種由消息中間件推送到消費(fèi)者的方式,稱為“推”模式。
“拉”模式,從模型上來說,消費(fèi)者向消息中間件發(fā)起消息拉取請(qǐng)求,消息中間件接收到拉取請(qǐng)求之后,將消息進(jìn)行打包之后返回給消費(fèi)者。這種方式下,消息是消費(fèi)者主動(dòng)向消息中間件進(jìn)行拉取的,這種由消費(fèi)者主動(dòng)向消息中間件拉取的方式,稱為“拉”模式。
?這里需要注意的是,我們要明白實(shí)現(xiàn)完全的推模式,對(duì)于MQ的broker而言需要付出較多的性能,由于broker需要主動(dòng)與消費(fèi)者進(jìn)程建立連接并且需要主動(dòng)探查消費(fèi)者進(jìn)程的健康狀態(tài),相當(dāng)于broker對(duì)消費(fèi)者進(jìn)程構(gòu)成了反向依賴,這便很大程度上增加了broker實(shí)現(xiàn)復(fù)雜度。
簡(jiǎn)單提一句,對(duì)于RocketMQ而言,推方式消費(fèi)消息其本質(zhì)實(shí)現(xiàn)其實(shí)是長(zhǎng)輪詢的拉,相關(guān)文檔可以自行查找資料或者翻看本公眾號(hào)的歷史文章。
?
基于“推”模式消費(fèi)消息
首先介紹“推”模式下是如何消費(fèi)消息的,以RocketMQ為例,代碼如下:
public?class?PushConsumerDemo?{
??public?static?void?main(String[]?args)?throws?InterruptedException,?MQClientException?{
????DefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("push_consumer_group");
????
????consumer.setNamesrvAddr("192.168.1.106");
????
????consumer.subscribe("PUSH_TOPIC",?"*");
????
????consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
????
????consumer.setConsumeTimestamp(new?SimpleDateFormat("yyyyMMddHHmmss").format(new?Date()));
????
????consumer.registerMessageListener(new?MessageListenerConcurrently()?{
??????@Override
??????public?ConsumeConcurrentlyStatus?consumeMessage(List?msgs,?ConsumeConcurrentlyContext?context)?{
????
????????System.out.printf("%s?Receive?New?Messages:?%s?%n",?Thread.currentThread().getName(),?msgs);
????
????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????
??????}
????
????});
????
????consumer.start();
????
????System.out.printf("Consumer?Started.%n");
??}
}
RocketMQ中通過DefaultMQPushConsumer實(shí)現(xiàn)“推”模式的消息消費(fèi),這種方式的特點(diǎn)在于實(shí)時(shí)性好,只要MQ服務(wù)端有消息到來,就會(huì)實(shí)時(shí)性的推送給消費(fèi)者進(jìn)行消費(fèi)。
從章節(jié)開始的介紹中我們得知,在“推”模式下,服務(wù)端需要感知與它建立鏈接的客戶端,這意味著服務(wù)端主動(dòng)推送消息的過程中,需要對(duì)消息做額外的處理,以便能夠及時(shí)將消息分發(fā)給客戶端。這些計(jì)算邏輯會(huì)為消息中間件的服務(wù)端帶來額外的負(fù)擔(dān),因此RocketMQ通過“長(zhǎng)輪詢”的方式,巧妙的解決了這個(gè)問題。
長(zhǎng)輪詢本質(zhì)上仍舊是輪詢,它與輪詢不同之處在于,當(dāng)服務(wù)端接收到客戶端的請(qǐng)求后服務(wù)端不會(huì)立即將數(shù)據(jù)返回給客戶端,而是會(huì)先將這個(gè)請(qǐng)求hold住,判斷服務(wù)器端數(shù)據(jù)是否有更新。如果有更新,則對(duì)客戶端進(jìn)行響應(yīng),如果一直沒有數(shù)據(jù),則它會(huì)在長(zhǎng)輪詢超時(shí)時(shí)間之前一直hold住請(qǐng)求并檢測(cè)是否有數(shù)據(jù)更新,直到有數(shù)據(jù)或者超時(shí)后才返回。
“長(zhǎng)輪詢”的效果基本上與服務(wù)端實(shí)時(shí)推送相似,兼顧了實(shí)時(shí)性,降低了純“推”模式實(shí)現(xiàn)的復(fù)雜度。
基于“拉”模式消費(fèi)消息
接著介紹“拉”模式下是如何消費(fèi)消息的,還是以RocektMQ為例,代碼如下:
public?class?PullConsumerDemo?{
??private?static?final?Map?OFFSE_TABLE?=?new?HashMap();
??public?static?void?main(String[]?args)?throws?MQClientException?{
????DefaultMQPullConsumer?consumer?=?new?DefaultMQPullConsumer("PULL_CONSUMER_GROUP");
????consumer.setNamesrvAddr("127.0.0.1:9876");
????consumer.start();
?
????Set?mqs?=?consumer.fetchSubscribeMessageQueues("PULL-TOPIC");
????for?(MessageQueue?mq?:?mqs)?{
??????System.out.printf("Consume?from?the?queue:?%s%n",?mq);
??????SINGLE_MQ:
??????while?(true)?{
????????try?{
??????????PullResult?pullResult?=
????????????consumer.pullBlockIfNotFound(mq,?null,?getMessageQueueOffset(mq),?32);
??????????System.out.printf("%s%n",?pullResult);
??????????putMessageQueueOffset(mq,?pullResult.getNextBeginOffset());
??????????switch?(pullResult.getPullStatus())?{
????????????case?FOUND:
??????????????break;
?????????????case?NO_MATCHED_MSG:
??????????????break;
????????????case?NO_NEW_MSG:
??????????????break?SINGLE_MQ;
????????????case?OFFSET_ILLEGAL:
??????????????break;
????????????default:
??????????????break;
??????????}
????????}?catch?(Exception?e)?{
??????????e.printStackTrace();
????????}
??????}
????}
?
????consumer.shutdown();
??}
?
??private?static?long?getMessageQueueOffset(MessageQueue?mq)?{
????Long?offset?=?OFFSE_TABLE.get(mq);
????if?(offset?!=?null)?{
??????return?offset;
????}
????return?0;
??}
?
??private?static?void?putMessageQueueOffset(MessageQueue?mq,long?offset){
????OFFSE_TABLE.put(mq,?offset);
??}
}
RocketMQ通過DefaultMQPullConsumer實(shí)現(xiàn)了“拉”模式的消息消費(fèi)。
(1)需要定義消費(fèi)者組,實(shí)例化一個(gè)DefaultMQPullConsumer消費(fèi)者對(duì)象,并指定消費(fèi)者組;
(2)接著為消費(fèi)者設(shè)置NameServer地址,保證消費(fèi)者客戶端能夠從NameServer獲取到broker地址,從而執(zhí)行消息消費(fèi)流程;
(3)通過consumer.fetchSubscribeMessageQueues(TOPIC)方法獲取指定TOPIC下的所有隊(duì)列,默認(rèn)有4個(gè);
(4)接著需要對(duì)獲取到MessageQueue集合進(jìn)行遍歷,拉取數(shù)據(jù)并執(zhí)行具體的消費(fèi)過程;
(5)通過while(true) 不間斷地從隊(duì)列中拉取數(shù)據(jù),默認(rèn)情況下每次拉取32條,這里需要顯式地傳入拉取開始的offset,通過getMessageQueueOffset(mq)方法獲取到開始拉取的offset,從持久化設(shè)施中得到對(duì)應(yīng)MessageQueue的拉取進(jìn)度(可以認(rèn)為是消費(fèi)進(jìn)度);
拉取結(jié)束后,在持久化設(shè)施(這里是一個(gè)Map)中保存下次拉取開始時(shí)的offset,也就是本次拉取結(jié)束的下一個(gè)offset(通過pullResult.getNextBeginOffset()獲取); (6)需要注意的是,每次拉取成功之后都需要顯式調(diào)用putMessageQueueOffset()方法,刷新對(duì)應(yīng)隊(duì)列MessageQueue的拉取進(jìn)度。
總結(jié)來說,RocketMQ中的“拉”模式消費(fèi)方式需要開發(fā)者顯式維護(hù)消費(fèi)進(jìn)度,每次消費(fèi)成功之后都需要更新消費(fèi)進(jìn)度,并進(jìn)行存儲(chǔ),比如這里的案例就是通過Map存儲(chǔ)了隊(duì)列的消費(fèi)進(jìn)度(offset)。
假如由于開發(fā)者的疏忽忘記保存offset,則每次都會(huì)從第一條消息進(jìn)行拉取,這樣很容易造成消息的重復(fù)消費(fèi)。如果是生產(chǎn)環(huán)境沒有做冪等則后果除了會(huì)造成大量業(yè)務(wù)邏輯的重復(fù)執(zhí)行還會(huì)造成業(yè)務(wù)的積壓從而導(dǎo)致線上業(yè)務(wù)的卡頓甚至雪崩。
另外還需要通過額外的存儲(chǔ)手段對(duì)offset進(jìn)行保存(推薦使用MySQL或者Redis進(jìn)行存儲(chǔ)),并且需要保證存儲(chǔ)設(shè)施的穩(wěn)定可靠,否則還是會(huì)引起重復(fù)消費(fèi)的問題。
推/拉模式的對(duì)比與使用建議
基于“推”模式消費(fèi)消息,實(shí)時(shí)性好,只要消息進(jìn)入消息中間件就可以即時(shí)被消費(fèi)者感知并進(jìn)行消費(fèi);缺點(diǎn)在于“推”模式需要消息中間件進(jìn)行額外的計(jì)算和消費(fèi)者的維護(hù)工作,因此可能引起消息中間件服務(wù)端的機(jī)器CPU負(fù)載升高;
而“拉”模式消費(fèi)消息,消費(fèi)者能夠自主控制拉取的頻率,拉取的數(shù)量,因此對(duì)消息中間件的機(jī)器而言,負(fù)載較低;但是“拉”模式由于是定時(shí)發(fā)起的消息拉取請(qǐng)求,因此實(shí)時(shí)性較弱。而且“拉”模式下還需要消費(fèi)者自行維護(hù)消費(fèi)進(jìn)度,相比而言“推”模式的消息消費(fèi)方式則不需要客戶端主動(dòng)維護(hù)消費(fèi)進(jìn)度(廣播消費(fèi)模式除外)。
因此對(duì)推/拉模式的使用建議如下:
(1)如果追求消息消費(fèi)的實(shí)時(shí)性,則推薦使用“推”模式消費(fèi)消息,但是要注意盡量提高消息中間件服務(wù)器的配置,并添加必要的監(jiān)控以感知服務(wù)器的性能指標(biāo)變化;
(2)如果想要靈活控制消費(fèi)頻率,消息拉取數(shù)量,則推薦使用“拉”模式消費(fèi)消息。
