<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          基于 Flink SQL 構(gòu)建流批一體的 ETL 數(shù)據(jù)集成

          共 10354字,需瀏覽 21分鐘

           ·

          2021-03-14 10:11


          摘要:本文整理自云邪、雪盡在 Flink Forward Asia 2020 的分享,該分享以 4 個章節(jié)來詳細介紹如何利用 Flink SQL 構(gòu)建流批一體的 ETL 數(shù)據(jù)集成, 文章的主要內(nèi)容如下:

          1. 數(shù)據(jù)倉庫與數(shù)據(jù)集成

          2. 數(shù)據(jù)接入(E)

          3. 數(shù)據(jù)入倉/湖(L)

          4. 數(shù)據(jù)打?qū)?T)


          Tips:點擊文末「閱讀原文」即可回顧作者原版分享視頻~
           

          數(shù)據(jù)倉庫與數(shù)據(jù)集成






          數(shù)據(jù)倉庫是一個集成的(Integrated),面向主題的(Subject-Oriented),隨時間變化的(Time-Variant),不可修改的(Nonvolatile)數(shù)據(jù)集合,用于支持管理決策。這是數(shù)據(jù)倉庫之父 Bill Inmon 在 1990 年提出的數(shù)據(jù)倉庫概念。該概念里最重要的一點就是“集成的”,其余特性都是一些方法論的東西。因為數(shù)據(jù)倉庫首先要解決的問題,就是數(shù)據(jù)集成,就是將多個分散的、異構(gòu)的數(shù)據(jù)源整合在一起,消除數(shù)據(jù)孤島,便于后續(xù)的分析。這個不僅適用于傳統(tǒng)的離線數(shù)倉,也同樣適用于實時數(shù)倉,或者是現(xiàn)在火熱的數(shù)據(jù)湖。首先要解決的就是數(shù)據(jù)集成的問題。如果說業(yè)務(wù)的數(shù)據(jù)都在一個數(shù)據(jù)庫中,并且這個數(shù)據(jù)庫還能提供非常高效的查詢分析能力,那其實也用不著數(shù)據(jù)倉庫和數(shù)據(jù)湖上場了。




          數(shù)據(jù)集成就是我們常稱作 ETL 的過程,分別是數(shù)據(jù)接入、數(shù)據(jù)清洗轉(zhuǎn)換打?qū)挕⒁约皵?shù)據(jù)的入倉入湖,分別對應(yīng)三個英文單詞的首字母,所以叫 ETL。ETL 的過程也是數(shù)倉搭建中最具工作量的環(huán)節(jié)。那么 Flink 是如何改善這個 ETL 的過程的呢?我們先來看看傳統(tǒng)的數(shù)據(jù)倉庫的架構(gòu)。




          傳統(tǒng)的數(shù)據(jù)倉庫,實時和離線數(shù)倉是比較割裂的兩套鏈路,比如實時鏈路通過 Flume和 Canal 實時同步日志和數(shù)據(jù)庫數(shù)據(jù)到 Kafka 中,然后在 Kafka 中做數(shù)據(jù)清理和打?qū)挕kx線鏈路通過 Flume 和 Sqoop 定期同步日志和數(shù)據(jù)庫數(shù)據(jù)到 HDFS 和 Hive。然后在 Hive 里做數(shù)據(jù)清理和打?qū)挕?/span>

          這里我們主要關(guān)注的是數(shù)倉的前半段的構(gòu)建,也就是到 ODS、DWD 層,我們把這一塊看成是廣義的 ETL 數(shù)據(jù)集成的范圍。那么在這一塊,傳統(tǒng)的架構(gòu)主要存在的問題就是這種割裂的數(shù)倉搭建這會造成很多重復(fù)工作,重復(fù)的資源消耗,并且實時、離線底層數(shù)據(jù)模型不一致,會導(dǎo)致數(shù)據(jù)一致性和質(zhì)量難以保障。同時兩個鏈路的數(shù)據(jù)是孤立的,數(shù)據(jù)沒有實現(xiàn)打通和共享。

          那么 Flink 能給這個架構(gòu)帶來什么改變呢?




          基于 Flink SQL 我們現(xiàn)在可以方便地構(gòu)建流批一體的 ETL 數(shù)據(jù)集成,與傳統(tǒng)數(shù)倉架構(gòu)的核心區(qū)別主要是這幾點:


          1. Flink SQL 原生支持了 CDC 所以現(xiàn)在可以方便地同步數(shù)據(jù)庫數(shù)據(jù),不管是直連數(shù)據(jù)庫,還是對接常見的 CDC工具。

          2. Flink SQL 在最近的版本中持續(xù)強化了維表 join 的能力,不僅可以實時關(guān)聯(lián)數(shù)據(jù)庫中的維表數(shù)據(jù),現(xiàn)在還能關(guān)聯(lián) Hive 和 Kafka 中的維表數(shù)據(jù),能靈活滿足不同工作負載和時效性的需求。

          3. 基于 Flink 強大的流式 ETL 的能力,我們可以統(tǒng)一在實時層做數(shù)據(jù)接入和數(shù)據(jù)轉(zhuǎn)換,然后將明細層的數(shù)據(jù)回流到離線數(shù)倉中。

          4. 現(xiàn)在 Flink 流式寫入 Hive,已經(jīng)支持了自動合并小文件的功能,解決了小文件的痛苦。



          所以基于流批一體的架構(gòu),我們能獲得的收益:


          1. 統(tǒng)一了基礎(chǔ)公共數(shù)據(jù)
          2. 保障了流批結(jié)果的一致性
          3. 提升了離線數(shù)倉的時效性
          4. 減少了組件和鏈路的維護成本



          接下來我們會針對這個架構(gòu)中的各個部分,結(jié)合場景案例展開進行介紹,包括數(shù)據(jù)接入,數(shù)據(jù)入倉入湖,數(shù)據(jù)打?qū)挕?/span>
           



          數(shù)據(jù)接入





          現(xiàn)在數(shù)據(jù)倉庫典型的數(shù)據(jù)來源主要來自日志和數(shù)據(jù)庫,日志接入現(xiàn)階段已經(jīng)非常成熟了,也有非常豐富的開源產(chǎn)品可供選擇,包括 Flume,F(xiàn)ilebeat,Logstash 等等都能很方便地采集日志到 Kafka 。這里我們就不作過多展開。

          數(shù)據(jù)庫接入會復(fù)雜很多,常見的幾種 CDC 同步工具包括 Canal,Debezium,Maxwell。Flink 通過 CDC format 與這些同步工具做了很好的集成,可以直接消費這些同步工具產(chǎn)生的數(shù)據(jù)。同時 Flink 還推出了原生的 CDC connector,直連數(shù)據(jù)庫,降低接入門檻,簡化數(shù)據(jù)同步流程。




          我們先來看一個使用 CDC format 的例子。現(xiàn)在常見的方案是通過 Debezium 或者  Canal 去實時采集 MySQL 數(shù)據(jù)庫的 binlog,并將行級的變更事件同步到 Kafka 中供 Flink 分析處理。在 Flink 推出 CDC format 之前,用戶要去消費這種數(shù)據(jù)會非常麻煩,用戶需要了解 CDC 工具的數(shù)據(jù)格式,將 before,after 等字段都聲明出來,然后用 ROW_NUMBER 做個去重,來保證實時保留最后一行的語義。但這樣使用成本很高,而且也不支持 DELETE 事件。

          現(xiàn)在 Flink 支持了 CDC format,比如這里我們在 with 參數(shù)中可以直接指定 format = ‘debezium-json’,然后 schema 部分只需要填數(shù)據(jù)庫中表的 schema 即可。Flink 能自動識別 Debezium 的 INSERT/UPDATE/DELETE 事件,并轉(zhuǎn)成 Flink 內(nèi)部的 INSERT/UPDATE/DELETE 消息。之后用戶可以在該表上直接做聚合、join 等操作,就跟操作一個 MySQL 實時物化視圖一樣,非常方便。




          在 Flink 1.12 版本中,F(xiàn)link 已經(jīng)原生支持了大部分常見的 CDC format,比如 Canal json、Debezium json、Debezium avro、Maxwell 等等。同時 Flink 也開放了 CDC format 的接口,用戶可以實現(xiàn)自己的 CDC format 插件來對接自己公司的同步工具。




          除此之外,F(xiàn)link 內(nèi)部原生支持了 CDC 的語義,所以可以很自然地直接去讀取  MySQL 的 binlog 數(shù)據(jù)并轉(zhuǎn)成 Flink 內(nèi)部的變更消息。所以我們推出了 MySQL CDC connector,你只需要在 with 參數(shù)中指定 connector=mysql-cdc,然后 select 這張表就能實時讀取 MySQL 中的全量 +CDC 增量數(shù)據(jù),無需部署其他組件和服務(wù)。你可以把 Flink 中定義的這張表理解成是 MySQL 的實時物化視圖,所以在這張表上的聚合、join 等結(jié)果,跟實時在 MySQL 中運行出來的結(jié)果是一致的。相比于剛剛介紹的 Debezium,Canal 的架構(gòu),CDC connector 在使用上更加簡單易用了,不用再去學(xué)習(xí)和維護額外組件,數(shù)據(jù)不需要經(jīng)過 Kafka 落地,減少了端到端延遲。而且支持先讀取全量數(shù)據(jù),并無縫切換到 CDC 增量讀取上,也就是我們說的是流批一體,流批融合的架構(gòu)。




          我們發(fā)現(xiàn) MySQL CDC connector 非常受用戶的歡迎,尤其是結(jié)合 OLAP 引擎,可以快速構(gòu)建實時 OLAP 架構(gòu)。實時 OLAP 架構(gòu)的一個特點就是將數(shù)據(jù)庫數(shù)據(jù)同步到  OLAP 中做即席查詢,這樣就無需離線數(shù)倉了。

          以前是怎么做的呢?

          之前用戶一般先用 datax 做個全量同步,然后用 canal 同步實時增量到 Kafka,然后從 Kafka 同步到 OLAP,這種架構(gòu)比較復(fù)雜,鏈路也很長。現(xiàn)在很多公司都在用 Flink+ClickHouse 來快速構(gòu)建實時 OLAP 架構(gòu)。我們只需要在 Flink 中定義一個 mysql-cdc source,一個 ClickHouse sink,然后提交一個 insert into query 就完成了從 MySQL 到 ClickHouse 的實時同步工作,非常方便。而且,ClickHouse 有一個痛點就是 join 比較慢,所以一般我們會把 MySQL 數(shù)據(jù)打成一張大的明細寬表數(shù)據(jù),再寫入 ClickHouse。這個在 Flink 中一個 join 操作就完成了。而在 Flink 提供 MySQL CDC connector 之前,要在全量+增量的實時同步過程中做 join 是非常麻煩的。




          當然,這里我們也可以把 ClickHouse 替換成其他常見的 OLAP 引擎,比如阿里云的 Hologres。我們發(fā)現(xiàn)在阿里云上有很多的用戶都采用了這套鏈路和架構(gòu),因為它可以省掉數(shù)據(jù)同步服務(wù)和消息中間件的成本,對于很多中小公司來說,在如今的疫情時代,控制成本是非常重要的。




          當然,這里也可以使用其他 OLAP 引擎,比如 TiDB。TiDB 官方也在最近發(fā)過一篇文章介紹這種 Flink+TiDB 的實時 OLAP架構(gòu)。



          數(shù)據(jù)入倉湖




          剛剛我們介紹了基于 Flink SQL 可以非常方便地做數(shù)據(jù)接入,也就是 ETL 的 Extract 的部分。接下來,我們介紹一下 Flink SQL 在數(shù)據(jù)入倉入湖方面的能力,也就是 Load 的部分。




          我們回顧下剛剛的流批一體的架構(gòu)圖,其中最核心的部分就是 Kafka 數(shù)據(jù)的流式入倉,正是這一流程打通了實時和離線數(shù)倉,統(tǒng)一了數(shù)倉的基礎(chǔ)公共數(shù)據(jù),提升了離線數(shù)倉的時效性,所以我們針對這一塊展開講一講。




          使用 Flink SQL 做流式數(shù)據(jù)入倉,非常的方便,而且 1.12 版本已經(jīng)支持了小文件的自動合并,解決了小文件的痛點。可以看下右邊這段代碼,先在 Flink SQL 中使用  Hive dialect 創(chuàng)建一張 Hive 的結(jié)果表,然后通過 select from kafka 表 insert into Hive 表這樣一個簡單 query,就可以提交任務(wù)實時將 Kafka 數(shù)據(jù)流式寫入 Hive。

          如果要開啟小文件合并,只需要在 Hive 表參數(shù)中加上 auto-compaction = true,那么在流式寫入這張 Hive 表的時候就會自動做小文件的 compaction。小文件合并的原理,是 Flink 的 streaming sink 會起一個小拓撲,里面 temp writer 節(jié)點負責不斷將收到的數(shù)據(jù)寫入臨時文件中,當收到 checkpoint 時,通知 compact coordinator 開始做小文件合并,compact coordinator 會將 compaction 任務(wù)分發(fā)給多個 compact operator 并發(fā)地去做小文件合并。當 compaction 完成的時候,再通知 partition committer 提交整個分區(qū)文件可見。整個過程利用了 Flink 自身的 checkpoint 機制完成 compaction 的自動化,無需起另外的 compaction 服務(wù)。這也是 Flink 流式入倉對比于其他入倉工具的一個核心優(yōu)勢。




          除了流式入倉,F(xiàn)link 現(xiàn)在也支持流式入湖。以 Iceberg 舉例,基于 Iceberg 0.10,現(xiàn)在可以在 Flink SQL 里面直接 create 一個 Iceberg catalog,在 Iceberg catalog 下可以 create table 直接創(chuàng)建 Iceberg表。然后提交 insert into query 就可以將流式數(shù)據(jù)導(dǎo)入到 Iceberg 中。然后在 Flink 中可以用 batch 模式讀取這張 Iceberg  表,做離線分析。不過 Iceberg 的小文件自動合并功能目前還沒有發(fā)布,還在支持中。




          剛剛介紹的是純 append 數(shù)據(jù)流式入倉入湖的能力,接下來介紹 CDC 數(shù)據(jù)流式入倉入湖的能力。我們先介紹 CDC 數(shù)據(jù)入 Kafka 實時數(shù)倉。其實這個需求在實時數(shù)倉的搭建中是非常常見的,比如同步數(shù)據(jù)庫 binlog 數(shù)據(jù)到 Kafka 中,又比如 join,聚合的結(jié)果是個更新流,用戶想把這個更新流寫到 Kafka 作為中間數(shù)據(jù)供下游消費。

          這在以前做起來會非常的麻煩,在 Flink 1.12 版本中,F(xiàn)link 引入了一個新的 connector ,叫做 upsert-kafka,原生地支持了 Kafka 作為一個高效的 CDC 流式存儲。

          為什么說是高效的,因為存儲的形式是與 Kafka log compaction 機制高度集成的,Kafka 會對 compacted topic 數(shù)據(jù)做自動清理,且 Flink 讀取清理后的數(shù)據(jù),仍能保證語義的一致性。而且像 Canal, Debezium 會存儲 before,op_type 等很多無用的元數(shù)據(jù)信息,upsert-kafka 只會存儲數(shù)據(jù)本身的內(nèi)容,節(jié)省大量的存儲成本。使用上的話,只需要在 DDL 中聲明 connector = upsert-kafka,并定義 PK 即可。

          比如我們這里定義了 MySQL CDC 的直播間表,以及一個 upsert-kafka 的結(jié)果表,將直播間的數(shù)據(jù)庫同步到 Kafka 中。那么寫入 Kafka 的 INSERT 和 UPDATE 都是一個帶 key 的普通數(shù)據(jù),DELETE 是一個帶 key 的 NULL 數(shù)據(jù)。Flink 讀取這個 upsert-kafka 中的數(shù)據(jù)時,能自動識別出 INSERT/UPDATE/DELETE 消息,消費這張 upsert-kafka 表與消費 MySQL CDC 表的語義一致。并且當 Kafka 對 topic 數(shù)據(jù)做了 compaction 清理后,F(xiàn)link 讀取清理后的數(shù)據(jù),仍能保證語義的一致性。




          CDC 數(shù)據(jù)入 Hive 數(shù)倉會麻煩一些,因為 Hive 本身不支持 CDC 的語義,現(xiàn)在的一種常見方式是先將 CDC 數(shù)據(jù)以 changelog-json 格式流式寫入到 HDFS。然后起個  batch 任務(wù)周期性地將 HDFS 上的 CDC 數(shù)據(jù)按照 op 類型分為 INSERT, UPDATE, DELETE 三張表,然后做個 batch merge。



          數(shù)據(jù)打?qū)?/strong>




          前面介紹了基于 Flink SQL 的 ETL 流程的 Extract 和 Load,接下來介紹 Transformation 中最常見的數(shù)據(jù)打?qū)挷僮鳌?/span>




          數(shù)據(jù)打?qū)捠菙?shù)據(jù)集成中最為常見的業(yè)務(wù)加工場景,數(shù)據(jù)打?qū)捵钪饕氖侄尉褪?Join,F(xiàn)link SQL 提供了豐富的 Join 支持,包括 Regular Join、Interval Join、Temporal Join。




          Regular Join 就是大家熟知的雙流 Join,語法上就是普通的 JOIN 語法。圖中案例是通過廣告曝光流關(guān)聯(lián)廣告點擊流將廣告數(shù)據(jù)打?qū)挘驅(qū)捄罂梢赃M一步計算廣告費用。從圖中可以看出,曝光流和點擊流都會存入 join 節(jié)點的 state,join 算子通過關(guān)聯(lián)曝光流和點擊流的 state 實現(xiàn)數(shù)據(jù)打?qū)挕egular Join 的特點是,任意一側(cè)流都會觸發(fā)結(jié)果的更新,比如案例中的曝光流和點擊流。同時 Regular Join 的語法與傳統(tǒng)批 SQL 一致,用戶學(xué)習(xí)門檻低。但需要注意的是,Regular join 通過 state 來存儲雙流已經(jīng)到達的數(shù)據(jù),state 默認永久保留,所以 Regular join 的一個問題是默認情況下 state 會持續(xù)增長,一般我們會結(jié)合 state TTL 使用。




          Interval Join 是一條流上需要有時間區(qū)間的 join,比如剛剛的廣告計費案例中,它有一個非常典型的業(yè)務(wù)特點在里面,就是點擊一般發(fā)生在曝光之后的 10 分鐘內(nèi)。因此相對于 Regular Join,我們其實只需要關(guān)聯(lián)這10分鐘內(nèi)的曝光數(shù)據(jù),所以 state 不用存儲全量的曝光數(shù)據(jù),它是在 Regular Join 之上的一種優(yōu)化。要轉(zhuǎn)成一個 Interval Join,需要在兩個流上都定義時間屬性字段(如圖中的 click_time 和 show_time)。并在 join 條件中定義左右流的時間區(qū)間,比如這里我們增加了一個條件:點擊時間需要大于等于曝光時間,同時小于等于曝光后 10 分鐘。與 Regular Join 相同, Interval Join 任意一條流都會觸發(fā)結(jié)果更新,但相比 Regular Join,Interval Join 最大的優(yōu)點是 state 可以自動清理,根據(jù)時間區(qū)間保留數(shù)據(jù),state 占用大幅減少。Interval Join 適用于業(yè)務(wù)有明確的時間區(qū)間,比如曝光流關(guān)聯(lián)點擊流,點擊流關(guān)聯(lián)下單流,下單流關(guān)聯(lián)成交流。




          Temporal join (時態(tài)表關(guān)聯(lián)) 是最常用的數(shù)據(jù)打?qū)挿绞剑S脕碜鑫覀兪熘木S表  Join。在語法上,它需要一個顯式的 FOR SYSTEM_TIME AS OF 語句。它與 Regular Join 以及 Interval Join 最大的區(qū)別就是,維度數(shù)據(jù)的變化不會觸發(fā)結(jié)果更新,所以主流關(guān)聯(lián)上的維度數(shù)據(jù)不會再改變。Flink 支持非常豐富的 Temporal join 功能,包括關(guān)聯(lián) lookup DB,關(guān)聯(lián) changelog,關(guān)聯(lián) Hive 表。在以前,大家熟知的維表 join 一般都是關(guān)聯(lián)一個可以查詢的數(shù)據(jù)庫,因為維度數(shù)據(jù)在數(shù)據(jù)庫里面,但實際上維度數(shù)據(jù)可能有多種物理形態(tài),比如 binlog 形式,或者定期同步到 Hive 中變成了 Hive 分區(qū)表的形式。在 Flink 1.12 中,現(xiàn)在已經(jīng)支持關(guān)聯(lián)這兩種新的維表形態(tài)。 




          Temporal Join Lookup DB 是最常見的維表 Join 方式,比如在用戶點擊流關(guān)聯(lián)用戶畫像的案例中,用戶點擊流在 Kafka 中,用戶實時畫像存放在 HBase 數(shù)據(jù)庫中,每個點擊事件通過查詢并關(guān)聯(lián) HBase 中的用戶實時畫像完成數(shù)據(jù)打?qū)挕emporal Join Lookup DB 的特點是,維表的更新不會觸發(fā)結(jié)果的更新,維度數(shù)據(jù)存放在數(shù)據(jù)庫中,適用于實時性要求較高的場景,使用時我們一般會開啟 Async IO 和內(nèi)存 cache 提升查詢效率。
           




          在介紹 Temporal Join Changelog 前,我們再看一個 Lookup DB 的例子,這是一個直播互動數(shù)據(jù)關(guān)聯(lián)直播間維度的案例。這個案例中直播互動數(shù)據(jù)(比如點贊、評論)存放在 Kafka 中,直播間實時的維度數(shù)據(jù)(比如主播、直播間標題)存放在 MySQL 中,直播互動的數(shù)據(jù)量是非常大的,為了加速訪問,常用的方案是加個高速緩存,比如把直播間的維度數(shù)據(jù)通過 CDC 同步,再存入 Redis 中,再做維表關(guān)聯(lián)。這種方案的問題是,直播的業(yè)務(wù)數(shù)據(jù)比較特殊,直播間的創(chuàng)建和直播互動數(shù)據(jù)基本是同時產(chǎn)生的,因此互動數(shù)據(jù)可能早早地到達了 Kafka 被 Flink 消費,但是直播間的創(chuàng)建消息經(jīng)過了 Canal, Kafka,Redis, 這個鏈路比較長,數(shù)據(jù)延遲比較大,可能導(dǎo)致互動數(shù)據(jù)查詢 Redis 時,直播間數(shù)據(jù)還未同步完成,導(dǎo)致關(guān)聯(lián)不上直播間數(shù)據(jù),造成下游統(tǒng)計分析的偏差。




          針對這類場景,F(xiàn)link 1.12  支持了 Temporal Join Changelog,通過從 changelog在 Flink state 中物化出維表來實現(xiàn)維表關(guān)聯(lián)。剛剛的場景有了更簡潔的解決方案,我們可以通過 Flink CDC connector 把直播間數(shù)據(jù)庫表的 changelog 同步到 Kafka 中,注意我們看下右邊這段 SQL,我們用了 upsert-kafka connector 來將 MySQL binlog 寫入了 Kafka,也就是 Kafka 中存放了直播間變更數(shù)據(jù)的 upsert 流。然后我們將互動數(shù)據(jù) temporal join 這個直播間 upsert 流,便實現(xiàn)了直播數(shù)據(jù)打?qū)挼墓δ堋?/span>

          注意我們這里 FOR SYSTEM_TIME AS OF 不是跟一個 processing time,而是左流的 event time,它的含義是去關(guān)聯(lián)這個 event time 時刻的直播間數(shù)據(jù),同時我們在直播間 upsert 流上也定義了 watermark,所以 temporal join changelog 在執(zhí)行上會做 watermark 等待和對齊,保證關(guān)聯(lián)上精確版本的結(jié)果,從而解決先前方案中關(guān)聯(lián)不上的問題。




          我們詳細解釋下 temporal join changelog 的過程,左流是互動流數(shù)據(jù),右流是直播間 changelog。直播間 changelog 會物化到右流的維表 state 中,state 相當于一個多版本的數(shù)據(jù)庫鏡像, 主流互動數(shù)據(jù)會暫時緩存在左流的 state 中,等到 watermark 到達對齊后再去查維表 state 中的數(shù)據(jù)。比如現(xiàn)在互動流和直播流的 watermark 都到了10:01分,互動流的這條 10:01 分評論數(shù)據(jù)就會去查詢維表 state,并關(guān)聯(lián)上 103 房間的信息。當 10:05 這條評論數(shù)據(jù)到來時,它不會馬上輸出,不然就會關(guān)聯(lián)上空的房間信息。它會一直等待,等到左右兩流的 watermark 都到 10:05 后,才會去關(guān)聯(lián)維表 state 中的數(shù)據(jù)并輸出。這個時候,它能關(guān)聯(lián)上準確的 104 房間信息。

          總結(jié)下,Temporal Join Changelog 的特點是實時性高,因為是按照 event time 做的版本關(guān)聯(lián),所以能關(guān)聯(lián)上精確版本的信息,且維表會做 watermark 對齊等待,使得用戶可以通過 watermark 控制遲到的維表數(shù)。Temporal Join Changelog 中的維表數(shù)據(jù)都是存放在 temporal join 節(jié)點的 state 中,讀取非常高效,就像是一個本地的 Redis 一樣,用戶不再需要維護額外的 Redis 組件。 




          在數(shù)倉場景中,Hive 的使用是非常廣泛的,F(xiàn)link 與 Hive 的集成非常友好,現(xiàn)在已經(jīng)支持 Temporal Join Hive 分區(qū)表和非分區(qū)表。我們舉個典型的關(guān)聯(lián) Hive 分區(qū)表的案例:訂單流關(guān)聯(lián)店鋪數(shù)據(jù)。店鋪數(shù)據(jù)一般是變化比較緩慢的,所以業(yè)務(wù)方一般會按天全量同步店鋪表到 Hive 分區(qū)中,每天會產(chǎn)生一個新分區(qū),每個分區(qū)是當天全量的店鋪數(shù)據(jù)。

          為了關(guān)聯(lián)這種 Hive 數(shù)據(jù),只需我們在創(chuàng)建 Hive 分區(qū)表時指定右側(cè)這兩個紅圈中的參數(shù),便能實現(xiàn)自動關(guān)聯(lián) Hive 最新分區(qū)功能,partition.include = latestb 表示只讀取 Hive 最新分區(qū),partition-name 表示選擇最新分區(qū)時按分區(qū)名的字母序排序。到 10 月 3 號的時候,Hive 中已經(jīng)產(chǎn)生了 10 月 2 號的新分區(qū), Flink 監(jiān)控到新分區(qū)后,就會重新加載10月2號的數(shù)據(jù)到 cache 中并替換掉10月1號的數(shù)據(jù)作為最新的維表。之后的訂單流數(shù)據(jù)關(guān)聯(lián)上的都是 cache 10 月 2 號分區(qū)的數(shù)據(jù)。Temporal join Hive 的特點是可以自動關(guān)聯(lián) Hive 最新分區(qū),適用于維表緩慢更新,高吞吐的業(yè)務(wù)場景。




          總結(jié)一下我們剛剛介紹的幾種在數(shù)據(jù)打?qū)捴惺褂玫?join:


          1. Regular Join 的實效性非常高,吞吐一般,因為 state 會保留所有到達的數(shù)據(jù),適用于雙流關(guān)聯(lián)場景;
          2. Interval Jon 的時效性非常好,吞吐較好,因為 state 只保留時間區(qū)間內(nèi)的數(shù)據(jù),適用于有業(yè)務(wù)時間區(qū)間的雙流關(guān)聯(lián)場景;
          3. Temporal Join Lookup DB 的時效性比較好,吞吐較差,因為每條數(shù)據(jù)都需要查詢外部系統(tǒng),會有 IO 開銷,適用于維表在數(shù)據(jù)庫中的場景;
          4. Temporal Join Changelog 的時效性很好,吞吐也比較好,因為它沒有 IO 開銷,適用于需要維表等待,或者關(guān)聯(lián)準確版本的場景;
          5. Temporal Join Hive 的時效性一般,但吞吐非常好,因為維表的數(shù)據(jù)存放在cache 中,適用于維表緩慢更新的場景,高吞吐的場景。



          總結(jié)





          最后我們來總結(jié)下 Flink 在 ETL 數(shù)據(jù)集成上的能力。這是目前 Flink 數(shù)據(jù)集成的能力矩陣,我們將現(xiàn)有的外部存儲系統(tǒng)分為了關(guān)系型數(shù)據(jù)庫、KV 數(shù)據(jù)庫、消息隊列、數(shù)據(jù)湖、數(shù)據(jù)倉庫 5 種類型,可以從圖中看出 Flink 有非常豐富的生態(tài),并且對每種存儲引擎都有非常強大的集成能力。

          橫向上我們定義了 6 種能力,分別是 3 種數(shù)據(jù)接入能力:


          • 全量讀取
          • 流式讀取
          • CDC 流式讀取



          一種數(shù)據(jù)打?qū)捘芰Γ?/span>


          • 維度關(guān)聯(lián);



          以及兩種入倉/入湖能力:


          • 流式寫入
          • CDC 寫入



          可以看到 Flink 對各個系統(tǒng)的數(shù)據(jù)接入能力、維度打?qū)捘芰Α⑷雮}/入湖能力都已經(jīng)非常完善了。在 CDC 流式讀取上,F(xiàn)link 已經(jīng)支持了主流的數(shù)據(jù)庫和 Kafka 消息隊列。在數(shù)據(jù)湖方向,F(xiàn)link 對 Iceberg 的流式讀取和 CDC 寫入的功能也即將在接下來的 Iceberg 版本中發(fā)布。從這個能力矩陣可以看出,F(xiàn)link 的數(shù)據(jù)集成能力是非常全面的。

          瀏覽 59
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中文字幕av网 | www.操逼网站 | 爱搞搞视频网站 | 啪啪啪啪啪啪网站 | 无码人妻一区二区三区密桃手机版 |