<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kafka Consumer Rebalance詳解

          共 18805字,需瀏覽 38分鐘

           ·

          2022-11-18 17:55

          9358ed5c66b8c9dd72116d261eb62d9c.webp 全網(wǎng)最全大數(shù)據(jù)面試提升手冊(cè)!

          文章目錄
          1. Kafka版本
          2. rebalance
          3. rebalance策略
          4. rebalance generation
          5. rebalance協(xié)議
          6. rebalance流程
          7. rebalance監(jiān)聽器

          1. Kafka版本

          kafka版本1.1.1,可能絕大部分也適用于kafka 0.10.x及以上版本。

          2. rebalance

          1. ConsumerGroup(消費(fèi)組)里的Consumer(消費(fèi)者)共同讀取topic(主題)partition(分區(qū)),一個(gè)新的Consumer(消費(fèi)者)加入ConsumerGroup(消費(fèi)組)時(shí),讀取的是原本由其他Consumer(消費(fèi)者)讀取的消息。當(dāng)一個(gè)Consumer(消費(fèi)者)被關(guān)閉或發(fā)生奔潰時(shí),它就離開ConsumerGroup(消費(fèi)組),原本由它讀取的分區(qū)將有ConsumerGroup(消費(fèi)組)的其他Consumer(消費(fèi)者)來讀取。在topic發(fā)生變化時(shí)(比如添加了新的分區(qū)),會(huì)發(fā)生Partition重分配,Partition的所有權(quán)從一個(gè)Consumer(消費(fèi)者)轉(zhuǎn)移到另一個(gè)Consumer(消費(fèi)者)的行為被稱為rebalance(再均衡)。rebalance(再均衡)本質(zhì)上是一種協(xié)議,規(guī)定了ConsumerGroup(消費(fèi)組)中所有Consumer(消費(fèi)者)如何達(dá)成一致來消費(fèi)topic(主題)下的partition(分區(qū))

          2. rebalance(再均衡)ConsumerGroup(消費(fèi)組)帶來了高可用性和伸縮性(可以安全的添加或移除消費(fèi)者),在rebalance(再均衡)期間,Consumer(消費(fèi)者)無法讀取消息,造成整個(gè)Consumer(消費(fèi)者)一段時(shí)間的不可用

          3. Consumer(消費(fèi)者)通過向GroupCoordinator(群組協(xié)調(diào)器)(不同的ConsumerGroup(消費(fèi)組)可以有不同的)發(fā)送心跳來維持它們與群組的從屬關(guān)系以及它們對(duì)分區(qū)的所有權(quán)關(guān)系。Consumer(消費(fèi)者)會(huì)在輪詢消息或者提交偏移量時(shí)發(fā)送心跳(kafka0.10.1之前的版本),在kafka0.10.1版本里,心跳線程是獨(dú)立的

          4. 分配分區(qū)的過程

          • Consumer(消費(fèi)者)加入ConsumerGroup(消費(fèi)組)時(shí),會(huì)向GroupCoordinator(群組協(xié)調(diào)器)發(fā)送一個(gè)JoinGroup請(qǐng)求,第一個(gè)加入群組的Consumer(消費(fèi)者)將會(huì)成為群主,群主從GroupCoordinator(群組協(xié)調(diào)器)獲得ConsumerGroup(消費(fèi)組)的成員列表(此列表包含所有最新正常發(fā)送心跳的活躍的Consumer(消費(fèi)者)),并負(fù)責(zé)給每一個(gè)Consumer(消費(fèi)者)分配分區(qū)(PartitionAssignor的實(shí)現(xiàn)類來決定哪個(gè)分區(qū)被分配給哪個(gè)Consumer(消費(fèi)者))

          • 群主把分配情況列表發(fā)送給GroupCoordinator(群組協(xié)調(diào)器),GroupCoordinator(群組協(xié)調(diào)器)再把這些信息發(fā)送給ConsumerGroup(消費(fèi)組)里所有的Consumer(消費(fèi)者)。每個(gè)Consumer(消費(fèi)者)只能看到自己的分配信息,只有群主知道ConsumerGroup(消費(fèi)組)里所有消費(fèi)者的分配信息。

          rebalance(再均衡)觸發(fā)條件

          • ConsumerGroup(消費(fèi)組)里的Consumer(消費(fèi)者)發(fā)生變更(主動(dòng)加入、主動(dòng)離開、崩潰),崩潰不一定就是指 consumer進(jìn)程"掛掉"、 consumer進(jìn)程所在的機(jī)器宕機(jī)、長時(shí)間GC、網(wǎng)絡(luò)延遲,當(dāng) consumer無法在指定的時(shí)間內(nèi)完成消息的處理,那么coordinator就認(rèn)為該 consumer已經(jīng)崩潰,從而引發(fā)新一輪 rebalance

          • 訂閱topic(主題)的數(shù)量發(fā)生變更(比如使用正則表達(dá)式的方式訂閱),當(dāng)匹配正則表達(dá)式的新topic被創(chuàng)建時(shí)則會(huì)觸發(fā) rebalance。

          • 訂閱topic(主題)partition(分區(qū))數(shù)量發(fā)生變更,比如使用命令行腳本增加了訂閱 topic 的分區(qū)數(shù)

          3. rebalance策略

          1. 分配策略:決定訂閱topic的每個(gè)分區(qū)會(huì)被分配給哪個(gè)consumer。默認(rèn)提供了 3 種分配策略,分別是 range 策略、round-robin策略和 sticky策略,可以通過partition.assignment.strategy參數(shù)指定。kafka1.1.x默認(rèn)使用range策略

          2. range策略:將單個(gè) topic 的所有分區(qū)按照順序排列,然后把這些分區(qū)劃分成固定大小的分區(qū)段并依次分配給每個(gè) consumer。假設(shè)有ConsumerA和ConsumerB分別處理三個(gè)分區(qū)的數(shù)據(jù),當(dāng)ConsumerC加入時(shí),觸發(fā)rebalance后,ConsumerA、ConsumerB、ConsumerC每個(gè)都處理2個(gè)分區(qū)的數(shù)據(jù)。

          3. round-robin 策略:把所有 topic 的所有分區(qū)順序擺開,然后輪詢式地分配給各個(gè) consumer

          4. sticky策略(0.11.0.0后引入):有效地避免了上述兩種策略完全無視歷史分配方案的缺陷。采用了"有黏性"的策略對(duì)所有 consumer 實(shí)例進(jìn)行分配,可以規(guī)避極端情況下的數(shù)據(jù)傾斜并且在兩次 rebalance間最大限度地維持了之前的分配方案

          4. rebalance generation

          1. rebalance generation 用于標(biāo)識(shí)某次 rebalance,在 consumer中它是一個(gè)整數(shù),通常從 0開始

          2. consumer generation主要是防止無效 offset提交。比如上一屆的 consumer成員由于某些原因延遲提交了 offset(已經(jīng)被踢出group),但 rebalance 之后該 group 產(chǎn)生了新一屆的 group成員,而延遲的offset提交攜帶的是舊的 generation信息,因此這次提交會(huì)被 consumer group拒絕,使用 consumer時(shí)經(jīng)常碰到的 ILLEGAL_GENERATION異常就是這個(gè)原因?qū)е碌?/p>

          3. 每個(gè) group進(jìn)行 rebalance之后, generation號(hào)都會(huì)加 1,表示 group進(jìn)入了 一個(gè)新的版本

          5. rebalance協(xié)議

          1. rebalance本質(zhì)是一組協(xié)議

          • JoinGroup: consumer請(qǐng)求加入組
          • SyncGroup:group leader把分配方案同步更新到組內(nèi)所有成員
          • Heartbeat:consumer定期向coordinator發(fā)送心跳表示自己存活
          • LeaveGroup:consumer主動(dòng)通知coordinator該consumer即將離組
          • DescribeGroup:查看組的所有信息,包括成員信息、協(xié)議信息、分配方案以及訂閱信息。該請(qǐng)求類型主要供管理員使用 。coordinator不使用該請(qǐng)求執(zhí)行 rebalance

          在 rebalance過程中,coordinator主要處理 consumer發(fā)過來的JoinGroupSyncGroup請(qǐng)求

          當(dāng) consumer主動(dòng)離組時(shí)會(huì)發(fā)送LeaveGroup請(qǐng)求給 coordinator

          在成功 rebalance之后,組內(nèi)所有 consumer都需要定期地向 coordinator發(fā)送Heartbeat請(qǐng)求。每個(gè) consumer 也是根據(jù)Heartbeat請(qǐng)求的響應(yīng)中是否包含 REBALANCE_IN_PROGRESS 來判斷當(dāng)前group是否開啟了新一輪 rebalance

          6. rebalance流程

          1. 第一步確認(rèn)coordinator所在的broker,并建立socket連接
          • 計(jì)算Math.abs(groupId.hashcode)%offset.topic.num.partitions(默認(rèn)50),假設(shè)是10

          • 查找__consumer_offsets分區(qū)10的leader副本所在的broker,該broker即為這個(gè)consumer group的coordinator

          1. 第二步執(zhí)行rebalance,rebalance分為兩步
          • 加入組

            • 組內(nèi)所有consumer向coordinator發(fā)送JoinGroup請(qǐng)求
            • coordinator選擇一個(gè)consumer擔(dān)任組的leader,并把所有成員信息以及訂閱信息發(fā)送給leader
          • 同步更新方案

            • leader根據(jù)rebalance的分配策略為 group中所有成員制定分配方案,決定每個(gè) consumer都負(fù)責(zé)哪些 topic 的哪些分區(qū)。
            • 分配完成后,leader通過SyncGroup請(qǐng)求將分配方案發(fā)送給coordinator。組內(nèi)所有的consumer成員也會(huì)發(fā)送SyncGroup給coordinator
            • coordinator把每個(gè)consumer的方案抽取出來作為SyncGroup請(qǐng)求的response返回給各自的consumer
          1. 為什么consumer group的分配方案在consumer端執(zhí)行?

          • 這樣做可以有更好的靈活性。比如同一個(gè)機(jī)架上的分區(qū)數(shù)據(jù)被分配給相同機(jī)架上的 consumer減少網(wǎng)絡(luò)傳輸?shù)拈_銷。即使以后分區(qū)策略發(fā)生了變更,也只需要重啟 consumer 應(yīng)用即可,不必重啟 Kafka服務(wù)器

          加入組流程

          562caa31b1adef614267e1bdf97b369a.webp
          1. 同步分配方案
          3c4a6d939a79510ced85524ef8cfbc6e.webp

          6. kafka為consumer group定義了5種狀態(tài)

          • Empty:表示group下沒有任何激活的consumer,但可能包含offset信息。

            • 每個(gè)group創(chuàng)建時(shí)處于Empty狀態(tài)
            • 所有consumer都離開group時(shí)處于Empty狀態(tài)
            • 由于可能包含offset信息,所以此狀態(tài)下的group可以響應(yīng) OffsetFetch請(qǐng)求,即返回 clients端對(duì)應(yīng)的位移信息
          • PreparingRebalance:表示group 正在準(zhǔn)備進(jìn)行 group rebalance。此狀態(tài)表示group已經(jīng)接收到部分JoinGroup的請(qǐng)求,同時(shí)在等待其他成員發(fā)送JoinGroup請(qǐng)求,知道所有成員都成功加入組或者超時(shí)(Kafka 0.10.1.0之后超時(shí)時(shí)間由consumer端參數(shù)max.poll.interval.ms指定)

            • 該狀態(tài)下的 group 依然可能保存有offset信息,因此 clients 依然可以發(fā)起 OffsetFetch 請(qǐng)求去獲取offset,甚 至還可以發(fā)起OffsetCommit請(qǐng)求去提交位移
          • AwaitingSync:所有成員都已經(jīng)加入組并等待 leader consumer 發(fā)送分區(qū)分配方案。同樣地,此時(shí)依然可以獲取位移,但若提交位移, coordinator 將會(huì)拋出REBALANCE_IN_PROGRESS異常來表明該 group 正在進(jìn)行 rebalance

          • Stable:表明 group 開始正常消費(fèi) 。此時(shí) group 必須響應(yīng) clients 發(fā)送過來的任何請(qǐng)求,比如位移提交請(qǐng)求、位移獲取請(qǐng)求、心跳請(qǐng)求等

          • Dead: 表明 group 已經(jīng)徹底廢棄, group 內(nèi)沒有任何激活consumer并且 group 的所有元數(shù)據(jù)信息都己被刪除。處于此狀態(tài)的 group 不會(huì)響應(yīng)任何請(qǐng)求。嚴(yán)格來說, coordinator會(huì)返回UNKNOWN_MEMBER_ID異常

          7. rebalance監(jiān)聽器

          1. 如果要實(shí)現(xiàn)將offset存儲(chǔ)在外部存儲(chǔ)中,需要使用rebalance。使用 rebalance 監(jiān)聽器的前提是必須使用 consumer group。如果使用的是獨(dú)立 consumer或是直接手動(dòng)分配分區(qū),那么 rebalance監(jiān)聽器是無效的

          2. rebalance 監(jiān)聽器最常見的用法就是手動(dòng)提交位移到第三方存儲(chǔ)以及在 rebalance 前后執(zhí)行一些必要的審計(jì)操作

          3. 自動(dòng)提交位移是不需要在 rebalance監(jiān)聽器中再提交位移的,consumer 每次 rebalance 時(shí)會(huì)檢查用戶是否啟用了自動(dòng)提交位移,如果是,它會(huì)幫用戶執(zhí)行提交

          4. 鑒于 consumer 通常都要求 rebalance 在很短的時(shí)間內(nèi)完成,用戶千萬不要在 rebalance 監(jiān)聽器 的兩個(gè)方法中放入執(zhí)行時(shí)間很長的邏輯,特別是一些長時(shí)間阻塞方法

          5. 代碼案例

                
                public?class?ConsumerOffsetSaveDB?{
          ????private?final?static?Logger?logger?=?LoggerFactory.getLogger("kafka-consumer");
          ????@Test
          ????public?void?testConsumerOffsetSaveDB()?{
          ????????Properties?props?=?new?Properties();
          ????????props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,?"kafka-master:9092,kafka-slave1:9093,kafka-slave2:9094");
          ????????props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,?10);
          ????????props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,?"false");
          ????????props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,?"1000");
          ????????props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,?"30000");
          ????????props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,?"org.apache.kafka.common.serialization.StringDeserializer");
          ????????props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,?"org.apache.kafka.common.serialization.StringDeserializer");

          ????????props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,?"earliest");
          ????????String?groupId?=?"test_group_offset_db11";
          ????????props.put(ConsumerConfig.GROUP_ID_CONFIG,?groupId);

          ????????KafkaConsumer<String,?String>?consumer?=?new?KafkaConsumer<>(props);
          ????????String?topic?=?"testTopic";
          ????????Map<TopicPartition,?OffsetAndMetadata>?offsetAndMetadataMap?=?new?HashMap<>();
          ????????consumer.subscribe(Collections.singletonList(topic),?new?SaveOffsetsRebalance(consumer,?offsetAndMetadataMap,?groupId));
          ????????consumer.poll(0);

          ????????OffsetService?offsetService?=?new?OffsetService();

          ????????for?(TopicPartition?partition?:?consumer.assignment())?{
          ????????????//?從數(shù)據(jù)庫獲取當(dāng)前分區(qū)的偏移量
          ????????????Offset?offset?=?offsetService.getOffset(groupId,?partition.topic(),?partition.partition());
          ????????????if?(offset?!=?null?&&?offset.getOffset()?!=?null)?{
          ????????????????consumer.seek(partition,?offset.getOffset());
          ????????????}?else?{
          ????????????????logger.info("初始時(shí)庫沒有值");
          ????????????}
          ????????}

          ????????try?{
          ????????????while?(true)?{
          ????????????????ConsumerRecords<String,?String>?records?=?consumer.poll(1000);
          ????????????????for?(ConsumerRecord<String,?String>?record?:?records)?{
          ????????????????????//模擬異常
          ???????????????????/*?if?(record.value().equals("hello?world?10"))?{
          ????????????????????????throw?new?RuntimeException(String.format("hello?world?10處理異常,offset:%s,partition:%s",record.offset(),record.partition()));
          ????????????????????}*/
          ????????????????????/**
          ?????????????????????*1.?消息處理(要考慮去重,如果消息成功,但是存儲(chǔ)偏移量失敗或者宕機(jī),此時(shí)數(shù)據(jù)庫存儲(chǔ)的消息偏移量不是最新的)
          ?????????????????????*2.?如果消息處理是數(shù)據(jù)庫,最好將消息處理與存儲(chǔ)offset放在一個(gè)事務(wù)當(dāng)中
          ?????????????????????*/
          ????????????????????logger.info("key:{},value:{},offset:{}",?record.key(),?record.value(),?record.offset());
          ????????????????????offsetAndMetadataMap.put(
          ????????????????????????????new?TopicPartition(record.topic(),?record.partition()),
          ????????????????????????????new?OffsetAndMetadata(record.offset()?+?1,?"")
          ????????????????????);
          ????????????????????//?2.存儲(chǔ)偏移量到DB,這里采用單次更新,當(dāng)然也可以批量
          ????????????????????offsetService.insertOffset(groupId,?record.topic(),?record.partition(),?record.offset()?+?1);
          ????????????????}

          ????????????}
          ????????}?catch?(WakeupException?e)?{
          ????????????//?不處理異常
          ????????}?catch?(Exception?e)?{
          ????????????logger.error(e.getMessage(),?e);
          ????????}?finally?{
          ????????????consumer.close();
          ????????}
          ????}
          }
          1. rebalance代碼
                
                public?class?SaveOffsetsRebalance?implements?ConsumerRebalanceListener?{

          ????private?Logger?logger?=?LoggerFactory.getLogger(SaveOffsetsRebalance.class);

          ????private?Consumer?consumer;
          ????private?Map<TopicPartition,?OffsetAndMetadata>?map;
          ????private?String?groupId;
          ????OffsetService?offsetService?=?new?OffsetService();

          ????public?SaveOffsetsRebalance(Consumer?consumer,?Map<TopicPartition,?OffsetAndMetadata>?map,?String?groupId)?{
          ????????this.consumer?=?consumer;
          ????????this.map?=?map;
          ????????this.groupId?=?groupId;
          ????}

          ????/**
          ?????*?此方法在rebalance操作之前調(diào)用,用于我們提交消費(fèi)者偏移,
          ?????*?這里不提交消費(fèi)偏移,因?yàn)閏onsumer中每處理一次消息就保存一次偏移,
          ?????*?consumer去做重復(fù)消費(fèi)處理
          ?????*/
          ????@Override
          ????public?void?onPartitionsRevoked(Collection<TopicPartition>?partitions)?{
          ????????int?size?=?partitions.size();
          ????????logger.info("rebalance?之前觸發(fā).....{}",?size);
          ????????Iterator<TopicPartition>?iterator?=?partitions.iterator();
          ????????while?(iterator.hasNext())?{
          ????????????TopicPartition?topicPartition?=?iterator.next();
          ????????????long?position?=?consumer.position(topicPartition);
          ????????????OffsetAndMetadata?offsetAndMetadata?=?map.get(topicPartition);
          ????????????if?(offsetAndMetadata?!=?null)?{
          ????????????????long?offset?=?offsetAndMetadata.offset();
          ????????????????logger.info("position:{},offset:{}",?position,?offset);
          ????????????}?else?{
          ????????????????logger.info("position:{},offset:{}",?position,?null);
          ????????????}
          ????????}
          ????}

          ????/**
          ?????*?此方法在rebalance操作之后調(diào)用,用于我們拉取新的分配區(qū)的偏移量。
          ?????*/
          ????@Override
          ????public?void?onPartitionsAssigned(Collection<TopicPartition>?partitions)?{
          ????????logger.info("rebalance?之后觸發(fā).....");
          ????????for?(TopicPartition?partition?:?partitions)?{
          ????????????//從數(shù)據(jù)庫獲取當(dāng)前分區(qū)的偏移量
          ????????????Offset?offset?=?offsetService.getOffset(this.groupId,?partition.topic(),?partition.partition());
          ????????????if?(offset?!=?null?&&?offset.getOffset()?!=?null)?{
          ????????????????consumer.seek(partition,?offset.getOffset());
          ????????????}
          ????????}
          ????}
          }
          1. 數(shù)據(jù)庫sql
                
                DROP?TABLE?IF?EXISTS?`t_offset`;
          CREATE?TABLE?`t_offset`?(
          ??`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT?COMMENT?'主鍵',
          ??`group_id`?varchar(50)?CHARACTER?SET?latin1?NOT?NULL,
          ??`topic`?varchar(50)?CHARACTER?SET?latin1?NOT?NULL?COMMENT?'topic',
          ??`partition`?int(11)?NOT?NULL,
          ??`offset`?bigint(20)?DEFAULT?NULL,
          ??PRIMARY?KEY?(`id`),
          ??UNIQUE?KEY?`unique_gtp`?(`group_id`,`topic`,`partition`)?USING?BTREE
          )?ENGINE=InnoDB?AUTO_INCREMENT=1?DEFAULT?CHARSET=utf8?COLLATE=utf8_bin?COMMENT='偏移量保存';
          1. 添加依賴
                
                compile?group:?'commons-dbutils',?name:?'commons-dbutils',?version:?'1.7'
          compile?group:?'mysql',?name:?'mysql-connector-java',?version:?'5.1.30'
          compile?group:?'org.aeonbits.owner',?name:?'owner',?version:?'1.0.9'
          1. 數(shù)據(jù)庫工具
                
                public?class?JdbcUtils?{
          ????public?static?Connection?getConnection(String?driverClass,?String?url,?String?username,?String?password)?throws?SQLException,?ClassNotFoundException?{
          ????????Class.forName(driverClass);
          ????????return?DriverManager.getConnection(url,?username,?password);
          ????}
          }

          @Sources({"classpath:config.properties"})
          public?interface?MysqlJdbcConfig?extends?Reloadable?{
          ????@Key("config.mysql.url")
          ????public?String?url();

          ????@Key("config.mysql.username")
          ????public?String?username();

          ????@Key("config.mysql.password")
          ????public?String?password();

          ????@Key("config.mysql.driverClass")
          ????public?String?driverClass();

          ????public?final?static?class?ServerConfigInner?{
          ????????public?final?static?MysqlJdbcConfig?config?=?ConfigFactory.create(MysqlJdbcConfig.class);
          ????}

          ????public?static?final?MysqlJdbcConfig?instance?=?ServerConfigInner.config;

          }
          1. 配置config.properties
                
                config.mysql.url=jdbc:mysql://jannal.mac.com:3306/test
          config.mysql.username=root
          config.mysql.password=root
          config.mysql.driverClass=com.mysql.jdbc.Driver
          1. 偏移量處理類
                
                public?class?Offset?{
          ????private?Long?id;

          ????private?String?groupId;

          ????private?String?topic;

          ????private?String?partition;

          ????private?Long?offset;
          ??...省略getter?setter?toString..
          }

          public?class?OffsetService?{
          ????
          ????public?Offset?getOffset(String?groupId,?String?topic,?int?partition)?{
          ????????QueryRunner?queryRunner?=?new?QueryRunner();
          ????????Offset?offset?=?null;
          ????????Connection?connection?=?null;
          ????????try?{
          ????????????connection?=?JdbcUtils.getConnection(MysqlJdbcConfig.instance.driverClass(),?//
          ????????????????????MysqlJdbcConfig.instance.url(),//
          ????????????????????MysqlJdbcConfig.instance.username(),//
          ????????????????????MysqlJdbcConfig.instance.password());//
          ????????????String?sql?=?"select?`topic`,`partition`,`offset`?from?`t_offset`?where?`group_id`=??and??`topic`?=???and??`partition`?=??";
          ????????????offset?=?queryRunner.query(connection,?sql,?new?BeanHandler<Offset>(Offset.class),?new?Object[]{groupId,?topic,?partition});
          ????????}?catch?(Exception?e)?{
          ????????????throw?new?RuntimeException(e.getMessage(),?e);
          ????????}?finally?{
          ????????????if?(connection?!=?null)?{
          ????????????????try?{
          ????????????????????connection.close();
          ????????????????}?catch?(SQLException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}
          ????????return?offset;
          ????}


          ????public?void?insertOffset(String?groupId,?String?topic,?int?partition,?long?offset)?{
          ????????QueryRunner?queryRunner?=?new?QueryRunner();
          ????????Connection?connection?=?null;
          ????????try?{
          ????????????connection?=?JdbcUtils.getConnection(MysqlJdbcConfig.instance.driverClass(),?//
          ????????????????????MysqlJdbcConfig.instance.url(),//
          ????????????????????MysqlJdbcConfig.instance.username(),//
          ????????????????????MysqlJdbcConfig.instance.password());//
          ????????????String?sql?=?"?insert?into?`t_offset`(?`group_id`,`topic`,?`partition`,`offset`)?values?(??,?,??,?)?on?duplicate?key?update?`offset`?=VALUES(offset);";
          ????????????Object[]?params?=?{groupId,?topic,?partition,?offset};
          ????????????connection.setAutoCommit(false);
          ????????????queryRunner.update(connection,?sql,?params);
          ????????????connection.commit();
          ????????}?catch?(Exception?e)?{
          ????????????try?{
          ????????????????if?(connection?!=?null)?{
          ????????????????????connection.rollback();
          ????????????????}
          ????????????}?catch?(SQLException?e1)?{
          ????????????????throw?new?RuntimeException(e.getMessage(),?e);
          ????????????}
          ????????????throw?new?RuntimeException(e.getMessage(),?e);
          ????????}?finally?{
          ????????????if?(connection?!=?null)?{
          ????????????????try?{
          ????????????????????connection.close();
          ????????????????}?catch?(SQLException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}

          ????}


          ????public?void?batchInsertOffset(Object[][]?params)?{
          ????????QueryRunner?queryRunner?=?new?QueryRunner();
          ????????Connection?connection?=?null;
          ????????try?{
          ????????????connection?=?JdbcUtils.getConnection(MysqlJdbcConfig.instance.driverClass(),?//
          ????????????????????MysqlJdbcConfig.instance.url(),//
          ????????????????????MysqlJdbcConfig.instance.username(),//
          ????????????????????MysqlJdbcConfig.instance.password());//
          ????????????String?sql?=?"?insert?into?`t_offset`(`group_id`,?`topic`,?`partition`,`offset`)?values?(?,??,??,?)?on?duplicate?key?update?`offset`=VALUES(offset);";
          ????????????connection.setAutoCommit(false);
          ????????????queryRunner.batch(connection,?sql,?params);
          ????????????connection.commit();
          ????????}?catch?(Exception?e)?{
          ????????????try?{
          ????????????????if?(connection?!=?null)?{
          ????????????????????connection.rollback();
          ????????????????}
          ????????????}?catch?(SQLException?e1)?{
          ????????????????throw?new?RuntimeException(e.getMessage(),?e);
          ????????????}
          ????????????throw?new?RuntimeException(e.getMessage(),?e);
          ????????}?finally?{
          ????????????if?(connection?!=?null)?{
          ????????????????try?{
          ????????????????????connection.close();
          ????????????????}?catch?(SQLException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????}
          ????}
          }
          如果這個(gè)文章對(duì)你有幫助,不要忘記? 「在看」?「點(diǎn)贊」?「收藏」 ?三連啊喂!

          d1f2fe82c4584d7e04a0ca769f3345d1.webp2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專家級(jí)技能模型與學(xué)習(xí)指南(勝天半子篇) 互聯(lián)網(wǎng)最壞的時(shí)代可能真的來了
          我在B站讀大學(xué),大數(shù)據(jù)專業(yè)
          我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么? 193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下 Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點(diǎn) 我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么? 在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)! 硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié) 數(shù)據(jù)治理方法論和實(shí)踐小百科全書
          標(biāo)簽體系下的用戶畫像建設(shè)小指南
          4萬字長文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
          【面試&個(gè)人成長】2021年過半,社招和校招的經(jīng)驗(yàn)之談 大數(shù)據(jù)方向另一個(gè)十年開啟 |《硬剛系列》第一版完結(jié) 我寫過的關(guān)于成長/面試/職場(chǎng)進(jìn)階的文章 當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
          瀏覽 62
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产乱论视频 | 北条麻妃一区在线 | 黄色片AA | 91.xxxxx | 亚洲老女人BB |