<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          萬(wàn)字長(zhǎng)文,Kafka從入坑到大佬

          共 17558字,需瀏覽 36分鐘

           ·

          2021-07-14 22:26

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          1、為什么有消息系統(tǒng)

          1、解耦合
          2、異步處理
          例如電商平臺(tái),秒殺活動(dòng)。
          一般流程會(huì)分為:
          1:風(fēng)險(xiǎn)控制、2:庫(kù)存鎖定、3:生成訂單、4:短信通知、5:更新數(shù)據(jù)
          通過消息系統(tǒng)將秒殺活動(dòng)業(yè)務(wù)拆分開,將不急需處理的業(yè)務(wù)放在后面慢慢處理;
          流程改為:
          1:風(fēng)險(xiǎn)控制、2:庫(kù)存鎖定、3:消息系統(tǒng)、4:生成訂單、5:短信通知、6:更新數(shù)據(jù)
          3、流量的控制
          3.1 網(wǎng)關(guān)在接受到請(qǐng)求后,就把請(qǐng)求放入到消息隊(duì)列里面
          3.2 后端的服務(wù)從消息隊(duì)列里面獲取到請(qǐng)求,完成后續(xù)的秒殺處理流程。然后再給用戶返回結(jié)果。
          優(yōu)點(diǎn):控制了流量
          缺點(diǎn):會(huì)讓流程變慢

          2、Kafka核心概念

          生產(chǎn)者:Producer 往Kafka集群生成數(shù)據(jù)
          消費(fèi)者:Consumer 往Kafka里面去獲取數(shù)據(jù),處理數(shù)據(jù)、消費(fèi)數(shù)據(jù)
          Kafka的數(shù)據(jù)是由消費(fèi)者自己去拉去Kafka里面的數(shù)據(jù)
          主題:topic
          分區(qū):partition
          默認(rèn)一個(gè)topic有一個(gè)分區(qū)(partition),自己可設(shè)置多個(gè)分區(qū)(分區(qū)分散存儲(chǔ)在服務(wù)器不同節(jié)點(diǎn)上)
          解決了一個(gè)海量數(shù)據(jù)如何存儲(chǔ)的問題
          例如:有2T的數(shù)據(jù),一臺(tái)服務(wù)器有1T,一個(gè)topic可以分多個(gè)區(qū),分別存儲(chǔ)在多臺(tái)服務(wù)器上,解決海量數(shù)據(jù)存儲(chǔ)問題

          3、Kafka的集群架構(gòu)

          Kafka集群中,一個(gè)kafka服務(wù)器就是一個(gè)broker
          Topic只是邏輯上的概念,partition在磁盤上就體現(xiàn)為一個(gè)目錄
          Consumer Group:消費(fèi)組
          消費(fèi)數(shù)據(jù)的時(shí)候,都必須指定一個(gè)group id,指定一個(gè)組的id
          假定程序A和程序B指定的group id號(hào)一樣,那么兩個(gè)程序就屬于同一個(gè)消費(fèi)組
          特殊:
          比如,有一個(gè)主題topicA
          程序A去消費(fèi)了這個(gè)topicA,那么程序B就不能再去消費(fèi)topicA(程序A和程序B屬于一個(gè)消費(fèi)組)
          再比如程序A已經(jīng)消費(fèi)了topicA里面的數(shù)據(jù),現(xiàn)在還是重新再次消費(fèi)topicA的數(shù)據(jù),是不可以的,但是重新指定一個(gè)group id號(hào)以后,可以消費(fèi)。
          不同消費(fèi)組之間沒有影響。消費(fèi)組需自定義,消費(fèi)者名稱程序自動(dòng)生成(獨(dú)一無(wú)二)。
          Controller:Kafka節(jié)點(diǎn)里面的一個(gè)主節(jié)點(diǎn)。借助zookeeper

          4、Kafka磁盤順序?qū)懕WC寫數(shù)據(jù)性能

          kafka寫數(shù)據(jù):
          順序?qū)懀疟P上寫數(shù)據(jù)時(shí),就是追加數(shù)據(jù),沒有隨機(jī)寫的操作。
          經(jīng)驗(yàn):
          如果一個(gè)服務(wù)器磁盤達(dá)到一定的個(gè)數(shù),磁盤也達(dá)到一定轉(zhuǎn)數(shù),往磁盤里面順序?qū)懀ㄗ芳訉懀?shù)據(jù)的速度和寫內(nèi)存的速度差不多。

          生產(chǎn)者生產(chǎn)消息,經(jīng)過kafka服務(wù)先寫到os cache 內(nèi)存中,然后經(jīng)過sync順序?qū)懙酱疟P上

          5、Kafka零拷貝機(jī)制保證讀數(shù)據(jù)高性能

          消費(fèi)者讀取數(shù)據(jù)流程:
          1. 消費(fèi)者發(fā)送請(qǐng)求給kafka服務(wù)
          2.kafka服務(wù)去os cache緩存讀取數(shù)據(jù)(緩存沒有就去磁盤讀取數(shù)據(jù))
          3.從磁盤讀取了數(shù)據(jù)到os cache緩存中
          4.os cache復(fù)制數(shù)據(jù)到kafka應(yīng)用程序中
          5.kafka將數(shù)據(jù)(復(fù)制)發(fā)送到socket cache中
          6.socket cache通過網(wǎng)卡傳輸給消費(fèi)者

          kafka linux sendfile技術(shù) — 零拷貝
          1. 消費(fèi)者發(fā)送請(qǐng)求給kafka服務(wù)
          2.kafka服務(wù)去os cache緩存讀取數(shù)據(jù)(緩存沒有就去磁盤讀取數(shù)據(jù))
          3.從磁盤讀取了數(shù)據(jù)到os cache緩存中
          4.os cache直接將數(shù)據(jù)發(fā)送給網(wǎng)卡
          5.通過網(wǎng)卡將數(shù)據(jù)傳輸給消費(fèi)者

          6、Kafka日志分段保存

          Kafka中一個(gè)主題,一般會(huì)設(shè)置分區(qū);
          比如創(chuàng)建了一個(gè)topic_a,然后創(chuàng)建的時(shí)候指定了這個(gè)主題有三個(gè)分區(qū)。
          其實(shí)在三臺(tái)服務(wù)器上,會(huì)創(chuàng)建三個(gè)目錄。
          服務(wù)器1(kafka1):
          創(chuàng)建目錄topic_a-0:
          目錄下面是我們文件(存儲(chǔ)數(shù)據(jù)),kafka數(shù)據(jù)就是message,數(shù)據(jù)存儲(chǔ)在log文件里
          .log結(jié)尾的就是日志文件,在kafka中把數(shù)據(jù)文件就叫做日志文件。
          一個(gè)分區(qū)下面默認(rèn)有n多個(gè)日志文件(分段存儲(chǔ)),一個(gè)日志文件默認(rèn)1G

          服務(wù)器2(kafka2):
          創(chuàng)建目錄topic_a-1:
          服務(wù)器3(kafka3):
          創(chuàng)建目錄topic_a-2:

          7、Kafka二分查找定位數(shù)據(jù)

          Kafka里面每一條消息,都有自己的offset(相對(duì)偏移量),存在物理磁盤上面,在position
          Position:物理位置(磁盤上面哪個(gè)地方)
          也就是說一條消息就有兩個(gè)位置:
          offset:相對(duì)偏移量(相對(duì)位置)
          position:磁盤物理位置
          稀疏索引:
          Kafka中采用了稀疏索引的方式讀取索引,kafka每當(dāng)寫入了4k大小的日志(.log),就往index里寫入一個(gè)記錄索引。
          其中會(huì)采用二分查找

          8、高并發(fā)網(wǎng)絡(luò)設(shè)計(jì)(先了解NIO)

          網(wǎng)絡(luò)設(shè)計(jì)部分是kafka中設(shè)計(jì)最好的一個(gè)部分,這也是保證Kafka高并發(fā)、高性能的原因
          對(duì)kafka進(jìn)行調(diào)優(yōu),就得對(duì)kafka原理比較了解,尤其是網(wǎng)絡(luò)設(shè)計(jì)部分

          Reactor網(wǎng)絡(luò)設(shè)計(jì)模式1:

          Reactor網(wǎng)絡(luò)設(shè)計(jì)模式2:

          Reactor網(wǎng)絡(luò)設(shè)計(jì)模式3:

          Kafka超高并發(fā)網(wǎng)絡(luò)設(shè)計(jì):


          9、Kafka冗余副本保證高可用

          在kafka里面分區(qū)是有副本的,注:0.8以前是沒有副本機(jī)制的。
          創(chuàng)建主題時(shí),可以指定分區(qū),也可以指定副本個(gè)數(shù)。
          副本是有角色的:
          leader partition:
          1、寫數(shù)據(jù)、讀數(shù)據(jù)操作都是從leader partition去操作的。
          2、會(huì)維護(hù)一個(gè)ISR(in-sync- replica )列表,但是會(huì)根據(jù)一定的規(guī)則刪除ISR列表里面的值
          生產(chǎn)者發(fā)送來(lái)一個(gè)消息,消息首先要寫入到leader partition中
          寫完了以后,還要把消息寫入到ISR列表里面的其它分區(qū),寫完后才算這個(gè)消息提交
          follower partition:從leader partition同步數(shù)據(jù)。

          10、優(yōu)秀架構(gòu)思考-總結(jié)

          Kafka — 高并發(fā)、高可用、高性能
          高可用:
          多副本機(jī)制
          高并發(fā):
          網(wǎng)絡(luò)架構(gòu)設(shè)計(jì)
          三層架構(gòu):多selector -> 多線程 -> 隊(duì)列的設(shè)計(jì)(NIO)
          高性能:
          寫數(shù)據(jù):
          1. 把數(shù)據(jù)先寫入到OS Cache
          2. 寫到磁盤上面是順序?qū)懀阅芎芨?br style="outline: 0px;">讀數(shù)據(jù):
          1. 根據(jù)稀疏索引,快速定位到要消費(fèi)的數(shù)據(jù)
          2. 零拷貝機(jī)制
          減少數(shù)據(jù)的拷貝
          減少了應(yīng)用程序與操作系統(tǒng)上下文切換

          11、Kafka生產(chǎn)環(huán)境搭建

          11.1 需求場(chǎng)景分析

          電商平臺(tái),需要每天10億請(qǐng)求都要發(fā)送到Kafka集群上面。二八反正,一般評(píng)估出來(lái)問題都不大。
          10億請(qǐng)求 -> 24 過來(lái)的,一般情況下,每天的12:00 到早上8:00 這段時(shí)間其實(shí)是沒有多大的數(shù)據(jù)量的。80%的請(qǐng)求是用的另外16小時(shí)的處理的。
          16個(gè)小時(shí)處理 -> 8億的請(qǐng)求。
          16 * 0.2 = 3個(gè)小時(shí) 處理了8億請(qǐng)求的80%的數(shù)據(jù)

          也就是說6億的數(shù)據(jù)是靠3個(gè)小時(shí)處理完的。
          我們簡(jiǎn)單的算一下高峰期時(shí)候的qps
          6億/3小時(shí) =5.5萬(wàn)/s qps=5.5萬(wàn)

          10億請(qǐng)求 * 50kb = 46T 每天需要存儲(chǔ)46T的數(shù)據(jù)

          一般情況下,我們都會(huì)設(shè)置兩個(gè)副本 46T * 2 = 92T
          Kafka里面的數(shù)據(jù)是有保留的時(shí)間周期,保留最近3天的數(shù)據(jù)。
          92T * 3天 = 276T
          我這兒說的是50kb不是說一條消息就是50kb不是(把日志合并了,多條日志合并在一起),通常情況下,一條消息就幾b,也有可能就是幾百字節(jié)。

          11.2 物理機(jī)數(shù)量評(píng)估

          1)首先分析一下是需要虛擬機(jī)還是物理機(jī)
          像Kafka mysql hadoop這些集群搭建的時(shí)候,我們生產(chǎn)里面都是使用物理機(jī)。
          2)高峰期需要處理的請(qǐng)求總的請(qǐng)求每秒5.5萬(wàn)個(gè),其實(shí)一兩臺(tái)物理機(jī)絕對(duì)是可以抗住的。一般情況下,我們?cè)u(píng)估機(jī)器的時(shí)候,是按照高峰期的4倍的去評(píng)估。
          如果是4倍的話,大概我們集群的能力要準(zhǔn)備到 20萬(wàn)qps。這樣子的集群才是比較安全的集群。
          大概就需要5臺(tái)物理機(jī)。每臺(tái)承受4萬(wàn)請(qǐng)求。

          場(chǎng)景總結(jié):
          搞定10億請(qǐng)求,高峰期5.5萬(wàn)的qps,276T的數(shù)據(jù),需要5臺(tái)物理機(jī)。

          11.3 磁盤選擇

          搞定10億請(qǐng)求,高峰期5.5萬(wàn)的qps,276T的數(shù)據(jù),需要5臺(tái)物理機(jī)。
          1)SSD固態(tài)硬盤,還是需要普通的機(jī)械硬盤
          SSD硬盤:性能比較好,但是價(jià)格貴
          SAS盤:某方面性能不是很好,但是比較便宜。
          SSD硬盤性能比較好,指的是它隨機(jī)讀寫的性能比較好。適合MySQL這樣集群。
          但是其實(shí)他的順序?qū)懙男阅芨鶶AS盤差不多。
          kafka的理解:
          就是用的順序?qū)憽K晕覀兙陀闷胀ǖ摹緳C(jī)械硬盤】就可以了。

          2)需要我們?cè)u(píng)估每臺(tái)服務(wù)器需要多少塊磁盤
          5臺(tái)服務(wù)器,一共需要276T ,大約每臺(tái)服務(wù)器 需要存儲(chǔ)60T的數(shù)據(jù)。
          我們公司里面服務(wù)器的配置用的是 11塊硬盤,每個(gè)硬盤 7T。
          11 * 7T = 77T

          77T * 5 臺(tái)服務(wù)器 = 385T。

          場(chǎng)景總結(jié):
          搞定10億請(qǐng)求,需要5臺(tái)物理機(jī),11(SAS) * 7T

          11.4 內(nèi)存評(píng)估

          搞定10億請(qǐng)求,需要5臺(tái)物理機(jī),11(SAS) * 7T

          我們發(fā)現(xiàn)kafka讀寫數(shù)據(jù)的流程 都是基于os cache,換句話說假設(shè)咱們的os cashe無(wú)限大那么整個(gè)kafka是不是相當(dāng)于就是基于內(nèi)存去操作,如果是基于內(nèi)存去操作,性能肯定很好。內(nèi)存是有限的。
          1) 盡可能多的內(nèi)存資源要給 os cache
          2) Kafka的代碼用 核心的代碼用的是scala寫的,客戶端的代碼java寫的。
          都是基于jvm。所以我們還要給一部分的內(nèi)存給jvm。
          Kafka的設(shè)計(jì),沒有把很多數(shù)據(jù)結(jié)構(gòu)都放在jvm里面。所以我們的這個(gè)jvm不需要太大的內(nèi)存。
          根據(jù)經(jīng)驗(yàn),給個(gè)10G就可以了。

          NameNode:
             jvm里面還放了元數(shù)據(jù)(幾十G),JVM一定要給得很大。比如給個(gè)100G。

          假設(shè)我們這個(gè)10請(qǐng)求的這個(gè)項(xiàng)目,一共會(huì)有100個(gè)topic。
          100 topic * 5 partition * 2 = 1000 partition
          一個(gè)partition其實(shí)就是物理機(jī)上面的一個(gè)目錄,這個(gè)目錄下面會(huì)有很多個(gè).log的文件。
          .log就是存儲(chǔ)數(shù)據(jù)文件,默認(rèn)情況下一個(gè).log文件的大小是1G。
          我們?nèi)绻WC 1000個(gè)partition 的最新的.log 文件的數(shù)據(jù) 如果都在內(nèi)存里面,這個(gè)時(shí)候性能就是最好。1000 * 1G = 1000G內(nèi)存.
          我們只需要把當(dāng)前最新的這個(gè)log 保證里面的25%的最新的數(shù)據(jù)在內(nèi)存里面。
          250M * 1000 = 0.25 G* 1000 =250G的內(nèi)存。

          250內(nèi)存 / 5 = 50G內(nèi)存
          50G+10G = 60G內(nèi)存

          64G的內(nèi)存,另外的4G,操作系統(tǒng)本生是不是也需要內(nèi)存。
          其實(shí)Kafka的jvm也可以不用給到10G這么多。
          評(píng)估出來(lái)64G是可以的。
          當(dāng)然如果能給到128G的內(nèi)存的服務(wù)器,那就最好。

          我剛剛評(píng)估的時(shí)候用的都是一個(gè)topic是5個(gè)partition,但是如果是數(shù)據(jù)量比較大的topic,可能會(huì)有10個(gè)partition。

          總結(jié):
          搞定10億請(qǐng)求,需要5臺(tái)物理機(jī),11(SAS) * 7T ,需要64G的內(nèi)存(128G更好)

          11.5 CPU壓力評(píng)估

          評(píng)估一下每臺(tái)服務(wù)器需要多少cpu core(資源很有限)

          我們?cè)u(píng)估需要多少個(gè)cpu ,依據(jù)就是看我們的服務(wù)里面有多少線程去跑。
          線程就是依托cpu 去運(yùn)行的。
          如果我們的線程比較多,但是cpu core比較少,這樣的話,我們的機(jī)器負(fù)載就會(huì)很高,性能不就不好。

          1. 評(píng)估一下,kafka的一臺(tái)服務(wù)器 啟動(dòng)以后會(huì)有多少線程?

            所以大概一個(gè)Kafka的服務(wù)啟動(dòng)起來(lái)以后,會(huì)有一百多個(gè)線程。

            cpu core = 4個(gè),一遍來(lái)說,幾十個(gè)線程,就肯定把cpu 打滿了。
            cpu core = 8個(gè),應(yīng)該很輕松的能支持幾十個(gè)線程。
            如果我們的線程是100多個(gè),或者差不多200個(gè),那么8 個(gè) cpu core是搞不定的。
            所以我們這兒建議:
            CPU core = 16個(gè)。如果可以的話,能有32個(gè)cpu core 那就最好。

            結(jié)論:
            kafka集群,最低也要給16個(gè)cpu core,如果能給到32 cpu core那就更好。
            2cpu * 8 =16 cpu core
            4cpu * 8 = 32 cpu core

            1. Acceptor線程 1

            2. processor線程 3 6~9個(gè)線程

            3. 處理請(qǐng)求線程 8個(gè) 32個(gè)線程

            4. 定時(shí)清理的線程,拉取數(shù)據(jù)的線程,定時(shí)檢查ISR列表的機(jī)制 等等。

          總結(jié):
          搞定10億請(qǐng)求,需要5臺(tái)物理機(jī),11(SAS) * 7T ,需要64G的內(nèi)存(128G更好),需要16個(gè)cpu core(32個(gè)更好)

          11.6 網(wǎng)絡(luò)需求評(píng)估

          評(píng)估我們需要什么樣網(wǎng)卡?
          一般要么是千兆的網(wǎng)卡(1G/s),還有的就是萬(wàn)兆的網(wǎng)卡(10G/s)

          高峰期的時(shí)候 每秒會(huì)有5.5萬(wàn)的請(qǐng)求涌入,5.5/5 = 大約是每臺(tái)服務(wù)器會(huì)有1萬(wàn)個(gè)請(qǐng)求涌入。
          我們之前說的,
          10000 * 50kb = 488M  也就是每條服務(wù)器,每秒要接受488M的數(shù)據(jù)。數(shù)據(jù)還要有副本,副本之間的同步
          也是走的網(wǎng)絡(luò)的請(qǐng)求。488 * 2 = 976m/s
          說明一下:
             很多公司的數(shù)據(jù),一個(gè)請(qǐng)求里面是沒有50kb這么大的,我們公司是因?yàn)橹鳈C(jī)在生產(chǎn)端封裝了數(shù)據(jù)
             然后把多條數(shù)據(jù)合并在一起了,所以我們的一個(gè)請(qǐng)求才會(huì)有這么大。
             
          說明一下:
             一般情況下,網(wǎng)卡的帶寬是達(dá)不到極限的,如果是千兆的網(wǎng)卡,我們能用的一般就是700M左右。
             但是如果最好的情況,我們還是使用萬(wàn)兆的網(wǎng)卡。
             如果使用的是萬(wàn)兆的,那就是很輕松。

          11.7 集群規(guī)劃

          請(qǐng)求量
          規(guī)劃物理機(jī)的個(gè)數(shù)
          分析磁盤的個(gè)數(shù),選擇使用什么樣的磁盤
          內(nèi)存
          cpu core
          網(wǎng)卡

          就是告訴大家,以后要是公司里面有什么需求,進(jìn)行資源的評(píng)估,服務(wù)器的評(píng)估,大家按照我的思路去評(píng)估。

          一條消息的大小 50kb -> 1kb 500byte 1M

          ip 主機(jī)名
          192.168.0.100 hadoop1
          192.168.0.101 hadoop2
          192.168.0.102 hadoop3

          主機(jī)的規(guī)劃:
          kafka集群架構(gòu)的時(shí)候:
          主從式的架構(gòu):
          controller -> 通過zk集群來(lái)管理整個(gè)集群的元數(shù)據(jù)。

          1. zookeeper集群
            hadoop1 hadoop2 hadoop3

          2. kafka集群
            理論上來(lái)講,我們不應(yīng)該把kafka的服務(wù)于zk的服務(wù)安裝在一起。
            但是我們這兒服務(wù)器有限。
            所以我們kafka集群也是安裝在hadoop1 haadoop2 hadoop3

          11.8 zookeeper集群搭建

          11.9 核心參數(shù)詳解

          11.10 集群壓力測(cè)試

          12、kafka運(yùn)維

          12.1 常見運(yùn)維工具介紹

          KafkaManager — 頁(yè)面管理工具

          12.2 常見運(yùn)維命令

          場(chǎng)景一:topic數(shù)據(jù)量太大,要增加topic數(shù)
          一開始創(chuàng)建主題的時(shí)候,數(shù)據(jù)量不大,給的分區(qū)數(shù)不多。
          kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6

          kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --partitions 3 --topic test6

          broker id:
          hadoop1:0
          hadoop2:1
          hadoop3:2

          假設(shè)一個(gè)partition有三個(gè)副本:
          partition0:
          a,b,c

          a:leader partition
          b,c:follower partition

          ISR:{a,b,c}
          如果一個(gè)follower分區(qū) 超過10秒 沒有向leader partition去拉取數(shù)據(jù),那么這個(gè)分區(qū)就從ISR列表里面移除。

          leader patition ->

          場(chǎng)景二:核心topic增加副本因子
          如果對(duì)核心業(yè)務(wù)數(shù)據(jù)需要增加副本因子
          vim test.json腳本,將下面一行json腳本保存

          {“version”:1,“partitions”:[{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}]}

          執(zhí)行上面json腳本:
          kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute

          場(chǎng)景三:負(fù)載不均衡的topic,手動(dòng)遷移
          vi topics-to-move.json

          {“topics”: [{“topic”: “test01”}, {“topic”: “test02”}], “version”: 1} // 把你所有的topic都寫在這里

          kafka-reassgin-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list “5,6” --generate
          // 把你所有的包括新加入的broker機(jī)器都寫在這里,就會(huì)說是把所有的partition均勻的分散在各個(gè)broker上,包括新進(jìn)來(lái)的broker

          此時(shí)會(huì)生成一個(gè)遷移方案,可以保存到一個(gè)文件里去:expand-cluster-reassignment.json

          kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

          kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

          這種數(shù)據(jù)遷移操作一定要在晚上低峰的時(shí)候來(lái)做,因?yàn)樗麜?huì)在機(jī)器之間遷移數(shù)據(jù),非常的占用帶寬資源
          –generate: 根據(jù)給予的Topic列表和Broker列表生成遷移計(jì)劃。generate并不會(huì)真正進(jìn)行消息遷移,而是將消息遷移計(jì)劃計(jì)算出來(lái),供execute命令使用。
          –execute: 根據(jù)給予的消息遷移計(jì)劃進(jìn)行遷移。
          –verify: 檢查消息是否已經(jīng)遷移完成。

          場(chǎng)景四:如果某個(gè)broker leader partition過多
          正常情況下,我們的leader partition在服務(wù)器之間是負(fù)載均衡。
          hadoop1 4
          hadoop2 1
          hadoop3 1

          現(xiàn)在各個(gè)業(yè)務(wù)方可以自行申請(qǐng)創(chuàng)建topic,分區(qū)數(shù)量都是自動(dòng)分配和后續(xù)動(dòng)態(tài)調(diào)整的,
          kafka本身會(huì)自動(dòng)把leader partition均勻分散在各個(gè)機(jī)器上,這樣可以保證每臺(tái)機(jī)器的讀寫吞吐量都是均勻的
          但是也有例外,那就是如果某些broker宕機(jī),會(huì)導(dǎo)致leader partition過于集中在其他少部分幾臺(tái)broker上,
          這會(huì)導(dǎo)致少數(shù)幾臺(tái)broker的讀寫請(qǐng)求壓力過高,其他宕機(jī)的broker重啟之后都是folloer partition,讀寫請(qǐng)求很低,
          造成集群負(fù)載不均衡有一個(gè)參數(shù),auto.leader.rebalance.enable,默認(rèn)是true,
          每隔300秒(leader.imbalance.check.interval.seconds)檢查leader負(fù)載是否平衡
          如果一臺(tái)broker上的不均衡的leader超過了10%,leader.imbalance.per.broker.percentage,
          就會(huì)對(duì)這個(gè)broker進(jìn)行選舉
          配置參數(shù):
          auto.leader.rebalance.enable 默認(rèn)是true
          leader.imbalance.per.broker.percentage: 每個(gè)broker允許的不平衡的leader的比率。如果每個(gè)broker超過了這個(gè)值,控制器會(huì)觸發(fā)leader的平衡。這個(gè)值表示百分比。10%
          leader.imbalance.check.interval.seconds:默認(rèn)值300秒

          13、Kafka生產(chǎn)者

          13.1 消費(fèi)者發(fā)送消息原理

          13.2 消費(fèi)者發(fā)送消息原理—基礎(chǔ)案例演示

          13.3 如何提升吞吐量

          如何提升吞吐量:
          參數(shù)一:buffer.memory:
          設(shè)置發(fā)送消息的緩沖區(qū),默認(rèn)值是33554432,就是32MB
          參數(shù)二:compression.type:
          默認(rèn)是none,不壓縮,但是也可以使用lz4壓縮,效率還是不錯(cuò)的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會(huì)加大producer端的cpu開銷
          參數(shù)三:batch.size:
          設(shè)置batch的大小,如果batch太小,會(huì)導(dǎo)致頻繁網(wǎng)絡(luò)請(qǐng)求,吞吐量下降;
          如果batch太大,會(huì)導(dǎo)致一條消息需要等待很久才能被發(fā)送出去,而且會(huì)讓內(nèi)存緩沖區(qū)有很大壓力,過多數(shù)據(jù)緩沖在內(nèi)存里,默認(rèn)值是:16384,就是16kb,也就是一個(gè)batch滿了16kb就發(fā)送出去,一般在實(shí)際生產(chǎn)環(huán)境,這個(gè)batch的值可以增大一些來(lái)提升吞吐量,如果一個(gè)批次設(shè)置大了,會(huì)有延遲。一般根據(jù)一條消息大小來(lái)設(shè)置。
          如果我們消息比較少。配合使用的參數(shù)linger.ms,這個(gè)值默認(rèn)是0,意思就是消息必須立即被發(fā)送,但是這是不對(duì)的,一般設(shè)置一個(gè)100毫秒之類的,這樣的話就是說,這個(gè)消息被發(fā)送出去后進(jìn)入一個(gè)batch,如果100毫秒內(nèi),這個(gè)batch滿了16kb,自然就會(huì)發(fā)送出去。

          13.4 如何處理異常

          1、LeaderNotAvailableException:
          這個(gè)就是如果某臺(tái)機(jī)器掛了,此時(shí)leader副本不可用,會(huì)導(dǎo)致你寫入失敗,要等待其他follower副本切換為leader副本之后,才能繼續(xù)寫入,此時(shí)可以重試發(fā)送即可;如果說你平時(shí)重啟kafka的broker進(jìn)程,肯定會(huì)導(dǎo)致leader切換,一定會(huì)導(dǎo)致你寫入報(bào)錯(cuò),是LeaderNotAvailableException。
          2、NotControllerException:
          這個(gè)也是同理,如果說Controller所在Broker掛了,那么此時(shí)會(huì)有問題,需要等待Controller重新選舉,此時(shí)也是一樣就是重試即可。
          3、NetworkException:網(wǎng)絡(luò)異常 timeout
          a. 配置retries參數(shù),他會(huì)自動(dòng)重試的
          b. 但是如果重試幾次之后還是不行,就會(huì)提供Exception給我們來(lái)處理了,我們獲取到異常以后,再對(duì)這個(gè)消息進(jìn)行單獨(dú)處理。我們會(huì)有備用的鏈路。發(fā)送不成功的消息發(fā)送到Redis或者寫到文件系統(tǒng)中,甚至是丟棄。

          13.5 重試機(jī)制

          重試會(huì)帶來(lái)一些問題:

          1. 消息會(huì)重復(fù)
            有的時(shí)候一些leader切換之類的問題,需要進(jìn)行重試,設(shè)置retries即可,但是消息重試會(huì)導(dǎo)致,重復(fù)發(fā)送的問題,比如說網(wǎng)絡(luò)抖動(dòng)一下導(dǎo)致他以為沒成功,就重試了,其實(shí)人家都成功了.

          2. 消息亂序
            消息重試是可能導(dǎo)致消息的亂序的,因?yàn)榭赡芘旁谀愫竺娴南⒍及l(fā)送出去了。
            所以可以使用"max.in.flight.requests.per.connection"參數(shù)設(shè)置為1,
            這樣可以保證producer同一時(shí)間只能發(fā)送一條消息。
            兩次重試的間隔默認(rèn)是100毫秒,用"retry.backoff.ms"來(lái)進(jìn)行設(shè)置
            基本上在開發(fā)過程中,靠重試機(jī)制基本就可以搞定95%的異常問題。

          13.6 ACK參數(shù)詳解

          producer端設(shè)置的
          request.required.acks=0;
          只要請(qǐng)求已發(fā)送出去,就算是發(fā)送完了,不關(guān)心有沒有寫成功。
          性能很好,如果是對(duì)一些日志進(jìn)行分析,可以承受丟數(shù)據(jù)的情況,用這個(gè)參數(shù),性能會(huì)很好。
          request.required.acks=1;
          發(fā)送一條消息,當(dāng)leader partition寫入成功以后,才算寫入成功。
          不過這種方式也有丟數(shù)據(jù)的可能。
          request.required.acks=-1;
          需要ISR列表里面,所有副本都寫完以后,這條消息才算寫入成功。
          ISR:1個(gè)副本。1 leader partition 1 follower partition
          kafka服務(wù)端:
          min.insync.replicas:1, 如果我們不設(shè)置的話,默認(rèn)這個(gè)值是1
          一個(gè)leader partition會(huì)維護(hù)一個(gè)ISR列表,這個(gè)值就是限制ISR列表里面
          至少得有幾個(gè)副本,比如這個(gè)值是2,那么當(dāng)ISR列表里面只有一個(gè)副本的時(shí)候。
          往這個(gè)分區(qū)插入數(shù)據(jù)的時(shí)候會(huì)報(bào)錯(cuò)。
          設(shè)計(jì)一個(gè)不丟數(shù)據(jù)的方案:
          數(shù)據(jù)不丟失的方案:
          1)分區(qū)副本 >=2
          2)acks = -1
          3)min.insync.replicas >=2
          還有可能就是發(fā)送有異常:對(duì)異常進(jìn)行處理

          13.7 自定義分區(qū)

          分區(qū):
          1、沒有設(shè)置key
          我們的消息就會(huì)被輪訓(xùn)的發(fā)送到不同的分區(qū)。
          2、設(shè)置了key
          kafka自帶的分區(qū)器,會(huì)根據(jù)key計(jì)算出來(lái)一個(gè)hash值,這個(gè)hash值會(huì)對(duì)應(yīng)某一個(gè)分區(qū)。
          如果key相同的,那么hash值必然相同,key相同的值,必然是會(huì)被發(fā)送到同一個(gè)分區(qū)。
          但是有些比較特殊的時(shí)候,我們就需要自定義分區(qū)
          public class HotDataPartitioner implements Partitioner {
          private Random random;
          @Override
          public void configure(Map<String, ?> configs) {
          random = new Random();
          }
          @Override
          public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
          String key = (String)keyObj;
          List partitionInfoList = cluster.availablePartitionsForTopic(topic);
          //獲取到分區(qū)的個(gè)數(shù) 0,1,2
          int partitionCount = partitionInfoList.size();
          //最后一個(gè)分區(qū)
          int hotDataPartition = partitionCount - 1;
          return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
          }
          }
          如何使用:
          配置上這個(gè)類即可:props.put(”partitioner.class”, “com.zhss.HotDataPartitioner”);

          13.8 綜合案例演示

          需求分析:
          電商背景 -》 二手的電商平臺(tái)
          【歡樂送】的項(xiàng)目,用戶購(gòu)買了東西以后會(huì)有【星星】,用星星去換物品。一塊錢一個(gè)星星。

          訂單系統(tǒng)(消息的生產(chǎn)),發(fā)送一條消息(支付訂單,取消訂單) -> Kafka <- 會(huì)員系統(tǒng),從kafak里面去消費(fèi)數(shù)據(jù),找到對(duì)應(yīng)用戶消費(fèi)的金額
          然后給該用戶更新星星的數(shù)量。

          分析一下:
          發(fā)送消息的時(shí)候,可以指定key,也可以不指定key.
          1)如果不指定key
          zhangsan ->下訂單 -> 100 -> +100
          zhangsan -> 取消訂單 -> -100 -> -100
          會(huì)員系統(tǒng)消費(fèi)數(shù)據(jù)的時(shí)候,有可能先消費(fèi)到的是 取消訂單的數(shù)據(jù)。

          2)如果指定key,key -> hash(數(shù)字) -> 對(duì)應(yīng)分區(qū)號(hào) -> 發(fā)送到對(duì)應(yīng)的分區(qū)里面。
          如果key相同的 -> 數(shù)據(jù)肯定會(huì)被發(fā)送到同一個(gè)分區(qū)(有序的)
          這個(gè)項(xiàng)目需要指定key,把用戶的id指定為key.

          14、Kafka消費(fèi)者

          14.1 消費(fèi)組概念

          groupid相同就屬于同一個(gè)消費(fèi)組
          1)每個(gè)consumer都要屬于一個(gè)consumer.group,就是一個(gè)消費(fèi)組,topic的一個(gè)分區(qū)只會(huì)分配給
          一個(gè)消費(fèi)組下的一個(gè)consumer來(lái)處理,每個(gè)consumer可能會(huì)分配多個(gè)分區(qū),也有可能某個(gè)consumer沒有分配到任何分區(qū)
          2)如果想要實(shí)現(xiàn)一個(gè)廣播的效果,那只需要使用不同的group id去消費(fèi)就可以。
          topicA:
          partition0、partition1
          groupA:
          consumer1:消費(fèi) partition0
          consuemr2:消費(fèi) partition1
          consuemr3:消費(fèi)不到數(shù)據(jù)
          groupB:
          consuemr3:消費(fèi)到partition0和partition1
          3)如果consumer group中某個(gè)消費(fèi)者掛了,此時(shí)會(huì)自動(dòng)把分配給他的分區(qū)交給其他的消費(fèi)者,如果他又重啟了,那么又會(huì)把一些分區(qū)重新交還給他

          14.2 基礎(chǔ)案例演示

          14.3 偏移量管理

          1. 每個(gè)consumer內(nèi)存里數(shù)據(jù)結(jié)構(gòu)保存對(duì)每個(gè)topic的每個(gè)分區(qū)的消費(fèi)offset,定期會(huì)提交offset,老版本是寫入zk,但是那樣高并發(fā)請(qǐng)求zk是不合理的架構(gòu)設(shè)計(jì),zk是做分布式系統(tǒng)的協(xié)調(diào)的,輕量級(jí)的元數(shù)據(jù)存儲(chǔ),不能負(fù)責(zé)高并發(fā)讀寫,作為數(shù)據(jù)存儲(chǔ)

          2. 現(xiàn)在新的版本提交offset發(fā)送給kafka內(nèi)部topic:__consumer_offsets,提交過去的時(shí)候,
            key是group.id+topic+分區(qū)號(hào),value就是當(dāng)前offset的值,每隔一段時(shí)間,kafka內(nèi)部會(huì)對(duì)這個(gè)topic進(jìn)行compact(合并),也就是每個(gè)group.id+topic+分區(qū)號(hào)就保留最新數(shù)據(jù)。

          3. __consumer_offsets可能會(huì)接收高并發(fā)的請(qǐng)求,所以默認(rèn)分區(qū)50個(gè)(leader partitiron -> 50 kafka),這樣如果你的kafka部署了一個(gè)大的集群,比如有50臺(tái)機(jī)器,就可以用50臺(tái)機(jī)器來(lái)抗offset提交的請(qǐng)求壓力.
            消費(fèi)者 -> broker端的數(shù)據(jù)
            message -> 磁盤 -> offset 順序遞增
            從哪兒開始消費(fèi)?-> offset
            消費(fèi)者(offset)

          14.4 偏移量監(jiān)控工具介紹

          1. web頁(yè)面管理的一個(gè)管理軟件(kafka Manager)
            修改bin/kafka-run-class.sh腳本,第一行增加JMX_PORT=9988
            重啟kafka進(jìn)程

          2. 另一個(gè)軟件:主要監(jiān)控的consumer的偏移量。就是一個(gè)jar包
            java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar 
            com.quantifind.kafka.offsetapp.OffsetGetterWeb 
            –offsetStorage kafka \(根據(jù)版本:偏移量存在kafka就填kafka,存在zookeeper就填zookeeper)
            –zk hadoop1:2181 
            –port 9004 
            –refresh 15.seconds 
            –retain 2.days

          寫了一段程序 ,消費(fèi)kafka里面的數(shù)據(jù)(consumer,處理數(shù)據(jù) -> 業(yè)務(wù)代碼) -> Kafka
          如何去判斷你的這段代碼真的是實(shí)時(shí)的去消費(fèi)的呢?

          延遲幾億條數(shù)據(jù) -> 閾值(20萬(wàn)條的時(shí)候 發(fā)送一個(gè)告警。)

          14.5 消費(fèi)異常感知

          heartbeat.interval.ms:
          consumer心跳時(shí)間間隔,必須得與coordinator保持心跳才能知道consumer是否故障了,
          然后如果故障之后,就會(huì)通過心跳下發(fā)rebalance的指令給其他的consumer通知他們進(jìn)行rebalance的操作
          session.timeout.ms:
          kafka多長(zhǎng)時(shí)間感知不到一個(gè)consumer就認(rèn)為他故障了,默認(rèn)是10秒
          max.poll.interval.ms:
          如果在兩次poll操作之間,超過了這個(gè)時(shí)間,那么就會(huì)認(rèn)為這個(gè)consume處理能力太弱了,會(huì)被踢出消費(fèi)組,分區(qū)分配給別人去消費(fèi),一般來(lái)說結(jié)合業(yè)務(wù)處理的性能來(lái)設(shè)置就可以了。

          14.6 核心參數(shù)解釋

          fetch.max.bytes:
          獲取一條消息最大的字節(jié)數(shù),一般建議設(shè)置大一些,默認(rèn)是1M
          其實(shí)我們?cè)谥岸鄠€(gè)地方都見到過這個(gè)類似的參數(shù),意思就是說一條信息最大能多大?
          1. Producer
          發(fā)送的數(shù)據(jù),一條消息最大多大, -> 10M
          2. Broker
          存儲(chǔ)數(shù)據(jù),一條消息最大能接受多大 -> 10M
          3. Consumer 
          max.poll.records:
          一次poll返回消息的最大條數(shù),默認(rèn)是500條
          connection.max.idle.ms:
          consumer跟broker的socket連接如果空閑超過了一定的時(shí)間,此時(shí)就會(huì)自動(dòng)回收連接,但是下次消費(fèi)就要重新建立socket連接,這個(gè)建議設(shè)置為-1,不要去回收
          enable.auto.commit:
          開啟自動(dòng)提交偏移量
          auto.commit.interval.ms:
          每隔多久提交一次偏移量,默認(rèn)值5000毫秒
          _consumer_offset
          auto.offset.reset:
          earliest
          當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無(wú)提交的offset時(shí),從頭開始消費(fèi)
          topica -> partition0:1000
          partitino1:2000
          latest
          當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
          none
          topic各分區(qū)都存在已提交的offset時(shí),從offset后開始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常

          14.7 綜合案例演示

          引入案例:二手電商平臺(tái)(歡樂送),根據(jù)用戶消費(fèi)的金額,對(duì)用戶星星進(jìn)行累計(jì)。
          訂單系統(tǒng)(生產(chǎn)者) -> Kafka集群里面發(fā)送了消息。
          會(huì)員系統(tǒng)(消費(fèi)者) -> Kafak集群里面消費(fèi)消息,對(duì)消息進(jìn)行處理。

          14.8 group coordinator原理

          面試題:消費(fèi)者是如何實(shí)現(xiàn)rebalance的?— 根據(jù)coordinator實(shí)現(xiàn)

          1. 什么是coordinator
            每個(gè)consumer group都會(huì)選擇一個(gè)broker作為自己的coordinator,他是負(fù)責(zé)監(jiān)控這個(gè)消費(fèi)組里的各個(gè)消費(fèi)者的心跳,以及判斷是否宕機(jī),然后開啟rebalance的

          2. 如何選擇coordinator機(jī)器
            首先對(duì)groupId進(jìn)行hash(數(shù)字),接著對(duì)__consumer_offsets的分區(qū)數(shù)量取模,默認(rèn)是50,_consumer_offsets的分區(qū)數(shù)可以通過offsets.topic.num.partitions來(lái)設(shè)置,找到分區(qū)以后,這個(gè)分區(qū)所在的broker機(jī)器就是coordinator機(jī)器。

            比如說:groupId,“myconsumer_group” -> hash值(數(shù)字)-> 對(duì)50取模 -> 8
            __consumer_offsets 這個(gè)主題的8號(hào)分區(qū)在哪臺(tái)broker上面,那一臺(tái)就是coordinator
            就知道這個(gè)consumer group下的所有的消費(fèi)者提交offset的時(shí)候是往哪個(gè)分區(qū)去提交offset,

          3. 運(yùn)行流程
            1)每個(gè)consumer都發(fā)送JoinGroup請(qǐng)求到Coordinator,
            2)然后Coordinator從一個(gè)consumer group中選擇一個(gè)consumer作為leader,
            3)把consumer group情況發(fā)送給這個(gè)leader,
            4)接著這個(gè)leader會(huì)負(fù)責(zé)制定消費(fèi)方案,
            5)通過SyncGroup發(fā)給Coordinator
            6)接著Coordinator就把消費(fèi)方案下發(fā)給各個(gè)consumer,他們會(huì)從指定的分區(qū)的
            leader broker開始進(jìn)行socket連接以及消費(fèi)消息

          14.9 rebalance策略

          consumer group靠coordinator實(shí)現(xiàn)了Rebalance

          這里有三種rebalance的策略:range、round-robin、sticky

          比如我們消費(fèi)的一個(gè)主題有12個(gè)分區(qū):
          p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11
          假設(shè)我們的消費(fèi)者組里面有三個(gè)消費(fèi)者

          1. range策略
            range策略就是按照partiton的序號(hào)范圍
            p0~3 consumer1
            p4~7 consumer2
            p8~11 consumer3
            默認(rèn)就是這個(gè)策略;

          2. round-robin策略
            就是輪詢分配
            consumer1:0,3,6,9
            consumer2:1,4,7,10
            consumer3:2,5,8,11
            但是前面的這兩個(gè)方案有個(gè)問題:
            12 -> 2 每個(gè)消費(fèi)者會(huì)消費(fèi)6個(gè)分區(qū)

            假設(shè)consuemr1掛了:p0-5分配給consumer2,p6-11分配給consumer3
            這樣的話,原本在consumer2上的的p6,p7分區(qū)就被分配到了 consumer3上。

          3. sticky策略
            最新的一個(gè)sticky策略,就是說盡可能保證在rebalance的時(shí)候,讓原本屬于這個(gè)consumer
            的分區(qū)還是屬于他們,然后把多余的分區(qū)再均勻分配過去,這樣盡可能維持原來(lái)的分區(qū)分配的策略

          consumer1:0-3
          consumer2: 4-7
          consumer3: 8-11
          假設(shè)consumer3掛了
          consumer1:0-3,+8,9
          consumer2: 4-7,+10,11

          15、Broker管理

          15.1 Leo、hw含義

          1. Kafka的核心原理

          2. 如何去評(píng)估一個(gè)集群資源

          3. 搭建了一套kafka集群 -》 介紹了簡(jiǎn)單的一些運(yùn)維管理的操作。

          4. 生產(chǎn)者(使用,核心的參數(shù))

          5. 消費(fèi)者(原理,使用的,核心參數(shù))

          6. broker內(nèi)部的一些原理
            核心的概念:LEO,HW

          LEO:是跟offset偏移量有關(guān)系。

          LEO:
          在kafka里面,無(wú)論leader partition還是follower partition統(tǒng)一都稱作副本(replica)。

          每次partition接收到一條消息,都會(huì)更新自己的LEO,也就是log end offset,LEO其實(shí)就是最新的offset + 1

          HW:高水位
          LEO有一個(gè)很重要的功能就是更新HW,如果follower和leader的LEO同步了,此時(shí)HW就可以更新
          HW之前的數(shù)據(jù)對(duì)消費(fèi)者是可見,消息屬于commit狀態(tài)。HW之后的消息消費(fèi)者消費(fèi)不到。

          15.2 Leo更新

          15.3 hw更新

          15.4 controller如何管理整個(gè)集群

          1: 競(jìng)爭(zhēng)controller的
          /controller/id
          2:controller服務(wù)監(jiān)聽的目錄:
          /broker/ids/ 用來(lái)感知 broker上下線
          /broker/topics/ 創(chuàng)建主題,我們當(dāng)時(shí)創(chuàng)建主題命令,提供的參數(shù),ZK地址。
          /admin/reassign_partitions 分區(qū)重分配
          ……

          15.5 延時(shí)任務(wù)

          kafka的延遲調(diào)度機(jī)制(擴(kuò)展知識(shí))
          我們先看一下kafka里面哪些地方需要有任務(wù)要進(jìn)行延遲調(diào)度。
          第一類延時(shí)的任務(wù):
          比如說producer的acks=-1,必須等待leader和follower都寫完才能返回響應(yīng)。
          有一個(gè)超時(shí)時(shí)間,默認(rèn)是30秒(request.timeout.ms)。
          所以需要在寫入一條數(shù)據(jù)到leader磁盤之后,就必須有一個(gè)延時(shí)任務(wù),到期時(shí)間是30秒延時(shí)任務(wù)
          放到DelayedOperationPurgatory(延時(shí)管理器)中。
          假如在30秒之前如果所有follower都寫入副本到本地磁盤了,那么這個(gè)任務(wù)就會(huì)被自動(dòng)觸發(fā)蘇醒,就可以返回響應(yīng)結(jié)果給客戶端了,
          否則的話,這個(gè)延時(shí)任務(wù)自己指定了最多是30秒到期,如果到了超時(shí)時(shí)間都沒等到,就直接超時(shí)返回異常。
          第二類延時(shí)的任務(wù):
          follower往leader拉取消息的時(shí)候,如果發(fā)現(xiàn)是空的,此時(shí)會(huì)創(chuàng)建一個(gè)延時(shí)拉取任務(wù)
          延時(shí)時(shí)間到了之后(比如到了100ms),就給follower返回一個(gè)空的數(shù)據(jù),然后follower再次發(fā)送請(qǐng)求讀取消息,
          但是如果延時(shí)的過程中(還沒到100ms),leader寫入了消息,這個(gè)任務(wù)就會(huì)自動(dòng)蘇醒,自動(dòng)執(zhí)行拉取任務(wù)。

          海量的延時(shí)任務(wù),需要去調(diào)度。

          15.6 時(shí)間輪機(jī)制

          1. 什么會(huì)有要設(shè)計(jì)時(shí)間輪?
            Kafka內(nèi)部有很多延時(shí)任務(wù),沒有基于JDK Timer來(lái)實(shí)現(xiàn),那個(gè)插入和刪除任務(wù)的時(shí)間復(fù)雜度是O(nlogn),
            而是基于了自己寫的時(shí)間輪來(lái)實(shí)現(xiàn)的,時(shí)間復(fù)雜度是O(1),依靠時(shí)間輪機(jī)制,延時(shí)任務(wù)插入和刪除,O(1)

          2. 時(shí)間輪是什么?
            其實(shí)時(shí)間輪說白其實(shí)就是一個(gè)數(shù)組。
            tickMs:時(shí)間輪間隔 1ms
            wheelSize:時(shí)間輪大小 20
            interval:timckMS * whellSize,一個(gè)時(shí)間輪的總的時(shí)間跨度。20ms
            currentTime:當(dāng)時(shí)時(shí)間的指針。
            a:因?yàn)闀r(shí)間輪是一個(gè)數(shù)組,所以要獲取里面數(shù)據(jù)的時(shí)候,靠的是index,時(shí)間復(fù)雜度是O(1)
            b:數(shù)組某個(gè)位置上對(duì)應(yīng)的任務(wù),用的是雙向鏈表存儲(chǔ)的,往雙向鏈表里面插入,刪除任務(wù),時(shí)間復(fù)雜度也是O(1)
            舉例:插入一個(gè)8ms以后要執(zhí)行的任務(wù)
            19ms
            3.多層級(jí)的時(shí)間輪
            比如:要插入一個(gè)110毫秒以后運(yùn)行的任務(wù)。
            tickMs:時(shí)間輪間隔 20ms
            wheelSize:時(shí)間輪大小 20
            interval:timckMS * whellSize,一個(gè)時(shí)間輪的總的時(shí)間跨度。20ms
            currentTime:當(dāng)時(shí)時(shí)間的指針。
            第一層時(shí)間輪:1ms * 20
            第二層時(shí)間輪:20ms * 20
            第三層時(shí)間輪:400ms * 20


          原文鏈接:

          https://blog.csdn.net/eraining/article/details/115860664




          瀏覽 82
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲国产高清国产精品 | 毛片其地 | 免费观看视频久久 | 99热在线精品播放 | 成人做爰A片AAA毛真人 |