<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka中副本機(jī)制的設(shè)計(jì)和原理

          共 5974字,需瀏覽 12分鐘

           ·

          2020-10-09 18:13

          Kafka中一個(gè)分區(qū)可以擁有多個(gè)副本,副本可分布于多臺(tái)機(jī)器上。而在多個(gè)副本中,只會(huì)有一個(gè)Leader副本與客戶(hù)端交互,也就是讀寫(xiě)數(shù)據(jù)。其他則作為Follower副本,負(fù)責(zé)同步Leader的數(shù)據(jù),當(dāng)Leader宕機(jī)時(shí),從Follower選舉出新的Leader,從而解決分區(qū)單點(diǎn)問(wèn)題。本文將繼續(xù)深入了解Kafka中副本機(jī)制的設(shè)計(jì)和原理。

          好處

          副本機(jī)制的使用在計(jì)算機(jī)的世界里是很常見(jiàn)的,比如MySQL、ZooKeeper、CDN等都有使用副本機(jī)制。使用副本機(jī)制所能帶來(lái)的好處有以下幾種:

          1. 提供數(shù)據(jù)冗余,提高可用性;
          2. 提供擴(kuò)展性,增加讀操作吞吐量;
          3. 改善數(shù)據(jù)局部,降低系統(tǒng)延時(shí)。

          但并不是每個(gè)好處都能獲得,這還是和具體的設(shè)計(jì)有關(guān),比如本文的主角Kafka,只具有第一個(gè)好處,即提高可用性。這是因?yàn)楦北局兄挥蠰eader可以和客戶(hù)端交互,進(jìn)行讀寫(xiě),其他副本是只能同步,不能分擔(dān)讀寫(xiě)壓力。為什么這么設(shè)計(jì)?這和Kafka作為消息系統(tǒng)有關(guān)。比如當(dāng)我們使用生產(chǎn)者成功寫(xiě)入消息后,希望消費(fèi)者能立馬讀取到剛生產(chǎn)的消息,這也被稱(chēng)作“Read-Your-Writes”一致性,可理解為寫(xiě)后立即讀,要實(shí)現(xiàn)這種一致性,如果是只在Leader上讀寫(xiě)是很方便實(shí)現(xiàn)的。而且也同時(shí)保證了“Monotomic Reads”一致性,即單調(diào)讀一致性,不會(huì)出現(xiàn)消息一會(huì)能讀到,一會(huì)讀不到的情況。你可能會(huì)問(wèn),為什么不讓多個(gè)副本都可以讀,來(lái)提高讀操作吞吐量,同時(shí)加入其它機(jī)制來(lái)保證這兩個(gè)一致性。筆者的理解是在Kafka中已經(jīng)引入了分區(qū)和消費(fèi)組機(jī)制,來(lái)提供擴(kuò)展性,提高讀吞吐量,所以這里沒(méi)必要再為了提高讀吞吐量,而讓系統(tǒng)更復(fù)雜。

          ISR副本

          我們已經(jīng)了解到當(dāng)Leader宕機(jī)時(shí),我們要從Follower中選舉出新的Leader,但并不是所有的Follower都有資格參與選舉。因?yàn)橛械腇ollower的同步情況滯后,如果讓他成為L(zhǎng)eader將會(huì)導(dǎo)致消息丟失。而為了避免這個(gè)情況,Kafka引入了ISR(In-Sync Replica)副本的概念,這是一個(gè)集合,里面存放的是和Leader保持同步的副本并含有Leader。這是一個(gè)動(dòng)態(tài)調(diào)整的集合,當(dāng)副本由同步變?yōu)闇髸r(shí)會(huì)從集合中剔除,而當(dāng)副本由滯后變?yōu)橥綍r(shí)又會(huì)加入到集合中。

          那么如何判斷一個(gè)副本是同步還是滯后呢?Kafka在0.9版本之前,是根據(jù)replica.lag.max.messages參數(shù)來(lái)判斷,其含義是同步副本所能落后的最大消息數(shù),當(dāng)Follower上的最大偏移量落后Leader大于replica.lag.max.messages時(shí),就認(rèn)為該副本是不同步的了,會(huì)從ISR中移除。如果ISR的值設(shè)置得過(guò)小,會(huì)導(dǎo)致Follower經(jīng)常被踢出ISR,而如果設(shè)置過(guò)大,則當(dāng)Leader宕機(jī)時(shí),會(huì)造成較多消息的丟失。在實(shí)際使用時(shí),很難給出一個(gè)合理值,這是因?yàn)楫?dāng)生產(chǎn)者為了提高吞吐量而調(diào)大batch.size時(shí),會(huì)發(fā)送更多的消息到Leader上,這時(shí)候如果不增大replica.lag.max.messages,則會(huì)有Follower頻繁被踢出ISR的現(xiàn)象,而當(dāng)Follower發(fā)生Fetch請(qǐng)求同步后,又被加入到ISR中,ISR將頻繁變動(dòng)。鑒于該參數(shù)難以設(shè)定,Kafka在0.9版本引入了一個(gè)新的參數(shù)replica.lag.time.max.ms,默認(rèn)10s,含義是當(dāng)Follower超過(guò)10s沒(méi)發(fā)送Fetch請(qǐng)求同步Leader時(shí),就會(huì)認(rèn)為不同步而被踢出ISR。從時(shí)間維度來(lái)考量,能夠很好地避免生產(chǎn)者發(fā)送大量消息到Leader副本導(dǎo)致分區(qū)ISR頻繁收縮和擴(kuò)張的問(wèn)題。

          Unclear Leader選舉

          當(dāng)ISR集合為空時(shí),即沒(méi)有同步副本(Leader也掛了),無(wú)法選出下一個(gè)Leader,Kafka集群將會(huì)失效。而為了提高可用性,Kafka提供了unclean.leader.election.enable參數(shù),當(dāng)設(shè)置為true且ISR集合為空時(shí),會(huì)進(jìn)行Unclear Leader選舉,允許在非同步副本中選出新的Leader,從而提高Kafka集群的可用性,但這樣會(huì)造成消息丟失。在允許消息丟失的場(chǎng)景中,是可以開(kāi)啟此參數(shù)來(lái)提高可用性的。而其他情況,則不建議開(kāi)啟,而是通過(guò)其他手段來(lái)提高可用性。

          LEO和HW

          下面我們一起了解副本同步的原理。副本的本質(zhì)其實(shí)是一個(gè)消息日志,為了讓副本正常同步,需要通過(guò)一些變量記錄副本的狀態(tài),如下圖所示:

          其中LEO(Last End Offset)記錄了日志的下一條消息偏移量,即當(dāng)前最新消息的偏移量加一。而HW(High Watermark)界定了消費(fèi)者可見(jiàn)的消息,消費(fèi)者可以消費(fèi)小于HW的消息,而大于等于HW的消息將無(wú)法消費(fèi)。HW和LEO的關(guān)系是HW一定小于LEO。下面介紹下HW的概念,其可翻譯為高水位或高水印,這一概念通常用于在流式處理領(lǐng)域(如Flink、Spark等),流式系統(tǒng)將保證在HW為t時(shí)刻時(shí),創(chuàng)建時(shí)間小于等于t時(shí)刻的所有事件都已經(jīng)到達(dá)或可被觀(guān)測(cè)到。而在Kafka中,HW的概念和時(shí)間無(wú)關(guān),而是和偏移量有關(guān),主要目的是為了保證一致性。試想如果一個(gè)消息到達(dá)了Leader,而Follower副本還未來(lái)得及同步,但該消息能已被消費(fèi)者消費(fèi)了,這時(shí)候Leader宕機(jī),F(xiàn)ollower副本中選出新的Leader,消息將丟失,出現(xiàn)不一致的現(xiàn)象。所以Kafka引入HW的概念,當(dāng)消息被同步副本同步完成時(shí),才讓消息可被消費(fèi)。

          上述即是LEO和HW的基本概念,下面我們看下具體是如何工作的。

          每個(gè)副本中都存有LEO和HW,而Leader副本中除了存有自身的LEO和HW,還存儲(chǔ)了其他Follower副本的LEO和HW值,為了區(qū)分我們把Leader上存儲(chǔ)的Follower副本的LEO和HW值叫做遠(yuǎn)程副本的LEO和HW值,如下圖所示:

          之所以這么設(shè)計(jì),是為了HW的更新,Leader需保證HW是ISR副本集合中LEO的最小值。關(guān)于具體的更新,我們分為Follower副本和Leader副本來(lái)看。

          Follower副本更新LEO和HW的時(shí)機(jī)只有向Leader拉取了消息之后,會(huì)用當(dāng)前的偏移量加1來(lái)更新LEO,并且用Leader的HW值和當(dāng)前LEO的最小值來(lái)更新HW:

          CurrentOffset?+?1?->?LEO
          min(LEO,?LeaderHW)?->?HW

          LEO的更新,很好理解。那為什么HW要取LEO和LeaderHW的最小值,為什么不直接取LeaderHW,LeaderHW不是一定大于LEO嗎?我們?cè)谇拔暮?jiǎn)單的提到了LeaderHW是根據(jù)同步副本來(lái)決定,所以L(fǎng)eaderHW一定小于所有同步副本的LEO,而并不一定小于非同步副本的LEO,所以如果一個(gè)非同步副本在拉取消息,那LEO是會(huì)小于LeaderHW的,則應(yīng)用當(dāng)前LEO值來(lái)更新HW。

          說(shuō)完了Follower副本上LEO和HW的更新,下面看Leader副本。

          正常情況下Leader副本的更新時(shí)機(jī)有兩個(gè):一、收到生產(chǎn)者的消息;二、被Follower拉取消息

          當(dāng)收到生產(chǎn)者消息時(shí),會(huì)用當(dāng)前偏移量加1來(lái)更新LEO,然后取LEO和遠(yuǎn)程ISR副本中LEO的最小值更新HW。

          CurrentOffset?+?1?->?LEO
          min(LEO,?RemoteIsrLEO)?->?HW

          而當(dāng)Follower拉取消息時(shí),會(huì)更新Leader上存儲(chǔ)的Follower副本LEO,然后判斷是否需要更新HW,更新的方式和上述相同。

          FollowerLEO?->?RemoteLEO
          min(LEO,?RemoteIsrLEO)?->?HW

          除了這兩種正常情況,而當(dāng)發(fā)生故障時(shí),例如Leader宕機(jī),Follower被選為新的Leader,會(huì)嘗試更新HW。還有副本被踢出ISR時(shí),也會(huì)嘗試更新HW。

          下面我們看下更新LEO和HW的示例,假設(shè)分區(qū)中有兩個(gè)副本,min.insync.replica=1。

          從上述過(guò)程中,我們可以看到remoteLEO、LeaderHW和FollowerHW的更新發(fā)生于Follower更新LEO后的第二輪Fetch請(qǐng)求,而這也意味著,更新需要額外一次Fetch請(qǐng)求。而這也將導(dǎo)致在Leader切換時(shí),會(huì)存在數(shù)據(jù)丟失和數(shù)據(jù)不一致的問(wèn)題。下面是數(shù)據(jù)丟失的示例:

          當(dāng)B作為Follower已經(jīng)Fetch了最新的消息,但是在發(fā)送第二輪Fetch時(shí),未來(lái)得及處理響應(yīng),宕機(jī)了。當(dāng)重啟時(shí),會(huì)根據(jù)HW更新LEO,將發(fā)生日志截?cái)啵1被丟棄。這時(shí)再發(fā)送Fetch請(qǐng)求給A,A宕機(jī)了,則B未能同步到消息m1,同時(shí)B被選為L(zhǎng)eader,而當(dāng)A重啟時(shí),作為Follower同步B的消息時(shí),會(huì)根據(jù)A的HW值更新HW和LEO,因此由2變成了1,也將發(fā)生日志截?cái)啵寻l(fā)送成功的消息m1將永久丟失。

          數(shù)據(jù)不一致的情況如下:

          A作為L(zhǎng)eader,A已寫(xiě)入m0、m1兩條消息,且HW為2,而B(niǎo)作為Follower,只有m0消息,且HW為1。若A、B同時(shí)宕機(jī),且B重啟時(shí),A還未恢復(fù),則B被選為L(zhǎng)eader。

          集群處于上述這種狀態(tài)有兩種情況可能導(dǎo)致,一、宕機(jī)前,B不在ISR中,因此A未待B同步,即更新了HW,且unclear leader為true,允許B成為L(zhǎng)eader;二、宕機(jī)前,B同步了消息m1,且發(fā)送了第二輪Fetch請(qǐng)求,Leader更新HW,但B未將消息m1落地到磁盤(pán),宕機(jī)了,當(dāng)再重啟時(shí),消息m1丟失,只剩m0。

          在B重啟作為L(zhǎng)eader之后,收到消息m2。A宕機(jī)重啟后向成為L(zhǎng)eader的B發(fā)送Fetch請(qǐng)求,發(fā)現(xiàn)自己的HW和B的HW一致,都是2,因此不會(huì)進(jìn)行消息截?cái)啵@也造成了數(shù)據(jù)不一致。

          Leader Epoch

          為了解決HW可能造成的數(shù)據(jù)丟失和數(shù)據(jù)不一致問(wèn)題,Kafka引入了Leader Epoch機(jī)制,在每個(gè)副本日志目錄下都有一個(gè)leader-epoch-checkpoint文件,用于保存Leader Epoch信息,其內(nèi)容示例如下:

          0?0
          1?300
          2?500

          上面每一行為一個(gè)Leader Epoch,分為兩部分,前者Epoch,表示Leader版本號(hào),是一個(gè)單調(diào)遞增的正整數(shù),每當(dāng)Leader變更時(shí),都會(huì)加1,后者StartOffset,為每一代Leader寫(xiě)入的第一條消息的位移。例如第0代Leader寫(xiě)的第一條消息位移為0,而第1代Leader寫(xiě)的第一條消息位移為300,也意味著第0代Leader在寫(xiě)了0-299號(hào)消息后掛了,重新選出了新的Leader。下面我們看下Leader Epoch如何工作:

          1. 當(dāng)副本成為L(zhǎng)eader時(shí)

            當(dāng)收到生產(chǎn)者發(fā)來(lái)的第一條消息時(shí),會(huì)將新的epoch和當(dāng)前LEO添加到leader-epoch-checkpoint文件中。

          2. 當(dāng)副本成為Follower時(shí)

            1. 向Leader發(fā)送LeaderEpochRequest請(qǐng)求,請(qǐng)求內(nèi)容中含有Follower當(dāng)前本地的最新Epoch;
            2. Leader將返回給Follower的響應(yīng)中含有一個(gè)LastOffset,其取值規(guī)則為:
              1. 若FollowerLastEpoch = LeaderLastEpoch,則取Leader LEO;
              2. 否則,取大于FollowerLastEpoch的第一個(gè)Leader Epoch中的StartOffset。
            3. Follower在拿到LastOffset后,若LastOffset < LEO,將截?cái)嗳罩荆?/section>
            4. Follower開(kāi)始正常工作,發(fā)送Fetch請(qǐng)求;

          我們?cè)倩仡櫩聪聰?shù)據(jù)丟失和數(shù)據(jù)不一致的場(chǎng)景,在應(yīng)用了LeaderEpoch后發(fā)生什么改變:

          當(dāng)B作為Follower已經(jīng)Fetch了最新的消息,但是發(fā)送第二輪Fetch時(shí),未來(lái)得及處理響應(yīng),宕機(jī)了。當(dāng)重啟時(shí),會(huì)向A發(fā)送LeaderEpochRequest請(qǐng)求。如果A沒(méi)宕機(jī),由于 FollowerLastEpoch = LeaderLastEpoch,所以將LeaderLEO,即2作為L(zhǎng)astOffset給A,又因?yàn)長(zhǎng)astOffset=LEO,所以不會(huì)截?cái)嗳罩尽_@種情況比較簡(jiǎn)單,而圖中所畫(huà)的情況是A宕機(jī)的情況,沒(méi)返回LeaderEpochRequest的響應(yīng)的情況。這時(shí)候B會(huì)被選作Leader,將當(dāng)前LEO和新的Epoch寫(xiě)進(jìn)leader-epoch-checkpoint文件中。當(dāng)A作為Follower重啟后,發(fā)送LeaderEpochRequest請(qǐng)求,包含最新的epoch值0,當(dāng)B收到請(qǐng)求后,由于FollowerLastEpoch < LeaderLastEpoch,所以會(huì)取大于FollowerLastEpoch的第一個(gè)Leader Epoch中的StartOffset,即2。當(dāng)A收到響應(yīng)時(shí),由于LEO = LastOffset,所以不會(huì)發(fā)生日志截?cái)啵簿筒粫?huì)丟失數(shù)據(jù)。

          下面是數(shù)據(jù)不一致情況:

          A作為L(zhǎng)eader,A已寫(xiě)入m0、m1兩條消息,且HW為2,而B(niǎo)作為Follower,只有消息m0,且HW為1,A、B同時(shí)宕機(jī)。B重啟,被選為L(zhǎng)eader,將寫(xiě)入新的LeaderEpoch(1, 1)。B開(kāi)始工作,收到消息m2時(shí)。這是A重啟,將作為Follower將發(fā)送LeaderEpochRequert(FollowerLastEpoch=0),B返回大于FollowerLastEpoch的第一個(gè)LeaderEpoch的StartOffset,即1,小于當(dāng)前LEO值,所以將發(fā)生日志截?cái)啵l(fā)送Fetch請(qǐng)求,同步消息m2,避免了消息不一致問(wèn)題。

          你可能會(huì)問(wèn),m2消息那豈不是丟失了?是的,m2消息丟失了,但這種情況的發(fā)送的根本原因在于min.insync.replicas的值設(shè)置為1,即沒(méi)有任何其他副本同步的情況下,就認(rèn)為m2消息為已提交狀態(tài)。LeaderEpoch不能解決min.insync.replicas為1帶來(lái)的數(shù)據(jù)丟失問(wèn)題,但是可以解決其所帶來(lái)的數(shù)據(jù)不一致問(wèn)題。而我們之前所說(shuō)能解決的數(shù)據(jù)丟失問(wèn)題,是指消息已經(jīng)成功同步到Follower上,但因HW未及時(shí)更新引起的數(shù)據(jù)丟失問(wèn)題。

          參考

          1. 《Kafka核心技術(shù)與實(shí)戰(zhàn)》

          2. Kafka ISR副本同步機(jī)制:https://objcoding.com/2019/11/05/kafka-isr

          3. 圖解Kafka水印備份機(jī)制:http://objcoding.com/2019/10/31/kafka-hw


          點(diǎn)個(gè)在看支持我吧,轉(zhuǎn)發(fā)就更好了
          瀏覽 56
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产精品婷婷午夜在线观看 | 黄a免费视频网站 | 曰本手机在线 | 国产成人视频九色91 | 波多野结衣中文字幕久久 |