消息隊(duì)列之事務(wù)消息,RocketMQ 和 Kafka是如何做的?

每個(gè)時(shí)代,都不會(huì)虧待會(huì)學(xué)習(xí)的人。
今天我們來(lái)談一談消息隊(duì)列的事務(wù)消息,一說(shuō)起事務(wù)相信大家都不陌生,腦海里蹦出來(lái)的就是 ACID。
通常我們理解的事務(wù)就是為了一些更新操作要么都成功,要么都失敗,不會(huì)有中間狀態(tài)的產(chǎn)生,而 ACID 是一個(gè)嚴(yán)格的事務(wù)實(shí)現(xiàn)的定義,不過(guò)在單體系統(tǒng)時(shí)候一般都不會(huì)嚴(yán)格的遵循 ACID 的約束來(lái)實(shí)現(xiàn)事務(wù),更別說(shuō)分布式系統(tǒng)了。
分布式系統(tǒng)往往只能妥協(xié)到最終一致性,保證數(shù)據(jù)最終的完整性和一致性,主要原因就是實(shí)力不允許...因?yàn)榭捎眯詾橥酢?/p>
而且要保證完全版的事務(wù)實(shí)現(xiàn)代價(jià)很大,你想想要維護(hù)這么多系統(tǒng)的數(shù)據(jù),不允許有中間狀態(tài)數(shù)據(jù)可以被讀取,所有的操作必須不可分割,這意味著一個(gè)事務(wù)的執(zhí)行是阻塞的,資源是被長(zhǎng)時(shí)間鎖定的。
在高并發(fā)情況下資源被長(zhǎng)時(shí)間的占用,就是致命的傷害,舉一個(gè)有味道的例子,如廁高峰期,好了懂得都懂。

對(duì)了, ACID 是什么還不太清楚的同學(xué),趕緊去查一查,這里我就不展開(kāi)說(shuō)了。
分布式事務(wù)
那說(shuō)到分布式事務(wù),常見(jiàn)的有 2PC、TCC 和事務(wù)消息,這篇文章重點(diǎn)就是事務(wù)消息,不過(guò) 2PC 和 TCC 我稍微提一下。
2PC
2PC 就是二階段提交,分別有協(xié)調(diào)者和參與者兩個(gè)角色,二階段分別是準(zhǔn)備階段和提交階段。
準(zhǔn)備階段就是協(xié)調(diào)者向各參與者發(fā)送準(zhǔn)備命令,這個(gè)階段參與者除了事務(wù)的提交啥都做了,而提交階段就是協(xié)調(diào)者看看各個(gè)參與者準(zhǔn)備階段都 o 不 ok,如果有 ok 那么就向各個(gè)參與者發(fā)送提交命令,如果有一個(gè)不 ok 那么就發(fā)送回滾命令。
這里的重點(diǎn)就是 2PC 只適用于數(shù)據(jù)庫(kù)層面的事務(wù),什么意思呢?就是你想在數(shù)據(jù)庫(kù)里面寫(xiě)一條數(shù)據(jù)同時(shí)又要上傳一張圖片,這兩個(gè)操作 2PC 無(wú)法保證兩個(gè)操作滿(mǎn)足事務(wù)的約束。
而且 2PC 是一種強(qiáng)一致性的分布式事務(wù),它是同步阻塞的,即在接收到提交或回滾命令之前,所有參與者都是互相等待,特別是執(zhí)行完準(zhǔn)備階段的時(shí)候,此時(shí)的資源都是鎖定的狀態(tài),假如有一個(gè)參與者卡了很久,其他參與者都得等它,產(chǎn)生長(zhǎng)時(shí)間資源鎖定狀態(tài)下的阻塞。
總體而言效率低,并且存在單點(diǎn)故障問(wèn)題,協(xié)調(diào)者是就是那個(gè)單點(diǎn),并且在極端條件下存在數(shù)據(jù)不一致的風(fēng)險(xiǎn),例如某個(gè)參與者未收到提交命令,此時(shí)宕機(jī)了,恢復(fù)之后數(shù)據(jù)是回滾的,而其他參與者其實(shí)都已經(jīng)執(zhí)行了提交事務(wù)的命令了。
TCC
TCC 能保證業(yè)務(wù)層面的事務(wù),也就是說(shuō)它不僅僅是數(shù)據(jù)庫(kù)層面,上面的上傳圖片這種操作它也能做。
TCC 分為三個(gè)階段 try - confirm - cancel,簡(jiǎn)單的說(shuō)就是每個(gè)業(yè)務(wù)都需要有這三個(gè)方法,先都執(zhí)行 try 方法,這一階段不會(huì)做真正的業(yè)務(wù)操作,只是先占個(gè)坑,什么意思呢?比如打算加 10 個(gè)積分,那先在預(yù)添加字段加上這 10 積分,這個(gè)時(shí)候用戶(hù)賬上的積分其實(shí)是沒(méi)有增加的。
然后如果都 try 成功了那么就執(zhí)行 confirm 方法,大家都來(lái)做真正的業(yè)務(wù)操作,如果有一個(gè) try 失敗了那么大家都執(zhí)行 cancel 操作,來(lái)撤回剛才的修改。
可以看到 TCC 其實(shí)對(duì)業(yè)務(wù)的耦合性很大,因?yàn)闃I(yè)務(wù)上需要做一定的改造才能完成這三個(gè)方法,這其實(shí)就是 TCC 的缺點(diǎn),并且 confirm 和 cancel 操作要注意冪等,因?yàn)榈綀?zhí)行這兩步的時(shí)候沒(méi)有退路,是務(wù)必要完成的,因此需要有重試機(jī)制,所以需要保證方法冪等。
事務(wù)消息
事務(wù)消息就是今天文章的主角了,它主要是適用于異步更新的場(chǎng)景,并且對(duì)數(shù)據(jù)實(shí)時(shí)性要求不高的地方。
它的目的是為了解決消息生產(chǎn)者與消息消費(fèi)者的數(shù)據(jù)一致性問(wèn)題。
比如你點(diǎn)外賣(mài),我們先選了炸雞加入購(gòu)物車(chē),又選了瓶可樂(lè),然后下單,付完款這個(gè)流程就結(jié)束了。

而購(gòu)物車(chē)?yán)锩娴臄?shù)據(jù)就很適合用消息通知異步刪除,因?yàn)橐话愣晕覀兿峦陠尾粫?huì)再去點(diǎn)開(kāi)這個(gè)店家的菜單,而且就算點(diǎn)開(kāi)了購(gòu)物車(chē)?yán)镞€有這些菜品也沒(méi)有關(guān)系,影響不大。
我們希望的就是下單成功之后購(gòu)物車(chē)的菜品最終會(huì)被刪除,所以要點(diǎn)就是下單和發(fā)消息這兩個(gè)步驟要么都成功要么都失敗。
RocketMQ 事務(wù)消息
我們先來(lái)看一下 RocketMQ 是如何實(shí)現(xiàn)事務(wù)消息的。
RocketMQ 的事務(wù)消息也可以被認(rèn)為是一個(gè)兩階段提交,簡(jiǎn)單的說(shuō)就是在事務(wù)開(kāi)始的時(shí)候會(huì)先發(fā)送一個(gè)半消息給 Broker。
半消息的意思就是這個(gè)消息此時(shí)對(duì) Consumer 是不可見(jiàn)的,而且也不是存在真正要發(fā)送的隊(duì)列中,而是一個(gè)特殊隊(duì)列。
發(fā)送完半消息之后再執(zhí)行本地事務(wù),再根據(jù)本地事務(wù)的執(zhí)行結(jié)果來(lái)決定是向 Broker 發(fā)送提交消息,還是發(fā)送回滾消息。
此時(shí)有人說(shuō)這一步發(fā)送提交或者回滾消息失敗了怎么辦?
影響不大,Broker 會(huì)定時(shí)的向 Producer 來(lái)反查這個(gè)事務(wù)是否成功,具體的就是 Producer 需要暴露一個(gè)接口,通過(guò)這個(gè)接口 Broker 可以得知事務(wù)到底有沒(méi)有執(zhí)行成功,沒(méi)成功就返回未知,因?yàn)橛锌赡苁聞?wù)還在執(zhí)行,會(huì)進(jìn)行多次查詢(xún)。
如果成功那么就將半消息恢復(fù)到正常要發(fā)送的隊(duì)列中,這樣消費(fèi)者就可以消費(fèi)這條消息了。
我們?cè)賮?lái)簡(jiǎn)單的看下如何使用,我根據(jù)官網(wǎng)示例代碼簡(jiǎn)化了下。

可以看到使用起來(lái)還是很簡(jiǎn)便直觀的,無(wú)非就是多加個(gè)反查事務(wù)結(jié)果的方法,然后把本地事務(wù)執(zhí)行的過(guò)程寫(xiě)在 TransationListener 里面。
至此 RocketMQ 事務(wù)消息大致的流程已經(jīng)清晰了,我們畫(huà)一張整體的流程圖來(lái)過(guò)一遍,其實(shí)到第四步這個(gè)消息要么就是正常的消息,要么就是拋棄什么都不存在,此時(shí)這個(gè)事務(wù)消息已經(jīng)結(jié)束它的生命周期了。

RocketMQ 事務(wù)消息源碼分析
然后我們?cè)購(gòu)脑创a的角度來(lái)看看到底是怎么做的,首先我們看下sendMessageInTransaction 方法,方法有點(diǎn)長(zhǎng),不過(guò)沒(méi)有關(guān)系結(jié)構(gòu)還是很清晰的。

流程也就是我們上面分析的,將消息塞入一些屬性,標(biāo)明此時(shí)這個(gè)消息還是半消息,然后發(fā)送至 Broker,然后執(zhí)行本地事務(wù),然后將本地事務(wù)的執(zhí)行狀態(tài)發(fā)送給 Broker ,我們現(xiàn)在再來(lái)看下 Broker 到底是怎么處理這個(gè)消息的。
在 Broker 的 SendMessageProcessor#sendMessage 中會(huì)處理這個(gè)半消息請(qǐng)求,因?yàn)榻裉熘饕治龅氖鞘聞?wù)消息,所以其他流程不做分析,我大致的說(shuō)一下原理。
簡(jiǎn)單的說(shuō)就是 sendMessage 中查到接受來(lái)的消息的屬性里面MessageConst.PROPERTY_TRANSACTION_PREPARED 是 true ,那么可以得知這個(gè)消息是事務(wù)消息,然后再判斷一下這條消息是否超過(guò)最大消費(fèi)次數(shù),是否要延遲,Broker 是否接受事務(wù)消息等操作后,將這條消息真正的 topic 和隊(duì)列存入屬性中,然后重置消息的 topic 為RMQ_SYS_TRANS_HALF_TOPIC,并且隊(duì)列是 0 的隊(duì)列中,使得消費(fèi)者無(wú)法讀取這個(gè)消息。
以上就是整體處理半消息的流程,我們來(lái)看一下源碼。

就是來(lái)了波貍貓換太子,其實(shí)延時(shí)消息也是這么實(shí)現(xiàn)的,最終將換了皮的消息入盤(pán)。
Broker 處理提交或者回滾消息的處理方法是 EndTransactionProcessor#processRequest,我們來(lái)看一看它做了什么操作。

可以看到,如果是提交事務(wù)就是把皮再換回來(lái)寫(xiě)入真正的topic所屬的隊(duì)列中,供消費(fèi)者消費(fèi),如果是回滾則是將半消息記錄到一個(gè) half_op 主題下,到時(shí)候后臺(tái)服務(wù)掃描半消息的時(shí)候就依據(jù)其來(lái)判斷這個(gè)消息已經(jīng)處理過(guò)了。
那個(gè)后臺(tái)服務(wù)就是 TransactionalMessageCheckService 服務(wù),它會(huì)定時(shí)的掃描半消息隊(duì)列,去請(qǐng)求反查接口看看事務(wù)成功了沒(méi),具體執(zhí)行的就是TransactionalMessageServiceImpl#check 方法。
我大致說(shuō)一下流程,這一步驟其實(shí)涉及到的代碼很多,我就不貼代碼了,有興趣的同學(xué)自行了解。不過(guò)我相信用語(yǔ)言也是能說(shuō)清楚的。
首先取半消息 topic 即RMQ_SYS_TRANS_HALF_TOPIC下的所有隊(duì)列,如果還記得上面內(nèi)容的話(huà),就知道半消息寫(xiě)入的隊(duì)列是 id 是 0 的這個(gè)隊(duì)列,然后取出這個(gè)隊(duì)列對(duì)應(yīng)的 half_op 主題下的隊(duì)列,即 RMQ_SYS_TRANS_OP_HALF_TOPIC 主題下的隊(duì)列。
這個(gè) half_op 主要是為了記錄這個(gè)事務(wù)消息已經(jīng)被處理過(guò),也就是說(shuō)已經(jīng)得知此事務(wù)消息是提交的還是回滾的消息會(huì)被記錄在 half_op 中。
然后調(diào)用 fillOpRemoveMap 方法,從 half_op 取一批已經(jīng)處理過(guò)的消息來(lái)去重,將那些沒(méi)有記錄在 half_op 里面的半消息調(diào)用 putBackHalfMsgQueue 又寫(xiě)入了 commitlog 中,然后發(fā)送事務(wù)反查請(qǐng)求,這個(gè)反查請(qǐng)求也是 oneWay,即不會(huì)等待響應(yīng)。當(dāng)然此時(shí)的半消息隊(duì)列的消費(fèi) offset 也會(huì)推進(jìn)。
然后producer中的 ClientRemotingProcessor#processRequest 會(huì)處理這個(gè)請(qǐng)求,會(huì)把任務(wù)扔到 TransactionMQProducer 的線程池中進(jìn)行,最終會(huì)調(diào)用上面我們發(fā)消息時(shí)候定義的 checkLocalTransactionState 方法,然后將事務(wù)狀態(tài)發(fā)送給 Broker,也是用 oneWay 的方式。
看到這里相信大家會(huì)有一些疑問(wèn),比如為什么要有個(gè) half_op ,為什么半消息處理了還要再寫(xiě)入 commitlog 中別急聽(tīng)我一一道來(lái)。
首先 RocketMQ 的設(shè)計(jì)就是順序追加寫(xiě)入,所以說(shuō)不會(huì)更改已經(jīng)入盤(pán)的消息,那事務(wù)消息又需要更新反查的次數(shù),超過(guò)一定反查失敗就判定事務(wù)回滾。
因此每一次要反查的時(shí)候就將以前的半消息再入盤(pán)一次,并且往前推進(jìn)消費(fèi)進(jìn)度。而 half_op 又會(huì)記錄每一次反查的結(jié)果,不論是提交還是回滾都會(huì)記錄,因此下一次還循環(huán)到處理此半消息的時(shí)候,可以從 half_op 得知此事務(wù)已經(jīng)結(jié)束了,因此就被過(guò)濾掉不需要處理了。
如果得到的反查的結(jié)果是 UNKNOW,那 half_op 中也不會(huì)記錄此結(jié)果,因此還能再次反查,并且更新反查次數(shù)。
到現(xiàn)在整個(gè)流程已經(jīng)清晰了,我再畫(huà)個(gè)圖總結(jié)一下 Broker 的事務(wù)處理流程。

Kafka 事務(wù)消息
Kafka 的事務(wù)消息和 RocketMQ 的事務(wù)消息又不一樣了,RocketMQ 解決的是本地事務(wù)的執(zhí)行和發(fā)消息這兩個(gè)動(dòng)作滿(mǎn)足事務(wù)的約束。
而 Kafka 事務(wù)消息則是用在一次事務(wù)中需要發(fā)送多個(gè)消息的情況,保證多個(gè)消息之間的事務(wù)約束,即多條消息要么都發(fā)送成功,要么都發(fā)送失敗,就像下面代碼所演示的。

Kafka 的事務(wù)基本上是配合其冪等機(jī)制來(lái)實(shí)現(xiàn) Exactly Once 語(yǔ)義的,所以說(shuō) Kafka 的事務(wù)消息不是我們想的那種事務(wù)消息,RocketMQ 的才是。
講到這我就想扯一下了,說(shuō)到這個(gè) Exactly Once 其實(shí)不太清楚的同學(xué)很容易會(huì)誤解。
我們知道消息可靠性有三種,分別是最多一次、恰好一次、最少一次,之前在消息隊(duì)列連環(huán)問(wèn)的文章我已經(jīng)提到了基本上我們都是用最少一次然后配合消費(fèi)者端的冪等來(lái)實(shí)現(xiàn)恰好一次。
消息恰好被消費(fèi)一次當(dāng)然我們所有人追求的,但是之前文章我已經(jīng)從各方面已經(jīng)分析過(guò)了,基本上難以達(dá)到。
而 Kafka 竟說(shuō)它能實(shí)現(xiàn) Exactly Once?這么牛啤嗎?這其實(shí)是 Kafka 的一個(gè)噱頭,你要說(shuō)他錯(cuò),他還真沒(méi)錯(cuò),你要說(shuō)他對(duì)但是他實(shí)現(xiàn)的 Exactly Once 不是你心中想的那個(gè) Exactly Once。
它的恰好一次只能存在一種場(chǎng)景,就是從 Kafka 作為消息源,然后做了一番操作之后,再寫(xiě)入 Kafka 中。

那他是如何實(shí)現(xiàn)恰好一次的?就是通過(guò)冪等,和我們?cè)跇I(yè)務(wù)上實(shí)現(xiàn)的一樣通過(guò)一個(gè)唯一 Id, 然后記錄下來(lái),如果已經(jīng)記錄過(guò)了就不寫(xiě)入,這樣來(lái)保證恰好一次。
所以說(shuō) Kafka 實(shí)現(xiàn)的是在特定場(chǎng)景下的恰好一次,不是我們所想的利用 Kafka 來(lái)發(fā)送消息,那么這條消息只會(huì)恰巧被消費(fèi)一次。
這其實(shí)和 Redis 說(shuō)他實(shí)現(xiàn)事務(wù)了一樣,也不是我們心想的事務(wù)。
所以開(kāi)源軟件說(shuō)啥啥特性開(kāi)發(fā)出來(lái)了,我們一味的相信,因此其往往都是殘血的或者在特殊的場(chǎng)景下才能滿(mǎn)足,不要被誤導(dǎo)了,不能相信表面上的描述,還得詳細(xì)的看看文檔或者源碼。
不過(guò)從另一個(gè)角度看也無(wú)可厚非,作為一個(gè)開(kāi)源軟件肯定是想更多的人用,我也沒(méi)說(shuō)謊呀,我文檔上寫(xiě)的很清楚的,這標(biāo)題也沒(méi)騙人吧?
確實(shí),比如你點(diǎn)進(jìn)震驚xxxx標(biāo)題的文章,人家也沒(méi)騙你啥,他自己確實(shí)震驚的呢。

再回來(lái)談 Kafka 的事務(wù)消息,所以說(shuō)這個(gè)事務(wù)消息不是我們想要的那個(gè)事務(wù)消息,其實(shí)不是今天的主題了,不過(guò)我還是簡(jiǎn)單的說(shuō)一下。
Kafka 的事務(wù)有事務(wù)協(xié)調(diào)者角色,事務(wù)協(xié)調(diào)者其實(shí)就是 Broker 的一部分。
在開(kāi)始事務(wù)的時(shí)候,生產(chǎn)者會(huì)向事務(wù)協(xié)調(diào)者發(fā)起請(qǐng)求表示事務(wù)開(kāi)啟,事務(wù)協(xié)調(diào)者會(huì)將這個(gè)消息記錄到特殊的日志-事務(wù)日志中,然后生產(chǎn)者再發(fā)送真正想要發(fā)送的消息,這里 Kafka 和 RocketMQ 處理不一樣,Kafka 會(huì)像對(duì)待正常消息一樣處理這些事務(wù)消息,由消費(fèi)端來(lái)過(guò)濾這個(gè)消息。
然后發(fā)送完畢之后生產(chǎn)者會(huì)向事務(wù)協(xié)調(diào)者發(fā)送提交或者回滾請(qǐng)求,由事務(wù)協(xié)調(diào)者來(lái)進(jìn)行兩階段提交,如果是提交那么會(huì)先執(zhí)行預(yù)提交,即把事務(wù)的狀態(tài)置為預(yù)提交然后寫(xiě)入事務(wù)日志,然后再向所有事務(wù)有關(guān)的分區(qū)寫(xiě)入一條類(lèi)似事務(wù)結(jié)束的消息,這樣消費(fèi)端消費(fèi)到這個(gè)消息的時(shí)候就知道事務(wù)好了,可以把消息放出來(lái)了。
最后協(xié)調(diào)者會(huì)向事務(wù)日志中再記一條事務(wù)結(jié)束信息,至此 Kafka 事務(wù)就完成了,我拿 confluent.io 上的圖來(lái)總結(jié)一下這個(gè)流程。

最后
至此我們已經(jīng)知道了 RocketMQ 和 Kakfa 的事務(wù)消息全流程,可以看到 RocketMQ 的事務(wù)消息才是我們想要的,當(dāng)然你要是用的流式計(jì)算那么 Kakfa 的事務(wù)消息也是你想要的。
需要貼代碼的文章其實(shí)很難受,這貼的多不好,貼的少又怕不清晰,真的難,如果覺(jué)得文章不錯(cuò)記得點(diǎn)個(gè)在看喲。

