顛覆Kafka的統(tǒng)治,新一代云原生消息系統(tǒng)Pulsar震撼來(lái)襲!

導(dǎo)語(yǔ)?|?在信息流場(chǎng)景,內(nèi)容的請(qǐng)求處理、原子模塊調(diào)度、結(jié)果的分發(fā)等至關(guān)重要,將會(huì)直接影響到內(nèi)容的外顯、推薦、排序等?;谙?00%成功的要求,我對(duì)Pulsar進(jìn)行了調(diào)研,并采用Pulsar實(shí)現(xiàn)消息的可靠處理。本文主要參考Pulsar的官方文檔和技術(shù)文章,對(duì)Pulsar的特性、機(jī)制、原理等進(jìn)行整理總結(jié)。
一、Pulsar概述
Apache Pulsar是Apache軟件基金會(huì)頂級(jí)項(xiàng)目,是下一代云原生分布式消息流平臺(tái),集消息、存儲(chǔ)、輕量化函數(shù)式計(jì)算為一體,采用計(jì)算與存儲(chǔ)分離架構(gòu)設(shè)計(jì),支持多租戶、持久化存儲(chǔ)、多機(jī)房跨區(qū)域數(shù)據(jù)復(fù)制,具有強(qiáng)一致性、高吞吐、低延時(shí)及高可擴(kuò)展性等流數(shù)據(jù)存儲(chǔ)特性,被看作是云原生時(shí)代實(shí)時(shí)消息流傳輸、存儲(chǔ)和計(jì)算最佳解決方案。
Pulsar是一個(gè)pub-sub (發(fā)布-訂閱)模型的消息隊(duì)列系統(tǒng)。
(一)Pulsar架構(gòu)
Pulsar由Producer、Consumer、多個(gè)Broker、一個(gè)BookKeeper集群、一個(gè)Zookeeper集群構(gòu)成,具體如下圖所示。

Producer:數(shù)據(jù)生成者,即發(fā)送消息的一方。生產(chǎn)者負(fù)責(zé)創(chuàng)建消息,將其投遞到Pulsar中。
Consumer:數(shù)據(jù)消費(fèi)者,即接收消息的一方。消費(fèi)者連接到 Pulsar 并接收消息,進(jìn)行相應(yīng)的業(yè)務(wù)處理。
Broker:無(wú)狀態(tài)的服務(wù)層,負(fù)責(zé)接收消息、傳遞消息、集群負(fù)載均衡等操作,Broker不會(huì)持久化保存元數(shù)據(jù)。
BookKeeper:有狀態(tài)的持久層,包含多個(gè)Bookie,負(fù)責(zé)持久化地存儲(chǔ)消息。
ZooKeeper:存儲(chǔ)Pulsar、BookKeeper的元數(shù)據(jù),集群配置等信息,負(fù)責(zé)集群間的協(xié)調(diào)(例如:Topic與Broker的關(guān)系)、服務(wù)發(fā)現(xiàn)等。
從Pulsar的架構(gòu)圖上可以看出,Pulsar在架構(gòu)設(shè)計(jì)上采用了計(jì)算與存儲(chǔ)分離的模式,發(fā)布/訂閱相關(guān)的計(jì)算邏輯在Broker上完成,而數(shù)據(jù)的持久化存儲(chǔ)交由BookKeeper去實(shí)現(xiàn)。
Broker擴(kuò)展
在Pulsar中Broker是無(wú)狀態(tài)的,當(dāng)需要支持更多的消費(fèi)者或生產(chǎn)者時(shí),可以簡(jiǎn)單地添加更多的Broker節(jié)點(diǎn)來(lái)滿足業(yè)務(wù)需求。Pulsar支持自動(dòng)的分區(qū)負(fù)載均衡,在Broker節(jié)點(diǎn)的資源使用率達(dá)到閾值時(shí),會(huì)將負(fù)載遷移到負(fù)載較低的Broker節(jié)點(diǎn),這個(gè)過(guò)程中分區(qū)也將在多個(gè)Broker節(jié)點(diǎn)中做平衡遷移,一些分區(qū)的所有權(quán)會(huì)轉(zhuǎn)移到新的Broker節(jié)點(diǎn)。在后面Bundle小節(jié)會(huì)具體介紹這部分的實(shí)現(xiàn)。
Bookie擴(kuò)展
存儲(chǔ)層的擴(kuò)容,通過(guò)增加Bookie節(jié)點(diǎn)來(lái)實(shí)現(xiàn)。在BooKie擴(kuò)容的階段,由于分片機(jī)制,整個(gè)過(guò)程不會(huì)涉及到不必要的數(shù)據(jù)搬遷,即不需要將舊數(shù)據(jù)從現(xiàn)有存儲(chǔ)節(jié)點(diǎn)重新復(fù)制到新存儲(chǔ)節(jié)點(diǎn)。在后續(xù)的Bookkeeper小節(jié)中會(huì)具體介紹。
(二)Topic
和其他消息隊(duì)列類似,Pulsar中也有Topic。Topic即在生產(chǎn)者與消費(fèi)者中傳輸消息的通道。消息可以以Topic為單位進(jìn)行歸類,生產(chǎn)者負(fù)責(zé)將消息發(fā)送到特定的Topic,而消費(fèi)者指定特定的Topic進(jìn)行消費(fèi)。
分區(qū)Topic(Topic-Partition)
Pulsar的Topic可以分為非分區(qū)Topic和分區(qū)Topic。
普通的Topic僅僅被保存在單個(gè)Broker中,這限制了Topic的最大吞吐量。分區(qū)Topic是一種特殊類型的主題,支持被多個(gè)Broker處理,從而實(shí)現(xiàn)更高的吞吐量。
針對(duì)一個(gè)Topic,可以設(shè)置多個(gè)Topic分區(qū)來(lái)提高Topic的吞吐量。每個(gè)Topic Partition由Pulsar分配給某個(gè)Broker,該Broker稱為該Topic Partition的所有者。生成者和消費(fèi)者會(huì)與每個(gè)Topic分區(qū)的Broker創(chuàng)建鏈接,發(fā)送消息并消費(fèi)消息。
如下圖所示,Topic1有Partition1、Partition2、Partition3、Partition4、Partition5五個(gè)分區(qū),Partition1和Partition4由Broker1處理,Partition2和Partition5由Broker2處理,Partition3由Broker3處理。

從Pulsar社區(qū)版的golang-sdk可以看出,客戶端的Producer和Consumer在初始化的時(shí)候,都會(huì)與每一個(gè)Topic-Partition創(chuàng)建鏈接,并且會(huì)監(jiān)聽(tīng)是否有新的Partition,以創(chuàng)建新的鏈接。
非持久Topic
默認(rèn)情況下,Pulsar會(huì)保存所有沒(méi)確認(rèn)的消息到BookKeeper中。持久Topic的消息在Broker重啟或者Consumer出現(xiàn)問(wèn)題時(shí)保存下來(lái)。
除了持久Topic,Pulsar也支持非持久Topic。這些Topic的消息只存在于內(nèi)存中,不會(huì)存儲(chǔ)到磁盤。
因?yàn)锽roker不會(huì)對(duì)消息進(jìn)行持久化存儲(chǔ),當(dāng)Producer將消息發(fā)送到Broker時(shí),Broker可以立即將ack返回給Producer,所以非持久Topic的消息傳遞會(huì)比持久Topic的消息傳遞更快一些。相對(duì)的,當(dāng)Broker因?yàn)橐恍┰蝈礄C(jī)、重啟后,非持久Topic的消息都會(huì)消失,訂閱者將無(wú)法收到這些消息。
重試Topic
由于業(yè)務(wù)邏輯處理出現(xiàn)異常,消息一般需要被重新消費(fèi)。Pulsar支持生產(chǎn)者同時(shí)將消息發(fā)送到普通的Topic和重試Topic,并指定允許延時(shí)和最大重試次數(shù)。當(dāng)配置了允許消費(fèi)者自動(dòng)重試時(shí),如果消息沒(méi)有被消費(fèi)成功,會(huì)被保存到重試Topic中,并在指定延時(shí)時(shí)間后,重新被消費(fèi)。
死信Topic
當(dāng)Consumer消費(fèi)消息出錯(cuò)時(shí),可以通過(guò)配置重試Topic對(duì)消息進(jìn)行重試,但是,如果當(dāng)消息超過(guò)了最大的重試次數(shù)仍處理失敗時(shí),該怎么辦呢?Pulsar提供了死信Topic,通過(guò)配置deadLetterTopic,當(dāng)消息達(dá)到最大重試次數(shù)的時(shí)候,Pulsar會(huì)將消息推送到死信Topic中進(jìn)行保存。
(三)訂閱(subscription)
通過(guò)訂閱的方式,我們可以指定消息如何投遞給消費(fèi)者。
訂閱類型(Subscription type)
Pulsar支持獨(dú)占(Exclusive)、災(zāi)備(Failover)、共享(Shared)、Key_Shared這四種訂閱類型。
獨(dú)占(Exclusive)SinglePartition
Exclusive下,只允許Subscription存在一個(gè)消費(fèi)者,如果多個(gè)消費(fèi)者使用同一個(gè)訂閱名稱去訂閱同一個(gè)Topic,則會(huì)報(bào)錯(cuò)。如下圖,只有Consumer A-0可以消費(fèi)數(shù)據(jù)。

災(zāi)備(Failover)
Failover下,一個(gè)Subscription中可以有多個(gè)消費(fèi)者,但只有Master Consumer可以消費(fèi)數(shù)據(jù)。當(dāng)Master Consumer斷開(kāi)連接時(shí),消息會(huì)由下一個(gè)被選中的Consumer進(jìn)行消費(fèi)。
分區(qū)Topic:Broker會(huì)按照消費(fèi)者的優(yōu)先級(jí)和消費(fèi)名的順序?qū)οM(fèi)者進(jìn)行排序,將Topic均勻地分配給優(yōu)先級(jí)最高的消費(fèi)者。
非分區(qū)Topic:Broker會(huì)根據(jù)消費(fèi)者訂閱的非分區(qū)Topic的時(shí)間順序選擇消費(fèi)者。
如下圖,Consumer-B-0是Master Consumer。當(dāng)Consumer-B-0發(fā)生問(wèn)題與Broker斷開(kāi)連接時(shí),Consumer-B-1將成為下一個(gè)Master Consumer來(lái)消費(fèi)數(shù)據(jù)。

共享(Shared)
shared中,多個(gè)消費(fèi)者可以綁定到同一個(gè)Subscription上。消息通過(guò) round robin即輪詢機(jī)制分發(fā)給不同的消費(fèi)者,并且每個(gè)消息僅會(huì)被分發(fā)給一個(gè)消費(fèi)者。當(dāng)消費(fèi)者斷開(kāi)連接,所有被發(fā)送給消費(fèi)者但沒(méi)有被確認(rèn)的消息將被重新處理,分發(fā)給其它存活的消費(fèi)者。
如下圖, Consumer-C-1、Consumer-C-2、Consumer-C-3都可以訂閱 Topic消費(fèi)數(shù)據(jù)。

Key_Shared
Key_Shared中,多個(gè)Consumer可以綁定到同一個(gè)Subscription。消息在傳遞給Consumer時(shí),具有相同鍵的消息只會(huì)傳遞給同一個(gè)Consumer。

訂閱模式(Subscription modes)
訂閱模式有持久化和非持久化兩種。訂閱模式取決于游標(biāo)(cursor)的類型。
創(chuàng)建訂閱時(shí),將創(chuàng)建一個(gè)相關(guān)的游標(biāo)來(lái)記錄最后使用的位置。當(dāng)訂閱的consumer重新啟動(dòng)時(shí),它可以從它所消費(fèi)的最后一條消息繼續(xù)消費(fèi)。
Durable(持久訂閱):游標(biāo)是持久性的,會(huì)保留消息并保持游標(biāo)記錄的位置。當(dāng)Broker重新啟動(dòng)時(shí),可以從BookKeeper中恢復(fù)游標(biāo),消息可以從游標(biāo)上次記錄的位置繼續(xù)消費(fèi)。默認(rèn)情況下,都是持久化訂閱。
NonDurable(非持久訂閱):游標(biāo)不是持久性的,當(dāng)Broker宕機(jī)時(shí),游標(biāo)會(huì)丟失并無(wú)法恢復(fù),所以消息無(wú)法繼續(xù)從上次消費(fèi)的位置開(kāi)始繼續(xù)消費(fèi)。
一個(gè)訂閱可以有一個(gè)或多個(gè)消費(fèi)者。當(dāng)使用者訂閱主題時(shí),它必須指定訂閱名稱。持久訂閱和非持久訂閱可以具有相同的名稱,它們彼此獨(dú)立。如果使用者指定了以前不存在的訂閱,則會(huì)自動(dòng)創(chuàng)建訂閱。
默認(rèn)情況下,沒(méi)有任何持久訂閱的Topic的消息將被標(biāo)記為已刪除。如果要防止消息被標(biāo)記為已刪除,可以為此Topic創(chuàng)建持久訂閱。在這種情況下,只有被確認(rèn)的消息才會(huì)被標(biāo)記為已刪除。
多主題訂閱
當(dāng)Consumer訂閱Topic時(shí),默認(rèn)指定訂閱一個(gè)主題。從Pulsar的1.23.0-incubating的版本開(kāi)始,Pulsar消費(fèi)者可以同時(shí)訂閱多個(gè)Topic??梢酝ㄟ^(guò)兩種方式進(jìn)行訂閱:
正則表達(dá)式,例如:
persistent://public/default/finance-.*
明確指定Topic列表。
二、Pulsar生產(chǎn)者(Producer)
Producer是連接topic的程序,它將消息發(fā)布到一個(gè)Pulsar?broker上。
(一)訪問(wèn)模式
消息生成者有多種模式訪問(wèn)Topic ,可以使用以下幾種方式將消息發(fā)送到Topic。
Shared:默認(rèn)情況下,多個(gè)生成者可以將消息發(fā)送到同一個(gè)Topic。
Exclusive:在這種模式下,只有一個(gè)生產(chǎn)者可以將消息發(fā)送到Topic ,當(dāng)其他生產(chǎn)者嘗試發(fā)送消息到這個(gè)Topic時(shí),會(huì)發(fā)生錯(cuò)誤。只有獨(dú)占Topic的生產(chǎn)者發(fā)生宕機(jī)時(shí)(Network Partition)該生產(chǎn)者會(huì)被驅(qū)逐,新的生產(chǎn)者才能產(chǎn)生并向Topic發(fā)送消息。
WaitForExclusive:在這種模式下,只有一個(gè)生產(chǎn)者可以將消息發(fā)送到Topic。當(dāng)已有生成者和Topic建立連接時(shí),其他生產(chǎn)者的創(chuàng)建會(huì)被掛起而不會(huì)產(chǎn)生錯(cuò)誤。如果想要采用領(lǐng)導(dǎo)者選舉機(jī)制來(lái)選擇消費(fèi)者的話,可以采用這種模式。
(二)路由模式
當(dāng)將消息發(fā)送到分區(qū)Topic時(shí),需要指定消息的路由模式,這決定了消息將會(huì)被發(fā)送到哪個(gè)分區(qū)Topic。Pulsar有以下三種消息路由模式,RoundRobinPartition為默認(rèn)路由模式。
RoundRobinPartition:如果消息沒(méi)有指定key,為了達(dá)到最大吞吐量,生產(chǎn)者會(huì)以round-robin (輪詢)方式將消息發(fā)布到所有分區(qū)。請(qǐng)注意round-robin并不是作用于每條單獨(dú)的消息,而是作用于延遲處理的批次邊界,以確保批處理有效。如果消息指定了key,分區(qū)生產(chǎn)者會(huì)根據(jù)key的hash值將該消息分配到對(duì)應(yīng)的分區(qū)。這是默認(rèn)的模式。
SinglePartition:如果消息沒(méi)有指定key,生產(chǎn)者將會(huì)隨機(jī)選擇一個(gè)分區(qū),并發(fā)布所有消息到這個(gè)分區(qū)。如果消息指定了key,分區(qū)生產(chǎn)者會(huì)根據(jù)key的hash值將該消息分配到對(duì)應(yīng)的分區(qū)。
CustomPartition:自定義模式,用戶可以創(chuàng)建自定義路由模式,通過(guò)實(shí)現(xiàn)MessageRouter接口實(shí)現(xiàn)自定義路由規(guī)則。
(三)批量處理
Pulsar支持對(duì)消息進(jìn)行批量處理。批量處理啟用后,Producer會(huì)在一次請(qǐng)求中累積并發(fā)送一批消息。批量處理時(shí)的消息數(shù)量取決于最大消息數(shù)(單次批量處理請(qǐng)求可以發(fā)送的最大消息數(shù))和最大發(fā)布延遲(單個(gè)請(qǐng)求的最大發(fā)布延遲時(shí)間)決定。開(kāi)啟批量處理后,積壓的數(shù)量是批量處理的請(qǐng)求總數(shù),而不是消息總數(shù)。
索引確認(rèn)機(jī)制
通常情況下,只有Consumer確認(rèn)了批量請(qǐng)求中的所有消息,這個(gè)批量請(qǐng)求才會(huì)被認(rèn)定為已處理。當(dāng)這批消息沒(méi)有全部被確認(rèn)的情況下,發(fā)生故障時(shí),會(huì)導(dǎo)致一些已確認(rèn)的消息被重復(fù)確認(rèn)。
為了避免Consumer重復(fù)消費(fèi)已確認(rèn)的消息,Pulsar從Pulsar 2.6.0開(kāi)始采用批量索引確認(rèn)機(jī)制。如果啟用批量索引確認(rèn)機(jī)制,Consumer將篩選出已被確認(rèn)的批量索引,并將批量索引確認(rèn)請(qǐng)求發(fā)送給Broker。Broker維護(hù)批量索引的確認(rèn)狀態(tài)并跟蹤每批索引的確認(rèn)狀態(tài),以避免向Consumer發(fā)送已確認(rèn)的消息。當(dāng)該批信息的所有索引都被確認(rèn)后,該批信息將被刪除。
默認(rèn)情況下,索引確認(rèn)機(jī)制處于關(guān)閉狀態(tài)。開(kāi)啟索引確認(rèn)機(jī)制將產(chǎn)生導(dǎo)致更多內(nèi)存開(kāi)銷。
key-based batching
key_shared模式下,Broker會(huì)根據(jù)消息的key來(lái)分發(fā)消息,但默認(rèn)的批量處理模式,無(wú)法保證將所有的相同的key都打包到同一批中,而且Consumer在接收到批數(shù)據(jù)時(shí),會(huì)默認(rèn)把第一個(gè)消息的key當(dāng)作這批消息的key,這會(huì)導(dǎo)致消息的錯(cuò)亂。因此key_shared模式下,不支持默認(rèn)的批量處理。
key-based batching能夠確保Producer在打包消息時(shí),將相同key的消息打包到同一批中,從而consumer在消費(fèi)的時(shí)候,也能夠消費(fèi)到指定key的批數(shù)據(jù)。
沒(méi)有指定key的消息在打包成批后,這一批數(shù)據(jù)也是沒(méi)有key的,Broker在分發(fā)這批消息時(shí),會(huì)使用NON_KEY作為這批消息的key。
(四)消息分塊
啟用分塊后,如果消息大小超過(guò)允許發(fā)送的最大消息大小時(shí),Producer會(huì)將原始消息分割成多個(gè)分塊消息,并將分塊消息與消息的元數(shù)據(jù)按順序發(fā)送到Broker。
在Broker中,分塊消息會(huì)和普通消息以相同的方式存儲(chǔ)在Ledger中。唯一的區(qū)別是,Consumer需要緩存分塊消息,并在接收到所有的分塊消息后將其合并成真正的消息。如果Producer不能及時(shí)發(fā)布消息的所有分塊,Consumer不能在消息的過(guò)期時(shí)間內(nèi)接收到所有的分塊,那么Consumer已接收到的分塊消息就會(huì)過(guò)期。
Consumer會(huì)將分塊的消息拼接在一起,并將它們放入接收器隊(duì)列中。客戶端從接收器隊(duì)列中消費(fèi)消息。當(dāng)Consumer消費(fèi)到原始的大消息并確認(rèn)后,Consumer就會(huì)發(fā)送與該大消息關(guān)聯(lián)的所有分塊消息的確認(rèn)。
處理一個(gè)producer和一個(gè)訂閱consumer的分塊消息
如下圖所示,當(dāng)生產(chǎn)者向主題發(fā)送一批大的分塊消息和普通的非分塊消息時(shí)。假設(shè)生產(chǎn)者發(fā)送的消息為M1,M1有三個(gè)分塊M1-C1,M1-C2和M1-C3。這個(gè)Broker在其管理的Ledger里面保存所有的三個(gè)塊消息,然后以相同的順序分發(fā)給消費(fèi)者(獨(dú)占/災(zāi)備模式)。消費(fèi)者將在內(nèi)存緩存所有的塊消息,直到收到所有的消息塊。將這些消息合并成為原始的消息M1,發(fā)送給處理進(jìn)程。

多個(gè)生產(chǎn)者和一個(gè)生產(chǎn)者處理塊消息
當(dāng)多個(gè)生產(chǎn)者發(fā)布?jí)K消息到單個(gè)主題,這個(gè)Broker在同一個(gè)Ledger里面保存來(lái)自不同生產(chǎn)者的所有塊消息。如下所示,生產(chǎn)者1發(fā)布的消息M1,M1 由M1-C1,M1-C2和M1-C3三個(gè)塊組成。生產(chǎn)者2發(fā)布的消息M2,M2由M2-C1,M2-C2和M2-C3三個(gè)塊組成。這些特定消息的所有分塊是順序排列的,但是其在Ledger里面可能不是連續(xù)的。這種方式會(huì)給消費(fèi)者帶來(lái)一定的內(nèi)存負(fù)擔(dān)。因?yàn)橄M(fèi)者會(huì)為每個(gè)大消息在內(nèi)存開(kāi)辟一塊緩沖區(qū),以便將所有的塊消息合并為原始的大消息。

三、Pulsar消費(fèi)者(Consumer)
Consumer是通過(guò)訂閱關(guān)系連接Topic,接收消息的程序。
Consumer向Broker發(fā)送flow permit request以獲取消息。在 Consumer端有一個(gè)隊(duì)列,用于接收從Broker推送來(lái)的消息。
(一)消息確認(rèn)
Pulsar提供兩種確認(rèn)模式:
累積確認(rèn):消費(fèi)者只需要確認(rèn)最后一條收到的消息,在此之前的消息,都不會(huì)被再次發(fā)送給消費(fèi)者。
單條確認(rèn):消費(fèi)者需要確認(rèn)每條消息并發(fā)送ack給Broker。

如圖,上方為累積確認(rèn)模式,當(dāng)消費(fèi)者發(fā)送M12的確認(rèn)消息給Broker后,Broker會(huì)把M12之前的消息和M12一樣都標(biāo)記為已確認(rèn)。下方為單條確認(rèn)模式,當(dāng)消費(fèi)者發(fā)送M7的確認(rèn)消息給Broker后,Broker會(huì)把M7這條消息標(biāo)記為已確認(rèn)。當(dāng)消費(fèi)者發(fā)送M12的確認(rèn)消息給Broker后,Broker會(huì)把M12這條消息標(biāo)記為已確認(rèn)。
需要注意的是,訂閱模式中的shared模式是不支持累積確認(rèn)的。因?yàn)樵撚嗛喣J较碌拿總€(gè)消費(fèi)者都能消費(fèi)數(shù)據(jù),無(wú)法保證單個(gè)消費(fèi)者的消費(fèi)消息的時(shí)序和順序。
AcknowledgmentsGroupingTracker
消息的單條確認(rèn)和累積確認(rèn)并不是直接發(fā)送確認(rèn)請(qǐng)求給Broker,而是把請(qǐng)求轉(zhuǎn)交給AcknowledgmentsGroupingTracker處理。
為了保證消息確認(rèn)的性能,并避免Broker接收到非常高并發(fā)的ack請(qǐng)求,Tracker默認(rèn)支持批量確認(rèn),即使是單條消息的確認(rèn),也會(huì)先進(jìn)入隊(duì)列,然后再一批發(fā)往Broker。在創(chuàng)建consumer的時(shí)候,可以設(shè)置acknowledgementGroupTimeMicros,默認(rèn)情況下,每100ms或者堆積超過(guò)1000時(shí),AcknowledgmentsGroupingTracker會(huì)發(fā)送一批確認(rèn)請(qǐng)求。如果設(shè)置為0,則每次確認(rèn)消息后,Consumer都會(huì)立即發(fā)送確認(rèn)請(qǐng)求。
(二)取消確認(rèn)
當(dāng)Consumer無(wú)法處理一條消息并想重新消費(fèi)時(shí),Consumer可以發(fā)送一個(gè)取消確認(rèn)的消息給Broker,Broker會(huì)重新將這條消息發(fā)送給Consumer。
如果啟用了批量處理,那這一批中的所有消息都會(huì)重新發(fā)送給消費(fèi)者。
消息取消確認(rèn)也有單條取消模式和累積取消模式,取決于消費(fèi)者使用的訂閱模式。
在Exclusive模式和Failover訂閱模式中,消費(fèi)者僅僅只能對(duì)收到的最后一條消息進(jìn)行取消確認(rèn)。
在Shared和Key_Shared的訂閱類型中,消費(fèi)者可以單獨(dú)否定確認(rèn)消息。
如果啟用了批量處理,那這一批中的所有消息都會(huì)重新發(fā)送給消費(fèi)者。
NegativeAcksTracker
取消確認(rèn)和其他消息確認(rèn)一樣,不會(huì)立即請(qǐng)求Broker,而是把請(qǐng)求轉(zhuǎn)交NegativeAcksTracker進(jìn)行處理。Tracker中記錄著每條消息以及需要延遲的時(shí)間。Tracker默認(rèn)是33ms左右一個(gè)時(shí)間刻度進(jìn)行檢查,默認(rèn)延遲時(shí)間是1分鐘,抽取出已經(jīng)到期的消息并觸發(fā)重新投遞。Tracker存在的意義是為了合并請(qǐng)求。另外如果延遲時(shí)間還沒(méi)到,消息會(huì)暫存在內(nèi)存,如果業(yè)務(wù)側(cè)有大量的消息需要延遲消費(fèi),還是建議使用reconsumeLater接口。NegativeAck唯一的好處是不需要每條消息都指定時(shí)間,可以全局設(shè)置延遲時(shí)間。
(三)redelivery backoff機(jī)制
通常情況下可以使用取消確認(rèn)來(lái)達(dá)到處理消息失敗后重新處理消息的目的,但通過(guò)redelivery backoff可以更好的實(shí)現(xiàn)這種目的??梢酝ㄟ^(guò)指定消息重試的次數(shù)、消息重發(fā)的延遲來(lái)重新消費(fèi)處理失敗的消息。
(四)確認(rèn)超時(shí)
除了取消確認(rèn)和redelivery backoff機(jī)制外,還可以通過(guò)開(kāi)啟自動(dòng)重傳遞機(jī)制來(lái)處理未確認(rèn)的消息。啟用自動(dòng)重傳遞后,client會(huì)在ackTimeout時(shí)間內(nèi)跟蹤未確認(rèn)的消息,并在消息確認(rèn)超時(shí)后自動(dòng)向代理重新發(fā)送未確認(rèn)的消息請(qǐng)求。
如果開(kāi)啟了批量處理,那這批消息都會(huì)重新發(fā)送給Consumer。
與確認(rèn)超時(shí)相比,取消確認(rèn)會(huì)更合適。因?yàn)槿∠_認(rèn)能更精確地控制單個(gè)消息的再交付,并避免在消息處理時(shí)引起的超過(guò)確認(rèn)超時(shí)而導(dǎo)致無(wú)效的再重傳。
(五)消息預(yù)拉取
Consumer客戶端SDK會(huì)默認(rèn)預(yù)先拉取消息到Consumer本地,Broker側(cè)會(huì)把這些已經(jīng)推送到Consumer本地的消息記錄為pendingAck,這些消息既不會(huì)再投遞給別的消費(fèi)者,也不會(huì)ack超時(shí),除非當(dāng)前Consumer被關(guān)閉,消息才會(huì)被重新投遞。Broker側(cè)有一個(gè)RedeliveryTracker接口,這個(gè)Tracker會(huì)記錄消息到底被重新投遞了多少次,每條消息推送給消費(fèi)者時(shí),會(huì)先從Tracker的哈希表中查詢一下重投遞的次數(shù),和消息一并推送給消費(fèi)者。
(六)未確認(rèn)的消息處理
如果消息被消費(fèi)者消費(fèi)后一直沒(méi)有確認(rèn)怎么辦?
unAckedMessageTracker中維護(hù)了一個(gè)時(shí)間輪,時(shí)間輪的刻度根據(jù)ackTimeout、tickDurationInMs這兩個(gè)參數(shù)生成,每個(gè)刻度時(shí)間=ackTimeout/tickDurationInMs。新追蹤的消息會(huì)放入最后一個(gè)刻度,每次調(diào)度都會(huì)移除隊(duì)列頭第一個(gè)刻度,并新增一個(gè)刻度放入隊(duì)列尾,保證刻度總數(shù)不變。每次調(diào)度,隊(duì)列頭刻度里的消息將會(huì)被清理,unAckedMessageTracker會(huì)自動(dòng)把這些消息做重投遞。
重投遞就是客戶端發(fā)送一個(gè)redeliverUnacknowledgedMessages命令給Broker。每一條推送給消費(fèi)者但是未ack的消息,在Broker側(cè)都會(huì)有一個(gè)集合來(lái)記錄(pengdingAck),這是用來(lái)避免重復(fù)投遞的。觸發(fā)重投遞后,Broker會(huì)把對(duì)應(yīng)的消息從這個(gè)集合里移除,然后這些消息就可以再次被消費(fèi)了。
四、Pulsar服務(wù)端?
Broker是Pulsar的一個(gè)無(wú)狀態(tài)組件,主要負(fù)責(zé)運(yùn)行以下兩個(gè)組件:
http服務(wù):提供為生產(chǎn)者和消費(fèi)者管理任務(wù)和Topic查找的REST API。Producer通過(guò)連接到Broker來(lái)發(fā)送消息,Consumer通過(guò)連接到Broker來(lái)接收消息。
調(diào)度器:提供異步http服務(wù),用于二進(jìn)制數(shù)據(jù)的傳輸。
(一)消息確認(rèn)與留存
Pulsar Broker會(huì)默認(rèn)刪除已經(jīng)被所有Consumer確認(rèn)的消息,并以backlog的方式持久化存儲(chǔ)所有未被確認(rèn)的內(nèi)消息。
Pulsar的message retention(消息留存) 和message expiry (消息過(guò)期)這兩個(gè)特性可以調(diào)整Broker的默認(rèn)設(shè)置。
Message retention: 保留Consumer已確認(rèn)的消息。
通過(guò)留存規(guī)則的設(shè)定,可以保證已經(jīng)被確認(rèn)且符合留存規(guī)則的消息持久地保存在Pulsar中,而沒(méi)有被留存規(guī)則覆蓋、已經(jīng)被確認(rèn)的消息會(huì)被刪除。

Message expire(消息過(guò)期):設(shè)置未確認(rèn)消息的存活時(shí)長(zhǎng)(TTL)。
通過(guò)設(shè)置消息的TTL,有些即使還沒(méi)有被確認(rèn),但已經(jīng)超過(guò)TTL的消息,也會(huì)被刪除。

(二)消息去重
實(shí)現(xiàn)消息去重的一種方式是確保消息僅生成一次,即生產(chǎn)者冪等。這種方式的缺點(diǎn)是把消息去重的工作交由應(yīng)用去做。
在Pulsar中,Broker支持配置開(kāi)啟消息去重,用戶不需要為了消息去重去調(diào)整Producer的代碼。啟用消息去重后,即使一條消息被多次發(fā)送到Topic上,這條消息也只會(huì)被持久化到磁盤一次。
如下圖,未開(kāi)啟消息去重時(shí),Producer發(fā)送消息1到Topic后,Broker會(huì)把消息1持久化到BookKeeper,當(dāng)Producer又發(fā)送消息1時(shí),Broker會(huì)把消息1再一次持久化到BookKeeper。開(kāi)啟消息去重后,當(dāng)Producer再次發(fā)送消息1時(shí),Broker不會(huì)把消息1再一次持久化到磁盤。

去重原理
Producer對(duì)每一個(gè)發(fā)送的消息,都會(huì)采用遞增的方式生成一個(gè)唯一的sequenceID,這個(gè)消息會(huì)放在message的元數(shù)據(jù)中傳遞給Broker。同時(shí),Broker也會(huì)維護(hù)一個(gè)PendingMessage隊(duì)列,當(dāng)Broker返回發(fā)送成功ack后,Producer會(huì)將PendingMessage隊(duì)列中的對(duì)于的sequence id刪除,表示Producer任務(wù)這個(gè)消息生產(chǎn)成功。Broker會(huì)記錄針對(duì)每個(gè) Producer接收到的最大Sequence ID和已經(jīng)處理完的最大Sequence ID。
當(dāng)Broker開(kāi)啟消息去重后,Broker會(huì)對(duì)每個(gè)消息請(qǐng)求進(jìn)行是否去重的判斷。收到的最新的Sequence ID是否大于Broker端記錄的兩個(gè)維度的最大Sequence ID,如果大于則不重復(fù),如果小于或等于則消息重復(fù)。消息重復(fù)時(shí),Broker端會(huì)直接返回ack,不會(huì)繼續(xù)走后續(xù)的存儲(chǔ)處理流程。
(三)消息延遲傳遞
延時(shí)消息功能允許Consumer能夠在消息發(fā)送到Topic后過(guò)一段時(shí)間才能消費(fèi)到這條消息。在這種機(jī)制中,消息在發(fā)布到Broker后,會(huì)被存儲(chǔ)在BookKeeper中,當(dāng)?shù)较⑻囟ǖ难舆t時(shí)間時(shí),消息就會(huì)傳遞給Consumer。
下圖為消息延遲傳遞的機(jī)制。Broker在存儲(chǔ)延遲消息的時(shí)候不會(huì)進(jìn)行特殊的處理。當(dāng)Consumer消費(fèi)消息的時(shí)候,如果這條消息設(shè)置了延遲時(shí)間,則會(huì)把這條消息加入DelayedDeliveryTracker中,當(dāng)?shù)搅酥付ǖ陌l(fā)送時(shí)間時(shí),DelayedDeliveryTracker才會(huì)把這條消息推送給消費(fèi)者。

延遲投遞原理
在Pulsar中,可以通過(guò)兩種方式實(shí)現(xiàn)延遲投遞。分別為deliverAfter和deliverAt。
deliverAfter可以指定具體的延遲時(shí)間戳,deliverAt可以指定消息在多長(zhǎng)時(shí)間后消費(fèi)。兩種方式本質(zhì)時(shí)一樣的,deliverAt方式下,客戶端會(huì)計(jì)算出具體的延遲時(shí)間戳發(fā)送給Broker。
DelayedDeliveryTracker會(huì)記錄所有需要延遲投遞的消息的index。index由Timestamp、Ledger ID、Entry ID三部分組成,其中Ledger ID和Entry ID用于定位該消息,Timestamp除了記錄需要投遞的時(shí)間,還用于延遲優(yōu)先級(jí)隊(duì)列排序。DelayedDeliveryTracker會(huì)根據(jù)延遲時(shí)間對(duì)消息進(jìn)行排序,延遲時(shí)間最短的放在前面。當(dāng)Consumer在消費(fèi)時(shí),如果有到期的消息需要消費(fèi),則根據(jù)DelayedDeliveryTracker index的Ledger ID、Entry ID找到對(duì)應(yīng)的消息進(jìn)行消費(fèi)。
如下圖,Producer依次投遞m1、m2、m3、m4、m5這五條消息,m2沒(méi)有設(shè)置延遲時(shí)間,所以會(huì)被Consumer直接消費(fèi)。m1、m3、m4、m5在DelayedDeliveryTracker會(huì)根據(jù)延遲時(shí)間進(jìn)行排序,并在到達(dá)延遲時(shí)間時(shí),依次被Consumer進(jìn)行消費(fèi)。

(四)Bundle
我們知道,Topic分區(qū)會(huì)散落在不同的Broker中,那Topic分區(qū)和Broker的關(guān)系是如何維護(hù)的呢?當(dāng)某個(gè)Broker負(fù)載過(guò)高時(shí),Pulsar怎么處理呢?
Topic分區(qū)與Broker的關(guān)聯(lián)是通過(guò)Bundle機(jī)制進(jìn)行管理的。
每個(gè)namespace存在一個(gè)Bundle列表,在namesapce創(chuàng)建時(shí)可以指定Bundle的數(shù)量。Bundle其實(shí)是一個(gè)分片機(jī)制,每個(gè)Bundle擁有 namespace 整個(gè)hash范圍的一部分。每個(gè)Topic (分區(qū)) 通過(guò)hash運(yùn)算落到相應(yīng)的Bundle區(qū)間,進(jìn)而找到當(dāng)前區(qū)間關(guān)聯(lián)的Broker。每個(gè)Bundle綁定唯一的一個(gè)Broker,但一個(gè)Broker可以有多個(gè)Bundle。
如下圖,T1、T2這兩個(gè)Topic的hash結(jié)果落在[0x0000000L——0x4000000L]中,這個(gè)hash范圍的Bundle對(duì)應(yīng)Broker2,Broker2會(huì)對(duì)T1、T2進(jìn)行處理。
同理,T4的hash結(jié)果落在[0x4000000L——0x8000000L]中,這個(gè)hash范圍的Bundle對(duì)應(yīng)Broker1,Broker1會(huì)對(duì)T4進(jìn)行處理;
T5的hash結(jié)果落在[0x8000000L——0xC000000L]中,這個(gè)hash范圍的Bundle對(duì)應(yīng)Broker3,Broker3會(huì)對(duì)T5進(jìn)行處理;
T3的hash結(jié)果落在[0xC000000L——0x0000000L]中,這個(gè)hash范圍的Bundle對(duì)應(yīng)Broker3,Broker3會(huì)對(duì)T3進(jìn)行處理。

Bundle可以根據(jù)綁定的Broker的負(fù)載進(jìn)行動(dòng)態(tài)的調(diào)整、綁定。當(dāng)Bundle綁定的Broker的Topic數(shù)過(guò)多、負(fù)載過(guò)高時(shí),都會(huì)觸發(fā)Bundle拆分,將原有的Bundle拆分成2個(gè)Bundle,并將其中一個(gè)Bundle重新分配給不同的Broker,以降低原Broker的Topic數(shù)或負(fù)載。
五、Pulsar存儲(chǔ)層(Bookkeeper)
BookKeeper是Pulsar的存儲(chǔ)組件。
對(duì)于Pulsar的每個(gè)Topic(分區(qū)),其數(shù)據(jù)并不會(huì)固定的分配在某個(gè) Bookie上,具體的邏輯實(shí)現(xiàn)我們?cè)?/span>Bundle一節(jié)已經(jīng)討論過(guò),而Topic的物理存儲(chǔ),實(shí)際上是通過(guò)BookKeeper組件來(lái)實(shí)現(xiàn)的。
(一)分片存儲(chǔ)
概念:
Bookie:BookKeeper的一部分,處理需要持久化的數(shù)據(jù)。
Ledger:BookKeeper的存儲(chǔ)邏輯單元,可用于追加寫數(shù)據(jù)。
Entry:寫入BookKeeper的數(shù)據(jù)實(shí)體。當(dāng)批量生產(chǎn)時(shí),Entry為多條消息,當(dāng)非批量生產(chǎn)時(shí),Entry為單條數(shù)據(jù)。
Pulsar在物理上采用分片存儲(chǔ)的模式,存儲(chǔ)粒度比分區(qū)更細(xì)化、存儲(chǔ)負(fù)載更均衡。如圖,一個(gè)分區(qū)Topic-Partition2的數(shù)據(jù)由多個(gè)分片組成。每個(gè)分片作為BookKeeper中的一個(gè)Ledger,均勻的分布并存儲(chǔ)在BookKeeper的多個(gè)Bookie節(jié)點(diǎn)中。
基于分配存儲(chǔ)的機(jī)制,使得Bookie的擴(kuò)容可以即時(shí)完成,無(wú)需任何數(shù)據(jù)復(fù)制或者遷移。當(dāng)Bookie擴(kuò)容時(shí),Broker可以立刻發(fā)現(xiàn)并感知新的Bookie,并嘗試將新的分片Segment寫入新增加的Bookie中。

如上圖,在Broker中,消息以Entry的形式追加的形式寫入Ledger中,每個(gè)Topic分區(qū)都有多個(gè)非連續(xù)ID的Ledger,Topic分區(qū)的Ledger同一時(shí)刻只有一個(gè)處于可寫狀態(tài)。
Topic分區(qū)在存儲(chǔ)消息時(shí),會(huì)先找到當(dāng)前使用的Ledger,生成Entry ID(每個(gè)Entry ID在同一個(gè)Ledger內(nèi)是遞增的)。當(dāng)Ledger的長(zhǎng)度或Entry個(gè)數(shù)超過(guò)閾值時(shí),新消息會(huì)存儲(chǔ)到新Ledger中。每個(gè)messageID由[Ledger ID,Entry ID,Partition編號(hào),batch-index]組成。(Partition:消息所屬的Topic分區(qū),batch-index:是否為批量消息)
一個(gè)Ledger會(huì)根據(jù)Topic指定的副本數(shù)量存儲(chǔ)到多個(gè)Bookie中。一個(gè)Bookie可以存放多個(gè)不連續(xù)的Ledger。
(二)讀寫數(shù)據(jù)的流程
?
Journals:Journals文件包含BookKeeper的事務(wù)日志信息。在對(duì)Ledger更新之前,Bookie會(huì)保證更新的事務(wù)信息已經(jīng)寫入Journals。當(dāng)Bookie啟動(dòng)或者舊的Journals大小達(dá)到閾值時(shí),就會(huì)創(chuàng)建一個(gè)新的Journals 。
Entry Logs:Entry Logs管理從Bookie收到的Entry數(shù)據(jù)。來(lái)自不同Ledger的Entry會(huì)按順序聚合并寫入Entry Logs,這些Entry在Entry Logs中的偏移量會(huì)作為指針保存在Ledger Cache中,以便快速查找。當(dāng)Bookie啟動(dòng)或者舊的Entry Logs大小達(dá)到閾值時(shí),就會(huì)創(chuàng)建一個(gè)新的Entry Logs。當(dāng)舊的Entry Logs沒(méi)有與任何活躍的Ledger關(guān)聯(lián)時(shí),就會(huì)被垃圾回收器刪除。
Index Files:每個(gè)Ledger都會(huì)創(chuàng)建一個(gè)Index file,它包括一個(gè)頭和幾個(gè)固定長(zhǎng)度的Index page,這些Index page記錄存儲(chǔ)在Entry Logs中的Entry的偏移量。由于更新索引文件會(huì)引入隨機(jī)的磁盤I/O,所以索引文件由后臺(tái)運(yùn)行的同步線程延遲更新。這確保了更新的快速性能。在索引頁(yè)持久化到磁盤之前,將它們聚集在Ledger Cache中以方便查找。
Ledger Cache:Ledger Cache存放在內(nèi)存池中,這樣可以更高效地管理磁盤頭調(diào)度。

消息的寫入
將Entry追加寫入Ledger中。
將這次Entry的更新操作寫入Journal日志中,當(dāng)由多個(gè)數(shù)據(jù)寫入時(shí),可以批量提交,將數(shù)據(jù)刷到Journal磁盤中。
將Entry數(shù)據(jù)寫入寫緩存中。
返回寫入成功響應(yīng)。
到這里,消息寫入的同步流程已經(jīng)完成。
3-A. 內(nèi)存中的Entry數(shù)據(jù)會(huì)根據(jù)Ledger和寫入Ledger的時(shí)間順序進(jìn)行排序,批量寫入Entry Log中。
3-B. Entry在Entry log中的偏移量以Index Page的方式寫入Ledger Cache中,即iIdex Files。
Entry Log和Ledger Cache中的Index File會(huì)Flush到磁盤中。
消息的讀取
A.先從寫緩存中以尾部讀的方式讀取。
B.如果寫緩存未命中,則從讀緩存中讀取。
C.如果讀緩存未命中,則從磁盤中讀取。磁盤讀取有三步:
C-1.讀取Index Disk,獲取Entry的偏移量。
C-2.根據(jù)Entry的偏移量,在Entry Disk中快速找到Entry。
C-3.將Entry數(shù)據(jù)寫入讀緩存中。
參考文獻(xiàn)
1.Pulsar官方文檔
2.BookKeeper官方文檔
3.Apache Pulsar 技術(shù)系列-客戶端消息確認(rèn)
4.Apache Pulsar 技術(shù)系列-Message deduplication這里的去重與你想的可能不一樣
5.Apache Pulsar 技術(shù)系列-Pulsar延遲消息投遞解析
6.Apache 系列—Pulsar核心特性解析
?作者簡(jiǎn)介
雷潔彥
騰訊后端開(kāi)發(fā)工程師
騰訊后端開(kāi)發(fā)工程師,目前負(fù)責(zé)QQ瀏覽器信息流內(nèi)容業(yè)務(wù)的后端開(kāi)發(fā)工作。
?推薦閱讀
基于流計(jì)算Oceanus和Elasticsearch Service構(gòu)建百億級(jí)實(shí)時(shí)監(jiān)控系統(tǒng)
云原生吞噬世界,是大勢(shì)所趨還是技術(shù)炒作?
帶你徹底擊潰跳表原理及其Golang實(shí)現(xiàn)?。▋?nèi)含圖解)


