kafka初探
Apache Kafka的流行歸功于它設(shè)計(jì)和操作簡單、存儲(chǔ)系統(tǒng)高效、充分利用磁盤順序讀寫等特性、非常適合在線日志收集等高吞吐場景。
初識(shí)kafka集群結(jié)構(gòu):

kafaka集群的broker和Consumer都需要連接Zookeeper進(jìn)行集群配置管理,Producer 直接連接 Broker。Producer把數(shù)據(jù)上傳到Broker,Producer可以指定數(shù)據(jù)有幾個(gè)分區(qū)、幾個(gè)備份。
上面的圖中,數(shù)據(jù)有兩個(gè)分區(qū) 0、1,每個(gè)分區(qū)都有自己的副本:0'、 1'。黃色的分區(qū)為leader,白色的為follower。
和其他消息隊(duì)列相比,Kafka的優(yōu)勢在哪里?
我們現(xiàn)在經(jīng)常提到 Kafka 的時(shí)候就已經(jīng)默認(rèn)它是一個(gè)非常優(yōu)秀的消息隊(duì)列了,我們也會(huì)經(jīng)常拿它給 RocketMQ、RabbitMQ 對(duì)比。我覺得 Kafka 相比其他消息隊(duì)列主要的優(yōu)勢如下:
極致的性能 :基于 Scala 和 Java 語言開發(fā),設(shè)計(jì)中大量使用了批量處理和異步的思想,最高可以每秒處理千萬級(jí)別的消息。
生態(tài)系統(tǒng)兼容性無可匹敵 :Kafka 與周邊生態(tài)系統(tǒng)的兼容性是最好的沒有之一,尤其在大數(shù)據(jù)和流計(jì)算領(lǐng)域。
kafka的消息模型:發(fā)布-訂閱模型
主要區(qū)別傳統(tǒng)的queue模型中,消息只能被一個(gè)consumer消費(fèi)的問題。

發(fā)布訂閱模型(Pub-Sub)使用主題(Topic) 作為消息通信載體,類似于廣播模式;發(fā)布者發(fā)布一條消息,該消息通過主題傳遞給所有的訂閱者,在一條消息廣播之后才訂閱的用戶則是收不到該條消息的(不包括beging這種情況)。在發(fā)布 - 訂閱模型中,如果只有一個(gè)訂閱者,那它和隊(duì)列模型就基本是一樣的了。所以說,發(fā)布 - 訂閱模型在功能層面上是可以兼容隊(duì)列模型的。
kafka核心元件整理

producer:消息生產(chǎn)者
consumer:消息消費(fèi)者
broker(代理):可以看作是一個(gè)獨(dú)立的kafka實(shí)例。多個(gè)kafka broker組成一個(gè)kafka cluster。
topic(主題):Kafka 將生產(chǎn)者發(fā)布的消息發(fā)送到 Topic(主題) 中,需要這些消息的消費(fèi)者可以訂閱這些 Topic(主題),topic就是消息的分組。
partition(分區(qū)):Partition屬于Topic的一部分。一個(gè)Topic可以有多個(gè)Partition ,并且同一Topic下的Partition可以分布在不同的Broker上,這也就表明一個(gè)Topic可以橫跨多個(gè)Broker 。
Kafka 中的 Partition(分區(qū)) 實(shí)際上可以對(duì)應(yīng)成為消息隊(duì)列中的隊(duì)列queue。
kafka多副本機(jī)制
還有一點(diǎn)我覺得比較重要的是 Kafka 為分區(qū)(Partition)引入了多副本(Replica)機(jī)制。分區(qū)(Partition)中的多個(gè)副本之間會(huì)有一個(gè)叫做leader節(jié)點(diǎn),其他副本稱為follower。我們發(fā)送的消息會(huì)被發(fā)送到leader副本,然后follower 副本才能從 leader副本中拉取消息進(jìn)行同步。
生產(chǎn)者和消費(fèi)者只與 leader 副本交互。你可以理解為其他副本只是 leader 副本的拷貝,它們的存在只是為了保證消息存儲(chǔ)的安全性。當(dāng) leader 副本發(fā)生故障時(shí)會(huì)從 follower 中選舉出一個(gè) leader,但是 follower 中如果有和 leader 同步程度達(dá)不到要求的參加不了 leader 的競選。

多分區(qū)(Partition)以及多副本(Replica)機(jī)制有什么好處
Kafka 通過給特定 Topic 指定多個(gè) Partition, 而各個(gè) Partition 可以分布在不同的 Broker 上, 這樣便能提供比較好的并發(fā)能力(負(fù)載均衡)。
Partition 可以指定對(duì)應(yīng)的 Replica 數(shù), 這也極大地提高了消息存儲(chǔ)的安全性, 提高了容災(zāi)能力,不過也相應(yīng)的增加了所需要的存儲(chǔ)空間。
kafka能保證消息的順序性嗎?
Kafka 中 Partition(分區(qū))是真正保存消息的地方,我們發(fā)送的消息都被放在了這里。每次添加消息到 Partition(分區(qū)) 的時(shí)候都會(huì)采用尾加法,Kafka 只能為我們保證 Partition(分區(qū)) 中的消息有序,而不能保證 Topic(主題) 中的 Partition(分區(qū)) 的有序。

消息在被追加到 Partition(分區(qū))的時(shí)候都會(huì)分配一個(gè)特定的偏移量(offset)。Kafka 通過偏移量(offset)來保證消息在分區(qū)內(nèi)的順序性。所以,我們就有一種很簡單的保證消息消費(fèi)順序的方法:1 個(gè) Topic 只對(duì)應(yīng)一個(gè) Partition。這樣當(dāng)然可以解決問題,但是破壞了 Kafka 的設(shè)計(jì)初衷。
Kafka 中發(fā)送 1 條消息的時(shí)候,可以指定 topic, partition, key,data(數(shù)據(jù)) 4 個(gè)參數(shù)。如果在發(fā)送消息的時(shí)候指定了 Partition的話,所有消息都會(huì)被發(fā)送到指定的 Partition。并且,同一個(gè) key 的消息可以保證只發(fā)送到同一個(gè)partition,可以采用表/對(duì)象的 id 來作為key 。
總結(jié)一下,對(duì)于如何保證 Kafka 中消息消費(fèi)的順序,有了下面兩種方法:
1個(gè)Topic只對(duì)應(yīng)一個(gè) Partition。
發(fā)送消息的時(shí)候指定 key/Partition。(推薦)
當(dāng)然不僅僅只有上面兩種方法,比如如果producer單點(diǎn)固定,可以為每條消息綁定一個(gè)順序號(hào)等。需要根據(jù)具體場景適用。
如何保證消息安全不丟失
producer端消息丟失
生產(chǎn)者(Producer) 調(diào)用send方法發(fā)送消息之后,消息可能因?yàn)榫W(wǎng)絡(luò)問題并沒有發(fā)送過去。
所以,我們不能默認(rèn)在調(diào)用send方法發(fā)送消息之后消息消息發(fā)送成功了。為了確定消息是發(fā)送成功,我們要判斷消息發(fā)送的結(jié)果。但是要注意的是 Kafka 生產(chǎn)者(Producer) 使用 send 方法發(fā)送消息實(shí)際上是異步的操作,我們可以通過 get()方法獲取調(diào)用結(jié)果,但是這樣也讓它變?yōu)榱送讲僮鳎?/p>SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();if (sendResult.getRecordMetadata() != null) { logger.info("生產(chǎn)者成功發(fā)送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe sult.getProducerRecord().value().toString());}
但是一般不推薦這么做!可以采用為其添加回調(diào)函數(shù)的形式,示例代碼如下:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);future.addCallback(result -> logger.info("生產(chǎn)者成功發(fā)送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),ex -> logger.error("生產(chǎn)者發(fā)送消失敗,原因:{}", ex.getMessage()));
如果消息發(fā)送失敗的話,我們檢查失敗的原因之后重新發(fā)送即可.
另外這里推薦為 Producer 的retries (重試次數(shù))設(shè)置一個(gè)比較合理的值,一般是 3 ,但是為了保證消息不丟失的話一般會(huì)設(shè)置比較大一點(diǎn)。設(shè)置完成之后,當(dāng)出現(xiàn)網(wǎng)絡(luò)問題之后能夠自動(dòng)重試消息發(fā)送,避免消息丟失。另外,建議還要設(shè)置重試間隔,因?yàn)殚g隔太小的話重試的效果就不明顯了,網(wǎng)絡(luò)波動(dòng)一次你3次一下子就重試完了.
Consumer端消息丟失
消息在被追加到 Partition(分區(qū))的時(shí)候都會(huì)分配一個(gè)特定的偏移量(offset)。偏移量(offset)表示 Consumer 當(dāng)前消費(fèi)到的 Partition(分區(qū))的所在的位置。Kafka 通過偏移量(offset)可以保證消息在分區(qū)內(nèi)的順序性。

短讀風(fēng)險(xiǎn):當(dāng)消費(fèi)者拉取到了分區(qū)的某個(gè)消息之后,消費(fèi)者會(huì)自動(dòng)提交了 offset。自動(dòng)提交的話會(huì)有一個(gè)問題,試想一下,當(dāng)消費(fèi)者剛拿到這個(gè)消息準(zhǔn)備進(jìn)行真正消費(fèi)的時(shí)候,突然掛掉了,消息實(shí)際上并沒有被消費(fèi),但是 offset 卻被自動(dòng)提交了。
長讀風(fēng)險(xiǎn):為了解決消息少消費(fèi)的問題,我們手動(dòng)關(guān)閉自動(dòng)提交 offset,每次在真正消費(fèi)完消息之后之后再自己手動(dòng)提交 offset 。 但是,這樣會(huì)帶來消息被重新消費(fèi)的問題。比如你剛剛消費(fèi)完消息之后,還沒提交 offset,結(jié)果自己掛掉了,那么這個(gè)消息理論上就會(huì)被消費(fèi)兩次。
集群故障導(dǎo)致消息丟失
由于partition中的消息都是通過leader節(jié)點(diǎn)復(fù)制到follower節(jié)點(diǎn)的,假如 leader 副本所在的 broker 突然掛掉,那么就要從 follower 副本重新選出一個(gè) leader ,但是 leader 的數(shù)據(jù)還有一些沒有被 follower 副本的同步的話,就會(huì)造成消息丟失。
解決辦法就是我們設(shè)置
acks = all
acks 是 Kafka 生產(chǎn)者(Producer) 很重要的一個(gè)參數(shù)。acks 的默認(rèn)值即為1,代表我們的消息被leader副本接收之后就算被成功發(fā)送。當(dāng)我們配置 acks = all 代表則所有副本都要接收到該消息之后該消息才算真正成功被發(fā)送。
replication.factor >= 3
為了保證 leader 副本能有 follower 副本能同步消息,我們一般會(huì)為 topic 設(shè)置 replication.factor >= 3。這樣就可以保證每個(gè) 分區(qū)(partition) 至少有 3 個(gè)副本。雖然造成了數(shù)據(jù)冗余,但是帶來了數(shù)據(jù)的安全性。
min.insync.replicas > 1
一般情況下我們還需要設(shè)置 min.insync.replicas> 1 ,這樣配置代表消息至少要被寫入到 2 個(gè)副本才算是被成功發(fā)送。min.insync.replicas的默認(rèn)值為 1 ,在實(shí)際生產(chǎn)中應(yīng)盡量避免默認(rèn)值 1。但是,為了保證整個(gè) Kafka 服務(wù)的高可用性,需要確保 replication.factor > min.insync.replicas 。主要是為了應(yīng)對(duì)只要是有一個(gè)副本掛掉,整個(gè)分區(qū)就無法正常工作的情況。這明顯違反高可用性!一般推薦設(shè)置成 replication.factor = min.insync.replicas + 1。
unclean.leader.election.enable = false
Kafka 0.11.0.0版本開始 unclean.leader.election.enable 參數(shù)的默認(rèn)值由原來的true 改為false
我們最開始也說了我們發(fā)送的消息會(huì)被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進(jìn)行同步。多個(gè) follower副本之間的消息同步情況不一樣,當(dāng)我們配置了 unclean.leader.election.enable = false 的話,當(dāng) leader 副本發(fā)生故障時(shí)就不會(huì)從 follower 副本中和 leader 同步程度達(dá)不到要求的副本中選擇出 leader ,這樣降低了消息丟失的可能性。
總結(jié)
生產(chǎn)端 | 消費(fèi)端 | 集群配置 |
將異步發(fā)送改為同步發(fā)送send().get() | 通過自動(dòng)提交offset,存在短讀風(fēng)險(xiǎn) | acks=all,完成所有副本的寫 |
通過添加回調(diào)函數(shù)future=send() | 通過手動(dòng)提交offset,存在長讀風(fēng)險(xiǎn) | |
配置retries重試次數(shù) |
zookeeper在kafka中的作用
zookeeper為kafka提供了配置元數(shù)據(jù)的管理。

每個(gè)存儲(chǔ)節(jié)點(diǎn)的含義:

broker注冊
Broker是分布式部署并且相互之間相互獨(dú)立,但是需要有一個(gè)注冊系統(tǒng)能夠?qū)⒄麄€(gè)集群中的Broker管理起來,此時(shí)就使用到了Zookeeper。在Zookeeper上會(huì)有一個(gè)專門用來進(jìn)行Broker服務(wù)器列表記錄的節(jié)點(diǎn):
/brokers/ids
每個(gè)Broker在啟動(dòng)時(shí),都會(huì)到Zookeeper上進(jìn)行注冊,即到/brokers/ids下創(chuàng)建屬于自己的節(jié)點(diǎn),如/brokers/ids/[0...N]。Kafka使用了全局唯一的數(shù)字來指代每個(gè)Broker服務(wù)器,不同的Broker必須使用不同的Broker ID進(jìn)行注冊,創(chuàng)建完節(jié)點(diǎn)后,每個(gè)Broker就會(huì)將自己的IP地址和端口信息記錄到該節(jié)點(diǎn)中去。其中Broker創(chuàng)建的節(jié)點(diǎn)類型是臨時(shí)節(jié)點(diǎn),一旦Broker宕機(jī),則對(duì)應(yīng)的臨時(shí)節(jié)點(diǎn)也會(huì)被自動(dòng)刪除。
topic注冊
在Kafka中,同一個(gè)Topic的消息會(huì)被分成多個(gè)分區(qū)并將其分布在多個(gè)Broker上,這些分區(qū)信息及與Broker的對(duì)應(yīng)關(guān)系也都是由Zookeeper在維護(hù),由專門的節(jié)點(diǎn)來記錄,如:
/borkers/topics
Kafka中每個(gè)Topic都會(huì)以/brokers/topics/[topic]的形式被記錄,如/brokers/topics/login和/brokers/topics/search等。Broker服務(wù)器啟動(dòng)后,會(huì)到對(duì)應(yīng)Topic節(jié)點(diǎn)(/brokers/topics)上注冊自己的Broker ID并寫入針對(duì)該Topic的分區(qū)總數(shù),如/brokers/topics/login/3->2,這個(gè)節(jié)點(diǎn)表示Broker ID為3的一個(gè)Broker服務(wù)器,對(duì)于"login"這個(gè)Topic的消息,提供了2個(gè)分區(qū)進(jìn)行消息存儲(chǔ),同樣,這個(gè)分區(qū)節(jié)點(diǎn)也是臨時(shí)節(jié)點(diǎn)。
負(fù)載均衡
生產(chǎn)者負(fù)載均衡
由于同一個(gè)Topic消息會(huì)被分區(qū)并將其分布在多個(gè)Broker上,因此,生產(chǎn)者需要將消息合理地發(fā)送到這些分布式的Broker上,那么如何實(shí)現(xiàn)生產(chǎn)者的負(fù)載均衡,Kafka支持傳統(tǒng)的四層負(fù)載均衡,也支持Zookeeper方式實(shí)現(xiàn)負(fù)載均衡。
四層負(fù)載均衡
根據(jù)生產(chǎn)者的IP地址和端口來為其確定一個(gè)相關(guān)聯(lián)的Broker。通常,一個(gè)生產(chǎn)者只會(huì)對(duì)應(yīng)單個(gè)Broker,然后該生產(chǎn)者產(chǎn)生的消息都發(fā)往該Broker。這種方式邏輯簡單,每個(gè)生產(chǎn)者不需要同其他系統(tǒng)建立額外的TCP連接,只需要和Broker維護(hù)單個(gè)TCP連接即可。但是,其無法做到真正的負(fù)載均衡,因?yàn)閷?shí)際系統(tǒng)中的每個(gè)生產(chǎn)者產(chǎn)生的消息量及每個(gè)Broker的消息存儲(chǔ)量都是不一樣的,如果有些生產(chǎn)者產(chǎn)生的消息遠(yuǎn)多于其他生產(chǎn)者的話,那么會(huì)導(dǎo)致不同的Broker接收到的消息總數(shù)差異巨大,同時(shí),生產(chǎn)者也無法實(shí)時(shí)感知到Broker的新增和刪除。
使用Zookeeper進(jìn)行負(fù)載均衡
由于每個(gè)Broker啟動(dòng)時(shí),都會(huì)完成Broker注冊過程,生產(chǎn)者會(huì)通過該節(jié)點(diǎn)的變化來動(dòng)態(tài)地感知到Broker服務(wù)器列表的變更,這樣就可以實(shí)現(xiàn)動(dòng)態(tài)的負(fù)載均衡機(jī)制。
消費(fèi)者負(fù)載均衡
與生產(chǎn)者類似,Kafka中的消費(fèi)者同樣需要進(jìn)行負(fù)載均衡來實(shí)現(xiàn)多個(gè)消費(fèi)者合理地從對(duì)應(yīng)的Broker服務(wù)器上接收消息,每個(gè)消費(fèi)者分組包含若干消費(fèi)者,每條消息都只會(huì)發(fā)送給分組中的一個(gè)消費(fèi)者,不同的消費(fèi)者分組消費(fèi)自己特定的Topic下面的消息,互不干擾。
記錄partition與consumer之間的分組group關(guān)系
消費(fèi)組 (Consumer Group):consumer group 下有多個(gè) Consumer(消費(fèi)者)。
對(duì)于每個(gè)消費(fèi)者組 (Consumer Group),Kafka都會(huì)為其分配一個(gè)全局唯一的Group ID,Group 內(nèi)部的所有消費(fèi)者共享該 ID。訂閱的topic下的每個(gè)分區(qū)只能分配給某個(gè) group 下的一個(gè)consumer(當(dāng)然該分區(qū)還可以被分配給其他group)。同時(shí),Kafka為每個(gè)消費(fèi)者分配一個(gè)Consumer ID,通常用"Hostname:UUID"形式表示。
在Kafka中,規(guī)定了每個(gè)消息分區(qū)只能被同組的一個(gè)消費(fèi)者進(jìn)行消費(fèi),因此,需要在 Zookeeper 上記錄 消息分區(qū) 與 Consumer 之間的關(guān)系,每個(gè)消費(fèi)者一旦確定了對(duì)一個(gè)消息分區(qū)的消費(fèi)權(quán)力,需要將其Consumer ID 寫入到 Zookeeper 對(duì)應(yīng)消息分區(qū)的臨時(shí)節(jié)點(diǎn)上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]。其中,[broker_id-partition_id]就是一個(gè) 消息分區(qū) 的標(biāo)識(shí),節(jié)點(diǎn)內(nèi)容就是該 消息分區(qū) 上 消費(fèi)者的Consumer ID。
記錄消息偏移量offset
在消費(fèi)者對(duì)指定消息分區(qū)進(jìn)行消息消費(fèi)的過程中,需要定時(shí)地將分區(qū)消息的消費(fèi)進(jìn)度Offset記錄到Zookeeper上,以便在該消費(fèi)者進(jìn)行重啟或者其他消費(fèi)者重新接管該消息分區(qū)的消息消費(fèi)后,能夠從之前的進(jìn)度開始繼續(xù)進(jìn)行消息消費(fèi)。Offset在Zookeeper中由一個(gè)專門節(jié)點(diǎn)進(jìn)行記錄,其節(jié)點(diǎn)路徑為:
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id],節(jié)點(diǎn)內(nèi)容就是Offset的值。
消費(fèi)者注冊信息
消費(fèi)者服務(wù)器在初始化啟動(dòng)時(shí)加入消費(fèi)者分組的步驟如下
1、注冊到消費(fèi)者分組。每個(gè)消費(fèi)者服務(wù)器啟動(dòng)時(shí),都會(huì)到Zookeeper的指定節(jié)點(diǎn)下創(chuàng)建一個(gè)屬于自己的消費(fèi)者節(jié)點(diǎn),例如/consumers/[group_id]/ids/[consumer_id],完成節(jié)點(diǎn)創(chuàng)建后,消費(fèi)者就會(huì)將自己訂閱的Topic信息寫入該臨時(shí)節(jié)點(diǎn)。
2、對(duì) 消費(fèi)者分組 中的 消費(fèi)者 的變化注冊監(jiān)聽。每個(gè) 消費(fèi)者 都需要關(guān)注所屬 消費(fèi)者分組 中其他消費(fèi)者服務(wù)器的變化情況,即對(duì)/consumers/[group_id]/ids節(jié)點(diǎn)注冊子節(jié)點(diǎn)變化的Watcher監(jiān)聽,一旦發(fā)現(xiàn)消費(fèi)者新增或減少,就觸發(fā)消費(fèi)者的負(fù)載均衡。
3、對(duì)Broker服務(wù)器變化注冊監(jiān)聽。消費(fèi)者需要對(duì)/broker/ids/[0-N]中的節(jié)點(diǎn)進(jìn)行監(jiān)聽,如果發(fā)現(xiàn)Broker服務(wù)器列表發(fā)生變化,那么就根據(jù)具體情況來決定是否需要進(jìn)行消費(fèi)者負(fù)載均衡。
4、進(jìn)行消費(fèi)者負(fù)載均衡。為了讓同一個(gè)Topic下不同分區(qū)的消息盡量均衡地被多個(gè) 消費(fèi)者 消費(fèi)而進(jìn)行 消費(fèi)者 與 消息 分區(qū)分配的過程,通常,對(duì)于一個(gè)消費(fèi)者分組,如果組內(nèi)的消費(fèi)者服務(wù)器發(fā)生變更或Broker服務(wù)器發(fā)生變更,會(huì)發(fā)出消費(fèi)者負(fù)載均衡。
