Kafka組消費(fèi)之Rebalance機(jī)制
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

《Kafka重要知識點(diǎn)之消費(fèi)組概念》講到了kafka的消費(fèi)組相關(guān)的概念,消費(fèi)組有多個消費(fèi)者,消費(fèi)組在消費(fèi)一個Topic的時候,kafka為了保證消息消費(fèi)不重不漏,kafka將每個partition唯一性地分配給了消費(fèi)者。但是如果某個消費(fèi)組在消費(fèi)的途中有消費(fèi)者宕機(jī)或者有新的消費(fèi)者加入的時候那么partition分配就是不公平的,可能導(dǎo)致某些消費(fèi)者負(fù)載特別重,某些消費(fèi)者又沒有負(fù)載的情況。Kafka有一種專門的機(jī)制處理這種情況,這種機(jī)制稱為Rebalance機(jī)制。
當(dāng)kafka遇到如下四種情況的時候,kafka會觸發(fā)Rebalance機(jī)制:
消費(fèi)組成員發(fā)生了變更,比如有新的消費(fèi)者加入了消費(fèi)組組或者有消費(fèi)者宕機(jī)
消費(fèi)者無法在指定的時間之內(nèi)完成消息的消費(fèi)
消費(fèi)組訂閱的Topic發(fā)生了變化
訂閱的Topic的partition發(fā)生了變化
1. 消費(fèi)超時實(shí)踐
筆者針對上文的第二個原因筆者有如下兩個疑問
消費(fèi)者默認(rèn)消費(fèi)超時的時間是多少
消息消費(fèi)超時的時候會發(fā)生什么
于是筆者在Test-Group分組下創(chuàng)建了8個消費(fèi)者線程,提交消息改為手動提交,并且消費(fèi)完成一批消息后,讓筆者讓消費(fèi)線程睡眠15秒
代碼如下
public void consume() {
try {
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : records) {
System.out.printf("id = %d , partition = %d , offset = %d, key = %s, value = %s%n", id, record.partition(), record.offset(), record.key(), record.value());
}
try {
TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手動提交offset
consumer.commitSync();
}
} finally {
consumer.close();
}
} 多消費(fèi)者運(yùn)行代碼如下
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 8; i++) {
final int id = i;
new Thread() {
@Override
public void run() {
new ReblanceConsumer(id).consume();
}
}.start();
}
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
運(yùn)行過程中,消費(fèi)者拋出了如下消費(fèi)者消費(fèi)異常
[Consumer clientId=client-5, groupId=Test-Group] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
在手動提交offset的時候拋出了如下異常
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
在這一節(jié),筆者只介紹第一個異常(第二個異常筆者將在Generation機(jī)制中介紹),拋出第一個異常的原因是消費(fèi)超時,導(dǎo)致消費(fèi)線程長時間無法向Coordinator節(jié)點(diǎn)發(fā)送心跳,Coordinator節(jié)點(diǎn)以為Consumer已經(jīng)宕機(jī),Coordinator于是將Consumer節(jié)點(diǎn)從消費(fèi)組中剔除,并觸發(fā)了Rebalance機(jī)制。其實(shí)這和Consumer的心跳發(fā)送機(jī)制也有關(guān)系。在大多數(shù)中間件的設(shè)計中都會分離業(yè)務(wù)線程和心跳發(fā)送線程,目的就是避免業(yè)務(wù)線程長時間執(zhí)行業(yè)務(wù)流程,導(dǎo)致長時間無法發(fā)送心跳。但是kafka卻沒有這樣做,kafka的目的可能是為了實(shí)現(xiàn)簡單。如果消費(fèi)者消費(fèi)業(yè)務(wù)確實(shí)需要非常長時間,我們可以通過參數(shù)max. poll. interval. ms配置,它代表消費(fèi)兩次poll最大的時間間隔,比如將其配置成60s
props.put("max.poll.interval.ms", "60000");或者我們可以減少consumer每次從broker拉取的數(shù)據(jù)量,consumer默認(rèn)拉取500條,我們可以將其修改了50條
props.put("max.poll.records", "50");Kafka在后續(xù)的新版本中修正了Consumer的心跳發(fā)送機(jī)制,將心跳發(fā)送的任務(wù)交給了專門的HeartbeatThread。那么max.poll.interval.ms參數(shù)還有意義么?該參數(shù)其實(shí)還是有意義,因?yàn)榧词剐奶l(fā)送正常,那也只能證明Consumer是存活狀態(tài),但是Consumer可能處于假死狀態(tài),比如Consumer遇到了死鎖導(dǎo)致長時間等待超過了poll設(shè)定的時間間隔max.poll.interval.ms。
在這一節(jié),筆者熟悉了會觸發(fā)kafka Rebalance機(jī)制的第二種情況以及應(yīng)對措施,接下來,筆者將深入介紹kafka的重平衡機(jī)制
2. Coordinator
在介紹Rebalance機(jī)制之前,筆者想先介紹一下Coordinator,它是Rebalance機(jī)制中非常重要的一個角色。每個消費(fèi)組都會有一個coordinator,Coordinator負(fù)責(zé)處理管理組內(nèi)的消費(fèi)者和位移管理,Coordinator并不負(fù)責(zé)消費(fèi)組內(nèi)的partition分配。消費(fèi)者通過心跳的方式告知Coordinator自己仍然處于存活狀態(tài),Coordinator以session. timeout. ms參數(shù)的頻率檢測消費(fèi)組group內(nèi)消費(fèi)者存活情況,該參數(shù)的默認(rèn)值是10s,如果該值太大,那么coordinator需要非常長時間才能檢測到消費(fèi)者宕機(jī)
選舉機(jī)制
如果kafka集群有多個broker節(jié)點(diǎn),消費(fèi)組會選擇哪個partition節(jié)點(diǎn)作為Coordinator節(jié)點(diǎn)呢?它會通過如下公式,其中的50代表著kafka內(nèi)部主題consumer offset的分區(qū)總數(shù)
Math.abs(hash(groupID)) % 50那么當(dāng)前Consumer Group的Coordinator就是上述公式計算出的partition的leader partition
3. Rebalance流程
Coordinator發(fā)生Rebalance的時候,Coordinator并不會主動通知組內(nèi)的所有Consumer重新加入組,而是當(dāng)Consumer向Coordinator發(fā)送心跳的時候,Coordinator將Rebalance的狀況通過心跳響應(yīng)告知Consumer。Rebalance機(jī)制整體可以分為兩個步驟,一個是Joining the Group,另外一個是分配Synchronizing Group State
3.1 Joining the Group
在當(dāng)前這個步驟中,所有的消費(fèi)者會和Coordinator交互,請求Coordinator加入當(dāng)前消費(fèi)組。Coordinator會從所有的消費(fèi)者中選擇一個消費(fèi)者作為leader consumer, 選擇的算法是隨機(jī)選擇
3.2 Synchronizing Group State
leader Consumer從Coordinator獲取所有的消費(fèi)者的信息,并將消費(fèi)組訂閱的partition分配結(jié)果封裝為SyncGroup請求,需要注意的是leader Consumer不會直接與組內(nèi)其它的消費(fèi)者交互,leader Consumer會將SyncGroup發(fā)送給Coordinator,Coordinator再將分配結(jié)果發(fā)送給各個Consumer。分配partition有如下3種策略RangeAssignor,RoundRobinAssignor,StickyAssignor,關(guān)于這三種分配方案更詳細(xì)的資料請看上一篇文章
如果leader consumer因?yàn)橐恍┨厥庠驅(qū)е路峙浞謪^(qū)失敗(Coordinator通過超時的方式檢測),那么Coordinator會重新要求所有的Consumer重新進(jìn)行步驟Joining the Group狀態(tài)
4. Coordinator生命周期
為了更好的了解Coordinator的職責(zé)以及Rebalance機(jī)制,筆者詳細(xì)介紹一下Coordinator的生命周期
Coordinator生命周期中總共有5種狀態(tài),Down,Initialize,Stable,Joining,AwaitingSync
Down:Coordinator不會維護(hù)任何消費(fèi)組狀態(tài)
Initialize:Coordinator處于初始化狀態(tài),Coordinator從Zookeeper中讀取相關(guān)的消費(fèi)組數(shù)據(jù),這個時候Coordinator對接受到消費(fèi)者心跳或者加入組的請求都會返回錯誤
Stable:Coordinator處理消費(fèi)者心跳請求,但是還未開始初始化generation,Coordinator正在等待消費(fèi)者加入組的請求
Joining:Coordinator正在處理組內(nèi)成員加入組的請求
AwaitingSync:等待leader consumer分配分區(qū),并將分區(qū)分配結(jié)果發(fā)送給各個Consumer
這五個狀態(tài)相互轉(zhuǎn)換流程圖示如下,其中的重點(diǎn)用紅框標(biāo)出,它們對應(yīng)著Rebalance的流程步驟

5. Generation機(jī)制
在上文中提到消費(fèi)者消費(fèi)消息超時之后,如果再次嘗試提交offset,就會出現(xiàn)如下的異常
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
出現(xiàn)該異常的原因是Coordinator消費(fèi)組的保護(hù)機(jī)制。上文提到如果消費(fèi)者消費(fèi)超時,筆者稱其為TimeoutConsumer,那么TimeoutConsumer就會被Coordinator從消費(fèi)組中剔除,Coordinator就會進(jìn)行Rebalance,將當(dāng)前消費(fèi)者負(fù)責(zé)的partition重新分配給其它的消費(fèi)者,如果TimeoutConsumer完成了消息的消費(fèi),假設(shè)TimeoutConsumer成功提交partition的offset,那么就會出現(xiàn)混亂,因?yàn)門imeoutConsumer負(fù)責(zé)的partition已經(jīng)被分配給了其它的消費(fèi)者。Generation(代際)機(jī)制就是上述的保護(hù)機(jī)制。
Coordinator每進(jìn)行一次Rebalance,就會為當(dāng)前的Rebalance設(shè)置一個Generation標(biāo)記,比如說第一次Rebalance標(biāo)記是1,如果再次Rebalance,該標(biāo)記就會成為2,消費(fèi)者在提交offset的時候會將generation一同提交,Coordinator在發(fā)現(xiàn)TimeoutConsumer的標(biāo)記已經(jīng)超時的情況下會拒絕消費(fèi)者提交generation標(biāo)記。
Generation的機(jī)制可能會導(dǎo)致上一代際消費(fèi)者和當(dāng)前代際消費(fèi)者消費(fèi)相同的消息,所以消費(fèi)者在消費(fèi)消息的時候需要實(shí)現(xiàn)消息消費(fèi)的冪等性,關(guān)于冪等性消費(fèi)的問題筆者將會寫一瓶文章詳細(xì)介紹。
6. Leader Consumer
上文提到Leader Consumer是Coordinator在Joining the Group步驟的時候隨機(jī)選擇的,Leader Consumer負(fù)責(zé)組內(nèi)各個Consumer的partition分配,除此之外Leader Consumer還負(fù)責(zé)整個消費(fèi)組訂閱的主題的監(jiān)控,Leader Consumer會定期更新消費(fèi)組訂閱的主題信息,一旦發(fā)現(xiàn)主題信息發(fā)生了變化,Leader Consumer會通知Coordinator觸發(fā)Rebalance機(jī)制。

版權(quán)聲明:
文章不錯?點(diǎn)個【在看】吧!??




