Kafka簡(jiǎn)明教程
Kafka是啥?用Kafka官方的話來說就是:
Kafka is used for building?real-time data?pipelines and streaming apps. It is?horizontally scalable,?fault-tolerant,?wicked fast, and runs in production in thousands of companies.
大致的意思就是,這是一個(gè)實(shí)時(shí)數(shù)據(jù)處理系統(tǒng),可以橫向擴(kuò)展、高可靠,而且還變態(tài)快,已經(jīng)被很多公司使用。
那么什么是實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)呢?顧名思義,實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)就是數(shù)據(jù)一旦產(chǎn)生,就要能快速進(jìn)行處理的系統(tǒng)。
對(duì)于實(shí)時(shí)數(shù)據(jù)處理,我們最常見的,就是消息中間件了,也叫MQ(Message Queue,消息隊(duì)列),也有叫Message Broker的。
這篇文章,我將從消息中間件的角度,帶大家看看Kafka的內(nèi)部結(jié)構(gòu),看看它是如何做到橫向擴(kuò)展、高可靠的同時(shí),還能變態(tài)快的。
1、為什么需要消息中間件消息中間件的作用主要有兩點(diǎn):
解耦消息的生產(chǎn)和消費(fèi)。
緩沖。
想象一個(gè)場(chǎng)景,你的一個(gè)創(chuàng)建訂單的操作,在訂單創(chuàng)建完成之后,需要觸發(fā)一系列其他的操作,比如進(jìn)行用戶訂單數(shù)據(jù)的統(tǒng)計(jì)、給用戶發(fā)送短信、給用戶發(fā)送郵件等等,就像這樣:
createOrder(...) {
? ...
? statOrderData(...);
? sendSMS();
? sendEmail();
}
代碼這樣寫似乎沒什么問題,可是過了一段時(shí)間,你給系統(tǒng)引進(jìn)了一個(gè)用戶行為分析服務(wù),它也需要在訂單創(chuàng)建完成之后,進(jìn)行一個(gè)分析用戶行為的操作,而且隨著系統(tǒng)的逐漸壯大,創(chuàng)建訂單之后要觸發(fā)的操作也就越來越多,代碼也漸漸膨脹成這樣:
createOrder(...) {
? ...
? statOrderData(...);
? sendSMS();
? sendEmail();
? // new operation
? statUserBehavior(...);
? doXXX(...);
? doYYY(...);
? // more and more operations
? ...
}
導(dǎo)致代碼越來越膨脹的癥結(jié)在于,消息的生產(chǎn)和消費(fèi)耦合在一起了。createOrder方法不僅僅要負(fù)責(zé)生產(chǎn)“訂單已創(chuàng)建”這條消息,還要負(fù)責(zé)處理這條消息。
這就好比BBC的記者,在知道皇馬拿到歐冠冠軍之后,拿起手機(jī),翻開皇馬球迷通訊錄,給球迷一個(gè)一個(gè)打電話,告訴他們,皇馬奪冠了。
事實(shí)上,BBC的記者只需要在他們官網(wǎng)發(fā)布這條消息,然后球迷自行訪問BBC,去上面獲取這條新聞;又或者球迷訂閱了BBC,那么訂閱系統(tǒng)會(huì)主動(dòng)把發(fā)布在官網(wǎng)的消息推送給球迷。
同樣,createOrder也需要一個(gè)像BBC官網(wǎng)那樣的載體,也就是消息中間件,在訂單創(chuàng)建完成之后,把一條主題為“orderCreated”的消息,放到消息中間件去就ok了,不必關(guān)心需要把這條消息發(fā)給誰。這就完成了消息的生產(chǎn)。
至于需要在訂單創(chuàng)建完成之后觸發(fā)操作的服務(wù),則只需要訂閱主題為“orderCreated”的消息,在消息中間件出現(xiàn)新的“orderCreated”消息時(shí),就會(huì)收到這條消息,然后進(jìn)行相應(yīng)的處理。
因此,通過使用消息中間件,上面的代碼也就簡(jiǎn)化成了:
createOrder(...) {
? ...
? sendOrderCreatedMessage(...);
}
以后如果在訂單創(chuàng)建之后有新的操作需要執(zhí)行,這串代碼也不需要修改,只需要給對(duì)消息進(jìn)行訂閱即可。
另外,通過這樣的解耦,消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)更加的靈活,不必每次消息一產(chǎn)生就要馬上去處理(雖然通常消費(fèi)者側(cè)也會(huì)有線程池等緩沖機(jī)制),可以等自己有空了的時(shí)候,再過來消息中間件這里取數(shù)據(jù)進(jìn)行處理。這就是消息中間件帶來的緩沖作用。
2、Kafka一代 - 消息隊(duì)列從上面的描述,我們可以看出,消息中間件之所以可以解耦消息的生產(chǎn)和消費(fèi),主要是它提供了一個(gè)存放消息的地方——生產(chǎn)者把消息放進(jìn)來,消費(fèi)者在從中取出消息進(jìn)行處理。
那么這個(gè)存放消息的地方,應(yīng)該采用什么數(shù)據(jù)結(jié)構(gòu)呢?
在絕大多數(shù)情況下,我們都希望先發(fā)送進(jìn)來的消息,可以先被處理(FIFO),這符合大多數(shù)的業(yè)務(wù)邏輯,少數(shù)情況下我們會(huì)給消息設(shè)置優(yōu)先級(jí)。不管怎樣,對(duì)于消息中間件來說,一個(gè)先進(jìn)先出的隊(duì)列,是非常合適的數(shù)據(jù)結(jié)構(gòu):
那么要怎樣保證消息可以被順序消費(fèi)呢?
消費(fèi)者過來獲取消息時(shí),每次都把index=0的數(shù)據(jù)返回過去,然后再刪除index=0的那條數(shù)據(jù)?
很明顯不行,因?yàn)橛嗛喠诉@條消息的消費(fèi)者數(shù)量,可能是0,也可能是1,還可能大于1。如果每次消費(fèi)完就刪除了,那么其他訂閱了這條消息的消費(fèi)者就獲取不到這條消息了。
事實(shí)上,Kafka會(huì)對(duì)數(shù)據(jù)進(jìn)行持久化存儲(chǔ)(至于存放多長(zhǎng)時(shí)間,這是可以配置的),消費(fèi)者端會(huì)記錄一個(gè)offset,表明該消費(fèi)者當(dāng)前消費(fèi)到哪條數(shù)據(jù),所以下次消費(fèi)者想繼續(xù)消費(fèi),只需從offset+1的位置繼續(xù)消費(fèi)就好了。
消費(fèi)者甚至可以通過調(diào)整offset的值,重新消費(fèi)以前的數(shù)據(jù)。
那么這就是Kafka了嗎?不,這只是一條非常普通的消息隊(duì)列,我們姑且叫它為Kafka一代吧。
這個(gè)Kafka一代用一條消息隊(duì)列實(shí)現(xiàn)了消息中間件,這樣的簡(jiǎn)單實(shí)現(xiàn)存在不少問題:
Topic魚龍混雜。想象一下,一個(gè)只訂閱了topic為“A”的消費(fèi)者,卻要在一條有ABCDEFG…等各種各樣topic的隊(duì)列里頭去尋找topic為A的消息,這樣性能豈不是很慢?
吞吐量低。我們把全部消息都放在一條隊(duì)列了,請(qǐng)求一多,它肯定應(yīng)付不過來。
由此就引申出了Kafka二代。
3、Kafka二代 - Partition要解決Kafka一代的那兩個(gè)問題,很簡(jiǎn)單——分布存儲(chǔ)。
二代Kafka引入了Partition的概念,也就是采用多條隊(duì)列, 每條隊(duì)列里面的消息都是相同的topic:
Partition的設(shè)計(jì)解決了上面提到的兩個(gè)問題:
純Topic隊(duì)列。一個(gè)隊(duì)列只有一種topic,消費(fèi)者再也不用擔(dān)心會(huì)碰到不是自己想要的topic的消息了。
提高吞吐量。不同topic的消息交給不同隊(duì)列去存儲(chǔ),再也不用以一敵十了。
一個(gè)隊(duì)列只有一種topic,但是一種topic的消息卻可以根據(jù)自定義的key值,分散到多條隊(duì)列中。也就是說,上圖的p1和p2,可以都是同一種topic的隊(duì)列。不過這是屬于比較高級(jí)的應(yīng)用了,以后有機(jī)會(huì)再和大家討論。
Kafka二代足夠完美了嗎?當(dāng)然不是,我們雖然通過Partition提升了性能,但是我們忽略了一個(gè)很重要的問題——高可用。
萬一機(jī)器掛掉了怎么辦?單點(diǎn)系統(tǒng)總是不可靠的。我們必須考慮備用節(jié)點(diǎn)和數(shù)據(jù)備份的問題。
4、Kafka三代 - Broker集群很明顯,為了解決高可用問題,我們需要集群。
Kafka對(duì)集群的支持也是非常友好的。在Kafka中,集群里的每個(gè)實(shí)例叫做Broker,就像這樣:
每個(gè)partition不再只有一個(gè),而是有一個(gè)leader(紅色)和多個(gè)replica(藍(lán)色),生產(chǎn)者根據(jù)消息的topic和key值,確定了消息要發(fā)往哪個(gè)partition之后(假設(shè)是p1),會(huì)找到partition對(duì)應(yīng)的leader(也就是broker2里的p1),然后將消息發(fā)給leader,leader負(fù)責(zé)消息的寫入,并與其余的replica進(jìn)行同步。
一旦某一個(gè)partition的leader掛掉了,那么只需提拔一個(gè)replica出來,讓它成為leader就ok了,系統(tǒng)依舊可以正常運(yùn)行。
通過Broker集群的設(shè)計(jì),我們不僅解決了系統(tǒng)高可用的問題,還進(jìn)一步提升了系統(tǒng)的吞吐量,因?yàn)閞eplica同樣可以為消費(fèi)者提供數(shù)據(jù)查找的功能。
5、Kafka沒那么簡(jiǎn)單這篇文章只是帶大家初步認(rèn)識(shí)一下Kafka,很多細(xì)節(jié)并沒有深入討論,比如:
Kafka的消息結(jié)構(gòu)?
我們只知道Kafka內(nèi)部是一個(gè)消息隊(duì)列,但是隊(duì)列里的元素長(zhǎng)什么樣,包含了哪些消息呢?
參考:Kafka - messageformat
Zookeeper和Kafka的關(guān)系?
如果玩過Kafka的Quick Start教程,就會(huì)發(fā)現(xiàn),我們?cè)谑褂肒afka時(shí),需要先啟動(dòng)一個(gè)ZK,那么這個(gè)ZK的作用到底是什么呢?
參考:What-is-the-actual-role-of-Zookeeper-in-Kafka
數(shù)據(jù)可靠性和重復(fù)消費(fèi)
生產(chǎn)者把消息發(fā)給Kafka,發(fā)送過程中掛掉、或者Kafka保存消息時(shí)發(fā)送異常怎么辦?
同理,消費(fèi)者獲取消費(fèi)時(shí)發(fā)生異常怎么辦?
甚至,如果消費(fèi)者已經(jīng)消費(fèi)了數(shù)據(jù),但是修改offset時(shí)失敗了,導(dǎo)致重復(fù)消費(fèi)怎么辦?
等等這些異常場(chǎng)景,都是Kafka需要考慮的。
參考:Kafka - Message Delivery Semantics
pull or push
消費(fèi)者側(cè)在獲取消息時(shí),是通過主動(dòng)去pull消息呢?還是由Kafka給消費(fèi)者push消息?
這兩種方式各自有什么優(yōu)劣?
參考:Kafka - push vs pull
如何提高消費(fèi)者處理性能
還是之前的訂單創(chuàng)建的例子,訂單創(chuàng)建后,你要給用戶發(fā)送短信,現(xiàn)在你發(fā)現(xiàn)由于你只有一個(gè)消費(fèi)者在發(fā)送短信,忙不過來,怎么辦?這就有了Kafka里頭的消費(fèi)者組(Consumer Group)的設(shè)計(jì)。
參考:Understanding-kafka-consumer-groups-and-consumer
……
終極問題:一條消息從生產(chǎn),到被消費(fèi),完整流程是怎樣的?
如果能詳盡透徹地回答這個(gè)問題,那你對(duì)Kafka的理解也就非常深入了。
6、總結(jié)本文從一個(gè)演化的視角,帶大家在Kafka的后花園里走馬觀花,逛了一圈。
很多細(xì)節(jié)并沒有深入討論,只是一個(gè)引子,希望能起到拋磚引玉的作用。
兩年嘔心瀝血的文章:「面試題」「基礎(chǔ)」「進(jìn)階」這里全都有!
300多篇原創(chuàng)技術(shù)文章加入交流群學(xué)習(xí)海量視頻資源精美腦圖面試題長(zhǎng)按掃碼可關(guān)注獲取?
在看和分享對(duì)我非常重要!![]()
