<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Apache Pulsar事務(wù)機(jī)制原理解析|Apache Pulsar 技術(shù)系列

          共 6262字,需瀏覽 13分鐘

           ·

          2021-07-01 17:35


          7b358ac4e79e36598ca99a9214b81a1d.webp72c845d96004dfaf1f0a79cb6b3af471.webp

          導(dǎo)讀

          Apache Pulsar 在 2.8.0 正式支持了事務(wù)相關(guān)的功能,Pulsar 這里提供的事務(wù)區(qū)別于 RocketMQ 中 2PC 那種事務(wù)的實(shí)現(xiàn)方式,沒有本地事務(wù)回查的機(jī)制,更類似于 Kafka 的事務(wù)實(shí)現(xiàn)機(jī)制。Apache Pulsar 中的事務(wù)主要用來保證類似 Pulsar Functions 這種流計(jì)算場景中 Exactly Once 語義的實(shí)現(xiàn),這也符合 Apache Pulsar 本身 Event Streaming 的定位,即保證端到端(End-to-End)的事務(wù)實(shí)現(xiàn)的語義。


          66e79eec89f313b23c92d9f433d5c322.webp




          作者簡介 

           冉小龍-騰訊云中間件團(tuán)隊(duì)研發(fā)工程師 

          Apache Pulsar committer

          RoP 作者及 Maintainer

          Apache BookKeeper contributor

          Apache Pulsar Go client 作者

          Apache Pulsar Go Functions作者

          Stremnative/pulsarctl 作者



          dfee715a8480e56319bf13ec8b75943b.webp

          基本概念


          為了更好的理解和實(shí)現(xiàn)事務(wù)相關(guān)的邏輯,Apache Pulsar 在這里抽象了如下幾個(gè)核心概念:


          1. Transaction Coordinator


          Transaction Coordinator(簡稱:TC) 本質(zhì)上是一個(gè) ManagerLedger 對(duì)象,是事務(wù)的管理器,所有有關(guān)事務(wù)處理的請(qǐng)求都會(huì)發(fā)送到 TC 中來協(xié)調(diào)處理 commit 和 abort 事務(wù)等請(qǐng)求。一個(gè) TC 會(huì)對(duì)應(yīng)一個(gè) Partitioned Topic,這個(gè) Topic 用來確認(rèn)當(dāng)前 TC 對(duì)應(yīng)的 Owner Broker,這里可以通過 Pulsar 自身的 Lookup 機(jī)制定位到這個(gè) Topic 對(duì)應(yīng)的 Owner Broker。這里使用 Partitioned Topic 也可以充分利用 Partitioned 自身的擴(kuò)展能力來擴(kuò)展 TC 的處理性能。Pulsar 中定義了一個(gè)特殊的 Topic 來標(biāo)記這個(gè) Topic:transaction_coordinator_assign。


          TC 的生命周期涵蓋整個(gè)事務(wù)的處理過程,其一可以確保事務(wù)的正確運(yùn)行,防止事務(wù)進(jìn)入錯(cuò)誤的狀態(tài),其二,可以確保在 client 端設(shè)置的 Transaction Timeout 過期后,能夠處理接下來的操作。有關(guān) Transaction Timeout 的詳細(xì)解釋以及作用我們后面還會(huì)繼續(xù)提到.


          2. Transection Buffer


          Pulsar 的事務(wù)消息其實(shí)早在 2.6.0 的版本之時(shí)就已經(jīng)有計(jì)劃在實(shí)現(xiàn)了,當(dāng)時(shí)所有真實(shí)的事務(wù)消息是存儲(chǔ)在 TB 中的,當(dāng) Producer 生產(chǎn)一條事務(wù)消息時(shí),會(huì)進(jìn)入 TB 中來,此時(shí)這條消息對(duì) Consumer 是不可見的,只有當(dāng)事務(wù)處理完成之后,這條對(duì)應(yīng)的消息才會(huì)投遞到 Consumer 監(jiān)聽的真實(shí)的 Topic 中。但是在 2.8.0 的實(shí)現(xiàn)中,真實(shí)的事務(wù)消息并沒有存儲(chǔ)到這里,而是投遞到了真實(shí)的 Topic 中,那么這里就有一個(gè)問題,放到真實(shí)的 Topic 中,Consumer 一直在監(jiān)聽這個(gè) Topic,它是如何保證消息不會(huì)立即被 Consumer 消費(fèi)到呢?所以 Consumer 這里必然要有相應(yīng)的處理,即它會(huì)根據(jù)事務(wù)的狀態(tài)進(jìn)行過濾,只有當(dāng)事務(wù)完成的時(shí)候 Consumer 才會(huì)消費(fèi)指定的消息。這樣做的好處就在于避免了消息寫放大的問題。之前一條事務(wù)消息會(huì)首先投遞到 TB 中,TB 消費(fèi)完成之后再投遞到真實(shí)的 Topic,所以一條消息事實(shí)上是被寫了兩次。現(xiàn)在的實(shí)現(xiàn)依賴 Consumer 的狀態(tài)過濾機(jī)制可以巧妙的規(guī)避寫放大的問題。


          3. Transaction Log


          一個(gè) TransactionLog 對(duì)應(yīng)的是一個(gè) managerLedger,所有事務(wù)的元數(shù)據(jù)信息會(huì)存儲(chǔ)到 Transaction Log 中,這樣做的好處在于,當(dāng) TC 出錯(cuò)宕機(jī)之后,可以從 Transaction Log 中恢復(fù)出來事務(wù)的元數(shù)據(jù)處理信息。一個(gè) Transaction Log 主要的操作就是 append 和 reply 這兩個(gè)動(dòng)作,append 用來將事務(wù)的操作添加到 Transaction Log 中,reply 主要從指定的 Position 位置恢復(fù)對(duì)應(yīng)的元數(shù)據(jù)信息。這里我們需要對(duì) Transaction Log 中存儲(chǔ)的數(shù)據(jù)做一下澄清,Transaction Log 中存儲(chǔ)的主要是一些元數(shù)據(jù)的信息,即事務(wù)的狀態(tài)信息,真實(shí)的事務(wù)消息我們存儲(chǔ)在其它的地方。事務(wù)的狀態(tài)有如下幾種:

          - OPEN

          - COMMITTING

          - ABORTING

          - COMMITTED

          - ABORTED

          - ERROR

          Transaction Log 主要用來存儲(chǔ)事務(wù)對(duì)應(yīng)的是上述的哪種狀態(tài),即事務(wù)執(zhí)行到了什么地方,接下來需要做什么操作以及在事務(wù)回滾時(shí)會(huì)根據(jù)這些相應(yīng)的狀態(tài)信息來確認(rèn)事務(wù)最終執(zhí)行的狀態(tài)。


          4. Transaction ID


          Transaction ID(簡稱:TxnID) 是用來唯一標(biāo)識(shí)事務(wù)的一個(gè)字段,由兩部分組成:64 位的 mostSigBits 和 64 位的 leastSigBits,總共128 位。TxnID 是由 TC 生成的,這里在 TC 端生成的好處就在于可以確保 TxnID 的全局唯一性,這個(gè)是事務(wù)執(zhí)行的基礎(chǔ)。mostSigBits 是 TC 的分區(qū) id,leastSigBits 是最新的日志 id,leastSigBits 是事務(wù)的序號(hào),該序號(hào)每次+1,可以從日志中進(jìn)行恢復(fù),不會(huì)重復(fù),兩部分都是long類型。


          5. Pending Acknowledge State


          對(duì)于一個(gè)事務(wù)消息而言,它可能由很多的普通消息構(gòu)成,對(duì)于每一條消息其都有自己的 Ack 狀態(tài),但是對(duì)于事務(wù)消息而言,只有等構(gòu)成這個(gè)事務(wù)的所有的普通消息完成之后,它才可以被正確的 commit,所以需要有一個(gè)集合來存儲(chǔ)一個(gè)事務(wù)消息中待 Ack 的消息的集合。它主要提供如下保證,其一在 Transaction Timeout 結(jié)束之前,這個(gè)事務(wù)還沒有完成,那么就需要將 Pending acknoeledge state 中未完成的消息進(jìn)行回滾,即執(zhí)行 abort 操作,回滾到事務(wù)執(zhí)行前的狀態(tài)。其二在 Transaction Timeout 之內(nèi),即事務(wù)還在執(zhí)行過程中,那么其它的事務(wù)是不可以來操作這個(gè)事務(wù)集合中的消息的。既然涉及到事務(wù)狀態(tài)的操作,那么必然會(huì)涉及到宕機(jī)之后狀態(tài)恢復(fù)的問題,Pending acknowledge state 的消息存儲(chǔ)是依賴 Cursor Log 來實(shí)現(xiàn)的,這樣新的 Broker 節(jié)點(diǎn)就可以從 Cursor Log 中恢復(fù)出來宕機(jī)之前消息的確認(rèn)狀態(tài),確保事務(wù)消息中確認(rèn)的狀態(tài)信息不會(huì)丟失。


          在這里需要說明的是,Pulsar 之前的 Ack 是沒有返回值的,也就是 Client 是不知道某一條消息是否被正確的 Ack 掉了,Ack 的狀態(tài)是由 Broker 來維護(hù)管理的。為了能讓 Client 獲取到 Ack 的狀態(tài),在事務(wù)實(shí)現(xiàn)時(shí),這里為 Ack 添加了返回值來確保 Client 側(cè)能夠知道 Ack 的具體狀態(tài)信息。


          dfee715a8480e56319bf13ec8b75943b.webp

          事物流程


          有了上面這些概念的解釋,我們對(duì) Pulsar 事務(wù)中各個(gè)主要的組件已經(jīng)有了一個(gè)了解,接下來看看一個(gè)完整的事務(wù)流程


          1. 尋找TC


          由于 TXnID 是用來唯一標(biāo)識(shí)一個(gè)事務(wù)的,且它是由 TC 生成的,所以我們?cè)陂_始事務(wù)執(zhí)行的時(shí)候,首先就需要依賴我們上述提到的 Pulsar 的 TC Topic:transaction_coordinator_assign 來找到對(duì)應(yīng)的 Owner Broker,然后在該 Broker 上創(chuàng)建對(duì)應(yīng)的 TC 來為這次事務(wù)執(zhí)行分配一個(gè)唯一的事務(wù) ID。


          2. 打開一個(gè)事物


          在尋找到這次事務(wù)歸屬于哪一個(gè) TC 協(xié)調(diào)并生成對(duì)應(yīng)的 TxnID 之后,TC 會(huì)將這個(gè)動(dòng)作同步到 Transaction Log 中,將這一個(gè)事務(wù) OPEN 的狀態(tài)信息記錄到 Transaction Log 中,一旦事務(wù)的狀態(tài)信息被持久化之后,就可以確保這個(gè)記錄不會(huì)被丟失。在完成持久化的動(dòng)作之后,TC 會(huì)將這個(gè)事務(wù)的 ID 返回給 Client 端使用。


          其實(shí)這里我們可以看到,對(duì)于一個(gè)事務(wù)而言,它執(zhí)行的每一個(gè)狀態(tài)變更信息都會(huì)首先請(qǐng)求到 TC,然后由 TC 通知 Transaction Log 進(jìn)行持久化的動(dòng)作,來確保事務(wù)的執(zhí)行狀態(tài)不會(huì)丟失。在上面我們也提到,TC 和 Transaction Log 本質(zhì)上都是一個(gè) ManagerLedger 對(duì)象,即相當(dāng)于 BookKeeper 的 client 角色,將這些日志信息持久化到了 Bookie 中。


          3. 發(fā)送事物消息


          正如第二步所說,在事務(wù)操作的過程中,TC 作為一個(gè)協(xié)調(diào)器,需要感知一個(gè)事務(wù)執(zhí)行過程中所有的狀態(tài)操作。所以在事務(wù)消息發(fā)送之前也是一樣的,需要先將這個(gè)操作告訴 TC,然后再由 TC 將這個(gè)動(dòng)作通知到 Transaction Log 持久化起來。等一切準(zhǔn)備就緒之后,Producer 開始將消息發(fā)送到真實(shí)的 Partitioned Topic 中,這里根據(jù)是否開啟 batch 分為兩種情況來討論:如果沒有開啟 batch 的話,這個(gè)和正常的消息發(fā)送流程是一致的。假設(shè)開啟了 batch 相關(guān)的功能,那么消息中會(huì)附加這個(gè)消息 TxnID, 在 broker 接收到這條消息之后,會(huì)檢查這個(gè) batch 中是否包含當(dāng)前這個(gè)事務(wù)的消息,如果包含這個(gè)事務(wù)的消息,那么 Broker 會(huì)將這個(gè) batch 寫到分區(qū)的事務(wù)緩存中來做進(jìn)一步的處理。


          4. 確認(rèn)事物消息


          同理,在確認(rèn)事務(wù)消息之前,client 會(huì)首先發(fā)送確認(rèn)消息的請(qǐng)求到 TC 中,然后由 TC 來協(xié)調(diào)在事務(wù)的執(zhí)行過程中究竟是需要做 commit 還是 abort 的操作。在準(zhǔn)備工作就緒之后,與正常創(chuàng)建一個(gè)訂閱的邏輯一樣,client 會(huì)將訂閱的請(qǐng)求發(fā)送到 Broker 中。正如我們?cè)?《Pending acknowledge state》 章節(jié)中提到的,對(duì)于 Ack 的請(qǐng)求,client 需要感知這個(gè)動(dòng)作是否成功,所以在原來 Ack 的請(qǐng)求中添加了一個(gè) TxnID. 拿到 TxnID 之后 Broker 就可以檢查這個(gè) Ack 請(qǐng)求是否屬于這次事務(wù)。之后就到了 Pending acknowledge state 組件的工作范疇了,它會(huì)決定這個(gè) Ack 請(qǐng)求可以被哪些 consumer 來確認(rèn)。


          5. 完成事物


          當(dāng)上述所有的操作流程完成之后,TC 就會(huì)知道當(dāng)前的這個(gè)事務(wù)狀態(tài)是處于可以 commit 還是 abort 的狀態(tài),此時(shí) TC 會(huì)將 COMMITTING 或者 ABORTING 的狀態(tài)寫到 Transaction Log 中持久化起來。只有將狀態(tài)記錄到 Transaction Log 中之后,這個(gè)事務(wù)才會(huì)開始執(zhí)行真正的 commit 或者 abort 動(dòng)作。當(dāng)所有的事物消息完成之后(commit 或者 abort),TC 會(huì)將這次事務(wù)的狀態(tài)標(biāo)記為 COMMITTED 或者 ABORTED 然后記錄到 Transaction Log 中,至此,一個(gè)事務(wù)的完整的處理流程就結(jié)束了。


          dfee715a8480e56319bf13ec8b75943b.webp

          對(duì)比 Kafka 事務(wù)


          Pulsar 的事務(wù)處理流程與 Kafka 的事務(wù)處理思路大致上保持一致,大家都有一個(gè) TC 以及對(duì)應(yīng)的一個(gè)用于持久化 TC 所有操作的 Topic 來記錄所有事務(wù)狀態(tài)變更的請(qǐng)求。同樣的在事務(wù)開始階段也都有一個(gè)專門的 Partitioned Topic 來去 Lookup TC 對(duì)應(yīng)的 Owner Broker 的位置在哪里。不同的是,第一:Kafka 中對(duì)于未確認(rèn)的消息是維護(hù)在 Broker 端的,但是 Pulsar 的是維護(hù)在 Client 端的,通過 Transaction Timeout 來決定這個(gè)事務(wù)是否執(zhí)行成功,所以有了 Transaction Timeout 的存在之后,就可以確保 client 和 broker 側(cè)事務(wù)處理的一致性。第二:由于 Kafka 本身沒有單條消息的 Ack,所以 Kafka 的事務(wù)處理只能是順序執(zhí)行的,當(dāng)一個(gè)事務(wù)請(qǐng)求被阻塞之后,會(huì)阻塞后續(xù)所有的事務(wù)請(qǐng)求,但是 Pulsar 是可以對(duì)消息進(jìn)行單條 Ack 的,所以在這里每一個(gè)事務(wù)的 Ack 動(dòng)作是獨(dú)立的,不會(huì)出現(xiàn)事務(wù)阻塞的情況。


          dfee715a8480e56319bf13ec8b75943b.webp

          寫在最后


          目前 TDMD 已經(jīng)基于 Apache Pulsar 應(yīng)用在多種業(yè)務(wù)場景下,騰訊云TDMQ、計(jì)平、數(shù)平等多個(gè)團(tuán)隊(duì)也在一起共建Pulsar,對(duì)Pulsar感興趣的小伙伴,歡迎關(guān)注騰訊云中間件我們下一期的分享。




          免費(fèi)體驗(yàn)館

          消息隊(duì)列CKafka

          分布式、高吞吐量、高可擴(kuò)展性的消息服務(wù),具備數(shù)據(jù)壓縮、同時(shí)支持離線和實(shí)時(shí)數(shù)據(jù)處理等優(yōu)點(diǎn)。

          掃碼即可免費(fèi)體驗(yàn)

          免費(fèi)體驗(yàn)路徑:云產(chǎn)品體驗(yàn)->基礎(chǔ)->消息隊(duì)列CKafka


          消息隊(duì)列TDMQ

          一款基于 Apache 頂級(jí)開源項(xiàng)目 Pulsar 自研的金融級(jí)分布式消息中間件。其計(jì)算與存儲(chǔ)分離的架構(gòu)設(shè)計(jì),使得它具備極好的云原生和 Serverless 特性,用戶按量使用,無需關(guān)心底層資源。

          掃碼點(diǎn)擊“立即使用”,即可免費(fèi)體驗(yàn)


          微服務(wù)平臺(tái)TSF

          穩(wěn)定、高性能的技術(shù)中臺(tái)。一個(gè)圍繞著應(yīng)用和微服務(wù)的 PaaS 平臺(tái),提供應(yīng)用全生命周期管理、數(shù)據(jù)化運(yùn)營、立體化監(jiān)控和服務(wù)治理等功能。TSF 擁抱 Spring Cloud 、Service Mesh 微服務(wù)框架,幫助企業(yè)客戶解決傳統(tǒng)集中式架構(gòu)轉(zhuǎn)型的困難,打造大規(guī)模高可用的分布式系統(tǒng)架構(gòu),實(shí)現(xiàn)業(yè)務(wù)、產(chǎn)品的快速落地。

          掃碼點(diǎn)擊“免費(fèi)體驗(yàn)”,即可免費(fèi)體驗(yàn)


          微服務(wù)引擎TSE

          高效、穩(wěn)定的注冊(cè)中心托管,助力您快速實(shí)現(xiàn)微服務(wù)架構(gòu)轉(zhuǎn)型。

          掃碼點(diǎn)擊“立即申請(qǐng)”,即可免費(fèi)體驗(yàn)


          彈性微服務(wù)TEM

          面向微服務(wù)應(yīng)用的 Serverless PaaS 平臺(tái),實(shí)現(xiàn)資源 Serverless 化與微服務(wù)架構(gòu)的完美結(jié)合,提供一整套開箱即用的微服務(wù)解決方案。彈性微服務(wù)幫助用戶創(chuàng)建和管理云資源,并提供秒級(jí)彈性伸縮,用戶可按需使用、按量付費(fèi),極大程度上幫用戶節(jié)約運(yùn)維和資源成本。讓用戶充分聚焦企業(yè)核心業(yè)務(wù)本身,助力業(yè)務(wù)成功。

          掃碼點(diǎn)擊“立即申請(qǐng)”,即可免費(fèi)體驗(yàn)



          往期

          推薦


          《玩轉(zhuǎn)Kafka Raft模式-入門寶典》

          《Tencent Kona JDK11無暫停內(nèi)存管理ZGC生產(chǎn)實(shí)踐》

          《騰訊云中間件月報(bào)(2021年第六期)》



          45e474743c5c2bf65ea2933452e343c6.webp


          掃描下方二維碼關(guān)注本公眾號(hào),

          了解更多微服務(wù)、消息隊(duì)列的相關(guān)信息!

          解鎖超多鵝廠周邊!


          79a1284dd1d015eb6063beba2db8ca69.webp戳原文,了解更多騰訊微服務(wù)平臺(tái)相關(guān)信息
          瀏覽 67
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  五月婷婷中文字幕 | 欧美色图亚洲另类 | 五月天天天操 | 精品孕妇一级A片免费看 | 操B在线视频 |