Flink 基于 TDMQ for Apache Pulsar 的離線場景使用實踐
共 26716字,需瀏覽 54分鐘
·
2024-07-02 08:45
??目錄
1 背景
2 部署 Flink
3 Demo:Topic 復(fù)制
4 Demo:單詞計數(shù)
5 Flink Connector 用法總結(jié)
6 注意事項
01
02
FLINK_PROPERTIES=$'\njobmanager.rpc.address: jobmanager\ntaskmanager.memory.task.off-heap.size: 1gb\ntaskmanager.memory.process.size: 4gb' docker network create flink-network
$ docker run \\--name=jobmanager \--network flink-network \--publish 8081:8081 \--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \flink:1.17.2-scala_2.12 jobmanager
docker run \--rm \--name=taskmanager \--network flink-network \--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \flink:1.17.2-scala_2.12 taskmanager
在本地編譯打包 Pulsar 任務(wù)后,還需要使用 Flink Cli 提交本地任務(wù)到 Flink Docker 集群,從下方網(wǎng)址下載與當前 Docker 版本一致的 Flink 二進制文件并且解壓到本地。
https://flink.apache.org/downloads/
03
參考 Flink Pulsar Connector 社區(qū)文檔和 Oceanus 相關(guān)文檔,Demo 使用 1.17 版本 Flink SDK 將命名空間的一個 Topic 消息全部復(fù)制到另一個 Topic 中,Demo 主要展示 Flink Connector 的基礎(chǔ)用法,沒有使用自定義序列化器及反序列化器,而是使用的是 Connector 內(nèi)置的 String 序列化器。
https://cloud.tencent.com/document/product/849/85885#pulsar-source-.E5.92.8C-sink-.E7.A4.BA.E4.BE.8B
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#apache-pulsar-connector
public static void main(String[] args) throws Exception {final ParameterTool parameterTool = ParameterTool.fromArgs(args);if (parameterTool.getNumberOfParameters() < 2) {System.err.println("Missing parameters!");return;}final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));env.enableCheckpointing(60000);env.getConfig().setGlobalJobParameters(parameterTool);String brokerServiceUrl = parameterTool.getRequired("broker-service-url");String inputTopic = parameterTool.getRequired("input-topic");String outputTopic = parameterTool.getRequired("output-topic");String subscriptionName = parameterTool.get("subscription-name", "testDuplicate");String token = parameterTool.getRequired("token");// sourcePulsarSource<String> source = PulsarSource.builder().setServiceUrl(brokerServiceUrl).setStartCursor(StartCursor.latest()).setTopics(inputTopic).setDeserializationSchema(new SimpleStringSchema()).setSubscriptionName(subscriptionName).setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token).build();DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");// sinkPulsarSink<String> sink = PulsarSink.builder().setServiceUrl(brokerServiceUrl).setTopics(outputTopic).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token).setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false).setSerializationSchema(new SimpleStringSchema()).build();stream.sinkTo(sink);env.execute("Pulsar Streaming Message Duplication");}
/usr/local/services/flink/flink-1.17.2 # /usr/local/services/flink/flink-1.17.2/bin/flink run /tmp/wordCount/pulsar-flink-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \--broker-service-url http://pulsar-xxxxx \--input-topic pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaDuplicationInput1 \--output-topic pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaDuplicationOutput1 \--subscription-name ninjaTest1 \--token eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVcJob has been submitted with JobID c1bdab89c01ef16e00579bd2c6648859
/usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client \http://pulsar-xxxxxx \--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc \--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \produce \-m "i am the bone of my sword" \-n 5 \pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaDuplicationInput1
04
單詞計數(shù)作為 Flink 中最常見的 Demo,能夠比較好的闡述 Flink 的流處理思想。此 Demo 參考 StreamNative 的 Demo,使用 1.17 Flink SDK,將 Pulsar Topic 作為源和目標資源,統(tǒng)計源 Topic 消息中每個時間窗口各個單詞出現(xiàn)的次數(shù),并且將結(jié)果投遞到目標 Topic 中。
https://github.com/streamnative/examples/blob/master/pulsar-flink/README.md
整體 Demo 項目文件見下方鏈接
pulsar-flink-example.zip
file:////tencent/api/attachments/s3/url?attachmentid=20260421
核心邏輯見下方代碼,首先使用 ParameterTool 工具解析命令行中傳入的參數(shù),之后使用 Flink 內(nèi)置的反序列化器解析消息體為字符串,在數(shù)據(jù)處理部分使用系統(tǒng)時間窗口統(tǒng)計時間窗內(nèi)流入的消息,并且對于每個出現(xiàn)的單詞匯聚生成 WordCount 對象,最后使用自定義的序列化器,將 WordCount 對象序列化為 Json 字節(jié)數(shù)組,投遞到目標 Topic 中。
目前 TDMQ Pulsar Connector 支持 Pulsar Schema、Flink Schema 以及自定義序列化器三種方法將 Java 對象序列化為 Pulsar Sink 的字節(jié)數(shù)組消息體。推薦代碼使用自定義序列化器的方式序列化定義的 WordCount 對象。
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#serializer
還需要注意默認 Sink 配置是開啟 Batch Send 模式的,在控制臺消息查詢時,Batch Message 只會查詢到 Batch 中的第一條消息,不利于對照消息數(shù)量,Demo 中關(guān)閉了 Batch Send 功能。
/*** 參考 streamNative pulsar flink demo* <a >pulsar-flink example</a>* 由于上方鏈接的 streamNative flink demo 使用 1.10.1 版本 flink 以及 2.4.17 版本 pulsar connector,* 與當前 1.20 社區(qū)版本的 flink 和 pulsar connector api 已經(jīng)存在部分 api 差異* 因此本 demo 使用 1.17 flink 版本進行重構(gòu)* <a >1.17 flink doc</a>* <p>* demo 統(tǒng)計時間窗口內(nèi)源 topic 所有消息中每個單詞出現(xiàn)頻率次數(shù)* 并且將統(tǒng)計結(jié)果按照每個單詞對應(yīng)一條消息的格式,序列化后消息后投遞到目標 topic 中**/public class PulsarStreamingWordCount {private static final Logger LOG = LoggerFactory.getLogger(PulsarStreamingWordCount.class);public static void main(String[] args) throws Exception {// 解析任務(wù)傳參// 默認使用 authToken 方式鑒權(quán)final ParameterTool parameterTool = ParameterTool.fromArgs(args);if (parameterTool.getNumberOfParameters() < 2) {System.err.println("Missing parameters!");return;}final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));env.enableCheckpointing(60000);env.getConfig().setGlobalJobParameters(parameterTool);String brokerServiceUrl = parameterTool.getRequired("broker-service-url");String inputTopic = parameterTool.getRequired("input-topic");String outputTopic = parameterTool.getRequired("output-topic");String subscriptionName = parameterTool.get("subscription-name", "WordCountTest");String token = parameterTool.getRequired("token");int timeWindowSecond = parameterTool.getInt("time-window", 60);// sourcePulsarSource<String> source = PulsarSource.builder().setServiceUrl(brokerServiceUrl).setStartCursor(StartCursor.latest()).setTopics(inputTopic)// 此處將 message 中的 payload 序列化成字符串類型// 目前 source 只支持解析消息 payload 中的內(nèi)容,將 payload 中的內(nèi)容解析成 pulsar schema 對象或者自定義的 class 對象// 而無法解析 message 中 properties 中的其他屬性,例如 publish_time// 如果需要解析 message 中的 properties,需要在繼承類中實現(xiàn) PulsarDeserializationSchema.getProducedType() 方法// getProducedType 這個方法實現(xiàn)較為繁瑣,需要聲明每個反序列化后的屬性// https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#deserializer.setDeserializationSchema(new SimpleStringSchema()).setSubscriptionName(subscriptionName).setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token).build();// 由于此處沒有使用消息體中的時間,即沒有使用消息的 publish_time// 因此此處使用 noWatermark 模式,使用 taskManager 的時間作為時間窗口DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");// process// 解析 source 中每行消息,通過空格分割成單個單詞,之后進行匯聚處理并且初始化成 WordCount 結(jié)構(gòu)體// 這里使用 TumblingProcessingTimeWindows,即使用當前 taskManager 系統(tǒng)時間計算時間窗口DataStream<WordCount> wc = stream.flatMap((FlatMapFunction<String, WordCount>) (line, collector) -> {LOG.info("current line = {}, word list = {}", line, line.split("\\s"));for (String word : line.split("\\s")) {collector.collect(new WordCount(word, 1, null));}}).returns(WordCount.class).keyBy(WordCount::getWord).window(TumblingProcessingTimeWindows.of(Time.seconds(timeWindowSecond))).reduce((ReduceFunction<WordCount>) (c1, c2) -> {WordCount reducedWordCount = new WordCount(c1.getWord(), c1.getCount() + c2.getCount(), null);LOG.info("previous [{}] [{}], current wordCount {}", c1, c2, reducedWordCount);return reducedWordCount;});// sink// 目前 1.17 flink 序列化提供了兩種已經(jīng)實現(xiàn)的方法,一種是使用 pulsar 內(nèi)置 schema,另一種是使用 flink 的 schema// 但由于目前 tdmq pulsar 提供的是 2.9 版本的 pulsar,對于 schema 支持還不夠完善// 此處使用 flink PulsarSerializationSchema<T> 提供的接口,當前主要需要實現(xiàn) serialize(IN element, PulsarSinkContext sinkContext) 方法// 將傳入的 IN 對象自定義序列化為 byte 數(shù)組// https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#serializerPulsarSink<WordCount> sink = PulsarSink.builder().setServiceUrl(brokerServiceUrl).setTopics(outputTopic).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token).setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false).setSerializationSchema(new PulsarSerializationSchema<WordCount>() {private ObjectMapper objectMapper;public void open(SerializationSchema.InitializationContext initializationContext,PulsarSinkContext sinkContext,SinkConfiguration sinkConfiguration)throws Exception {objectMapper = new ObjectMapper();}public PulsarMessage<?> serialize(WordCount wordCount, PulsarSinkContext sinkContext) {// 此處將 wordCount 添加處理時間后,將 wordCount 使用 json 方式序列化為 byte 數(shù)組// 以便能夠直接查看消息體內(nèi)容byte[] wordCountBytes;wordCount.setSinkDateTime(LocalDateTime.now().toString());try {wordCountBytes = objectMapper.writeValueAsBytes(wordCount);} catch (Exception exception) {wordCountBytes = exception.getMessage().getBytes();}return PulsarMessage.builder(wordCountBytes).build();}}).build();wc.sinkTo(sink);env.execute("Pulsar Streaming WordCount");}}
/usr/local/services/flink/flink-1.17.2 # /usr/local/services/flink/flink-1.17.2/bin/flink run /tmp/wordCount/pulsar-flink-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \--broker-service-url http://pulsar-xxxx \--input-topic pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1 \--output-topic pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountOutput1 \--subscription-name ninjaTest3 \--token eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVcJob has been submitted with JobID 6f608d95506f96c3eac012386f840655
/usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client \http://pulsar-xxx \--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc \--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \produce \-m "i am the bone of my sword" \-n 5 \pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1\--url http://pulsar-g8akj4eow8z8.sap-8ywks40k.tdmq.ap-gz.qcloud.tencenttdmq.com:8080 \--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc \--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \produce \-m "test1" \-n 3 \pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1
05
目前 Flink 插件生產(chǎn)和消費經(jīng)過調(diào)研,在不進行管控改造以及非標操作的情況下,能滿足基本的 TDMQ Pulsar 版使用需求。截至現(xiàn)在 Apache Flink 已經(jīng)發(fā)布 1.20 版本,目前推薦使用 Apache Flink 1.15-1.17 對應(yīng) Pulsar Connector,不推薦使用 1.15 以下版本,1.18 及以上版本可以參考 1.17 版本使用。
下面介紹 1.15 和 1.17 版本 Pulsar Flink Connector 主要配置。Flink 版本對應(yīng)的 Flink Connector 依賴可以在 Pulsar Connector Dependencies 處獲取。
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#dependency
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.17.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-pulsar</artifactId><version>4.1.0-1.17</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.2</version></dependency>
PulsarSource<String> source = PulsarSource.builder().setServiceUrl(brokerServiceUrl).setStartCursor(StartCursor.latest()).setTopics(inputTopic).setDeserializationSchema(new SimpleStringSchema()).setSubscriptionName(subscriptionName).setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token).build();
PulsarSink<String> sink = PulsarSink.builder().setServiceUrl(brokerServiceUrl).setTopics(outputTopic).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token).setSerializationSchema(new SimpleStringSchema()).build();
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.15.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-pulsar</artifactId><version>1.15.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.15.4</version></dependency>
PulsarSource<String> source = PulsarSource.builder().setServiceUrl(brokerServiceUrl).setAdminUrl(brokerServiceUrl).setStartCursor(StartCursor.latest()).setTopics(inputTopic).setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema())).setSubscriptionName(subscriptionName).setSubscriptionType(SubscriptionType.Exclusive).setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME, "org.apache.pulsar.client.impl.auth.AuthenticationToken").setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, token).setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true).build();
Connector Source 全部參數(shù)可參考官方文檔 ,下表是常用配置參數(shù)。
PulsarSink<String> sink = PulsarSink.builder().setServiceUrl(brokerServiceUrl).setAdminUrl(brokerServiceUrl).setTopics(outputTopic).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME, "org.apache.pulsar.client.impl.auth.AuthenticationToken").setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, token).setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema())).build();
06
由于 Connector Pulsar 會使用到堆外內(nèi)存,并且默認任務(wù)的堆外內(nèi)存為 0,因此執(zhí)行 Pulsar Job 需要顯式聲明堆外內(nèi)存大小 taskmanager.memory.task.off-heap.size,例如 1gb。
SetSerializationSchema 反序列化提供了兩種已經(jīng)實現(xiàn)的方法,一種是使用 Pulsar 內(nèi)置 Schema,另一種是使用 Flink 的 Schema。但這兩種方法都會造成業(yè)務(wù)代碼與 Schema 耦合。目前建議實現(xiàn) PulsarSerializationSchema接口,主要需要實現(xiàn) Serialize(IN Element, PulsarSinkContext SinkContext) 方法,將傳入的 IN 對象自定義序列化為 Byte 數(shù)組。
目前 Sink 默認開啟 Enable_batch 批量投遞模式,會將消息打包后投遞。如果想要關(guān)閉批量投遞功能,可以配置 SetConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, False)。
Flink 時間窗口支持兩種 時間獲取方式 ,一種直接使用任務(wù)的系統(tǒng)時間 ProcessTime,另一種是事件自帶時間 EventTime。但目前 Source 只支持解析消息 Payload 中的內(nèi)容,將 Payload 中的內(nèi)容解析成 Pulsar Schema 對象或者自定義的 Class 對象,而無法解析 Message 中 Properties 中的其他屬性,例如 消息上傳時間 Publish_Time。如果需要解析 Message 中的 Properties,根據(jù)文檔 需要在繼承類中 實現(xiàn) PulsarDeserializationSchema.getProducedType() 方法。這個方法實現(xiàn)較為繁瑣,需要聲明每個反序列化后的屬性,因此目前建議直接使用 ProcessTime 作為時間窗口時間。
1.16 及以下版本 Flink Source 的 SetSubscriptionType 方法還保留了 Shared 和 Key_Shared 訂閱模式,這兩種訂閱模式依賴事務(wù) Ack 消息,并且只有當任務(wù) Checkpoint 更新時才會統(tǒng)一提交事務(wù)和 Ack。但由于目前 TDMQ Pulsar 沒有開放事務(wù)功能,因此當前不能同時配置 SetSubscriptionType(SubscriptionType.Shared) 和 SetConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, False) 參數(shù)。
Oceanus 內(nèi)置 Pulsar Connector 是基于 StreamNative 版本,適配 Flink 1.13-1.14 版本的 Connector,這兩個版本較老,與新版本存在較多 API 不兼容,如果使用 Oceanus 內(nèi)置版本 Pulsar Connector 與高版本 Flink,可能需要較多代碼改造。
????歡迎加入騰訊云開發(fā)者社群,享前沿資訊、大咖干貨,找興趣搭子,交同城好友,更有鵝廠招聘機會、限量周邊好禮等你來~
(長按圖片立即掃碼)
