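RocketMQ Source Code Analysis 13: Consumer Consume Offsets
Note: this series is based on RocketMQ 4.8.0. Gitee repository: https://gitee.com/funcy/rocketmq.git.
Continuing from the previous article, we keep analyzing the consumer's consumption flow.
6. Avoiding duplicate consumption: the consume offset
How does a RocketMQ consumer avoid consuming the same message twice? The answer lies in the offset. Before pulling messages, the consumer first obtains the offset; it then sends that offset along with the pull request, and the broker uses it to locate the corresponding messages and return them to the consumer.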
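To make the idea concrete, here is a minimal sketch. It is not RocketMQ code: the class OffsetPullSketch and its pull method are hypothetical names, and a queue is modeled as a plain in-memory list. It only illustrates that a pull request carrying an offset returns the messages from that position onward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a broker-side queue; not a RocketMQ class.
class OffsetPullSketch {
    private final List<String> queue = new ArrayList<>();

    void append(String message) {
        queue.add(message);
    }

    // Return up to maxNums messages starting at the given offset.
    List<String> pull(long offset, int maxNums) {
        int from = (int) Math.min(Math.max(offset, 0), queue.size());
        int to = Math.min(from + maxNums, queue.size());
        return new ArrayList<>(queue.subList(from, to));
    }
}
```

A consumer that adds the size of each returned batch to its offset before the next pull never sees the same message twice, which is the property the rest of this section relies on.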
6.1 Initializing the offset store
Offset storage is handled by the OffsetStore interface, which has two implementations:
LocalFileOffsetStore: local file storage; offsets are kept in a file on the consumer's machine
RemoteBrokerOffsetStore: remote broker storage; offsets are kept on the remote broker
The store is initialized in DefaultMQPushConsumerImpl#start:
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            ...
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                // Message model: broadcasting stores offsets locally,
                // clustering stores them remotely (on the broker)
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory,
                            this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory,
                            this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            // Load the consume offsets
            this.offsetStore.load();
            ...
    }
    ...
}
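Two offset-related operations happen in this method:
Initialization: a different OffsetStore is created depending on the message model. Put simply, in broadcasting mode offsets are stored locally, and in clustering mode they are stored on the remote broker
Loading the offset information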
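The selection rule can be reduced to a few lines. The sketch below uses a hypothetical simplified enum and returns the store's class name as a string, purely to show the mapping; the real types live in the RocketMQ client and take constructor arguments that are omitted here.

```java
// Hypothetical simplified types for illustration; not the RocketMQ classes.
class OffsetStoreSelectionSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    // Map a message model to the name of the store that would hold its offsets.
    static String chooseStore(MessageModel model) {
        switch (model) {
            case BROADCASTING:
                return "LocalFileOffsetStore";
            case CLUSTERING:
                return "RemoteBrokerOffsetStore";
            default:
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }
}
```

The reasoning behind the split is that in broadcasting mode every consumer instance tracks its own progress, while in clustering mode the group shares progress, so it has to live somewhere all members can reach.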
Let's look at how each implementation loads its offsets.
Loading in LocalFileOffsetStore:
@Override
public void load() throws MQClientException {
    // Read the local file
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
        // Walk through the loaded offset of each queue
        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
        }
    }
}

private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
    String content = null;
    try {
        // Read the file and convert its content to a String
        content = MixAll.file2String(this.storePath);
    } catch (IOException e) {
        log.warn("Load local offset store file exception", e);
    }
    if (null == content || content.length() == 0) {
        // Fall back to the bak file
        return this.readLocalOffsetBak();
    } else {
        OffsetSerializeWrapper offsetSerializeWrapper = null;
        try {
            offsetSerializeWrapper =
                OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
        } catch (Exception e) {
            log.warn("readLocalOffset Exception, and try to correct", e);
            return this.readLocalOffsetBak();
        }
        return offsetSerializeWrapper;
    }
}
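As you can see, loading here does nothing more than read the local file, falling back to the bak file when the main file is empty or cannot be parsed.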
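The fallback pattern is worth isolating. The following is a sketch using only java.nio, with hypothetical names (BackupReadSketch, readWithBackup) and the assumption that the backup sits next to the main file with a ".bak" suffix. Unlike the real method, it does not attempt JSON parsing, so it only covers the "missing or empty" branch.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper illustrating "read main file, else read backup".
class BackupReadSketch {
    // Return the file content, or null if the file is missing, empty or unreadable.
    static String readOrNull(Path path) {
        try {
            if (!Files.exists(path)) {
                return null;
            }
            String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
            return content.isEmpty() ? null : content;
        } catch (IOException e) {
            return null;
        }
    }

    // Try the main file first, then the sibling "<name>.bak" file.
    static String readWithBackup(Path main) {
        String content = readOrNull(main);
        if (content != null) {
            return content;
        }
        return readOrNull(main.resolveSibling(main.getFileName() + ".bak"));
    }
}
```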
Now the loading in RemoteBrokerOffsetStore:
@Override
public void load() {
}
With remote broker storage, load() does nothing.
6.2 Persisting offsets
Offsets are persisted by a scheduled task, which is started in MQClientInstance#startScheduledTask:
private void startScheduledTask() {
    // Persist consumer offsets: first run after 10 seconds, then every
    // persistConsumerOffsetInterval milliseconds (5 seconds by default)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
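The scheduling pattern above can be tried in isolation. This is a minimal sketch with hypothetical names (PeriodicPersistSketch, persistCount); a counter increment stands in for the real persistAllConsumerOffset() call.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of a periodic persistence task.
class PeriodicPersistSketch {
    final AtomicInteger persistCount = new AtomicInteger();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    void start(long initialDelayMillis, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // Stand-in for persisting all consumer offsets
                persistCount.incrementAndGet();
            } catch (Exception e) {
                // Swallow the error so that later runs are not cancelled
            }
        }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

The try/catch inside the task matters: with scheduleAtFixedRate, an exception that escapes one run suppresses all subsequent runs, which is presumably why the original wraps the call as well.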
The persistence itself is done by OffsetStore#persistAll. First, the LocalFileOffsetStore implementation:
public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;
    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }
    String jsonString = offsetSerializeWrapper.toJson(true);
    if (jsonString != null) {
        try {
            // Save to the file
            MixAll.string2File(jsonString, this.storePath);
        } catch (IOException e) {
            log.error("persistAll consumer offset Exception, " + this.storePath, e);
        }
    }
}
This one is simple: it serializes the offsets of the given queues to JSON and writes the result to the file.
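Since the read path falls back to a bak file, the write path has to produce one. The article does not show the body of MixAll.string2File, so the following is only a sketch of one common way to do it (write to a temporary file, keep the previous version as a backup, then swap). The names BackupWriteSketch and writeWithBackup and the ".tmp"/".bak" suffixes are assumptions for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper illustrating "write with backup".
class BackupWriteSketch {
    static void writeWithBackup(Path main, String content) throws IOException {
        Path tmp = main.resolveSibling(main.getFileName() + ".tmp");
        Path bak = main.resolveSibling(main.getFileName() + ".bak");
        // 1. Write the new content to a temporary file first
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        // 2. Keep the previous version, if any, as the backup
        if (Files.exists(main)) {
            Files.copy(main, bak, StandardCopyOption.REPLACE_EXISTING);
        }
        // 3. Replace the main file with the new content
        Files.move(tmp, main, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

With this ordering, a crash during step 1 leaves the old main file untouched, and a crash later still leaves a usable backup.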
Now the RemoteBrokerOffsetStore implementation:
public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;
    final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        AtomicLong offset = entry.getValue();
        if (offset != null) {
            if (mqs.contains(mq)) {
                try {
                    // Send the updated offset to the broker
                    this.updateConsumeOffsetToBroker(mq, offset.get());
                    log.info(...);
                } catch (Exception e) {
                    log.error(...);
                }
            } else {
                // The queue is no longer assigned to this consumer
                unusedMQ.add(mq);
            }
        }
    }
    if (!unusedMQ.isEmpty()) {
        for (MessageQueue mq : unusedMQ) {
            this.offsetTable.remove(mq);
            log.info("remove unused mq, {}, {}", mq, this.groupName);
        }
    }
}
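Stripped of logging and RocketMQ types, the loop does two things: report the offsets of queues that are still assigned, and evict the rest. A minimal sketch follows, assuming queues are identified by plain strings and the broker call is replaced by a caller-supplied callback; all names here are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical reduction of the remote persistAll loop.
class RemotePersistSketch {
    final Map<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    void persistAll(Set<String> assigned, BiConsumer<String, Long> reporter) {
        if (assigned == null || assigned.isEmpty()) {
            return;
        }
        Set<String> unused = new HashSet<>();
        for (Map.Entry<String, AtomicLong> entry : offsetTable.entrySet()) {
            if (assigned.contains(entry.getKey())) {
                try {
                    // Stand-in for the network call to the broker
                    reporter.accept(entry.getKey(), entry.getValue().get());
                } catch (RuntimeException e) {
                    // One failed queue should not block the others
                }
            } else {
                unused.add(entry.getKey());
            }
        }
        // Drop queues that are no longer assigned to this consumer
        for (String mq : unused) {
            offsetTable.remove(mq);
        }
    }
}
```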
RemoteBrokerOffsetStore#persistAll calls this.updateConsumeOffsetToBroker(...) to submit the offsets to the broker. The request is ultimately sent by MQClientAPIImpl#updateConsumerOffset:
public void updateConsumerOffset(
    final String addr,
    final UpdateConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
    // The request code for updating an offset is UPDATE_CONSUMER_OFFSET
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
    // Send the request through netty
    RemotingCommand response = this.remotingClient.invokeSync(
        MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
How does the broker handle the request once it arrives? Searching for UPDATE_CONSUMER_OFFSET shows that the Processor for this code is ConsumerManageProcessor. Its processRequest method is as follows:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}
This method handles three kinds of request:
GET_CONSUMER_LIST_BY_GROUP: get all consumers in the given consumer group
UPDATE_CONSUMER_OFFSET: update the consume offset
QUERY_CONSUMER_OFFSET: query the consume offset
Here we only care about updating the consume offset, so we step into ConsumerManageProcessor#updateConsumerOffset:
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
    final UpdateConsumerOffsetRequestHeader requestHeader =
        (UpdateConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
    // Hand over to ConsumerOffsetManager
    this.brokerController.getConsumerOffsetManager().commitOffset(
        RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
        requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
Following the call, we end up in the ConsumerOffsetManager class:
public class ConsumerOffsetManager extends ConfigManager {
    ...
    /**
     * The map that holds the offsets
     */
    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

    /**
     * Entry point for saving an offset
     */
    public void commitOffset(final String clientHost, final String group, final String topic,
            final int queueId,
            final long offset) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        // Delegate to the overloaded method
        this.commitOffset(clientHost, key, queueId, offset);
    }

    /**
     * Where the offset is finally stored
     */
    private void commitOffset(final String clientHost, final String key, final int queueId,
            final long offset) {
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            // A smaller offset than the stored one is only logged, not rejected
            if (storeOffset != null && offset < storeOffset) {
                log.warn(...);
            }
        }
    }
    ...
}
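ConsumerOffsetManager is the class that stores offsets on the broker. It keeps them in a ConcurrentMap whose key is topic@group and whose value is another map from queue id to the consume offset of that queue.
Judging from commitOffset alone, the offset is only updated in memory. Note, however, that ConsumerOffsetManager extends ConfigManager: the broker periodically flushes this table to a file on disk and loads it again at startup (that code is not shown here). So the in-memory table is not the only copy, and a broker restart does not by itself wipe out the consume offsets; what can be lost are the updates made since the last flush.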
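The two-level table is easy to reproduce. The sketch below is a simplified stand-in, not the broker class: it assumes "@" as the separator (matching the topic@group key described above), uses computeIfAbsent instead of the get-then-put sequence, and returns a boolean where the original only logs a warning.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical reduction of the broker-side offset table.
class BrokerOffsetTableSketch {
    static final String SEPARATOR = "@";

    // topic@group -> (queueId -> offset)
    final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>();

    // Store the offset; return true if it moved backwards compared to the stored one.
    boolean commitOffset(String group, String topic, int queueId, long offset) {
        String key = topic + SEPARATOR + group;
        ConcurrentMap<Integer, Long> queues =
            offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
        Long previous = queues.put(queueId, offset);
        return previous != null && offset < previous;
    }
}
```

Using computeIfAbsent is a design choice of this sketch: it creates the inner map atomically, so two threads committing to a new topic@group key at the same time cannot overwrite each other's inner map. As in the original, a smaller offset still replaces the stored one.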
6.3 Obtaining the offset
When the consumer prepares the initial pullRequest objects for pulling messages, it loads the consume offset. This happens in RebalanceImpl#updateProcessQueueTableInRebalance:
private boolean updateProcessQueueTableInRebalance(final String topic,
        final Set<MessageQueue> mqSet, final boolean isOrder) {
    ...
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            ...
            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            // Compute the offset to start pulling from
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info(...);
                } else {
                    log.info(...);
                    // Create the pullRequest
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    // Set the offset
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn(...);
            }
        }
    }
    // Dispatch the pull requests
    this.dispatchPullRequest(pullRequestList);
    return changed;
}
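The control flow of this method can be summarized as "for each newly assigned queue, compute a start offset and create a pull request only if the offset is valid". Here is a minimal sketch under simplifying assumptions: queues are strings, the process queue is reduced to a marker value, and the offset computation is passed in as a function. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToLongFunction;

// Hypothetical reduction of the rebalance step that creates pull requests.
class RebalanceSketch {
    static class PullRequest {
        final String queue;
        final long nextOffset;

        PullRequest(String queue, long nextOffset) {
            this.queue = queue;
            this.nextOffset = nextOffset;
        }
    }

    final Map<String, Boolean> processQueueTable = new ConcurrentHashMap<>();

    List<PullRequest> onAssigned(Set<String> assigned, ToLongFunction<String> computePullFromWhere) {
        List<PullRequest> requests = new ArrayList<>();
        for (String mq : assigned) {
            if (processQueueTable.containsKey(mq)) {
                continue; // already being pulled
            }
            long nextOffset = computePullFromWhere.applyAsLong(mq);
            if (nextOffset < 0) {
                continue; // offset unknown, skip for now
            }
            // Only the caller that wins putIfAbsent creates the request
            if (processQueueTable.putIfAbsent(mq, Boolean.TRUE) == null) {
                requests.add(new PullRequest(mq, nextOffset));
            }
        }
        return requests;
    }
}
```

A queue whose offset could not be determined is simply left out of processQueueTable, so a later call gets another chance to add it.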
The code that obtains the consume offset is:
// Compute the offset to start pulling from
long nextOffset = this.computePullFromWhere(mq);
The method being called is RebalancePushImpl#computePullFromWhere. Stepping into it:
public long computePullFromWhere(MessageQueue mq) {
    long result = -1;
    final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl
        .getDefaultMQPushConsumer().getConsumeFromWhere();
    // Get the offsetStore
    final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    switch (consumeFromWhere) {
        case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
        case CONSUME_FROM_MIN_OFFSET:
        case CONSUME_FROM_MAX_OFFSET:
        case CONSUME_FROM_LAST_OFFSET: {
            // Read the stored offset
            long lastOffset = offsetStore.readOffset(
                mq, ReadOffsetType.READ_FROM_STORE);
            // Various checks omitted
            ...
            break;
        }
        case CONSUME_FROM_FIRST_OFFSET: {
            long lastOffset = offsetStore.readOffset(
                mq, ReadOffsetType.READ_FROM_STORE);
            // Various checks omitted
            ...
            break;
        }
        case CONSUME_FROM_TIMESTAMP: {
            long lastOffset = offsetStore.readOffset(
                mq, ReadOffsetType.READ_FROM_STORE);
            // Various checks omitted
            ...
            break;
        }
        default:
            break;
    }
    return result;
}
This is where the read happens. Every branch calls readOffset with READ_FROM_STORE, so let's look directly at how the local store and the remote store implement it.
With local file storage the offset is read straight from the local file. Stepping into LocalFileOffsetStore#readOffset:
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                    return -1;
                }
                // MEMORY_FIRST_THEN_STORE with no cached value falls through to the store
            }
            case READ_FROM_STORE: {
                OffsetSerializeWrapper offsetSerializeWrapper;
                try {
                    // Read the local file
                    offsetSerializeWrapper = this.readLocalOffset();
                } catch (MQClientException e) {
                    return -1;
                }
                if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
                    AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                    if (offset != null) {
                        this.updateOffset(mq, offset.get(), false);
                        return offset.get();
                    }
                }
            }
            default:
                break;
        }
    }
    return -1;
}

/**
 * Reads the offset file
 */
private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
    String content = null;
    try {
        // Convert the file content to a String
        content = MixAll.file2String(this.storePath);
    } catch (IOException e) {
        log.warn("Load local offset store file exception", e);
    }
    if (null == content || content.length() == 0) {
        return this.readLocalOffsetBak();
    } else {
        OffsetSerializeWrapper offsetSerializeWrapper = null;
        try {
            offsetSerializeWrapper =
                OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
        } catch (Exception e) {
            log.warn("readLocalOffset Exception, and try to correct", e);
            return this.readLocalOffsetBak();
        }
        return offsetSerializeWrapper;
    }
}
Now let's see how the offset is obtained from the remote broker. Stepping into RemoteBrokerOffsetStore#readOffset:
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                    return -1;
                }
                // MEMORY_FIRST_THEN_STORE with no cached value falls through to the store
            }
            case READ_FROM_STORE: {
                try {
                    // Fetch from the broker
                    long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                    AtomicLong offset = new AtomicLong(brokerOffset);
                    this.updateOffset(mq, offset.get(), false);
                    return brokerOffset;
                }
                // No offset in broker
                catch (MQBrokerException e) {
                    return -1;
                }
                //Other exceptions
                catch (Exception e) {
                    log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                    return -2;
                }
            }
            default:
                break;
        }
    }
    return -1;
}

/**
 * Performs the actual fetch
 */
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException {
    // Find the broker that owns this queue; the offset is fetched only from that broker
    FindBrokerResult findBrokerResult = this.mQClientFactory
        .findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }
    if (findBrokerResult != null) {
        QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        // Query the broker
        return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
            findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
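Both readOffset implementations share the same three read modes, expressed through an intentional switch fall-through. The sketch below restates those modes with explicit if statements. It is a simplification under stated assumptions: queues are strings, the backing store (file or broker) is a caller-supplied function that returns a negative number when nothing is stored, and the cache is refreshed only for non-negative results. The names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ToLongFunction;

// Hypothetical reduction of the readOffset read modes.
class ReadOffsetSketch {
    enum ReadOffsetType { READ_FROM_MEMORY, READ_FROM_STORE, MEMORY_FIRST_THEN_STORE }

    final Map<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();
    private final ToLongFunction<String> store; // stand-in for the file or broker lookup

    ReadOffsetSketch(ToLongFunction<String> store) {
        this.store = store;
    }

    long readOffset(String mq, ReadOffsetType type) {
        if (type != ReadOffsetType.READ_FROM_STORE) {
            AtomicLong cached = offsetTable.get(mq);
            if (cached != null) {
                return cached.get();
            }
            if (type == ReadOffsetType.READ_FROM_MEMORY) {
                return -1; // memory only, nothing cached
            }
        }
        long stored = store.applyAsLong(mq);
        if (stored >= 0) {
            // Refresh the in-memory copy with the stored value
            offsetTable.put(mq, new AtomicLong(stored));
        }
        return stored;
    }
}
```

Note that a successful read from the store also populates the in-memory table, which is why a later READ_FROM_MEMORY call can succeed without touching the file or the broker.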
The query is sent by MQClientAPIImpl#queryConsumerOffset. Stepping into it:
public long queryConsumerOffset(
    final String addr,
    final QueryConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(
        MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            // Decode the result
            QueryConsumerOffsetResponseHeader responseHeader =
                (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(
                    QueryConsumerOffsetResponseHeader.class);
            return responseHeader.getOffset();
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
How does the broker respond when it receives this request?
ConsumerManageProcessor#processRequest is the same dispatch method we saw in 6.2:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}
The query is handled in ConsumerManageProcessor#queryConsumerOffset:
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
        (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
        (QueryConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
    // Look up the offset
    long offset =
        this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId());
    ...
    return response;
}
The lookup is finally performed by ConsumerOffsetManager#queryOffset(...):
public long queryOffset(final String group, final String topic, final int queueId) {
    // topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null != map) {
        Long offset = map.get(queueId);
        if (offset != null)
            return offset;
    }
    // No offset recorded for this group, topic and queue
    return -1;
}
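In other words, the value is simply read from the offsetTable member of ConsumerOffsetManager, and -1 is returned when nothing has been recorded for that queue.
Given the limits of the author's knowledge, mistakes are inevitable and corrections are welcome. Original work takes effort: for commercial reprints please contact the author for authorization, and for non-commercial reprints please credit the source.
This article was first published on the WeChat official account 「Java技術探秘」. If you enjoyed it, feel free to follow the account.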
