Kafka原理分析之基礎篇
點擊上方藍色字體,選擇“標星公眾號”
優(yōu)質(zhì)文章,第一時間送達
作者 | 碼頭工人
來源 | urlify.cn/ZRFFZf
一、Kafka
Kafka是一個分布式的消息系統(tǒng)。
二、解決問題
消息系統(tǒng)通常被應用于異步處理、應用解耦、流量削峰、消息通信等場景。
異步處理

生產(chǎn)者將消息寫入消息隊列中,消費者異步拉取消息隊列消息,從而提升消息處理能力。
應用解耦

Kafka作為消息傳遞的媒介,各子系統(tǒng)只需要做系統(tǒng)責任內(nèi)的事情。生產(chǎn)者-消費者模式,Kafka就是消息隊列。
流量削峰

正常情況下,上游服務(如報價、營銷等)常年流量較大,面對大流量時能夠較為從容地應對,但下游應用(如:交易、訂單等)由于常年流量較小,面對大流量時會因為準備不足,而導致系統(tǒng)被打垮,引發(fā)雪崩。
為了應對這一問題,可以利用消息隊列作為臨時數(shù)據(jù)存儲節(jié)點,消費者根據(jù)自身消費能力,通過拉取的方式控制消費速度,達到流量削峰的目的。
三、特性
讀寫效率
Kafka在面對大流量數(shù)據(jù)時,能夠高效地處理消息的存儲與查詢。通過軟件設計避免硬件讀取磁盤的性能瓶頸。
網(wǎng)絡傳輸
批量讀取消息,對消息進行批量壓縮,從而提升網(wǎng)絡利用率。
并發(fā)能力
Kafka支持消息分區(qū),每個分區(qū)內(nèi)保證消息的順序性,多分區(qū)之間能夠支持并發(fā)操作,提升Kafka并發(fā)操作。
持久化能力
Kafka將消息持久化至硬盤。網(wǎng)絡傳輸不可靠,所以需要將數(shù)據(jù)進行持久化。其中利用了零拷貝、順序讀、順序?qū)憽㈨摼彺娴燃夹g使Kafa具備高吞吐特性。
可靠性
支持分區(qū)多副本,Leader副本負責讀寫,F(xiàn)ollow副本只負責同步Leader副本數(shù)據(jù),實現(xiàn)消息冗余備份,提升Kafka容災能力。
水平擴展
多Producer、Broker、Consumer,均為分布式,多Consumer可以加入同一Consumer Group,每個分區(qū)只能分配一個Consumer,當Kafka服務端增加分區(qū)數(shù)量進行水平擴展時,可以向Consumer Group添加Consumer,提升消費能力。當Consumer Group中有Consumer出現(xiàn)故障下線時,能通過再平衡(Rebalance)對分區(qū)進行再分配。
四、基本概念
消息&批次
消息
(1)消息是Kafka的基本單位;
(2)消息由key和value的byte數(shù)組構(gòu)成;
(3)key能夠根據(jù)策略將消息發(fā)送到指定分區(qū)。
批次
(1)為了提升效率,消息被分批寫入kafka,同一組消息必須屬于同一主題的同一分區(qū);
(2)分批發(fā)送能夠降低網(wǎng)絡開銷,提升傳輸速度。
主題&分區(qū)
主題(Topic)是用于存儲消息分類關系的邏輯單元,可以看做存儲消息的集合。分區(qū)(partition)是Kafka數(shù)據(jù)存儲的基本單元,可以看做存儲消息的集合的子集。Kafka消息通過主題進行分類,同一Topic的不同分區(qū)(partition)會分配在不用的Broker上,分區(qū)機制提供橫向擴展的基礎,可以通過增加并在其上分配partition來提升Kafka的消息并行處理能力。

日志
Log基本概念
(1)分區(qū)邏輯上對應一個Log,生產(chǎn)者將消息寫入分區(qū)實際是寫入分區(qū)對應的Log;
(2)Log可以對應磁盤上的文件夾,其由多個Segment組成,每個Segment對應一個日志文件和索引文件;
(3)當Segment大小超出限制時,就會創(chuàng)建新的Segment;
(4)Kafka采用順序I/O,所以只會向最新的Segment追加數(shù)據(jù);
(5)索引采用稀疏索引,運行時將其映射至內(nèi)存中,提升索引速度。

Log保存與壓縮
日志保存
(1)時間限制
根據(jù)保留時間,當消息在kafka中保存的時間超過指定時間,就會被刪除。
(2)大小限制
根據(jù)Topic存儲大小,當Topic所占日志的大小大于一個閾值,則可以開始刪除最舊的消息。Kafka會啟動一個新的線程,定期檢查是否存在可以刪除的消息。
日志壓縮
很多場景中,Kafka消息的key與value值會不斷變化,就像數(shù)據(jù)庫中的數(shù)據(jù)會不斷被修改,消費者只會關心最新的key對應的value。如果開啟日志壓縮功能,Kafka會開啟線程,定時對相同key的消息進行合并,并保留最新的value值。
Broker
獨立的Kafka服務就是一個broker,broker主要的工作就是接受生產(chǎn)者發(fā)送來的消息,分配offset并保存到磁盤中。Broker除了接受生產(chǎn)者發(fā)送的消息,還處理消費者、其他Broker的請求,根據(jù)請求類型進行相應處理行和響應返回。正常情況下一臺機器對應一個broker。
副本
所謂副本就是對消息進程冗余備份,分布式系統(tǒng)在不同機器上相互保存對方數(shù)據(jù)。在Kafka中,每個分區(qū)(partition)可以有多個副本,每個副本中的消息是一樣的(在同一時刻,多臺機器之間的消息并不完全一致)。
生產(chǎn)者
生產(chǎn)者(Producer)的主要工作是生成消息。將消息發(fā)布根據(jù)規(guī)則推送到Topic的對應分區(qū)中。例如:(1)對key進行hash;(2)輪詢;(3)自定義。
消費者
消費者(Consumer)的主要工作消費消息。從對應分區(qū)中拉取Topic的消息進行消費。消費者需要通過offset記錄自己的消費位置。
消費者組
多個消費者(Consumer)構(gòu)成消費者組(Consumer Group)。消費者組(Consumer Group)訂閱的主題(Topic)的每個分區(qū)只能被分配給,在同一個消費者組中的一個消費者處理。但一個消費者可以消費同一主題(Topic)的多個分區(qū)。

消息傳遞模式

kafka沒有消息推送,只有消息拉取。但消費者可以通過輪詢拉取的方式實現(xiàn)消息推送功能。
Kafka架構(gòu)概圖

五、核心特性詳解
消費者
(1)消費者從訂閱的主題消費消息的偏移量保存至名字為"__consumer_offsets"的主題中;
(2)推薦使用Kafka來存儲消費者偏移量,zookeeper不適合高并發(fā)。
單消費者組
多個消費同一主題的消費者只要將group_id設置相同,就可以組成消費者組。
情況一:一個消費者組中,只有一個消費者。

情況二:消費者組中有多個消費者。

情況三:分區(qū)數(shù)與消費者組數(shù)相同。

情況四:消費者組中消費者數(shù)量大于分區(qū)數(shù)。閑置的消費者不會接收消息。

多消費者組
一個主題對應多個消費者組,每個消費者組都能夠消費該主題的所有消息。

心跳機制
Kafka的心跳機制保證Consumer和Broker之間的健康,當Broker Coordinator正常時,Consumer才會發(fā)送心跳。
再平衡機制
再平衡是規(guī)定消費者組下消費者與主題的分區(qū)之間發(fā)生變化時如何分配的協(xié)議。
再平衡觸發(fā)條件
(1)消費組內(nèi)消費者發(fā)生變化。(消費組數(shù)量變化,例如消費組宕機退出消費組)
(2)主題對應分區(qū)數(shù)發(fā)生變化。(kafka只支持增加分區(qū))
(3)訂閱主題發(fā)生變化。(消費組使用正則表達式訂閱主題,此時恰好新建了對應主題)
情況一:正常情況,每個分區(qū)只能分配給一個消費者。

情況二:消費者機器宕機,消費者退出消費組,觸發(fā)再平衡,重新給消費者組中的消費者分配分區(qū)。

情況三:Broker機器宕機,導致分區(qū)3無法提供服務。如果分區(qū)有副本則觸發(fā)再平衡,如果沒有副本則消費者3閑置。

情況四:使用正則表達式訂閱主題,當新增主題時,主題對應的分區(qū)會分配給當前消費者,會觸發(fā)再平衡。

避免再平衡
訂閱主題數(shù)和主題分區(qū)數(shù)發(fā)生變化,一般情況下是運維主動觸發(fā),正常情況下不需要避免再平衡。所以我們可以重點關注由消費者組消費者數(shù)量變化而引發(fā)的重平衡。
在再平衡完成后,每個消費者實例會定時向Coodinator發(fā)送心跳請求。
消費者判“死”條件
消費者沒有定期地向Coordinator發(fā)送心跳請求
(1)session.timeout.ms參數(shù)標識判定消費者死亡的時間閾值。參數(shù)默認值為10秒,即如果10秒內(nèi)沒有收到Group下的某Consumer實例的心跳請求,則被判定該Consumer實例“死亡”,移出Group。
(2)heartbeat.interval.ms參數(shù)標識心跳請求發(fā)送的頻率。值越小,Consumer實例發(fā)送心跳請求的頻率就越高。
規(guī)定時間內(nèi)沒有消費完poll方法返回的消息
(1)max.poll.interval.ms參數(shù)標識Consumer實例調(diào)用poll方法的最大時間間隔。默認值是5分鐘,表示Comsumer如果在5分鐘內(nèi)無法消費完poll方法返回的消息,則會被移出Group。
避免消費者被判“死”
避免被“條件1”判死
session.timeout.ms >= 3 * heartbeat.interval.ms。保證Consumer被判死前至少經(jīng)過3輪心跳請求。
例如:設置 session.timeout.ms = 6s;設置 heartbeat.interval.ms = 2s。
避免被“條件2”判死
盡可能將max.poll.interval.ms時間設置大一些。可以將消費者實例中的最長耗時作為依據(jù),再此基礎之上擴大1-1.5倍。為業(yè)務處理留下充足的處理時間,避免由于消息消費時間過長而導致再平衡。
位移管理
位移主題
Kafka中消費者根據(jù)消息的位移順序消費消息,消費者的位移由消費者管理,可以存儲在zookeeper中,可以存儲于Kafka主題__consumer_offse中hjmgbknjk.n,jvgnvmnn/.vt。sconsumer_offsets就是位移主題。
引入原因
(1)老版本的位移管理依托Zookeeper,會自動或手動的方式將位移數(shù)據(jù)提交至Zookeeper進行保存。當Consumer重啟后,它就能自動從Zookeeper中讀取位移數(shù)據(jù),從上次截止消費的地方繼續(xù)消費。這種設計是的Kafka Broker不需要保存位移數(shù)據(jù)。
(2)但Zookeeper不適合高頻寫操作,所以在0.8.2.x版本后新版本的Consumer推出了全新的位移管理機制。將Consumer的位移數(shù)據(jù)作為一條普通的Kafka消息,提交到__consumer_offsets。
(3)正情況下不需要修改它,也不可以隨意地向該主題寫消息,因為這會導致Kafka無法正常解析。
消息格式
(1)Key中包含GroupID、主題名、分區(qū)號;
(2)Value中包含位移值。
位移提交
(1)Consumer需要向Kafka記錄自己的位移數(shù)據(jù),這個匯報過程稱為 提交位移(Committing Offsets)
(2)Consumer 需要為分配給它的每個分區(qū)提交各自的位移數(shù)據(jù)
(3)位移提交的由Consumer端負責的,Kafka只負責保管。__consumer_offsets
(4)位移提交分為自動提交和手動提交
(5)位移提交分為同步提交和異步提交
自動提交
(1)設置enable.auto.commit值為true;
(2)通過auto.commit.interval.ms,可以設置自動提交的時間間隔,默認值為5秒;
(3)Kafka會保證在開始調(diào)用poll方法時,提交上次poll返回的所有消息的位移信息。poll方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現(xiàn)消費丟失的情況;
(4)自動提交會出現(xiàn)消息重復消費。例:Consumer每 5s提交offset,當提交位移信息后3秒發(fā)生了再平衡,所有Consumer都會從上次提交的offset開始消費,但此時獲取的offset已經(jīng)是3秒前的offset了,所以我們又會重新消費再平衡前3秒的所有數(shù)據(jù)。我們只能夠縮小提交offset的時間窗口,但無法避免重復消費。
手動提交
1、同步提交
(1)使用 KafkaConsumer#commitSync():會提交 KafkaConsumer#poll() 返回的最新offset;
(2)該方法為同步操作,等待直到 offset 被成功提交才返回;
1while (true) {
2 ConsumerRecords<String, String> records =
3 consumer.poll(Duration.ofSeconds(1));
4 process(records); // 處理消息
5 try {
6 consumer.commitSync();
7 } catch (CommitFailedException e) {
8 handle(e); // 處理提交失敗異常
9 }
(3)同步提交會使Consumer處于阻塞狀態(tài);
(4)同步提交在出現(xiàn)異常時會自動重試。
2、異步提交
(1)使用異步提交規(guī)避Consumer阻塞;
(2)異常(GC、網(wǎng)絡抖動)時使用同步提交進行重試。
1try {
2 while(true) {
3 ConsumerRecords<String, String> records =
4 consumer.poll(Duration.ofSeconds(1));
5 process(records); // 處理消息
6 commitAysnc(); // 使用異步提交規(guī)避阻塞
7 }
8} catch(Exception e) {
9 handle(e); // 處理異常
10} finally {
11 try {
12 consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
13 } finally {
14 consumer.close();
15 }
分區(qū)
分區(qū)(Partition)是Kafka數(shù)據(jù)的基本單元。同一個主題(topic)數(shù)據(jù)會被分散存儲到多個partion中,這些分區(qū)可以被分配到同一臺機器或不同機器上。優(yōu)點是有利于水平擴展,避免單臺機器在磁盤空間和性能上的限制,同時可以通過復制來增加數(shù)據(jù)冗余,從而提升容災能力。為了做到均勻分布,一般partition的數(shù)量一般是Broker Server數(shù)量的整數(shù)倍。
副本機制
副本機制的優(yōu)點
分區(qū)擁有多個副本,提供冗余數(shù)據(jù),有利于確保kafka的高可用性。
副本定義
(1)每個主題可以分為多個分區(qū),每個分區(qū)下配置多個副本;
(2)副本的本質(zhì)是一個只能追加寫消息的提交日志;
(3)同一分區(qū)下的所有副本保存相同的消息序列;
(4)分區(qū)寫的不同副本分散保存在不同Broker上,應對Broker宕機時分區(qū)數(shù)據(jù)不可用的情況。

副本角色
同分區(qū)多副本,如何保證副本消息一致性?
最常見的解決方案是基于領導者(Leader-based)的副本機制。

1、副本分為兩類
(1)領導者副本;
(2)追隨者副本。
2、在Kafka的副本機制與其他分布式系統(tǒng)不同
(1)在kafka中,追隨者副本不對外提供服務。所有請求都必須由領導者副本來處理;
(2)追隨者副本的唯一任務就是從領導者副本異步拉取消息,并寫入自己的提交日志中,從而實現(xiàn)與領導者副本的同步。
3、領導者副本所處Broker宕機
(1)Kafka依托Zookeeper提供的監(jiān)控功能能夠感知Broker宕機,并開啟一輪新的選舉;
(2)老Leader副本重啟后,只能作為追隨者副本加入集群中。
追隨者副本不對外提供服務的原因
1、方便實現(xiàn)“Read-your-writes”
(1)生產(chǎn)者使用API想Kafka寫入消息成功后,能夠立馬使用消費者API查看到剛才生產(chǎn)的信息。
(2)如果允許追隨者副本對外提供服務,由于追隨者副本是異步的,因此就可能出現(xiàn)追隨者副本沒有從領導者副本拉取到最新消息的情況,就會出現(xiàn)無法立刻讀到最新寫入的消息。
2、方便實現(xiàn)單調(diào)讀(Monotonic Reads)
(1)什么是單調(diào)讀?對于消息消費者而言,消息不會時有時無。
(2)如果允許追隨者副本對外提供服務,由于追隨者副本是異步的,多個副本從領導者副本拉取的消息不一定同步,就會出現(xiàn)多次請求讀取不同的追隨者副本的情況,數(shù)據(jù)讀取時有時無。如果讀取全由領導者副本來處理,那么Kafka就很實現(xiàn)單調(diào)讀一致性。
同步副本(ISR)與非同步副本(OSR)
由于追隨者副本需要異步去拉取領導者副本,那么我們就需要確定再怎么樣才算與領導者副本同步。
Kafka引入了In-Sync Replicas,也就是ISR(同步)副本集合,該副本在Zookeeper上維護。如果存在于ISR中則意味著與領導者副本同步,相反則為非同步副本(OSR)
同步副本的標準

(1)replica.lag.time.max.ms參數(shù)值標識Follower副本能夠慢于Leader副本的最長時間間隔,默認值為10秒。
(2)若Follower副本落后于Leader副本的最長連續(xù)時間間隔不超過該replica.lag.time.max.ms參數(shù)值設定的大小,則認定該Follower副本與Leader副本是同步的,否則認定為非同步,會將副本從ISR副本集合中移出(Follower副本的拉取速度慢于Leader副本寫入消息的速度,且時間間隔超過設定閾值)。
(3)ISR是動態(tài)調(diào)整集合,非靜態(tài)不變的。當Follower副本追上進度時,就會重新被添加會ISR集合。
HW
高水位(HW是High Watermark的縮寫),表示一個特定消息的偏移量,消費者只能拉取到這個offset之前的數(shù)據(jù)。
LEO
LEO是Log End Offset的縮寫,表示當前日志文件下一條待寫入消息的offset

Leader選舉
少部分副本宕機
(1)當Leader副本對應的broker宕機后,就會從Follower副本中選擇一個副本作為Leader;
(2)當宕機的broker恢復后就會重新從leader中pull數(shù)據(jù)。
全部副本宕機
unclean.leader.election.enable 控制是否允許 Unclean 領導者選舉。
(1)不開啟Unclean。等待ISR中的一個恢復,并選擇其當leader;(等待時間較長,可用性降低)
(2)開啟Unclean。選擇第一個恢復的副本作為新的leader,無論是否是ISR副本。(開啟會造成數(shù)據(jù)丟失)
(3)正常情況下建議不開啟,雖然犧牲了高可用性,但維護了數(shù)據(jù)一致性,避免消息丟失。
為什么不少數(shù)服從多數(shù)?
選擇Leader副本時如果需要超過半數(shù)的同步副本同意,算法所需的冗余同步副本較多。(一臺機器失敗,就需要3個同步副本)
物理存儲
存儲概述
基本概念
(1)Kafka使用日志文件保存生產(chǎn)者發(fā)送的消息;
(2)每條消息都有一個offset值表示它在分區(qū)中的偏移量;
(3)offset值是邏輯值并不是真實存在的物理地址。其類似于數(shù)據(jù)庫中的主鍵,唯一標識了數(shù)據(jù)庫表中的一條數(shù)據(jù)。而offset在Kafka中的某個分區(qū)唯一標識一條消息。
(4)Log與分區(qū)一一對應,Log并不是一個文件而是一個文件夾;
(5)文件夾以topicName_pratiitonID命名,分區(qū)消息全部都存儲在次文件夾下的日志文件中;
(6)Kafka通過分段的方式將Log分為多個LogSegment,LogSegment是邏輯概念,對應磁盤上的Log目錄下的一個日志文件和索引文件;
(7)日志文件的命名規(guī)則是[baseOffset].log,baseOffset是日志文件中第一條消息的offset;
(8)Kafka日志是順序追加的;
(9)每個日志文件都對應一個索引文件,索引文件使用稀疏索引的方式為文件日志中部分消息建立索引。
(10)日志文件結(jié)構(gòu)圖

(11)Log示例

創(chuàng)建了一個tp_demo_01的主題,其中存在6個partition,對應的每個partition下存在一個Topic-partition命名的消息日志。
(12)LogSegment示例

文件類別

日志存儲
索引
為了提升消息查找的速度,Kafka從0.8版本開始,為每個日志文件添加對應的索引文件。IndexFile與MassageSet File共同組成了LogSegment。
偏移量索引文件用于記錄消息偏移量與物理地址之間的映射關系。時間戳索引文件則根據(jù)時間戳查找對應的偏移量。
索引文件中的索引項的格式:每個索引項為8字節(jié),分為兩部分,第一部分是相對offset(4字節(jié)),即相對于baseOffset的偏移量(baseOffset就是基準偏移量,日志文件的命名以基準偏移量來命名)。第二部分是物理地址(4字節(jié)),即其索引消息在日志文件中對應的position位置。通過這兩部分就能實現(xiàn)offset與物理地址之間的映射。
消息壓縮

偏移量索引

舉例
假設需要查找startOffset為1067。需要將offset=1067轉(zhuǎn)換成對應的物理地址,其過程是怎樣的?
(1)將絕對offset轉(zhuǎn)化為相對offset,絕對offset減去baseOffset,得到相對offset=67;
(2)通過相對offset查找索引文件,得到(58,1632)索引項(通過跳表的方式定位到某一個index文件,再通過二分法找到不大于相對offset的最大索引項);
(3)從position為1632處開始順序查找,找到絕對offset=1067的消息;
(4)最終會得到offset為1070的位置消息。(因為消息被壓縮,offset=1067這條消息被壓縮后一起構(gòu)成offset=1070這條外層消息)。
六、參考
《Apache Kafka源碼剖析》
《極客時間-Kafka核心技術與實戰(zhàn)》
《拉鉤Java-Kafka》
七、總結(jié)
本文從場景、特性、基本概念、核心特性等多個維度較為詳細地闡述了Kafka的相關知識。關于kafka穩(wěn)定性與具體源碼實現(xiàn)會在進階篇中闡述。
粉絲福利:Java從入門到入土學習路線圖
??????

??長按上方微信二維碼 2 秒
感謝點贊支持下哈 
