RoP重磅發(fā)布0.2.0版本: 架構(gòu)全新升級,消息準(zhǔn)確性達100%


導(dǎo)語
日前,騰訊云中間件團隊聯(lián)合StreamNative社區(qū)正式發(fā)布了RoP 0.2.0版本,該版本在架構(gòu)上全新升級,用戶在使用中可以完全避免消息丟失、消息重復(fù)消費、只能消費一部分 Partition 的數(shù)據(jù)等問題。

作者簡介
冉小龍
騰訊云中間件團隊研發(fā)工程師
Apache Pulsar committer
RoP 作者及 Maintainer
Apache BookKeeper contributor
Apache Pulsar Go client 作者
Apache Pulsar Go Functions作者
StreamNative/pulsarctl 作者

RoP的定義
與 KoP、MoP 和 AoP 相似,RoP 是一種可插拔的協(xié)議處理插件。
將 RoP 協(xié)議處理插件添加到現(xiàn)有 Pulsar 集群后,用戶無需修改代碼,便能將現(xiàn)有的 RocketMQ 應(yīng)用程序和服務(wù)遷移到 Pulsar,同時還能使用 Pulsar 的強大功能,例如:
計算與存儲分離
多租戶
跨地域復(fù)制
分層分片
輕量化計算框架 -- Pulsar Functions
...

RoP 0.2.0發(fā)布
2021年5月17日,騰訊云中間件團隊向社區(qū)貢獻了 RoP 0.1.0 的 beta 版本,RoP(RocketMQ on Pulsar) 是 將 RocketMQ 協(xié)議處理插件引入 Pulsar Broker,這樣 Pulsar 即可支持原生的 RocketMQ 協(xié)議,RocketMQ 用戶可以無縫遷移到 Apache Pulsar 。
今天,我們重磅發(fā)布RoP 0.2.0 ,該版本在架構(gòu)上全新升級,在功能和穩(wěn)定性上得到了更大的提升。提供了 ACL 鑒權(quán)和驗證的功能,可以更好的確保用戶數(shù)據(jù)的安全性,同時允許用戶對 Partitioned Topic 進行擴容,可以獲得更好的并發(fā)寫入能力, 并且完善了 RocketMQ 原生的管控端接口,可以更好的對服務(wù)進行處理和監(jiān)控。

最新功能優(yōu)化
在0.2.0版本中,騰訊云中間件團隊在0.1.0的架構(gòu)上進行全新設(shè)計,對MessageID 、消息路由模型進行重構(gòu),確保不同場景下 RoP 消息的準(zhǔn)確性。
主要有以下三點優(yōu)化內(nèi)容:
1、支持 RoP ACL 功能
ACL 機制是RocketMQ 社區(qū)自帶的一個能力,可以很好的對用戶的數(shù)據(jù)進行鑒權(quán)和認(rèn)證。RoP 0.2.0 版本復(fù)用了 RocketMQ 自身的 Hook 實現(xiàn),利用 Pulsar 自身的鑒權(quán)機制,實現(xiàn)了對用戶數(shù)據(jù)進行鑒權(quán)和認(rèn)證的功能。
RoP ACL 的使用方式依舊延續(xù)了 RocketMQ 的使用方式,只需定義 ACL_ACCESS_KEY 和 ACL_SECRET_KEY 字段,然后利用 RocketMQ 的 ACLRPCHook 函數(shù)加載即可,這樣可以確保用戶盡可能少的改動客戶端的業(yè)務(wù)代碼邏輯。
具體代碼示例如下:
private static final String ACL_ACCESS_KEY = "eyJrZXlJZCI6InJvY2tldG1xLW13bmI3bWFwMjhqZSIsImFsZyI6IkhTMjU2In0."+ "eyJzdWIiOiJyb2NrZXRtcS1td25iN21hcDI4amVfdGVzdCJ9.BDOjqqY25a6apnZTMZCqg0I0pxVFcqz7fvZbaTqkf5U"; // tokenprivate static final String ACL_SECRET_KEY = "rop";public static void producer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("rocketmq-mwnb7map28je|nit", "ProducerGroupName",getAclRPCHook());...}static RPCHook getAclRPCHook() {return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY, ACL_SECRET_KEY));}
ACL_ACCESS_KEY:即用戶在 Namespace 級別下創(chuàng)建的 Token。
ACL_SECRET_KEY:固定值,在 RoP 內(nèi)部解析時,不會使用到這個字段。
2、重構(gòu) MessageID?
RocketMQ 與 Kafka 類似,都是使用 64 位的 Offset 來唯一標(biāo)識一條消息,但是在 Pulsar 中,使用 64 位的 LedgerID、64 位的 EntryID 來唯一標(biāo)識一條消息。針對這個問題,在 RoP 0.1.0 中,我們使用如下的形式來構(gòu)造 MessageID 對象:

PartitionID: 8 位,可以允許一個 Topic 最多創(chuàng)建 256 個 Partitions
LedgerID: 32 位
EntryID: 24 位
使用如上的方式可能存在 MessageID 的消息精度丟失,在系統(tǒng)運行一段時間之后,無法繼續(xù)創(chuàng)建出新的 LedgerID,導(dǎo)致整個集群的服務(wù)對外不可用的情況。這個問題與早期的 KoP 版本所面臨的是同樣的困境,所以在 RoP 0.2.0 中,我們采用了和 KoP 相同的處理方式,使用?[PIP 70: Introduce lightweight broker entry metadata]的處理思路,在 Broker 的協(xié)議頭中,附加了一個 64 位的 index/publish-time 字段,這樣無需在客戶端側(cè)進行協(xié)議的解析即可在每一條消息中附加一個 64 位的字段來使用。
[PIP 70: Introduce lightweight broker entry metadata]https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata
PIP-70 是使用插件的方式進行加載的,所以在服務(wù)啟動時,我們需要做如下配置:
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
Note: Broker Entry Metadata 是在 Pulsar 2.8.0 的版本中才支持的,所以需要確保 Pulsar Broker 的版本在 2.8.0 及以上。
需要說明的是,RocketMQ 和 Kafka 在 Offset 的使用方式上又有所不同,RocketMQ 中有兩個 Offset,一個是 Queue Offset,用來表示消息在 MessageQueue 中的位置, MessageQueue 本質(zhì)上是一個數(shù)組,一條消息進來數(shù)組的下標(biāo)就會 +1。一個是 CommitLog Offset 用來表示消息存儲在 CommitLog 中的位置,消息存儲是由 ConsumeQueue 和 CommitLog 配合完成,ConsumeQueue 是邏輯隊列,CommitLog 是真正存儲消息文件的,ConsumeQueue 存儲的是指向物理存儲的地址。Topic 下的每個 MessageQueue 都有對應(yīng)的 ConsumeQueue 文件,內(nèi)容也會被持久化到磁盤。
所以,在 MessageID 重構(gòu)的實現(xiàn)中,區(qū)別于 Kafka 中只有一個全局的 Offset 來標(biāo)識消息的唯一性,在 RoP 中需要針對這兩種 Offset 的情況分別進行處理,具體如下:

RESERVED_BITS: 1 字節(jié)的保留位,避免第一個字節(jié)出現(xiàn)負(fù)數(shù)等情況導(dǎo)致 Offset 計算有誤。
RETRY_TOPIC_TAG_BITS:1 字節(jié)的標(biāo)記為,用來標(biāo)識這個 Topic 是否為 Retry 類型的 Topic。
PULSAR_PARTITION_ID_BITS:10 字節(jié)的 Partition Num,用來記錄一個 ?PartitionedTopic 下有多少個 Partitions,最大支持 1024 個 Partitions。
OFFSET_BITS:52 字節(jié)用來標(biāo)識消息的 Offset。
3、重構(gòu)消息的路由模型
在 RoP 0.1.0 的版本中,在消息路由的實現(xiàn)上,RocketMQ 和 Pulsar 都是首先通過 Topic Lookup 的操作找到對應(yīng)的 Owner Broker 節(jié)點,然后將該 Broker 的地址返回。但是在這個動作中,忽略了一個重要的問題,即 RocketMQ 與 Kafka 和 Pulsar 都是不同的,它的 Queue 不是全局唯一的。
RocketMQ 路由協(xié)議主要包括兩部分:
Broker服務(wù)的 IP 地址信息;
某個Broker上對應(yīng)的 Topic 分區(qū)總數(shù)以及分區(qū)可讀寫信息。
在 RocketMQ 路由協(xié)議中,沒有全局標(biāo)識 Topic 的分區(qū)的唯一ID(例如在Pulsar/Kafka中,分區(qū) ID 在集群中是唯一的);而在 RocketMQ 中,分區(qū)路由信息是由 Broker 標(biāo)識加上該 Broker 上的順序從 0→N 的 Index 來標(biāo)識 Topic 的分區(qū)。
因此 RocketMQ 協(xié)議中,客戶端只需要獲取到 Topic 對應(yīng) Broker 上分區(qū)總數(shù),就能通過計算獲得該 Broker 上分區(qū)的 ID;所有的請求都是基于【Broker-Tag】+【Broker-Topic-Seq】構(gòu)建唯一路由查詢原語來請求服務(wù)。簡單來說:RocketMQ的分區(qū)是有狀態(tài)的,他綁定在特定的Broker之上;分區(qū)一旦分配在某個Broker上,終身與之相關(guān)且不能遷移??蛻舳私馕龇謪^(qū)路由信息是通過計算得到;比如:某個TopicA有5個分區(qū),分別落在BrokerA和BrokerB上,BrokerA有3個,BrokerB有2個;那么協(xié)議記錄為(BrokerA,3)(BrokerB,2),客戶端通過計算就得到全部的分區(qū)數(shù)據(jù):
BrokerA-TopicA-0,BrokerA-TopicA-1,BrokerA-TopicA-2
BrokerB-TopicA-0, BrokerB-TopicA-1;
由于上面的路由關(guān)系的原因,所以沒有辦法通過 GET_ROUTEINTO_BY_TOPIC 這個協(xié)議請求去和Pulsar的Lookup協(xié)議去做映射。本質(zhì)原因是像Kafka/Pulsar這種,它的Partition信息是全局唯一的,在執(zhí)行 Topic 路由策略之后,能準(zhǔn)確的返回某一個Topic 的 Partition所對應(yīng)的Owner Broker是誰。但是RocketMQ的Topic路由返回的是兩個字段,一個是Broker Name,一個是Queue的數(shù)量。具體的QueueID,是Client根據(jù)Broker返回的數(shù)量固定的從0開始遞增計算。所以在Topic的路由映射中,RocketMQ和Pulsar自身的路由協(xié)議沒辦法一一映射。為了解決這個問題,在 RoP 0.2.0 中,抽象了一層 Proxy 用來維護 Topic 與 Broker 之間的映射關(guān)系。為了達到這個目的,這里主要有以下幾方面的事情需要考慮:
1、這些映射關(guān)系存儲在哪里?
2、如何分配路由關(guān)系?
3、當(dāng)路由關(guān)系發(fā)生變化之后,如何處理?
針對第一個問題,綜合考量,我們選擇將路由的映射關(guān)系存儲到 ZooKeeper?集群中來,因為當(dāng)前 RoP 的服務(wù)本身也需要依賴 ZooKeeper 集群,不會引入新的組件;其次 ZooKeeper 自身的一致性能力能很好的滿足這個場景需求。
針對第二個問題,我們是在 RoP 接口創(chuàng)建分區(qū)主題的同時,依次查找各個分區(qū)所在的 Broker 節(jié)點,依照初始主題所在節(jié)點信息為基準(zhǔn),將映射關(guān)系寫入到 ZooKeeper 集群中。這樣做的好處在于:
復(fù)用Pulsar自身分區(qū)分配機制計算的結(jié)果,實現(xiàn)簡單。
初始分配后,虛擬節(jié)點和物理節(jié)點處于一個節(jié)點上,性能好。
如果配合路由關(guān)系重平衡能力,可以實現(xiàn)最優(yōu)性能。
針對第三個問題,我們通過增加 Master-Slave 模式,可以減少單節(jié)點故障對系統(tǒng)的影響。ZooKeeper元數(shù)據(jù)如下,只需要增加Broker相關(guān)信息,即可實現(xiàn)各個節(jié)點的互為主從關(guān)系,達到主節(jié)點不可用時從節(jié)點可以繼續(xù)提供服務(wù)。由于當(dāng)前 Offset 信息都存儲在Compact Topic中,全部節(jié)點同時訂閱,所以各個節(jié)點的元數(shù)據(jù)可以保證一致,可以實現(xiàn)主從切換。下面是測試環(huán)境中部署 RoP 集群中的路由映射關(guān)系:

所以,為了保證 RoP 集群能有較好的容錯能力,在部署 RoP 集群中建議使用偶數(shù)臺節(jié)點。可以通過如下參數(shù)配置決定當(dāng)前 Master 節(jié)點有幾個 Slave 節(jié)點作為其備份節(jié)點:
RoPBrokerReplicationNum=2
假設(shè)有 6 臺 Broker 節(jié)點,RoPBrokerReplicationNum=2, 那么就說明此時只有三臺 Master Broker 節(jié)點對外提供服務(wù)。但是對于 Pulsar 來說,Broker 節(jié)點之間是對等的,當(dāng)創(chuàng)建 Topic 的時候,可能會分配到任意節(jié)點上,所以對于不在Owner Broker節(jié)點上的請求,在 RoP Proxy 層做了一層代理,會先對該 Topic 進行查找的操作,然后將請求轉(zhuǎn)發(fā)到 Owner Broker 的節(jié)點上來返回。

未來規(guī)劃
為了更好的踐行開源協(xié)同和開源共建的理念,目前,上述功能均已貢獻回社區(qū)。除此之外針對 RocketMQ 商業(yè)版本的任意延遲消息功能,騰訊云中間件團隊也基于 Pulsar 原生的特性開發(fā)了相關(guān)的插件來進行支持。RoP 的延遲消息功能除了支持多級別的延遲消息之外還具備支持任意延遲消息的能力。
之后,騰訊云中間件團隊將會在確保 RoP 項目穩(wěn)定的同時,持續(xù)開發(fā) RoP 相關(guān)的功能,諸如消息軌跡,消息查詢和回溯以及監(jiān)控等能力,進一步完善 RoP 的功能以及周邊生態(tài)。
注:
RoP 項目地址:
https://github.com/streamnative/rop

特別鳴謝
感謝騰訊云中間件團隊韓明澤和張勇華對本文提供的技術(shù)細(xì)節(jié)校驗和支持。
往期
推薦
《ZooKeeper系列文章:ZooKeeper 源碼和實踐揭秘(二)》
《服務(wù)器又崩了?深度解析高可用架構(gòu)的挑戰(zhàn)和實踐》
《Kratos技術(shù)系列|從Kratos設(shè)計看Go微服務(wù)工程實踐》
《Pulsar技術(shù)系列 - 深度解讀Pulsar Schema》
《Apache Pulsar事務(wù)機制原理解析|Apache Pulsar 技術(shù)系列》

掃描下方二維碼關(guān)注本公眾號,
了解更多微服務(wù)、消息隊列的相關(guān)信息!
解鎖超多鵝廠周邊!
戳原文,查看更多消息隊列TDMQ信息!
點個在看你最好看
