<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          消息積壓?jiǎn)栴}優(yōu)化思路探討

          共 17298字,需瀏覽 35分鐘

           ·

          2021-04-29 17:34


          JAVA前線 


          歡迎大家關(guān)注公眾號(hào)「JAVA前線」查看更多精彩分享,主要包括源碼分析、實(shí)際應(yīng)用、架構(gòu)思維、職場(chǎng)分享、產(chǎn)品思考等等,同時(shí)也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)


          0 文章概述

          在使用消息中間件時(shí)消息積壓是我們必須面對(duì)的問題,無論這種問題是生產(chǎn)消息過快還是消費(fèi)者消費(fèi)能力不足導(dǎo)致的。本文我們以RocketMQ為例分析消息積壓?jiǎn)栴}通用處理思路。



          1 不處理

          消息積壓一定要處理嗎?我認(rèn)為在不影響業(yè)務(wù)情況下,消息積壓可以不處理,等待積壓消息逐漸被消化即可,因?yàn)橄⒎e壓本質(zhì)上是對(duì)消費(fèi)者的保護(hù)。我們不妨回顧一下消息中間件三個(gè)作用:解耦、異步、削峰。


          1.1 解耦

          假設(shè)用戶在一個(gè)電商系統(tǒng)購(gòu)物,支付成功后系統(tǒng)應(yīng)該怎么把這個(gè)消息告訴物流系統(tǒng)?

          第一種方式是支付系統(tǒng)直接調(diào)用物流系統(tǒng),但這樣會(huì)有一個(gè)問題:支付系統(tǒng)和物流系統(tǒng)產(chǎn)生了強(qiáng)依賴,當(dāng)物流系統(tǒng)出現(xiàn)問題,直接影響用戶交易流程,導(dǎo)致支付失敗。

          第二種方式是支付系統(tǒng)把支付成功消息推送給消息中間件,此時(shí)交易流程結(jié)束。物流系統(tǒng)訂閱這個(gè)消息進(jìn)行后續(xù)處理,即使物流系統(tǒng)出現(xiàn)問題,也不影響交易系統(tǒng)。


          1.2 異步

          假設(shè)物流系統(tǒng)處理業(yè)務(wù)需要100毫秒。如果支付系統(tǒng)直接調(diào)用物流系統(tǒng),整個(gè)鏈路響應(yīng)時(shí)長(zhǎng)就增加了100毫秒。

          如果支付系統(tǒng)把支付成功消息推送給消息中間件,支付系統(tǒng)就可以直接返回了,那么整個(gè)鏈路時(shí)長(zhǎng)就不需要增加這100毫秒了,這就是異步化帶來性能提升。


          1.3 削峰

          假設(shè)雙十一商家正在做秒殺活動(dòng),瞬時(shí)產(chǎn)生了大量支付單據(jù)。如果支付系統(tǒng)直接調(diào)用物流系統(tǒng),支付系統(tǒng)壓力就會(huì)同時(shí)傳遞給物流系統(tǒng),這是完全沒有必要的。

          如果支付系統(tǒng)把支付成功消息推送給消息中間件,物流系統(tǒng)可以根據(jù)系統(tǒng)能力,勻速拉取數(shù)據(jù)處理,削減了流量洪峰。

          消息堆積在中間件本質(zhì)上是對(duì)物流系統(tǒng)的一種保護(hù),流量壓力被勻速釋放給物流系統(tǒng),所以這種情況我們無須對(duì)消息積壓處理。


          2 要處理

          如果業(yè)務(wù)對(duì)消費(fèi)者處理實(shí)時(shí)性有要求,必須在一定時(shí)間內(nèi)處理完所有消息,因?yàn)檫@種場(chǎng)景消息積壓已經(jīng)影響了業(yè)務(wù),這時(shí)我們必須有所行動(dòng)。首先我們看看官網(wǎng)上RocketMQ網(wǎng)絡(luò)部署圖:




          Producer是消息生產(chǎn)者,Consumer是消息消費(fèi)者,Broker是消息中轉(zhuǎn)者,負(fù)責(zé)消息存儲(chǔ)和轉(zhuǎn)發(fā)。NameServer作為注冊(cè)中心維護(hù)著生產(chǎn)者、Broker、消費(fèi)者集群服務(wù)狀態(tài)。對(duì)于消息積壓?jiǎn)栴}我們可以從生產(chǎn)者、Broker、消費(fèi)者三個(gè)維度進(jìn)行思考。


          2.1 生產(chǎn)者

          生產(chǎn)者可以減少消息發(fā)送量從而減少消息積壓。消息發(fā)送量又可以從兩個(gè)維度進(jìn)行思考:第一是減少消息發(fā)送數(shù)量,對(duì)于下游明顯不需要的消息可以不發(fā)送,或者是對(duì)于一些頻繁變化的業(yè)務(wù)消息,可以選擇等待業(yè)務(wù)消息穩(wěn)定后再發(fā)送。

          第二是減少消息內(nèi)容大小,例如消費(fèi)者只需要5個(gè)字段,那么生產(chǎn)者就無需發(fā)送全部10個(gè)字段,尤其是一些體積很大的上下文字段可以不必發(fā)送。


          2.2 Broker

          對(duì)于消費(fèi)者不關(guān)心的消息完全可以在Broker端進(jìn)行過濾,從而減少傳輸?shù)较M(fèi)者的消息量從而提高吞吐量。我們以RocketMQ為例分析其提供的Tag、SQL表達(dá)式、Filter Server三種過濾方式。


          (1) Tag

          生產(chǎn)者為一個(gè)消息設(shè)置一個(gè)Tag,消費(fèi)者在訂閱時(shí)設(shè)置需要關(guān)注的Tag,這樣Broker可以在ConsumeQueue進(jìn)行過濾,只會(huì)從CommitLog讀取命中標(biāo)簽的消息:

          public class TagFilterProducer {
              public static void main(String[] args) throws Exception {
                  DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
                  producer.start();
                  String[] tags = new String[] { "TagA""TagB""TagC" };
                  for (int i = 0; i < 10; i++) {
                      // 每個(gè)消息設(shè)置一個(gè)Tag
                      Message msg = new Message("MyTopic", tags[i % tags.length], "Hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
                      SendResult sendResult = producer.send(msg);
                      System.out.println("sendResult=", + sendResult);
                  }
                  producer.shutdown();
              }
          }

          public class TagFilterConsumer {
              public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
                  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
                  // 只訂閱消息Tag等于TagA或者TagC
                  consumer.subscribe("MyTopic""TagA || TagC");
                  consumer.registerMessageListener(new MessageListenerConcurrently() {
                      @Override
                      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                          System.out.println("ThreadName=" + Thread.currentThread().getName() + ",messages=" + msgs);
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      }
                  });
                  consumer.start();
              }
          }

          (2) SQL表達(dá)式

          Tag只能支持比較簡(jiǎn)單的過濾邏輯,如果需要進(jìn)行復(fù)雜過濾就需要使用SQL表達(dá)式,官網(wǎng)對(duì)SQL表達(dá)式介紹如下:

          支持語(yǔ)法
          Numeric comparison, like >, >=, <, <=, BETWEEN, =
          Character comparison, like =, <>, IN
          IS NULL or IS NOT NULL
          Logical AND, OR, NOT

          支持類型
          Numeric, like 123, 3.1415
          Character, like 'abc', must be made with single quotes
          NULL, special constant
          Boolean, TRUE or FALSE

          生產(chǎn)者通過putUserProperty自定義屬性,消費(fèi)者通過表達(dá)式進(jìn)行過濾:

          public class SqlFilterProducer {
              public static void main(String[] args) throws Exception {
                  DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
                  producer.start();
                  String[] tags = new String[] { "TagA""TagB""TagC" };
                  for (int i = 0; i < 10; i++) {
                      // 每個(gè)消息設(shè)置一個(gè)Tag
                      Message msg = new Message("MyTopic", tags[i % tags.length], ("Hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                      // 每個(gè)消息設(shè)置自定義屬性
                      msg.putUserProperty("userId", String.valueOf(i));
                      SendResult sendResult = producer.send(msg);
                      System.out.println("sendResult=" + sendResult);
                  }
                  producer.shutdown();
              }
          }

          public class SqlFilterConsumer {
              public static void main(String[] args) throws Exception {
                  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
                  // SQL表達(dá)式
                  consumer.subscribe("MyTopic", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + "and (userId is not null and a between 0 and 3)"));
                  consumer.registerMessageListener(new MessageListenerConcurrently() {
                      @Override
                      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                          System.out.println("ThreadName=" + Thread.currentThread().getName() + ",messages=" + msgs);
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      }
                  });
                  consumer.start();
              }
          }

          (3) Filter Server

          Filter Server支持用戶自定義Java函數(shù),Broker端會(huì)執(zhí)行該函數(shù)對(duì)消息進(jìn)行過濾。我們?cè)诰帉懞瘮?shù)時(shí)需要注意,不要包含大量消耗內(nèi)存或者創(chuàng)建線程操作,否則可能造成Broker宕機(jī):

          public class Producer {
              public static void main(String[] args) throws Exception {
                  DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
                  producer.start();
                  String[] tags = new String[] { "TagA""TagB""TagC" };
                  for (int i = 0; i < 10; i++) {
                      Message msg = new Message("MyTopic", tags[i % tags.length], ("Hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                      msg.putUserProperty("userId", String.valueOf(i));
                      SendResult sendResult = producer.send(msg);
                      System.out.println("sendResult=" + sendResult);
                  }
                  producer.shutdown();
              }
          }

          public class Consumer {
              public static void main(String[] args) throws InterruptedException, MQClientException {
                  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
                  String codeScript = MixAll.file2String("/home/admin/filters/MyMessageFilterImpl.java");
                  consumer.subscribe("MyTopic""com.java.front.rocketmq.test.filter.MyMessageFilterImpl", codeScript);
                  consumer.registerMessageListener(new MessageListenerConcurrently() {
                      @Override
                      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                          for(MessageExt msg : msgs) {
                              System.out.println("ThreadName=" + Thread.currentThread().getName() + ",message=" + msg);
                          }
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      }

                  });
                  consumer.start();
              }
          }

          public class MyMessageFilterImpl implements MessageFilter {

              @Override
              public boolean match(MessageExt msg) {
                  String property = msg.getUserProperty("userId");
                  if(StringUtils.isEmpty(property)) {
                      return false;
                  }
                  if(Integer.parseInt(property) >= 3) {
                      return true;
                  }
                  return false;
              }
          }

          2.3 消費(fèi)者

          消費(fèi)者需要思考如何提高消費(fèi)速度,盡快消費(fèi)完積壓消息。需要注意如果消費(fèi)者還有下游依賴,例如訂閱消息后寫數(shù)據(jù)庫(kù)或者調(diào)用下游應(yīng)用,提高消費(fèi)速度時(shí)也必須考慮下游依賴的能力。


          (1) 優(yōu)化消費(fèi)邏輯

          如果消息消費(fèi)邏輯中存在慢SQL、慢服務(wù)等問題會(huì)降低消費(fèi)速度,從而造成消息積壓。我們可以使用例如Arthas等開源診斷工具,分析消費(fèi)全鏈路每個(gè)方法響應(yīng)時(shí)間,發(fā)現(xiàn)慢方法則進(jìn)行優(yōu)化。


          (2) 增加消費(fèi)線程

          適當(dāng)增加消費(fèi)線程,增加一定的消費(fèi)并發(fā)度也可以增加消費(fèi)速度,RocketMQ提供兩個(gè)方法設(shè)置線程數(shù):

          setConsumeThreadMin
          消費(fèi)最小線程數(shù)

          setConsumeThreadMax
          消費(fèi)最大線程數(shù)

          (3) 增加消費(fèi)步長(zhǎng)

          每次消費(fèi)可以多獲取幾條消息也可以增加消費(fèi)速度,RocketMQ提供兩個(gè)方法設(shè)置消費(fèi)步長(zhǎng):

          setPullBatchSize
          單次從MessageQueue獲取最大消息數(shù)量

          setConsumeMessageBatchMaxSize
          單次傳給消費(fèi)者執(zhí)行器最大消息數(shù)量(參數(shù)List<MessageExt> msgs最大長(zhǎng)度)

          (4) 增加消費(fèi)節(jié)點(diǎn)

          由于單機(jī)處理能力有限,當(dāng)消費(fèi)線程和消費(fèi)步長(zhǎng)都已增加到瓶頸時(shí),我們可以考慮擴(kuò)容集群中消費(fèi)節(jié)點(diǎn)。這個(gè)操作有兩個(gè)注意點(diǎn):第一是消費(fèi)節(jié)點(diǎn)數(shù)不要超過消息分區(qū)數(shù),第二是有序消費(fèi)造成并發(fā)度低問題。


          (5) 有序消費(fèi)改為無序消費(fèi)

          RocketMQ按照順序維度分為三類消息,普通消息并發(fā)度最好,但是不保證有序。全局有序消息有序性最好,但是并發(fā)度最差。分區(qū)有序消息既可以保證相同業(yè)務(wù)ID局部有序,又有保證一定并發(fā)度,但并發(fā)度受限于隊(duì)列數(shù)。

          (a) 普通消息

          普通消息也被稱為并發(fā)消息,生產(chǎn)時(shí)一條消息可能被寫入任意一個(gè)隊(duì)列里,消費(fèi)者可以啟動(dòng)多個(gè)線程并行消費(fèi),這類消息無法保證順序。雖然消息側(cè)無法保證有序,但我們可以在業(yè)務(wù)側(cè)使用狀態(tài)機(jī)實(shí)現(xiàn)業(yè)務(wù)有序

          (b) 分區(qū)有序消息

          需要注意消息有序性需要生產(chǎn)者和消費(fèi)者共同配合才能完成。生產(chǎn)者需要把相同業(yè)務(wù)ID的消息發(fā)送到同一個(gè)messageQueue,而在消費(fèi)時(shí)一個(gè)messageQueue不可以并發(fā)處理,這在一定程度上影響了消費(fèi)并發(fā)度

          public class Producer {
              public static void main(String[] args) throws Exception {
                  MQProducer producer = new DefaultMQProducer("producerGroup");
                  producer.start();
                  for (int i = 0; i < 10; i++) {
                      int orderId = i;
                      Message msg = new Message("MyTopic", ("Hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                      SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                          @Override
                          public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                              Integer id = (Integer) arg;
                              int index = id % mqs.size(); // 根據(jù)業(yè)務(wù)編號(hào)計(jì)算隊(duì)列下標(biāo)
                              return mqs.get(index);
                          }
                      }, orderId);
                  }
                  producer.shutdown();
              }
          }

          public class Consumer {
              public static void main(String[] args) throws Exception {
                  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
                  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                  consumer.subscribe("MyTopic""*")
                  consumer.registerMessageListener(new MessageListenerOrderly() { // 有序消費(fèi)
                      @Override
                      public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                          context.setAutoCommit(true);
                          System.out.println("ThreadName=" + Thread.currentThread().getName() + ",messages=" + msgs);
                          return ConsumeOrderlyStatus.SUCCESS;
                      }
                  });
                  consumer.start();
              }
          }

          (c) 全局有序消息

          全局有序是分區(qū)有序的一種特殊情況,如果一個(gè)主題只有一個(gè)消息隊(duì)列時(shí),那么就可以做到全局有序,但這種方案并發(fā)度最差


          3 文章總結(jié)

          本文我們分析了消息積壓?jiǎn)栴}處理思路,處理方案分為不處理和要處理兩大類。不處理是指在不影響業(yè)務(wù)的情況下利用消息系統(tǒng)削峰特性可以保護(hù)消費(fèi)者,如果要處理我們從生產(chǎn)者、Broker、消費(fèi)者三個(gè)維度進(jìn)行了分析,即生產(chǎn)者減少生產(chǎn)消息量,Broker進(jìn)行消息過濾,消費(fèi)者增加消費(fèi)速度,希望本文對(duì)大家有所幫助。




          JAVA前線 


          歡迎大家關(guān)注公眾號(hào)「JAVA前線」查看更多精彩分享,主要包括源碼分析、實(shí)際應(yīng)用、架構(gòu)思維、職場(chǎng)分享、產(chǎn)品思考等等,同時(shí)也非常歡迎大家加我微信「java_front」一起交流學(xué)習(xí)


          瀏覽 30
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  三级片网站入口 | WWW.撸一 | 91欧美在线播放 | 国产伦精品一区二区三区最新章节 | 人人妻人人澡人人DⅤD |