Rocketmq源碼分析16:消息過濾
注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.
在rocketmq中,消息過濾有兩種方式:
tagsql92
本文將從源碼角度來分析消息過濾的一些細節(jié)。
1. demo 準備
消息過濾的示例demo位于org.apache.rocketmq.example.filter包下,這里我們分別來看下tag與sql的過濾方式。
1.1 消息過濾producer
public class FilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer
= new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
// 指定消息的tag
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
在producer中,我們僅是指定了消息的tag,然后調用send(...)方法發(fā)送該消息。
關于消息過濾,producer就只是把它當作普通消息發(fā)送出去,并沒有做什么額外的操作。
1.2 消息過濾consumer
1. tag 過濾
tag過濾的consumer示例如下:
public class TagFilterConsumer {
public static void main(String[] args) throws
InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.subscribe("TagFilterTest",
// 設置要過濾的tag,多個使用 || 分開
"TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在使用時,需要指定過濾的tag,多個tag使用||分開。
2. sql 過濾
sql過濾的consumer示例如下:
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
String nameServer = "localhost:9876";
DefaultMQPushConsumer consumer
= new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("SqlFilterTest",
// sql 過濾語句
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
與tag過濾不同的是,sql過濾時,需要使用MessageSelector.bySql(...)指定sql語句。
另外,為了讓broker支持sql過濾,需要設置屬性:enablePropertyFilter=true,這樣broker才能支持sql過濾。
從以上代碼來看,consumer會指定過濾規(guī)則,告訴broker自己能接收哪些消息,broker從而返回對應的消息。
2. 從broker獲取消息
consumer從broker拉取消息時,會把自己的過濾規(guī)則一并上報,當broker收到consumer的消息后,從而為consumer返回相應的消息,broker獲取消息的方法為PullMessageProcessor#processRequest(...):
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
...
// 創(chuàng)建消息過濾的filter
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
if (hasSubscriptionFlag) {
try {
// 構建過濾數(shù)據(jù)
subscriptionData = FilterAPI.build(requestHeader.getTopic(),
requestHeader.getSubscription(), requestHeader.getExpressionType());
// 如果不是tag類型的過濾,創(chuàng)建 consumerFilterData 對象
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion());
assert consumerFilterData != null;
}
} catch (Exception e) {
...
}
} else {
...
}
...
// 消息過濾對象
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}
// 獲取消息
// 1. 根據(jù) topic 與 queueId 獲取 ConsumerQueue 文件
// 2. 根據(jù) ConsumerQueue 文件的信息,從 CommitLog 中獲取消息內容
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
...
return response;
}
這個方法是consumer從broker拉取消息的核心方法了,不過我們這里僅關注消息過濾相關的操作,因此這里省去了大量代碼,僅保留了消息過濾相關的內容。
消息過濾相關的內容如下:
構建 subscriptionData構建 ConsumerFilterData:如果不是tag類型的過濾,創(chuàng)建consumerFilterData對象創(chuàng)建消息過濾對象 MessageFilter獲取消息,在這里會進行消息過濾,處理方法為 DefaultMessageStore#getMessage
接下來我們就來分別看看這些步驟。
2.1 構建subscriptionData:FilterAPI#build
構建subscriptionData的方法為FilterAPI#build,代碼如下:
public static SubscriptionData build(final String topic, final String subString,
final String type) throws Exception {
// 這里是構建tag類型的過濾數(shù)據(jù)
if (ExpressionType.TAG.equals(type) || type == null) {
return buildSubscriptionData(null, topic, subString);
}
if (subString == null || subString.length() < 1) {
throw new IllegalArgumentException("Expression can't be null! " + type);
}
// 構建sql類型的過濾數(shù)據(jù)
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
subscriptionData.setExpressionType(type);
return subscriptionData;
}
/**
* 構建tag過濾消息
*/
public static SubscriptionData buildSubscriptionData(final String consumerGroup,
String topic, String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
if (null == subString || subString.equals(SubscriptionData.SUB_ALL)
|| subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
// 如果指定了tag,按 || 拆分tag
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
// tag 放入 tagsSet,tag 的 hashCode 放入 codeSet
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
從上面的方法來看,構建subscriptionData時,會根據(jù)tag與非tag過濾來構建不同的subscriptionData:
如果是 tag過濾,則按“||”拆分指定的tag,得到的tag放入tagsSet中,tag的hash值放入codeSet中如果是非 tag過濾,則不用處理tag相關操作,設置其他屬性即可
2.2 構建ConsumerFilterData:ConsumerFilterManager#build
對于非tag過濾的類型,rocketMq會額外構建ConsumerFilterData對象,方法為ConsumerFilterManager#build:
public static ConsumerFilterData build(final String topic, final String consumerGroup,
final String expression, final String type,
final long clientVersion) {
if (ExpressionType.isTagType(type)) {
return null;
}
ConsumerFilterData consumerFilterData = new ConsumerFilterData();
// 設置一系列的屬性
consumerFilterData.setTopic(topic);
consumerFilterData.setConsumerGroup(consumerGroup);
consumerFilterData.setBornTime(System.currentTimeMillis());
consumerFilterData.setDeadTime(0);
consumerFilterData.setExpression(expression);
consumerFilterData.setExpressionType(type);
consumerFilterData.setClientVersion(clientVersion);
try {
// 設置處理表達式的過濾器
consumerFilterData.setCompiledExpression(
FilterFactory.INSTANCE.get(type).compile(expression)
);
} catch (Throwable e) {
log.error(...);
return null;
}
return consumerFilterData;
}
這個方法中,關鍵就是如下一行:
consumerFilterData.setCompiledExpression(
FilterFactory.INSTANCE.get(type).compile(expression)
);
它設置了表達式的解析器,FilterFactory代碼如下:
public class FilterFactory {
/** 單例對象 */
public static final FilterFactory INSTANCE = new FilterFactory();
/** 存放過濾器的map */
protected static final Map<String, FilterSpi> FILTER_SPI_HOLDER
= new HashMap<String, FilterSpi>(4);
static {
FilterFactory.INSTANCE.register(new SqlFilter());
}
/**
* 將 過濾器添加到 FILTER_SPI_HOLDER 中
*/
public void register(FilterSpi filterSpi) {
if (FILTER_SPI_HOLDER.containsKey(filterSpi.ofType())) {
throw new IllegalArgumentException(...);
}
FILTER_SPI_HOLDER.put(filterSpi.ofType(), filterSpi);
}
/**
* 根據(jù)類型獲取 filter
*/
public FilterSpi get(String type) {
return FILTER_SPI_HOLDER.get(type);
}
...
}
可以看到,整個FILTER_SPI_HOLDERy就只有一個FilterSpi實例:SqlFilter,sql的過濾也是由這個類來處理的。
2.3 創(chuàng)建MessageFilter對象
這塊就是創(chuàng)建了一個MessageFilter對象,上面創(chuàng)建的subscriptionData與consumerFilterData都會被傳入這個對象中。
2.4 獲取消息
到了這一步,就是真正去commitLog中獲取消息了,獲取方法為DefaultMessageStore#getMessage:
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final MessageFilter messageFilter) {
...
// 判斷消息是否滿足過濾條件
if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(
isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue;
}
// 獲取消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}
if (messageFilter != null
// 比較sql表達式
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}
...
}
在這個方法里,我們依舊只關注過濾相關流程,該方法所進行的操作如下:
判斷消息是否滿足過濾條件,這里只過濾 tag的hashCode,不滿足條件的消息就不會獲取到獲取消息,就是從 commitlog文件中獲取消息判斷消息是否滿足過濾條件,這里處理 sql類型的過濾,不滿足條件的消息不會返回
1. 過濾tag的hashCode:
broker處理tag的操作方法為DefaultMessageFilter#isMatchedByConsumeQueue,代碼如下:
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
// 判斷是否滿足標簽的hashcode
|| subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
需要注意的是,這里只判斷tag的hashCode是否相等,但不同tag的hashCode可能相等,真正的tag過濾是在consumer中進行的。
2. sql 過濾
從commitlog中獲得消息后,接下來會進行sql過濾,方法為ExpressionMessageFilter#isMatchedByCommitLog:
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
// 省略一些內容
...
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
// 處理值
ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}
log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
if (ret == null || !(ret instanceof Boolean)) {
return false;
}
return (Boolean) ret;
}
realFilterData內容如下:


3. consumer的過濾tag
對于tag過濾,broker僅是根據(jù)tag的hashCode進行過濾了,在consumer才會根據(jù)tag的內容過濾,我們進入拉取消息的方法 DefaultMQPushConsumerImpl#pullMessage:
public void pullMessage(final PullRequest pullRequest) {
...
// 消息拉取的回調函數(shù),在拉取到消息后會進入這個方法處理
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 處理消息,將二制消息解碼為java對象,也會對消息進行tag過濾
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(
pullRequest.getMessageQueue(), pullResult, subscriptionData);
...
}
...
}
...
}
}
根據(jù)跟進PullAPIWrapper#processPullResult方法:
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
// 將二進制數(shù)據(jù)解碼為對象
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
// 按 tag 過濾
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
// 根據(jù)tag過濾消息
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
...
}
....
}
從代碼來看,方法中會根據(jù)tag是否在TagsSet中來決定該消息是否需要加入msgListFilterAgain,而msgListFilterAgain就是過濾的消息列表了。
3. 總結
RocketMq消息過濾支持tag與sql兩種方式,
1. tag 方式
在broker獲取消息時,根據(jù)tag的hashCode過濾一波消息,但這樣得到的消息可能并不只是指定tag的,因此需要在consumer上做進一步的過濾。
舉例來說,consumer訂閱了tag為tag1的消息,tag1與tag11兩者的hashCode都是100,因此在broker上過濾時,根據(jù)tag的hashCode,這兩者對應的消息都會發(fā)往consumer,因此consumer需要再進比較tag的值,過濾出真正需要的消息。
2. sql 方式
sql方式的過濾方式,只在broker中進行。
限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。
本文首發(fā)于微信公眾號 「Java技術探秘」,如果您喜歡本文,歡迎關注該公眾號,讓我們一起在技術的世界里探秘吧!
