<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          事務(wù)消息大揭秘!RocketMQ、Kafka、Pulsar全方位對比

          共 10296字,需瀏覽 21分鐘

           ·

          2021-09-17 17:34


          導(dǎo)語 | 事務(wù)是一個程序執(zhí)行單元,里面的所有操作要么全部執(zhí)行成功,要么全部執(zhí)行失敗。RocketMQ、Kafka和Pulsar都是當(dāng)今業(yè)界應(yīng)用十分廣泛的開源消息隊(duì)列(MQ)組件,筆者在工作中遇到關(guān)于MQ選型相關(guān)的內(nèi)容,了解到關(guān)于“事務(wù)消息”這個概念在不同的MQ組件里有不同內(nèi)涵。故借此文,試著淺析一番這三種消息隊(duì)列(MQ)的事務(wù)消息有何異同,目的是形成關(guān)于消息隊(duì)列事務(wù)消息的全景視圖,給有類似業(yè)務(wù)需求的同學(xué)提供一些參考和借鑒。



          一、消息隊(duì)列演化


          消息隊(duì)列(Message Queue,簡稱MQ),是指在消息的傳輸中保存消息的容器或服務(wù),是一種異步的服務(wù)間通信方式,適用于無服務(wù)器和微服務(wù)架構(gòu),是分布式系統(tǒng)實(shí)現(xiàn)高性能、高可用、可伸縮等高級特效的重要組件。


          常見的主流消息隊(duì)列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ、Pulsar等。而在公司內(nèi)有TubeMQ、Ckafka、TDMQ、CMQ、CDMQ、Hippo等。



          Kafka:Apache Kafka是由Apache軟件基金會開發(fā)的一個開源消息系統(tǒng)項(xiàng)目,由Scala寫成。Kafka最初是由LinkedIn開發(fā),并于2011年初開源。2012年10月從Apache Incubator畢業(yè)。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個統(tǒng)一、高通量、低等待的平臺。


          Kafka是一個分布式的、分區(qū)的、多復(fù)本的日志提交服務(wù)。它通過一種獨(dú)一無二的設(shè)計(jì)提供了一個消息系統(tǒng)的功能,其整體架構(gòu)圖如下所示。



          RocketMQ:Apache RocketMQ是一個分布式消息和流媒體平臺,具有低延遲、強(qiáng)一致、高性能和可靠性、萬億級容量和靈活的可擴(kuò)展性。它有借鑒Kafka的設(shè)計(jì)思想,但不是Kafka的拷貝,其整體架構(gòu)圖如下所示。



          Pulsar:Apache Pulsar是Apache軟件基金會頂級項(xiàng)目,是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數(shù)式計(jì)算為一體,采用計(jì)算與存儲分離架構(gòu)設(shè)計(jì),支持多租戶、持久化存儲、多機(jī)房跨區(qū)域數(shù)據(jù)復(fù)制,具有強(qiáng)一致性、高吞吐、低延時(shí)及高可擴(kuò)展性等流數(shù)據(jù)存儲特性,被看作是云原生時(shí)代實(shí)時(shí)消息流傳輸、存儲和計(jì)算最佳解決方案,其整體架構(gòu)圖如下所示。




          二、背景知識


          (一)什么是事務(wù)?


          • 事務(wù)(Trasaction)


          事務(wù)是一個程序執(zhí)行單元,里面的所有操作要么全部執(zhí)行成功,要么全部執(zhí)行失敗。


          一個事務(wù)有個基本特性,也就是我們常說的(ACID)。


          Atomicity(原子性):事務(wù)是一個不可分割的整體,事務(wù)內(nèi)所有操作要么全做成功,要么全失敗。


          Consistency(一致性):事務(wù)執(zhí)行前后,數(shù)據(jù)從一個狀態(tài)到另一個狀態(tài)必須是一致的(A向B轉(zhuǎn)賬,不能出現(xiàn)A扣了錢,B卻沒收到)。


          Isolation(隔離性):多個并發(fā)事務(wù)之間相互隔離,不能互相干擾。


          Durablity(持久性):事務(wù)完成后,對數(shù)據(jù)的更改是永久保存的,不能回滾。


          • 分布式事務(wù)


          分布式事務(wù)是指事務(wù)的參與者、支持事務(wù)的服務(wù)器、資源服務(wù)器以及事務(wù)管理器分別位于不同的分布式系統(tǒng)的不同節(jié)點(diǎn)之上。分布式事務(wù)通常用于在分布式系統(tǒng)中保證不同節(jié)點(diǎn)之間的數(shù)據(jù)一致性。


          分布式事務(wù)的解決方案一般有以下幾種:

          XA(2PC/3PC)


          最具有代表性的是由Oracle Tuxedo系統(tǒng)提出的XA分布式事務(wù)協(xié)議。XA中大致分為兩部分:事務(wù)管理器和本地資源管理器。其中本地資源管理器往往由數(shù)據(jù)庫實(shí)現(xiàn),比如Oracle、DB2這些商業(yè)數(shù)據(jù)庫都實(shí)現(xiàn)了XA接口,而事務(wù)管理器作為全局的調(diào)度者,負(fù)責(zé)各個本地資源的提交和回滾。XA協(xié)議通常包含兩階段提交(2PC)三階段提交(3PC)兩種實(shí)現(xiàn)。兩階段提交顧名思義就是要進(jìn)行兩個階段的提交:第一階段,準(zhǔn)備階段(投票階段);第二階段,提交階段(執(zhí)行階段)。實(shí)現(xiàn)過程如下所示:



          二階段提交看似能夠提供原子性的操作,但它存在著一些缺陷,三段提交(3PC)是對兩段提交(2PC)的一種升級優(yōu)化,有興趣的可以深入了解一下,這里不再贅述。


          TCC


          TCC(Try-Confirm-Cancel)是Try、Commit、Cancel三種指令的縮寫,又被稱補(bǔ)償事務(wù),其邏輯模式類似于XA兩階段提交,事務(wù)處理流程也很相似,但2PC是應(yīng)用于在DB層面,TCC則可以理解為在應(yīng)用層面的2PC,是需要我們編寫業(yè)務(wù)邏輯來實(shí)現(xiàn)。


          TCC它的核心思想是:“針對每個操作都要注冊一個與其對應(yīng)的確認(rèn)(Try)和補(bǔ)償(Cancel)”。


          消息事務(wù)


          所謂的消息事務(wù)就是基于消息隊(duì)列的兩階段提交,本質(zhì)上是對消息隊(duì)列的一種特殊利用,它是將本地事務(wù)和發(fā)消息放在了一個分布式事務(wù)里,保證要么本地操作成功成功并且對外發(fā)消息成功,要么兩者都失敗。


          基于消息隊(duì)列的兩階段提交往往用在高并發(fā)場景下,將一個分布式事務(wù)拆成一個消息事務(wù)(A系統(tǒng)的本地操作+發(fā)消息)+B系統(tǒng)的本地操作,其中B系統(tǒng)的操作由消息驅(qū)動,只要消息事務(wù)成功,那么A操作一定成功,消息也一定發(fā)出來了,這時(shí)候B會收到消息去執(zhí)行本地操作,如果本地操作失敗,消息會重投,直到B操作成功,這樣就變相地實(shí)現(xiàn)了A與B的分布式事務(wù)。原理如下:


          雖然上面的方案能夠完成A和B的操作,但是A和B并不是強(qiáng)一致的,而是最終一致(Eventually consistent)的。而這也是滿足BASE理論的要求的。這里引申一下,BASE是Basically Available(基本可用)、Soft state(軟狀態(tài))和Eventually consistent(最終一致性)三個短語的縮寫。BASE理論是對CAP中AP(CAP已經(jīng)被證實(shí)一個分布式系統(tǒng)最多只能同時(shí)滿足CAP三項(xiàng)中的兩項(xiàng))的一個擴(kuò)展,通過犧牲強(qiáng)一致性來獲得可用性,當(dāng)出現(xiàn)故障允許部分不可用但要保證核心功能可用,允許數(shù)據(jù)在一段時(shí)間內(nèi)是不一致的,但最終達(dá)到一致狀態(tài)。滿足BASE理論的事務(wù),我們稱之為“柔性事務(wù)”。



          (二)什么是Exactly-once (精確一次)語義?


          在分布式系統(tǒng)中,任何節(jié)點(diǎn)都有可能出現(xiàn)異常甚至宕機(jī)。在消息隊(duì)列中也一樣,當(dāng)Producer在生產(chǎn)消息時(shí),可能會發(fā)生Broker宕機(jī)不可用,或者網(wǎng)絡(luò)突然中斷等異常情況。根據(jù)在發(fā)生異常時(shí)Producer處理消息的方式,系統(tǒng)可以具備以下三種消息語義。


          • At-least-once(至少一次)語義


          Producer通過接收Broker的ACK(消息確認(rèn))通知來確保消息成功寫入Topic。然而,當(dāng)Producer接收ACK通知超時(shí),或者收到Broker出錯信息時(shí),會嘗試重新發(fā)送消息。??????如果Broker正好在成功把消息寫入到Topic,但還沒有給Producer發(fā)送ACK時(shí)宕機(jī),Producer重新發(fā)送的消息會被再次寫入到Topic,最終導(dǎo)致消息被重復(fù)分發(fā)至Consumer。即:消息不會丟失,但有可能被重復(fù)發(fā)送。


          • At-most-once(最多一次)語義


          當(dāng)Producer在接收ACK超時(shí),或者收到Broker出錯信息時(shí)不重發(fā)消息,那就有可能導(dǎo)致這條消息丟失,沒有寫入到Topic中,也不會被Consumer消費(fèi)到。在某些場景下,為了避免發(fā)生重復(fù)消費(fèi),我們可以容許消息丟失的發(fā)生。即:消息可能會丟失,但絕不會被重復(fù)發(fā)送。


          • Exactly-once(精確一次)語義


          Exactly-once語義保證了即使Producer多次發(fā)送同一條消息到服務(wù)端,服務(wù)端也僅僅會記錄一次。Exactly-once語義是最可靠的,同時(shí)也是最難理解的。Exactly-once語義需要消息隊(duì)列服務(wù)端,消息生產(chǎn)端和消費(fèi)端應(yīng)用三者的協(xié)同才能實(shí)現(xiàn)。比如,當(dāng)消費(fèi)端應(yīng)用成功消費(fèi)并且ACK了一條消息之后,又把消費(fèi)位點(diǎn)回滾到之前的一個消息ID,那么從那個消息ID往后的所有消息都會被消費(fèi)端應(yīng)用重新消費(fèi)到。即:消息不會丟失,也不會被重復(fù)發(fā)送。



          三、RocketMQ、Kafka、Pulsar事務(wù)消息


          (一)RocketMQ的事務(wù)消息


          RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,這里RocketMQ采用了2PC的思想來實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個補(bǔ)償邏輯來處理二階段超時(shí)或者失敗的消息,流程如下圖所示:



          其具體工作流程分為正常事務(wù)消息的發(fā)送及提交和不正常情況下事務(wù)消息的補(bǔ)償流程:


          1. 在消息隊(duì)列上開啟一個事務(wù)主題。


          2. 事務(wù)中第一個執(zhí)行的服務(wù)發(fā)送一條“半消息”(半消息和普通消息的唯一區(qū)別是,在事務(wù)提交之前,對于消費(fèi)者來說,這個消息是不可見的)給消息隊(duì)列。


          3. 半消息發(fā)送成功后,發(fā)送半消息的服務(wù)就會開始執(zhí)行本地事務(wù),根據(jù)本地事務(wù)執(zhí)行結(jié)果來決定事務(wù)消息提交或者回滾。


          4. 本地事務(wù)成功后會讓這個“半消息”變成正常消息,供分布式事務(wù)后面的步驟執(zhí)行自己的本地事務(wù)。(這里的事務(wù)消息,Producer不會因?yàn)镃onsumer消費(fèi)失敗而做回滾,采用事務(wù)消息的應(yīng)用,其所追求的是高可用和最終一致性,消息消費(fèi)失敗的話,RocketMQ自己會負(fù)責(zé)重推消息,直到消費(fèi)成功。


          補(bǔ)償流程RocketMQ提供事務(wù)反查來解決異常情況,如果RocketMQ沒有收到提交或者回滾的請求,Broker會定時(shí)到生產(chǎn)者上去反查本地事務(wù)的狀態(tài),然后根據(jù)生產(chǎn)者本地事務(wù)的狀態(tài)來處理這個“半消息”是提交還是回滾。值得注意的是我們需要根據(jù)自己的業(yè)務(wù)邏輯來實(shí)現(xiàn)反查邏輯接口,然后根據(jù)返回值Broker決定是提交還是回滾。而且這個反查接口需要是無狀態(tài)的,請求到任意一個生產(chǎn)者節(jié)點(diǎn)都會返回正確的數(shù)據(jù)。


          其中,補(bǔ)償流程用于解決消息Commit或者Rollback發(fā)生超時(shí)或者失敗的情況。在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對用戶不可見。其中,事務(wù)消息相對普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對用戶是不可見的。那么,如何做到寫入消息但是對用戶不可見呢?RocketMQ事務(wù)消息的做法是:如果消息是“半消息”,將備份原消息的主題與消息消費(fèi)隊(duì)列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費(fèi)組未訂閱該主題,故消費(fèi)端無法消費(fèi)“半消息”的消息,然后RocketMQ會開啟一個定時(shí)任務(wù),從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進(jìn)行消費(fèi),根據(jù)生產(chǎn)者組獲取一個服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請求,根據(jù)事務(wù)狀態(tài)來決定是提交或回滾消息。


          講到這里大家就明白了,這里說的就是上文提到分布式事務(wù)中的消息事務(wù),目的是在分布式事務(wù)中實(shí)現(xiàn)系統(tǒng)的最終一致性。



          (二)Kafka的事務(wù)消息


          與RocketMQ的事務(wù)消息用途不同,Kafka的事務(wù)基本上是配合其冪等機(jī)制來實(shí)現(xiàn)Exactly-once(見上文)語義的。


          開發(fā)此功能的原因可以總結(jié)如下:


          流處理的需求


          隨著流處理的興起,對具有更強(qiáng)處理保證的流處理應(yīng)用的需求也在增長。例如,在金融行業(yè),金融機(jī)構(gòu)使用流處理引擎為用戶處理借款和信貸。這種類型的用例要求每條消息都只處理一次,無一例外。


          換句話說,如果流處理應(yīng)用程序消費(fèi)消息A并將結(jié)果作為消息B(B = f(A)),那么恰好一次處理保證意味著當(dāng)且僅當(dāng)B被成功生產(chǎn)后A才能被標(biāo)記為消費(fèi),反之亦然。


          事務(wù)API使流處理應(yīng)用程序能夠在一個原子操作中使用、處理和生成消息。這意味著,事務(wù)中的一批消息可以從許多主題分區(qū)接收、生成和確認(rèn)。一個事務(wù)涉及的所有操作都作為整體成功或失敗。


          目前,Kafka默認(rèn)提供的交付可靠性保障是At-least-once。如果消息成功“提交”,但Broker的應(yīng)答沒有成功發(fā)送回Producer端(比如網(wǎng)絡(luò)出現(xiàn)瞬時(shí)抖動),那么Producer就無法確定消息是否真的提交成功了。因此,它只能選擇重試,這就是Kafka默認(rèn)提供At-least-once保障的原因,不過這會導(dǎo)致消息重復(fù)發(fā)送。大部分用戶還是希望消息只會被交付一次,這樣的話,消息既不會丟失,也不會被重復(fù)處理?;蛘哒f,即使Producer端重復(fù)發(fā)送了相同的消息,Broker端也能做到自動去重。在下游Consumer看來,消息依然只有一條。那么問題來了,Kafka是怎么做到精確一次的呢?簡單來說,這是通過兩種機(jī)制:冪等性(Idempotence)和事務(wù)(Transaction)。


          • 冪等性Producer


          “冪等”這個詞原是數(shù)學(xué)領(lǐng)域中的概念,指的是某些操作或函數(shù)能夠被執(zhí)行多次,但每次得到的結(jié)果都是不變的。冪等性有很多好處,其最大的優(yōu)勢在于我們可以安全地重試任何冪等性操作,反正它們也不會破壞我們的系統(tǒng)狀態(tài)。如果是非冪等性操作,我們還需要擔(dān)心某些操作執(zhí)行多次對狀態(tài)的影響,但對于冪等性操作而言,我們根本無需擔(dān)心此事。


          在Kafka中,Producer默認(rèn)不是冪等性的,但我們可以創(chuàng)建冪等性Producer。它其實(shí)是0.11.0.0版本引入的新功能。enable.idempotence 被設(shè)置成true后,Producer自動升級成冪等性Producer,其他所有的代碼邏輯都不需要改變。Kafka自動幫你做消息的重復(fù)去重。Kafka為了實(shí)現(xiàn)冪等性,它在底層設(shè)計(jì)架構(gòu)中引入了ProducerIDSequenceNumber。ProducerID:在每個新的Producer初始化時(shí),會被分配一個唯一的ProducerID,用來標(biāo)識本次會話。


          SequenceNumber:對于每個ProducerID,Producer發(fā)送數(shù)據(jù)的每個Topic和Partition都對應(yīng)一個從0開始單調(diào)遞增的SequenceNumber值。Broker在內(nèi)存維護(hù)(pid,seq)映射,收到消息后檢查seq。Producer在收到明確的的消息丟失ack,或者超時(shí)后未收到ack,要進(jìn)行重試。


          • new_seq=old_seq+1: 正常消息;

          • new_seq<=old_seq: 重復(fù)消息;

          • new_seq>old_seq+1: 消息丟失;


          另外我們需要了解冪等性Producer的作用范圍。首先,它只能保證單分區(qū)上的冪等性,即一個冪等性Producer能夠保證某個主題的一個分區(qū)上不出現(xiàn)重復(fù)消息,它無法實(shí)現(xiàn)多個分區(qū)的冪等性。其次,它只能實(shí)現(xiàn)單會話上的冪等性,不能實(shí)現(xiàn)跨會話的冪等性。這里的會話,你可以理解為Producer進(jìn)程的一次運(yùn)行。當(dāng)你重啟了Producer進(jìn)程之后,這種冪等性保證就喪失了。如果想實(shí)現(xiàn)多分區(qū)以及多會話上的消息無重復(fù),應(yīng)該怎么做呢?答案就是事務(wù)(transaction)或者依賴事務(wù)型Producer。這也是冪等性Producer和事務(wù)型Producer的最大區(qū)別。


          • 事務(wù)型Producer


          事務(wù)型Producer能夠保證將消息原子性地寫入到多個分區(qū)中。這批消息要么全部寫入成功,要么全部失敗。另外,事務(wù)型Producer也不受進(jìn)程的重啟影響。Producer重啟后,Kafka依然保證它們發(fā)送消息的Exactly-once處理。和普通Producer代碼相比,事務(wù)型Producer的顯著特點(diǎn)是調(diào)用了一些事務(wù)API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,它們分別對應(yīng)事務(wù)的初始化、事務(wù)開始、事務(wù)提交以及事務(wù)終止。


          Kafka事務(wù)消息是由Producer、事務(wù)協(xié)調(diào)器、Broker、組協(xié)調(diào)器、Consumer等共同參與實(shí)現(xiàn)的。


          Producer


          為Producer指定固定的TransactionalId(事務(wù)id),可以穿越Producer的多次會(Producer重啟/斷線重連)中,持續(xù)標(biāo)識Producer的身份。


          每個生產(chǎn)者增加一個epoch。用于標(biāo)識同一個TransactionalId在一次事務(wù)中的epoch,每次初始化事務(wù)時(shí)會遞增,從而讓服務(wù)端可以知道生產(chǎn)者請求是否舊的請求。使用epoch標(biāo)識Producer的每一次“重生”,可以防止同一Producer存在多個會話。


          Producer遵從冪等消息的行為,并在發(fā)送的BatchRecord中增加事務(wù)id和epoch。


          事務(wù)協(xié)調(diào)器(Transaction Coordinator)


          引入事務(wù)協(xié)調(diào)器,類似于消費(fèi)組負(fù)載均衡的協(xié)調(diào)者,每一個實(shí)現(xiàn)事務(wù)的生產(chǎn)端都被分配到一個事務(wù)協(xié)調(diào)者。以兩階段提交的方式,實(shí)現(xiàn)消息的事務(wù)提交。


          事務(wù)協(xié)調(diào)器使用一個特殊的Topic:即事務(wù)Topic,事務(wù)Topic本身也是持久化的,日志信息記錄事務(wù)狀態(tài)信息,由事務(wù)協(xié)調(diào)者寫入。


          事務(wù)協(xié)調(diào)器通過RPC調(diào)用,協(xié)調(diào)Broker和Consumer實(shí)現(xiàn)事務(wù)的兩階段提交。


          每一個Broker都會啟動一個事務(wù)協(xié)調(diào)器,使用hash(TransactionalId)確定Producer對應(yīng)的事務(wù)協(xié)調(diào)器,使得整個集群的負(fù)載均衡。


          Broker


          引入控制消息(Control Messages):這些消息是客戶端產(chǎn)生的并寫入到主題的特殊消息,但對于使用者來說不可見。它們是用來讓Broker告知消費(fèi)者之前拉取的消息是否被原子性提交。


          Broker處理事務(wù)協(xié)調(diào)器的commit/abort控制消息,把控制消息向正常消息一樣寫入Topic(圖中標(biāo)c的消息,和正常消息交織在一起,用來確認(rèn)事務(wù)提交的日志偏移),并向前推進(jìn)消息提交偏移hw。



          組協(xié)調(diào)器


          如果在事務(wù)過程中,提交了消費(fèi)偏移,組協(xié)調(diào)器在offset log中寫入事務(wù)消費(fèi)偏移。當(dāng)事務(wù)提交時(shí),在offset log中寫入事務(wù)offset確認(rèn)消息。


          Consumer


          Consumer過濾未提交消息和事務(wù)控制消息,使這些消息對用戶不可見。

          有兩種實(shí)現(xiàn)方式:


          • Consumer緩存方式


          設(shè)置isolation.level=read_uncommitted,此時(shí)topic的所有消息對Consumer都可見。Consumer緩存這些消息,直到收到事務(wù)控制消息。若事務(wù)commit,則對外發(fā)布這些消息;若事務(wù)abort,則丟棄這些消息。


          • Broker過濾方式


          設(shè)置isolation.level=read_committed,此時(shí)topic中未提交的消息對Consumer不可見,只有在事務(wù)結(jié)束后,消息才對Consumer可見。Broker給Consumer的BatchRecord消息中,會包含以列表,指明哪些是“abort”事務(wù),Consumer丟棄abort事務(wù)的消息即可。


          因?yàn)槭聞?wù)機(jī)制會影響消費(fèi)者所能看到的消息的范圍,它不只是簡單依賴高水位來判斷。它依靠一個名為LSO(Log Stable Offset)的位移值來判斷事務(wù)型消費(fèi)者的可見性。



          (三)Pulsar的事務(wù)消息


          Apache Pulsar在2.8.0正式支持了事務(wù)相關(guān)的功能,Pulsar這里提供的事務(wù)區(qū)別于RocketMQ中2PC那種事務(wù)的實(shí)現(xiàn)方式,沒有本地事務(wù)回查的機(jī)制,更類似于Kafka的事務(wù)實(shí)現(xiàn)機(jī)制。Apache Pulsar中的事務(wù)主要用來保證類似Pulsar Functions這種流計(jì)算場景中Exactly-once語義的實(shí)現(xiàn),這也符合Apache Pulsar本身Event Streaming的定位,即保證端到端(End-to-End)的事務(wù)實(shí)現(xiàn)的語義。


          在Pulsar中,對于事務(wù)語義是這樣定義的:允許事件流應(yīng)用將消費(fèi)、處理、生產(chǎn)消息整個過程定義為一個原子操作,即生產(chǎn)者或消費(fèi)者能夠處理跨多個主題和分區(qū)的消息,并確保這些消息作為一個單元被處理。


          Pulsar事務(wù)具有以下語義:


          • 事務(wù)中的所有操作都作為一個單元提交。要么提交所有消息,要么都不提交。


          • 每條消息只寫入或處理一次,不會丟失數(shù)據(jù)或重復(fù)(即使發(fā)生故障)。


          • 如果事務(wù)中止,則此事務(wù)中的所有寫入和確認(rèn)都將回滾。


          事務(wù)中的批量消息可以被以多分區(qū)接收、生產(chǎn)和確認(rèn)。


          • 消費(fèi)者只能讀取已提交(確認(rèn))的消息。換句話說,Broker不傳遞屬于打開事務(wù)的事務(wù)消息或?qū)儆谥兄故聞?wù)的消息。


          • 跨多個分區(qū)的消息寫入是原子性的。


          • 跨多個訂閱的消息確認(rèn)是原子性的。訂閱下的消費(fèi)者在確認(rèn)帶有事務(wù)ID的消息時(shí),只會成功確認(rèn)一次消息。


          Pulsar事務(wù)消息由以下幾個關(guān)鍵點(diǎn)構(gòu)成:


          • 事務(wù)ID


          事務(wù)ID(TxnID)標(biāo)識Pulsar中的唯一事務(wù)。事務(wù)ID長度是128-bit。最高16位保留給事務(wù)協(xié)調(diào)器的ID,其余位用于每個事務(wù)協(xié)調(diào)器中單調(diào)遞增的數(shù)字。


          • 事務(wù)協(xié)調(diào)器(Transaction Coordinator)


          事務(wù)協(xié)調(diào)器(TC)是運(yùn)行在Pulsar Broker中的一個模塊。


          • 它維護(hù)事務(wù)的整個生命周期,并防止事務(wù)進(jìn)入錯誤狀態(tài)。

          • 它處理事務(wù)超時(shí),并確保事務(wù)在事務(wù)超時(shí)后中止。


          • 事務(wù)日志


          所有事務(wù)元數(shù)據(jù)都保存在事務(wù)日志中。事務(wù)日志由Pulsar主題記錄。如果事務(wù)協(xié)調(diào)器崩潰,它可以從事務(wù)日志恢復(fù)事務(wù)元數(shù)據(jù)。

          事務(wù)日志存儲事務(wù)狀態(tài),而不是事務(wù)中的實(shí)際消息(實(shí)際消息存儲在實(shí)際的主題分區(qū)中)。


          • 事務(wù)緩存


          向事務(wù)內(nèi)的主題分區(qū)生成的消息存儲在該主題分區(qū)的事務(wù)緩沖區(qū)(TB)中。在提交事務(wù)之前,事務(wù)緩沖區(qū)中的消息對消費(fèi)者不可見。當(dāng)事務(wù)中止時(shí),事務(wù)緩沖區(qū)中的消息將被丟棄。


          事務(wù)緩沖區(qū)將所有正在進(jìn)行和中止的事務(wù)存儲在內(nèi)存中。所有消息都發(fā)送到實(shí)際的分區(qū)Pulsar主題。提交事務(wù)后,事務(wù)緩沖區(qū)中的消息對消費(fèi)者具體化(可見)。事務(wù)中止時(shí),事務(wù)緩沖區(qū)中的消息將被丟棄。


          • 待確認(rèn)狀態(tài)


          掛起確認(rèn)狀態(tài)在事務(wù)完成之前維護(hù)事務(wù)中的消息確認(rèn)。如果消息處于掛起確認(rèn)狀態(tài),則在該消息從掛起確認(rèn)狀態(tài)中移除之前,其他事務(wù)無法確認(rèn)該消息。


          掛起的確認(rèn)狀態(tài)被保留到掛起的確認(rèn)日志中(cursor ledger)。新啟動的broker可以從掛起的確認(rèn)日志中恢復(fù)狀態(tài),以確保狀態(tài)確認(rèn)不會丟失。

          處理流程一般分為以下幾個步驟:


          1. 開啟事務(wù)。

          2. 使用事務(wù)發(fā)布消息。

          3. 使用事務(wù)確認(rèn)消息。

          4. 結(jié)束事務(wù)。

          Pulsar的事務(wù)處理流程與Kafka的事務(wù)處理思路大致上保持一致,大家都有一個TC以及對應(yīng)的一個用于持久化TC所有操作的Topic來記錄所有事務(wù)狀態(tài)變更的請求。同樣的在事務(wù)開始階段也都有一個專門的Topic來去查詢TC對應(yīng)的Owner Broker的位置在哪里。


          不同的是,第一:Kafka中對于未確認(rèn)的消息是維護(hù)在Broker端的,但是Pulsar的是維護(hù)在Client端的,通過Transaction Timeout來決定這個事務(wù)是否執(zhí)行成功,所以有了Transaction Timeout的存在之后,就可以確保Client和Broker側(cè)事務(wù)處理的一致性。第二:由于Kafka本身沒有單條消息的Ack,所以Kafka的事務(wù)處理只能是順序執(zhí)行的,當(dāng)一個事務(wù)請求被阻塞之后,會阻塞后續(xù)所有的事務(wù)請求,但是Pulsar是可以對消息進(jìn)行單條Ack的,所以在這里每一個事務(wù)的Ack動作是獨(dú)立的,不會出現(xiàn)事務(wù)阻塞的情況。



          四、結(jié)論


          RocketMQ和Kafka/Pulsar的事務(wù)消息實(shí)用的場景是不一樣的。


          RocketMQ中的事務(wù),它解決的問題是,確保執(zhí)行本地事務(wù)和發(fā)消息這兩個操作,要么都成功,要么都失敗。并且RocketMQ增加了一個事務(wù)反查的機(jī)制,來盡量提高事務(wù)執(zhí)行的成功率和數(shù)據(jù)一致性。


          Kafka中的事務(wù),它解決的問題是,確保在一個事務(wù)中發(fā)送的多條消息,要么都成功,要么都失敗。(這里面的多條消息不一定要在同一個主題和分區(qū)中,可以是發(fā)往多個主題和分區(qū)的消息)當(dāng)然也可以在kafka事務(wù)執(zhí)行過程中開啟本地事務(wù)來實(shí)現(xiàn)類似RocketMQ事務(wù)消息的效果,但是Kafka是沒有事務(wù)消息反查機(jī)制的,它是直接拋出異常的,用戶可以根據(jù)異常來實(shí)現(xiàn)自己的重試等方法保證事務(wù)正常運(yùn)行。


          它們的共同點(diǎn)就是:都是通過兩階段提交來實(shí)現(xiàn)事務(wù)的,事務(wù)消息都保存在單獨(dú)的主題上。不同的地方就是RocketMQ是通過“半消息”來實(shí)現(xiàn)的,kafka是直接將消息發(fā)送給對應(yīng)的topic,通過客戶端來過濾實(shí)現(xiàn)的。而且它們兩個使用的場景區(qū)別是非常之大的,RockteMQ主要解決的是基于本地事務(wù)和消息的數(shù)據(jù)一致性,而Kafka的事務(wù)則是用于實(shí)現(xiàn)它的Exactly-once機(jī)制,應(yīng)用于實(shí)時(shí)流計(jì)算的場景中。


          Pulsar的事務(wù)消息和Kafka應(yīng)用場景和語義類似,只是由于底層實(shí)現(xiàn)機(jī)制有差別,在一些細(xì)節(jié)上有區(qū)別。


          相信看到這里就非常清楚了,對于事務(wù)消息如何選型和應(yīng)用,首先要明白你的業(yè)務(wù)需求是什么。是要實(shí)現(xiàn)分布式事務(wù)的最終一致性,還是要實(shí)現(xiàn)Exactly-once (精確一次)語義?明白之后需求,選擇什么組件就十分明確了。


          參考文章

          1.【萬字長文】淺談Apache Kafka---入門須知

          2. Apache Pulsar技術(shù)系列-事務(wù)消息

          3. 消息隊(duì)列(MQ)架構(gòu)篇之RocketMQ

          4. Apache Pulsar技術(shù)系列-Pulsar事務(wù)實(shí)現(xiàn)機(jī)制

          5. 消息隊(duì)列漫談:如何使用消息隊(duì)列實(shí)現(xiàn)分布式事務(wù)?



           作者簡介


          劉若愚

          微信支付后臺開發(fā)工程師

          微信支付后臺開發(fā)工程師,碩士畢業(yè)于北京大學(xué)。深度參與騰訊WXG境外支付團(tuán)隊(duì)多個重要業(yè)務(wù)的研發(fā)工作,有豐富的后臺開發(fā)經(jīng)驗(yàn)。騰訊技術(shù)分享達(dá)人,社會招聘伯樂。



           推薦閱讀


          Linux入門必看:如何在60秒內(nèi)分析Linux性能?

          “Docker VS Kubernetes”是共生還是相愛相殺?

          揭秘!是什么能讓APP快速精準(zhǔn)定位?

          人機(jī)共生時(shí)代,分布式機(jī)器學(xué)習(xí)是如何加速的?





          瀏覽 38
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产天堂在线观看视频 | 成人毛片18女人在线播放 | 大香蕉伊人乱伦 | 123好逼网 | 操女人逼网站 |