<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink 基于 TDMQ for Apache Pulsar 的離線場景使用實踐

          共 26716字,需瀏覽 54分鐘

           ·

          2024-07-02 08:45


          ??目錄


          1 背景

          2 部署 Flink

          3 Demo:Topic 復(fù)制

          4 Demo:單詞計數(shù)

          5 Flink Connector 用法總結(jié)

          6 注意事項




          01



          背景

          Apache Flink 是一個開源的流處理和批處理框架,具有高吞吐量、低延遲的流式引擎,支持事件時間處理和狀態(tài)管理,以及確保在機器故障時的容錯性和一次性語義。Flink 的核心是一個分布式流數(shù)據(jù)處理引擎,支持 Java、Scala、Python 和 SQL 編程語言,可以在集群或云環(huán)境中執(zhí)行數(shù)據(jù)流程序。它提供了 DataStream API 用于處理有界或無界數(shù)據(jù)流,DataSet API 用于處理有界數(shù)據(jù)集,以及 Table API 和 SQL 接口用于關(guān)系型流和批處理。目前 Flink 最新已經(jīng)迭代至 1.20 版本,在此過程中不光是 Flink 框架,插件本身也有部分 API 以及配置存在變更,本文主要針對較高版本的 1.17 Flink Pulsar 插件進行測試驗證,目前 Flink 版本如下:https://nightlies.apache.org/flink/




          02



          部署 Flink

             2.1 設(shè)置 Flink 環(huán)境配置


          參考 Flink 1.17 官方文檔,部署 Flink Docker 版本。
          https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/resource-providers/standalone/docker/#getting-started

          首先配置 Flink 集群 JobManager 和 TaskManager 環(huán)境信息,注意由于 Connector Pulsar 會使用到堆外內(nèi)存,并且默認任務(wù)的堆外內(nèi)存為 0,因此此處需要顯式聲明堆外內(nèi)存大小 taskmanager.memory.task.off-heap.size,這里設(shè)置為 1GB。
          https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/memory/mem_setup_tm/#configure-off-heap-memory-direct-or-native


              
          $ FLINK_PROPERTIES=$'\njobmanager.rpc.address: jobmanager\ntaskmanager.memory.task.off-heap.size: 1gb\ntaskmanager.memory.process.size: 4gb'$ docker network create flink-network
           
             2.2 部署 JobManager


          配置環(huán)境變量后部署 JobManager,這里默認映射端口為 8081,部署后登錄 8081 端口可以看到 Flink Dashboard 信息。

          $ docker run \    --rm \    --name=jobmanager \    --network flink-network \    --publish 8081:8081 \    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \    flink:1.17.2-scala_2.12 jobmanager


             2.3 部署 TaskManager


          JobManager 是維護協(xié)調(diào)任務(wù)的組件,部署 JobManager 后還需要部署具體運行任務(wù)的 TaskManager。

          $ docker run \    --rm \    --name=taskmanager \    --network flink-network \    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \    flink:1.17.2-scala_2.12 taskmanager

          運行 TaskManager 后,可以在 8081 JobManager 控制臺看到 TaskManager 已經(jīng)被成功注冊,至此 Flink Docker 組件部署完成。


             2.4 下載 Flink Cli


          在本地編譯打包 Pulsar 任務(wù)后,還需要使用 Flink Cli 提交本地任務(wù)到 Flink Docker 集群,從下方網(wǎng)址下載與當前 Docker 版本一致的 Flink 二進制文件并且解壓到本地。

          https://flink.apache.org/downloads/





          03



          Demo:Topic 復(fù)制

          參考 Flink Pulsar Connector 社區(qū)文檔和 Oceanus 相關(guān)文檔,Demo 使用 1.17 版本 Flink SDK 將命名空間的一個 Topic 消息全部復(fù)制到另一個 Topic 中,Demo 主要展示 Flink Connector 的基礎(chǔ)用法,沒有使用自定義序列化器及反序列化器,而是使用的是 Connector 內(nèi)置的 String 序列化器。

          https://cloud.tencent.com/document/product/849/85885#pulsar-source-.E5.92.8C-sink-.E7.A4.BA.E4.BE.8B

          https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#apache-pulsar-connector


             3.1 主要邏輯


          核心邏輯見下方代碼,首先使用 ParameterTool 工具解析命令行中傳入的參數(shù),之后根據(jù)參數(shù)信息使用 Connector Source 和 Sink Builder 方法創(chuàng)建一個從 InputTopic 中獲取消息發(fā)送到 OutputTopic 的 Flink Stream。

          public static void main(String[] args) throws Exception {    final ParameterTool parameterTool = ParameterTool.fromArgs(args);    if (parameterTool.getNumberOfParameters() < 2) {        System.err.println("Missing parameters!");        return;    }    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));    env.enableCheckpointing(60000);    env.getConfig().setGlobalJobParameters(parameterTool);    String brokerServiceUrl = parameterTool.getRequired("broker-service-url");    String inputTopic = parameterTool.getRequired("input-topic");    String outputTopic = parameterTool.getRequired("output-topic");    String subscriptionName = parameterTool.get("subscription-name", "testDuplicate");    String token = parameterTool.getRequired("token");
          // source PulsarSource<String> source = PulsarSource.builder() .setServiceUrl(brokerServiceUrl) .setStartCursor(StartCursor.latest()) .setTopics(inputTopic) .setDeserializationSchema(new SimpleStringSchema()) .setSubscriptionName(subscriptionName) .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token) .build(); DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
          // sink PulsarSink<String> sink = PulsarSink.builder() .setServiceUrl(brokerServiceUrl) .setTopics(outputTopic) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token) .setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false) .setSerializationSchema(new SimpleStringSchema()) .build(); stream.sinkTo(sink); env.execute("Pulsar Streaming Message Duplication");}

             3.2 驗證


          在 TDMQ Pulsar 版控制臺創(chuàng)建流入 Topic  NinjaDuplicationInput1 和流出 Topic NinjaDuplicationOutput1。


          代碼編譯為 Jar 包后,本地上傳 Jar 包到 Flink Docker,從角色管理界面獲取具有生產(chǎn)和消費角色的 Token,命令如下所示:

          /usr/local/services/flink/flink-1.17.2 # /usr/local/services/flink/flink-1.17.2/bin/flink run /tmp/wordCount/pulsar-flink-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \    --broker-service-url http://pulsar-xxxxx \    --input-topic pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaDuplicationInput1 \    --output-topic pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaDuplicationOutput1 \    --subscription-name ninjaTest1 \    --token eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc
          Job has been submitted with JobID c1bdab89c01ef16e00579bd2c6648859

          提交任務(wù)后,可以看到 Flink Dashboard 出現(xiàn)對應(yīng)任務(wù),并且狀態(tài)處于 Running。


          在命令行往 NinjaDuplicationInput1 Topic 發(fā)送消息。

          /usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client \--url http://pulsar-xxxxxx \--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc  \--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \produce \-m "i am the bone of my sword" \-n 5 \pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaDuplicationInput1

          消息發(fā)送完成后,可以在消息查詢控制臺觀察到目標 Topic NinjaDuplicationOutput1 也出現(xiàn)了五條消息,并且消息內(nèi)容和發(fā)送消息一致。


          查看 Docker TaskManager 標準輸出也能觀察到 Sink 往目標 Topic 發(fā)送消息的日志。




          04



          Demo:單詞計數(shù)

          單詞計數(shù)作為 Flink 中最常見的 Demo,能夠比較好的闡述 Flink 的流處理思想。此 Demo 參考 StreamNative 的 Demo,使用 1.17 Flink SDK,將 Pulsar Topic 作為源和目標資源,統(tǒng)計源 Topic 消息中每個時間窗口各個單詞出現(xiàn)的次數(shù),并且將結(jié)果投遞到目標 Topic 中。

          https://github.com/streamnative/examples/blob/master/pulsar-flink/README.md


             4.1 主要邏輯


          整體 Demo 項目文件見下方鏈接

          pulsar-flink-example.zip

          file:////tencent/api/attachments/s3/url?attachmentid=20260421


          核心邏輯見下方代碼,首先使用 ParameterTool 工具解析命令行中傳入的參數(shù),之后使用 Flink 內(nèi)置的反序列化器解析消息體為字符串,在數(shù)據(jù)處理部分使用系統(tǒng)時間窗口統(tǒng)計時間窗內(nèi)流入的消息,并且對于每個出現(xiàn)的單詞匯聚生成 WordCount 對象,最后使用自定義的序列化器,將 WordCount 對象序列化為 Json 字節(jié)數(shù)組,投遞到目標 Topic 中。


          目前 TDMQ Pulsar Connector 支持 Pulsar Schema、Flink Schema 以及自定義序列化器三種方法將 Java 對象序列化為 Pulsar Sink 的字節(jié)數(shù)組消息體。推薦代碼使用自定義序列化器的方式序列化定義的 WordCount 對象。

          https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#serializer


          還需要注意默認 Sink 配置是開啟 Batch Send 模式的,在控制臺消息查詢時,Batch Message 只會查詢到 Batch 中的第一條消息,不利于對照消息數(shù)量,Demo 中關(guān)閉了 Batch Send 功能。


          /** * 參考 streamNative pulsar flink demo * <a >pulsar-flink example</a> * 由于上方鏈接的 streamNative flink demo 使用 1.10.1 版本 flink 以及 2.4.17 版本 pulsar connector, * 與當前 1.20 社區(qū)版本的 flink 和 pulsar connector api 已經(jīng)存在部分 api 差異 * 因此本 demo 使用 1.17 flink 版本進行重構(gòu) * <a >1.17 flink doc</a> * <p> * demo 統(tǒng)計時間窗口內(nèi)源 topic 所有消息中每個單詞出現(xiàn)頻率次數(shù) * 并且將統(tǒng)計結(jié)果按照每個單詞對應(yīng)一條消息的格式,序列化后消息后投遞到目標 topic 中 * */public class PulsarStreamingWordCount {    private static final Logger LOG = LoggerFactory.getLogger(PulsarStreamingWordCount.class);
          public static void main(String[] args) throws Exception { // 解析任務(wù)傳參 // 默認使用 authToken 方式鑒權(quán) final ParameterTool parameterTool = ParameterTool.fromArgs(args); if (parameterTool.getNumberOfParameters() < 2) { System.err.println("Missing parameters!"); return; } final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); env.enableCheckpointing(60000); env.getConfig().setGlobalJobParameters(parameterTool); String brokerServiceUrl = parameterTool.getRequired("broker-service-url"); String inputTopic = parameterTool.getRequired("input-topic"); String outputTopic = parameterTool.getRequired("output-topic"); String subscriptionName = parameterTool.get("subscription-name", "WordCountTest"); String token = parameterTool.getRequired("token"); int timeWindowSecond = parameterTool.getInt("time-window", 60);
          // source PulsarSource<String> source = PulsarSource.builder() .setServiceUrl(brokerServiceUrl) .setStartCursor(StartCursor.latest()) .setTopics(inputTopic) // 此處將 message 中的 payload 序列化成字符串類型 // 目前 source 只支持解析消息 payload 中的內(nèi)容,將 payload 中的內(nèi)容解析成 pulsar schema 對象或者自定義的 class 對象 // 而無法解析 message 中 properties 中的其他屬性,例如 publish_time // 如果需要解析 message 中的 properties,需要在繼承類中實現(xiàn) PulsarDeserializationSchema.getProducedType() 方法 // getProducedType 這個方法實現(xiàn)較為繁瑣,需要聲明每個反序列化后的屬性 // https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#deserializer .setDeserializationSchema(new SimpleStringSchema()) .setSubscriptionName(subscriptionName) .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token) .build(); // 由于此處沒有使用消息體中的時間,即沒有使用消息的 publish_time // 因此此處使用 noWatermark 模式,使用 taskManager 的時間作為時間窗口 DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
          // process // 解析 source 中每行消息,通過空格分割成單個單詞,之后進行匯聚處理并且初始化成 WordCount 結(jié)構(gòu)體 // 這里使用 TumblingProcessingTimeWindows,即使用當前 taskManager 系統(tǒng)時間計算時間窗口 DataStream<WordCount> wc = stream .flatMap((FlatMapFunction<String, WordCount>) (line, collector) -> { LOG.info("current line = {}, word list = {}", line, line.split("\\s")); for (String word : line.split("\\s")) { collector.collect(new WordCount(word, 1, null)); } }) .returns(WordCount.class) .keyBy(WordCount::getWord) .window(TumblingProcessingTimeWindows.of(Time.seconds(timeWindowSecond))) .reduce((ReduceFunction<WordCount>) (c1, c2) -> { WordCount reducedWordCount = new WordCount(c1.getWord(), c1.getCount() + c2.getCount(), null); LOG.info("previous [{}] [{}], current wordCount {}", c1, c2, reducedWordCount); return reducedWordCount; });
          // sink // 目前 1.17 flink 序列化提供了兩種已經(jīng)實現(xiàn)的方法,一種是使用 pulsar 內(nèi)置 schema,另一種是使用 flink 的 schema // 但由于目前 tdmq pulsar 提供的是 2.9 版本的 pulsar,對于 schema 支持還不夠完善 // 此處使用 flink PulsarSerializationSchema<T> 提供的接口,當前主要需要實現(xiàn) serialize(IN element, PulsarSinkContext sinkContext) 方法 // 將傳入的 IN 對象自定義序列化為 byte 數(shù)組 // https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#serializer PulsarSink<WordCount> sink = PulsarSink.builder() .setServiceUrl(brokerServiceUrl) .setTopics(outputTopic) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token) .setConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, false) .setSerializationSchema(new PulsarSerializationSchema<WordCount>() { private ObjectMapper objectMapper;
          @Override public void open( SerializationSchema.InitializationContext initializationContext, PulsarSinkContext sinkContext, SinkConfiguration sinkConfiguration) throws Exception { objectMapper = new ObjectMapper(); }
          @Override public PulsarMessage<?> serialize(WordCount wordCount, PulsarSinkContext sinkContext) { // 此處將 wordCount 添加處理時間后,將 wordCount 使用 json 方式序列化為 byte 數(shù)組 // 以便能夠直接查看消息體內(nèi)容 byte[] wordCountBytes; wordCount.setSinkDateTime(LocalDateTime.now().toString()); try { wordCountBytes = objectMapper.writeValueAsBytes(wordCount); } catch (Exception exception) { wordCountBytes = exception.getMessage().getBytes(); } return PulsarMessage.builder(wordCountBytes).build(); }
          }) .build(); wc.sinkTo(sink); env.execute("Pulsar Streaming WordCount"); }
          }

             4.2 驗證


          在 TDMQ Pulsar 版控制臺創(chuàng)建流入 Topic NinjaWordCountInput1 和流出 Topic NinjaWordCountOutput1。


          代碼編譯為 Jar 包后,本地上傳 Jar 包到 Flink Docker,從角色管理界面獲取具有生產(chǎn)和消費角色的 Token,命令如下所示。

          /usr/local/services/flink/flink-1.17.2 # /usr/local/services/flink/flink-1.17.2/bin/flink run /tmp/wordCount/pulsar-flink-examples-0.0.1-SNAPSHOT-jar-with-dependencies.jar \    --broker-service-url http://pulsar-xxxx \    --input-topic pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1 \    --output-topic pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountOutput1 \    --subscription-name ninjaTest3 \    --token eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc  Job has been submitted with JobID 6f608d95506f96c3eac012386f840655

          提交任務(wù)后,可以看到 Flink Dashboard 出現(xiàn)對應(yīng)任務(wù),并且狀態(tài)處于 Running。


          在命令行往 NinjaWordCountInput1 Topic 發(fā)送消息,此處一共發(fā)送兩批消息,第一批發(fā)送 I am the bone of my sword 5 次,第二批發(fā)送 Test1 3 次。

          /usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client \--url http://pulsar-xxx \--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc  \--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \produce \-m "i am the bone of my sword" \-n 5 \pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1
          /usr/local/services/pulsar/apache-pulsar-2.9.5/bin/pulsar-client \--url http://pulsar-g8akj4eow8z8.sap-8ywks40k.tdmq.ap-gz.qcloud.tencenttdmq.com:8080 \--auth-params token:eyJrZXlJZCI6InB1bHNhci1nOGFrajRlb3c4ejgiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItZzhha2o0ZW93OHo4X2Rldi10ZG1xLW5pbmphemhvdS0xNzEzODU2OTI3LWRldmNsb3VkIn0.O89TuTl0PMca7tLN9aGvHvqt7QZ9yMrJh1z3VOz7EVc \--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \produce \-m "test1" \-n 3 \pulsar-g8akj4eow8z8/dev-tdmq-ninjazhou-1713856927/ninjaWordCountInput1

          消息發(fā)送完成后,可以在消息查詢控制臺觀察到目標 Topic NinjaWordCountOutput1 出現(xiàn)了 8 條消息。


          每條消息為單詞名稱,單詞出現(xiàn)的次數(shù),單詞處理的時間點的 Json 字節(jié)數(shù)組,下圖為 am 單詞的消息結(jié)構(gòu),可以發(fā)現(xiàn)出現(xiàn)數(shù)量與投遞消息數(shù)吻合,證明任務(wù)運行正常。


          查看 TaskManager 可以看出消息體,以及每次解析的消息過程。




          05



          Flink Connector 用法總結(jié)

             5.1 版本選擇


          目前 Flink 插件生產(chǎn)和消費經(jīng)過調(diào)研,在不進行管控改造以及非標操作的情況下,能滿足基本的 TDMQ Pulsar 版使用需求。截至現(xiàn)在 Apache Flink 已經(jīng)發(fā)布 1.20 版本,目前推薦使用 Apache Flink 1.15-1.17 對應(yīng) Pulsar Connector,不推薦使用 1.15 以下版本,1.18 及以上版本可以參考 1.17 版本使用。


          下面介紹 1.15 和 1.17 版本 Pulsar Flink Connector 主要配置。Flink 版本對應(yīng)的 Flink Connector 依賴可以在 Pulsar Connector Dependencies 處獲取。

          https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/pulsar/#dependency



          各個版本文檔鏈接:https://nightlies.apache.org/flink/

             5.2  1.17 Flink Pulsar Connector


             5.2.1 代碼依賴


          Java 項目中引入相關(guān)依賴,以 Maven 工程為例,在 pom.xml 添加以下依賴:

          <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-base</artifactId>    <version>1.17.2</version></dependency><dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-pulsar</artifactId>    <version>4.1.0-1.17</version></dependency><dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-streaming-java</artifactId>    <version>1.17.2</version></dependency>

             5.2.2 Source 代碼示例


          PulsarSource<String> source = PulsarSource.builder()        .setServiceUrl(brokerServiceUrl)        .setStartCursor(StartCursor.latest())        .setTopics(inputTopic)        .setDeserializationSchema(new SimpleStringSchema())        .setSubscriptionName(subscriptionName)        .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)        .build();

             5.2.3 Source 參數(shù)說明

          Connector Source 全部參數(shù)可參考 官方文檔 ,下表是常用配置參數(shù):


             5.2.4 Sink 代碼示例

          PulsarSink<String> sink = PulsarSink.builder()       .setServiceUrl(brokerServiceUrl)       .setTopics(outputTopic)       .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)       .setAuthentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", token)       .setSerializationSchema(new SimpleStringSchema())       .build();

             5.2.5 Sink 參數(shù)說明

          Connector Sink 全部參數(shù)可參考官方文檔 ,下表是常用配置參數(shù)。


             5.3 Flink Pulsar Connector

             5.3.1 代碼依賴

          Java 項目中引入相關(guān)依賴,以 Maven 工程為例,在 pom.xml 添加以下依賴:

          <dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-base</artifactId>    <version>1.15.4</version></dependency><dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-connector-pulsar</artifactId>    <version>1.15.4</version></dependency><dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-streaming-java</artifactId>    <version>1.15.4</version></dependency>

             5.3.2 Source 代碼示例

          PulsarSource<String> source = PulsarSource.builder()        .setServiceUrl(brokerServiceUrl)        .setAdminUrl(brokerServiceUrl)        .setStartCursor(StartCursor.latest())        .setTopics(inputTopic)        .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))        .setSubscriptionName(subscriptionName)    .setSubscriptionType(SubscriptionType.Exclusive)        .setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME, "org.apache.pulsar.client.impl.auth.AuthenticationToken")        .setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, token)        .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)        .build();

             5.3.3 Source 參數(shù)說明

          Connector Source 全部參數(shù)可參考官方文檔 ,下表是常用配置參數(shù)。


             5.3.4 Sink 代碼示例

          PulsarSink<String> sink = PulsarSink.builder()        .setServiceUrl(brokerServiceUrl)        .setAdminUrl(brokerServiceUrl)        .setTopics(outputTopic)        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)        .setConfig(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME, "org.apache.pulsar.client.impl.auth.AuthenticationToken")        .setConfig(PulsarOptions.PULSAR_AUTH_PARAMS, token)        .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))        .build();

             5.3.5 Sink 參數(shù)說明

          Connector Sink 全部參數(shù)可參考官方文檔 ,下表是常用配置參數(shù)。




          06



          注意事項

          1. 由于 Connector Pulsar 會使用到堆外內(nèi)存,并且默認任務(wù)的堆外內(nèi)存為 0,因此執(zhí)行 Pulsar Job 需要顯式聲明堆外內(nèi)存大小 taskmanager.memory.task.off-heap.size,例如 1gb。

          2. SetSerializationSchema 反序列化提供了兩種已經(jīng)實現(xiàn)的方法,一種是使用 Pulsar 內(nèi)置 Schema,另一種是使用 Flink 的 Schema。但這兩種方法都會造成業(yè)務(wù)代碼與 Schema 耦合。目前建議實現(xiàn) PulsarSerializationSchema接口,主要需要實現(xiàn) Serialize(IN Element, PulsarSinkContext SinkContext) 方法,將傳入的 IN 對象自定義序列化為 Byte 數(shù)組。

          3. 目前 Sink 默認開啟 Enable_batch 批量投遞模式,會將消息打包后投遞。如果想要關(guān)閉批量投遞功能,可以配置 SetConfig(PulsarSinkOptions.PULSAR_BATCHING_ENABLED, False)。

          4. Flink 時間窗口支持兩種 時間獲取方式 ,一種直接使用任務(wù)的系統(tǒng)時間 ProcessTime,另一種是事件自帶時間 EventTime。但目前 Source 只支持解析消息 Payload 中的內(nèi)容,將 Payload 中的內(nèi)容解析成 Pulsar Schema 對象或者自定義的 Class 對象,而無法解析 Message 中 Properties 中的其他屬性,例如 消息上傳時間 Publish_Time。如果需要解析 Message 中的 Properties,根據(jù)文檔 需要在繼承類中 實現(xiàn) PulsarDeserializationSchema.getProducedType() 方法。這個方法實現(xiàn)較為繁瑣,需要聲明每個反序列化后的屬性,因此目前建議直接使用 ProcessTime 作為時間窗口時間。

          5. 1.16 及以下版本 Flink Source 的 SetSubscriptionType 方法還保留了 Shared 和 Key_Shared 訂閱模式,這兩種訂閱模式依賴事務(wù) Ack 消息,并且只有當任務(wù) Checkpoint 更新時才會統(tǒng)一提交事務(wù)和 Ack。但由于目前 TDMQ Pulsar 沒有開放事務(wù)功能,因此當前不能同時配置 SetSubscriptionType(SubscriptionType.Shared) 和 SetConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, False) 參數(shù)。

          6. Oceanus 內(nèi)置 Pulsar Connector 是基于 StreamNative 版本,適配 Flink 1.13-1.14 版本的 Connector,這兩個版本較老,與新版本存在較多 API 不兼容,如果使用 Oceanus 內(nèi)置版本 Pulsar Connector 與高版本 Flink,可能需要較多代碼改造。


          -End-

          ????歡迎加入騰訊云開發(fā)者社群,享前沿資訊、大咖干貨,找興趣搭子,交同城好友,更有鵝廠招聘機會、限量周邊好禮等你來~


          (長按圖片立即掃碼)



          瀏覽 60
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产一级一片免费播放放a | 玖玖视频在线免费观看 | 麻豆一区二区99久久久久 | 琪琪在线 | 欧美肏屄视频 |