<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          原理+實踐|Exactly-once系列實踐之KafkaToKafka

          共 15461字,需瀏覽 31分鐘

           ·

          2023-03-11 12:56

          全網(wǎng)最全大數(shù)據(jù)面試提升手冊!

          推薦閱讀:


          文章目錄
          一、Kafka輸入輸出流工具類
          二、統(tǒng)計字符個數(shù)案例
          三、消費者消費kafka的事務數(shù)據(jù)
          四、總結(jié)與可能出現(xiàn)的問題

          一、Kafka輸入輸出流工具類

          代碼如下(示例):

          //獲取kafkaStream流
              public static <T> DataStream<T> getKafkaDataStream(ParameterTool parameterTool,Class<? extends DeserializationSchema> clazz,StreamExecutionEnvironment env) throws IllegalAccessException, InstantiationException {
                  //加入到flink的環(huán)境全局配置中,后續(xù)可以通過上下文獲取該工具類,總而得到想要的值
                  env.getConfig().setGlobalJobParameters(parameterTool);


                  //kafka配置項
                  Properties properties = new Properties();
                  properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
                  properties.setProperty("group.id",parameterTool.get("group.idsource"));
                  properties.setProperty("auto.offset.reset",parameterTool.get("auto.offset.reset"));
                  properties.setProperty("enable.auto.commit",parameterTool.get("enable.auto.commit", String.valueOf(false)));
                  String topics = parameterTool.get("Consumertopics");

                  //序列化類實例化
                  DeserializationSchema<T> deserializationSchema = clazz.newInstance();

                  FlinkKafkaConsumer<T> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);

                  flinkKafkaConsumer.setStartFromEarliest();
                  //開啟kafka的offset與checkpoint綁定
                  flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);

                  return env.addSource(flinkKafkaConsumer);
              }



              //獲取kafka生產(chǎn)者通用方法

              /**
               * offsets.topic.replication.factor 用于配置offset記錄的topic的partition的副本個數(shù)
               * transaction.state.log.replication.factor 事務主題的復制因子
               * transaction.state.log.min.isr 覆蓋事務主題的min.insync.replicas配置
               *
               * num.partitions 新建Topic時默認的分區(qū)數(shù)
               *
               * default.replication.factor 自動創(chuàng)建topic時的默認副本的個數(shù)
               *
               *
               *
               * 注意:這些參數(shù),設置得更高以確保高可用性!
               *
               * 其中 default.replication.factor 是真正決定,topi的副本數(shù)量的
               * @param parameterTool
               * @param kafkaSerializationSchema
               * @param <T>
               * @return
               */
              public static <T> FlinkKafkaProducer<T> getFlinkKafkaProducer(ParameterTool parameterTool,KafkaSerializationSchema<T> kafkaSerializationSchema){
                  Properties properties = new Properties();
                  properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
                  properties.setProperty("group.id",parameterTool.get("group.idsink"));
          //        properties.setProperty("transaction.max.timeout.ms",parameterTool.get("transaction.max.timeout.ms"));
                  properties.setProperty("transaction.timeout.ms",parameterTool.get("transaction.timeout.ms"));
                  properties.setProperty("client.id""flinkOutputTopicClient");
                  String topics = parameterTool.get("Producetopice");

                  return new FlinkKafkaProducer<T>(topics,kafkaSerializationSchema,properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

              }

          注意點事項

          一、消費者注意項

          1. flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true),將kafka自動提交offset關閉并且與flink的CheckPoint綁定
          2. bootstrap.servers kafka的broker host
          3. setStartFromEarliest()設置kafka的消息消費從最初位置開始

          二、生產(chǎn)者注意項

          1. transaction.timeout.ms 默認情況下Kafka Broker 將transaction.max.timeout.ms設置為15分鐘,我們需要將此值設置低于15分鐘
          2. FlinkKafkaProducer.Semantic.EXACTLY_ONCE設置kafka為精確一次

          二、統(tǒng)計字符個數(shù)案例

          代碼如下(示例):

          public static void main(String[] args) throws Exception {
                  //1.創(chuàng)建流式執(zhí)行環(huán)境
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                  //2.設置并行度
                  env.setParallelism(4);

                  //3.設置CK和狀態(tài)后端
                  CkAndStateBacked.setCheckPointAndStateBackend(env,"FS");

                  //4.獲取kafkaStream流
                  InputStream kafkaPropertiesStream = KafkaToKafkaExacitly.class.getClassLoader().getResourceAsStream("kafka.properties");
                  ParameterTool parameterTool=ParameterTool.fromPropertiesFile(kafkaPropertiesStream);
                  //將配置流放到全局flink運行時環(huán)境
                  env.getConfig().setGlobalJobParameters(parameterTool);
                  SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
                  Class<? extends SimpleStringSchema> stringSchemaClass = simpleStringSchema.getClass();
                  DataStream<String> kafkaDataStream = KafkaUtil.getKafkaDataStream(parameterTool, stringSchemaClass, env);

                  System.out.println("==================================================");
                  kafkaDataStream.print();

                  //5.map包裝成value,1
                  SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = kafkaDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
                      @Override
                      public Tuple2<String, Integer> map(String value) throws Exception {
                          if("error".equals(value)){
                              throw new RuntimeException("發(fā)生異常?。?!");
                          }
                          return new Tuple2<>(value, 1);
                      }
                  });
                  tupleStream.print();
                  //6.按照value進行分組,并且統(tǒng)計value的個數(shù)
                  SingleOutputStreamOperator<Tuple2<String, Integer>> reduceStream = tupleStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                      @Override
                      public String getKey(Tuple2<String, Integer> value) throws Exception {
                          return value.f0;
                      }
                  }).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                      @Override
                      public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                          return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                      }
                  });


                  System.out.println("=====================================================");
                  reduceStream.print();
                  //7.將數(shù)據(jù)輸出到kafka
                  FlinkKafkaProducer<Tuple2<String, Integer>> flinkKafkaProducer = KafkaUtil.getFlinkKafkaProducer(parameterTool, new KafkaSerializationSchema<Tuple2<String, Integer>>() {
                      @Override
                      public void open(SerializationSchema.InitializationContext context) throws Exception {
                          System.out.println("=========正在向KafkaProduce輸出數(shù)據(jù)?。?!=============");
                      }

                      @Override
                      public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> element, @Nullable Long timestamp) {
                          String producetopics = parameterTool.get("Producetopice");
                          String result = element.toString();
                          return new ProducerRecord<byte[], byte[]>(producetopics, result.getBytes(StandardCharsets.UTF_8));
                      }
                  });
                  reduceStream.addSink(flinkKafkaProducer).name("kafkasinktest").uid("kafkasink");

                  //任務執(zhí)行
                  env.execute("KafkaToKafkaTest");
              }

          注意事項:
          這里使用的是本地FSstateBackend,注意你的路徑的設置,以hdfs://或者file://為地址標識符,否則Flink的文件系統(tǒng)將無法識別。

          三、消費者消費kafka的事務數(shù)據(jù)

          ublic static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                  Properties sourceProperties = new Properties();
                  sourceProperties.setProperty("bootstrap.servers""*****");
                  sourceProperties.setProperty("group.id""****");
                  //端到端一致性:消費數(shù)據(jù)時需要配置isolation.level=read_committed(默認值為read_uncommitted)
                  sourceProperties.put("isolation.level""read_committed");

                  FlinkKafkaConsumer<String> ConsumerKafka = new FlinkKafkaConsumer<>("*****", new SimpleStringSchema(), sourceProperties);
                  ConsumerKafka.setStartFromEarliest();
                  DataStreamSource<String> dataStreamSource = env.addSource(ConsumerKafka);

                  dataStreamSource.print();

                  env.execute();
              }

          isolation.level這里設置為read_committed(默認為read_uncommitted) 這里可以看到以你CheckPoint設置的時間,來批量展示kafka生產(chǎn)者的消息。

          四、總結(jié)與可能出現(xiàn)的問題

          以上是flink 實現(xiàn)kafka的精確一次的測試例子,這里還有一點要注意,就是小伙伴們的kafka的配置里面。

          offsets.topic.replication.factor=1
          transaction.state.log.replication.factor=1
          transaction.state.log.min.isr=1
          default.replication.factor=1

          這四個參數(shù)里面default.replication.factor是你kafka真正每個topic的副本數(shù)量,但是在開啟事務也就是flink的addsink的時候會默認繼承兩階段提交的方式,這里transaction.state.log.replication.factor一定要大于或者等于transaction.state.log.min.isr,否則你的kafka集群不滿足事務副本復制的基本屬性,會一直不成功,那么你的CheckPoint就會超時過期,從而導致任務的整體失敗。

          kafka集群第一次有消費者消費消息時會自動創(chuàng)建 __consumer_offsets,它的副本因子受 offsets.topic.replication.factor 參數(shù)的約束,默認值為3(注意:該參數(shù)的使用限制在0.11.0.0版本發(fā)生變化),分區(qū)數(shù)可以通過 offsets.topic.num.partitions 參數(shù)設置,默認值為50,在開啟事務性的情況下就會首先會獲得一個全局的TransactionCoordinator id和transactional producer并且生成唯一的序列號等 類似于一下的例子來唯一標識當前事務的消息對應的offset,以及標識。

          [2022-03-24 21:07:40,022] INFO [TransactionCoordinator id=0] Initialized transactionalId Keyed Reduce -> (Sink: Print to Std. Out, Sink: kafkasinktest)-b0c5e26be6392399cc3c8a38581a81c2-8 with producerId 11101 and producer epoch 8 on partition __transaction_state-18 (kafka.coordinator.transaction.TransactionCoordinator)

          當flink任務出現(xiàn)異常的情況下,kafka會把以及提交但是未標記可以消費的數(shù)據(jù)直接銷毀,或者正常的情況下,會正式提交(本質(zhì)是修改消息的標志位),之后對于消費者在開啟isolation.level的時候就可以讀取以及標記為可以讀取的message。


          如果這個文章對你有幫助,不要忘記 「在看」 「點贊」 「收藏」 三連啊喂!

          2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專家級技能模型與學習指南(勝天半子篇)
          互聯(lián)網(wǎng)最壞的時代可能真的來了
          我在B站讀大學,大數(shù)據(jù)專業(yè)
          我們在學習Flink的時候,到底在學習什么?
          193篇文章暴揍Flink,這個合集你需要關注一下
          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點
          我們在學習Spark的時候,到底在學習什么?
          在所有Spark模塊中,我愿稱SparkSQL為最強!
          硬剛Hive | 4萬字基礎調(diào)優(yōu)面試小總結(jié)
          數(shù)據(jù)治理方法論和實踐小百科全書
          標簽體系下的用戶畫像建設小指南
          4萬字長文 | ClickHouse基礎&實踐&調(diào)優(yōu)全視角解析
          【面試&個人成長】2021年過半,社招和校招的經(jīng)驗之談
          大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)
          我寫過的關于成長/面試/職場進階的文章
          當我們在學習Hive的時候在學習什么?「硬剛Hive續(xù)集」

          瀏覽 47
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  久久久精品欧美 | 伊人久久久 | 天天草天天射天天撸 | 强伦人妻一区二区三区 | 在线免费观看亚洲视频 |