<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          kafka新老集群平滑遷移實(shí)戰(zhàn)

          共 3339字,需瀏覽 7分鐘

           ·

          2022-09-17 21:21



          前言


          公司一直使用云上的kafka服務(wù),隨著業(yè)務(wù)規(guī)模和體量的增大,使用云上的服務(wù)成本相對(duì)比較高,所以考慮本地自建kafka集群對(duì)外提供服務(wù)。


          因此,需要把正在運(yùn)行的還在使用云上kafka的業(yè)務(wù)服務(wù)遷移到本地自建集群上。


          要求


          • 代碼改動(dòng)小

          • 升級(jí)過程中的穩(wěn)定性

          • 升級(jí)后消息發(fā)送與消費(fèi)的正確性


          遷移方案


          1、雙寫/雙讀


          顧名思義,生產(chǎn)端:消息同時(shí)發(fā)送新、老集群,消費(fèi)端:同時(shí)消費(fèi)兩個(gè)集群的消息。



          等到確認(rèn)新集群的穩(wěn)定和消息正確性后,逐漸下掉對(duì)老集群的依賴。


          發(fā)送端雙寫還好做,難點(diǎn)在于消費(fèi)端消費(fèi)遷移實(shí)現(xiàn)上,主要可能有以下幾種問題:


          • 如果采用先消費(fèi)到備庫上,后續(xù)備庫再切換為主庫,很多業(yè)務(wù)在其目前場(chǎng)景下其實(shí)很難實(shí)現(xiàn)。


          • 消費(fèi)端對(duì)新集群消息的消費(fèi)邏輯只是空轉(zhuǎn)意義不大,如果期望檢測(cè)消費(fèi)的新、老集群的消息一致性,開發(fā)成本也是比較高。


          • 消費(fèi)端不采用雙讀方案,不消費(fèi)新集群的消息。最后直接切換到新集群開始消費(fèi),這樣需要保證消費(fèi)的冪等性。但是很多場(chǎng)景下是無法保證的,比如使用了第三方大數(shù)據(jù)相關(guān)的組件。


          • 采用雙寫/雙讀的方案,很多項(xiàng)目相關(guān)負(fù)責(zé)的同學(xué),肯定也是無法接受的,畢竟代碼改造太多了,開發(fā)成本太高。


          所以,優(yōu)先不考慮這種方案,采用了下面這種數(shù)據(jù)同步的方案。


          2、數(shù)據(jù)同步


          采用消息同步工具,將老集群的消息直接同步到新集群,客戶端不再需要雙讀/雙寫,最后切換的時(shí)候直接修改為新集群的地址,重新發(fā)布即可。


          整個(gè)過程如下:



          最后下掉老集群。


          這樣無論對(duì)發(fā)送端或消費(fèi)端都是是極其友好,且“平滑”。


          我最終考慮并采用的是這個(gè)方案,唯一的問題便是遷移過程中的所有問題和壓力都從業(yè)務(wù)側(cè)的同學(xué)轉(zhuǎn)移到我們這邊,比如:發(fā)送的消息如何同步,如何避免消費(fèi)端切換后,重復(fù)消費(fèi)或者漏掉消息未消費(fèi)。


          遷移過程


          遷移基本流程正如前面流程圖展示的:數(shù)據(jù)同步->遷移生產(chǎn)端/消費(fèi)端。


          生產(chǎn)端和消費(fèi)端沒有先后切換新集群上的順序要求,但是如果先把生產(chǎn)端切換到新集群,消費(fèi)端就無法從老集群繼續(xù)消費(fèi)消息了,需要在消息的過期時(shí)間內(nèi),趕緊也切換到新集群。


          但是如果消費(fèi)端先切,則發(fā)送端可以在之后的任何時(shí)間。


          1、消息同步


          消息同步是第一步,kafka的消息同步工具在業(yè)內(nèi)有做的比較好的商業(yè)版提供,同時(shí)也有開源版本供使用。


          我選取的是官方自帶的kafka-mirror-maker工具。


          但是不能直接拿來用,否則同步過來的消息無法繼續(xù)我下面的方案。


          kafka-mirror-maker的默認(rèn)實(shí)現(xiàn)就是消費(fèi)老集群指定topic的消息并重新發(fā)送到新集群,且發(fā)送的時(shí)候未指定分區(qū)。但是我需要保證新老集群上每條消息在每個(gè)分區(qū)上的順序保持一致,不能出現(xiàn)消息在老集群的分區(qū)0上,同步后被發(fā)送到新集群的分區(qū)1上。


          因此進(jìn)行適當(dāng)改造,如下,構(gòu)造消息的時(shí)候指定發(fā)送的分區(qū):


            private[tools] object defaultMirrorMakerMessageHandler extends MirrorMakerMessageHandler {    override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {      val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp      Collections.singletonList(new ProducerRecord(record.topic, record.partition, timestamp, record.key, record.value, record.headers))    }  }


          重新編譯打包。


          其實(shí)執(zhí)行同步前,確保要同步的topic,已經(jīng)在新集群創(chuàng)建并且新、老集群的分區(qū)數(shù)保持一致。


          最后,同步的時(shí)候,我們并不需要一次性把所有的topic消息都向新集群同步。有針對(duì)性的處理,要遷移哪個(gè)服務(wù),同步對(duì)應(yīng)該topic的消息,遷移完成,停掉對(duì)應(yīng)的同步進(jìn)程,然后繼續(xù)下個(gè)服務(wù)。


          當(dāng)然為了方便,我同時(shí)開發(fā)了對(duì)應(yīng)的啟停腳本,盡可能方便、規(guī)范的進(jìn)行。


          下面是我計(jì)劃的協(xié)作流程:



          因?yàn)槲覀冏越洪_啟了ACL,老集群并沒有,所以中間涉及到ACL相關(guān)的處理。


          2、消費(fèi)位點(diǎn)同步


          消費(fèi)端切換集群這一步問題也是最多最復(fù)雜的。


          對(duì)于生產(chǎn)者來說,因?yàn)樽髁讼⑼?,新老集群消息及消息在分區(qū)的位置都是一致的,所以直接修改地址切換新集群繼續(xù)發(fā)送即可。


          對(duì)于消費(fèi)端來說,如何在切換到新集群后,還能繼續(xù)相對(duì)于老集群中原來的消費(fèi)位點(diǎn)繼續(xù)消費(fèi)是一個(gè)問題,需要保證消息不會(huì)重復(fù)消費(fèi),也不會(huì)丟失消費(fèi)。所以計(jì)算在新集群中的消費(fèi)位點(diǎn)是很重要的。


          正常情況下,kafka是沒有創(chuàng)建消費(fèi)組的功能,我們現(xiàn)在要做的就是,如果消費(fèi)端切換新集群后,就已經(jīng)知道現(xiàn)在要做的這個(gè)消費(fèi)組要從這個(gè)位點(diǎn)繼續(xù)消費(fèi)了。


          對(duì)于這個(gè)問題,我開發(fā)了一個(gè)功能,可以新增訂閱:



          輸入一個(gè)消費(fèi)組,并選擇集群已經(jīng)存在topic,創(chuàng)建訂閱關(guān)系,示例如下,我創(chuàng)建一個(gè)訂閱test_topic的消費(fèi)組:



          如果現(xiàn)在消費(fèi)組為test_consumer的消費(fèi)端,現(xiàn)在要從老集群切換到新集群繼續(xù)消費(fèi),我們只要保證它不會(huì)出現(xiàn)重復(fù)消費(fèi),也不會(huì)漏掉消息即可。


          所以這里解決方案的關(guān)鍵點(diǎn)就是需要將每個(gè)分區(qū)的消費(fèi)位點(diǎn),進(jìn)行重置,重置的位置就是想當(dāng)于在老集群中消費(fèi)到的消息位點(diǎn)。


          這個(gè)位點(diǎn)計(jì)算還是比較麻煩的,因?yàn)樾吕霞褐懈鞣謪^(qū)位點(diǎn)是一定不一致的。比如對(duì)于test_topic,可能此時(shí)的最小位點(diǎn)是1001(1001以前的消息過期刪除了),日志最大位點(diǎn)是1300,所以現(xiàn)在實(shí)際某個(gè)分區(qū)有300條消息。但是同步到新集群,位點(diǎn)是從0開始,該分區(qū)在新集群最大位點(diǎn)就是300。


          如果同步進(jìn)程運(yùn)行了好久,消費(fèi)端才遷移新集群,新老集群的消息留存時(shí)間又不太一致,都刪過幾次過期消息,則老集群可能日志的最小位點(diǎn)是3001,最大位點(diǎn)是3300,新集群的最小位點(diǎn)是1000,最大位點(diǎn)是2300,兩個(gè)集群現(xiàn)在留存的消息數(shù)都不一致了。


          所以我這里采用了兩階段的同步方案:


          • 準(zhǔn)備同步消息的時(shí)候,進(jìn)行位點(diǎn)標(biāo)記

          • 準(zhǔn)備遷移消費(fèi)端的時(shí)候,進(jìn)行消費(fèi)位點(diǎn)同步




          大概就是上面說明的,通過相關(guān)的標(biāo)記和計(jì)算確保新老集群消費(fèi)的相對(duì)位點(diǎn)在各處場(chǎng)景下都是一致的,同步功能做了必要的效驗(yàn),會(huì)保證同步的時(shí)候環(huán)境是預(yù)期的否則同步不了。如果最后同步階段失敗,則清除新集群中該topic相關(guān)數(shù)據(jù),重新同步,重新執(zhí)行這個(gè)流程即可(兜底解決方案)。


          數(shù)據(jù)同步問題


          數(shù)據(jù)同步過程中也是可能出現(xiàn)問題的,有些難以預(yù)料且致命的,如果消費(fèi)端還沒遷移,我們可以清除數(shù)據(jù),重新同步,影響不大。


          如果在同步過程中,同步進(jìn)程掛掉,重啟可能導(dǎo)致新老集群數(shù)據(jù)不一致。因?yàn)橥揭彩且贿呄M(fèi)老集群一邊發(fā)送到新集群,所以無法保證在掛掉重啟的時(shí)候,是否會(huì)重復(fù)同步那一批次消息。


          這個(gè)時(shí)候也是有解決方案的:


          上面說過,在同步前進(jìn)行了相關(guān)位移對(duì)齊,這時(shí)候可以查看相關(guān)對(duì)齊信息,人工重置用來同步的消費(fèi)組的消費(fèi)位點(diǎn)來保證消息一致。


          最后,如何確認(rèn)新老集群的消息是一致。這個(gè)時(shí)候假定同步過程,消息體沒有出問題(這個(gè)出問題確實(shí)不好校驗(yàn),認(rèn)命了),只要查看對(duì)齊記錄里的位移信息進(jìn)行計(jì)算,確認(rèn)從同步一來,新老集群每個(gè)分區(qū)的消息數(shù)都是一樣,就可以確認(rèn)消息一致的(不保證消息體的內(nèi)容也沒問題,比如丟包導(dǎo)致的)。


          作者丨不識(shí)君的荒漠
          來源丨網(wǎng)址:https://blog.csdn.net/x763795151/article/details/121070563

          推薦閱讀:

          世界的真實(shí)格局分析,地球人類社會(huì)底層運(yùn)行原理

          不是你需要中臺(tái),而是一名合格的架構(gòu)師(附各大廠中臺(tái)建設(shè)PPT)

          企業(yè)IT技術(shù)架構(gòu)規(guī)劃方案

          論數(shù)字化轉(zhuǎn)型——轉(zhuǎn)什么,如何轉(zhuǎn)?

          華為干部與人才發(fā)展手冊(cè)(附PPT)

          企業(yè)10大管理流程圖,數(shù)字化轉(zhuǎn)型從業(yè)者必備!

          【中臺(tái)實(shí)踐】華為大數(shù)據(jù)中臺(tái)架構(gòu)分享.pdf

          華為的數(shù)字化轉(zhuǎn)型方法論

          華為如何實(shí)施數(shù)字化轉(zhuǎn)型(附PPT)

          超詳細(xì)280頁Docker實(shí)戰(zhàn)文檔!開放下載

          華為大數(shù)據(jù)解決方案(PPT)

          瀏覽 40
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91操视频| 国产精品三级片在线观看 | 欧美多人操逼 | 一本色道久久88亚洲精品 | 久久久一卡二卡 |