Rocketmq源碼分析15:延遲消息
注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.
rocketmq支持延遲消息,本文我們將從源碼角度分析延遲消息的實現(xiàn)原理。
1. demo 準(zhǔn)備
延遲消息的demo在org.apache.rocketmq.example.delay包下,發(fā)送消息的producer如下:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
String nameServer = "localhost:9876";
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr(nameServer);
producer.start();
for (int i = 0; i < 1; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// delayLevel=1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// delayTime =1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// 設(shè)置延遲延遲級別
msg.setDelayTimeLevel(5);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
rocketmq在實現(xiàn)延遲消息時,會準(zhǔn)備18個延遲級別,這些級別對應(yīng)的延遲時間如下:
| 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5m | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
在發(fā)送延遲消息時,需要指定消息的延遲級別:
msg.setDelayTimeLevel(5);
這里指定的延遲級別為5,即延遲1分鐘后發(fā)送。
2. 延遲消息的存儲
延遲消息與普通消息的發(fā)送并無太多差別,不過在broker在存儲延遲消息時,會做一些額外的處理,進(jìn)入CommitLog#asyncPutMessage方法:
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 消息的存儲時間
msg.setStoreTimestamp(System.currentTimeMillis());
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 延遲消息
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore
.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(
this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 指定延遲消息對應(yīng)的topic
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 延遲級別對應(yīng)的隊列,即每個延遲級別都對應(yīng)一條隊列
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// 原始的topic與queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// 省略消息寫入 commitLog 的操作
...
}
在延遲消息寫入前,會做一些特別處理,其實就是將消息的topic與queueId修改為延遲消息專用的topic與queueId。
獲取延遲隊列的方法為ScheduleMessageService#delayLevel2QueueId,代碼如下:
public static int delayLevel2QueueId(final int delayLevel) {
return delayLevel - 1;
}
這里的delayLevel,就對應(yīng)前面提到的18個延遲級別,這也就是說,每個延遲級別的消息都會有一個專門隊列來存儲。這樣存儲有何好處呢?最大的好處就是避免了排序,舉個簡單的例子:上午10:00broker收到了一條延遲消息1,延遲級別為5;然后在10:02又收到了一條延遲消息2,延遲級別也為5,由于延遲級別相同,他們會存儲在同一條隊列中.
由于隊列天生有序,入隊時間先按送達(dá)broker的時間先后進(jìn)行排序,而同一隊列上延遲時間也相同,因此延遲消息1一定會在延遲消息2前進(jìn)行消消費,后面如果有消息再進(jìn)入該隊列中,也會按照先進(jìn)先出的方式進(jìn)行消費。
3. 延遲消息的投遞
上一節(jié)分析了延遲消息的存儲,本節(jié)我們來分析延遲消息的消費。
延遲消息存儲到隊列后,會有一個專門的線程定期掃描這些隊列,找到滿足消費時間的消息,然后將其投遞到真正的topic與queueId中,這樣這條消息就能被consumer消息了。
處理延遲隊列掃描的線程為scheduleMessageService,它在DefaultMessageStore#start方法中啟動:
public void start() throws Exception {
...
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
this.haService.start();
// 這里處理延遲消息
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
...
}
繼續(xù)跟進(jìn),進(jìn)入DefaultMessageStore#handleScheduleMessageService 方法:
@Override
public void handleScheduleMessageService(final BrokerRole brokerRole) {
if (this.scheduleMessageService != null) {
if (brokerRole == BrokerRole.SLAVE) {
this.scheduleMessageService.shutdown();
} else {
// 啟動
this.scheduleMessageService.start();
}
}
}
繼續(xù)跟進(jìn),進(jìn)入ScheduleMessageService#start方法:
/**
* 延遲消息服務(wù)的啟動方式
*/
public void start() {
// CAS 鎖機(jī)制保證必須 shutdown 后才能再次start
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
// 定時執(zhí)行延遲消息處理任務(wù)
this.timer.schedule(
new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
// 每隔10s,將延遲消息的相關(guān)信息持久化到硬盤中
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
在這個線程中,主要做了兩件事:
遍歷所有的延遲級別,為每個延遲級別在延遲 FIRST_DELAY_TIME毫秒后就處理延遲消息的投遞操作開啟執(zhí)久化定時任務(wù):定時將延遲消息的相關(guān)信息持久化到硬盤中
3.1 投遞操作
處理延遲消息的投遞任務(wù)為DeliverDelayedMessageTimerTask#run方法,代碼如下:
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}
在這個方法中,調(diào)用了executeOnTimeup()方法繼續(xù)操作,我們再進(jìn)入ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup方法:
public void executeOnTimeup() {
// 獲得一條隊列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore
.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
log.error(...);
// 1. 消息的寫入的時間
long msgStoreTime = defaultMessageStore.getCommitLog()
.pickupStoreTimestamp(offsetPy, sizePy);
// 2. 計算投遞時間,投遞時間 = 消息寫入時間 + 延遲級別對應(yīng)的時間
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
// 處理投遞時間,保證投遞時間必須小于(當(dāng)前時間 + 延遲級別對應(yīng)的時間)
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
// 小于等于0,表示消費需要投遞
if (countdown <= 0) {
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC
.equals(msgInner.getTopic())) {
log.error(...);
continue;
}
// 3. 投遞操作
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null && putMessageResult
.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(...);
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
log.error(...);
}
}
} else {
// 4. 安排下一次執(zhí)行,執(zhí)行時間為 countdown 毫秒后
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
// 5. 更新偏移量
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
}
// 之后再執(zhí)行
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
// 更新偏移量
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
}
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error(...);
}
}
}
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
這個方法雖然有點長,但邏輯很清晰,執(zhí)行過程如下:
獲取消息寫入時間,就是寫入到 commitLog的時間計算投遞時間,投遞時間 = 消息寫入時間 + 延遲級別對應(yīng)的時間,如果當(dāng)前時間大于等于投遞時間,就表示消息需要進(jìn)行投遞操作 如果消息滿足投遞時間,就進(jìn)行投遞操作,所謂的投遞操作,就是將消息寫入到真正的 topic與queueId的隊列中如果當(dāng)前消息不滿足投遞時間,就表明該隊列上之后的消息也不會投遞時間,就計算投遞時間與當(dāng)前時間的差值,這個差值就是下次執(zhí)行 executeOnTimeup()方法的時間更新偏移量,就是記錄當(dāng)前隊列的消費位置
我們來看看偏移量的更新操作,進(jìn)入ScheduleMessageService#updateOffset方法:
public class ScheduleMessageService extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final long FIRST_DELAY_TIME = 1000L;
private static final long DELAY_FOR_A_WHILE = 100L;
private static final long DELAY_FOR_A_PERIOD = 10000L;
/** 延遲級別對應(yīng)的延遲時間,單位為毫秒 */
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<Integer, Long>(32);
/** 延遲級別對應(yīng)的偏移量 */
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<Integer, Long>(32);
private final DefaultMessageStore defaultMessageStore;
private final AtomicBoolean started = new AtomicBoolean(false);
private Timer timer;
private MessageStore writeMessageStore;
private int maxDelayLevel;
...
/**
* 更新偏移量的操作
*/
private void updateOffset(int delayLevel, long offset) {
this.offsetTable.put(delayLevel, offset);
}
...
}
可以看到,這里的更新偏移量,就是將當(dāng)前延遲級別消費位置的偏移量添加到offsetTable中進(jìn)行保存。
3.2 持久化
讓我們回到``ScheduleMessageService#start`方法,這個方法中開啟了一個持久化任務(wù):
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
該任務(wù)會定期執(zhí)行ConfigManager#persist方法進(jìn)行持久化操作:
public synchronized void persist() {
String jsonString = this.encode(true);
if (jsonString != null) {
String fileName = this.configFilePath();
try {
MixAll.string2File(jsonString, fileName);
} catch (IOException e) {
log.error("persist file " + fileName + " exception", e);
}
}
}
這個方法主要進(jìn)行了兩個操作:
調(diào)用 this.encode(true)得到json字符串將 json字符串寫入到文件中
這個json字符串是個啥呢?我們進(jìn)入ScheduleMessageService#encode(boolean)方法:
public String encode(final boolean prettyFormat) {
DelayOffsetSerializeWrapper delayOffsetSerializeWrapper
= new DelayOffsetSerializeWrapper();
// 這個 offsetTable 就是用來保存消費位置偏移量的
delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
return delayOffsetSerializeWrapper.toJson(prettyFormat);
}
從代碼來看,這個方法就是將ScheduleMessageService#offsetTable序列化成json字符串的, 這個 offsetTable 就是用來保存消費位置偏移量的。由此不難得出這個定時任務(wù)的作用:定期將延遲隊列的消費位置偏移量持久化到文件中。
4. 總結(jié)
RocketMq支持了18種延遲級別,每個延遲級別對應(yīng)不同的延遲時間延遲消息對應(yīng)著一個 topic,每個延遲級別都對應(yīng)著該topic下的一個隊列當(dāng) broker收到延遲消息后,會將該消息放入到延遲級別對應(yīng)的延遲消息中消息投遞由定時線程執(zhí)行,當(dāng)消息達(dá)到投遞時間后,會從延遲隊列中寫入到真正需要投遞的隊列中
客觀來說,開源版 RocketMq 的延遲消息比較簡陋,僅支持18種延遲級別,而阿里云版可指定發(fā)送時間。
限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。
本文首發(fā)于微信公眾號 「Java技術(shù)探秘」,如果您喜歡本文,歡迎關(guān)注該公眾號,讓我們一起在技術(shù)的世界里探秘吧!
