Kafka Consumer 消費(fèi)消息和 Rebalance 機(jī)制
Kafka 有消費(fèi)組的概念,每個(gè)消費(fèi)者只能消費(fèi)所分配到的分區(qū)的消息,每一個(gè)分區(qū)只能被一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者所消費(fèi),所以同一個(gè)消費(fèi)組中消費(fèi)者的數(shù)量如果超過了分區(qū)的數(shù)量,將會(huì)出現(xiàn)有些消費(fèi)者分配不到消費(fèi)的分區(qū)。消費(fèi)組與消費(fèi)者關(guān)系如下圖所示:
consumer group
Kafka Consumer Client 消費(fèi)消息通常包含以下步驟:
- 配置客戶端,創(chuàng)建消費(fèi)者
- 訂閱主題
- 拉去消息并消費(fèi)
- 提交消費(fèi)位移
- 關(guān)閉消費(fèi)者實(shí)例
過程
因?yàn)?Kafka 的 Consumer 客戶端是線程不安全的,為了保證線程安全,并提升消費(fèi)性能,可以在 Consumer 端采用類似 Reactor 的線程模型來消費(fèi)數(shù)據(jù)。
消費(fèi)模型 Kafka consumer 參數(shù)
- bootstrap.servers:連接 broker 地址,
host:port格式。 - group.id:消費(fèi)者隸屬的消費(fèi)組。
- key.deserializer:與生產(chǎn)者的
key.serializer對(duì)應(yīng),key 的反序列化方式。 - value.deserializer:與生產(chǎn)者的
value.serializer對(duì)應(yīng),value 的反序列化方式。 - session.timeout.ms:coordinator 檢測(cè)失敗的時(shí)間。默認(rèn) 10s 該參數(shù)是 Consumer Group 主動(dòng)檢測(cè) (組內(nèi)成員 comsummer) 崩潰的時(shí)間間隔,類似于心跳過期時(shí)間。
- auto.offset.reset:該屬性指定了消費(fèi)者在讀取一個(gè)沒有偏移量后者偏移量無效(消費(fèi)者長時(shí)間失效當(dāng)前的偏移量已經(jīng)過時(shí)并且被刪除了)的分區(qū)的情況下,應(yīng)該作何處理,默認(rèn)值是 latest,也就是從最新記錄讀取數(shù)據(jù)(消費(fèi)者啟動(dòng)之后生成的記錄),另一個(gè)值是 earliest,意思是在偏移量無效的情況下,消費(fèi)者從起始位置開始讀取數(shù)據(jù)。
- enable.auto.commit:否自動(dòng)提交位移,如果為
false,則需要在程序中手動(dòng)提交位移。對(duì)于精確到一次的語義,最好手動(dòng)提交位移 - fetch.max.bytes:單次拉取數(shù)據(jù)的最大字節(jié)數(shù)量
- max.poll.records:單次 poll 調(diào)用返回的最大消息數(shù),如果處理邏輯很輕量,可以適當(dāng)提高該值。但是
max.poll.records條數(shù)據(jù)需要在在 session.timeout.ms 這個(gè)時(shí)間內(nèi)處理完 。默認(rèn)值為 500 - request.timeout.ms:一次請(qǐng)求響應(yīng)的最長等待時(shí)間。如果在超時(shí)時(shí)間內(nèi)未得到響應(yīng),kafka 要么重發(fā)這條消息,要么超過重試次數(shù)的情況下直接置為失敗。
rebalance 本質(zhì)上是一種協(xié)議,規(guī)定了一個(gè) consumer group 下的所有 consumer 如何達(dá)成一致來分配訂閱 topic 的每個(gè)分區(qū)。比如某個(gè) group 下有 20 個(gè) consumer,它訂閱了一個(gè)具有 100 個(gè)分區(qū)的 topic。正常情況下,Kafka 平均會(huì)為每個(gè) consumer 分配 5 個(gè)分區(qū)。這個(gè)分配的過程就叫 rebalance。
什么時(shí)候 rebalance?
這也是經(jīng)常被提及的一個(gè)問題。rebalance 的觸發(fā)條件有三種:
- 組成員發(fā)生變更(新 consumer 加入組、已有 consumer 主動(dòng)離開組或已有 consumer 崩潰了——這兩者的區(qū)別后面會(huì)談到)
- 訂閱主題數(shù)發(fā)生變更
- 訂閱主題的分區(qū)數(shù)發(fā)生變更
如何進(jìn)行組內(nèi)分區(qū)分配?
Kafka 默認(rèn)提供了兩種分配策略:Range 和 Round-Robin。當(dāng)然 Kafka 采用了可插拔式的分配策略,你可以創(chuàng)建自己的分配器以實(shí)現(xiàn)不同的分配策略。
kafka 高頻面試題- Kafka 有哪些命令行工具?你用過哪些?
/bin目錄,管理 kafka 集群、管理 topic、生產(chǎn)和消費(fèi) kafka - Kafka Producer 的執(zhí)行過程?攔截器,序列化器,分區(qū)器和累加器
- Kafka Producer 有哪些常見配置?broker 配置,ack 配置,網(wǎng)絡(luò)和發(fā)送參數(shù),壓縮參數(shù),ack 參數(shù)
- 如何讓 Kafka 的消息有序?Kafka 在 Topic 級(jí)別本身是無序的,只有 partition 上才有序,所以為了保證處理順序,可以自定義分區(qū)器,將需順序處理的數(shù)據(jù)發(fā)送到同一個(gè) partition
- Producer 如何保證數(shù)據(jù)發(fā)送不丟失?ack 機(jī)制,重試機(jī)制
- 如何提升 Producer 的性能?批量,異步,壓縮
- 如果同一 group 下 consumer 的數(shù)量大于 part 的數(shù)量,kafka 如何處理?多余的 Part 將處于無用狀態(tài),不消費(fèi)數(shù)據(jù)
- Kafka Consumer 是否是線程安全的?不安全,單線程消費(fèi),多線程處理
- 講一下你使用 Kafka Consumer 消費(fèi)消息時(shí)的線程模型,為何如此設(shè)計(jì)?拉取和處理分離
- Kafka Consumer 的常見配置?broker, 網(wǎng)絡(luò)和拉取參數(shù),心跳參數(shù)
- Consumer 什么時(shí)候會(huì)被踢出集群?奔潰,網(wǎng)絡(luò)異常,處理時(shí)間過長提交位移超時(shí)
- 當(dāng)有 Consumer 加入或退出時(shí),Kafka 會(huì)作何反應(yīng)?進(jìn)行 Rebalance
- 什么是 Rebalance,何時(shí)會(huì)發(fā)生 Rebalance?topic 變化,consumer 變化
往期推薦
Kafka Producer 發(fā)送消息至 Broker 原理和高性能必備參數(shù)設(shè)置
點(diǎn)個(gè)在看你最好看
