Kafka Consumer Rebalance詳解
文章目錄
- Kafka版本
- rebalance
- rebalance策略
- rebalance generation
- rebalance協(xié)議
- rebalance流程
- rebalance監(jiān)聽器
1. Kafka版本
kafka版本1.1.1,可能絕大部分也適用于kafka 0.10.x及以上版本。
2. rebalance
-
ConsumerGroup(消費(fèi)組)里的Consumer(消費(fèi)者)共同讀取topic(主題)的partition(分區(qū)),一個(gè)新的Consumer(消費(fèi)者)加入ConsumerGroup(消費(fèi)組)時(shí),讀取的是原本由其他Consumer(消費(fèi)者)讀取的消息。當(dāng)一個(gè)Consumer(消費(fèi)者)被關(guān)閉或發(fā)生奔潰時(shí),它就離開ConsumerGroup(消費(fèi)組),原本由它讀取的分區(qū)將有ConsumerGroup(消費(fèi)組)的其他Consumer(消費(fèi)者)來讀取。在topic發(fā)生變化時(shí)(比如添加了新的分區(qū)),會(huì)發(fā)生Partition重分配,Partition的所有權(quán)從一個(gè)Consumer(消費(fèi)者)轉(zhuǎn)移到另一個(gè)Consumer(消費(fèi)者)的行為被稱為rebalance(再均衡)。rebalance(再均衡)本質(zhì)上是一種協(xié)議,規(guī)定了ConsumerGroup(消費(fèi)組)中所有Consumer(消費(fèi)者)如何達(dá)成一致來消費(fèi)topic(主題)下的partition(分區(qū)) -
rebalance(再均衡)為ConsumerGroup(消費(fèi)組)帶來了高可用性和伸縮性(可以安全的添加或移除消費(fèi)者),在rebalance(再均衡)期間,Consumer(消費(fèi)者)無法讀取消息,造成整個(gè)Consumer(消費(fèi)者)一段時(shí)間的不可用 -
Consumer(消費(fèi)者)通過向GroupCoordinator(群組協(xié)調(diào)器)(不同的ConsumerGroup(消費(fèi)組)可以有不同的)發(fā)送心跳來維持它們與群組的從屬關(guān)系以及它們對(duì)分區(qū)的所有權(quán)關(guān)系。Consumer(消費(fèi)者)會(huì)在輪詢消息或者提交偏移量時(shí)發(fā)送心跳(kafka0.10.1之前的版本),在kafka0.10.1版本里,心跳線程是獨(dú)立的 -
分配分區(qū)的過程
-
Consumer(消費(fèi)者)加入ConsumerGroup(消費(fèi)組)時(shí),會(huì)向GroupCoordinator(群組協(xié)調(diào)器)發(fā)送一個(gè)JoinGroup請(qǐng)求,第一個(gè)加入群組的Consumer(消費(fèi)者)將會(huì)成為群主,群主從GroupCoordinator(群組協(xié)調(diào)器)獲得ConsumerGroup(消費(fèi)組)的成員列表(此列表包含所有最新正常發(fā)送心跳的活躍的Consumer(消費(fèi)者)),并負(fù)責(zé)給每一個(gè)Consumer(消費(fèi)者)分配分區(qū)(PartitionAssignor的實(shí)現(xiàn)類來決定哪個(gè)分區(qū)被分配給哪個(gè)Consumer(消費(fèi)者)) -
群主把分配情況列表發(fā)送給
GroupCoordinator(群組協(xié)調(diào)器),GroupCoordinator(群組協(xié)調(diào)器)再把這些信息發(fā)送給ConsumerGroup(消費(fèi)組)里所有的Consumer(消費(fèi)者)。每個(gè)Consumer(消費(fèi)者)只能看到自己的分配信息,只有群主知道ConsumerGroup(消費(fèi)組)里所有消費(fèi)者的分配信息。
rebalance(再均衡)觸發(fā)條件
-
ConsumerGroup(消費(fèi)組)里的Consumer(消費(fèi)者)發(fā)生變更(主動(dòng)加入、主動(dòng)離開、崩潰),崩潰不一定就是指 consumer進(jìn)程"掛掉"、 consumer進(jìn)程所在的機(jī)器宕機(jī)、長時(shí)間GC、網(wǎng)絡(luò)延遲,當(dāng) consumer無法在指定的時(shí)間內(nèi)完成消息的處理,那么coordinator就認(rèn)為該 consumer已經(jīng)崩潰,從而引發(fā)新一輪 rebalance -
訂閱
topic(主題)的數(shù)量發(fā)生變更(比如使用正則表達(dá)式的方式訂閱),當(dāng)匹配正則表達(dá)式的新topic被創(chuàng)建時(shí)則會(huì)觸發(fā) rebalance。 -
訂閱
topic(主題)的partition(分區(qū))數(shù)量發(fā)生變更,比如使用命令行腳本增加了訂閱 topic 的分區(qū)數(shù)
3. rebalance策略
-
分配策略:決定訂閱topic的每個(gè)分區(qū)會(huì)被分配給哪個(gè)consumer。默認(rèn)提供了 3 種分配策略,分別是 range 策略、round-robin策略和 sticky策略,可以通過
partition.assignment.strategy參數(shù)指定。kafka1.1.x默認(rèn)使用range策略 -
range策略:將單個(gè) topic 的所有分區(qū)按照順序排列,然后把這些分區(qū)劃分成固定大小的分區(qū)段并依次分配給每個(gè) consumer。假設(shè)有ConsumerA和ConsumerB分別處理三個(gè)分區(qū)的數(shù)據(jù),當(dāng)ConsumerC加入時(shí),觸發(fā)rebalance后,ConsumerA、ConsumerB、ConsumerC每個(gè)都處理2個(gè)分區(qū)的數(shù)據(jù)。
-
round-robin 策略:把所有 topic 的所有分區(qū)順序擺開,然后輪詢式地分配給各個(gè) consumer
-
sticky策略(0.11.0.0后引入):有效地避免了上述兩種策略完全無視歷史分配方案的缺陷。采用了"有黏性"的策略對(duì)所有 consumer 實(shí)例進(jìn)行分配,可以規(guī)避極端情況下的數(shù)據(jù)傾斜并且在兩次 rebalance間最大限度地維持了之前的分配方案
4. rebalance generation
-
rebalance generation 用于標(biāo)識(shí)某次 rebalance,在 consumer中它是一個(gè)整數(shù),通常從 0開始
-
consumer generation主要是防止無效 offset提交。比如上一屆的 consumer成員由于某些原因延遲提交了 offset(已經(jīng)被踢出group),但 rebalance 之后該 group 產(chǎn)生了新一屆的 group成員,而延遲的offset提交攜帶的是舊的 generation信息,因此這次提交會(huì)被 consumer group拒絕,使用 consumer時(shí)經(jīng)常碰到的 ILLEGAL_GENERATION異常就是這個(gè)原因?qū)е碌?/p>
-
每個(gè) group進(jìn)行 rebalance之后, generation號(hào)都會(huì)加 1,表示 group進(jìn)入了 一個(gè)新的版本
5. rebalance協(xié)議
-
rebalance本質(zhì)是一組協(xié)議
-
JoinGroup: consumer請(qǐng)求加入組 -
SyncGroup:group leader把分配方案同步更新到組內(nèi)所有成員 -
Heartbeat:consumer定期向coordinator發(fā)送心跳表示自己存活 -
LeaveGroup:consumer主動(dòng)通知coordinator該consumer即將離組 -
DescribeGroup:查看組的所有信息,包括成員信息、協(xié)議信息、分配方案以及訂閱信息。該請(qǐng)求類型主要供管理員使用 。coordinator不使用該請(qǐng)求執(zhí)行 rebalance
在 rebalance過程中,coordinator主要處理 consumer發(fā)過來的JoinGroup和 SyncGroup請(qǐng)求
當(dāng) consumer主動(dòng)離組時(shí)會(huì)發(fā)送LeaveGroup請(qǐng)求給 coordinator
在成功 rebalance之后,組內(nèi)所有 consumer都需要定期地向 coordinator發(fā)送Heartbeat請(qǐng)求。每個(gè) consumer 也是根據(jù)Heartbeat請(qǐng)求的響應(yīng)中是否包含 REBALANCE_IN_PROGRESS 來判斷當(dāng)前group是否開啟了新一輪 rebalance
6. rebalance流程
- 第一步確認(rèn)coordinator所在的broker,并建立socket連接
-
計(jì)算Math.abs(groupId.hashcode)%offset.topic.num.partitions(默認(rèn)50),假設(shè)是10
-
查找
__consumer_offsets分區(qū)10的leader副本所在的broker,該broker即為這個(gè)consumer group的coordinator
- 第二步執(zhí)行rebalance,rebalance分為兩步
-
加入組
-
組內(nèi)所有consumer向coordinator發(fā)送
JoinGroup請(qǐng)求 - coordinator選擇一個(gè)consumer擔(dān)任組的leader,并把所有成員信息以及訂閱信息發(fā)送給leader
-
組內(nèi)所有consumer向coordinator發(fā)送
-
同步更新方案
- leader根據(jù)rebalance的分配策略為 group中所有成員制定分配方案,決定每個(gè) consumer都負(fù)責(zé)哪些 topic 的哪些分區(qū)。
-
分配完成后,leader通過
SyncGroup請(qǐng)求將分配方案發(fā)送給coordinator。組內(nèi)所有的consumer成員也會(huì)發(fā)送SyncGroup給coordinator -
coordinator把每個(gè)consumer的方案抽取出來作為
SyncGroup請(qǐng)求的response返回給各自的consumer
-
為什么consumer group的分配方案在consumer端執(zhí)行?
- 這樣做可以有更好的靈活性。比如同一個(gè)機(jī)架上的分區(qū)數(shù)據(jù)被分配給相同機(jī)架上的 consumer減少網(wǎng)絡(luò)傳輸?shù)拈_銷。即使以后分區(qū)策略發(fā)生了變更,也只需要重啟 consumer 應(yīng)用即可,不必重啟 Kafka服務(wù)器
加入組流程

- 同步分配方案

6. kafka為consumer group定義了5種狀態(tài)
-
Empty:表示group下沒有任何激活的consumer,但可能包含offset信息。- 每個(gè)group創(chuàng)建時(shí)處于Empty狀態(tài)
- 所有consumer都離開group時(shí)處于Empty狀態(tài)
- 由于可能包含offset信息,所以此狀態(tài)下的group可以響應(yīng) OffsetFetch請(qǐng)求,即返回 clients端對(duì)應(yīng)的位移信息
-
PreparingRebalance:表示group 正在準(zhǔn)備進(jìn)行 group rebalance。此狀態(tài)表示group已經(jīng)接收到部分JoinGroup的請(qǐng)求,同時(shí)在等待其他成員發(fā)送JoinGroup請(qǐng)求,知道所有成員都成功加入組或者超時(shí)(Kafka 0.10.1.0之后超時(shí)時(shí)間由consumer端參數(shù)max.poll.interval.ms指定)- 該狀態(tài)下的 group 依然可能保存有offset信息,因此 clients 依然可以發(fā)起 OffsetFetch 請(qǐng)求去獲取offset,甚 至還可以發(fā)起OffsetCommit請(qǐng)求去提交位移
-
AwaitingSync:所有成員都已經(jīng)加入組并等待 leader consumer 發(fā)送分區(qū)分配方案。同樣地,此時(shí)依然可以獲取位移,但若提交位移, coordinator 將會(huì)拋出REBALANCE_IN_PROGRESS異常來表明該 group 正在進(jìn)行 rebalance -
Stable:表明 group 開始正常消費(fèi) 。此時(shí) group 必須響應(yīng) clients 發(fā)送過來的任何請(qǐng)求,比如位移提交請(qǐng)求、位移獲取請(qǐng)求、心跳請(qǐng)求等 -
Dead: 表明 group 已經(jīng)徹底廢棄, group 內(nèi)沒有任何激活consumer并且 group 的所有元數(shù)據(jù)信息都己被刪除。處于此狀態(tài)的 group 不會(huì)響應(yīng)任何請(qǐng)求。嚴(yán)格來說, coordinator會(huì)返回UNKNOWN_MEMBER_ID異常
7. rebalance監(jiān)聽器
-
如果要實(shí)現(xiàn)將offset存儲(chǔ)在外部存儲(chǔ)中,需要使用rebalance。使用 rebalance 監(jiān)聽器的前提是必須使用 consumer group。如果使用的是獨(dú)立 consumer或是直接手動(dòng)分配分區(qū),那么 rebalance監(jiān)聽器是無效的
-
rebalance 監(jiān)聽器最常見的用法就是手動(dòng)提交位移到第三方存儲(chǔ)以及在 rebalance 前后執(zhí)行一些必要的審計(jì)操作
-
自動(dòng)提交位移是不需要在 rebalance監(jiān)聽器中再提交位移的,consumer 每次 rebalance 時(shí)會(huì)檢查用戶是否啟用了自動(dòng)提交位移,如果是,它會(huì)幫用戶執(zhí)行提交
-
鑒于 consumer 通常都要求 rebalance 在很短的時(shí)間內(nèi)完成,用戶千萬不要在 rebalance 監(jiān)聽器 的兩個(gè)方法中放入執(zhí)行時(shí)間很長的邏輯,特別是一些長時(shí)間阻塞方法
-
代碼案例
public?class?ConsumerOffsetSaveDB?{
????private?final?static?Logger?logger?=?LoggerFactory.getLogger("kafka-consumer");
????@Test
????public?void?testConsumerOffsetSaveDB()?{
????????Properties?props?=?new?Properties();
????????props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,?"kafka-master:9092,kafka-slave1:9093,kafka-slave2:9094");
????????props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,?10);
????????props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,?"false");
????????props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,?"1000");
????????props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,?"30000");
????????props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,?"org.apache.kafka.common.serialization.StringDeserializer");
????????props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,?"org.apache.kafka.common.serialization.StringDeserializer");
????????props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,?"earliest");
????????String?groupId?=?"test_group_offset_db11";
????????props.put(ConsumerConfig.GROUP_ID_CONFIG,?groupId);
????????KafkaConsumer<String,?String>?consumer?=?new?KafkaConsumer<>(props);
????????String?topic?=?"testTopic";
????????Map<TopicPartition,?OffsetAndMetadata>?offsetAndMetadataMap?=?new?HashMap<>();
????????consumer.subscribe(Collections.singletonList(topic),?new?SaveOffsetsRebalance(consumer,?offsetAndMetadataMap,?groupId));
????????consumer.poll(0);
????????OffsetService?offsetService?=?new?OffsetService();
????????for?(TopicPartition?partition?:?consumer.assignment())?{
????????????//?從數(shù)據(jù)庫獲取當(dāng)前分區(qū)的偏移量
????????????Offset?offset?=?offsetService.getOffset(groupId,?partition.topic(),?partition.partition());
????????????if?(offset?!=?null?&&?offset.getOffset()?!=?null)?{
????????????????consumer.seek(partition,?offset.getOffset());
????????????}?else?{
????????????????logger.info("初始時(shí)庫沒有值");
????????????}
????????}
????????try?{
????????????while?(true)?{
????????????????ConsumerRecords<String,?String>?records?=?consumer.poll(1000);
????????????????for?(ConsumerRecord<String,?String>?record?:?records)?{
????????????????????//模擬異常
???????????????????/*?if?(record.value().equals("hello?world?10"))?{
????????????????????????throw?new?RuntimeException(String.format("hello?world?10處理異常,offset:%s,partition:%s",record.offset(),record.partition()));
????????????????????}*/
????????????????????/**
?????????????????????*1.?消息處理(要考慮去重,如果消息成功,但是存儲(chǔ)偏移量失敗或者宕機(jī),此時(shí)數(shù)據(jù)庫存儲(chǔ)的消息偏移量不是最新的)
?????????????????????*2.?如果消息處理是數(shù)據(jù)庫,最好將消息處理與存儲(chǔ)offset放在一個(gè)事務(wù)當(dāng)中
?????????????????????*/
????????????????????logger.info("key:{},value:{},offset:{}",?record.key(),?record.value(),?record.offset());
????????????????????offsetAndMetadataMap.put(
????????????????????????????new?TopicPartition(record.topic(),?record.partition()),
????????????????????????????new?OffsetAndMetadata(record.offset()?+?1,?"")
????????????????????);
????????????????????//?2.存儲(chǔ)偏移量到DB,這里采用單次更新,當(dāng)然也可以批量
????????????????????offsetService.insertOffset(groupId,?record.topic(),?record.partition(),?record.offset()?+?1);
????????????????}
????????????}
????????}?catch?(WakeupException?e)?{
????????????//?不處理異常
????????}?catch?(Exception?e)?{
????????????logger.error(e.getMessage(),?e);
????????}?finally?{
????????????consumer.close();
????????}
????}
}
- rebalance代碼
public?class?SaveOffsetsRebalance?implements?ConsumerRebalanceListener?{
????private?Logger?logger?=?LoggerFactory.getLogger(SaveOffsetsRebalance.class);
????private?Consumer?consumer;
????private?Map<TopicPartition,?OffsetAndMetadata>?map;
????private?String?groupId;
????OffsetService?offsetService?=?new?OffsetService();
????public?SaveOffsetsRebalance(Consumer?consumer,?Map<TopicPartition,?OffsetAndMetadata>?map,?String?groupId)?{
????????this.consumer?=?consumer;
????????this.map?=?map;
????????this.groupId?=?groupId;
????}
????/**
?????*?此方法在rebalance操作之前調(diào)用,用于我們提交消費(fèi)者偏移,
?????*?這里不提交消費(fèi)偏移,因?yàn)閏onsumer中每處理一次消息就保存一次偏移,
?????*?consumer去做重復(fù)消費(fèi)處理
?????*/
????@Override
????public?void?onPartitionsRevoked(Collection<TopicPartition>?partitions)?{
????????int?size?=?partitions.size();
????????logger.info("rebalance?之前觸發(fā).....{}",?size);
????????Iterator<TopicPartition>?iterator?=?partitions.iterator();
????????while?(iterator.hasNext())?{
????????????TopicPartition?topicPartition?=?iterator.next();
????????????long?position?=?consumer.position(topicPartition);
????????????OffsetAndMetadata?offsetAndMetadata?=?map.get(topicPartition);
????????????if?(offsetAndMetadata?!=?null)?{
????????????????long?offset?=?offsetAndMetadata.offset();
????????????????logger.info("position:{},offset:{}",?position,?offset);
????????????}?else?{
????????????????logger.info("position:{},offset:{}",?position,?null);
????????????}
????????}
????}
????/**
?????*?此方法在rebalance操作之后調(diào)用,用于我們拉取新的分配區(qū)的偏移量。
?????*/
????@Override
????public?void?onPartitionsAssigned(Collection<TopicPartition>?partitions)?{
????????logger.info("rebalance?之后觸發(fā).....");
????????for?(TopicPartition?partition?:?partitions)?{
????????????//從數(shù)據(jù)庫獲取當(dāng)前分區(qū)的偏移量
????????????Offset?offset?=?offsetService.getOffset(this.groupId,?partition.topic(),?partition.partition());
????????????if?(offset?!=?null?&&?offset.getOffset()?!=?null)?{
????????????????consumer.seek(partition,?offset.getOffset());
????????????}
????????}
????}
}
- 數(shù)據(jù)庫sql
DROP?TABLE?IF?EXISTS?`t_offset`;
CREATE?TABLE?`t_offset`?(
??`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT?COMMENT?'主鍵',
??`group_id`?varchar(50)?CHARACTER?SET?latin1?NOT?NULL,
??`topic`?varchar(50)?CHARACTER?SET?latin1?NOT?NULL?COMMENT?'topic',
??`partition`?int(11)?NOT?NULL,
??`offset`?bigint(20)?DEFAULT?NULL,
??PRIMARY?KEY?(`id`),
??UNIQUE?KEY?`unique_gtp`?(`group_id`,`topic`,`partition`)?USING?BTREE
)?ENGINE=InnoDB?AUTO_INCREMENT=1?DEFAULT?CHARSET=utf8?COLLATE=utf8_bin?COMMENT='偏移量保存';
- 添加依賴
compile?group:?'commons-dbutils',?name:?'commons-dbutils',?version:?'1.7'
compile?group:?'mysql',?name:?'mysql-connector-java',?version:?'5.1.30'
compile?group:?'org.aeonbits.owner',?name:?'owner',?version:?'1.0.9'
- 數(shù)據(jù)庫工具
public?class?JdbcUtils?{
????public?static?Connection?getConnection(String?driverClass,?String?url,?String?username,?String?password)?throws?SQLException,?ClassNotFoundException?{
????????Class.forName(driverClass);
????????return?DriverManager.getConnection(url,?username,?password);
????}
}
@Sources({"classpath:config.properties"})
public?interface?MysqlJdbcConfig?extends?Reloadable?{
????@Key("config.mysql.url")
????public?String?url();
????@Key("config.mysql.username")
????public?String?username();
????@Key("config.mysql.password")
????public?String?password();
????@Key("config.mysql.driverClass")
????public?String?driverClass();
????public?final?static?class?ServerConfigInner?{
????????public?final?static?MysqlJdbcConfig?config?=?ConfigFactory.create(MysqlJdbcConfig.class);
????}
????public?static?final?MysqlJdbcConfig?instance?=?ServerConfigInner.config;
}
-
配置
config.properties
config.mysql.url=jdbc:mysql://jannal.mac.com:3306/test
config.mysql.username=root
config.mysql.password=root
config.mysql.driverClass=com.mysql.jdbc.Driver
- 偏移量處理類
public?class?Offset?{
????private?Long?id;
????private?String?groupId;
????private?String?topic;
????private?String?partition;
????private?Long?offset;
??...省略getter?setter?toString..
}
public?class?OffsetService?{
????
????public?Offset?getOffset(String?groupId,?String?topic,?int?partition)?{
????????QueryRunner?queryRunner?=?new?QueryRunner();
????????Offset?offset?=?null;
????????Connection?connection?=?null;
????????try?{
????????????connection?=?JdbcUtils.getConnection(MysqlJdbcConfig.instance.driverClass(),?//
????????????????????MysqlJdbcConfig.instance.url(),//
????????????????????MysqlJdbcConfig.instance.username(),//
????????????????????MysqlJdbcConfig.instance.password());//
????????????String?sql?=?"select?`topic`,`partition`,`offset`?from?`t_offset`?where?`group_id`=??and??`topic`?=???and??`partition`?=??";
????????????offset?=?queryRunner.query(connection,?sql,?new?BeanHandler<Offset>(Offset.class),?new?Object[]{groupId,?topic,?partition});
????????}?catch?(Exception?e)?{
????????????throw?new?RuntimeException(e.getMessage(),?e);
????????}?finally?{
????????????if?(connection?!=?null)?{
????????????????try?{
????????????????????connection.close();
????????????????}?catch?(SQLException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????????return?offset;
????}
????public?void?insertOffset(String?groupId,?String?topic,?int?partition,?long?offset)?{
????????QueryRunner?queryRunner?=?new?QueryRunner();
????????Connection?connection?=?null;
????????try?{
????????????connection?=?JdbcUtils.getConnection(MysqlJdbcConfig.instance.driverClass(),?//
????????????????????MysqlJdbcConfig.instance.url(),//
????????????????????MysqlJdbcConfig.instance.username(),//
????????????????????MysqlJdbcConfig.instance.password());//
????????????String?sql?=?"?insert?into?`t_offset`(?`group_id`,`topic`,?`partition`,`offset`)?values?(??,?,??,?)?on?duplicate?key?update?`offset`?=VALUES(offset);";
????????????Object[]?params?=?{groupId,?topic,?partition,?offset};
????????????connection.setAutoCommit(false);
????????????queryRunner.update(connection,?sql,?params);
????????????connection.commit();
????????}?catch?(Exception?e)?{
????????????try?{
????????????????if?(connection?!=?null)?{
????????????????????connection.rollback();
????????????????}
????????????}?catch?(SQLException?e1)?{
????????????????throw?new?RuntimeException(e.getMessage(),?e);
????????????}
????????????throw?new?RuntimeException(e.getMessage(),?e);
????????}?finally?{
????????????if?(connection?!=?null)?{
????????????????try?{
????????????????????connection.close();
????????????????}?catch?(SQLException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????}
????public?void?batchInsertOffset(Object[][]?params)?{
????????QueryRunner?queryRunner?=?new?QueryRunner();
????????Connection?connection?=?null;
????????try?{
????????????connection?=?JdbcUtils.getConnection(MysqlJdbcConfig.instance.driverClass(),?//
????????????????????MysqlJdbcConfig.instance.url(),//
????????????????????MysqlJdbcConfig.instance.username(),//
????????????????????MysqlJdbcConfig.instance.password());//
????????????String?sql?=?"?insert?into?`t_offset`(`group_id`,?`topic`,?`partition`,`offset`)?values?(?,??,??,?)?on?duplicate?key?update?`offset`=VALUES(offset);";
????????????connection.setAutoCommit(false);
????????????queryRunner.batch(connection,?sql,?params);
????????????connection.commit();
????????}?catch?(Exception?e)?{
????????????try?{
????????????????if?(connection?!=?null)?{
????????????????????connection.rollback();
????????????????}
????????????}?catch?(SQLException?e1)?{
????????????????throw?new?RuntimeException(e.getMessage(),?e);
????????????}
????????????throw?new?RuntimeException(e.getMessage(),?e);
????????}?finally?{
????????????if?(connection?!=?null)?{
????????????????try?{
????????????????????connection.close();
????????????????}?catch?(SQLException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}
????}
}
如果這個(gè)文章對(duì)你有幫助,不要忘記?
「在看」?「點(diǎn)贊」?「收藏」
?三連啊喂!
2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專家級(jí)技能模型與學(xué)習(xí)指南(勝天半子篇)
互聯(lián)網(wǎng)最壞的時(shí)代可能真的來了
我在B站讀大學(xué),大數(shù)據(jù)專業(yè)
我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么? 193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下 Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點(diǎn) 我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么? 在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)! 硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié) 數(shù)據(jù)治理方法論和實(shí)踐小百科全書
標(biāo)簽體系下的用戶畫像建設(shè)小指南
4萬字長文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
【面試&個(gè)人成長】2021年過半,社招和校招的經(jīng)驗(yàn)之談 大數(shù)據(jù)方向另一個(gè)十年開啟 |《硬剛系列》第一版完結(jié) 我寫過的關(guān)于成長/面試/職場(chǎng)進(jìn)階的文章 當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」

