<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          推與拉,RocketMQ消息消費(fèi)的那些姿勢(shì)

          共 4680字,需瀏覽 10分鐘

           ·

          2022-05-14 17:17

          消息消費(fèi)方式,一般來說有兩種姿勢(shì),我們往往稱之“推”模式(Push)以及“拉”模式(Pull),如圖所示。

          “推”模式,從模型上來說,消費(fèi)者訂閱了消息中間件中的Topic(主題),當(dāng)該主題有接收到生產(chǎn)者發(fā)送的消息之后,消息中間件會(huì)主動(dòng)將消息推送(push)至訂閱了該主題的消費(fèi)者。這種由消息中間件推送到消費(fèi)者的方式,稱為“推”模式。

          “拉”模式,從模型上來說,消費(fèi)者向消息中間件發(fā)起消息拉取請(qǐng)求,消息中間件接收到拉取請(qǐng)求之后,將消息進(jìn)行打包之后返回給消費(fèi)者。這種方式下,消息是消費(fèi)者主動(dòng)向消息中間件進(jìn)行拉取的,這種由消費(fèi)者主動(dòng)向消息中間件拉取的方式,稱為“拉”模式。

          ?

          這里需要注意的是,我們要明白實(shí)現(xiàn)完全的推模式,對(duì)于MQ的broker而言需要付出較多的性能,由于broker需要主動(dòng)與消費(fèi)者進(jìn)程建立連接并且需要主動(dòng)探查消費(fèi)者進(jìn)程的健康狀態(tài),相當(dāng)于broker對(duì)消費(fèi)者進(jìn)程構(gòu)成了反向依賴,這便很大程度上增加了broker實(shí)現(xiàn)復(fù)雜度。

          簡(jiǎn)單提一句,對(duì)于RocketMQ而言,推方式消費(fèi)消息其本質(zhì)實(shí)現(xiàn)其實(shí)是長(zhǎng)輪詢的拉,相關(guān)文檔可以自行查找資料或者翻看本公眾號(hào)的歷史文章。

          ?

          基于“推”模式消費(fèi)消息

          首先介紹“推”模式下是如何消費(fèi)消息的,以RocketMQ為例,代碼如下:


          public?class?PushConsumerDemo?{

          ??public?static?void?main(String[]?args)?throws?InterruptedException,?MQClientException?{

          ????DefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("push_consumer_group");
          ????
          ????consumer.setNamesrvAddr("192.168.1.106");
          ????
          ????consumer.subscribe("PUSH_TOPIC",?"*");
          ????
          ????consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
          ????
          ????consumer.setConsumeTimestamp(new?SimpleDateFormat("yyyyMMddHHmmss").format(new?Date()));
          ????
          ????consumer.registerMessageListener(new?MessageListenerConcurrently()?{

          ??????@Override
          ??????public?ConsumeConcurrentlyStatus?consumeMessage(List?msgs,?ConsumeConcurrentlyContext?context)?{
          ????
          ????????System.out.printf("%s?Receive?New?Messages:?%s?%n",?Thread.currentThread().getName(),?msgs);
          ????
          ????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          ????
          ??????}
          ????
          ????});
          ????
          ????consumer.start();
          ????
          ????System.out.printf("Consumer?Started.%n");
          ??}
          }

          RocketMQ中通過DefaultMQPushConsumer實(shí)現(xiàn)“推”模式的消息消費(fèi),這種方式的特點(diǎn)在于實(shí)時(shí)性好,只要MQ服務(wù)端有消息到來,就會(huì)實(shí)時(shí)性的推送給消費(fèi)者進(jìn)行消費(fèi)。

          從章節(jié)開始的介紹中我們得知,在“推”模式下,服務(wù)端需要感知與它建立鏈接的客戶端,這意味著服務(wù)端主動(dòng)推送消息的過程中,需要對(duì)消息做額外的處理,以便能夠及時(shí)將消息分發(fā)給客戶端。這些計(jì)算邏輯會(huì)為消息中間件的服務(wù)端帶來額外的負(fù)擔(dān),因此RocketMQ通過“長(zhǎng)輪詢”的方式,巧妙的解決了這個(gè)問題。

          長(zhǎng)輪詢本質(zhì)上仍舊是輪詢,它與輪詢不同之處在于,當(dāng)服務(wù)端接收到客戶端的請(qǐng)求后服務(wù)端不會(huì)立即將數(shù)據(jù)返回給客戶端,而是會(huì)先將這個(gè)請(qǐng)求hold住,判斷服務(wù)器端數(shù)據(jù)是否有更新。如果有更新,則對(duì)客戶端進(jìn)行響應(yīng),如果一直沒有數(shù)據(jù),則它會(huì)在長(zhǎng)輪詢超時(shí)時(shí)間之前一直hold住請(qǐng)求并檢測(cè)是否有數(shù)據(jù)更新,直到有數(shù)據(jù)或者超時(shí)后才返回。

          “長(zhǎng)輪詢”的效果基本上與服務(wù)端實(shí)時(shí)推送相似,兼顧了實(shí)時(shí)性,降低了純“推”模式實(shí)現(xiàn)的復(fù)雜度。

          基于“拉”模式消費(fèi)消息

          接著介紹“拉”模式下是如何消費(fèi)消息的,還是以RocektMQ為例,代碼如下:

          public?class?PullConsumerDemo?{

          ??private?static?final?Map?OFFSE_TABLE?=?new?HashMap();

          ??public?static?void?main(String[]?args)?throws?MQClientException?{

          ????DefaultMQPullConsumer?consumer?=?new?DefaultMQPullConsumer("PULL_CONSUMER_GROUP");

          ????consumer.setNamesrvAddr("127.0.0.1:9876");

          ????consumer.start();

          ?

          ????Set?mqs?=?consumer.fetchSubscribeMessageQueues("PULL-TOPIC");

          ????for?(MessageQueue?mq?:?mqs)?{

          ??????System.out.printf("Consume?from?the?queue:?%s%n",?mq);

          ??????SINGLE_MQ:

          ??????while?(true)?{

          ????????try?{

          ??????????PullResult?pullResult?=

          ????????????consumer.pullBlockIfNotFound(mq,?null,?getMessageQueueOffset(mq),?32);

          ??????????System.out.printf("%s%n",?pullResult);

          ??????????putMessageQueueOffset(mq,?pullResult.getNextBeginOffset());

          ??????????switch?(pullResult.getPullStatus())?{

          ????????????case?FOUND:

          ??????????????break;

          ?????????????case?NO_MATCHED_MSG:

          ??????????????break;

          ????????????case?NO_NEW_MSG:

          ??????????????break?SINGLE_MQ;

          ????????????case?OFFSET_ILLEGAL:

          ??????????????break;

          ????????????default:

          ??????????????break;

          ??????????}

          ????????}?catch?(Exception?e)?{

          ??????????e.printStackTrace();

          ????????}

          ??????}

          ????}

          ?

          ????consumer.shutdown();

          ??}

          ?

          ??private?static?long?getMessageQueueOffset(MessageQueue?mq)?{

          ????Long?offset?=?OFFSE_TABLE.get(mq);

          ????if?(offset?!=?null)?{

          ??????return?offset;

          ????}

          ????return?0;

          ??}

          ?

          ??private?static?void?putMessageQueueOffset(MessageQueue?mq,long?offset){

          ????OFFSE_TABLE.put(mq,?offset);

          ??}

          }

          RocketMQ通過DefaultMQPullConsumer實(shí)現(xiàn)了“拉”模式的消息消費(fèi)。

          • (1)需要定義消費(fèi)者組,實(shí)例化一個(gè)DefaultMQPullConsumer消費(fèi)者對(duì)象,并指定消費(fèi)者組;

          • (2)接著為消費(fèi)者設(shè)置NameServer地址,保證消費(fèi)者客戶端能夠從NameServer獲取到broker地址,從而執(zhí)行消息消費(fèi)流程;

          • (3)通過consumer.fetchSubscribeMessageQueues(TOPIC)方法獲取指定TOPIC下的所有隊(duì)列,默認(rèn)有4個(gè);

          • (4)接著需要對(duì)獲取到MessageQueue集合進(jìn)行遍歷,拉取數(shù)據(jù)并執(zhí)行具體的消費(fèi)過程;

          • (5)通過while(true) 不間斷地從隊(duì)列中拉取數(shù)據(jù),默認(rèn)情況下每次拉取32條,這里需要顯式地傳入拉取開始的offset,通過getMessageQueueOffset(mq)方法獲取到開始拉取的offset,從持久化設(shè)施中得到對(duì)應(yīng)MessageQueue的拉取進(jìn)度(可以認(rèn)為是消費(fèi)進(jìn)度);

            • 拉取結(jié)束后,在持久化設(shè)施(這里是一個(gè)Map)中保存下次拉取開始時(shí)的offset,也就是本次拉取結(jié)束的下一個(gè)offset(通過pullResult.getNextBeginOffset()獲取);
          • (6)需要注意的是,每次拉取成功之后都需要顯式調(diào)用putMessageQueueOffset()方法,刷新對(duì)應(yīng)隊(duì)列MessageQueue的拉取進(jìn)度。

          總結(jié)來說,RocketMQ中的“拉”模式消費(fèi)方式需要開發(fā)者顯式維護(hù)消費(fèi)進(jìn)度,每次消費(fèi)成功之后都需要更新消費(fèi)進(jìn)度,并進(jìn)行存儲(chǔ),比如這里的案例就是通過Map存儲(chǔ)了隊(duì)列的消費(fèi)進(jìn)度(offset)。

          假如由于開發(fā)者的疏忽忘記保存offset,則每次都會(huì)從第一條消息進(jìn)行拉取,這樣很容易造成消息的重復(fù)消費(fèi)。如果是生產(chǎn)環(huán)境沒有做冪等則后果除了會(huì)造成大量業(yè)務(wù)邏輯的重復(fù)執(zhí)行還會(huì)造成業(yè)務(wù)的積壓從而導(dǎo)致線上業(yè)務(wù)的卡頓甚至雪崩。

          另外還需要通過額外的存儲(chǔ)手段對(duì)offset進(jìn)行保存(推薦使用MySQL或者Redis進(jìn)行存儲(chǔ)),并且需要保證存儲(chǔ)設(shè)施的穩(wěn)定可靠,否則還是會(huì)引起重復(fù)消費(fèi)的問題。

          推/拉模式的對(duì)比與使用建議

          基于“推”模式消費(fèi)消息,實(shí)時(shí)性好,只要消息進(jìn)入消息中間件就可以即時(shí)被消費(fèi)者感知并進(jìn)行消費(fèi);缺點(diǎn)在于“推”模式需要消息中間件進(jìn)行額外的計(jì)算和消費(fèi)者的維護(hù)工作,因此可能引起消息中間件服務(wù)端的機(jī)器CPU負(fù)載升高;

          而“拉”模式消費(fèi)消息,消費(fèi)者能夠自主控制拉取的頻率,拉取的數(shù)量,因此對(duì)消息中間件的機(jī)器而言,負(fù)載較低;但是“拉”模式由于是定時(shí)發(fā)起的消息拉取請(qǐng)求,因此實(shí)時(shí)性較弱。而且“拉”模式下還需要消費(fèi)者自行維護(hù)消費(fèi)進(jìn)度,相比而言“推”模式的消息消費(fèi)方式則不需要客戶端主動(dòng)維護(hù)消費(fèi)進(jìn)度(廣播消費(fèi)模式除外)。

          因此對(duì)推/拉模式的使用建議如下:

          • (1)如果追求消息消費(fèi)的實(shí)時(shí)性,則推薦使用“推”模式消費(fèi)消息,但是要注意盡量提高消息中間件服務(wù)器的配置,并添加必要的監(jiān)控以感知服務(wù)器的性能指標(biāo)變化;

          • (2)如果想要靈活控制消費(fèi)頻率,消息拉取數(shù)量,則推薦使用“拉”模式消費(fèi)消息。


          瀏覽 181
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大香蕉伊在线观看 | 日日夜夜人人人 | www.俺去了 | 色婷婷国产精品免 | www国产无码内射 |