Kafka Is Acting Up Again!

01
Preface
02
Reproducing the Problem
2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] -
commit failed
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
03
Analyzing the Problem
Since Kafka triggered a Rebalance, let me go over when Kafka triggers one.
What Is a Rebalance
When a Rebalance Is Triggered
The number of group members changes, for example when a new consumer joins the consumer group or an existing one leaves it. Leaving the group covers both a member crashing and a member leaving voluntarily.
The number of subscribed topics changes.
The number of partitions of a subscribed topic changes.
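The timeout after which the Coordinator judges a Consumer instance to be "dead" is configured with session.timeout.ms. The default is 10 seconds (the value at the time of writing; since Kafka 3.0 the default is 45 seconds).

How often heartbeats are sent is controlled by heartbeat.interval.ms. The smaller this value, the more frequently a Consumer instance sends heartbeat requests. Frequent heartbeats consume extra bandwidth, but the benefit is that the Consumer learns more quickly whether a Rebalance has started, because the way the Coordinator currently tells each Consumer instance to start a Rebalance is to put a REBALANCE_NEEDED flag into the body of the heartbeat response (in the Kafka protocol this is the REBALANCE_IN_PROGRESS error code).

The third setting is the max.poll.interval.ms parameter. It limits the maximum interval between two calls to poll by the Consumer application. Its default is 5 minutes, which means that if the Consumer cannot finish processing the messages returned by poll within 5 minutes, the Consumer proactively sends a "leave group" request and the Coordinator starts a new round of Rebalance.

Suggested settings:
Set session.timeout.ms = 6s.
Set heartbeat.interval.ms = 2s.
Make sure a Consumer instance can send at least 3 rounds of heartbeat requests before it is judged "dead", that is, session.timeout.ms >= 3 * heartbeat.interval.ms.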
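The heartbeat rule above can be checked mechanically. The following is only a minimal sketch: the class and method names are hypothetical, and it uses plain java.util.Properties with the standard Kafka consumer keys rather than a real KafkaConsumer, so it runs without any Kafka dependency.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka or of the original project.
class HeartbeatConfigSketch {

    /** Returns true when at least three heartbeats fit inside one session timeout. */
    public static boolean allowsThreeHeartbeats(long sessionTimeoutMs, long heartbeatIntervalMs) {
        return sessionTimeoutMs >= 3 * heartbeatIntervalMs;
    }

    /** Builds consumer properties using the values suggested in the text (6s and 2s). */
    public static Properties suggestedProps() {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "6000");
        props.setProperty("heartbeat.interval.ms", "2000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = suggestedProps();
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        // 6000 >= 3 * 2000, so the suggested pair satisfies the rule.
        System.out.println("three heartbeats fit: " + allowsThreeHeartbeats(session, heartbeat));
    }
}
```

A check like this could run at application startup, before the properties are handed to the consumer, so that a misconfigured pair fails fast instead of showing up later as unexplained Rebalances.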
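The value of max.poll.interval.ms is especially critical. To avoid unexpected Rebalances, it is best to set it generously, slightly longer than the longest downstream processing time.

Fetch Offset and Committed Offset
Kafka offsets are managed by the consumer. There are two kinds: the fetch offset (position) and the committed offset (committed). The fetch offset represents the consumer's current progress on a partition. After messages are consumed, the offset needs to be committed. When committing, Kafka takes the value of the fetch offset as the partition's committed offset and sends it to the coordinator.

The Fix Suggested by the Exception Log
Increase max.poll.interval.ms and session.timeout.ms, reduce the configured max.poll.records, and have the consumer commit offsets promptly once it has finished processing messages.
04
Solving the Problem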
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = {"0"})}, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    logger.info("topic is {}, offset is {}, value is {}", record.topic(), record.offset(), record.value());
    try {
        Object value = record.value();
        logger.info(value.toString());
        // Manually acknowledge the record so its offset can be committed.
        ack.acknowledge();
    } catch (Exception e) {
        // Pass the exception as the last argument so the stack trace is logged.
        logger.error("Log consumer exception", e);
    }
}
First Attempt
spring:
  kafka:
    consumer:
      properties:
        max.poll.interval.ms: 3600000
        max.poll.records: 50
        session.timeout.ms: 60000
        heartbeat.interval.ms: 3000
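To see how much headroom these values give, it helps to work out the average time each record may take before the interval between two polls is exceeded. The sketch below is illustrative only: the class and method names are hypothetical, and the 2000 ms worst-case processing time is an assumed figure, not a measurement from the original system.

```java
// Hypothetical helper, not part of Kafka or of the original project.
class PollBudgetSketch {

    /** Average time (ms) each record may take before max.poll.interval.ms is exceeded. */
    public static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    /** True when a full batch, at the given worst-case per-record time, fits in the interval. */
    public static boolean batchFits(long maxPollIntervalMs, int maxPollRecords, long worstCaseRecordMs) {
        return (long) maxPollRecords * worstCaseRecordMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        // Values from the configuration above: 3600000 ms and 50 records.
        System.out.println(perRecordBudgetMs(3_600_000L, 50)); // 72000
        // Kafka's defaults: 300000 ms and 500 records.
        System.out.println(perRecordBudgetMs(300_000L, 500)); // 600
        // Assumed worst case of 2000 ms per record, for illustration only.
        System.out.println(batchFits(3_600_000L, 50, 2_000L));
    }
}
```

With 72 seconds available per record on average, a listener that only logs each message is very unlikely to exceed the poll interval under this configuration.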
Final Solution
Even with session.timeout.ms: 60000, nothing changed at all: the Rebalance exception was still thrown.
Before:
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){
After (only the groupId changes, from "kafka-consumer" to "kafka-consumer-logs"):
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer-logs", containerFactory = "kafkaListenerContainerFactory")
public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){
