
          Upsert Kafka Connector: Making Real-Time Aggregation Simpler

          2021-03-18 21:10



          In some scenarios, such as reading a compacted topic or emitting (updating) aggregation results, the key of a Kafka record needs to be treated as a primary key that determines whether a piece of data should be handled as an insert, a delete, or an update. To support this, the community added a dedicated upsert connector for Kafka (upsert-kafka), which extends the existing Kafka connector and works in upsert mode (FLIP-149). The new upsert-kafka connector can be used both as a source and as a sink, and it provides the same basic functionality and persistence guarantees as the existing kafka connector, since the two share most of their code.

          To use the upsert-kafka connector, you must define a primary key when creating the table, and specify the serialization/deserialization formats for the key (key.format) and the value (value.format).
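
          For reference, a minimal DDL sketch (table name, topic, columns, and broker address are illustrative placeholders, not from the original):

          CREATE TABLE pageviews_per_region (
            region STRING,
            view_count BIGINT,
            PRIMARY KEY (region) NOT ENFORCED -- required: the Kafka message key is derived from the primary key
          ) WITH (
            'connector' = 'upsert-kafka',
            'topic' = 'pageviews_per_region', -- placeholder topic
            'properties.bootstrap.servers' = '...:9092', -- placeholder brokers
            'key.format' = 'json', -- required: key (de)serialization format
            'value.format' = 'json' -- required: value (de)serialization format
          );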

          I. What is the Upsert Kafka Connector?

          The Upsert Kafka connector supports reading data from a Kafka topic and writing data to a Kafka topic in upsert fashion.

          As a source, the upsert-kafka connector produces a changelog stream, in which each record represents an update or delete event. More precisely, the value in a record is interpreted as an UPDATE of the last value for the same key, if one exists (if the key has not been seen yet, the update is treated as an INSERT). In the table analogy, a record in the changelog stream is interpreted as an UPSERT, also known as INSERT/UPDATE, because any existing row with the same key is overwritten. Additionally, a message with a null value is treated as a DELETE.

          As a sink, the upsert-kafka connector can consume a changelog stream. It writes INSERT/UPDATE_AFTER records as normal Kafka messages, and writes DELETE records as Kafka messages with a null value (a tombstone marking the key as deleted). In the case of updates, multiple records end up stored in the topic for the same key, but a reader only keeps the last value for each key.

          Flink partitions the data by the values of the primary-key columns, which guarantees ordering per primary key: all update/delete messages for the same primary key land in the same partition. The mapping between Kafka records and changelog operations is sketched below.
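
          For example (a schematic illustration, not actual connector output), an upsert-kafka source would interpret the following sequence of (key, value) records as:

          (key={"id": 1}, value={"name": "a"})  ->  +I  first value seen for this key: INSERT
          (key={"id": 1}, value={"name": "b"})  ->  +U  new value for an existing key: UPDATE
          (key={"id": 1}, value=null)           ->  -D  tombstone: DELETE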

          Parameters of the upsert-kafka connector

          connector: Required. Specifies the connector to use; for the Upsert Kafka connector this is 'upsert-kafka'.

          topic: Required. The name of the Kafka topic to read from and write to.

          properties.bootstrap.servers: Required. A comma-separated list of Kafka brokers.

          key.format: Required. The format used to serialize and deserialize the key part of Kafka messages. The key fields are specified by the PRIMARY KEY syntax. Supported formats include 'csv', 'json', and 'avro'.

          value.format: Required. The format used to serialize and deserialize the value part of Kafka messages. Supported formats include 'csv', 'json', and 'avro'.

          properties.*: Optional. Forwards arbitrary Kafka client parameters. The option suffix must match a parameter name defined in the Kafka configuration documentation. Flink strips the 'properties.' prefix from the option name and passes the resulting key and its value to the Kafka client. For example, you can disable automatic topic creation via 'properties.allow.auto.create.topics' = 'false'. Some options cannot be set this way, however, such as 'key.deserializer' and 'value.deserializer', because Flink overrides their values.

          value.fields-include: Optional, defaults to ALL. Controls whether the key fields also appear in the value. With ALL, the value part of the message contains all fields of the schema, including those defined as the primary key. With EXCEPT_KEY, the value part contains all fields of the schema except the primary-key fields.

          key.fields-prefix: Optional. Adds a custom prefix to the key fields to avoid naming conflicts with value fields. The prefix is empty by default. Once a key-field prefix is specified, it must be used in the field names in the DDL, but the prefix is stripped when building the serialization data type of the key. Note that this option requires value.fields-include to be set to EXCEPT_KEY. See the example below.
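
          A sketch of key.fields-prefix in use (table, topic, and brokers are illustrative placeholders): the key field carries the prefix in the DDL, the prefix is stripped in the serialized key, and value.fields-include must be EXCEPT_KEY:

          CREATE TABLE user_scores (
            k_user_id STRING, -- key field, declared with the 'k_' prefix
            score BIGINT,
            PRIMARY KEY (k_user_id) NOT ENFORCED
          ) WITH (
            'connector' = 'upsert-kafka',
            'topic' = 'user_scores',
            'properties.bootstrap.servers' = '...:9092',
            'key.format' = 'json',
            'key.fields-prefix' = 'k_', -- serialized key contains "user_id", not "k_user_id"
            'value.format' = 'json',
            'value.fields-include' = 'EXCEPT_KEY' -- required when key.fields-prefix is set
          );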

          II. Usage

          1. Add the dependency

          <!-- Flink Kafka connector: for Kafka 1.0.0 and above the universal connector can be used directly;
               in Flink 1.12 the upsert-kafka connector ships in this same module -->
          <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.12.0</version>
            <scope>provided</scope>
          </dependency>

          2. SQL computation

          Example: computing total page PV and UV in real time.

          -- Create the Kafka source table (JSON format).
          -- 'format' = 'json'                      -- required: specify the format
          -- 'json.fail-on-missing-field' = 'true'  -- optional: whether to fail on missing fields; 'false' by default
          -- 'json.ignore-parse-errors' = 'true'    -- optional: skip fields and rows with parse errors instead of failing

          CREATE TABLE source_ods_fact_user_ippv (
            user_id STRING,
            client_ip STRING,
            client_info STRING,
            pagecode STRING,
            access_time TIMESTAMP,
            dt STRING,
            WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND -- define the watermark
          ) WITH (
            'connector' = 'kafka',
            'topic' = 'user_ippv',
            'scan.startup.mode' = 'earliest-offset',
            'properties.group.id' = 'group1',
            'properties.bootstrap.servers' = 'xxx:9092',
            'format' = 'json',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true'
          );

          -- Create the Kafka upsert result table with the composite primary key (do_date, do_min)
          CREATE TABLE result_total_pvuv_min (
            do_date STRING,        -- statistics date
            do_min STRING,         -- statistics minute
            pv BIGINT,             -- page views
            uv BIGINT,             -- unique visitors: repeat visits by the same visitor within a day count once
            currenttime TIMESTAMP, -- current time
            PRIMARY KEY (do_date, do_min) NOT ENFORCED
          ) WITH (
            'connector' = 'upsert-kafka',
            'topic' = 'result_total_pvuv_min',
            'properties.bootstrap.servers' = 'xxx:9092',
            'key.json.ignore-parse-errors' = 'true',
            'value.json.fail-on-missing-field' = 'false',
            'key.format' = 'json',
            'value.format' = 'json',
            'value.fields-include' = 'ALL'
          );
          -- Create the aggregation view. Note: grouping is by dt only; the minute bucket is
          -- derived later from max(access_time), so pv/uv accumulate over the day and the
          -- upsert key (do_date, do_min) changes as the latest access time advances.
          CREATE VIEW view_total_pvuv_min AS
          SELECT
            dt AS do_date,                   -- date partition
            count(client_ip) AS pv,          -- every request counts towards PV
            count(DISTINCT client_ip) AS uv, -- distinct client IPs, deduplicated per day
            max(access_time) AS access_time  -- latest request time seen so far
          FROM source_ods_fact_user_ippv
          GROUP BY dt;


          -- Write the per-minute PV/UV statistics into the Kafka upsert table
          INSERT INTO result_total_pvuv_min
          SELECT
            do_date,
            cast(DATE_FORMAT(access_time, 'HH:mm') AS STRING) AS do_min, -- minute-level time
            pv,
            uv,
            CURRENT_TIMESTAMP AS currenttime
          FROM view_total_pvuv_min;
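
          Since result_total_pvuv_min is an upsert-kafka table, it can also be read back as a source: Flink materializes the changelog so that only the latest row per key remains. A quick sanity check (a sketch, e.g. from the SQL client):

          SELECT * FROM result_total_pvuv_min; -- expect one latest row per (do_date, do_min)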

          The sample data and verification results are as follows:

          Kafka source data:
          {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-23 11:32:24","dt":"2021-01-08"}
          {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1201","access_time":"2021-01-23 11:32:55","dt":"2021-01-08"}
          {"user_id":"2","client_ip":"192.165.12.1","client_info":"pc","pagecode":"1031", "access_time":"2021-01-23 11:32:59","dt":"2021-01-08"}
          {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1101","access_time":"2021-01-23 11:33:24","dt":"2021-01-08"}
          {"user_id":"3","client_ip":"192.168.10.3","client_info":"pc","pagecode":"1001", "access_time":"2021-01-23 11:33:30","dt":"2021-01-08"}
          {"user_id":"1","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-23 11:34:24","dt":"2021-01-08"}


          Real-time statistics in the result table (topic: result_total_pvuv_min):
          {"do_date":"2021-01-08","do_min":"11:32","pv":1,"uv":1,"currenttime":"2021-01-23 08:22:06.431"}
          {"do_date":"2021-01-08","do_min":"11:32","pv":2,"uv":1,"currenttime":"2021-01-23 08:22:06.526"}
          {"do_date":"2021-01-08","do_min":"11:32","pv":3,"uv":2,"currenttime":"2021-01-23 08:22:06.527"}
          {"do_date":"2021-01-08","do_min":"11:33","pv":4,"uv":2,"currenttime":"2021-01-23 08:22:06.527"}
          {"do_date":"2021-01-08","do_min":"11:33","pv":5,"uv":3,"currenttime":"2021-01-23 08:22:06.528"}
          {"do_date":"2021-01-08","do_min":"11:34","pv":6,"uv":3,"currenttime":"2021-01-23 08:22:06.529"}


          Retesting with the following sample data:
          {"user_id":"10","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-22 10:10:24","dt":"2021-01-22"}
          {"user_id":"11","client_ip":"192.168.12.2","client_info":"phone","pagecode":"1002","access_time":"2021-01-22 11:10:24","dt":"2021-01-22"}
          {"user_id":"10","client_ip":"192.168.12.1","client_info":"phone","pagecode":"1001","access_time":"2021-01-22 10:11:24","dt":"2021-01-22"}
          {"user_id":"11","client_ip":"192.168.12.3","client_info":"phone","pagecode":"1002","access_time":"2021-01-22 11:12:14","dt":"2021-01-22"}


          The printed changelog (+I = insert, -U = retract the previous value, +U = emit the updated value):
          +----+------------+--------+----+----+-----------------------+
          | op | do_date    | do_min | pv | uv | currenttime           |
          +----+------------+--------+----+----+-----------------------+
          | +I | 2021-01-22 | 10:10  |  1 |  1 | 2021-01-23T08:33:2... |
          | -U | 2021-01-22 | 10:10  |  1 |  1 | 2021-01-23T08:33:2... |
          | +U | 2021-01-22 | 11:10  |  2 |  2 | 2021-01-23T08:33:2... |
          | -U | 2021-01-22 | 11:10  |  2 |  2 | 2021-01-23T08:33:2... |
          | +U | 2021-01-22 | 11:10  |  3 |  2 | 2021-01-23T08:33:2... |
          | -U | 2021-01-22 | 11:10  |  3 |  2 | 2021-01-23T08:33:3... |
          | +U | 2021-01-22 | 11:12  |  4 |  3 | 2021-01-23T08:33:3... |
          +----+------------+--------+----+----+-----------------------+

          3. Kafka -> Flink -> TiDB

          Flink on TiDB is already in use at companies such as Xiaohongshu and Beike Finance; as a real-time data synchronization approach that supports upserts, it is a viable option.

          select version(); -- 5.7.25-TiDB-v4.0.8
          drop table if exists result_user_behavior;
          CREATE TABLE `result_user_behavior` (
            `user_id` int(11) unsigned NOT NULL AUTO_INCREMENT,
            `client_ip` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
            `client_info` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
            `page_code` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
            `access_time` timestamp NULL DEFAULT NULL,
            `dt` varchar(30) COLLATE utf8mb4_general_ci DEFAULT NULL,
            PRIMARY KEY (`user_id`)
          ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
          // A data-synchronization approach that supports upserts.
          // The snippet below assumes a streaming TableEnvironment `tenv`, e.g.:
          //   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //   StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
          // and that flink-connector-jdbc plus a MySQL JDBC driver are on the classpath.
          tenv.executeSql("CREATE TABLE source_kafka_user_behavior (\n" +
          " user_id INT,\n" +
          " client_ip STRING, \n" +
          " client_info STRING, \n" +
          " page_code STRING, \n" +
          " access_time TIMESTAMP, \n" +
          " dt STRING, \n" +
          " WATERMARK FOR access_time AS access_time - INTERVAL '5' SECOND \n" +
          ") WITH (\n" +
          " 'connector' = 'kafka',\n" +
          " 'topic' = 'user_ippv',\n" +
          " 'scan.startup.mode' = 'latest-offset',\n" +
          " 'properties.group.id' = 'test-group1',\n" +
          " 'properties.bootstrap.servers' = 'xx:9092', \n" +
          " 'format' = 'json', \n" +
          " 'json.fail-on-missing-field' = 'false',\n" +
          " 'json.ignore-parse-errors' = 'true'\n" +
          ")").print();

          tenv.executeSql("CREATE TABLE sink_upsert_tidb (\n" +
          " user_id INT,\n" +
          " client_ip STRING, \n" +
          " client_info STRING, \n" +
          " page_code STRING, \n" +
          " access_time TIMESTAMP, \n" +
          " dt STRING, \n" +
          " PRIMARY KEY (user_id) NOT ENFORCED" +
          ") WITH (\n" +
          " 'connector' = 'jdbc',\n" +
          " 'url' = 'jdbc:mysql://xxx:4000/bi',\n" +
          " 'username' = 'bi_rw',\n" +
          " 'password' = 'xxx',\n" +
          " 'table-name' = 'result_user_behavior'\n" +
          ")");


          tenv.executeSql("insert into sink_upsert_tidb" +
          " select " +
          " user_id ,\n" +
          " client_ip , \n" +
          " client_info , \n" +
          " page_code , \n" +
          " access_time , \n" +
          " dt \n" +
          "from source_kafka_user_behavior").print();

          Test data:
          {"user_id":"11","client_ip":"192.168.12.3","client_info":"phone","page_code":"1002","access_time":"2021-01-25 11:12:14","dt":"2021-01-25"}
          {"user_id":"11","client_ip":"192.168.12.3","client_info":"phone","page_code":"1003","access_time":"2021-01-25 11:12:14","dt":"2021-01-25"}
          {"user_id":"11"} -- 值全部置空
          {"user_id":"11","client_ip":"192.168.12.4","client_info":"phone","page_code":"10","access_time":"2021-01-25 11:35:14","dt":"2021-01-25"}
          {"user_id":"12","client_ip":"192.168.12.5","client_info":"phone","page_code":"10","access_time":"2021-01-25 11:35:14","dt":"2021-01-25"}

          Example TiDB query result (the original shows it as a screenshot, omitted here).
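
          To reproduce it, a query along these lines can be run (a sketch; the expectation is noted in the comment):

          -- Expect one row per user_id: the JDBC sink upserts on the primary key, so
          -- user_id 11 holds the values from its last message and user_id 12 appears once.
          SELECT * FROM result_user_behavior ORDER BY user_id;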

          Summary

          This article demonstrated using Kafka as both source and sink. When the data consumed from the Kafka source is queried through the view, the updates shown above are produced: each insert keyed by the composite primary key (statistics date, statistics minute) is resolved into +I (insert), -U (retract the old value), and +U (emit the new value), so the result_total_pvuv_min Kafka upsert result table always holds the latest data.

          The upsert-kafka connector is available as of Flink 1.12. As an intermediate layer for data aggregation it fits many business scenarios: the Kafka upsert result table can, for example, also serve as a dimension table for joins, or be written via a Flink sink to HDFS, Iceberg tables, and the like for offline analysis.

          For truly real-time processing, Flink + TiDB is a good solution. Although TiDB does not separate storage from compute, problems that can be solved by adding machines are not real performance problems. Moreover, TiDB is fully compatible with MySQL syntax, which makes it well suited for flat migrations from MySQL, and it supports transactions, so using it differs little from using MySQL.

          TiDB also officially provides the TiSpark query engine; while we have not benchmarked it yet, it should query more efficiently than the MySQL engine. Our company has started adopting TiDB as well. Our current "real-time" jobs run as micro-batches and are not fully real-time yet, but as our understanding of the stack matures, full real-time processing is within reach.




