終于有人把 kafka 原理說清楚了!

Java技術(shù)棧
www.javastack.cn
關(guān)注閱讀更多優(yōu)質(zhì)文章
為什么需要消息隊列
周末無聊刷著手機,某寶網(wǎng)APP突然蹦出來一條消息“為了回饋老客戶,女朋友買一送一,活動僅限今天!”。買一送一還有這種好事,那我可不能錯過!忍不住立馬點了去。于是選了兩個最新款,下單、支付一氣呵成!滿足的躺在床上,想著馬上有女朋友了,竟然幸福的失眠了……
第二天正常上著班,突然接到快遞小哥的電話:小哥:“你是xx嗎?你的女朋友到了,我現(xiàn)在在你樓下,你來拿一下吧!”。我:“這……我在上班呢,可以晚上送過來嗎?“。小哥:“晚上可不行哦,晚上我也下班了呢!”。于是兩個人僵持了很久……
最后小哥說,要不我?guī)湍惴诺綐窍滦》急憷臧桑阃砩舷掳嗔诉^來拿,尷尬的局面這才得以緩解!
回到正題,如果沒有小芳便利店,那快遞小哥和我的交互圖就應(yīng)該如下:

會出現(xiàn)什么情況呢?
1、為了這個女朋友,我請假回去拿(老板不批)。
2、小哥一直在你樓下等(小哥還有其他的快遞要送)。
3、周末再送(顯然等不及)。
4、這個女朋友我不要了(絕對不可能)!
小芳便利店出現(xiàn)后,交互圖就應(yīng)如下:

在上面例子中,“快遞小哥”和“買女朋友的我”就是需要交互的兩個系統(tǒng),小芳便利店就是我們本文要講的-“消息中間件”。總結(jié)下來小芳便利店(消息中間件)出現(xiàn)后有如下好處:
1、 解耦
快遞小哥手上有很多快遞需要送,他每次都需要先電話一一確認(rèn)收貨人是否有空、哪個時間段有空,然后再確定好送貨的方案。這樣完全依賴收貨人了!
如果快遞一多,快遞小哥估計的忙瘋了……如果有了便利店,快遞小哥只需要將同一個小區(qū)的快遞放在同一個便利店,然后通知收貨人來取貨就可以了,這時候快遞小哥和收貨人就實現(xiàn)了解耦!
2、 異步
快遞小哥打電話給我后需要一直在你樓下等著,直到我拿走你的快遞他才能去送其他人的。快遞小哥將快遞放在小芳便利店后,又可以干其他的活兒去了,不需要等待你到來而一直處于等待狀態(tài)。提高了工作的效率。
3、 削峰
假設(shè)雙十一我買了不同店里的各種商品,而恰巧這些店發(fā)貨的快遞都不一樣,有中通、圓通、申通、各種通等……更巧的是他們都同時到貨了!中通的小哥打來電話叫我去北門取快遞、圓通小哥叫我去南門、申通小哥叫我去東門。我一時手忙腳亂……
我們能看到在系統(tǒng)需要交互的場景中,使用消息隊列中間件真的是好處多多,基于這種思路,就有了豐巢、菜鳥驛站等比小芳便利店更專業(yè)的“中間件”了。最后,上面的故事純屬虛構(gòu)……
消息隊列通信的模式
通過上面的例子我們引出了消息中間件,并且介紹了消息隊列出現(xiàn)后的好處,這里就需要介紹消息隊列通信的兩種模式了:
一、 點對點模式

如上圖所示,點對點模式通常是基于拉取或者輪詢的消息傳送模型,這個模型的特點是發(fā)送到隊列的消息被一個且只有一個消費者進行處理。生產(chǎn)者將消息放入消息隊列后,由消費者主動的去拉取消息進行消費。
點對點模型的的優(yōu)點是消費者拉取消息的頻率可以由自己控制。但是消息隊列是否有消息需要消費,在消費者端無法感知,所以在消費者端需要額外的線程去監(jiān)控。
二、 發(fā)布訂閱模式

如上圖所示,發(fā)布訂閱模式是一個基于消息送的消息傳送模型,改模型可以有多種不同的訂閱者。生產(chǎn)者將消息放入消息隊列后,隊列會將消息推送給訂閱過該類消息的消費者(類似微信公眾號)。
由于是消費者被動接收推送,所以無需感知消息隊列是否有待消費的消息!但是consumer1、consumer2、consumer3由于機器性能不一樣,所以處理消息的能力也會不一樣,但消息隊列卻無法感知消費者消費的速度!
所以推送的速度成了發(fā)布訂閱模模式的一個問題!假設(shè)三個消費者處理速度分別是8M/s、5M/s、2M/s,如果隊列推送的速度為5M/s,則consumer3無法承受!如果隊列推送的速度為2M/s,則consumer1、consumer2會出現(xiàn)資源的極大浪費!
Kafka
上面簡單的介紹了為什么需要消息隊列以及消息隊列通信的兩種模式,接下來就到了我們本文的主角——kafka閃亮登場的時候了!
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費者規(guī)模的網(wǎng)站中的所有動作流數(shù)據(jù),具有高性能、持久化、多副本備份、橫向擴展能力………?
一些基本的介紹這里就不展開了,網(wǎng)上有太多關(guān)于這些的介紹了,讀者可以自行百度一下!原來這才是 Kafka!(多圖+深入)這篇推薦看下。關(guān)注公眾號Java技術(shù)棧可以獲取更多 Kafka 系列教程。
基礎(chǔ)架構(gòu)及術(shù)語
話不多說,先看圖,通過這張圖我們來捋一捋相關(guān)的概念及之間的關(guān)系:

如果看到這張圖你很懵逼,木有關(guān)系!
我們先來分析相關(guān)概念
Producer:Producer即生產(chǎn)者,消息的產(chǎn)生者,是消息的入口。
kafka cluster:Broker:Broker是kafka實例,每個服務(wù)器上有一個或多個kafka的實例,我們姑且認(rèn)為每個broker對應(yīng)一臺服務(wù)器。每個kafka集群內(nèi)的broker都有一個不重復(fù)的編號,如圖中的broker-0、broker-1等……
Topic:消息的主題,可以理解為消息的分類,kafka的數(shù)據(jù)就保存在topic。在每個broker上都可以創(chuàng)建多個topic。
Partition:Topic的分區(qū),每個topic可以有多個分區(qū),分區(qū)的作用是做負(fù)載,提高kafka的吞吐量。同一個topic在不同的分區(qū)的數(shù)據(jù)是不重復(fù)的,partition的表現(xiàn)形式就是一個一個的文件夾!
Replication:每一個分區(qū)都有多個副本,副本的作用是做備胎。當(dāng)主分區(qū)(Leader)故障的時候會選擇一個備胎(Follower)上位,成為Leader。在kafka中默認(rèn)副本的最大數(shù)量是10個,且副本的數(shù)量不能大于Broker的數(shù)量,follower和leader絕對是在不同的機器,同一機器對同一個分區(qū)也只可能存放一個副本(包括自己)。
Message:每一條發(fā)送的消息主體。
Consumer:消費者,即消息的消費方,是消息的出口。
Consumer Group:我們可以將多個消費組組成一個消費者組,在kafka的設(shè)計中同一個分區(qū)的數(shù)據(jù)只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分區(qū)的數(shù)據(jù),這也是為了提高kafka的吞吐量!
Zookeeper:kafka集群依賴zookeeper來保存集群的的元信息,來保證系統(tǒng)的可用性。
工作流程分析
上面介紹了kafka的基礎(chǔ)架構(gòu)及基本概念,不知道大家看完有沒有對kafka有個大致印象,如果對還比較懵也沒關(guān)系!6個步驟,全方位掌握 Kafka,這篇建議看一。下。關(guān)注公眾號Java技術(shù)棧可以獲取更多 Kafka 系列教程。
我們接下來再結(jié)合上面的結(jié)構(gòu)圖分析kafka的工作流程,最后再回來整個梳理一遍我相信你會更有收獲!
發(fā)送數(shù)據(jù)
我們看上面的架構(gòu)圖中,producer就是生產(chǎn)者,是數(shù)據(jù)的入口。注意看圖中的紅色箭頭,Producer在寫入數(shù)據(jù)的時候永遠的找leader,不會直接將數(shù)據(jù)寫入follower!那leader怎么找呢?
寫入的流程又是什么樣的呢?我們看下圖:

發(fā)送的流程就在圖中已經(jīng)說明了,就不單獨在文字列出來了!需要注意的一點是,消息寫入leader后,follower是主動的去leader進行同步的!producer采用push模式將數(shù)據(jù)發(fā)布到broker,每條消息追加到分區(qū)中,順序?qū)懭氪疟P,所以保證同一分區(qū)內(nèi)的數(shù)據(jù)是有序的!寫入示意圖如下:

上面說到數(shù)據(jù)會寫入到不同的分區(qū),那kafka為什么要做分區(qū)呢?相信大家應(yīng)該也能猜到,分區(qū)的主要目的是:
1、 方便擴展。因為一個topic可以有多個partition,所以我們可以通過擴展機器去輕松的應(yīng)對日益增長的數(shù)據(jù)量。
2、 提高并發(fā)。以partition為讀寫單位,可以多個消費者同時消費數(shù)據(jù),提高了消息的處理效率。
熟悉負(fù)載均衡的朋友應(yīng)該知道,當(dāng)我們向某個服務(wù)器發(fā)送請求的時候,服務(wù)端可能會對請求做一個負(fù)載,將流量分發(fā)到不同的服務(wù)器,那在kafka中,如果某個topic有多個partition,producer又怎么知道該將數(shù)據(jù)發(fā)往哪個partition呢?
kafka中有幾個原則:
1、 partition在寫入的時候可以指定需要寫入的partition,如果有指定,則寫入對應(yīng)的partition。
2、 如果沒有指定partition,但是設(shè)置了數(shù)據(jù)的key,則會根據(jù)key的值hash出一個partition。
3、 如果既沒指定partition,又沒有設(shè)置key,則會輪詢選出一個partition。
保證消息不丟失是一個消息隊列中間件的基本保證,那producer在向kafka寫入消息的時候,怎么保證消息不丟失呢?
其實上面的寫入流程圖中有描述出來,那就是通過ACK應(yīng)答機制!在生產(chǎn)者向隊列寫入數(shù)據(jù)的時候可以設(shè)置參數(shù)來確定是否確認(rèn)kafka接收到數(shù)據(jù),這個參數(shù)可設(shè)置的值為0、1、all。
0代表producer往集群發(fā)送數(shù)據(jù)不需要等到集群的返回,不確保消息發(fā)送成功。安全性最低但是效率最高。
1代表producer往集群發(fā)送數(shù)據(jù)只要leader應(yīng)答就可以發(fā)送下一條,只確保leader發(fā)送成功。
all代表producer往集群發(fā)送數(shù)據(jù)需要所有的follower都完成從leader的同步才會發(fā)送下一條,確保leader發(fā)送成功和所有的副本都完成備份。安全性最高,但是效率最低。
最后要注意的是,如果往不存在的topic寫數(shù)據(jù),能不能寫入成功呢?kafka會自動創(chuàng)建topic,分區(qū)和副本的數(shù)量根據(jù)默認(rèn)配置都是1。
保存數(shù)據(jù)
Producer將數(shù)據(jù)寫入kafka后,集群就需要對數(shù)據(jù)進行保存了!kafka將數(shù)據(jù)保存在磁盤,可能在我們的一般的認(rèn)知里,寫入磁盤是比較耗時的操作,不適合這種高并發(fā)的組件。Kafka初始會單獨開辟一塊磁盤空間,順序?qū)懭霐?shù)據(jù)(效率比隨機寫入高)。
Partition 結(jié)構(gòu)前面說過了每個topic都可以分為一個或多個partition,如果你覺得topic比較抽象,那partition就是比較具體的東西了!Partition在服務(wù)器上的表現(xiàn)形式就是一個一個的文件夾,每個partition的文件夾下面會有多組segment文件,每組segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中沒有)三個文件, log文件就實際是存儲message的地方,而index和timeindex文件為索引文件,用于檢索消息。

如上圖,這個partition有三組segment文件,每個log文件的大小是一樣的,但是存儲的message數(shù)量是不一定相等的(每條的message大小不一致)。文件的命名是以該segment最小offset來命名的,如000.index存儲offset為0~368795的消息,kafka就是利用分段+索引的方式來解決查找效率的問題。
Message結(jié)構(gòu)上面說到log文件就實際是存儲message的地方,我們在producer往kafka寫入的也是一條一條的message,那存儲在log中的message是什么樣子的呢?消息主要包含消息體、消息大小、offset、壓縮類型……等等!
我們重點需要知道的是下面三個:
1、 offset:offset是一個占8byte的有序id號,它可以唯一確定每條消息在parition內(nèi)的位置!
2、 消息大小:消息大小占用4byte,用于描述消息的大小。
3、 消息體:消息體存放的是實際的消息數(shù)據(jù)(被壓縮過),占用的空間根據(jù)具體的消息而不一樣。
存儲策略無論消息是否被消費,kafka都會保存所有的消息。那對于舊數(shù)據(jù)有什么刪除策略呢?
1、 基于時間,默認(rèn)配置是168小時(7天)。
2、 基于大小,默認(rèn)配置是1073741824。需要注意的是,kafka讀取特定消息的時間復(fù)雜度是O(1),所以這里刪除過期的文件并不會提高kafka的性能!
消費數(shù)據(jù)
消息存儲在log文件后,消費者就可以進行消費了。在講消息隊列通信的兩種模式的時候講到過點對點模式和發(fā)布訂閱模式。
Kafka采用的是點對點的模式,消費者主動的去kafka集群拉取消息,與producer相同的是,消費者在拉取消息的時候也是找leader去拉取。
多個消費者可以組成一個消費者組(consumer group),每個消費者組都有一個組id!同一個消費組者的消費者可以消費同一topic下不同分區(qū)的數(shù)據(jù),但是不會組內(nèi)多個消費者消費同一分區(qū)的數(shù)據(jù)!!!
是不是有點繞。我們看下圖:

圖示是消費者組內(nèi)的消費者小于partition數(shù)量的情況,所以會出現(xiàn)某個消費者消費多個partition數(shù)據(jù)的情況,消費的速度也就不及只處理一個partition的消費者的處理速度!如果是消費者組的消費者多于partition的數(shù)量,那會不會出現(xiàn)多個消費者消費同一個partition的數(shù)據(jù)呢?
上面已經(jīng)提到過不會出現(xiàn)這種情況!多出來的消費者不消費任何partition的數(shù)據(jù)。所以在實際的應(yīng)用中,建議消費者組的consumer的數(shù)量與partition的數(shù)量一致!在保存數(shù)據(jù)的小節(jié)里面,我們聊到了partition劃分為多組segment,每個segment又包含.log、.index、.timeindex文件,存放的每條message包含offset、消息大小、消息體……
我們多次提到segment和offset,查找消息的時候是怎么利用segment+offset配合查找的呢?假如現(xiàn)在需要查找一個offset為368801的message是什么樣的過程呢?我們先看看下面的圖:

1、 先找到offset的368801message所在的segment文件(利用二分法查找),這里找到的就是在第二個segment文件。
2、 打開找到的segment中的.index文件(也就是368796.index文件,該文件起始偏移量為368796+1,我們要查找的offset為368801的message在該index內(nèi)的偏移量為368796+5=368801,所以這里要查找的相對offset為5)。由于該文件采用的是稀疏索引的方式存儲著相對offset及對應(yīng)message物理偏移量的關(guān)系,所以直接找相對offset為5的索引找不到,這里同樣利用二分法查找相對offset小于或者等于指定的相對offset的索引條目中最大的那個相對offset,所以找到的是相對offset為4的這個索引。
3、 根據(jù)找到的相對offset為4的索引確定message存儲的物理偏移位置為256。打開數(shù)據(jù)文件,從位置為256的那個地方開始順序掃描直到找到offset為368801的那條Message。
這套機制是建立在offset為有序的基礎(chǔ)上,利用segment+有序offset+稀疏索引+二分查找+順序查找等多種手段來高效的查找數(shù)據(jù)!
至此,消費者就能拿到需要處理的數(shù)據(jù)進行處理了。那每個消費者又是怎么記錄自己消費的位置呢?
在早期的版本中,消費者將消費到的offset維護zookeeper中,consumer每間隔一段時間上報一次,這里容易導(dǎo)致重復(fù)消費,且性能不好!在新的版本中消費者消費到的offset已經(jīng)直接維護在kafk集群的_consumeroffsets這個topic中!





關(guān)注Java技術(shù)棧看更多干貨


