Flink從1.7到1.12版本升級匯總
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
回復(fù)”面試“獲取更多驚喜

一 .前言
最進(jìn)再看官方flink提供的視頻教程,發(fā)現(xiàn)入門版本因?yàn)闀r(shí)間關(guān)系都是基于1.7.x講解的. 在實(shí)際操作中跟1.12.x版本還是有差距的, 所以整理一下從1.7 版本到1.12版本之間的相對大的變動. 做到在學(xué)習(xí)的過程中可以做到心里有數(shù).
二 .Flink 1.7 版本
在 Flink 1.7.0,我們更關(guān)注實(shí)現(xiàn)快速數(shù)據(jù)處理以及以無縫方式為 Flink 社區(qū)構(gòu)建數(shù)據(jù)密集型應(yīng)用程序。我們最新版本包括一些令人興奮的新功能和改進(jìn),例如對 Scala 2.12 的支持,Exactly-Once 語義的 S3 文件接收器,復(fù)雜事件處理與流SQL的集成.
2.1. Flink中的Scala 2.12支持
Flink 1.7.0 是第一個完全支持 Scala 2.12 的版本。這可以讓用戶使用新的 Scala 版本編寫 Flink 應(yīng)用程序以及利用 Scala 2.12 的生態(tài)系統(tǒng)。
2.2. 狀態(tài)變化
在許多情況下,由于需求的變化,長期運(yùn)行的 Flink 應(yīng)用程序會在其生命周期內(nèi)發(fā)生變化。在不丟失當(dāng)前應(yīng)用程序進(jìn)度狀態(tài)的情況下更改用戶狀態(tài)是應(yīng)用程序變化的關(guān)鍵要求。Flink 1.7.0 版本中社區(qū)添加了狀態(tài)變化,允許我們靈活地調(diào)整長時(shí)間運(yùn)行的應(yīng)用程序的用戶狀態(tài)模式,同時(shí)保持與先前保存點(diǎn)的兼容。通過狀態(tài)變化,我們可以在狀態(tài)模式中添加或刪除列。當(dāng)使用 Avro 生成類作為用戶狀態(tài)時(shí),狀態(tài)模式變化可以開箱即用,這意味著狀態(tài)模式可以根據(jù) Avro 的規(guī)范進(jìn)行變化。雖然 Avro 類型是 Flink 1.7 中唯一支持模式變化的內(nèi)置類型,但社區(qū)仍在繼續(xù)致力于在未來的 Flink 版本中進(jìn)一步擴(kuò)展對其他類型的支持。
2.3. Exactly-once語義的S3 StreamingFileSink
Flink 1.6.0 中引入的 StreamingFileSink 現(xiàn)在已經(jīng)擴(kuò)展到 S3 文件系統(tǒng),并保證 Exactly-once 語義。使用此功能允許所有 S3 用戶構(gòu)建寫入 S3 的 Exactly-once 語義端到端管道。
2.4. Streaming SQL中支持MATCH_RECOGNIZE
這是 Apache Flink 1.7.0 的一個重要補(bǔ)充,它為 Flink SQL 提供了 MATCH_RECOGNIZE 標(biāo)準(zhǔn)的初始支持。此功能融合了復(fù)雜事件處理(CEP)和SQL,可以輕松地對數(shù)據(jù)流進(jìn)行模式匹配,從而實(shí)現(xiàn)一整套新的用例。此功能目前處于測試階段。
2.5. Streaming SQL中的 Temporal Tables 和 Temporal Joins
Temporal Tables 是 Apache Flink 中的一個新概念,它為表的更改歷史記錄提供(參數(shù)化)視圖,可以返回表在任何時(shí)間點(diǎn)的內(nèi)容。例如,我們可以使用具有歷史貨幣匯率的表。隨著時(shí)間的推移,表會不斷發(fā)生變化,并增加更新的匯率。Temporal Table 是一種視圖,可以返回匯率在任何時(shí)間點(diǎn)的實(shí)際狀態(tài)。通過這樣的表,可以使用正確的匯率將不同貨幣的訂單流轉(zhuǎn)換為通用貨幣。
Temporal Joins 允許 Streaming 數(shù)據(jù)與不斷變化/更新的表的內(nèi)存和計(jì)算效率的連接,使用處理時(shí)間或事件時(shí)間,同時(shí)符合ANSI SQL。
流式 SQL 的其他功能除了上面提到的主要功能外,F(xiàn)link 的 Table&SQL API 已經(jīng)擴(kuò)展到更多用例。以下內(nèi)置函數(shù)被添加到API:TO_BASE64,LOG2,LTRIM,REPEAT,REPLACE,COSH,SINH,TANH。SQL Client 現(xiàn)在支持在環(huán)境文件和 CLI 會話中自定義視圖。此外,CLI 中還添加了基本的 SQL 語句自動完成功能。社區(qū)添加了一個 Elasticsearch 6 table sink,允許存儲動態(tài)表的更新結(jié)果。
2.6. 版本化REST API
從 Flink 1.7.0 開始,REST API 已經(jīng)版本化。這保證了 Flink REST API 的穩(wěn)定性,因此可以在 Flink 中針對穩(wěn)定的 API開發(fā)第三方應(yīng)用程序。因此,未來的 Flink 升級不需要更改現(xiàn)有的第三方集成。
2.7. Kafka 2.0 Connector
Apache Flink 1.7.0 繼續(xù)添加更多的連接器,使其更容易與更多外部系統(tǒng)進(jìn)行交互。在此版本中,社區(qū)添加了 Kafka 2.0 連接器,可以從 Kafka 2.0 讀寫數(shù)據(jù)時(shí)保證 Exactly-Once 語義。
2.8. 本地恢復(fù)
Apache Flink 1.7.0 通過擴(kuò)展 Flink 的調(diào)度來完成本地恢復(fù)功能,以便在恢復(fù)時(shí)考慮之前的部署位置。如果啟用了本地恢復(fù),F(xiàn)link 將在運(yùn)行任務(wù)的機(jī)器上保留一份最新檢查點(diǎn)的本地副本。將任務(wù)調(diào)度到之前的位置,F(xiàn)link 可以通過從本地磁盤讀取檢查點(diǎn)狀態(tài)來最小化恢復(fù)狀態(tài)的網(wǎng)絡(luò)流量。此功能大大提高了恢復(fù)速度。
2.9. 刪除Flink的傳統(tǒng)模式
Apache Flink 1.7.0 標(biāo)志著 Flip-6 工作已經(jīng)完全完成并且與傳統(tǒng)模式達(dá)到功能奇偶校驗(yàn)。因此,此版本刪除了對傳統(tǒng)模式的支持。
三 .Flink 1.8 版本
新特性和改進(jìn):
Schema Evolution Story 最終版
基于 TTL 持續(xù)清除舊狀態(tài)
使用用戶定義的函數(shù)和聚合進(jìn)行 SQL 模式檢測
符合 RFC 的 CSV 格式
新的 KafkaDeserializationSchema,可以直接訪問 ConsumerRecord
FlinkKinesisConsumer 中的分片水印選項(xiàng)
DynamoDB Streams 的新用戶捕獲表更改
支持用于子任務(wù)協(xié)調(diào)的全局聚合
重要變化:
使用 Flink 捆綁 Hadoop 庫的更改:不再發(fā)布包含 hadoop 的便捷二進(jìn)制文件
FlinkKafkaConsumer 現(xiàn)在將根據(jù)主題規(guī)范過濾已恢復(fù)的分區(qū)
表 API 的 Maven 依賴更改:之前具有flink-table依賴關(guān)系的用戶需要將依賴關(guān)系從flink-table-planner更新為正確的依賴關(guān)系 flink-table-api-,具體取決于是使用 Java 還是 Scala:flink-table-api-java-bridge或者flink-table-api-scala-bridge
3.1. 使用TTL(生存時(shí)間)連續(xù)增量清除舊的Key狀態(tài)
我們在Flink 1.6(FLINK-9510)中為Key狀態(tài)引入了TTL(生存時(shí)間)。此功能允許在訪問時(shí)清理并使Key狀態(tài)條目無法訪問。另外,在編寫保存點(diǎn)/檢查點(diǎn)時(shí),現(xiàn)在也將清理狀態(tài)。Flink 1.8引入了對RocksDB狀態(tài)后端(FLINK-10471)和堆狀態(tài)后端(FLINK-10473)的舊條數(shù)的連續(xù)清理。這意味著舊的條數(shù)將(根據(jù)TTL設(shè)置)不斷被清理掉。
3.2. 恢復(fù)保存點(diǎn)時(shí)對模式遷移的新支持
使用Flink 1.7.0,我們在使用AvroSerializer時(shí)添加了對更改狀態(tài)模式的支持。使用Flink 1.8.0,我們在TypeSerializers將所有內(nèi)置遷移到新的序列化器快照抽象方面取得了很大進(jìn)展,該抽象理論上允許模式遷移。在Flink附帶的序列化程序中,我們現(xiàn)在支持PojoSerializer (FLINK-11485)和Java EnumSerializer (FLINK-11334)以及有限情況下的Kryo(FLINK-11323)的模式遷移格式。
3.3. 保存點(diǎn)兼容性
TraversableSerializer 此序列化程序(FLINK-11539)中的更新,包含Scala的Flink 1.2中的保存點(diǎn)將不再與Flink 1.8兼容??梢酝ㄟ^升級到Flink 1.3和Flink 1.7之間的版本,然后再更新至Flink 1.8來解決此限制。
3.4. RocksDB版本沖突并切換到FRocksDB(FLINK-10471)
需要切換到名為FRocksDB的RocksDB的自定義構(gòu)建,因?yàn)樾枰猂ocksDB中的某些更改來支持使用TTL進(jìn)行連續(xù)狀態(tài)清理。FRocksDB的已使用版本基于RocksDB的升級版本5.17.2。對于Mac OS X,僅支持OS X版本> =10.13的RocksDB版本5.17.2。
另見:https://github.com/facebook/rocksdb/issues/4862
3.5. Maven 依賴
使用Flink捆綁Hadoop庫的更改(FLINK-11266)
包含hadoop的便捷二進(jìn)制文件不再發(fā)布。
如果部署依賴于flink-shaded-hadoop2包含 flink-dist,則必須從下載頁面的可選組件部分手動下載并打包Hadoop jar并將其復(fù)制到/lib目錄中。另外一種方法,可以通過打包flink-dist和激活 include-hadoopmaven配置文件來構(gòu)建包含hadoop的Flink分發(fā)。
由于hadoop flink-dist默認(rèn)不再包含在內(nèi),因此指定-DwithoutHadoop何時(shí)打包flink-dist將不再影響構(gòu)建。
3.6. TaskManager配置(FLINK-11716)
TaskManagers現(xiàn)在默認(rèn)綁定到主機(jī)IP地址而不是主機(jī)名??梢酝ㄟ^配置選項(xiàng)控制此行為taskmanager.network.bind-policy。如果你的Flink集群在升級后遇到莫名其妙的連接問題,嘗試設(shè)置taskmanager.network.bind-policy: name在flink-conf.yaml 返回前的1.8的設(shè)置行為。
3.7. Table API 的變動
直接表構(gòu)造函數(shù)使用的取消預(yù)測(FLINK-11447) Flink 1.8不贊成Table在Table API中直接使用該類的構(gòu)造函數(shù)。此構(gòu)造函數(shù)以前將用于執(zhí)行與橫向表的連接。你現(xiàn)在應(yīng)該使用table.joinLateral()或 table.leftOuterJoinLateral()代替。這種更改對于將Table類轉(zhuǎn)換為接口是必要的,這將使Table API在未來更易于維護(hù)和更清潔。
引入新的CSV格式符(FLINK-9964)
此版本為符合RFC4180的CSV文件引入了新的格式符。新描述符可用作 org.apache.flink.table.descriptors.Csv。
目前,這只能與Kafka一起使用。舊描述符可org.apache.flink.table.descriptors.OldCsv用于文件系統(tǒng)連接器。
靜態(tài)生成器方法在TableEnvironment(FLINK-11445)上的棄用,為了將API與實(shí)際實(shí)現(xiàn)分開:TableEnvironment.getTableEnvironment()。
不推薦使用靜態(tài)方法。你現(xiàn)在應(yīng)該使用Batch/StreamTableEnvironment.create()。
表API Maven模塊中的更改(FLINK-11064)
之前具有flink-table依賴關(guān)系的用戶需要更新其依賴關(guān)系flink-table-planner以及正確的依賴關(guān)系flink-table-api-?,具體取決于是使用Java還是Scala:flink-table-api-java-bridge或者flink-table-api-scala-bridge。
更改為外部目錄表構(gòu)建器(FLINK-11522)
ExternalCatalogTable.builder()不贊成使用ExternalCatalogTableBuilder()。
更改為表API連接器jar的命名(FLINK-11026)
Kafka/elasticsearch6 sql-jars的命名方案已經(jīng)更改。在maven術(shù)語中,它們不再具有sql-jar限定符,而artifactId現(xiàn)在以前綴為例,flink-sql而不是flink例如flink-sql-connector-kafka。
更改為指定Null的方式(FLINK-11785)
現(xiàn)在Table API中的Null需要定義nullof(type)而不是Null(type)。舊方法已被棄用。
3.8. 連接器變動
引入可直接訪問ConsumerRecord的新KafkaDeserializationSchema(FLINK-8354)
對于FlinkKafkaConsumers,我們推出了一個新的KafkaDeserializationSchema ,可以直接訪問KafkaConsumerRecord。這包含了該 KeyedSerializationSchema功能,該功能已棄用但目前仍可以使用。
FlinkKafkaConsumer現(xiàn)在將根據(jù)主題規(guī)范過濾恢復(fù)的分區(qū)(FLINK-10342)
從Flink 1.8.0開始,現(xiàn)在FlinkKafkaConsumer總是過濾掉已恢復(fù)的分區(qū),這些分區(qū)不再與要在還原的執(zhí)行中訂閱的指定主題相關(guān)聯(lián)。此行為在以前的版本中不存在FlinkKafkaConsumer。
如果您想保留以前的行為。請使用上面的
disableFilterRestoredPartitionsWithSubscribedTopics()
配置方法FlinkKafkaConsumer。
考慮這個例子:如果你有一個正在消耗topic的Kafka Consumer A,你做了一個保存點(diǎn),然后改變你的Kafka消費(fèi)者而不是從topic消費(fèi)B,然后從保存點(diǎn)重新啟動你的工作。在此更改之前,您的消費(fèi)者現(xiàn)在將使用這兩個主題A,B因?yàn)樗鎯υ谙M(fèi)者正在使用topic消費(fèi)的狀態(tài)A。通過此更改,您的使用者將僅B在還原后使用topic,因?yàn)槲覀兪褂门渲玫膖opic過濾狀態(tài)中存儲的topic。
其它接口改變:
1、從TypeSerializer接口(FLINK-9803)中刪除了canEqual()方法
這些canEqual()方法通常用于跨類型層次結(jié)構(gòu)進(jìn)行適當(dāng)?shù)南嗟刃詸z查。在TypeSerializer實(shí)際上并不需要這個屬性,因此該方法現(xiàn)已刪除。
2、刪除CompositeSerializerSnapshot實(shí)用程序類(FLINK-11073)
該CompositeSerializerSnapshot實(shí)用工具類已被刪除。
現(xiàn)在CompositeTypeSerializerSnapshot,你應(yīng)該使用復(fù)合序列化程序的快照,該序列化程序?qū)⑿蛄谢山o多個嵌套的序列化程序。有關(guān)使用的說明,請參閱此處CompositeTypeSerializerSnapshot。
四 .Flink 1.9 版本
2019年 8月22日,Apache Flink 1.9.0 版本正式發(fā)布,這也是阿里內(nèi)部版本 Blink 合并入 Flink 后的首次版本發(fā)布。
此次版本更新帶來的重大功能包括批處理作業(yè)的批式恢復(fù),以及 Table API 和 SQL 的基于 Blink 的新查詢引擎(預(yù)覽版)。同時(shí),這一版本還推出了 State Processor API,這是社區(qū)最迫切需求的功能之一,該 API 使用戶能夠用 Flink DataSet 作業(yè)靈活地讀寫保存點(diǎn)。此外,F(xiàn)link 1.9 還包括一個重新設(shè)計(jì)的 WebUI 和新的 Python Table API (預(yù)覽版)以及與 Apache Hive 生態(tài)系統(tǒng)的集成(預(yù)覽版)。
新功能和改進(jìn)
細(xì)粒度批作業(yè)恢復(fù) (FLIP-1)
State Processor API (FLIP-43)
Stop-with-Savepoint (FLIP-34)
重構(gòu) Flink WebUI
預(yù)覽新的 Blink SQL 查詢處理器
Table API / SQL 的其他改進(jìn)
預(yù)覽 Hive 集成 (FLINK-10556)
預(yù)覽新的 Python Table API (FLIP-38)
4.1. 細(xì)粒度批作業(yè)恢復(fù) (FLIP-1)
批作業(yè)(DataSet、Table API 和 SQL)從 task 失敗中恢復(fù)的時(shí)間被顯著縮短了。在 Flink 1.9 之前,批處理作業(yè)中的 task 失敗是通過取消所有 task 并重新啟動整個作業(yè)來恢復(fù)的,即作業(yè)從頭開始,所有進(jìn)度都會廢棄。在此版本中,F(xiàn)link 將中間結(jié)果保留在網(wǎng)絡(luò) shuffle 的邊緣,并使用此數(shù)據(jù)去恢復(fù)那些僅受故障影響的 task。所謂 task 的 “failover regions” (故障區(qū))是指通過 pipelined 方式連接的數(shù)據(jù)交換方式,定義了 task 受故障影響的邊界。

要使用這個新的故障策略,需要確保 flink-conf.yaml 中有 jobmanager.execution.failover-strategy: region 的配置。
注意:1.9 發(fā)布包中默認(rèn)就已經(jīng)包含了該配置項(xiàng),不過當(dāng)從之前版本升級上來時(shí),如果要復(fù)用之前的配置的話,需要手動加上該配置。
“Region” 的故障策略也能同時(shí)提升 “embarrassingly parallel” 類型的流作業(yè)的恢復(fù)速度,也就是沒有任何像 keyBy() 和 rebalance 的 shuffle 的作業(yè)。當(dāng)這種作業(yè)在恢復(fù)時(shí),只有受影響的故障區(qū)的 task 需要重啟。對于其他類型的流作業(yè),故障恢復(fù)行為與之前的版本一樣。
4.2. State Processor API (FLIP-43)
直到 Flink 1.9,從外部訪問作業(yè)的狀態(tài)僅局限于:Queryable State(可查詢狀態(tài))實(shí)驗(yàn)性功能。此版本中引入了一種新的、強(qiáng)大的類庫,基于 DataSet 支持讀取、寫入、和修改狀態(tài)快照。在實(shí)踐上,這意味著:
Flink 作業(yè)的狀態(tài)可以自主構(gòu)建了,可以通過讀取外部系統(tǒng)的數(shù)據(jù)(例如外部數(shù)據(jù)庫),然后轉(zhuǎn)換成 savepoint。
Savepoint 中的狀態(tài)可以使用任意的 Flink 批處理 API 查詢(DataSet、Table、SQL)。例如,分析相關(guān)的狀態(tài)模式或檢查狀態(tài)差異以支持應(yīng)用程序?qū)徍嘶蚬收吓挪椤?/span>
Savepoint 中的狀態(tài) schema 可以離線遷移了,而之前的方案只能在訪問狀態(tài)時(shí)進(jìn)行,是一種在線遷移。
Savepoint 中的無效數(shù)據(jù)可以被識別出來并糾正。
新的 State Processor API 覆蓋了所有類型的快照:savepoint,full checkpoint 和 incremental checkpoint。
4.3. Stop-with-Savepoint (FLIP-34)
“Cancel-with-savepoint” 是停止、重啟、fork、或升級 Flink 作業(yè)的一個常用操作。然而,當(dāng)前的實(shí)現(xiàn)并沒有保證輸出到 exactly-once sink 的外部存儲的數(shù)據(jù)持久化。為了改進(jìn)停止作業(yè)時(shí)的端到端語義,F(xiàn)link 1.9 引入了一種新的 SUSPEND 模式,可以帶 savepoint 停止作業(yè),保證了輸出數(shù)據(jù)的一致性。你可以使用 Flink CLI 來 suspend 一個作業(yè):
bin/flink stop -p [:targetSavepointDirectory] :jobId
4.4. 重構(gòu) Flink WebUI
社區(qū)討論了現(xiàn)代化 Flink WebUI 的提案,決定采用 Angular 的最新穩(wěn)定版來重構(gòu)這個組件。從 Angular 1.x 躍升到了 7.x 。重新設(shè)計(jì)的 UI 是 1.9.0 的默認(rèn)版本,不過有一個按鈕可以切換到舊版的 WebUI。
4.5. 新 Blink SQL 查詢處理器預(yù)覽
在 Blink 捐贈給 Apache Flink 之后,社區(qū)就致力于為 Table API 和 SQL 集成 Blink 的查詢優(yōu)化器和 runtime。第一步,我們將 flink-table 單模塊重構(gòu)成了多個小模塊(FLIP-32)。這對于 Java 和 Scala API 模塊、優(yōu)化器、以及 runtime 模塊來說,有了一個更清晰的分層和定義明確的接口。

緊接著,我們擴(kuò)展了 Blink 的 planner 以實(shí)現(xiàn)新的優(yōu)化器接口,所以現(xiàn)在有兩個插件化的查詢處理器來執(zhí)行 Table API 和 SQL:1.9 以前的 Flink 處理器和新的基于 Blink 的處理器?;?Blink 的查詢處理器提供了更好地 SQL 覆蓋率(1.9 完整支持 TPC-H,TPC-DS 的支持在下一個版本的計(jì)劃中)并通過更廣泛的查詢優(yōu)化(基于成本的執(zhí)行計(jì)劃選擇和更多的優(yōu)化規(guī)則)、改進(jìn)的代碼生成機(jī)制、和調(diào)優(yōu)過的算子實(shí)現(xiàn)來提升批處理查詢的性能。除此之外,基于 Blink 的查詢處理器還提供了更強(qiáng)大的流處理能力,包括一些社區(qū)期待已久的新功能(如維表 Join,TopN,去重)和聚合場景緩解數(shù)據(jù)傾斜的優(yōu)化,以及內(nèi)置更多常用的函數(shù)。
注:兩個查詢處理器之間的語義和功能大部分是一致的,但并未完全對齊。具體請查看發(fā)布日志。
不過, Blink 的查詢處理器的集成還沒有完全完成。因此,1.9 之前的 Flink 處理器仍然是1.9 版本的默認(rèn)處理器,建議用于生產(chǎn)設(shè)置。你可以在創(chuàng)建 TableEnvironment 時(shí)通過 EnvironmentSettings 配置啟用 Blink 處理器。被選擇的處理器必須要在正在執(zhí)行的 Java 進(jìn)程的類路徑中。對于集群設(shè)置,默認(rèn)兩個查詢處理器都會自動地加載到類路徑中。當(dāng)從 IDE 中運(yùn)行一個查詢時(shí),需要在項(xiàng)目中顯式地增加一個處理器的依賴。
4.6. Table API / SQL 的其他改進(jìn)
除了圍繞 Blink Planner 令人興奮的進(jìn)展外,社區(qū)還做了一系列的改進(jìn),包括:
為 Table API / SQL 的 Java 用戶去除 Scala 依賴 (FLIP-32) 作為重構(gòu)和拆分 flink-table 模塊工作的一部分,我們?yōu)?Java 和 Scala 創(chuàng)建了兩個單獨(dú)的 API 模塊。對于 Scala 用戶來說,沒有什么改變。不過現(xiàn)在 Java 用戶在使用 Table API 和 SQL 時(shí),可以不用引入一堆 Scala 依賴了。
重構(gòu) Table API / SQL 的類型系統(tǒng)(FLIP-37) 我們實(shí)現(xiàn)了一個新的數(shù)據(jù)類型系統(tǒng),以便從 Table API 中移除對 Flink TypeInformation 的依賴,并提高其對 SQL 標(biāo)準(zhǔn)的遵從性。不過還在進(jìn)行中,預(yù)計(jì)將在下一版本完工,在 Flink 1.9 中,UDF 尚未移植到新的類型系統(tǒng)上。
Table API 的多行多列轉(zhuǎn)換(FLIP-29) Table API 擴(kuò)展了一組支持多行和多列、輸入和輸出的轉(zhuǎn)換的功能。這些轉(zhuǎn)換顯著簡化了處理邏輯的實(shí)現(xiàn),同樣的邏輯使用關(guān)系運(yùn)算符來實(shí)現(xiàn)是比較麻煩的。
嶄新的統(tǒng)一的 Catalog API Catalog 已有的一些接口被重構(gòu)和(某些)被替換了,從而統(tǒng)一了內(nèi)部和外部 catalog 的處理。這項(xiàng)工作主要是為了 Hive 集成(見下文)而啟動的,不過也改進(jìn)了 Flink 在管理 catalog 元數(shù)據(jù)的整體便利性。
SQL API 中的 DDL 支持 (FLINK-10232) 到目前為止,F(xiàn)link SQL 已經(jīng)支持 DML 語句(如 SELECT,INSERT)。但是外部表(table source 和 table sink)必須通過 Java/Scala 代碼的方式或配置文件的方式注冊。1.9 版本中,我們支持 SQL DDL 語句的方式注冊和刪除表(CREATE TABLE,DROP TABLE)。然而,我們還沒有增加流特定的語法擴(kuò)展來定義時(shí)間戳抽取和 watermark 生成策略等。流式的需求將會在下一版本完整支持。
五 .Flink 1.10 版本 [重要版本 : Blink 整合完成]
作為 Flink 社區(qū)迄今為止規(guī)模最大的一次版本升級,F(xiàn)link 1.10 容納了超過 200 位貢獻(xiàn)者對超過 1200 個 issue 的開發(fā)實(shí)現(xiàn),包含對 Flink 作業(yè)的整體性能及穩(wěn)定性的顯著優(yōu)化、對原生 Kubernetes 的初步集成(beta 版本)以及對 Python 支持(PyFlink)的重大優(yōu)化。
Flink 1.10 同時(shí)還標(biāo)志著對 Blink[1] 的整合宣告完成,隨著對 Hive 的生產(chǎn)級別集成及對 TPC-DS 的全面覆蓋,F(xiàn)link 在增強(qiáng)流式 SQL 處理能力的同時(shí)也具備了成熟的批處理能力。
5.1. 內(nèi)存管理及配置優(yōu)化
Flink 目前的 TaskExecutor 內(nèi)存模型存在著一些缺陷,導(dǎo)致優(yōu)化資源利用率比較困難,例如:
流和批處理內(nèi)存占用的配置模型不同;流處理中的 RocksDB state backend 需要依賴用戶進(jìn)行復(fù)雜的配置。為了讓內(nèi)存配置變的對于用戶更加清晰、直觀,F(xiàn)link 1.10 對 TaskExecutor 的內(nèi)存模型和配置邏輯進(jìn)行了較大的改動 (FLIP-49 [7])。這些改動使得 Flink 能夠更好地適配所有部署環(huán)境(例如 Kubernetes, Yarn, Mesos),讓用戶能夠更加嚴(yán)格的控制其內(nèi)存開銷。
Managed 內(nèi)存擴(kuò)展
Managed 內(nèi)存的范圍有所擴(kuò)展,還涵蓋了 RocksDB state backend 使用的內(nèi)存。盡管批處理作業(yè)既可以使用堆內(nèi)內(nèi)存也可以使用堆外內(nèi)存,使用 RocksDB state backend 的流處理作業(yè)卻只能利用堆外內(nèi)存。因此為了讓用戶執(zhí)行流和批處理作業(yè)時(shí)無需更改集群的配置,我們規(guī)定從現(xiàn)在起 managed 內(nèi)存只能在堆外。
簡化 RocksDB 配置
此前,配置像 RocksDB 這樣的堆外 state backend 需要進(jìn)行大量的手動調(diào)試,例如減小 JVM 堆空間、設(shè)置 Flink 使用堆外內(nèi)存等。現(xiàn)在,F(xiàn)link 的開箱配置即可支持這一切,且只需要簡單地改變 managed 內(nèi)存的大小即可調(diào)整 RocksDB state backend 的內(nèi)存預(yù)算。
另一個重要的優(yōu)化是,F(xiàn)link 現(xiàn)在可以限制 RocksDB 的 native 內(nèi)存占用(FLINK-7289 [8]),以避免超過總的內(nèi)存預(yù)算——這對于 Kubernetes 等容器化部署環(huán)境尤為重要。關(guān)于如何開啟、調(diào)試該特性,請參考 RocksDB 調(diào)試[9]。
注:FLIP-49 改變了集群的資源配置過程,因此從以前的 Flink 版本升級時(shí)可能需要對集群配置進(jìn)行調(diào)整。詳細(xì)的變更日志及調(diào)試指南請參考文檔[10]。
5.2. 統(tǒng)一的作業(yè)提交邏輯
在此之前,提交作業(yè)是由執(zhí)行環(huán)境負(fù)責(zé)的,且與不同的部署目標(biāo)(例如 Yarn, Kubernetes, Mesos)緊密相關(guān)。這導(dǎo)致用戶需要針對不同環(huán)境保留多套配置,增加了管理的成本。
在 Flink 1.10 中,作業(yè)提交邏輯被抽象到了通用的 Executor 接口(FLIP-73 [11])。新增加的 ExecutorCLI (FLIP-81 [12])引入了為任意執(zhí)行目標(biāo)[13]指定配置參數(shù)的統(tǒng)一方法。此外,隨著引入 JobClient(FLINK-74 [14])負(fù)責(zé)獲取 JobExecutionResult,獲取作業(yè)執(zhí)行結(jié)果的邏輯也得以與作業(yè)提交解耦。
5.3. 原生 Kubernetes 集成(Beta)
對于想要在容器化環(huán)境中嘗試 Flink 的用戶來說,想要在 Kubernetes 上部署和管理一個 Flink standalone 集群,首先需要對容器、算子及像 kubectl 這樣的環(huán)境工具有所了解。
在 Flink 1.10 中,我們推出了初步的支持 session 模式的主動 Kubernetes 集成(FLINK-9953 [15])。其中,“主動”指 Flink ResourceManager (K8sResMngr) 原生地與 Kubernetes 通信,像 Flink 在 Yarn 和 Mesos 上一樣按需申請 pod。用戶可以利用 namespace,在多租戶環(huán)境中以較少的資源開銷啟動 Flink。這需要用戶提前配置好 RBAC 角色和有足夠權(quán)限的服務(wù)賬號。

正如在統(tǒng)一的作業(yè)提交邏輯一節(jié)中提到的,F(xiàn)link 1.10 將命令行參數(shù)映射到了統(tǒng)一的配置。因此,用戶可以參閱 Kubernetes 配置選項(xiàng),在命令行中使用以下命令向 Kubernetes 提交 Flink 作業(yè)。
./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id= examples/streaming/WindowJoin.jar
5.4. Table API/SQL: 生產(chǎn)可用的 Hive 集成
Flink 1.9 推出了預(yù)覽版的 Hive 集成。該版本允許用戶使用 SQL DDL 將 Flink 特有的元數(shù)據(jù)持久化到 Hive Metastore、調(diào)用 Hive 中定義的 UDF 以及讀、寫 Hive 中的表。Flink 1.10 進(jìn)一步開發(fā)和完善了這一特性,帶來了全面兼容 Hive 主要版本[17]的生產(chǎn)可用的 Hive 集成。
Batch SQL 原生分區(qū)支持
此前,F(xiàn)link 只支持寫入未分區(qū)的 Hive 表。在 Flink 1.10 中,F(xiàn)link SQL 擴(kuò)展支持了 INSERT OVERWRITE 和 PARTITION 的語法(FLIP-63 [18]),允許用戶寫入 Hive 中的靜態(tài)和動態(tài)分區(qū)。
寫入靜態(tài)分區(qū)
INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 …)] select_statement1 FROM from_statement;
寫入動態(tài)分區(qū)
INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;
對分區(qū)表的全面支持,使得用戶在讀取數(shù)據(jù)時(shí)能夠受益于分區(qū)剪枝,減少了需要掃描的數(shù)據(jù)量,從而大幅提升了這些操作的性能。
其他優(yōu)化
除了分區(qū)剪枝,F(xiàn)link 1.10 的 Hive 集成還引入了許多數(shù)據(jù)讀取[19]方面的優(yōu)化,例如:
投影下推:Flink 采用了投影下推技術(shù),通過在掃描表時(shí)忽略不必要的域,最小化 Flink 和 Hive 表之間的數(shù)據(jù)傳輸量。這一優(yōu)化在表的列數(shù)較多時(shí)尤為有效。
LIMIT 下推:對于包含 LIMIT 語句的查詢,F(xiàn)link 在所有可能的地方限制返回的數(shù)據(jù)條數(shù),以降低通過網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量。讀取數(shù)據(jù)時(shí)的 ORC 向量化:為了提高讀取 ORC 文件的性能,對于 Hive 2.0.0 及以上版本以及非復(fù)合數(shù)據(jù)類型的列,F(xiàn)link 現(xiàn)在默認(rèn)使用原生的 ORC 向量化讀取器。
將可插拔模塊作為 Flink 內(nèi)置對象(Beta)
Flink 1.10 在 Flink table 核心引入了通用的可插拔模塊機(jī)制,目前主要應(yīng)用于系統(tǒng)內(nèi)置函數(shù)(FLIP-68 [20])。通過模塊,用戶可以擴(kuò)展 Flink 的系統(tǒng)對象,例如像使用 Flink 系統(tǒng)函數(shù)一樣使用 Hive 內(nèi)置函數(shù)。新版本中包含一個預(yù)先實(shí)現(xiàn)好的 HiveModule,能夠支持多個 Hive 版本,當(dāng)然用戶也可以選擇編寫自己的可插拔模塊。
5.5. 其他 Table API/SQL 優(yōu)化
SQL DDL 中的 watermark 和計(jì)算列
Flink 1.10 在 SQL DDL 中增加了針對流處理定義時(shí)間屬性及產(chǎn)生 watermark 的語法擴(kuò)展(FLIP-66 [22])。這使得用戶可以在用 DDL 語句創(chuàng)建的表上進(jìn)行基于時(shí)間的操作(例如窗口)以及定義 watermark 策略。
CREATE TABLE table_name (
WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (
...
)
其他 SQL DDL 擴(kuò)展
Flink 現(xiàn)在嚴(yán)格區(qū)分臨時(shí)/持久、系統(tǒng)/目錄函數(shù)(FLIP-57 [24])。這不僅消除了函數(shù)引用中的歧義,還帶來了確定的函數(shù)解析順序(例如,當(dāng)存在命名沖突時(shí),比起目錄函數(shù)、持久函數(shù) Flink 會優(yōu)先使用系統(tǒng)函數(shù)、臨時(shí)函數(shù))。
在 FLIP-57 的基礎(chǔ)上,我們擴(kuò)展了 SQL DDL 的語法,支持創(chuàng)建目錄函數(shù)、臨時(shí)函數(shù)以及臨時(shí)系統(tǒng)函數(shù)(FLIP-79):
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [catalog_name.][db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA]
關(guān)于目前完整的 Flink SQL DDL 支持,請參考最新的文檔[26]。
注:為了今后正確地處理和保證元對象(表、視圖、函數(shù))上的行為一致性,F(xiàn)link 廢棄了 Table API 中的部分對象申明方法,以使留下的方法更加接近標(biāo)準(zhǔn)的 SQL DDL(FLIP-64)。
批處理完整的 TPC-DS 覆蓋
TPC-DS 是廣泛使用的業(yè)界標(biāo)準(zhǔn)決策支持 benchmark,用于衡量基于 SQL 的數(shù)據(jù)處理引擎性能。Flink 1.10 端到端地支持所有 TPC-DS 查詢(FLINK-11491 [28]),標(biāo)志著 Flink SQL 引擎已經(jīng)具備滿足現(xiàn)代數(shù)據(jù)倉庫及其他類似的處理需求的能力。
5.6. PyFlink: 支持原生用戶自定義函數(shù)(UDF)
作為 Flink 全面支持 Python 的第一步,在之前版本中我們發(fā)布了預(yù)覽版的 PyFlink。在新版本中,我們專注于讓用戶在 Table API/SQL 中注冊并使用自定義函數(shù)(UDF,另 UDTF / UDAF 規(guī)劃中)(FLIP-58)。

如果你對這一特性的底層實(shí)現(xiàn)(基于 Apache Beam 的可移植框架)感興趣,請參考 FLIP-58 的 Architecture 章節(jié)以及 FLIP-78。這些數(shù)據(jù)結(jié)構(gòu)為支持 Pandas 以及今后將 PyFlink 引入到 DataStream API 奠定了基礎(chǔ)。
從 Flink 1.10 開始,用戶只要執(zhí)行以下命令就可以輕松地通過 pip 安裝 PyFlink:
pip install apache-flink
5.7. 重要變更
FLINK-10725[34]:Flink 現(xiàn)在可以使用 Java 11 編譯和運(yùn)行。
FLINK-15495[35]:SQL 客戶端現(xiàn)在默認(rèn)使用 Blink planner,向用戶提供最新的特性及優(yōu)化。Table API 同樣計(jì)劃在下個版本中從舊的 planner 切換到 Blink planner,我們建議用戶現(xiàn)在就開始嘗試和熟悉 Blink planner。
FLINK-13025[36]:新的 Elasticsearch sink connector[37] 全面支持 Elasticsearch 7.x 版本。
FLINK-15115[38]:Kafka 0.8 和 0.9 的 connector 已被標(biāo)記為廢棄并不再主動支持。如果你還在使用這些版本或有其他相關(guān)問題,請通過 @dev 郵件列表聯(lián)系我們。
FLINK-14516[39]:非基于信用的網(wǎng)絡(luò)流控制已被移除,同時(shí)移除的還有配置項(xiàng)“taskmanager.network.credit.model”。今后,F(xiàn)link 將總是使用基于信用的網(wǎng)絡(luò)流控制。
FLINK-12122[40]:在 Flink 1.5.0 中,F(xiàn)LIP-6[41] 改變了 slot 在 TaskManager 之間的分布方式。要想使用此前的調(diào)度策略,既盡可能將負(fù)載分散到所有當(dāng)前可用的 TaskManager,用戶可以在 flink-conf.yaml 中設(shè)置 “cluster.evenly-spread-out-slots: true”。
FLINK-11956[42]:s3-hadoop 和 s3-presto 文件系統(tǒng)不再使用類重定位加載方式,而是使用插件方式加載,同時(shí)無縫集成所有認(rèn)證提供者。我們強(qiáng)烈建議其他文件系統(tǒng)也只使用插件加載方式,并將陸續(xù)移除重定位加載方式。
Flink 1.9 推出了新的 Web UI,同時(shí)保留了原來的 Web UI 以備不時(shí)之需。截至目前,我們沒有收到關(guān)于新的 UI 存在問題的反饋,因此社區(qū)投票決定在 Flink 1.10 中移除舊的 Web UI。
原文: https://developer.aliyun.com/article/744734
官方地址: https://flink.apache.org/news/2020/02/11/release-1.10.0.html
5.7. 重要變更 FLINK-10725[34]:Flink 現(xiàn)在可以使用 Java 11 編譯和運(yùn)行。FLINK-15495[35]:SQL 客戶端現(xiàn)在默認(rèn)使用 Blink planner,向用戶提供最新的特性及優(yōu)化。Table API 同樣計(jì)劃在下個版本中從舊的 planner 切換到 Blink planner,我們建議用戶現(xiàn)在就開始嘗試和熟悉 Blink planner。FLINK-13025[36]:新的 Elasticsearch sink connector[37] 全面支持 Elasticsearch 7.x 版本。FLINK-15115[38]:Kafka 0.8 和 0.9 的 connector 已被標(biāo)記為廢棄并不再主動支持。如果你還在使用這些版本或有其他相關(guān)問題,請通過 @dev 郵件列表聯(lián)系我們。FLINK-14516[39]:非基于信用的網(wǎng)絡(luò)流控制已被移除,同時(shí)移除的還有配置項(xiàng)“taskmanager.network.credit.model”。今后,F(xiàn)link 將總是使用基于信用的網(wǎng)絡(luò)流控制。FLINK-12122[40]:在 Flink 1.5.0 中,F(xiàn)LIP-6[41] 改變了 slot 在 TaskManager 之間的分布方式。要想使用此前的調(diào)度策略,既盡可能將負(fù)載分散到所有當(dāng)前可用的 TaskManager,用戶可以在 flink-conf.yaml 中設(shè)置 “cluster.evenly-spread-out-slots: true”。FLINK-11956[42]:s3-hadoop 和 s3-presto 文件系統(tǒng)不再使用類重定位加載方式,而是使用插件方式加載,同時(shí)無縫集成所有認(rèn)證提供者。我們強(qiáng)烈建議其他文件系統(tǒng)也只使用插件加載方式,并將陸續(xù)移除重定位加載方式。Flink 1.9 推出了新的 Web UI,同時(shí)保留了原來的 Web UI 以備不時(shí)之需。截至目前,我們沒有收到關(guān)于新的 UI 存在問題的反饋,因此社區(qū)投票決定[43]在 Flink 1.10 中移除舊的 Web UI。
原文: https://developer.aliyun.com/article/744734 官方地址: https://flink.apache.org/news/2020/02/11/release-1.10.0.html?spm=a2c6h.12873639.0.0.749e4fc0c1D14m
六 .Flink 1.11 版本 [重要版本]
Flink 1.11.0 正式發(fā)布。歷時(shí)近 4 個月,F(xiàn)link 在生態(tài)、易用性、生產(chǎn)可用性、穩(wěn)定性等方面都進(jìn)行了增強(qiáng)和改善。
core engine 引入了 unaligned checkpoints,這是對 Flink 的容錯機(jī)制的重大更改,該機(jī)制可改善在高背壓下的檢查點(diǎn)性能。
一個新的 Source API 通過統(tǒng)一批處理和 streaming 執(zhí)行以及將內(nèi)部組件(例如事件時(shí)間處理、水印生成或空閑檢測)卸載到 Flink 來簡化(自定義)sources 的實(shí)現(xiàn)。
Flink SQL 引入了對變更數(shù)據(jù)捕獲(CDC)的支持,以輕松使用和解釋來自 Debezium 之類的工具的數(shù)據(jù)庫變更日志。更新的 FileSystem 連接器還擴(kuò)展了 Table API/SQL 支持的用例和格式集,從而實(shí)現(xiàn)了直接啟用從 Kafka 到 Hive 的 streaming 數(shù)據(jù)傳輸?shù)确桨浮?/span>
PyFlink 的多項(xiàng)性能優(yōu)化,包括對矢量化用戶定義函數(shù)(Pandas UDF)的支持。這改善了與 Pandas 和 NumPy 之類庫的互操作性,使 Flink 在數(shù)據(jù)科學(xué)和 ML 工作負(fù)載方面更強(qiáng)大。
重要變化
[FLINK-17339] 從 Flink 1.11 開始,Blink planner 是 Table API/SQL中的默認(rèn)設(shè)置。自 Flink 1.10 起,SQL 客戶端已經(jīng)存在這種情況。仍支持舊的 Flink 規(guī)劃器,但未積極開發(fā)。
[FLINK-5763] Savepoints 現(xiàn)在將其所有狀態(tài)包含在一個目錄中(元數(shù)據(jù)和程序狀態(tài))。這樣可以很容易地找出組成 savepoint 狀態(tài)的文件,并允許用戶通過簡單地移動目錄來重新定位 savepoint。
[FLINK-16408] 為了減輕對 JVM metaspace 的壓力,只要任務(wù)分配了至少一個插槽,TaskExecutor就會重用用戶代碼類加載器。這會稍微改變 Flink 的恢復(fù)行為,從而不會重新加載靜態(tài)字段。
[FLINK-11086] Flink 現(xiàn)在支持 Hadoop 3.0.0 以上的 Hadoop 版本。請注意,F(xiàn)link 項(xiàng)目不提供任何更新的flink-shaded-hadoop-x jars。用戶需要通過HADOOP_CLASSPATH環(huán)境變量(推薦)或 lib/ folder 提供 Hadoop 依賴項(xiàng)。
[FLINK-16963] Flink 隨附的所有MetricReporters均已轉(zhuǎn)換為插件。這些不再應(yīng)該放在/lib中(可能導(dǎo)致依賴沖突),而應(yīng)該放在/plugins/< some_directory>中。
[FLINK-12639] Flink 文檔正在做一些返工,因此從 Flink 1.11 開始,內(nèi)容的導(dǎo)航和組織會有所變化。
官方原文: https://flink.apache.org/news/2020/07/06/release-1.11.0.html
6.1. Table & SQL 支持 Change Data Capture(CDC)
CDC 被廣泛使用在復(fù)制數(shù)據(jù)、更新緩存、微服務(wù)間同步數(shù)據(jù)、審計(jì)日志等場景,很多公司都在使用開源的 CDC 工具,如 MySQL CDC。通過 Flink 支持在 Table & SQL 中接入和解析 CDC 是一個強(qiáng)需求,在過往的很多討論中都被提及過,可以幫助用戶以實(shí)時(shí)的方式處理 changelog 流,進(jìn)一步擴(kuò)展 Flink 的應(yīng)用場景,例如把 MySQL 中的數(shù)據(jù)同步到 PG 或 ElasticSearch 中,低延時(shí)的 temporal join 一個 changelog 等。
除了考慮到上面的真實(shí)需求,F(xiàn)link 中定義的“Dynamic Table”概念在流上有兩種模型:append 模式和 update 模式。通過 append 模式把流轉(zhuǎn)化為“Dynamic Table”在之前的版本中已經(jīng)支持,因此在 1.11.0 中進(jìn)一步支持 update 模式也從概念層面完整的實(shí)現(xiàn)了“Dynamic Table”。

為了支持解析和輸出 changelog,如何在外部系統(tǒng)和 Flink 系統(tǒng)之間編解碼這些更新操作是首要解決的問題。考慮到 source 和 sink 是銜接外部系統(tǒng)的一個橋梁,因此 FLIP-95 在定義全新的 Table source 和 Table sink 接口時(shí)解決了這個問題。
在公開的 CDC 調(diào)研報(bào)告中,Debezium 和 Canal 是用戶中最流行使用的 CDC 工具,這兩種工具用來同步 changelog 到其它的系統(tǒng)中,如消息隊(duì)列。據(jù)此,F(xiàn)LIP-105 首先支持了 Debezium 和 Canal 這兩種格式,而且 Kafka source 也已經(jīng)可以支持解析上述格式并輸出更新事件,在后續(xù)的版本中會進(jìn)一步支持 Avro(Debezium) 和 Protobuf(Canal)。
CREATE TABLE my_table (
...) WITH (
'connector'='...', -- e.g. 'kafka'
'format'='debezium-json',
'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)
'debezium-json.ignore-parse-errors'='true' -- default: false
);
6.2. Table & SQL 支持 JDBC Catalog
1.11.0 之前,用戶如果依賴 Flink 的 source/sink 讀寫關(guān)系型數(shù)據(jù)庫或讀取 changelog 時(shí),必須要手動創(chuàng)建對應(yīng)的 schema。而且當(dāng)數(shù)據(jù)庫中的 schema 發(fā)生變化時(shí),也需要手動更新對應(yīng)的 Flink 作業(yè)以保持一致和類型匹配,任何不匹配都會造成運(yùn)行時(shí)報(bào)錯使作業(yè)失敗。用戶經(jīng)常抱怨這個看似冗余且繁瑣的流程,體驗(yàn)極差。
實(shí)際上對于任何和 Flink 連接的外部系統(tǒng)都可能有類似的上述問題,在 1.11.0 中重點(diǎn)解決了和關(guān)系型數(shù)據(jù)庫對接的這個問題。FLIP-93 提供了 JDBC catalog 的基礎(chǔ)接口以及 Postgres catalog 的實(shí)現(xiàn),這樣方便后續(xù)實(shí)現(xiàn)與其它類型的關(guān)系型數(shù)據(jù)庫的對接。
1.11.0 版本后,用戶使用 Flink SQL 時(shí)可以自動獲取表的 schema 而不再需要輸入 DDL。除此之外,任何 schema 不匹配的錯誤都會在編譯階段提前進(jìn)行檢查報(bào)錯,避免了之前運(yùn)行時(shí)報(bào)錯造成的作業(yè)失敗。這是提升易用性和用戶體驗(yàn)的一個典型例子。
6.3. Hive 實(shí)時(shí)數(shù)倉
從 1.9.0 版本開始 Flink 從生態(tài)角度致力于集成 Hive,目標(biāo)打造批流一體的 Hive 數(shù)倉。經(jīng)過前兩個版本的迭代,已經(jīng)達(dá)到了 batch 兼容且生產(chǎn)可用,在 TPC-DS 10T benchmark 下性能達(dá)到 Hive 3.0 的 7 倍以上。
1.11.0 在 Hive 生態(tài)中重點(diǎn)實(shí)現(xiàn)了實(shí)時(shí)數(shù)倉方案,改善了端到端流式 ETL 的用戶體驗(yàn),達(dá)到了批流一體 Hive 數(shù)倉的目標(biāo)。同時(shí)在兼容性、性能、易用性方面也進(jìn)一步進(jìn)行了加強(qiáng)。
在實(shí)時(shí)數(shù)倉的解決方案中,憑借 Flink 的流式處理優(yōu)勢做到實(shí)時(shí)讀寫 Hive:
Hive 寫入:FLIP-115 完善擴(kuò)展了 FileSystem connector 的基礎(chǔ)能力和實(shí)現(xiàn),Table/SQL 層的 sink 可以支持各種格式(CSV、Json、Avro、Parquet、ORC),而且支持 Hive table 的所有格式。
Partition 支持:數(shù)據(jù)導(dǎo)入 Hive 引入 partition 提交機(jī)制來控制可見性,通過sink.partition-commit.trigger 控制 partition 提交的時(shí)機(jī),通過 sink.partition-commit.policy.kind 選擇提交策略,支持 SUCCESS 文件和 metastore 提交。
Hive 讀取:實(shí)時(shí)化的流式讀取 Hive,通過監(jiān)控 partition 生成增量讀取新 partition,或者監(jiān)控文件夾內(nèi)新文件生成來增量讀取新文件。在 Hive 可用性方面的提升:
FLIP-123 通過 Hive Dialect 為用戶提供語法兼容,這樣用戶無需在 Flink 和 Hive 的 CLI 之間切換,可以直接遷移 Hive 腳本到 Flink 中執(zhí)行。
提供 Hive 相關(guān)依賴的內(nèi)置支持,避免用戶自己下載所需的相關(guān)依賴。現(xiàn)在只需要單獨(dú)下載一個包,配置 HADOOP_CLASSPATH 就可以運(yùn)行。
在 Hive 性能方面,1.10.0 中已經(jīng)支持了 ORC(Hive 2+)的向量化讀取,1.11.0 中我們補(bǔ)全了所有版本的 Parquet 和 ORC 向量化支持來提升性能。
6.4. 全新 Source API
前面也提到過,source 和 sink 是 Flink 對接外部系統(tǒng)的一個橋梁,對于完善生態(tài)、可用性及端到端的用戶體驗(yàn)是很重要的環(huán)節(jié)。社區(qū)早在一年前就已經(jīng)規(guī)劃了 source 端的徹底重構(gòu),從 FLIP-27 的 ID 就可以看出是很早的一個 feature。但是由于涉及到很多復(fù)雜的內(nèi)部機(jī)制和考慮到各種 source connector 的實(shí)現(xiàn),設(shè)計(jì)上需要考慮的很全面。從 1.10.0 就開始做 POC 的實(shí)現(xiàn),最終趕上了 1.11.0 版本的發(fā)布。
先簡要回顧下 source 之前的主要問題:
對用戶而言,在 Flink 中改造已有的 source 或者重新實(shí)現(xiàn)一個生產(chǎn)級的 source connector 不是一件容易的事情,具體體現(xiàn)在沒有公共的代碼可以復(fù)用,而且需要理解很多 Flink 內(nèi)部細(xì)節(jié)以及實(shí)現(xiàn)具體的 event time 分配、watermark 產(chǎn)出、idleness 監(jiān)測、線程模型等。
批和流的場景需要實(shí)現(xiàn)不同的 source。
partitions/splits/shards 概念在接口中沒有顯式表達(dá),比如 split 的發(fā)現(xiàn)邏輯和數(shù)據(jù)消費(fèi)都耦合在 source function 的實(shí)現(xiàn)中,這樣在實(shí)現(xiàn) Kafka 或 Kinesis 類型的 source 時(shí)增加了復(fù)雜性。
在 runtime 執(zhí)行層,checkpoint 鎖被 source function 搶占會帶來一系列問題,框架很難進(jìn)行優(yōu)化。
FLIP-27 在設(shè)計(jì)時(shí)充分考慮了上述的痛點(diǎn):

首先在 Job Manager 和 Task Manager 中分別引入兩種不同的組件 Split Enumerator 和 Source reader,解耦 split 發(fā)現(xiàn)和對應(yīng)的消費(fèi)處理,同時(shí)方便隨意組合不同的策略。比如現(xiàn)有的 Kafka connector 中有多種不同的 partition 發(fā)現(xiàn)策略和實(shí)現(xiàn)耦合在一起,在新的架構(gòu)下,我們只需要實(shí)現(xiàn)一種 source reader,就可以適配多種 split enumerator 的實(shí)現(xiàn)來對應(yīng)不同的 partition 發(fā)現(xiàn)策略。
在新架構(gòu)下實(shí)現(xiàn)的 source connector 可以做到批流統(tǒng)一,唯一的小區(qū)別是對批場景的有限輸入,split enumerator 會產(chǎn)出固定數(shù)量的 split 集合并且每個 split 都是有限數(shù)據(jù)集;對于流場景的無限輸入,split enumerator 要么產(chǎn)出無限多的 split 或者 split 自身是無限數(shù)據(jù)集。
復(fù)雜的 timestamp assigner 以及 watermark generator 透明的內(nèi)置在 source reader 模塊內(nèi)運(yùn)行,對用戶來說是無感知的。這樣用戶如果想實(shí)現(xiàn)新的 source connector,一般不再需要重復(fù)實(shí)現(xiàn)這部分功能。
目前 Flink 已有的 source connector 會在后續(xù)的版本中基于新架構(gòu)來重新實(shí)現(xiàn),legacy source 也會繼續(xù)維護(hù)幾個版本保持兼容性,用戶也可以按照 release 文檔中的說明來嘗試體驗(yàn)新 source 的開發(fā)。
6.5. PyFlink 生態(tài)
眾所周知,Python 語言在機(jī)器學(xué)習(xí)和數(shù)據(jù)分析領(lǐng)域有著廣泛的使用。Flink 從 1.9.0 版本開始發(fā)力兼容 Python 生態(tài),Python 和 Flink 合力為 PyFlink,把 Flink 的實(shí)時(shí)分布式處理能力輸出給 Python 用戶。前兩個版本 PyFlink 已經(jīng)支持了 Python Table API 和 UDF,在 1.11.0 中擴(kuò)大對 Python 生態(tài)庫 Pandas 的支持以及和 SQL DDL/Client 的集成,同時(shí) Python UDF 性能有了極大的提升。
具體來說,之前普通的 Python UDF 每次調(diào)用只能處理一條數(shù)據(jù),而且在 Java 端和 Python 端都需要序列化/反序列化,開銷很大。1.11.0 中 Flink 支持在 Table & SQL 作業(yè)中自定義和使用向量化 Python UDF,用戶只需要在 UDF 修飾中額外增加一個參數(shù) udf_type=“pandas” 即可。這樣帶來的好處是:
每次調(diào)用可以處理 N 條數(shù)據(jù)。
數(shù)據(jù)格式基于 Apache Arrow,大大降低了 Java、Python 進(jìn)程之間的序列化/反序列化開銷。
方便 Python 用戶基于 Numpy 和 Pandas 等數(shù)據(jù)分析領(lǐng)域常用的 Python 庫,開發(fā)高性能的 Python UDF。
除此之外,1.11.0 中 PyFlink 還支持:
PyFlink table 和 Pandas DataFrame 之間無縫切換(FLIP-120),增強(qiáng) Pandas 生態(tài)的易用性和兼容性。
Table & SQL 中可以定義和使用 Python UDTF(FLINK-14500),不再必需 Java/Scala UDTF。
Cython 優(yōu)化 Python UDF 的性能(FLIP-121),對比 1.10.0 可以提升 30 倍。
Python UDF 中用戶自定義 metric(FLIP-112),方便監(jiān)控和調(diào)試 UDF 的執(zhí)行。
上述解讀的都是側(cè)重 API 層面,用戶開發(fā)作業(yè)可以直接感知到的易用性的提升。下面我們看看執(zhí)行引擎層在 1.11.0 中都有哪些值得關(guān)注的變化。
6.6. 生產(chǎn)可用性和穩(wěn)定性提升
6.6.1 支持 application 模式和 Kubernetes 增強(qiáng)
1.11.0 版本前,F(xiàn)link 主要支持如下兩種模式運(yùn)行:
Session 模式:提前啟動一個集群,所有作業(yè)都共享這個集群的資源運(yùn)行。優(yōu)勢是避免每個作業(yè)單獨(dú)啟動集群帶來的額外開銷,缺點(diǎn)是隔離性稍差。如果一個作業(yè)把某個 Task Manager(TM)容器搞掛,會導(dǎo)致這個容器內(nèi)的所有作業(yè)都跟著重啟。雖然每個作業(yè)有自己獨(dú)立的 Job Manager(JM)來管理,但是這些 JM 都運(yùn)行在一個進(jìn)程中,容易帶來負(fù)載上的瓶頸。
Per-job 模式:為了解決 session 模式隔離性差的問題,每個作業(yè)根據(jù)資源需求啟動獨(dú)立的集群,每個作業(yè)的 JM 也是運(yùn)行在獨(dú)立的進(jìn)程中,負(fù)載相對小很多。
以上兩種模式的共同問題是需要在客戶端執(zhí)行用戶代碼,編譯生成對應(yīng)的 Job Graph 提交到集群運(yùn)行。在這個過程需要下載相關(guān) jar 包并上傳到集群,客戶端和網(wǎng)絡(luò)負(fù)載壓力容易成為瓶頸,尤其當(dāng)一個客戶端被多個用戶共享使用。
1.11.0 中引入了 application 模式(FLIP-85)來解決上述問題,按照 application 粒度來啟動一個集群,屬于這個 application 的所有 job 在這個集群中運(yùn)行。核心是 Job Graph 的生成以及作業(yè)的提交不在客戶端執(zhí)行,而是轉(zhuǎn)移到 JM 端執(zhí)行,這樣網(wǎng)絡(luò)下載上傳的負(fù)載也會分散到集群中,不再有上述 client 單點(diǎn)上的瓶頸。
用戶可以通過 bin/flink run-application 來使用 application 模式,目前 Yarn 和 Kubernetes(K8s)都已經(jīng)支持這種模式。Yarn application 會在客戶端將運(yùn)行作業(yè)需要的依賴都通過 Yarn Local Resource 傳遞到 JM。K8s application 允許用戶構(gòu)建包含用戶 jar 與依賴的鏡像,同時(shí)會根據(jù)作業(yè)自動創(chuàng)建 TM,并在結(jié)束后銷毀整個集群,相比 session 模式具有更好的隔離性。K8s 不再有嚴(yán)格意義上的 per-job 模式,application 模式相當(dāng)于 per-job 在集群進(jìn)行提交作業(yè)的實(shí)現(xiàn)。
除了支持 application 模式,F(xiàn)link 原生 K8s 在 1.11.0 中還完善了很多基礎(chǔ)的功能特性(FLINK-14460),以達(dá)到生產(chǎn)可用性的標(biāo)準(zhǔn)。例如 Node Selector、Label、Annotation、Toleration 等。為了更方便的與 Hadoop 集成,也支持根據(jù)環(huán)境變量自動掛載 Hadoop 配置的功能。
6.6.2 Checkpoint & Savepoint 優(yōu)化
checkpoint 和 savepoint 機(jī)制一直是 Flink 保持先進(jìn)性的核心競爭力之一,社區(qū)在這個領(lǐng)域的改動很謹(jǐn)慎,最近的幾個大版本中幾乎沒有大的功能和架構(gòu)上的調(diào)整。在用戶郵件列表中,我們經(jīng)常能看到用戶反饋和抱怨的相關(guān)問題:比如 checkpoint 長時(shí)間做不出來失敗,savepoint 在作業(yè)重啟后不可用等等。1.11.0 有選擇的解決了一些這方面的常見問題,提高生產(chǎn)可用性和穩(wěn)定性。
1.11.0 之前, savepoint 中 meta 數(shù)據(jù)和 state 數(shù)據(jù)分別保存在兩個不同的目錄中,這樣如果想遷移 state 目錄很難識別這種映射關(guān)系,也可能導(dǎo)致目錄被誤刪除,對于目錄清理也同樣有麻煩。1.11.0 把兩部分?jǐn)?shù)據(jù)整合到一個目錄下,這樣方便整體轉(zhuǎn)移和復(fù)用。另外,之前 meta 引用 state 采用的是絕對路徑,這樣 state 目錄遷移后路徑發(fā)生變化也不可用,1.11.0 把 state 引用改成了相對路徑解決了這個問題(FLINK-5763),這樣 savepoint 的管理維護(hù)、復(fù)用更加靈活方便。
實(shí)際生產(chǎn)環(huán)境中,用戶經(jīng)常遭遇 checkpoint 超時(shí)失敗、長時(shí)間不能完成帶來的困擾。一旦作業(yè) failover 會造成回放大量的歷史數(shù)據(jù),作業(yè)長時(shí)間沒有進(jìn)度,端到端的延遲增加。1.11.0 從不同維度對 checkpoint 的優(yōu)化和提速做了改進(jìn),目標(biāo)實(shí)現(xiàn)分鐘甚至秒級的輕量型 checkpoint。
首先,增加了 Checkpoint Coordinator 通知 task 取消 checkpoint 的機(jī)制(FLINK-8871),這樣避免 task 端還在執(zhí)行已經(jīng)取消的 checkpoint 而對系統(tǒng)帶來不必要的壓力。同時(shí) task 端放棄已經(jīng)取消的 checkpoint,可以更快的參與執(zhí)行 coordinator 新觸發(fā)的 checkpoint,某種程度上也可以避免新 checkpoint 再次執(zhí)行超時(shí)而失敗。這個優(yōu)化也對后面默認(rèn)開啟 local recovery 提供了便利,task 端可以及時(shí)清理失效 checkpoint 的資源。
在反壓場景下,整個數(shù)據(jù)鏈路堆積了大量 buffer,導(dǎo)致 checkpoint barrier 排在數(shù)據(jù) buffer 后面,不能被 task 及時(shí)處理對齊,也就導(dǎo)致了 checkpoint 長時(shí)間不能執(zhí)行。1.11.0 中從兩個維度對這個問題進(jìn)行解決:
1)嘗試減少數(shù)據(jù)鏈路中的 buffer 總量(FLINK-16428),這樣 checkpoint barrier 可以盡快被處理對齊。
上游輸出端控制單個 sub partition 堆積 buffer 的最大閾值(backlog),避免負(fù)載不均場景下單個鏈路上堆積大量 buffer。在不影響網(wǎng)絡(luò)吞吐性能的情況下合理修改上下游默認(rèn)的 buffer 配置。上下游數(shù)據(jù)傳輸?shù)幕A(chǔ)協(xié)議進(jìn)行了調(diào)整,允許單個數(shù)據(jù)鏈路可以配置 0 個獨(dú)占 buffer 而不死鎖,這樣總的 buffer 數(shù)量和作業(yè)并發(fā)規(guī)模解耦。根據(jù)實(shí)際需求在吞吐性能和 checkpoint 速度兩者之間權(quán)衡,自定義 buffer 配比。這個優(yōu)化有一部分工作已經(jīng)在 1.11.0 中完成,剩余部分會在下個版本繼續(xù)推進(jìn)完成。
2)實(shí)現(xiàn)了全新的 unaligned checkpoint 機(jī)制(FLIP-76)從根本上解決了反壓場景下 checkpoint barrier 對齊的問題。
實(shí)際上這個想法早在 1.10.0 版本之前就開始醞釀設(shè)計(jì),由于涉及到很多模塊的大改動,實(shí)現(xiàn)機(jī)制和線程模型也很復(fù)雜。我們實(shí)現(xiàn)了兩種不同方案的原型 POC 進(jìn)行了測試、性能對比,確定了最終的方案,因此直到 1.11.0 才完成了 MVP 版本,這也是 1.11.0 中執(zhí)行引擎層唯一的一個重量級 feature。其基本思想可以概括為:
Checkpoint barrier 跨數(shù)據(jù) buffer 傳輸,不在輸入輸出隊(duì)列排隊(duì)等待處理,這樣就和算子的計(jì)算能力解耦,barrier 在節(jié)點(diǎn)之間的傳輸只有網(wǎng)絡(luò)延時(shí),可以忽略不計(jì)。每個算子多個輸入鏈路之間不需要等待 barrier 對齊來執(zhí)行 checkpoint,第一個到的 barrier 就可以提前觸發(fā) checkpoint,這樣可以進(jìn)一步提速 checkpoint,不會因?yàn)閭€別鏈路的延遲而影響整體。
為了和之前 aligned checkpoint 的語義保持一致,所有未被處理的輸入輸出數(shù)據(jù) buffer 都將作為 channel state 在 checkpoint 執(zhí)行時(shí)進(jìn)行快照持久化,在 failover 時(shí)連同 operator state 一同進(jìn)行恢復(fù)。
換句話說,aligned 機(jī)制保證的是 barrier 前面所有數(shù)據(jù)必須被處理完,狀態(tài)實(shí)時(shí)體現(xiàn)到 operator state 中;而 unaligned 機(jī)制把 barrier 前面的未處理數(shù)據(jù)所反映的 operator state 延后到 failover restart 時(shí)通過 channel state 回放進(jìn)行體現(xiàn),從狀態(tài)恢復(fù)的角度來說最終都是一致的。 注意這里雖然引入了額外的 in-flight buffer 的持久化,但是這個過程實(shí)際是在 checkpoint 的異步階段完成的,同步階段只是進(jìn)行了輕量級的 buffer 引用,所以不會過多占用算子的計(jì)算時(shí)間而影響吞吐性能。

Unaligned checkpoint 在反壓嚴(yán)重的場景下可以明顯加速 checkpoint 的完成時(shí)間,因?yàn)樗辉僖蕾囉谡w的計(jì)算吞吐能力,而和系統(tǒng)的存儲性能更加相關(guān),相當(dāng)于計(jì)算和存儲的解耦。但是它的使用也有一定的局限性,它會增加整體 state 的大小,對存儲 IO 帶來額外的開銷,因此在 IO 已經(jīng)是瓶頸的場景下就不太適合使用 unaligned checkpoint 機(jī)制。
1.11.0 中 unaligned checkpoint 還沒有作為默認(rèn)模式,需要用戶手動配置來開啟,并且只在 exactly-once 模式下生效。但目前還不支持 savepoint 模式,因?yàn)?savepoint 涉及到作業(yè)的 rescale 場景,channel state 目前還不支持 state 拆分,在后面的版本會進(jìn)一步支持,所以 savepoint 目前還是會使用之前的 aligned 模式,在反壓場景下有可能需要很長時(shí)間才能完成。
引用文章: https://developer.aliyun.com/article/767711
七 .Flink 1.12 版本 [重要版本]
在 DataStream API 上添加了高效的批執(zhí)行模式的支持。這是批處理和流處理實(shí)現(xiàn)真正統(tǒng)一的運(yùn)行時(shí)的一個重要里程碑。
實(shí)現(xiàn)了基于Kubernetes的高可用性(HA)方案,作為生產(chǎn)環(huán)境中,ZooKeeper方案之外的另外一種選擇。
擴(kuò)展了 Kafka SQL connector,使其可以在 upsert 模式下工作,并且支持在 SQL DDL 中處理 connector 的 metadata?,F(xiàn)在,時(shí)態(tài)表 Join 可以完全用 SQL 來表示,不再依賴于 Table API 了。
PyFlink 中添加了對于 DataStream API 的支持,將 PyFlink 擴(kuò)展到了更復(fù)雜的場景,比如需要對狀態(tài)或者定時(shí)器 timer 進(jìn)行細(xì)粒度控制的場景。除此之外,現(xiàn)在原生支持將 PyFlink 作業(yè)部署到 Kubernetes上。
7.1. DataStream API 支持批執(zhí)行模式
Flink 的核心 API 最初是針對特定的場景設(shè)計(jì)的,盡管 Table API / SQL 針對流處理和批處理已經(jīng)實(shí)現(xiàn)了統(tǒng)一的 API,但當(dāng)用戶使用較底層的 API 時(shí),仍然需要在批處理(DataSet API)和流處理(DataStream API)這兩種不同的 API 之間進(jìn)行選擇。鑒于批處理是流處理的一種特例,將這兩種 API 合并成統(tǒng)一的 API,有一些非常明顯的好處,比如:
可復(fù)用性:作業(yè)可以在流和批這兩種執(zhí)行模式之間自由地切換,而無需重寫任何代碼。因此,用戶可以復(fù)用同一個作業(yè),來處理實(shí)時(shí)數(shù)據(jù)和歷史數(shù)據(jù)。
維護(hù)簡單:統(tǒng)一的 API 意味著流和批可以共用同一組 connector,維護(hù)同一套代碼,并能夠輕松地實(shí)現(xiàn)流批混合執(zhí)行,例如 backfilling 之類的場景。
考慮到這些優(yōu)點(diǎn),社區(qū)已朝著流批統(tǒng)一的 DataStream API 邁出了第一步:支持高效的批處理(FLIP-134)。從長遠(yuǎn)來看,這意味著 DataSet API 將被棄用(FLIP-131),其功能將被包含在 DataStream API 和 Table API / SQL 中。
有限流上的批處理
您已經(jīng)可以使用 DataStream API 來處理有限流(例如文件)了,但需要注意的是,運(yùn)行時(shí)并不“知道”作業(yè)的輸入是有限的。為了優(yōu)化在有限流情況下運(yùn)行時(shí)的執(zhí)行性能,新的 BATCH 執(zhí)行模式,對于聚合操作,全部在內(nèi)存中進(jìn)行,且使用 sort-based shuffle(FLIP-140)和優(yōu)化過的調(diào)度策略(請參見 Pipelined Region Scheduling 了解更多詳細(xì)信息)。因此,DataStream API 中的 BATCH 執(zhí)行模式已經(jīng)非常接近 Flink 1.12 中 DataSet API 的性能。有關(guān)性能的更多詳細(xì)信息,請查看 FLIP-140。

在 Flink 1.12 中,默認(rèn)執(zhí)行模式為 STREAMING,要將作業(yè)配置為以 BATCH 模式運(yùn)行,可以在提交作業(yè)的時(shí)候,設(shè)置參數(shù) execution.runtime-mode:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
或者通過編程的方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);
注意:盡管 DataSet API 尚未被棄用,但我們建議用戶優(yōu)先使用具有 BATCH 執(zhí)行模式的 DataStream API 來開發(fā)新的批作業(yè),并考慮遷移現(xiàn)有的 DataSet 作業(yè)。
7.2. 新的 Data Sink API (Beta)
之前發(fā)布的 Flink 版本中[1],已經(jīng)支持了 source connector 工作在流批兩種模式下,因此在 Flink 1.12 中,社區(qū)著重實(shí)現(xiàn)了統(tǒng)一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 協(xié)議和一個更加模塊化的接口。Sink 的實(shí)現(xiàn)者只需要定義 what 和 how:SinkWriter,用于寫數(shù)據(jù),并輸出需要 commit 的內(nèi)容(例如,committables);Committer 和 GlobalCommitter,封裝了如何處理 committables。框架會負(fù)責(zé) when 和 where:即在什么時(shí)間,以及在哪些機(jī)器或進(jìn)程中 commit。

這種模塊化的抽象允許為 BATCH 和 STREAMING 兩種執(zhí)行模式,實(shí)現(xiàn)不同的運(yùn)行時(shí)策略,以達(dá)到僅使用一種 sink 實(shí)現(xiàn),也可以使兩種模式都可以高效執(zhí)行。Flink 1.12 中,提供了統(tǒng)一的 FileSink connector,以替換現(xiàn)有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也將逐步遷移到新的接口。
7.3. 基于 Kubernetes 的高可用 (HA) 方案
Flink 可以利用 Kubernetes 提供的內(nèi)置功能來實(shí)現(xiàn) JobManager 的 failover,而不用依賴 ZooKeeper。為了實(shí)現(xiàn)不依賴于 ZooKeeper 的高可用方案,社區(qū)在 Flink 1.12(FLIP-144)中實(shí)現(xiàn)了基于 Kubernetes 的高可用方案。該方案與 ZooKeeper 方案基于相同的接口[3],并使用 Kubernetes 的 ConfigMap[4] 對象來處理從 JobManager 的故障中恢復(fù)所需的所有元數(shù)據(jù)。關(guān)于如何配置高可用的 standalone 或原生 Kubernetes 集群的更多詳細(xì)信息和示例,請查閱文檔[5]。
注意:需要注意的是,這并不意味著 ZooKeeper 將被刪除,這只是為 Kubernetes 上的 Flink 用戶提供了另外一種選擇。
7.4. 其它功能改進(jìn)
將現(xiàn)有的 connector 遷移到新的 Data Source API
在之前的版本中,F(xiàn)link 引入了新的 Data Source API(FLIP-27),以允許實(shí)現(xiàn)同時(shí)適用于有限數(shù)據(jù)(批)作業(yè)和無限數(shù)據(jù)(流)作業(yè)使用的 connector 。在 Flink 1.12 中,社區(qū)從 FileSystem connector(FLINK-19161)出發(fā),開始將現(xiàn)有的 source connector 移植到新的接口。
注意: 新的 source 實(shí)現(xiàn),是完全不同的實(shí)現(xiàn),與舊版本的實(shí)現(xiàn)不兼容。
Pipelined Region 調(diào)度 (FLIP-119)
在之前的版本中,F(xiàn)link 對于批作業(yè)和流作業(yè)有兩套獨(dú)立的調(diào)度策略。Flink 1.12 版本中,引入了統(tǒng)一的調(diào)度策略, 該策略通過識別 blocking 數(shù)據(jù)傳輸邊,將 ExecutionGraph 分解為多個 pipelined region。這樣一來,對于一個 pipelined region 來說,僅當(dāng)有數(shù)據(jù)時(shí)才調(diào)度它,并且僅在所有其所需的資源都被滿足時(shí)才部署它;同時(shí)也可以支持獨(dú)立地重啟失敗的 region。對于批作業(yè)來說,新策略可顯著地提高資源利用率,并消除死鎖。
支持 Sort-Merge Shuffle (FLIP-148)
為了提高大規(guī)模批作業(yè)的穩(wěn)定性、性能和資源利用率,社區(qū)引入了 sort-merge shuffle,以替代 Flink 現(xiàn)有的實(shí)現(xiàn)。這種方案可以顯著減少 shuffle 的時(shí)間,并使用較少的文件句柄和文件寫緩存(這對于大規(guī)模批作業(yè)的執(zhí)行非常重要)。在后續(xù)版本中(FLINK-19614),F(xiàn)link 會進(jìn)一步優(yōu)化相關(guān)性能。
注意:該功能是實(shí)驗(yàn)性的,在 Flink 1.12 中默認(rèn)情況下不啟用。要啟用 sort-merge shuffle,需要在 TaskManager 的網(wǎng)絡(luò)配置[6]中設(shè)置合理的最小并行度。
Flink WebUI 的改進(jìn) (FLIP-75)
作為對上一個版本中,F(xiàn)link WebUI 一系列改進(jìn)的延續(xù),F(xiàn)link 1.12 在 WebUI 上暴露了 JobManager 內(nèi)存相關(guān)的指標(biāo)和配置參數(shù)(FLIP-104)。對于 TaskManager 的指標(biāo)頁面也進(jìn)行了更新,為 Managed Memory、Network Memory 和 Metaspace 添加了新的指標(biāo),以反映自 Flink 1.10(FLIP-102)開始引入的 TaskManager 內(nèi)存模型的更改[7]。
7.5. Table API/SQL 變更
7.5.1. SQL Connectors 中的 Metadata 處理
如果可以將某些 source(和 format)的元數(shù)據(jù)作為額外字段暴露給用戶,對于需要將元數(shù)據(jù)與記錄數(shù)據(jù)一起處理的用戶來說很有意義。一個常見的例子是 Kafka,用戶可能需要訪問 offset、partition 或 topic 信息、讀寫 kafka 消息中的 key 或 使用消息 metadata中的時(shí)間戳進(jìn)行時(shí)間相關(guān)的操作。
在 Flink 1.12 中,F(xiàn)link SQL 支持了元數(shù)據(jù)列用來讀取和寫入每行數(shù)據(jù)中 connector 或 format 相關(guān)的列(FLIP-107)。這些列在 CREATE TABLE 語句中使用 METADATA(保留)關(guān)鍵字來聲明。
CREATE TABLE kafka_table (
id BIGINT,
name STRING,
event_time TIMESTAMP(3) METADATA FROM 'timestamp', -- access Kafka 'timestamp' metadata
headers MAP METADATA -- access Kafka 'headers' metadata
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'format' = 'avro'
);
在 Flink 1.12 中,已經(jīng)支持 Kafka 和 Kinesis connector 的元數(shù)據(jù),并且 FileSystem connector 上的相關(guān)工作也已經(jīng)在計(jì)劃中(FLINK-19903)。由于 Kafka record 的結(jié)構(gòu)比較復(fù)雜,社區(qū)還專門為 Kafka connector 實(shí)現(xiàn)了新的屬性[8],以控制如何處理鍵/值對。關(guān)于 Flink SQL 中元數(shù)據(jù)支持的完整描述,請查看每個 connector 的文檔[9]以及 FLIP-107 中描述的用例。
7.5.2. Upsert Kafka Connector
在某些場景中,例如讀取 compacted topic 或者輸出(更新)聚合結(jié)果的時(shí)候,需要將 Kafka 消息記錄的 key 當(dāng)成主鍵處理,用來確定一條數(shù)據(jù)是應(yīng)該作為插入、刪除還是更新記錄來處理。為了實(shí)現(xiàn)該功能,社區(qū)為 Kafka 專門新增了一個 upsert connector(upsert-kafka),該 connector 擴(kuò)展自現(xiàn)有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作為 source 使用,也可以作為 sink 使用,并且提供了與現(xiàn)有的 kafka connector 相同的基本功能和持久性保證,因?yàn)閮烧咧g復(fù)用了大部分代碼。
要使用 upsert-kafka connector,必須在創(chuàng)建表時(shí)定義主鍵,并為鍵(key.format)和值(value.format)指定序列化反序列化格式。完整的示例,請查看最新的文檔[10]。
7.5.3. SQL 中 支持 Temporal Table Join
在之前的版本中,用戶需要通過創(chuàng)建時(shí)態(tài)表函數(shù)(temporal table function) 來支持時(shí)態(tài)表 join(temporal table join) ,而在 Flink 1.12 中,用戶可以使用標(biāo)準(zhǔn)的 SQL 語句 FOR SYSTEM_TIME AS OF(SQL:2011)來支持 join。此外,現(xiàn)在任意包含時(shí)間列和主鍵的表,都可以作為時(shí)態(tài)表,而不僅僅是 append-only 表。這帶來了一些新的應(yīng)用場景,比如將 Kafka compacted topic 或數(shù)據(jù)庫變更日志(來自 Debezium 等)作為時(shí)態(tài)表。
CREATE TABLE orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (
…
);
-- Table backed by a Kafka compacted topic
CREATE TABLE latest_rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time - INTERVAL ‘5’ SECOND,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
…
);
-- Event-time temporal table join
SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;
上面的示例同時(shí)也展示了如何在 temporal table join 中使用 Flink 1.12 中新增的 upsert-kafka connector。
使用 Hive 表進(jìn)行 Temporal Table Join
用戶也可以將 Hive 表作為時(shí)態(tài)表來使用,F(xiàn)link 既支持自動讀取 Hive 表的最新分區(qū)作為時(shí)態(tài)表(FLINK-19644),也支持在作業(yè)執(zhí)行時(shí)追蹤整個 Hive 表的最新版本作為時(shí)態(tài)表。請參閱文檔,了解更多關(guān)于如何在 temporal table join 中使用 Hive 表的示例。
7.5.4. Table API/SQL 中的其它改進(jìn)
Kinesis Flink SQL Connector (FLINK-18858)
從 Flink 1.12 開始,Table API / SQL 原生支持將 Amazon Kinesis Data Streams(KDS)作為 source 和 sink 使用。新的 Kinesis SQL connector 提供了對于增強(qiáng)的Fan-Out(EFO)以及 Sink Partition 的支持。如需了解 Kinesis SQL connector 所有支持的功能、配置選項(xiàng)以及對外暴露的元數(shù)據(jù)信息,請查看最新的文檔。
在 FileSystem/Hive connector 的流式寫入中支持小文件合并 (FLINK-19345)
很多 bulk format,例如 Parquet,只有當(dāng)寫入的文件比較大時(shí),才比較高效。當(dāng) checkpoint 的間隔比較小時(shí),這會成為一個很大的問題,因?yàn)闀?chuàng)建大量的小文件。在 Flink 1.12 中,F(xiàn)ile Sink 增加了小文件合并功能,從而使得即使作業(yè) checkpoint 間隔比較小時(shí),也不會產(chǎn)生大量的文件。要開啟小文件合并,可以按照文檔[11]中的說明在 FileSystem connector 中設(shè)置 auto-compaction = true 屬性。
Kafka Connector 支持 Watermark 下推 (FLINK-20041)
為了確保使用 Kafka 的作業(yè)的結(jié)果的正確性,通常來說,最好基于分區(qū)來生成 watermark,因?yàn)榉謪^(qū)內(nèi)數(shù)據(jù)的亂序程度通常來說比分區(qū)之間數(shù)據(jù)的亂序程度要低很多。Flink 現(xiàn)在允許將 watermark 策略下推到 Kafka connector 里面,從而支持在 Kafka connector 內(nèi)部構(gòu)造基于分區(qū)的 watermark[12]。一個 Kafka source 節(jié)點(diǎn)最終所產(chǎn)生的 watermark 由該節(jié)點(diǎn)所讀取的所有分區(qū)中的 watermark 的最小值決定,從而使整個系統(tǒng)可以獲得更好的(即更接近真實(shí)情況)的 watermark。該功能也允許用戶配置基于分區(qū)的空閑檢測策略,以防止空閑分區(qū)阻礙整個作業(yè)的 event time 增長。
新增的 Formats

利用 Multi-input 算子進(jìn)行 Join 優(yōu)化 (FLINK-19621)
Shuffling 是一個 Flink 作業(yè)中最耗時(shí)的操作之一。為了消除不必要的序列化反序列化開銷、數(shù)據(jù) spilling 開銷,提升 Table API / SQL 上批作業(yè)和流作業(yè)的性能, planner 當(dāng)前會利用上一個版本中已經(jīng)引入的N元算子(FLIP-92),將由 forward 邊所連接的多個算子合并到一個 Task 里執(zhí)行。
Type Inference for Table API UDAFs (FLIP-65)
Flink 1.12 完成了從 Flink 1.9 開始的,針對 Table API 上的新的類型系統(tǒng)[2]的工作,并在聚合函數(shù)(UDAF)上支持了新的類型系統(tǒng)。從 Flink 1.12 開始,與標(biāo)量函數(shù)和表函數(shù)類似,聚合函數(shù)也支持了所有的數(shù)據(jù)類型。
7.6. PyFlink: Python DataStream API
為了擴(kuò)展 PyFlink 的可用性,F(xiàn)link 1.12 提供了對于 Python DataStream API(FLIP-130)的初步支持,該版本支持了無狀態(tài)類型的操作(例如 Map,F(xiàn)latMap,F(xiàn)ilter,KeyBy 等)。如果需要嘗試 Python DataStream API,可以安裝PyFlink,然后按照該文檔[14]進(jìn)行操作,文檔中描述了如何使用 Python DataStream API 構(gòu)建一個簡單的流應(yīng)用程序。
from pyflink.common.typeinfo import Types
from pyflink.datastream import MapFunction, StreamExecutionEnvironment
class MyMapFunction(MapFunction):
def map(self, value):
return value + 1
env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
mapped_stream.print()
env.execute("datastream job")
7.7.PyFlink 中的其它改進(jìn)
PyFlink Jobs on Kubernetes (FLINK-17480)
除了 standalone 部署和 YARN 部署之外,現(xiàn)在也原生支持將 PyFlink 作業(yè)部署在 Kubernetes 上。最新的文檔中詳細(xì)描述了如何在 Kubernetes 上啟動 session 或 application 集群。
用戶自定義聚合函數(shù) (UDAFs)
從 Flink 1.12 開始,您可以在 PyFlink 作業(yè)中定義和使用 Python UDAF 了(FLIP-139)。普通的 UDF(標(biāo)量函數(shù))每次只能處理一行數(shù)據(jù),而 UDAF(聚合函數(shù))則可以處理多行數(shù)據(jù),用于計(jì)算多行數(shù)據(jù)的聚合值。您也可以使用 Pandas UDAF[15](FLIP-137),來進(jìn)行向量化計(jì)算(通常來說,比普通 Python UDAF 快10倍以上)。
注意: 普通 Python UDAF,當(dāng)前僅支持在 group aggregations 以及流模式下使用。如果需要在批模式或者窗口聚合中使用,建議使用 Pandas UDAF。
原文: https://developer.aliyun.com/article/780123
官方原文: https://flink.apache.org/news/2020/12/10/release-1.12.0.html

八千里路云和月 | 從零到大數(shù)據(jù)專家學(xué)習(xí)路徑指南
我們在學(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?
193篇文章暴揍Flink,這個合集你需要關(guān)注一下
Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點(diǎn)
我們在學(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?
在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!
硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
4萬字長文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
【面試&個人成長】2021年過半,社招和校招的經(jīng)驗(yàn)之談
大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)
我寫過的關(guān)于成長/面試/職場進(jìn)階的文章
當(dāng)我們在學(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
你好,我是王知無,一個大數(shù)據(jù)領(lǐng)域的硬核原創(chuàng)作者。
做過后端架構(gòu)、數(shù)據(jù)中間件、數(shù)據(jù)平臺&架構(gòu)、算法工程化。
專注大數(shù)據(jù)領(lǐng)域?qū)崟r(shí)動態(tài)&技術(shù)提升&個人成長&職場進(jìn)階,歡迎關(guān)注。
