深入解讀 Flink SQL 1.13
一、Flink SQL 1.13 概覽

二、 核心 feature 解讀
1. FLIP-145:支持 Window TVF

■ 1.1 Window TVF 語法
SELECTTUMBLE_START(bidtime,INTERVAL '10' MINUTE),TUMBLE_END(bidtime,INTERVAL '10' MINUTE),TUMBLE_ROWTIME(bidtime,INTERVAL '10' MINUTE),SUM(price)FROM MyTableGROUP BY TUMBLE(bidtime,INTERVAL '10' MINUTE)
SELECT WINDOW_start,WINDOW_end,WINDOW_time,SUM(price)FROM Table(TUMBLE(Table myTable,DESCRIPTOR(biztime),INTERVAL '10' MINUTE))GROUP BY WINDOW_start,WINDOW_end

■ 1.2 Cumulate Window

第一個(gè) window 統(tǒng)計(jì)的是一個(gè)區(qū)間的數(shù)據(jù);
第二個(gè) window 統(tǒng)計(jì)的是第一區(qū)間和第二個(gè)區(qū)間的數(shù)據(jù);
第三個(gè) window 統(tǒng)計(jì)的是第一區(qū)間,第二個(gè)區(qū)間和第三個(gè)區(qū)間的數(shù)據(jù)。

INSERT INTO cumulative_UVSELECT date_str,MAX(time_str),COUNT(DISTINCT user_id) as UVFROM (SELECTDATE_FORMAT(ts,'yyyy-MM-dd') as date_str,SUBSTR(DATE_FORMAT(ts,'HH:mm'),1,4) || '0' as time_str,user_idFROM user_behavior)GROUP BY date_str
1.13 版本前的寫法有很多缺點(diǎn),首先這個(gè)聚合操作是每條記錄都會(huì)計(jì)算一次。其次,在追逆數(shù)據(jù)的時(shí)候,消費(fèi)堆積的數(shù)據(jù)時(shí),UV 大盤的曲線就會(huì)跳變。
在 1.13 版本支持了 TVF 寫法,基于 cumulate window,我們可以修改為下面的寫法,將每條數(shù)據(jù)按照 Event Time 精確地分到每個(gè) Window 里面, 每個(gè)窗口的計(jì)算通過 watermark 觸發(fā),即使在追數(shù)據(jù)場景中也不會(huì)跳變。
INSERT INTO cumulative_UVSELECT WINDOW_end,COUNT(DISTINCT user_id) as UVFROM Table(CUMULATE(Table user_behavior,DESCRIPTOR(ts),INTERVAL '10' MINUTES,INTERVAL '1' DAY)))GROUP BY WINDOW_start,WINDOW_end

■ 1.3 Window 性能優(yōu)化
內(nèi)存優(yōu)化:通過內(nèi)存預(yù)分配,緩存 window 的數(shù)據(jù),通過 window watermark 觸發(fā)計(jì)算,通過申請一些內(nèi)存 buffer 避免高頻的訪問 state;
切片優(yōu)化:將 window 切片,盡可能復(fù)用已計(jì)算結(jié)果,如 hop window,cumulate window。計(jì)算過的分片數(shù)據(jù)無需再次計(jì)算,只需對切片的計(jì)算結(jié)果進(jìn)行復(fù)用;
算子優(yōu)化:window 算子支持 local-global 優(yōu)化;同時(shí)支持 count(distinct) 自動(dòng)解熱點(diǎn)優(yōu)化;
遲到數(shù)據(jù):支持將遲到數(shù)據(jù)計(jì)算到后續(xù)分片,保證數(shù)據(jù)準(zhǔn)確性。


■ 1.4 多維數(shù)據(jù)分析


2. FLIP-162:時(shí)區(qū)和時(shí)間函數(shù)
■ 2.1 時(shí)區(qū)問題分析
PROCTIME() 函數(shù)應(yīng)該考慮時(shí)區(qū),但未考慮時(shí)區(qū);
CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW() 函數(shù)未考慮時(shí)區(qū);
Flink 的時(shí)間屬性,只支持定義在 TIMESTAMP 這種數(shù)據(jù)類型上面,這個(gè)類型是無時(shí)區(qū)的,TIMESTAMP 類型不考慮時(shí)區(qū),但用戶希望是本地時(shí)區(qū)的時(shí)間。


如果我們配置使用 TIMESTAMP,它可以是字符串類型的。用戶不管是從英國還是中國時(shí)區(qū)來觀察,這個(gè)值都是一樣的;
但是對于 TIMSTAMP_TLZ 來說,它的來源就是一個(gè) Long 值,表示從時(shí)間原點(diǎn)流逝過的時(shí)間。同一時(shí)刻,從時(shí)間原點(diǎn)流逝的時(shí)間在所有時(shí)區(qū)都是相同的,所以這個(gè) Long 值是絕對時(shí)間的概念。當(dāng)我們在不同的時(shí)區(qū)去觀察這個(gè)值,我們會(huì)用本地的時(shí)區(qū)去解釋成 “年-月-日-時(shí)-分-秒” 的可讀格式,這就是 TIMSTAMP_TLZ 類型,TIMESTAMP_LTZ 類型也更加符合用戶在不同時(shí)區(qū)下的使用習(xí)慣。

■ 2.2 時(shí)間函數(shù)糾正

在 1.13 版本之前,它總是返回 UTC 的 TIMESTAMP;
而現(xiàn)在,我們把返回類型變?yōu)榱?TIMESTAMP_LTZ。
PROCTIME 除了表示函數(shù)之外,也可以表示時(shí)間屬性的標(biāo)記。

在 1.13 版本之前,如果我們需要做按天的 window 操作,你需要手動(dòng)解決時(shí)區(qū)問題,去做一些 8 小時(shí)的偏移然后再減回去;
在 FLIP-162 中我們解決了這個(gè)問題,現(xiàn)在用戶使用的時(shí)候十分簡單,只需要聲明 proctime 屬性,因?yàn)?PROCTIME() 函數(shù)的返回值是TIMESTAMP_LTZ,所以結(jié)果是會(huì)考慮本地的時(shí)區(qū)。下圖的例子顯示了在不同的時(shí)區(qū)下,proctime 屬性的 window 的聚合是按照本地時(shí)區(qū)進(jìn)行的。

在流模式中是 per-record 計(jì)算,即每條數(shù)據(jù)都計(jì)算一次;
在 Batch 模式是 query-start 計(jì)算,即在作業(yè)開始前計(jì)算一次。例如我們常用的一些 Batch 計(jì)算引擎,如 Hive 也是在每一個(gè)批開始前計(jì)算一次。

■ 2.3 時(shí)間類型使用
當(dāng)作業(yè)的上游源數(shù)據(jù)包含了字符串的時(shí)間(如:2021-4-15 14:00:00)這樣的場景,直接聲明為 TIMESTAMP 然后把 Event time 定義在上面即可,窗口在計(jì)算的時(shí)候會(huì)基于時(shí)間字符串進(jìn)行切分,最終會(huì)計(jì)算出符合你實(shí)際想要的預(yù)想結(jié)果;

當(dāng)上游數(shù)據(jù)源的打點(diǎn)時(shí)間屬于 long 值,表示的是一個(gè)絕對時(shí)間的含義。在 1.13 版本你可以把 Event time 定義在 TIMESTAMP_LTZ 上面。此時(shí)定義在 TIMESTAMP_LTZ 類型上的各種 WINDOW 聚合,都能夠自動(dòng)的解決 8 小時(shí)的時(shí)區(qū)偏移問題,無需按照之前的 SQL 寫法額外做時(shí)區(qū)的修改和訂正。

■ 2.4 夏令時(shí)支持

三、重要改進(jìn)解讀
1. FLIP-152:提升 Hive 語法兼容性



2. FLIP-163:改進(jìn) SQL Client


通過 SET SQL-client.verbose = true , 開啟 verbose,通過開啟 verbose 打印整個(gè)信息,相對以前只輸出一句話更加容易追蹤錯(cuò)誤信息; 通過 SET execution.runtime-mode=streaming / batch 支持設(shè)置批/流作業(yè)模式; 通過 SET pipline.name=my_Flink_job 設(shè)置作業(yè)名稱; 通過 SET execution.savepoint.path=/tmp/Flink-savepoints/savepoint-bb0dab 設(shè)置作業(yè) savepoint 路徑; 對于有依賴的多個(gè)作業(yè),通過 SET Table.dml-sync=true 去選擇是否異步執(zhí)行,例如離線作業(yè),作業(yè) a 跑完才能跑作業(yè) b,通過設(shè)置為 true 實(shí)現(xiàn)執(zhí)行有依賴關(guān)系的 pipeline 調(diào)度。
4. 同時(shí)支持 STATEMENT SET語法:

在 1.13 版本之前需要啟動(dòng) 2 個(gè) query 去完成這個(gè)作業(yè); 在 1.13 版本,我們可以把這些放到一個(gè) statement 里面,以一個(gè)作業(yè)的方式去執(zhí)行,能夠?qū)崿F(xiàn)節(jié)點(diǎn)的復(fù)用,節(jié)約資源。
3. FLIP-136:增強(qiáng) DataStream 和 Table 的轉(zhuǎn)換
支持 DataStream 和 Table 轉(zhuǎn)換時(shí)傳遞 EVENT TIME 和 WATERMARK;
Table Table = TableEnv.fromDataStream(dataStream,Schema.newBuilder().columnByMetadata("rowtime","TIMESTMP(3)").watermark("rowtime","SOURCE_WATERMARK()").build());)
支持 Changelog 數(shù)據(jù)流在 Table 和 DataStream 間相互轉(zhuǎn)換。
//DATASTREAM 轉(zhuǎn) TableStreamTableEnvironment.fromChangelogStream(DataStream<ROW>): TableStreamTableEnvironment.fromChangelogStream(DataStream<ROW>,Schema): Table//Table 轉(zhuǎn) DATASTREAMStreamTableEnvironment.toChangelogStream(Table): DataStream<ROW>StreamTableEnvironment.toChangelogStream(Table,Schema): DataStream<ROW>
四、Flink SQL 1.14 未來規(guī)劃
1.14 版本主要有以下幾點(diǎn)規(guī)劃:
刪除 Legacy Planner:從 Flink 1.9 開始,在阿里貢獻(xiàn)了 Blink-Planner 之后,很多一些新的 Feature 已經(jīng)基于此 Blink Planner 進(jìn)行開發(fā),以前舊的 Legacy Planner 會(huì)徹底刪除;
完善 Window TVF:支持 session window,支持 window TVF 的 allow -lateness 等;
提升 Schema Handling:全鏈路的 Schema 處理能力以及關(guān)鍵校驗(yàn)的提升;
增強(qiáng) Flink CDC 支持:增強(qiáng)對上游 CDC 系統(tǒng)的集成能力,F(xiàn)link SQL 內(nèi)更多的算子支持 CDC 數(shù)據(jù)流。
五、總結(jié)
本文詳細(xì)解讀了 Flink SQL 1.13 的核心功能和重要改進(jìn)。
支持 Window TVF;
系統(tǒng)地解決時(shí)區(qū)和時(shí)間函數(shù)問題;
提升 Hive 和 Flink 的兼容性;
改進(jìn) SQL Client;
增強(qiáng) DataStream 和 Table 的轉(zhuǎn)換。

