原理+實踐|Exactly-once系列實踐之KafkaToKafka
推薦閱讀:
文章目錄
一、Kafka輸入輸出流工具類
二、統(tǒng)計字符個數(shù)案例
三、消費者消費kafka的事務數(shù)據(jù)
四、總結(jié)與可能出現(xiàn)的問題
一、Kafka輸入輸出流工具類
代碼如下(示例):
//獲取kafkaStream流
public static <T> DataStream<T> getKafkaDataStream(ParameterTool parameterTool,Class<? extends DeserializationSchema> clazz,StreamExecutionEnvironment env) throws IllegalAccessException, InstantiationException {
//加入到flink的環(huán)境全局配置中,后續(xù)可以通過上下文獲取該工具類,總而得到想要的值
env.getConfig().setGlobalJobParameters(parameterTool);
//kafka配置項
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
properties.setProperty("group.id",parameterTool.get("group.idsource"));
properties.setProperty("auto.offset.reset",parameterTool.get("auto.offset.reset"));
properties.setProperty("enable.auto.commit",parameterTool.get("enable.auto.commit", String.valueOf(false)));
String topics = parameterTool.get("Consumertopics");
//序列化類實例化
DeserializationSchema<T> deserializationSchema = clazz.newInstance();
FlinkKafkaConsumer<T> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
flinkKafkaConsumer.setStartFromEarliest();
//開啟kafka的offset與checkpoint綁定
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
return env.addSource(flinkKafkaConsumer);
}
//獲取kafka生產(chǎn)者通用方法
/**
* offsets.topic.replication.factor 用于配置offset記錄的topic的partition的副本個數(shù)
* transaction.state.log.replication.factor 事務主題的復制因子
* transaction.state.log.min.isr 覆蓋事務主題的min.insync.replicas配置
*
* num.partitions 新建Topic時默認的分區(qū)數(shù)
*
* default.replication.factor 自動創(chuàng)建topic時的默認副本的個數(shù)
*
*
*
* 注意:這些參數(shù),設置得更高以確保高可用性!
*
* 其中 default.replication.factor 是真正決定,topi的副本數(shù)量的
* @param parameterTool
* @param kafkaSerializationSchema
* @param <T>
* @return
*/
public static <T> FlinkKafkaProducer<T> getFlinkKafkaProducer(ParameterTool parameterTool,KafkaSerializationSchema<T> kafkaSerializationSchema){
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", parameterTool.get("bootstrap.servers"));
properties.setProperty("group.id",parameterTool.get("group.idsink"));
// properties.setProperty("transaction.max.timeout.ms",parameterTool.get("transaction.max.timeout.ms"));
properties.setProperty("transaction.timeout.ms",parameterTool.get("transaction.timeout.ms"));
properties.setProperty("client.id", "flinkOutputTopicClient");
String topics = parameterTool.get("Producetopice");
return new FlinkKafkaProducer<T>(topics,kafkaSerializationSchema,properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
注意點事項
一、消費者注意項
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true),將kafka自動提交offset關閉并且與flink的CheckPoint綁定 bootstrap.servers kafka的broker host setStartFromEarliest()設置kafka的消息消費從最初位置開始
二、生產(chǎn)者注意項
transaction.timeout.ms 默認情況下Kafka Broker 將transaction.max.timeout.ms設置為15分鐘,我們需要將此值設置低于15分鐘 FlinkKafkaProducer.Semantic.EXACTLY_ONCE設置kafka為精確一次
二、統(tǒng)計字符個數(shù)案例
代碼如下(示例):
public static void main(String[] args) throws Exception {
//1.創(chuàng)建流式執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.設置并行度
env.setParallelism(4);
//3.設置CK和狀態(tài)后端
CkAndStateBacked.setCheckPointAndStateBackend(env,"FS");
//4.獲取kafkaStream流
InputStream kafkaPropertiesStream = KafkaToKafkaExacitly.class.getClassLoader().getResourceAsStream("kafka.properties");
ParameterTool parameterTool=ParameterTool.fromPropertiesFile(kafkaPropertiesStream);
//將配置流放到全局flink運行時環(huán)境
env.getConfig().setGlobalJobParameters(parameterTool);
SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
Class<? extends SimpleStringSchema> stringSchemaClass = simpleStringSchema.getClass();
DataStream<String> kafkaDataStream = KafkaUtil.getKafkaDataStream(parameterTool, stringSchemaClass, env);
System.out.println("==================================================");
kafkaDataStream.print();
//5.map包裝成value,1
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = kafkaDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
if("error".equals(value)){
throw new RuntimeException("發(fā)生異常?。?!");
}
return new Tuple2<>(value, 1);
}
});
tupleStream.print();
//6.按照value進行分組,并且統(tǒng)計value的個數(shù)
SingleOutputStreamOperator<Tuple2<String, Integer>> reduceStream = tupleStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
});
System.out.println("=====================================================");
reduceStream.print();
//7.將數(shù)據(jù)輸出到kafka
FlinkKafkaProducer<Tuple2<String, Integer>> flinkKafkaProducer = KafkaUtil.getFlinkKafkaProducer(parameterTool, new KafkaSerializationSchema<Tuple2<String, Integer>>() {
@Override
public void open(SerializationSchema.InitializationContext context) throws Exception {
System.out.println("=========正在向KafkaProduce輸出數(shù)據(jù)?。?!=============");
}
@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, Integer> element, @Nullable Long timestamp) {
String producetopics = parameterTool.get("Producetopice");
String result = element.toString();
return new ProducerRecord<byte[], byte[]>(producetopics, result.getBytes(StandardCharsets.UTF_8));
}
});
reduceStream.addSink(flinkKafkaProducer).name("kafkasinktest").uid("kafkasink");
//任務執(zhí)行
env.execute("KafkaToKafkaTest");
}
注意事項:
這里使用的是本地FSstateBackend,注意你的路徑的設置,以hdfs://或者file://為地址標識符,否則Flink的文件系統(tǒng)將無法識別。
三、消費者消費kafka的事務數(shù)據(jù)
ublic static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", "*****");
sourceProperties.setProperty("group.id", "****");
//端到端一致性:消費數(shù)據(jù)時需要配置isolation.level=read_committed(默認值為read_uncommitted)
sourceProperties.put("isolation.level", "read_committed");
FlinkKafkaConsumer<String> ConsumerKafka = new FlinkKafkaConsumer<>("*****", new SimpleStringSchema(), sourceProperties);
ConsumerKafka.setStartFromEarliest();
DataStreamSource<String> dataStreamSource = env.addSource(ConsumerKafka);
dataStreamSource.print();
env.execute();
}
isolation.level這里設置為read_committed(默認為read_uncommitted) 這里可以看到以你CheckPoint設置的時間,來批量展示kafka生產(chǎn)者的消息。
四、總結(jié)與可能出現(xiàn)的問題
以上是flink 實現(xiàn)kafka的精確一次的測試例子,這里還有一點要注意,就是小伙伴們的kafka的配置里面。
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
default.replication.factor=1
這四個參數(shù)里面default.replication.factor是你kafka真正每個topic的副本數(shù)量,但是在開啟事務也就是flink的addsink的時候會默認繼承兩階段提交的方式,這里transaction.state.log.replication.factor一定要大于或者等于transaction.state.log.min.isr,否則你的kafka集群不滿足事務副本復制的基本屬性,會一直不成功,那么你的CheckPoint就會超時過期,從而導致任務的整體失敗。
kafka集群第一次有消費者消費消息時會自動創(chuàng)建 __consumer_offsets,它的副本因子受 offsets.topic.replication.factor 參數(shù)的約束,默認值為3(注意:該參數(shù)的使用限制在0.11.0.0版本發(fā)生變化),分區(qū)數(shù)可以通過 offsets.topic.num.partitions 參數(shù)設置,默認值為50,在開啟事務性的情況下就會首先會獲得一個全局的TransactionCoordinator id和transactional producer并且生成唯一的序列號等 類似于一下的例子來唯一標識當前事務的消息對應的offset,以及標識。
[2022-03-24 21:07:40,022] INFO [TransactionCoordinator id=0] Initialized transactionalId Keyed Reduce -> (Sink: Print to Std. Out, Sink: kafkasinktest)-b0c5e26be6392399cc3c8a38581a81c2-8 with producerId 11101 and producer epoch 8 on partition __transaction_state-18 (kafka.coordinator.transaction.TransactionCoordinator)
當flink任務出現(xiàn)異常的情況下,kafka會把以及提交但是未標記可以消費的數(shù)據(jù)直接銷毀,或者正常的情況下,會正式提交(本質(zhì)是修改消息的標志位),之后對于消費者在開啟isolation.level的時候就可以讀取以及標記為可以讀取的message。


