<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Rocketmq源碼分析16:消息過濾

          共 30475字,需瀏覽 61分鐘

           ·

          2021-05-02 15:10

          注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.

          rocketmq中,消息過濾有兩種方式:

          • tag
          • sql92

          本文將從源碼角度來分析消息過濾的一些細節(jié)。

          1. demo 準備

          消息過濾的示例demo位于org.apache.rocketmq.example.filter包下,這里我們分別來看下tagsql的過濾方式。

          1.1 消息過濾producer

          public class FilterProducer {
              public static void main(String[] args) throws Exception {
                  DefaultMQProducer producer 
                      = new DefaultMQProducer("please_rename_unique_group_name");
                  producer.start();
                  String[] tags = new String[] {"TagA""TagB""TagC"};

                  for (int i = 0; i < 60; i++) {
                      Message msg = new Message("TagFilterTest",
                          // 指定消息的tag
                          tags[i % tags.length],
                          "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                      SendResult sendResult = producer.send(msg);
                      System.out.printf("%s%n", sendResult);
                  }
                  producer.shutdown();
              }
          }

          producer中,我們僅是指定了消息的tag,然后調用send(...)方法發(fā)送該消息。

          關于消息過濾,producer就只是把它當作普通消息發(fā)送出去,并沒有做什么額外的操作。

          1.2 消息過濾consumer

          1. tag 過濾

          tag過濾的consumer示例如下:

          public class TagFilterConsumer {
              public static void main(String[] args) throws 
                      InterruptedException, MQClientException, IOException 
          {

                  DefaultMQPushConsumer consumer 
                      = new DefaultMQPushConsumer("please_rename_unique_group_name");
                  consumer.subscribe("TagFilterTest"
                      // 設置要過濾的tag,多個使用 || 分開
                      "TagA || TagC");
                  consumer.registerMessageListener(new MessageListenerConcurrently() {

                      @Override
                      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                          ConsumeConcurrentlyContext context)
           
          {
                          System.out.printf("%s Receive New Messages: %s %n"
                              Thread.currentThread().getName(), msgs);
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      }
                  });
                  consumer.start();
                  System.out.printf("Consumer Started.%n");
              }
          }

          在使用時,需要指定過濾的tag,多個tag使用||分開。

          2. sql 過濾

          sql過濾的consumer示例如下:

          public class SqlFilterConsumer {

              public static void main(String[] args) throws Exception {
                  String nameServer = "localhost:9876";
                  DefaultMQPushConsumer consumer 
                      = new DefaultMQPushConsumer("please_rename_unique_group_name");
                  consumer.setNamesrvAddr(nameServer);
                  consumer.subscribe("SqlFilterTest",
                      // sql 過濾語句
                      MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                          "and (a is not null and a between 0 and 3)"));
                  consumer.registerMessageListener(new MessageListenerConcurrently() {
                      @Override
                      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                          ConsumeConcurrentlyContext context)
           
          {
                          System.out.printf("%s Receive New Messages: %s %n"
                              Thread.currentThread().getName(), msgs);
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      }
                  });
                  consumer.start();
                  System.out.printf("Consumer Started.%n");
              }
          }

          tag過濾不同的是,sql過濾時,需要使用MessageSelector.bySql(...)指定sql語句。

          另外,為了讓broker支持sql過濾,需要設置屬性:enablePropertyFilter=true,這樣broker才能支持sql過濾。

          從以上代碼來看,consumer會指定過濾規(guī)則,告訴broker自己能接收哪些消息,broker從而返回對應的消息。

          2. 從broker獲取消息

          consumerbroker拉取消息時,會把自己的過濾規(guī)則一并上報,當broker收到consumer的消息后,從而為consumer返回相應的消息,broker獲取消息的方法為PullMessageProcessor#processRequest(...)

          private RemotingCommand processRequest(final Channel channel, RemotingCommand request, 
                  boolean brokerAllowSuspend)
           throws RemotingCommandException 
          {
              ...

              // 創(chuàng)建消息過濾的filter
              SubscriptionData subscriptionData = null;
              ConsumerFilterData consumerFilterData = null;
              if (hasSubscriptionFlag) {
                  try {
                      // 構建過濾數(shù)據(jù)
                      subscriptionData = FilterAPI.build(requestHeader.getTopic(), 
                          requestHeader.getSubscription(), requestHeader.getExpressionType());
                      // 如果不是tag類型的過濾,創(chuàng)建 consumerFilterData 對象
                      if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                          consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), 
                              requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
                              requestHeader.getExpressionType(), requestHeader.getSubVersion());
                          assert consumerFilterData != null;
                      }
                  } catch (Exception e) {
                      ...
                  }
              } else {
                  ...
              }

              ...

              // 消息過濾對象
              MessageFilter messageFilter;
              if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
                  messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
                      this.brokerController.getConsumerFilterManager());
              } else {
                  messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
                      this.brokerController.getConsumerFilterManager());
              }

              // 獲取消息
              // 1. 根據(jù) topic 與 queueId 獲取 ConsumerQueue 文件
              // 2. 根據(jù) ConsumerQueue 文件的信息,從 CommitLog 中獲取消息內容
              final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
                  requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), 
                  requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
              ...
              return response;
          }

          這個方法是consumerbroker拉取消息的核心方法了,不過我們這里僅關注消息過濾相關的操作,因此這里省去了大量代碼,僅保留了消息過濾相關的內容。

          消息過濾相關的內容如下:

          1. 構建subscriptionData
          2. 構建ConsumerFilterData:如果不是tag類型的過濾,創(chuàng)建 consumerFilterData 對象
          3. 創(chuàng)建消息過濾對象MessageFilter
          4. 獲取消息,在這里會進行消息過濾,處理方法為DefaultMessageStore#getMessage

          接下來我們就來分別看看這些步驟。

          2.1 構建subscriptionDataFilterAPI#build

          構建subscriptionData的方法為FilterAPI#build,代碼如下:

          public static SubscriptionData build(final String topic, final String subString,
              final String type)
           throws Exception 
          {
              // 這里是構建tag類型的過濾數(shù)據(jù)
              if (ExpressionType.TAG.equals(type) || type == null) {
                  return buildSubscriptionData(null, topic, subString);
              }

              if (subString == null || subString.length() < 1) {
                  throw new IllegalArgumentException("Expression can't be null! " + type);
              }
              // 構建sql類型的過濾數(shù)據(jù)
              SubscriptionData subscriptionData = new SubscriptionData();
              subscriptionData.setTopic(topic);
              subscriptionData.setSubString(subString);
              subscriptionData.setExpressionType(type);
              return subscriptionData;
          }

          /**
           * 構建tag過濾消息
           */

          public static SubscriptionData buildSubscriptionData(final String consumerGroup, 
                  String topic, String subString)
           throws Exception 
          {
              SubscriptionData subscriptionData = new SubscriptionData();
              subscriptionData.setTopic(topic);
              subscriptionData.setSubString(subString);

              if (null == subString || subString.equals(SubscriptionData.SUB_ALL) 
                      || subString.length() == 0) {
                  subscriptionData.setSubString(SubscriptionData.SUB_ALL);
              } else {
                  // 如果指定了tag,按 || 拆分tag
                  String[] tags = subString.split("\\|\\|");
                  if (tags.length > 0) {
                      for (String tag : tags) {
                          if (tag.length() > 0) {
                              String trimString = tag.trim();
                              if (trimString.length() > 0) {
                                  // tag 放入 tagsSet,tag 的 hashCode 放入 codeSet
                                  subscriptionData.getTagsSet().add(trimString);
                                  subscriptionData.getCodeSet().add(trimString.hashCode());
                              }
                          }
                      }
                  } else {
                      throw new Exception("subString split error");
                  }
              }

              return subscriptionData;
          }

          從上面的方法來看,構建subscriptionData時,會根據(jù)tag與非tag過濾來構建不同的subscriptionData

          1. 如果是tag過濾,則按“||”拆分指定的tag,得到的tag 放入 tagsSet中,taghash值 放入 codeSet
          2. 如果是非tag過濾,則不用處理tag相關操作,設置其他屬性即可

          2.2 構建ConsumerFilterDataConsumerFilterManager#build

          對于非tag過濾的類型,rocketMq會額外構建ConsumerFilterData對象,方法為ConsumerFilterManager#build

          public static ConsumerFilterData build(final String topic, final String consumerGroup,
                  final String expression, final String type,
                  final long clientVersion)
           
          {
                  if (ExpressionType.isTagType(type)) {
                      return null;
                  }

                  ConsumerFilterData consumerFilterData = new ConsumerFilterData();
                  // 設置一系列的屬性
                  consumerFilterData.setTopic(topic);
                  consumerFilterData.setConsumerGroup(consumerGroup);
                  consumerFilterData.setBornTime(System.currentTimeMillis());
                  consumerFilterData.setDeadTime(0);
                  consumerFilterData.setExpression(expression);
                  consumerFilterData.setExpressionType(type);
                  consumerFilterData.setClientVersion(clientVersion);
                  try {
                      // 設置處理表達式的過濾器
                      consumerFilterData.setCompiledExpression(
                          FilterFactory.INSTANCE.get(type).compile(expression)
                      );
                  } catch (Throwable e) {
                      log.error(...);
                      return null;
                  }

                  return consumerFilterData;
              }

          這個方法中,關鍵就是如下一行:

          consumerFilterData.setCompiledExpression(
              FilterFactory.INSTANCE.get(type).compile(expression)
          );

          它設置了表達式的解析器,FilterFactory代碼如下:

          public class FilterFactory {

              /** 單例對象 */
              public static final FilterFactory INSTANCE = new FilterFactory();

              /** 存放過濾器的map */
              protected static final Map<String, FilterSpi> FILTER_SPI_HOLDER 
                  = new HashMap<String, FilterSpi>(4);

              static {
                  FilterFactory.INSTANCE.register(new SqlFilter());
              }

              /**
               * 將 過濾器添加到 FILTER_SPI_HOLDER 中
               */

              public void register(FilterSpi filterSpi) {
                  if (FILTER_SPI_HOLDER.containsKey(filterSpi.ofType())) {
                      throw new IllegalArgumentException(...);
                  }

                  FILTER_SPI_HOLDER.put(filterSpi.ofType(), filterSpi);
              }

              /**
               * 根據(jù)類型獲取 filter
               */

              public FilterSpi get(String type) {
                  return FILTER_SPI_HOLDER.get(type);
              }

              ...
          }

          可以看到,整個FILTER_SPI_HOLDERy就只有一個FilterSpi實例:SqlFilter,sql的過濾也是由這個類來處理的。

          2.3 創(chuàng)建MessageFilter對象

          這塊就是創(chuàng)建了一個MessageFilter對象,上面創(chuàng)建的subscriptionDataconsumerFilterData都會被傳入這個對象中。

          2.4 獲取消息

          到了這一步,就是真正去commitLog中獲取消息了,獲取方法為DefaultMessageStore#getMessage

          public GetMessageResult getMessage(final String group, final String topic, final int queueId, 
                  final long offset, final int maxMsgNums, final MessageFilter messageFilter)
           
          {
              ...

              // 判斷消息是否滿足過濾條件
              if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(
                          isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                  if (getResult.getBufferTotalSize() == 0) {
                      status = GetMessageStatus.NO_MATCHED_MESSAGE;
                  }
                  continue;
              }

              // 獲取消息
              SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
              if (null == selectResult) {
                  if (getResult.getBufferTotalSize() == 0) {
                      status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                  }

                  nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                  continue;
              }

              if (messageFilter != null
                  // 比較sql表達式    
                  && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
                  if (getResult.getBufferTotalSize() == 0) {
                      status = GetMessageStatus.NO_MATCHED_MESSAGE;
                  }
                  // release...
                  selectResult.release();
                  continue;
              }
              ...
          }

          在這個方法里,我們依舊只關注過濾相關流程,該方法所進行的操作如下:

          1. 判斷消息是否滿足過濾條件,這里只過濾taghashCode,不滿足條件的消息就不會獲取到
          2. 獲取消息,就是從commitlog文件中獲取消息
          3. 判斷消息是否滿足過濾條件,這里處理sql類型的過濾,不滿足條件的消息不會返回
          1. 過濾taghashCode

          broker處理tag的操作方法為DefaultMessageFilter#isMatchedByConsumeQueue,代碼如下:

          @Override
          public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
              if (null == tagsCode || null == subscriptionData) {
                  return true;
              }

              if (subscriptionData.isClassFilterMode()) {
                  return true;
              }

              return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
                      // 判斷是否滿足標簽的hashcode
                      || subscriptionData.getCodeSet().contains(tagsCode.intValue());
          }

          需要注意的是,這里只判斷taghashCode是否相等,但不同taghashCode可能相等,真正的tag過濾是在consumer中進行的。

          2. sql 過濾

          commitlog中獲得消息后,接下來會進行sql過濾,方法為ExpressionMessageFilter#isMatchedByCommitLog

          public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
              // 省略一些內容
              ...

              Object ret = null;
              try {
                  MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
                  // 處理值
                  ret = realFilterData.getCompiledExpression().evaluate(context);
              } catch (Throwable e) {
                  log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
              }

              log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);

              if (ret == null || !(ret instanceof Boolean)) {
                  return false;
              }

              return (Boolean) ret;
          }

          realFilterData內容如下:

          3. consumer的過濾tag

          對于tag過濾,broker僅是根據(jù)taghashCode進行過濾了,在consumer才會根據(jù)tag的內容過濾,我們進入拉取消息的方法 DefaultMQPushConsumerImpl#pullMessage

             public void pullMessage(final PullRequest pullRequest) {
                 ...
                 // 消息拉取的回調函數(shù),在拉取到消息后會進入這個方法處理
                  PullCallback pullCallback = new PullCallback() {
                      @Override
                      public void onSuccess(PullResult pullResult) {
                          if (pullResult != null) {
                              // 處理消息,將二制消息解碼為java對象,也會對消息進行tag過濾
                              pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
                                  pullRequest.getMessageQueue(), pullResult, subscriptionData);
                              ...
                          }
                          ...
                      }
                      ...
                  }
             }

          根據(jù)跟進PullAPIWrapper#processPullResult方法:

          public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
              final SubscriptionData subscriptionData)
           
          {
              PullResultExt pullResultExt = (PullResultExt) pullResult;

              this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
              if (PullStatus.FOUND == pullResult.getPullStatus()) {
                  // 將二進制數(shù)據(jù)解碼為對象
                  ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
                  List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

                  List<MessageExt> msgListFilterAgain = msgList;
                  // 按 tag 過濾
                  if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                      msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                      for (MessageExt msg : msgList) {
                          if (msg.getTags() != null) {
                              // 根據(jù)tag過濾消息
                              if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                                  msgListFilterAgain.add(msg);
                              }
                          }
                      }
                  }
                  ...
              }
              ....
          }

          從代碼來看,方法中會根據(jù)tag是否在TagsSet中來決定該消息是否需要加入msgListFilterAgain,而msgListFilterAgain就是過濾的消息列表了。

          3. 總結

          RocketMq消息過濾支持tagsql兩種方式,

          1. tag 方式

          broker獲取消息時,根據(jù)taghashCode過濾一波消息,但這樣得到的消息可能并不只是指定tag的,因此需要在consumer上做進一步的過濾。

          舉例來說,consumer訂閱了tagtag1的消息,tag1tag11兩者的hashCode都是100,因此在broker上過濾時,根據(jù)taghashCode,這兩者對應的消息都會發(fā)往consumer,因此consumer需要再進比較tag的值,過濾出真正需要的消息。

          2. sql 方式

          sql方式的過濾方式,只在broker中進行。


          限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。

          本文首發(fā)于微信公眾號 「Java技術探秘」,如果您喜歡本文,歡迎關注該公眾號,讓我們一起在技術的世界里探秘吧!


          瀏覽 46
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  激情深爱婷婷 | 91丨国产亚洲丨精品白丝 | 黄色视频网站观看 | 黄色免费网站在线 | 亚洲另类在线观看 |