kafka新老集群平滑遷移實(shí)戰(zhàn)
前言
公司一直使用云上的kafka服務(wù),隨著業(yè)務(wù)規(guī)模和體量的增大,使用云上的服務(wù)成本相對(duì)比較高,所以考慮本地自建kafka集群對(duì)外提供服務(wù)。
因此,需要把正在運(yùn)行的還在使用云上kafka的業(yè)務(wù)服務(wù)遷移到本地自建集群上。
要求
代碼改動(dòng)小
升級(jí)過程中的穩(wěn)定性
升級(jí)后消息發(fā)送與消費(fèi)的正確性
遷移方案
1、雙寫/雙讀
顧名思義,生產(chǎn)端:消息同時(shí)發(fā)送新、老集群,消費(fèi)端:同時(shí)消費(fèi)兩個(gè)集群的消息。

等到確認(rèn)新集群的穩(wěn)定和消息正確性后,逐漸下掉對(duì)老集群的依賴。
發(fā)送端雙寫還好做,難點(diǎn)在于消費(fèi)端消費(fèi)遷移實(shí)現(xiàn)上,主要可能有以下幾種問題:
如果采用先消費(fèi)到備庫上,后續(xù)備庫再切換為主庫,很多業(yè)務(wù)在其目前場(chǎng)景下其實(shí)很難實(shí)現(xiàn)。
消費(fèi)端對(duì)新集群消息的消費(fèi)邏輯只是空轉(zhuǎn)意義不大,如果期望檢測(cè)消費(fèi)的新、老集群的消息一致性,開發(fā)成本也是比較高。
消費(fèi)端不采用雙讀方案,不消費(fèi)新集群的消息。最后直接切換到新集群開始消費(fèi),這樣需要保證消費(fèi)的冪等性。但是很多場(chǎng)景下是無法保證的,比如使用了第三方大數(shù)據(jù)相關(guān)的組件。
采用雙寫/雙讀的方案,很多項(xiàng)目相關(guān)負(fù)責(zé)的同學(xué),肯定也是無法接受的,畢竟代碼改造太多了,開發(fā)成本太高。
所以,優(yōu)先不考慮這種方案,采用了下面這種數(shù)據(jù)同步的方案。
2、數(shù)據(jù)同步
采用消息同步工具,將老集群的消息直接同步到新集群,客戶端不再需要雙讀/雙寫,最后切換的時(shí)候直接修改為新集群的地址,重新發(fā)布即可。
整個(gè)過程如下:

最后下掉老集群。
這樣無論對(duì)發(fā)送端或消費(fèi)端都是是極其友好,且“平滑”。
我最終考慮并采用的是這個(gè)方案,唯一的問題便是遷移過程中的所有問題和壓力都從業(yè)務(wù)側(cè)的同學(xué)轉(zhuǎn)移到我們這邊,比如:發(fā)送的消息如何同步,如何避免消費(fèi)端切換后,重復(fù)消費(fèi)或者漏掉消息未消費(fèi)。
遷移過程
遷移基本流程正如前面流程圖展示的:數(shù)據(jù)同步->遷移生產(chǎn)端/消費(fèi)端。
生產(chǎn)端和消費(fèi)端沒有先后切換新集群上的順序要求,但是如果先把生產(chǎn)端切換到新集群,消費(fèi)端就無法從老集群繼續(xù)消費(fèi)消息了,需要在消息的過期時(shí)間內(nèi),趕緊也切換到新集群。
但是如果消費(fèi)端先切,則發(fā)送端可以在之后的任何時(shí)間。
1、消息同步
消息同步是第一步,kafka的消息同步工具在業(yè)內(nèi)有做的比較好的商業(yè)版提供,同時(shí)也有開源版本供使用。
我選取的是官方自帶的kafka-mirror-maker工具。
但是不能直接拿來用,否則同步過來的消息無法繼續(xù)我下面的方案。
kafka-mirror-maker的默認(rèn)實(shí)現(xiàn)就是消費(fèi)老集群指定topic的消息并重新發(fā)送到新集群,且發(fā)送的時(shí)候未指定分區(qū)。但是我需要保證新老集群上每條消息在每個(gè)分區(qū)上的順序保持一致,不能出現(xiàn)消息在老集群的分區(qū)0上,同步后被發(fā)送到新集群的分區(qū)1上。
因此進(jìn)行適當(dāng)改造,如下,構(gòu)造消息的時(shí)候指定發(fā)送的分區(qū):
private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestampCollections.singletonList(new ProducerRecord(record.topic, record.partition, timestamp, record.key, record.value, record.headers))}}
重新編譯打包。
其實(shí)執(zhí)行同步前,確保要同步的topic,已經(jīng)在新集群創(chuàng)建并且新、老集群的分區(qū)數(shù)保持一致。
最后,同步的時(shí)候,我們并不需要一次性把所有的topic消息都向新集群同步。有針對(duì)性的處理,要遷移哪個(gè)服務(wù),同步對(duì)應(yīng)該topic的消息,遷移完成,停掉對(duì)應(yīng)的同步進(jìn)程,然后繼續(xù)下個(gè)服務(wù)。
當(dāng)然為了方便,我同時(shí)開發(fā)了對(duì)應(yīng)的啟停腳本,盡可能方便、規(guī)范的進(jìn)行。
下面是我計(jì)劃的協(xié)作流程:

因?yàn)槲覀冏越洪_啟了ACL,老集群并沒有,所以中間涉及到ACL相關(guān)的處理。
2、消費(fèi)位點(diǎn)同步
消費(fèi)端切換集群這一步問題也是最多最復(fù)雜的。
對(duì)于生產(chǎn)者來說,因?yàn)樽髁讼⑼?,新老集群消息及消息在分區(qū)的位置都是一致的,所以直接修改地址切換新集群繼續(xù)發(fā)送即可。
對(duì)于消費(fèi)端來說,如何在切換到新集群后,還能繼續(xù)相對(duì)于老集群中原來的消費(fèi)位點(diǎn)繼續(xù)消費(fèi)是一個(gè)問題,需要保證消息不會(huì)重復(fù)消費(fèi),也不會(huì)丟失消費(fèi)。所以計(jì)算在新集群中的消費(fèi)位點(diǎn)是很重要的。
正常情況下,kafka是沒有創(chuàng)建消費(fèi)組的功能,我們現(xiàn)在要做的就是,如果消費(fèi)端切換新集群后,就已經(jīng)知道現(xiàn)在要做的這個(gè)消費(fèi)組要從這個(gè)位點(diǎn)繼續(xù)消費(fèi)了。
對(duì)于這個(gè)問題,我開發(fā)了一個(gè)功能,可以新增訂閱:

輸入一個(gè)消費(fèi)組,并選擇集群已經(jīng)存在topic,創(chuàng)建訂閱關(guān)系,示例如下,我創(chuàng)建一個(gè)訂閱test_topic的消費(fèi)組:

如果現(xiàn)在消費(fèi)組為test_consumer的消費(fèi)端,現(xiàn)在要從老集群切換到新集群繼續(xù)消費(fèi),我們只要保證它不會(huì)出現(xiàn)重復(fù)消費(fèi),也不會(huì)漏掉消息即可。
所以這里解決方案的關(guān)鍵點(diǎn)就是需要將每個(gè)分區(qū)的消費(fèi)位點(diǎn),進(jìn)行重置,重置的位置就是想當(dāng)于在老集群中消費(fèi)到的消息位點(diǎn)。
這個(gè)位點(diǎn)計(jì)算還是比較麻煩的,因?yàn)樾吕霞褐懈鞣謪^(qū)位點(diǎn)是一定不一致的。比如對(duì)于test_topic,可能此時(shí)的最小位點(diǎn)是1001(1001以前的消息過期刪除了),日志最大位點(diǎn)是1300,所以現(xiàn)在實(shí)際某個(gè)分區(qū)有300條消息。但是同步到新集群,位點(diǎn)是從0開始,該分區(qū)在新集群最大位點(diǎn)就是300。
如果同步進(jìn)程運(yùn)行了好久,消費(fèi)端才遷移新集群,新老集群的消息留存時(shí)間又不太一致,都刪過幾次過期消息,則老集群可能日志的最小位點(diǎn)是3001,最大位點(diǎn)是3300,新集群的最小位點(diǎn)是1000,最大位點(diǎn)是2300,兩個(gè)集群現(xiàn)在留存的消息數(shù)都不一致了。
所以我這里采用了兩階段的同步方案:
準(zhǔn)備同步消息的時(shí)候,進(jìn)行位點(diǎn)標(biāo)記
準(zhǔn)備遷移消費(fèi)端的時(shí)候,進(jìn)行消費(fèi)位點(diǎn)同步


大概就是上面說明的,通過相關(guān)的標(biāo)記和計(jì)算確保新老集群消費(fèi)的相對(duì)位點(diǎn)在各處場(chǎng)景下都是一致的,同步功能做了必要的效驗(yàn),會(huì)保證同步的時(shí)候環(huán)境是預(yù)期的否則同步不了。如果最后同步階段失敗,則清除新集群中該topic相關(guān)數(shù)據(jù),重新同步,重新執(zhí)行這個(gè)流程即可(兜底解決方案)。
數(shù)據(jù)同步問題
數(shù)據(jù)同步過程中也是可能出現(xiàn)問題的,有些難以預(yù)料且致命的,如果消費(fèi)端還沒遷移,我們可以清除數(shù)據(jù),重新同步,影響不大。
如果在同步過程中,同步進(jìn)程掛掉,重啟可能導(dǎo)致新老集群數(shù)據(jù)不一致。因?yàn)橥揭彩且贿呄M(fèi)老集群一邊發(fā)送到新集群,所以無法保證在掛掉重啟的時(shí)候,是否會(huì)重復(fù)同步那一批次消息。
這個(gè)時(shí)候也是有解決方案的:
上面說過,在同步前進(jìn)行了相關(guān)位移對(duì)齊,這時(shí)候可以查看相關(guān)對(duì)齊信息,人工重置用來同步的消費(fèi)組的消費(fèi)位點(diǎn)來保證消息一致。
最后,如何確認(rèn)新老集群的消息是一致。這個(gè)時(shí)候假定同步過程,消息體沒有出問題(這個(gè)出問題確實(shí)不好校驗(yàn),認(rèn)命了),只要查看對(duì)齊記錄里的位移信息進(jìn)行計(jì)算,確認(rèn)從同步一來,新老集群每個(gè)分區(qū)的消息數(shù)都是一樣,就可以確認(rèn)消息一致的(不保證消息體的內(nèi)容也沒問題,比如丟包導(dǎo)致的)。
推薦閱讀:
世界的真實(shí)格局分析,地球人類社會(huì)底層運(yùn)行原理
不是你需要中臺(tái),而是一名合格的架構(gòu)師(附各大廠中臺(tái)建設(shè)PPT)
企業(yè)IT技術(shù)架構(gòu)規(guī)劃方案
論數(shù)字化轉(zhuǎn)型——轉(zhuǎn)什么,如何轉(zhuǎn)?
企業(yè)10大管理流程圖,數(shù)字化轉(zhuǎn)型從業(yè)者必備!
【中臺(tái)實(shí)踐】華為大數(shù)據(jù)中臺(tái)架構(gòu)分享.pdf
華為如何實(shí)施數(shù)字化轉(zhuǎn)型(附PPT)
