<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          藏在 17 張圖里的 Kafka

          共 17358字,需瀏覽 35分鐘

           ·

          2022-06-19 22:47

          1、為什么有消息系統(tǒng)

          1、解耦合
          2、異步處理

          例如電商平臺,秒殺活動。

          一般流程會分為:

          1. 風險控制
          2. 庫存鎖定
          3. 生成訂單
          4. 短信通知
          5. 更新數(shù)據(jù)

          通過消息系統(tǒng)將秒殺活動業(yè)務拆分開,將不急需處理的業(yè)務放在后面慢慢處理;

          流程改為:

          1. 風險控制
          2. 庫存鎖定
          3. 消息系統(tǒng)
          4. 生成訂單
          5. 短信通知
          6. 更新數(shù)據(jù)
          3、流量的控制

          3.1 網(wǎng)關在接受到請求后,就把請求放入到消息隊列里面

          3.2 后端的服務從消息隊列里面獲取到請求,完成后續(xù)的秒殺處理流程。然后再給用戶返回結(jié)果。

          • 優(yōu)點:控制了流量
          • 缺點:會讓流程變慢

          2、Kafka核心概念

          • 生產(chǎn)者:Producer 往Kafka集群生成數(shù)據(jù)
          • 消費者:Consumer 往Kafka里面去獲取數(shù)據(jù),處理數(shù)據(jù)、消費數(shù)據(jù)

          Kafka的數(shù)據(jù)是由消費者自己去拉去Kafka里面的數(shù)據(jù)

          • 主題:topic
          • 分區(qū):partition

          默認一個topic有一個分區(qū)(partition),自己可設置多個分區(qū)(分區(qū)分散存儲在服務器不同節(jié)點上)

          解決了一個海量數(shù)據(jù)如何存儲的問題

          例如:有2T的數(shù)據(jù),一臺服務器有1T,一個topic可以分多個區(qū),分別存儲在多臺服務器上,解決海量數(shù)據(jù)存儲問題

          3、Kafka的集群架構(gòu)

          Kafka集群中,一個kafka服務器就是一個broker,Topic只是邏輯上的概念,partition在磁盤上就體現(xiàn)為一個目錄。

          Consumer Group:消費組,消費數(shù)據(jù)的時候,都必須指定一個group id,指定一個組的id

          假定程序A和程序B指定的group id號一樣,那么兩個程序就屬于同一個消費組

          特殊:

          • 比如,有一個主題topicA, 程序A去消費了這個topicA,那么程序B就不能再去消費topicA(程序A和程序B屬于一個消費組)
          • 再比如程序A已經(jīng)消費了topicA里面的數(shù)據(jù),現(xiàn)在還是重新再次消費topicA的數(shù)據(jù),是不可以的,但是重新指定一個group id號以后,可以消費。

          不同消費組之間沒有影響。消費組需自定義,消費者名稱程序自動生成(獨一無二)。

          Controller:Kafka節(jié)點里面的一個主節(jié)點。借助zookeeper

          4、Kafka磁盤順序?qū)懕WC寫數(shù)據(jù)性能

          kafka寫數(shù)據(jù):

          順序?qū)懀疟P上寫數(shù)據(jù)時,就是追加數(shù)據(jù),沒有隨機寫的操作。

          經(jīng)驗:

          如果一個服務器磁盤達到一定的個數(shù),磁盤也達到一定轉(zhuǎn)數(shù),往磁盤里面順序?qū)懀ㄗ芳訉懀?shù)據(jù)的速度和寫內(nèi)存的速度差不多。

          生產(chǎn)者生產(chǎn)消息,經(jīng)過kafka服務先寫到os cache 內(nèi)存中,然后經(jīng)過sync順序?qū)懙酱疟P上

          5、Kafka零拷貝機制保證讀數(shù)據(jù)高性能

          消費者讀取數(shù)據(jù)流程:

          1. 消費者發(fā)送請求給kafka服務
          2. kafka服務去os cache緩存讀取數(shù)據(jù)(緩存沒有就去磁盤讀取數(shù)據(jù))
          3. 從磁盤讀取了數(shù)據(jù)到os cache緩存中
          4. os cache復制數(shù)據(jù)到kafka應用程序中
          5. kafka將數(shù)據(jù)(復制)發(fā)送到socket cache中
          6. socket cache通過網(wǎng)卡傳輸給消費者

          kafka linux sendfile技術 — 零拷貝

          1. 消費者發(fā)送請求給kafka服務
          2. kafka服務去os cache緩存讀取數(shù)據(jù)(緩存沒有就去磁盤讀取數(shù)據(jù))
          3. 從磁盤讀取了數(shù)據(jù)到os cache緩存中
          4. os cache直接將數(shù)據(jù)發(fā)送給網(wǎng)卡
          5. 通過網(wǎng)卡將數(shù)據(jù)傳輸給消費者

          6、Kafka日志分段保存

          Kafka中一個主題,一般會設置分區(qū);比如創(chuàng)建了一個topic_a,然后創(chuàng)建的時候指定了這個主題有三個分區(qū)。

          其實在三臺服務器上,會創(chuàng)建三個目錄。

          服務器1(kafka1):

          • 創(chuàng)建目錄topic_a-0:
          • 目錄下面是我們文件(存儲數(shù)據(jù)),kafka數(shù)據(jù)就是message,數(shù)據(jù)存儲在log文件里
          • .log結(jié)尾的就是日志文件,在kafka中把數(shù)據(jù)文件就叫做日志文件。

          一個分區(qū)下面默認有n多個日志文件(分段存儲),一個日志文件默認1G

          服務器2(kafka2):

          • 創(chuàng)建目錄topic_a-1:

          服務器3(kafka3):

          • 創(chuàng)建目錄topic_a-2:

          7、Kafka二分查找定位數(shù)據(jù)

          Kafka里面每一條消息,都有自己的offset(相對偏移量),存在物理磁盤上面,在position

          Position:物理位置(磁盤上面那個地方)

          也就是說一條消息就有兩個位置:

          • offset:相對偏移量(相對位置)
          • position:磁盤物理位置

          稀疏索引:

          • Kafka中采用了稀疏索引的方式讀取索引,kafka每當寫入了4k大小的日志(.log),就往index里寫入一個記錄索引。

          其中會采用二分查找

          8、高并發(fā)網(wǎng)絡設計(先了解NIO)

          網(wǎng)絡設計部分是kafka中設計最好的一個部分,這也是保證Kafka高并發(fā)、高性能的原因

          對kafka進行調(diào)優(yōu),就得對kafka原理比較了解,尤其是網(wǎng)絡設計部分

          Reactor網(wǎng)絡設計模式1:

          Reactor網(wǎng)絡設計模式2:

          Reactor網(wǎng)絡設計模式3:

          Kafka超高并發(fā)網(wǎng)絡設計:


          9、Kafka冗余副本保證高可用

          在kafka里面分區(qū)是有副本的,注:0.8以前是沒有副本機制的。創(chuàng)建主題時,可以指定分區(qū),也可以指定副本個數(shù)。副本是有角色的:

          leader partition:

          • 寫數(shù)據(jù)、讀數(shù)據(jù)操作都是從leader partition去操作的。
          • 會維護一個ISR(in-sync- replica )列表,但是會根據(jù)一定的規(guī)則刪除ISR列表里面的值

          生產(chǎn)者發(fā)送來一個消息,消息首先要寫入到leader partition中

          寫完了以后,還要把消息寫入到ISR列表里面的其它分區(qū),寫完后才算這個消息提交

          follower partition:從leader partition同步數(shù)據(jù)。

          10、優(yōu)秀架構(gòu)思考-總結(jié)

          Kafka — 高并發(fā)、高可用、高性能

          • 高可用:多副本機制
          • 高并發(fā):網(wǎng)絡架構(gòu)設計 三層架構(gòu):多selector -> 多線程 -> 隊列的設計(NIO)
          • 高性能:

          寫數(shù)據(jù):

          1. 把數(shù)據(jù)先寫入到OS Cache
          2. 寫到磁盤上面是順序?qū)?,性能很?/section>

          讀數(shù)據(jù):

          1. 根據(jù)稀疏索引,快速定位到要消費的數(shù)據(jù)
          2. 零拷貝機制
            • 減少數(shù)據(jù)的拷貝
            • 減少了應用程序與操作系統(tǒng)上下文切換

          11、Kafka生產(chǎn)環(huán)境搭建

          11.1 需求場景分析

          電商平臺,需要每天10億請求都要發(fā)送到Kafka集群上面。二八反正,一般評估出來問題都不大。

          10億請求 -> 24 過來的,一般情況下,每天的12:00 到早上8:00 這段時間其實是沒有多大的數(shù)據(jù)量的。80%的請求是用的另外16小時的處理的。16個小時處理 -> 8億的請求。16 * 0.2 = 3個小時 處理了8億請求的80%的數(shù)據(jù)

          也就是說6億的數(shù)據(jù)是靠3個小時處理完的。我們簡單的算一下高峰期時候的qps

          6億/3小時 =5.5萬/s qps=5.5萬

          10億請求 * 50kb = 46T 每天需要存儲46T的數(shù)據(jù)

          一般情況下,我們都會設置兩個副本 46T * 2 = 92T,Kafka里面的數(shù)據(jù)是有保留的時間周期,保留最近3天的數(shù)據(jù)。

          92T * 3天 = 276T

          我這兒說的是50kb不是說一條消息就是50kb不是(把日志合并了,多條日志合并在一起),通常情況下,一條消息就幾b,也有可能就是幾百字節(jié)。

          11.2 物理機數(shù)量評估

          1)首先分析一下是需要虛擬機還是物理機

          像Kafka mysql hadoop這些集群搭建的時候,我們生產(chǎn)里面都是使用物理機。

          2)高峰期需要處理的請求總的請求每秒5.5萬個,其實一兩臺物理機絕對是可以抗住的。一般情況下,我們評估機器的時候,是按照高峰期的4倍的去評估。

          如果是4倍的話,大概我們集群的能力要準備到 20萬qps。這樣子的集群才是比較安全的集群。大概就需要5臺物理機。每臺承受4萬請求。

          場景總結(jié):

          • 搞定10億請求,高峰期5.5萬的qps,276T的數(shù)據(jù),需要5臺物理機。

          11.3 磁盤選擇

          搞定10億請求,高峰期5.5萬的qps,276T的數(shù)據(jù),需要5臺物理機。

          1)SSD固態(tài)硬盤,還是需要普通的機械硬盤

          • SSD硬盤:性能比較好,但是價格貴
          • SAS盤:某方面性能不是很好,但是比較便宜。

          SSD硬盤性能比較好,指的是它隨機讀寫的性能比較好。適合MySQL這樣集群。

          但是其實他的順序?qū)懙男阅芨鶶AS盤差不多。

          kafka的理解:就是用的順序?qū)?。所以我們就用普通的【機械硬盤】就可以了。

          2)需要我們評估每臺服務器需要多少塊磁盤

          5臺服務器,一共需要276T ,大約每臺服務器 需要存儲60T的數(shù)據(jù)。我們公司里面服務器的配置用的是 11塊硬盤,每個硬盤 7T。11 * 7T = 77T

          77T * 5 臺服務器 = 385T

          場景總結(jié):

          • 搞定10億請求,需要5臺物理機,11(SAS) * 7T

          11.4 內(nèi)存評估 搞定10億請求,需要5臺物理機,11(SAS) * 7T

          我們發(fā)現(xiàn)kafka讀寫數(shù)據(jù)的流程 都是基于os cache,換句話說假設咱們的os cashe無限大那么整個kafka是不是相當于就是基于內(nèi)存去操作,如果是基于內(nèi)存去操作,性能肯定很好。內(nèi)存是有限的。

          • 盡可能多的內(nèi)存資源要給 os cache
          • Kafka的代碼用 核心的代碼用的是scala寫的,客戶端的代碼java寫的。都是基于jvm。所以我們還要給一部分的內(nèi)存給jvm。

          Kafka的設計,沒有把很多數(shù)據(jù)結(jié)構(gòu)都放在jvm里面。所以我們的這個jvm不需要太大的內(nèi)存。根據(jù)經(jīng)驗,給個10G就可以了。

          NameNode:jvm里面還放了元數(shù)據(jù)(幾十G),JVM一定要給得很大。比如給個100G。

          假設我們這個10請求的這個項目,一共會有100個topic。100 topic * 5 partition * 2 = 1000 partition

          一個partition其實就是物理機上面的一個目錄,這個目錄下面會有很多個.log的文件。

          • .log就是存儲數(shù)據(jù)文件,默認情況下一個.log文件的大小是1G。

          我們?nèi)绻WC 1000個partition 的最新的.log 文件的數(shù)據(jù) 如果都在內(nèi)存里面,這個時候性能就是最好。1000 * 1G = 1000G內(nèi)存.

          我們只需要把當前最新的這個log 保證里面的25%的最新的數(shù)據(jù)在內(nèi)存里面。250M * 1000 = 0.25 G* 1000 =250G的內(nèi)存。

          • 250內(nèi)存 / 5 = 50G內(nèi)存
          • 50G+10G = 60G內(nèi)存

          64G的內(nèi)存,另外的4G,操作系統(tǒng)本生是不是也需要內(nèi)存。其實Kafka的jvm也可以不用給到10G這么多。評估出來64G是可以的。當然如果能給到128G的內(nèi)存的服務器,那就最好。

          我剛剛評估的時候用的都是一個topic是5個partition,但是如果是數(shù)據(jù)量比較大的topic,可能會有10個partition。

          總結(jié):

          • 搞定10億請求,需要5臺物理機,11(SAS) * 7T ,需要64G的內(nèi)存(128G更好)

          11.5 CPU壓力評估

          評估一下每臺服務器需要多少cpu core(資源很有限)

          我們評估需要多少個cpu ,依據(jù)就是看我們的服務里面有多少線程去跑。線程就是依托cpu 去運行的。如果我們的線程比較多,但是cpu core比較少,這樣的話,我們的機器負載就會很高,性能不就不好。

          評估一下,kafka的一臺服務器 啟動以后會有多少線程?

          • Acceptor線程 1
          • processor線程 3 6~9個線程
          • 處理請求線程 8個 32個線程
          • 定時清理的線程,拉取數(shù)據(jù)的線程,定時檢查ISR列表的機制 等等。

          所以大概一個Kafka的服務啟動起來以后,會有一百多個線程。

          • cpu core = 4個,一遍來說,幾十個線程,就肯定把cpu 打滿了。
          • cpu core = 8個,應該很輕松的能支持幾十個線程。

          如果我們的線程是100多個,或者差不多200個,那么8 個 cpu core是搞不定的。

          所以我們這兒建議:

          • CPU core = 16個。如果可以的話,能有32個cpu core 那就最好。

          結(jié)論:

          • kafka集群,最低也要給16個cpu core,如果能給到32 cpu core那就更好。
          • 2cpu * 8 =16 cpu core
          • 4cpu * 8 = 32 cpu core

          總結(jié):

          • 搞定10億請求,需要5臺物理機, 11(SAS) * 7T ,需要64G的內(nèi)存(128G更好),需要16個cpu core(32個更好)

          11.6 網(wǎng)絡需求評估

          評估我們需要什么樣網(wǎng)卡?

          一般要么是千兆的網(wǎng)卡(1G/s),還有的就是萬兆的網(wǎng)卡(10G/s)

          高峰期的時候 每秒會有5.5萬的請求涌入,5.5/5 = 大約是每臺服務器會有1萬個請求涌入。我們之前說的,10000 * 50kb = 488M  也就是每條服務器,每秒要接受488M的數(shù)據(jù)。數(shù)據(jù)還要有副本,副本之間的同步,也是走的網(wǎng)絡的請求。488 * 2 = 976m/s

          說明一下:

          • 很多公司的數(shù)據(jù),一個請求里面是沒有50kb這么大的,我們公司是因為主機在生產(chǎn)端封裝了數(shù)據(jù),然后把多條數(shù)據(jù)合并在一起了,所以我們的一個請求才會有這么大。
          • 一般情況下,網(wǎng)卡的帶寬是達不到極限的,如果是千兆的網(wǎng)卡,我們能用的一般就是700M左右。但是如果最好的情況,我們還是使用萬兆的網(wǎng)卡。
          • 如果使用的是萬兆的,那就是很輕松。

          11.7 集群規(guī)劃

          • 請求量
          • 規(guī)劃物理機的個數(shù)
          • 分析磁盤的個數(shù),選擇使用什么樣的磁盤
          • 內(nèi)存
          • cpu core
          • 網(wǎng)卡

          就是告訴大家,以后要是公司里面有什么需求,進行資源的評估,服務器的評估,大家按照我的思路去評估。

          一條消息的大小 50kb -> 1kb 500byte 1M

          ip 主機名

          • 192.168.0.100 hadoop1
          • 192.168.0.101 hadoop2
          • 192.168.0.102 hadoop3

          主機的規(guī)劃:kafka集群架構(gòu)的時候:主從式的架構(gòu):

          • controller -> 通過zk集群來管理整個集群的元數(shù)據(jù)。

          zookeeper集群

          • hadoop1
          • hadoop2
          • hadoop3

          kafka集群

          • 理論上來講,我們不應該把kafka的服務于zk的服務安裝在一起。
          • 但是我們這兒服務器有限。所以我們kafka集群也是安裝在hadoop1 haadoop2 hadoop3

          11.8 zookeeper集群搭建

          11.9 核心參數(shù)詳解

          11.10 集群壓力測試

          12、kafka運維

          12.1 常見運維工具介紹

          KafkaManager — 頁面管理工具

          12.2 常見運維命令

          場景一:topic數(shù)據(jù)量太大,要增加topic數(shù)

          一開始創(chuàng)建主題的時候,數(shù)據(jù)量不大,給的分區(qū)數(shù)不多。

          kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6

          kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --partitions 3 --topic test6

          broker id:

          • hadoop1:0
          • hadoop2:1
          • hadoop3:2

          假設一個partition有三個副本:partition0:

          a,b,c

          • a:leader partition
          • b,c:follower partition
          ISR:{a,b,c}

          如果一個follower分區(qū) 超過10秒 沒有向leader partition去拉取數(shù)據(jù),那么這個分區(qū)就從ISR列表里面移除。

          場景二:核心topic增加副本因子

          如果對核心業(yè)務數(shù)據(jù)需要增加副本因子

          vim test.json腳本,將下面一行json腳本保存

          {“version”:1,“partitions”:[{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}]}

          執(zhí)行上面json腳本:

          kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute

          場景三:負載不均衡的topic,手動遷移

          vi topics-to-move.json

          {“topics”: [{“topic”: “test01”}, {“topic”: “test02”}], “version”: 1
          // 把你所有的topic都寫在這里

          kafka-reassgin-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list “5,6” --generate
          // 把你所有的包括新加入的broker機器都寫在這里,就會說是把所有的partition均勻的分散在各個broker上,包括新進來的broker

          此時會生成一個遷移方案,可以保存到一個文件里去:expand-cluster-reassignment.json

          kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

          kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

          這種數(shù)據(jù)遷移操作一定要在晚上低峰的時候來做,因為他會在機器之間遷移數(shù)據(jù),非常的占用帶寬資源

          • generate: 根據(jù)給予的Topic列表和Broker列表生成遷移計劃。generate并不會真正進行消息遷移,而是將消息遷移計劃計算出來,供execute命令使用。
          • execute: 根據(jù)給予的消息遷移計劃進行遷移。
          • verify: 檢查消息是否已經(jīng)遷移完成。

          場景四:如果某個broker leader partition過多

          正常情況下,我們的leader partition在服務器之間是負載均衡。

          • hadoop1 4
          • hadoop2 1
          • hadoop3 1

          現(xiàn)在各個業(yè)務方可以自行申請創(chuàng)建topic,分區(qū)數(shù)量都是自動分配和后續(xù)動態(tài)調(diào)整的,kafka本身會自動把leader partition均勻分散在各個機器上,這樣可以保證每臺機器的讀寫吞吐量都是均勻的。

          但是也有例外,那就是如果某些broker宕機,會導致leader partition過于集中在其他少部分幾臺broker上,這會導致少數(shù)幾臺broker的讀寫請求壓力過高,其他宕機的broker重啟之后都是folloer partition,讀寫請求很低。

          造成集群負載不均衡有一個參數(shù),auto.leader.rebalance.enable,默認是true,每隔300秒(leader.imbalance.check.interval.seconds)檢查leader負載是否平衡

          如果一臺broker上的不均衡的leader超過了10%,leader.imbalance.per.broker.percentage,就會對這個broker進行選舉。

          配置參數(shù):

          • auto.leader.rebalance.enable 默認是true
          • leader.imbalance.per.broker.percentage: 每個broker允許的不平衡的leader的比率。如果每個broker超過了這個值,控制器會觸發(fā)leader的平衡。這個值表示百分比。10%
          • leader.imbalance.check.interval.seconds:默認值300秒

          13、Kafka生產(chǎn)者

          13.1 消費者發(fā)送消息原理

          13.2 消費者發(fā)送消息原理—基礎案例演示

          13.3 如何提升吞吐量

          如何提升吞吐量:參數(shù)一:buffer.memory:

          設置發(fā)送消息的緩沖區(qū),默認值是33554432,就是32MB

          參數(shù)二:compression.type:

          默認是none,不壓縮,但是也可以使用lz4壓縮,效率還是不錯的,壓縮之后可以減小數(shù)據(jù)量,提升吞吐量,但是會加大producer端的cpu開銷

          參數(shù)三:batch.size:

          • 設置batch的大小,如果batch太小,會導致頻繁網(wǎng)絡請求,吞吐量下降;
          • 如果batch太大,會導致一條消息需要等待很久才能被發(fā)送出去,而且會讓內(nèi)存緩沖區(qū)有很大壓力,過多數(shù)據(jù)緩沖在內(nèi)存里,默認值是:16384,就是16kb,也就是一個batch滿了16kb就發(fā)送出去,一般在實際生產(chǎn)環(huán)境,這個batch的值可以增大一些來提升吞吐量,如果一個批次設置大了,會有延遲。一般根據(jù)一條消息大小來設置。
          • 如果我們消息比較少。配合使用的參數(shù)linger.ms,這個值默認是0,意思就是消息必須立即被發(fā)送,但是這是不對的,一般設置一個100毫秒之類的,這樣的話就是說,這個消息被發(fā)送出去后進入一個batch,如果100毫秒內(nèi),這個batch滿了16kb,自然就會發(fā)送出去。

          13.4 如何處理異常

          1、LeaderNotAvailableException:

          這個就是如果某臺機器掛了,此時leader副本不可用,會導致你寫入失敗,要等待其他follower副本切換為leader副本之后,才能繼續(xù)寫入,此時可以重試發(fā)送即可;如果說你平時重啟kafka的broker進程,肯定會導致leader切換,一定會導致你寫入報錯,是LeaderNotAvailableException。

          2、NotControllerException:

          這個也是同理,如果說Controller所在Broker掛了,那么此時會有問題,需要等待Controller重新選舉,此時也是一樣就是重試即可。

          3、NetworkException:網(wǎng)絡異常 timeout

          • 配置retries參數(shù),他會自動重試的
          • 但是如果重試幾次之后還是不行,就會提供Exception給我們來處理了,我們獲取到異常以后,再對這個消息進行單獨處理。我們會有備用的鏈路。發(fā)送不成功的消息發(fā)送到Redis或者寫到文件系統(tǒng)中,甚至是丟棄。

          13.5 重試機制

          重試會帶來一些問題:

          消息重復

          有的時候一些leader切換之類的問題,需要進行重試,設置retries即可,但是消息重試會導致,重復發(fā)送的問題,比如說網(wǎng)絡抖動一下導致他以為沒成功,就重試了,其實人家都成功了.

          消息亂序消息重試是可能導致消息的亂序的,因為可能排在你后面的消息都發(fā)送出去了。所以可以使用" max.in.flight.requests.per.connection"參數(shù)設置為1,這樣可以保證producer同一時間只能發(fā)送一條消息。

          兩次重試的間隔默認是100毫秒,用"retry.backoff.ms"來進行設置,基本上在開發(fā)過程中,靠重試機制基本就可以搞定95%的異常問題。

          13.6 ACK參數(shù)詳解

          producer端

          request.required.acks=0;

          • 只要請求已發(fā)送出去,就算是發(fā)送完了,不關心有沒有寫成功。
          • 性能很好,如果是對一些日志進行分析,可以承受丟數(shù)據(jù)的情況,用這個參數(shù),性能會很好。

          request.required.acks=1;

          • 發(fā)送一條消息,當leader partition寫入成功以后,才算寫入成功。
          • 不過這種方式也有丟數(shù)據(jù)的可能。

          request.required.acks=-1;

          • 需要ISR列表里面,所有副本都寫完以后,這條消息才算寫入成功。
          • ISR:1個副本。1 leader partition 1 follower partition

          kafka服務端:

          min.insync.replicas:1

          如果我們不設置的話,默認這個值是1,一個leader partition會維護一個ISR列表,這個值就是限制ISR列表里面,至少得有幾個副本,比如這個值是2,那么當ISR列表里面只有一個副本的時候。往這個分區(qū)插入數(shù)據(jù)的時候會報錯。

          設計一個不丟數(shù)據(jù)的方案:

          • 分區(qū)副本 >=2
          • acks = -1
          • min.insync.replicas >=2

          還有可能就是發(fā)送有異常:對異常進行處理

          13.7 自定義分區(qū)

          分區(qū):

          • 沒有設置key

          我們的消息就會被輪訓的發(fā)送到不同的分區(qū)。

          • 設置了key

          kafka自帶的分區(qū)器,會根據(jù)key計算出來一個hash值,這個hash值會對應某一個分區(qū)。

          如果key相同的,那么hash值必然相同,key相同的值,必然是會被發(fā)送到同一個分區(qū)。

          但是有些比較特殊的時候,我們就需要自定義分區(qū)

          public class HotDataPartitioner implements Partitioner {
          private Random random;
          @Override
          public void configure(Map<String, ?> configs) {
          random = new Random();
          }
          @Override
          public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
          String key = (String)keyObj;
          List partitionInfoList = cluster.availablePartitionsForTopic(topic);
          //獲取到分區(qū)的個數(shù) 0,1,2
          int partitionCount = partitionInfoList.size();
          //最后一個分區(qū)
          int hotDataPartition = partitionCount - 1;
          return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
          }
          }

          如何使用:

          配置上這個類即可:props.put(”partitioner.class”, “com.zhss.HotDataPartitioner”);

          13.8 綜合案例演示

          需求分析:

          電商背景 -》 二手的電商平臺

          【歡樂送】的項目,用戶購買了東西以后會有【星星】,用星星去換物品。一塊錢一個星星。

          訂單系統(tǒng)(消息的生產(chǎn)),發(fā)送一條消息(支付訂單,取消訂單) -> Kafka <- 會員系統(tǒng),從kafak里面去消費數(shù)據(jù),找到對應用戶消費的金額,然后給該用戶更新星星的數(shù)量。

          分析一下:

          發(fā)送消息的時候,可以指定key,也可以不指定key。

          1)如果不指定key

          • zhangsan ->下訂單 -> 100 -> +100
          • zhangsan -> 取消訂單 -> -100 -> -100
          • 會員系統(tǒng)消費數(shù)據(jù)的時候,有可能先消費到的是 取消訂單的數(shù)據(jù)。

          2)如果指定key,key -> hash(數(shù)字) -> 對應分區(qū)號 -> 發(fā)送到對應的分區(qū)里面。

          • 如果key相同的 -> 數(shù)據(jù)肯定會被發(fā)送到同一個分區(qū)(有序的)

          這個項目需要指定key,把用戶的id指定為key.

          14、Kafka消費者

          14.1 消費組概念

          groupid相同就屬于同一個消費組

          1)每個consumer都要屬于一個consumer.group,就是一個消費組,topic的一個分區(qū)只會分配給一個消費組下的一個consumer來處理,每個consumer可能會分配多個分區(qū),也有可能某個consumer沒有分配到任何分區(qū)。

          2)如果想要實現(xiàn)一個廣播的效果,那只需要使用不同的group id去消費就可以。

          topicA:

          • partition0、partition1

          groupA:

          • consumer1:消費 partition0
          • consuemr2:消費 partition1
          • consuemr3:消費不到數(shù)據(jù)

          groupB:

          • consuemr3:消費到partition0和partition1

          3)如果consumer group中某個消費者掛了,此時會自動把分配給他的分區(qū)交給其他的消費者,如果他又重啟了,那么又會把一些分區(qū)重新交還給他

          14.2 基礎案例演示

          14.3 偏移量管理

          每個consumer內(nèi)存里數(shù)據(jù)結(jié)構(gòu)保存對每個topic的每個分區(qū)的消費offset,定期會提交offset,老版本是寫入zk,但是那樣高并發(fā)請求zk是不合理的架構(gòu)設計,zk是做分布式系統(tǒng)的協(xié)調(diào)的,輕量級的元數(shù)據(jù)存儲,不能負責高并發(fā)讀寫,作為數(shù)據(jù)存儲。

          現(xiàn)在新的版本提交offset發(fā)送給kafka內(nèi)部topic:__consumer_offsets,提交過去的時候, key是group.id+topic+分區(qū)號,value就是當前offset的值,每隔一段時間,kafka內(nèi)部會對這個topic進行compact(合并),也就是每個group.id+topic+分區(qū)號就保留最新數(shù)據(jù)。

          __consumer_offsets可能會接收高并發(fā)的請求,所以默認分區(qū)50個(leader partitiron -> 50 kafka),這樣如果你的kafka部署了一個大的集群,比如有50臺機器,就可以用50臺機器來抗offset提交的請求壓力。

          • 消費者 -> broker端的數(shù)據(jù)
          • message -> 磁盤 -> offset 順序遞增
          • 從哪兒開始消費?-> offset
          • 消費者(offset)

          14.4 偏移量監(jiān)控工具介紹

          web頁面管理的一個管理軟件(kafka Manager)

          • 修改bin/kafka-run-class.sh腳本,第一行增加JMX_PORT=9988
          • 重啟kafka進程

          另一個軟件:主要監(jiān)控的consumer的偏移量。

          就是一個jar包java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar

          com.quantifind.kafka.offsetapp.OffsetGetterWeb

          • offsetStorage kafka \(根據(jù)版本:偏移量存在kafka就填kafka,存在zookeeper就填zookeeper)
          • zk hadoop1:2181
          • port 9004
          • refresh 15.seconds
          • retain 2.days

          寫了一段程序 ,消費kafka里面的數(shù)據(jù)(consumer,處理數(shù)據(jù) -> 業(yè)務代碼) -> Kafka 如何去判斷你的這段代碼真的是實時的去消費的呢?

          延遲幾億條數(shù)據(jù) -> 閾值(20萬條的時候 發(fā)送一個告警。)

          14.5 消費異常感知

          heartbeat.interval.ms

          • consumer心跳時間間隔,必須得與coordinator保持心跳才能知道consumer是否故障了,
          • 然后如果故障之后,就會通過心跳下發(fā)rebalance的指令給其他的consumer通知他們進行rebalance的操作

          session.timeout.ms

          • kafka多長時間感知不到一個consumer就認為他故障了,默認是10秒

          max.poll.interval.ms

          • 如果在兩次poll操作之間,超過了這個時間,那么就會認為這個consume處理能力太弱了,會被踢出消費組,分區(qū)分配給別人去消費,一般來說結(jié)合業(yè)務處理的性能來設置就可以了。

          14.6 核心參數(shù)解釋

          fetch.max.bytes

          獲取一條消息最大的字節(jié)數(shù),一般建議設置大一些,默認是1M 其實我們在之前多個地方都見到過這個類似的參數(shù),意思就是說一條信息最大能多大?

          1. Producer:發(fā)送的數(shù)據(jù),一條消息最大多大, -> 10M
          2. Broker:存儲數(shù)據(jù),一條消息最大能接受多大 -> 10M
          3. Consumer:

          max.poll.records:

          一次poll返回消息的最大條數(shù),默認是500條

          connection.max.idle.ms

          consumer跟broker的socket連接如果空閑超過了一定的時間,此時就會自動回收連接,但是下次消費就要重新建立socket連接,這個建議設置為-1,不要去回收

          enable.auto.commit:

          開啟自動提交偏移量

          auto.commit.interval.ms:

          每隔多久提交一次偏移量,默認值5000毫秒

          auto.offset.reset

          • earliest:當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費

          • latest:當各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù)

          • none:topic各分區(qū)都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常

          14.7 綜合案例演示

          引入案例:二手電商平臺(歡樂送),根據(jù)用戶消費的金額,對用戶星星進行累計。

          • 訂單系統(tǒng)(生產(chǎn)者) -> Kafka集群里面發(fā)送了消息。
          • 會員系統(tǒng)(消費者) -> Kafak集群里面消費消息,對消息進行處理。

          14.8 group coordinator原理

          面試題:消費者是如何實現(xiàn)rebalance的?— 根據(jù)coordinator實現(xiàn)

          什么是coordinator

          每個consumer group都會選擇一個broker作為自己的coordinator,他是負責監(jiān)控這個消費組里的各個消費者的心跳,以及判斷是否宕機,然后開啟rebalance的

          如何選擇coordinator機器

          首先對groupId進行hash(數(shù)字),接著對__consumer_offsets的分區(qū)數(shù)量取模,默認是50,_consumer_offsets的分區(qū)數(shù)可以通過offsets.topic.num.partitions來設置,找到分區(qū)以后,這個分區(qū)所在的broker機器就是coordinator機器。

          比如說:groupId,“myconsumer_group” -> hash值(數(shù)字)-> 對50取模 -> 8__consumer_offsets 這個主題的8號分區(qū)在哪臺broker上面,那一臺就是coordinator 就知道這個consumer group下的所有的消費者提交offset的時候是往哪個分區(qū)去提交offset,

          運行流程

          • 每個consumer都發(fā)送JoinGroup請求到Coordinator,
          • 然后Coordinator從一個consumer group中選擇一個consumer作為leader,
          • 把consumer group情況發(fā)送給這個leader,
          • 接著這個leader會負責制定消費方案,
          • 通過SyncGroup發(fā)給Coordinator
          • 接著Coordinator就把消費方案下發(fā)給各個consumer,他們會從指定的分區(qū)的

          leader broker開始進行socket連接以及消費消息

          14.9 rebalance策略

          consumer group靠coordinator實現(xiàn)了Rebalance

          這里有三種rebalance的策略:range、round-robin、sticky

          比如我們消費的一個主題有12個分區(qū):

          p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11

          假設我們的消費者組里面有三個消費者

          range策略

          • range策略就是按照partiton的序號范圍
          • p0~3 consumer1
          • p4~7 consumer2
          • p8~11 consumer3
          • 默認就是這個策略;

          round-robin策略

          • 就是輪詢分配
          • consumer1:0,3,6,9
          • consumer2:1,4,7,10
          • consumer3:2,5,8,11

          但是前面的這兩個方案有個問題:12 -> 2 每個消費者會消費6個分區(qū)

          假設consuemr1掛了:p0-5分配給consumer2,p6-11分配給consumer3,這樣的話,原本在consumer2上的的p6,p7分區(qū)就被分配到了 consumer3上。

          sticky策略

          最新的一個sticky策略,就是說盡可能保證在rebalance的時候,讓原本屬于這個consumer的分區(qū)還是屬于他們,然后把多余的分區(qū)再均勻分配過去,這樣盡可能維持原來的分區(qū)分配的策略

          • consumer1:0-3
          • consumer2: 4-7
          • consumer3: 8-11

          假設consumer3掛了

          • consumer1:0-3,+8,9
          • consumer2: 4-7,+10,11

          15、Broker管理

          15.1 Leo、hw含義

          • Kafka的核心原理
          • 如何去評估一個集群資源
          • 搭建了一套kafka集群 -》 介紹了簡單的一些運維管理的操作。
          • 生產(chǎn)者(使用,核心的參數(shù))
          • 消費者(原理,使用的,核心參數(shù))
          • broker內(nèi)部的一些原理,核心的概念:LEO,HW

          LEO:是跟offset偏移量有關系。

          LEO:

          在kafka里面,無論leader partition還是follower partition統(tǒng)一都稱作副本(replica)。

          每次partition接收到一條消息,都會更新自己的LEO,也就是log end offset,LEO其實就是最新的offset + 1

          HW:高水位

          LEO有一個很重要的功能就是更新HW,如果follower和leader的LEO同步了,此時HW就可以更新

          HW之前的數(shù)據(jù)對消費者是可見,消息屬于commit狀態(tài)。HW之后的消息消費者消費不到。

          15.2 Leo更新

          15.3 hw更新

          15.4 controller如何管理整個集群

          1: 競爭controller的

          • /controller/id

          2:controller服務監(jiān)聽的目錄:

          • /broker/ids/ 用來感知 broker上下線
          • /broker/topics/ 創(chuàng)建主題,我們當時創(chuàng)建主題命令,提供的參數(shù),ZK地址。
          • /admin/reassign_partitions 分區(qū)重分配

          15.5 延時任務

          kafka的延遲調(diào)度機制(擴展知識)

          我們先看一下kafka里面哪些地方需要有任務要進行延遲調(diào)度。

          第一類延時的任務:

          比如說producer的acks=-1,必須等待leader和follower都寫完才能返回響應。

          有一個超時時間,默認是30秒(request.timeout.ms)。

          所以需要在寫入一條數(shù)據(jù)到leader磁盤之后,就必須有一個延時任務,到期時間是30秒延時任務 放到DelayedOperationPurgatory(延時管理器)中。

          假如在30秒之前如果所有follower都寫入副本到本地磁盤了,那么這個任務就會被自動觸發(fā)蘇醒,就可以返回響應結(jié)果給客戶端了,否則的話,這個延時任務自己指定了最多是30秒到期,如果到了超時時間都沒等到,就直接超時返回異常。

          第二類延時的任務:

          follower往leader拉取消息的時候,如果發(fā)現(xiàn)是空的,此時會創(chuàng)建一個延時拉取任務

          延時時間到了之后(比如到了100ms),就給follower返回一個空的數(shù)據(jù),然后follower再次發(fā)送請求讀取消息,但是如果延時的過程中(還沒到100ms),leader寫入了消息,這個任務就會自動蘇醒,自動執(zhí)行拉取任務。

          海量的延時任務,需要去調(diào)度。

          15.6 時間輪機制

          1.什么會有要設計時間輪?

          Kafka內(nèi)部有很多延時任務,沒有基于JDK Timer來實現(xiàn),那個插入和刪除任務的時間復雜度是O(nlogn),而是基于了自己寫的時間輪來實現(xiàn)的,時間復雜度是O(1),依靠時間輪機制,延時任務插入和刪除,O(1)

          2.時間輪是什么?

          其實時間輪說白其實就是一個數(shù)組。

          • tickMs:時間輪間隔 1ms
          • wheelSize:時間輪大小 20
          • interval:timckMS * whellSize,一個時間輪的總的時間跨度。20ms
          • currentTime:當時時間的指針。
            • a:因為時間輪是一個數(shù)組,所以要獲取里面數(shù)據(jù)的時候,靠的是index,時間復雜度是O(1)
            • b:數(shù)組某個位置上對應的任務,用的是雙向鏈表存儲的,往雙向鏈表里面插入,刪除任務,時間復雜度也是O(1)

          3.多層級的時間輪

          比如:要插入一個110毫秒以后運行的任務。

          • tickMs:時間輪間隔 20ms
          • wheelSize:時間輪大小 20
          • interval:timckMS * whellSize,一個時間輪的總的時間跨度。20ms
          • currentTime:當時時間的指針。
            • 第一層時間輪:1ms * 20
            • 第二層時間輪:20ms * 20
            • 第三層時間輪:400ms * 20

          作者:erainm

          來源:blog.csdn.net/eraining/article/details/115860664

          瀏覽 36
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人网站三级在线视频网站 | 三级日韩| 水蜜桃一区二区AV | AV久成人| 国产精品无码在线看 |