<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink從1.7到1.12版本升級匯總

          共 35453字,需瀏覽 71分鐘

           ·

          2021-08-29 14:00

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

          回復(fù)”面試“獲取更多驚喜

             

             一 .前言

          最進(jìn)再看官方flink提供的視頻教程,發(fā)現(xiàn)入門版本因?yàn)闀r(shí)間關(guān)系都是基于1.7.x講解的. 在實(shí)際操作中跟1.12.x版本還是有差距的, 所以整理一下從1.7 版本到1.12版本之間的相對大的變動. 做到在學(xué)習(xí)的過程中可以做到心里有數(shù).

          二 .Flink 1.7 版本

          在 Flink 1.7.0,我們更關(guān)注實(shí)現(xiàn)快速數(shù)據(jù)處理以及以無縫方式為 Flink 社區(qū)構(gòu)建數(shù)據(jù)密集型應(yīng)用程序。我們最新版本包括一些令人興奮的新功能和改進(jìn),例如對 Scala 2.12 的支持,Exactly-Once 語義的 S3 文件接收器,復(fù)雜事件處理與流SQL的集成.

          2.1. Flink中的Scala 2.12支持

          Flink 1.7.0 是第一個完全支持 Scala 2.12 的版本。這可以讓用戶使用新的 Scala 版本編寫 Flink 應(yīng)用程序以及利用 Scala 2.12 的生態(tài)系統(tǒng)。

          2.2. 狀態(tài)變化

          在許多情況下,由于需求的變化,長期運(yùn)行的 Flink 應(yīng)用程序會在其生命周期內(nèi)發(fā)生變化。在不丟失當(dāng)前應(yīng)用程序進(jìn)度狀態(tài)的情況下更改用戶狀態(tài)是應(yīng)用程序變化的關(guān)鍵要求。Flink 1.7.0 版本中社區(qū)添加了狀態(tài)變化,允許我們靈活地調(diào)整長時(shí)間運(yùn)行的應(yīng)用程序的用戶狀態(tài)模式,同時(shí)保持與先前保存點(diǎn)的兼容。通過狀態(tài)變化,我們可以在狀態(tài)模式中添加或刪除列。當(dāng)使用 Avro 生成類作為用戶狀態(tài)時(shí),狀態(tài)模式變化可以開箱即用,這意味著狀態(tài)模式可以根據(jù) Avro 的規(guī)范進(jìn)行變化。雖然 Avro 類型是 Flink 1.7 中唯一支持模式變化的內(nèi)置類型,但社區(qū)仍在繼續(xù)致力于在未來的 Flink 版本中進(jìn)一步擴(kuò)展對其他類型的支持。

          2.3. Exactly-once語義的S3 StreamingFileSink

          Flink 1.6.0 中引入的 StreamingFileSink 現(xiàn)在已經(jīng)擴(kuò)展到 S3 文件系統(tǒng),并保證 Exactly-once 語義。使用此功能允許所有 S3 用戶構(gòu)建寫入 S3 的 Exactly-once 語義端到端管道。

          2.4. Streaming SQL中支持MATCH_RECOGNIZE

          這是 Apache Flink 1.7.0 的一個重要補(bǔ)充,它為 Flink SQL 提供了 MATCH_RECOGNIZE 標(biāo)準(zhǔn)的初始支持。此功能融合了復(fù)雜事件處理(CEP)和SQL,可以輕松地對數(shù)據(jù)流進(jìn)行模式匹配,從而實(shí)現(xiàn)一整套新的用例。此功能目前處于測試階段。

          2.5. Streaming SQL中的 Temporal Tables 和 Temporal Joins

          Temporal Tables 是 Apache Flink 中的一個新概念,它為表的更改歷史記錄提供(參數(shù)化)視圖,可以返回表在任何時(shí)間點(diǎn)的內(nèi)容。例如,我們可以使用具有歷史貨幣匯率的表。隨著時(shí)間的推移,表會不斷發(fā)生變化,并增加更新的匯率。Temporal Table 是一種視圖,可以返回匯率在任何時(shí)間點(diǎn)的實(shí)際狀態(tài)。通過這樣的表,可以使用正確的匯率將不同貨幣的訂單流轉(zhuǎn)換為通用貨幣。

          Temporal Joins 允許 Streaming 數(shù)據(jù)與不斷變化/更新的表的內(nèi)存和計(jì)算效率的連接,使用處理時(shí)間或事件時(shí)間,同時(shí)符合ANSI SQL。

          流式 SQL 的其他功能除了上面提到的主要功能外,F(xiàn)link 的 Table&SQL API 已經(jīng)擴(kuò)展到更多用例。以下內(nèi)置函數(shù)被添加到API:TO_BASE64,LOG2,LTRIM,REPEAT,REPLACE,COSH,SINH,TANH。SQL Client 現(xiàn)在支持在環(huán)境文件和 CLI 會話中自定義視圖。此外,CLI 中還添加了基本的 SQL 語句自動完成功能。社區(qū)添加了一個 Elasticsearch 6 table sink,允許存儲動態(tài)表的更新結(jié)果。

          2.6. 版本化REST API

          從 Flink 1.7.0 開始,REST API 已經(jīng)版本化。這保證了 Flink REST API 的穩(wěn)定性,因此可以在 Flink 中針對穩(wěn)定的 API開發(fā)第三方應(yīng)用程序。因此,未來的 Flink 升級不需要更改現(xiàn)有的第三方集成。

          2.7. Kafka 2.0 Connector

          Apache Flink 1.7.0 繼續(xù)添加更多的連接器,使其更容易與更多外部系統(tǒng)進(jìn)行交互。在此版本中,社區(qū)添加了 Kafka 2.0 連接器,可以從 Kafka 2.0 讀寫數(shù)據(jù)時(shí)保證 Exactly-Once 語義。

          2.8. 本地恢復(fù)

          Apache Flink 1.7.0 通過擴(kuò)展 Flink 的調(diào)度來完成本地恢復(fù)功能,以便在恢復(fù)時(shí)考慮之前的部署位置。如果啟用了本地恢復(fù),F(xiàn)link 將在運(yùn)行任務(wù)的機(jī)器上保留一份最新檢查點(diǎn)的本地副本。將任務(wù)調(diào)度到之前的位置,F(xiàn)link 可以通過從本地磁盤讀取檢查點(diǎn)狀態(tài)來最小化恢復(fù)狀態(tài)的網(wǎng)絡(luò)流量。此功能大大提高了恢復(fù)速度。

          2.9. 刪除Flink的傳統(tǒng)模式

          Apache Flink 1.7.0 標(biāo)志著 Flip-6 工作已經(jīng)完全完成并且與傳統(tǒng)模式達(dá)到功能奇偶校驗(yàn)。因此,此版本刪除了對傳統(tǒng)模式的支持。

          三 .Flink 1.8 版本

          新特性和改進(jìn):

          • Schema Evolution Story 最終版

          • 基于 TTL 持續(xù)清除舊狀態(tài)

          • 使用用戶定義的函數(shù)和聚合進(jìn)行 SQL 模式檢測

          • 符合 RFC 的 CSV 格式

          • 新的 KafkaDeserializationSchema,可以直接訪問 ConsumerRecord

          • FlinkKinesisConsumer 中的分片水印選項(xiàng)

          • DynamoDB Streams 的新用戶捕獲表更改

          • 支持用于子任務(wù)協(xié)調(diào)的全局聚合

          重要變化:

          • 使用 Flink 捆綁 Hadoop 庫的更改:不再發(fā)布包含 hadoop 的便捷二進(jìn)制文件

          • FlinkKafkaConsumer 現(xiàn)在將根據(jù)主題規(guī)范過濾已恢復(fù)的分區(qū)

          • 表 API 的 Maven 依賴更改:之前具有flink-table依賴關(guān)系的用戶需要將依賴關(guān)系從flink-table-planner更新為正確的依賴關(guān)系 flink-table-api-,具體取決于是使用 Java 還是 Scala:flink-table-api-java-bridge或者flink-table-api-scala-bridge

          3.1. 使用TTL(生存時(shí)間)連續(xù)增量清除舊的Key狀態(tài)

          我們在Flink 1.6(FLINK-9510)中為Key狀態(tài)引入了TTL(生存時(shí)間)。此功能允許在訪問時(shí)清理并使Key狀態(tài)條目無法訪問。另外,在編寫保存點(diǎn)/檢查點(diǎn)時(shí),現(xiàn)在也將清理狀態(tài)。Flink 1.8引入了對RocksDB狀態(tài)后端(FLINK-10471)和堆狀態(tài)后端(FLINK-10473)的舊條數(shù)的連續(xù)清理。這意味著舊的條數(shù)將(根據(jù)TTL設(shè)置)不斷被清理掉。

          3.2. 恢復(fù)保存點(diǎn)時(shí)對模式遷移的新支持

          使用Flink 1.7.0,我們在使用AvroSerializer時(shí)添加了對更改狀態(tài)模式的支持。使用Flink 1.8.0,我們在TypeSerializers將所有內(nèi)置遷移到新的序列化器快照抽象方面取得了很大進(jìn)展,該抽象理論上允許模式遷移。在Flink附帶的序列化程序中,我們現(xiàn)在支持PojoSerializer (FLINK-11485)和Java EnumSerializer (FLINK-11334)以及有限情況下的Kryo(FLINK-11323)的模式遷移格式。

          3.3. 保存點(diǎn)兼容性

          TraversableSerializer 此序列化程序(FLINK-11539)中的更新,包含Scala的Flink 1.2中的保存點(diǎn)將不再與Flink 1.8兼容??梢酝ㄟ^升級到Flink 1.3和Flink 1.7之間的版本,然后再更新至Flink 1.8來解決此限制。

          3.4. RocksDB版本沖突并切換到FRocksDB(FLINK-10471)

          需要切換到名為FRocksDB的RocksDB的自定義構(gòu)建,因?yàn)樾枰猂ocksDB中的某些更改來支持使用TTL進(jìn)行連續(xù)狀態(tài)清理。FRocksDB的已使用版本基于RocksDB的升級版本5.17.2。對于Mac OS X,僅支持OS X版本> =10.13的RocksDB版本5.17.2。

          另見:https://github.com/facebook/rocksdb/issues/4862

          3.5. Maven 依賴

          使用Flink捆綁Hadoop庫的更改(FLINK-11266)

          包含hadoop的便捷二進(jìn)制文件不再發(fā)布。

          如果部署依賴于flink-shaded-hadoop2包含 flink-dist,則必須從下載頁面的可選組件部分手動下載并打包Hadoop jar并將其復(fù)制到/lib目錄中。另外一種方法,可以通過打包flink-dist和激活 include-hadoopmaven配置文件來構(gòu)建包含hadoop的Flink分發(fā)。

          由于hadoop flink-dist默認(rèn)不再包含在內(nèi),因此指定-DwithoutHadoop何時(shí)打包flink-dist將不再影響構(gòu)建。

          3.6. TaskManager配置(FLINK-11716)

          TaskManagers現(xiàn)在默認(rèn)綁定到主機(jī)IP地址而不是主機(jī)名??梢酝ㄟ^配置選項(xiàng)控制此行為taskmanager.network.bind-policy。如果你的Flink集群在升級后遇到莫名其妙的連接問題,嘗試設(shè)置taskmanager.network.bind-policy: name在flink-conf.yaml 返回前的1.8的設(shè)置行為。

          3.7. Table API 的變動

          • 直接表構(gòu)造函數(shù)使用的取消預(yù)測(FLINK-11447) Flink 1.8不贊成Table在Table API中直接使用該類的構(gòu)造函數(shù)。此構(gòu)造函數(shù)以前將用于執(zhí)行與橫向表的連接。你現(xiàn)在應(yīng)該使用table.joinLateral()或 table.leftOuterJoinLateral()代替。這種更改對于將Table類轉(zhuǎn)換為接口是必要的,這將使Table API在未來更易于維護(hù)和更清潔。

          • 引入新的CSV格式符(FLINK-9964)

          此版本為符合RFC4180的CSV文件引入了新的格式符。新描述符可用作 org.apache.flink.table.descriptors.Csv。

          目前,這只能與Kafka一起使用。舊描述符可org.apache.flink.table.descriptors.OldCsv用于文件系統(tǒng)連接器。

          靜態(tài)生成器方法在TableEnvironment(FLINK-11445)上的棄用,為了將API與實(shí)際實(shí)現(xiàn)分開:TableEnvironment.getTableEnvironment()。

          不推薦使用靜態(tài)方法。你現(xiàn)在應(yīng)該使用Batch/StreamTableEnvironment.create()。

          • 表API Maven模塊中的更改(FLINK-11064)

          之前具有flink-table依賴關(guān)系的用戶需要更新其依賴關(guān)系flink-table-planner以及正確的依賴關(guān)系flink-table-api-?,具體取決于是使用Java還是Scala:flink-table-api-java-bridge或者flink-table-api-scala-bridge。

          • 更改為外部目錄表構(gòu)建器(FLINK-11522)

          ExternalCatalogTable.builder()不贊成使用ExternalCatalogTableBuilder()。

          • 更改為表API連接器jar的命名(FLINK-11026)

          Kafka/elasticsearch6 sql-jars的命名方案已經(jīng)更改。在maven術(shù)語中,它們不再具有sql-jar限定符,而artifactId現(xiàn)在以前綴為例,flink-sql而不是flink例如flink-sql-connector-kafka。

          • 更改為指定Null的方式(FLINK-11785)

          現(xiàn)在Table API中的Null需要定義nullof(type)而不是Null(type)。舊方法已被棄用。

          3.8. 連接器變動

          • 引入可直接訪問ConsumerRecord的新KafkaDeserializationSchema(FLINK-8354)

          對于FlinkKafkaConsumers,我們推出了一個新的KafkaDeserializationSchema ,可以直接訪問KafkaConsumerRecord。這包含了該 KeyedSerializationSchema功能,該功能已棄用但目前仍可以使用。

          • FlinkKafkaConsumer現(xiàn)在將根據(jù)主題規(guī)范過濾恢復(fù)的分區(qū)(FLINK-10342)

          從Flink 1.8.0開始,現(xiàn)在FlinkKafkaConsumer總是過濾掉已恢復(fù)的分區(qū),這些分區(qū)不再與要在還原的執(zhí)行中訂閱的指定主題相關(guān)聯(lián)。此行為在以前的版本中不存在FlinkKafkaConsumer。

          如果您想保留以前的行為。請使用上面的

          disableFilterRestoredPartitionsWithSubscribedTopics()

          配置方法FlinkKafkaConsumer。

          考慮這個例子:如果你有一個正在消耗topic的Kafka Consumer A,你做了一個保存點(diǎn),然后改變你的Kafka消費(fèi)者而不是從topic消費(fèi)B,然后從保存點(diǎn)重新啟動你的工作。在此更改之前,您的消費(fèi)者現(xiàn)在將使用這兩個主題A,B因?yàn)樗鎯υ谙M(fèi)者正在使用topic消費(fèi)的狀態(tài)A。通過此更改,您的使用者將僅B在還原后使用topic,因?yàn)槲覀兪褂门渲玫膖opic過濾狀態(tài)中存儲的topic。

          其它接口改變:

          1、從TypeSerializer接口(FLINK-9803)中刪除了canEqual()方法

          這些canEqual()方法通常用于跨類型層次結(jié)構(gòu)進(jìn)行適當(dāng)?shù)南嗟刃詸z查。在TypeSerializer實(shí)際上并不需要這個屬性,因此該方法現(xiàn)已刪除。

          2、刪除CompositeSerializerSnapshot實(shí)用程序類(FLINK-11073)

          該CompositeSerializerSnapshot實(shí)用工具類已被刪除。

          現(xiàn)在CompositeTypeSerializerSnapshot,你應(yīng)該使用復(fù)合序列化程序的快照,該序列化程序?qū)⑿蛄谢山o多個嵌套的序列化程序。有關(guān)使用的說明,請參閱此處CompositeTypeSerializerSnapshot。

          四 .Flink 1.9 版本

          2019年 8月22日,Apache Flink 1.9.0 版本正式發(fā)布,這也是阿里內(nèi)部版本 Blink 合并入 Flink 后的首次版本發(fā)布。

          此次版本更新帶來的重大功能包括批處理作業(yè)的批式恢復(fù),以及 Table API 和 SQL 的基于 Blink 的新查詢引擎(預(yù)覽版)。同時(shí),這一版本還推出了 State Processor API,這是社區(qū)最迫切需求的功能之一,該 API 使用戶能夠用 Flink DataSet 作業(yè)靈活地讀寫保存點(diǎn)。此外,F(xiàn)link 1.9 還包括一個重新設(shè)計(jì)的 WebUI 和新的 Python Table API (預(yù)覽版)以及與 Apache Hive 生態(tài)系統(tǒng)的集成(預(yù)覽版)。

          新功能和改進(jìn)

          • 細(xì)粒度批作業(yè)恢復(fù) (FLIP-1)

          • State Processor API (FLIP-43)

          • Stop-with-Savepoint (FLIP-34)

          • 重構(gòu) Flink WebUI

          • 預(yù)覽新的 Blink SQL 查詢處理器

          • Table API / SQL 的其他改進(jìn)

          • 預(yù)覽 Hive 集成 (FLINK-10556)

          • 預(yù)覽新的 Python Table API (FLIP-38)

          4.1. 細(xì)粒度批作業(yè)恢復(fù) (FLIP-1)

          批作業(yè)(DataSet、Table API 和 SQL)從 task 失敗中恢復(fù)的時(shí)間被顯著縮短了。在 Flink 1.9 之前,批處理作業(yè)中的 task 失敗是通過取消所有 task 并重新啟動整個作業(yè)來恢復(fù)的,即作業(yè)從頭開始,所有進(jìn)度都會廢棄。在此版本中,F(xiàn)link 將中間結(jié)果保留在網(wǎng)絡(luò) shuffle 的邊緣,并使用此數(shù)據(jù)去恢復(fù)那些僅受故障影響的 task。所謂 task 的 “failover regions” (故障區(qū))是指通過 pipelined 方式連接的數(shù)據(jù)交換方式,定義了 task 受故障影響的邊界。

          要使用這個新的故障策略,需要確保 flink-conf.yaml 中有 jobmanager.execution.failover-strategy: region 的配置。

          注意:1.9 發(fā)布包中默認(rèn)就已經(jīng)包含了該配置項(xiàng),不過當(dāng)從之前版本升級上來時(shí),如果要復(fù)用之前的配置的話,需要手動加上該配置。

          “Region” 的故障策略也能同時(shí)提升 “embarrassingly parallel” 類型的流作業(yè)的恢復(fù)速度,也就是沒有任何像 keyBy() 和 rebalance 的 shuffle 的作業(yè)。當(dāng)這種作業(yè)在恢復(fù)時(shí),只有受影響的故障區(qū)的 task 需要重啟。對于其他類型的流作業(yè),故障恢復(fù)行為與之前的版本一樣。

          4.2. State Processor API (FLIP-43)

          直到 Flink 1.9,從外部訪問作業(yè)的狀態(tài)僅局限于:Queryable State(可查詢狀態(tài))實(shí)驗(yàn)性功能。此版本中引入了一種新的、強(qiáng)大的類庫,基于 DataSet 支持讀取、寫入、和修改狀態(tài)快照。在實(shí)踐上,這意味著:

          • Flink 作業(yè)的狀態(tài)可以自主構(gòu)建了,可以通過讀取外部系統(tǒng)的數(shù)據(jù)(例如外部數(shù)據(jù)庫),然后轉(zhuǎn)換成 savepoint。

          • Savepoint 中的狀態(tài)可以使用任意的 Flink 批處理 API 查詢(DataSet、Table、SQL)。例如,分析相關(guān)的狀態(tài)模式或檢查狀態(tài)差異以支持應(yīng)用程序?qū)徍嘶蚬收吓挪椤?/span>

          • Savepoint 中的狀態(tài) schema 可以離線遷移了,而之前的方案只能在訪問狀態(tài)時(shí)進(jìn)行,是一種在線遷移。

          • Savepoint 中的無效數(shù)據(jù)可以被識別出來并糾正。

          新的 State Processor API 覆蓋了所有類型的快照:savepoint,full checkpoint 和 incremental checkpoint。

          4.3. Stop-with-Savepoint (FLIP-34)

          “Cancel-with-savepoint” 是停止、重啟、fork、或升級 Flink 作業(yè)的一個常用操作。然而,當(dāng)前的實(shí)現(xiàn)并沒有保證輸出到 exactly-once sink 的外部存儲的數(shù)據(jù)持久化。為了改進(jìn)停止作業(yè)時(shí)的端到端語義,F(xiàn)link 1.9 引入了一種新的 SUSPEND 模式,可以帶 savepoint 停止作業(yè),保證了輸出數(shù)據(jù)的一致性。你可以使用 Flink CLI 來 suspend 一個作業(yè):

          bin/flink stop -p [:targetSavepointDirectory] :jobId

          4.4. 重構(gòu) Flink WebUI

          社區(qū)討論了現(xiàn)代化 Flink WebUI 的提案,決定采用 Angular 的最新穩(wěn)定版來重構(gòu)這個組件。從 Angular 1.x 躍升到了 7.x 。重新設(shè)計(jì)的 UI 是 1.9.0 的默認(rèn)版本,不過有一個按鈕可以切換到舊版的 WebUI。

          4.5. 新 Blink SQL 查詢處理器預(yù)覽

          在 Blink 捐贈給 Apache Flink 之后,社區(qū)就致力于為 Table API 和 SQL 集成 Blink 的查詢優(yōu)化器和 runtime。第一步,我們將 flink-table 單模塊重構(gòu)成了多個小模塊(FLIP-32)。這對于 Java 和 Scala API 模塊、優(yōu)化器、以及 runtime 模塊來說,有了一個更清晰的分層和定義明確的接口。

          緊接著,我們擴(kuò)展了 Blink 的 planner 以實(shí)現(xiàn)新的優(yōu)化器接口,所以現(xiàn)在有兩個插件化的查詢處理器來執(zhí)行 Table API 和 SQL:1.9 以前的 Flink 處理器和新的基于 Blink 的處理器?;?Blink 的查詢處理器提供了更好地 SQL 覆蓋率(1.9 完整支持 TPC-H,TPC-DS 的支持在下一個版本的計(jì)劃中)并通過更廣泛的查詢優(yōu)化(基于成本的執(zhí)行計(jì)劃選擇和更多的優(yōu)化規(guī)則)、改進(jìn)的代碼生成機(jī)制、和調(diào)優(yōu)過的算子實(shí)現(xiàn)來提升批處理查詢的性能。除此之外,基于 Blink 的查詢處理器還提供了更強(qiáng)大的流處理能力,包括一些社區(qū)期待已久的新功能(如維表 Join,TopN,去重)和聚合場景緩解數(shù)據(jù)傾斜的優(yōu)化,以及內(nèi)置更多常用的函數(shù)。

          注:兩個查詢處理器之間的語義和功能大部分是一致的,但并未完全對齊。具體請查看發(fā)布日志。

          不過, Blink 的查詢處理器的集成還沒有完全完成。因此,1.9 之前的 Flink 處理器仍然是1.9 版本的默認(rèn)處理器,建議用于生產(chǎn)設(shè)置。你可以在創(chuàng)建 TableEnvironment 時(shí)通過 EnvironmentSettings 配置啟用 Blink 處理器。被選擇的處理器必須要在正在執(zhí)行的 Java 進(jìn)程的類路徑中。對于集群設(shè)置,默認(rèn)兩個查詢處理器都會自動地加載到類路徑中。當(dāng)從 IDE 中運(yùn)行一個查詢時(shí),需要在項(xiàng)目中顯式地增加一個處理器的依賴。

          4.6. Table API / SQL 的其他改進(jìn)

          除了圍繞 Blink Planner 令人興奮的進(jìn)展外,社區(qū)還做了一系列的改進(jìn),包括:

          • 為 Table API / SQL 的 Java 用戶去除 Scala 依賴 (FLIP-32) 作為重構(gòu)和拆分 flink-table 模塊工作的一部分,我們?yōu)?Java 和 Scala 創(chuàng)建了兩個單獨(dú)的 API 模塊。對于 Scala 用戶來說,沒有什么改變。不過現(xiàn)在 Java 用戶在使用 Table API 和 SQL 時(shí),可以不用引入一堆 Scala 依賴了。

          • 重構(gòu) Table API / SQL 的類型系統(tǒng)(FLIP-37) 我們實(shí)現(xiàn)了一個新的數(shù)據(jù)類型系統(tǒng),以便從 Table API 中移除對 Flink TypeInformation 的依賴,并提高其對 SQL 標(biāo)準(zhǔn)的遵從性。不過還在進(jìn)行中,預(yù)計(jì)將在下一版本完工,在 Flink 1.9 中,UDF 尚未移植到新的類型系統(tǒng)上。

          • Table API 的多行多列轉(zhuǎn)換(FLIP-29) Table API 擴(kuò)展了一組支持多行和多列、輸入和輸出的轉(zhuǎn)換的功能。這些轉(zhuǎn)換顯著簡化了處理邏輯的實(shí)現(xiàn),同樣的邏輯使用關(guān)系運(yùn)算符來實(shí)現(xiàn)是比較麻煩的。

          • 嶄新的統(tǒng)一的 Catalog API Catalog 已有的一些接口被重構(gòu)和(某些)被替換了,從而統(tǒng)一了內(nèi)部和外部 catalog 的處理。這項(xiàng)工作主要是為了 Hive 集成(見下文)而啟動的,不過也改進(jìn)了 Flink 在管理 catalog 元數(shù)據(jù)的整體便利性。

          • SQL API 中的 DDL 支持 (FLINK-10232) 到目前為止,F(xiàn)link SQL 已經(jīng)支持 DML 語句(如 SELECT,INSERT)。但是外部表(table source 和 table sink)必須通過 Java/Scala 代碼的方式或配置文件的方式注冊。1.9 版本中,我們支持 SQL DDL 語句的方式注冊和刪除表(CREATE TABLE,DROP TABLE)。然而,我們還沒有增加流特定的語法擴(kuò)展來定義時(shí)間戳抽取和 watermark 生成策略等。流式的需求將會在下一版本完整支持。

          五 .Flink 1.10 版本 [重要版本 : Blink 整合完成]

          作為 Flink 社區(qū)迄今為止規(guī)模最大的一次版本升級,F(xiàn)link 1.10 容納了超過 200 位貢獻(xiàn)者對超過 1200 個 issue 的開發(fā)實(shí)現(xiàn),包含對 Flink 作業(yè)的整體性能及穩(wěn)定性的顯著優(yōu)化、對原生 Kubernetes 的初步集成(beta 版本)以及對 Python 支持(PyFlink)的重大優(yōu)化。

          Flink 1.10 同時(shí)還標(biāo)志著對 Blink[1] 的整合宣告完成,隨著對 Hive 的生產(chǎn)級別集成及對 TPC-DS 的全面覆蓋,F(xiàn)link 在增強(qiáng)流式 SQL 處理能力的同時(shí)也具備了成熟的批處理能力。

          5.1. 內(nèi)存管理及配置優(yōu)化

          Flink 目前的 TaskExecutor 內(nèi)存模型存在著一些缺陷,導(dǎo)致優(yōu)化資源利用率比較困難,例如:

          流和批處理內(nèi)存占用的配置模型不同;流處理中的 RocksDB state backend 需要依賴用戶進(jìn)行復(fù)雜的配置。為了讓內(nèi)存配置變的對于用戶更加清晰、直觀,F(xiàn)link 1.10 對 TaskExecutor 的內(nèi)存模型和配置邏輯進(jìn)行了較大的改動 (FLIP-49 [7])。這些改動使得 Flink 能夠更好地適配所有部署環(huán)境(例如 Kubernetes, Yarn, Mesos),讓用戶能夠更加嚴(yán)格的控制其內(nèi)存開銷。

          • Managed 內(nèi)存擴(kuò)展

          Managed 內(nèi)存的范圍有所擴(kuò)展,還涵蓋了 RocksDB state backend 使用的內(nèi)存。盡管批處理作業(yè)既可以使用堆內(nèi)內(nèi)存也可以使用堆外內(nèi)存,使用 RocksDB state backend 的流處理作業(yè)卻只能利用堆外內(nèi)存。因此為了讓用戶執(zhí)行流和批處理作業(yè)時(shí)無需更改集群的配置,我們規(guī)定從現(xiàn)在起 managed 內(nèi)存只能在堆外。

          • 簡化 RocksDB 配置

          此前,配置像 RocksDB 這樣的堆外 state backend 需要進(jìn)行大量的手動調(diào)試,例如減小 JVM 堆空間、設(shè)置 Flink 使用堆外內(nèi)存等。現(xiàn)在,F(xiàn)link 的開箱配置即可支持這一切,且只需要簡單地改變 managed 內(nèi)存的大小即可調(diào)整 RocksDB state backend 的內(nèi)存預(yù)算。

          另一個重要的優(yōu)化是,F(xiàn)link 現(xiàn)在可以限制 RocksDB 的 native 內(nèi)存占用(FLINK-7289 [8]),以避免超過總的內(nèi)存預(yù)算——這對于 Kubernetes 等容器化部署環(huán)境尤為重要。關(guān)于如何開啟、調(diào)試該特性,請參考 RocksDB 調(diào)試[9]。

          注:FLIP-49 改變了集群的資源配置過程,因此從以前的 Flink 版本升級時(shí)可能需要對集群配置進(jìn)行調(diào)整。詳細(xì)的變更日志及調(diào)試指南請參考文檔[10]。

          5.2. 統(tǒng)一的作業(yè)提交邏輯

          在此之前,提交作業(yè)是由執(zhí)行環(huán)境負(fù)責(zé)的,且與不同的部署目標(biāo)(例如 Yarn, Kubernetes, Mesos)緊密相關(guān)。這導(dǎo)致用戶需要針對不同環(huán)境保留多套配置,增加了管理的成本。

          在 Flink 1.10 中,作業(yè)提交邏輯被抽象到了通用的 Executor 接口(FLIP-73 [11])。新增加的 ExecutorCLI (FLIP-81 [12])引入了為任意執(zhí)行目標(biāo)[13]指定配置參數(shù)的統(tǒng)一方法。此外,隨著引入 JobClient(FLINK-74 [14])負(fù)責(zé)獲取 JobExecutionResult,獲取作業(yè)執(zhí)行結(jié)果的邏輯也得以與作業(yè)提交解耦。

          5.3. 原生 Kubernetes 集成(Beta)

          對于想要在容器化環(huán)境中嘗試 Flink 的用戶來說,想要在 Kubernetes 上部署和管理一個 Flink standalone 集群,首先需要對容器、算子及像 kubectl 這樣的環(huán)境工具有所了解。

          在 Flink 1.10 中,我們推出了初步的支持 session 模式的主動 Kubernetes 集成(FLINK-9953 [15])。其中,“主動”指 Flink ResourceManager (K8sResMngr) 原生地與 Kubernetes 通信,像 Flink 在 Yarn 和 Mesos 上一樣按需申請 pod。用戶可以利用 namespace,在多租戶環(huán)境中以較少的資源開銷啟動 Flink。這需要用戶提前配置好 RBAC 角色和有足夠權(quán)限的服務(wù)賬號。

          正如在統(tǒng)一的作業(yè)提交邏輯一節(jié)中提到的,F(xiàn)link 1.10 將命令行參數(shù)映射到了統(tǒng)一的配置。因此,用戶可以參閱 Kubernetes 配置選項(xiàng),在命令行中使用以下命令向 Kubernetes 提交 Flink 作業(yè)。

          ./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id= examples/streaming/WindowJoin.jar

          5.4. Table API/SQL: 生產(chǎn)可用的 Hive 集成

          Flink 1.9 推出了預(yù)覽版的 Hive 集成。該版本允許用戶使用 SQL DDL 將 Flink 特有的元數(shù)據(jù)持久化到 Hive Metastore、調(diào)用 Hive 中定義的 UDF 以及讀、寫 Hive 中的表。Flink 1.10 進(jìn)一步開發(fā)和完善了這一特性,帶來了全面兼容 Hive 主要版本[17]的生產(chǎn)可用的 Hive 集成。

          • Batch SQL 原生分區(qū)支持

          此前,F(xiàn)link 只支持寫入未分區(qū)的 Hive 表。在 Flink 1.10 中,F(xiàn)link SQL 擴(kuò)展支持了 INSERT OVERWRITE 和 PARTITION 的語法(FLIP-63 [18]),允許用戶寫入 Hive 中的靜態(tài)和動態(tài)分區(qū)。

          寫入靜態(tài)分區(qū)

          INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 …)] select_statement1 FROM from_statement;

          寫入動態(tài)分區(qū)

          INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;

          對分區(qū)表的全面支持,使得用戶在讀取數(shù)據(jù)時(shí)能夠受益于分區(qū)剪枝,減少了需要掃描的數(shù)據(jù)量,從而大幅提升了這些操作的性能。

          • 其他優(yōu)化

          除了分區(qū)剪枝,F(xiàn)link 1.10 的 Hive 集成還引入了許多數(shù)據(jù)讀取[19]方面的優(yōu)化,例如:

          投影下推:Flink 采用了投影下推技術(shù),通過在掃描表時(shí)忽略不必要的域,最小化 Flink 和 Hive 表之間的數(shù)據(jù)傳輸量。這一優(yōu)化在表的列數(shù)較多時(shí)尤為有效。

          LIMIT 下推:對于包含 LIMIT 語句的查詢,F(xiàn)link 在所有可能的地方限制返回的數(shù)據(jù)條數(shù),以降低通過網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量。讀取數(shù)據(jù)時(shí)的 ORC 向量化:為了提高讀取 ORC 文件的性能,對于 Hive 2.0.0 及以上版本以及非復(fù)合數(shù)據(jù)類型的列,F(xiàn)link 現(xiàn)在默認(rèn)使用原生的 ORC 向量化讀取器。

          • 將可插拔模塊作為 Flink 內(nèi)置對象(Beta)

          Flink 1.10 在 Flink table 核心引入了通用的可插拔模塊機(jī)制,目前主要應(yīng)用于系統(tǒng)內(nèi)置函數(shù)(FLIP-68 [20])。通過模塊,用戶可以擴(kuò)展 Flink 的系統(tǒng)對象,例如像使用 Flink 系統(tǒng)函數(shù)一樣使用 Hive 內(nèi)置函數(shù)。新版本中包含一個預(yù)先實(shí)現(xiàn)好的 HiveModule,能夠支持多個 Hive 版本,當(dāng)然用戶也可以選擇編寫自己的可插拔模塊。

          5.5. 其他 Table API/SQL 優(yōu)化

          • SQL DDL 中的 watermark 和計(jì)算列

          Flink 1.10 在 SQL DDL 中增加了針對流處理定義時(shí)間屬性及產(chǎn)生 watermark 的語法擴(kuò)展(FLIP-66 [22])。這使得用戶可以在用 DDL 語句創(chuàng)建的表上進(jìn)行基于時(shí)間的操作(例如窗口)以及定義 watermark 策略。

          CREATE TABLE table_name (

          WATERMARK FOR columnName AS <watermark_strategy_expression>

          ) WITH (
          ...
          )
          • 其他 SQL DDL 擴(kuò)展

          Flink 現(xiàn)在嚴(yán)格區(qū)分臨時(shí)/持久、系統(tǒng)/目錄函數(shù)(FLIP-57 [24])。這不僅消除了函數(shù)引用中的歧義,還帶來了確定的函數(shù)解析順序(例如,當(dāng)存在命名沖突時(shí),比起目錄函數(shù)、持久函數(shù) Flink 會優(yōu)先使用系統(tǒng)函數(shù)、臨時(shí)函數(shù))。

          在 FLIP-57 的基礎(chǔ)上,我們擴(kuò)展了 SQL DDL 的語法,支持創(chuàng)建目錄函數(shù)、臨時(shí)函數(shù)以及臨時(shí)系統(tǒng)函數(shù)(FLIP-79):

          CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION

          [IF NOT EXISTS] [catalog_name.][db_name.]function_name

          AS identifier [LANGUAGE JAVA|SCALA]

          關(guān)于目前完整的 Flink SQL DDL 支持,請參考最新的文檔[26]。

          注:為了今后正確地處理和保證元對象(表、視圖、函數(shù))上的行為一致性,F(xiàn)link 廢棄了 Table API 中的部分對象申明方法,以使留下的方法更加接近標(biāo)準(zhǔn)的 SQL DDL(FLIP-64)。

          • 批處理完整的 TPC-DS 覆蓋

          TPC-DS 是廣泛使用的業(yè)界標(biāo)準(zhǔn)決策支持 benchmark,用于衡量基于 SQL 的數(shù)據(jù)處理引擎性能。Flink 1.10 端到端地支持所有 TPC-DS 查詢(FLINK-11491 [28]),標(biāo)志著 Flink SQL 引擎已經(jīng)具備滿足現(xiàn)代數(shù)據(jù)倉庫及其他類似的處理需求的能力。

          5.6. PyFlink: 支持原生用戶自定義函數(shù)(UDF)

          作為 Flink 全面支持 Python 的第一步,在之前版本中我們發(fā)布了預(yù)覽版的 PyFlink。在新版本中,我們專注于讓用戶在 Table API/SQL 中注冊并使用自定義函數(shù)(UDF,另 UDTF / UDAF 規(guī)劃中)(FLIP-58)。

          如果你對這一特性的底層實(shí)現(xiàn)(基于 Apache Beam 的可移植框架)感興趣,請參考 FLIP-58 的 Architecture 章節(jié)以及 FLIP-78。這些數(shù)據(jù)結(jié)構(gòu)為支持 Pandas 以及今后將 PyFlink 引入到 DataStream API 奠定了基礎(chǔ)。

          從 Flink 1.10 開始,用戶只要執(zhí)行以下命令就可以輕松地通過 pip 安裝 PyFlink:

          pip install apache-flink

          5.7. 重要變更

          • FLINK-10725[34]:Flink 現(xiàn)在可以使用 Java 11 編譯和運(yùn)行。

          • FLINK-15495[35]:SQL 客戶端現(xiàn)在默認(rèn)使用 Blink planner,向用戶提供最新的特性及優(yōu)化。Table API 同樣計(jì)劃在下個版本中從舊的 planner 切換到 Blink planner,我們建議用戶現(xiàn)在就開始嘗試和熟悉 Blink planner。

          • FLINK-13025[36]:新的 Elasticsearch sink connector[37] 全面支持 Elasticsearch 7.x 版本。

          • FLINK-15115[38]:Kafka 0.8 和 0.9 的 connector 已被標(biāo)記為廢棄并不再主動支持。如果你還在使用這些版本或有其他相關(guān)問題,請通過 @dev 郵件列表聯(lián)系我們。

          • FLINK-14516[39]:非基于信用的網(wǎng)絡(luò)流控制已被移除,同時(shí)移除的還有配置項(xiàng)“taskmanager.network.credit.model”。今后,F(xiàn)link 將總是使用基于信用的網(wǎng)絡(luò)流控制。

          • FLINK-12122[40]:在 Flink 1.5.0 中,F(xiàn)LIP-6[41] 改變了 slot 在 TaskManager 之間的分布方式。要想使用此前的調(diào)度策略,既盡可能將負(fù)載分散到所有當(dāng)前可用的 TaskManager,用戶可以在 flink-conf.yaml 中設(shè)置 “cluster.evenly-spread-out-slots: true”。

          • FLINK-11956[42]:s3-hadoop 和 s3-presto 文件系統(tǒng)不再使用類重定位加載方式,而是使用插件方式加載,同時(shí)無縫集成所有認(rèn)證提供者。我們強(qiáng)烈建議其他文件系統(tǒng)也只使用插件加載方式,并將陸續(xù)移除重定位加載方式。

          Flink 1.9 推出了新的 Web UI,同時(shí)保留了原來的 Web UI 以備不時(shí)之需。截至目前,我們沒有收到關(guān)于新的 UI 存在問題的反饋,因此社區(qū)投票決定在 Flink 1.10 中移除舊的 Web UI。

          原文: https://developer.aliyun.com/article/744734

          官方地址: https://flink.apache.org/news/2020/02/11/release-1.10.0.html

          5.7. 重要變更 FLINK-10725[34]:Flink 現(xiàn)在可以使用 Java 11 編譯和運(yùn)行。FLINK-15495[35]:SQL 客戶端現(xiàn)在默認(rèn)使用 Blink planner,向用戶提供最新的特性及優(yōu)化。Table API 同樣計(jì)劃在下個版本中從舊的 planner 切換到 Blink planner,我們建議用戶現(xiàn)在就開始嘗試和熟悉 Blink planner。FLINK-13025[36]:新的 Elasticsearch sink connector[37] 全面支持 Elasticsearch 7.x 版本。FLINK-15115[38]:Kafka 0.8 和 0.9 的 connector 已被標(biāo)記為廢棄并不再主動支持。如果你還在使用這些版本或有其他相關(guān)問題,請通過 @dev 郵件列表聯(lián)系我們。FLINK-14516[39]:非基于信用的網(wǎng)絡(luò)流控制已被移除,同時(shí)移除的還有配置項(xiàng)“taskmanager.network.credit.model”。今后,F(xiàn)link 將總是使用基于信用的網(wǎng)絡(luò)流控制。FLINK-12122[40]:在 Flink 1.5.0 中,F(xiàn)LIP-6[41] 改變了 slot 在 TaskManager 之間的分布方式。要想使用此前的調(diào)度策略,既盡可能將負(fù)載分散到所有當(dāng)前可用的 TaskManager,用戶可以在 flink-conf.yaml 中設(shè)置 “cluster.evenly-spread-out-slots: true”。FLINK-11956[42]:s3-hadoop 和 s3-presto 文件系統(tǒng)不再使用類重定位加載方式,而是使用插件方式加載,同時(shí)無縫集成所有認(rèn)證提供者。我們強(qiáng)烈建議其他文件系統(tǒng)也只使用插件加載方式,并將陸續(xù)移除重定位加載方式。Flink 1.9 推出了新的 Web UI,同時(shí)保留了原來的 Web UI 以備不時(shí)之需。截至目前,我們沒有收到關(guān)于新的 UI 存在問題的反饋,因此社區(qū)投票決定[43]在 Flink 1.10 中移除舊的 Web UI。

          原文: https://developer.aliyun.com/article/744734 官方地址: https://flink.apache.org/news/2020/02/11/release-1.10.0.html?spm=a2c6h.12873639.0.0.749e4fc0c1D14m

          六 .Flink 1.11 版本 [重要版本]

          Flink 1.11.0 正式發(fā)布。歷時(shí)近 4 個月,F(xiàn)link 在生態(tài)、易用性、生產(chǎn)可用性、穩(wěn)定性等方面都進(jìn)行了增強(qiáng)和改善。

          core engine 引入了 unaligned checkpoints,這是對 Flink 的容錯機(jī)制的重大更改,該機(jī)制可改善在高背壓下的檢查點(diǎn)性能。

          一個新的 Source API 通過統(tǒng)一批處理和 streaming 執(zhí)行以及將內(nèi)部組件(例如事件時(shí)間處理、水印生成或空閑檢測)卸載到 Flink 來簡化(自定義)sources 的實(shí)現(xiàn)。

          Flink SQL 引入了對變更數(shù)據(jù)捕獲(CDC)的支持,以輕松使用和解釋來自 Debezium 之類的工具的數(shù)據(jù)庫變更日志。更新的 FileSystem 連接器還擴(kuò)展了 Table API/SQL 支持的用例和格式集,從而實(shí)現(xiàn)了直接啟用從 Kafka 到 Hive 的 streaming 數(shù)據(jù)傳輸?shù)确桨浮?/span>

          PyFlink 的多項(xiàng)性能優(yōu)化,包括對矢量化用戶定義函數(shù)(Pandas UDF)的支持。這改善了與 Pandas 和 NumPy 之類庫的互操作性,使 Flink 在數(shù)據(jù)科學(xué)和 ML 工作負(fù)載方面更強(qiáng)大。

          重要變化

          • [FLINK-17339] 從 Flink 1.11 開始,Blink planner 是 Table API/SQL中的默認(rèn)設(shè)置。自 Flink 1.10 起,SQL 客戶端已經(jīng)存在這種情況。仍支持舊的 Flink 規(guī)劃器,但未積極開發(fā)。

          • [FLINK-5763] Savepoints 現(xiàn)在將其所有狀態(tài)包含在一個目錄中(元數(shù)據(jù)和程序狀態(tài))。這樣可以很容易地找出組成 savepoint 狀態(tài)的文件,并允許用戶通過簡單地移動目錄來重新定位 savepoint。

          • [FLINK-16408] 為了減輕對 JVM metaspace 的壓力,只要任務(wù)分配了至少一個插槽,TaskExecutor就會重用用戶代碼類加載器。這會稍微改變 Flink 的恢復(fù)行為,從而不會重新加載靜態(tài)字段。

          • [FLINK-11086] Flink 現(xiàn)在支持 Hadoop 3.0.0 以上的 Hadoop 版本。請注意,F(xiàn)link 項(xiàng)目不提供任何更新的flink-shaded-hadoop-x jars。用戶需要通過HADOOP_CLASSPATH環(huán)境變量(推薦)或 lib/ folder 提供 Hadoop 依賴項(xiàng)。

          • [FLINK-16963] Flink 隨附的所有MetricReporters均已轉(zhuǎn)換為插件。這些不再應(yīng)該放在/lib中(可能導(dǎo)致依賴沖突),而應(yīng)該放在/plugins/< some_directory>中。

          • [FLINK-12639] Flink 文檔正在做一些返工,因此從 Flink 1.11 開始,內(nèi)容的導(dǎo)航和組織會有所變化。

          官方原文: https://flink.apache.org/news/2020/07/06/release-1.11.0.html

          6.1. Table & SQL 支持 Change Data Capture(CDC)

          CDC 被廣泛使用在復(fù)制數(shù)據(jù)、更新緩存、微服務(wù)間同步數(shù)據(jù)、審計(jì)日志等場景,很多公司都在使用開源的 CDC 工具,如 MySQL CDC。通過 Flink 支持在 Table & SQL 中接入和解析 CDC 是一個強(qiáng)需求,在過往的很多討論中都被提及過,可以幫助用戶以實(shí)時(shí)的方式處理 changelog 流,進(jìn)一步擴(kuò)展 Flink 的應(yīng)用場景,例如把 MySQL 中的數(shù)據(jù)同步到 PG 或 ElasticSearch 中,低延時(shí)的 temporal join 一個 changelog 等。

          除了考慮到上面的真實(shí)需求,F(xiàn)link 中定義的“Dynamic Table”概念在流上有兩種模型:append 模式和 update 模式。通過 append 模式把流轉(zhuǎn)化為“Dynamic Table”在之前的版本中已經(jīng)支持,因此在 1.11.0 中進(jìn)一步支持 update 模式也從概念層面完整的實(shí)現(xiàn)了“Dynamic Table”。

          為了支持解析和輸出 changelog,如何在外部系統(tǒng)和 Flink 系統(tǒng)之間編解碼這些更新操作是首要解決的問題。考慮到 source 和 sink 是銜接外部系統(tǒng)的一個橋梁,因此 FLIP-95 在定義全新的 Table source 和 Table sink 接口時(shí)解決了這個問題。

          在公開的 CDC 調(diào)研報(bào)告中,Debezium 和 Canal 是用戶中最流行使用的 CDC 工具,這兩種工具用來同步 changelog 到其它的系統(tǒng)中,如消息隊(duì)列。據(jù)此,F(xiàn)LIP-105 首先支持了 Debezium 和 Canal 這兩種格式,而且 Kafka source 也已經(jīng)可以支持解析上述格式并輸出更新事件,在后續(xù)的版本中會進(jìn)一步支持 Avro(Debezium) 和 Protobuf(Canal)。

          CREATE TABLE my_table (  
          ...) WITH (
          'connector'='...', -- e.g. 'kafka'
          'format'='debezium-json',
          'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)
          'debezium-json.ignore-parse-errors'='true' -- default: false
          );

          6.2. Table & SQL 支持 JDBC Catalog

          1.11.0 之前,用戶如果依賴 Flink 的 source/sink 讀寫關(guān)系型數(shù)據(jù)庫或讀取 changelog 時(shí),必須要手動創(chuàng)建對應(yīng)的 schema。而且當(dāng)數(shù)據(jù)庫中的 schema 發(fā)生變化時(shí),也需要手動更新對應(yīng)的 Flink 作業(yè)以保持一致和類型匹配,任何不匹配都會造成運(yùn)行時(shí)報(bào)錯使作業(yè)失敗。用戶經(jīng)常抱怨這個看似冗余且繁瑣的流程,體驗(yàn)極差。

          實(shí)際上對于任何和 Flink 連接的外部系統(tǒng)都可能有類似的上述問題,在 1.11.0 中重點(diǎn)解決了和關(guān)系型數(shù)據(jù)庫對接的這個問題。FLIP-93 提供了 JDBC catalog 的基礎(chǔ)接口以及 Postgres catalog 的實(shí)現(xiàn),這樣方便后續(xù)實(shí)現(xiàn)與其它類型的關(guān)系型數(shù)據(jù)庫的對接。

          1.11.0 版本后,用戶使用 Flink SQL 時(shí)可以自動獲取表的 schema 而不再需要輸入 DDL。除此之外,任何 schema 不匹配的錯誤都會在編譯階段提前進(jìn)行檢查報(bào)錯,避免了之前運(yùn)行時(shí)報(bào)錯造成的作業(yè)失敗。這是提升易用性和用戶體驗(yàn)的一個典型例子。

          6.3. Hive 實(shí)時(shí)數(shù)倉

          從 1.9.0 版本開始 Flink 從生態(tài)角度致力于集成 Hive,目標(biāo)打造批流一體的 Hive 數(shù)倉。經(jīng)過前兩個版本的迭代,已經(jīng)達(dá)到了 batch 兼容且生產(chǎn)可用,在 TPC-DS 10T benchmark 下性能達(dá)到 Hive 3.0 的 7 倍以上。

          1.11.0 在 Hive 生態(tài)中重點(diǎn)實(shí)現(xiàn)了實(shí)時(shí)數(shù)倉方案,改善了端到端流式 ETL 的用戶體驗(yàn),達(dá)到了批流一體 Hive 數(shù)倉的目標(biāo)。同時(shí)在兼容性、性能、易用性方面也進(jìn)一步進(jìn)行了加強(qiáng)。

          在實(shí)時(shí)數(shù)倉的解決方案中,憑借 Flink 的流式處理優(yōu)勢做到實(shí)時(shí)讀寫 Hive:

          • Hive 寫入:FLIP-115 完善擴(kuò)展了 FileSystem connector 的基礎(chǔ)能力和實(shí)現(xiàn),Table/SQL 層的 sink 可以支持各種格式(CSV、Json、Avro、Parquet、ORC),而且支持 Hive table 的所有格式。

          • Partition 支持:數(shù)據(jù)導(dǎo)入 Hive 引入 partition 提交機(jī)制來控制可見性,通過sink.partition-commit.trigger 控制 partition 提交的時(shí)機(jī),通過 sink.partition-commit.policy.kind 選擇提交策略,支持 SUCCESS 文件和 metastore 提交。

          • Hive 讀取:實(shí)時(shí)化的流式讀取 Hive,通過監(jiān)控 partition 生成增量讀取新 partition,或者監(jiān)控文件夾內(nèi)新文件生成來增量讀取新文件。在 Hive 可用性方面的提升:

          • FLIP-123 通過 Hive Dialect 為用戶提供語法兼容,這樣用戶無需在 Flink 和 Hive 的 CLI 之間切換,可以直接遷移 Hive 腳本到 Flink 中執(zhí)行。

          • 提供 Hive 相關(guān)依賴的內(nèi)置支持,避免用戶自己下載所需的相關(guān)依賴。現(xiàn)在只需要單獨(dú)下載一個包,配置 HADOOP_CLASSPATH 就可以運(yùn)行。

          • 在 Hive 性能方面,1.10.0 中已經(jīng)支持了 ORC(Hive 2+)的向量化讀取,1.11.0 中我們補(bǔ)全了所有版本的 Parquet 和 ORC 向量化支持來提升性能。

          6.4. 全新 Source API

          前面也提到過,source 和 sink 是 Flink 對接外部系統(tǒng)的一個橋梁,對于完善生態(tài)、可用性及端到端的用戶體驗(yàn)是很重要的環(huán)節(jié)。社區(qū)早在一年前就已經(jīng)規(guī)劃了 source 端的徹底重構(gòu),從 FLIP-27 的 ID 就可以看出是很早的一個 feature。但是由于涉及到很多復(fù)雜的內(nèi)部機(jī)制和考慮到各種 source connector 的實(shí)現(xiàn),設(shè)計(jì)上需要考慮的很全面。從 1.10.0 就開始做 POC 的實(shí)現(xiàn),最終趕上了 1.11.0 版本的發(fā)布。

          先簡要回顧下 source 之前的主要問題:

          對用戶而言,在 Flink 中改造已有的 source 或者重新實(shí)現(xiàn)一個生產(chǎn)級的 source connector 不是一件容易的事情,具體體現(xiàn)在沒有公共的代碼可以復(fù)用,而且需要理解很多 Flink 內(nèi)部細(xì)節(jié)以及實(shí)現(xiàn)具體的 event time 分配、watermark 產(chǎn)出、idleness 監(jiān)測、線程模型等。

          批和流的場景需要實(shí)現(xiàn)不同的 source。

          partitions/splits/shards 概念在接口中沒有顯式表達(dá),比如 split 的發(fā)現(xiàn)邏輯和數(shù)據(jù)消費(fèi)都耦合在 source function 的實(shí)現(xiàn)中,這樣在實(shí)現(xiàn) Kafka 或 Kinesis 類型的 source 時(shí)增加了復(fù)雜性。

          在 runtime 執(zhí)行層,checkpoint 鎖被 source function 搶占會帶來一系列問題,框架很難進(jìn)行優(yōu)化。

          FLIP-27 在設(shè)計(jì)時(shí)充分考慮了上述的痛點(diǎn):

          • 首先在 Job Manager 和 Task Manager 中分別引入兩種不同的組件 Split Enumerator 和 Source reader,解耦 split 發(fā)現(xiàn)和對應(yīng)的消費(fèi)處理,同時(shí)方便隨意組合不同的策略。比如現(xiàn)有的 Kafka connector 中有多種不同的 partition 發(fā)現(xiàn)策略和實(shí)現(xiàn)耦合在一起,在新的架構(gòu)下,我們只需要實(shí)現(xiàn)一種 source reader,就可以適配多種 split enumerator 的實(shí)現(xiàn)來對應(yīng)不同的 partition 發(fā)現(xiàn)策略。

          • 在新架構(gòu)下實(shí)現(xiàn)的 source connector 可以做到批流統(tǒng)一,唯一的小區(qū)別是對批場景的有限輸入,split enumerator 會產(chǎn)出固定數(shù)量的 split 集合并且每個 split 都是有限數(shù)據(jù)集;對于流場景的無限輸入,split enumerator 要么產(chǎn)出無限多的 split 或者 split 自身是無限數(shù)據(jù)集。

          • 復(fù)雜的 timestamp assigner 以及 watermark generator 透明的內(nèi)置在 source reader 模塊內(nèi)運(yùn)行,對用戶來說是無感知的。這樣用戶如果想實(shí)現(xiàn)新的 source connector,一般不再需要重復(fù)實(shí)現(xiàn)這部分功能。

          目前 Flink 已有的 source connector 會在后續(xù)的版本中基于新架構(gòu)來重新實(shí)現(xiàn),legacy source 也會繼續(xù)維護(hù)幾個版本保持兼容性,用戶也可以按照 release 文檔中的說明來嘗試體驗(yàn)新 source 的開發(fā)。

          6.5. PyFlink 生態(tài)

          眾所周知,Python 語言在機(jī)器學(xué)習(xí)和數(shù)據(jù)分析領(lǐng)域有著廣泛的使用。Flink 從 1.9.0 版本開始發(fā)力兼容 Python 生態(tài),Python 和 Flink 合力為 PyFlink,把 Flink 的實(shí)時(shí)分布式處理能力輸出給 Python 用戶。前兩個版本 PyFlink 已經(jīng)支持了 Python Table API 和 UDF,在 1.11.0 中擴(kuò)大對 Python 生態(tài)庫 Pandas 的支持以及和 SQL DDL/Client 的集成,同時(shí) Python UDF 性能有了極大的提升。

          具體來說,之前普通的 Python UDF 每次調(diào)用只能處理一條數(shù)據(jù),而且在 Java 端和 Python 端都需要序列化/反序列化,開銷很大。1.11.0 中 Flink 支持在 Table & SQL 作業(yè)中自定義和使用向量化 Python UDF,用戶只需要在 UDF 修飾中額外增加一個參數(shù) udf_type=“pandas” 即可。這樣帶來的好處是:

          • 每次調(diào)用可以處理 N 條數(shù)據(jù)。

          • 數(shù)據(jù)格式基于 Apache Arrow,大大降低了 Java、Python 進(jìn)程之間的序列化/反序列化開銷。

          • 方便 Python 用戶基于 Numpy 和 Pandas 等數(shù)據(jù)分析領(lǐng)域常用的 Python 庫,開發(fā)高性能的 Python UDF。

          除此之外,1.11.0 中 PyFlink 還支持:

          • PyFlink table 和 Pandas DataFrame 之間無縫切換(FLIP-120),增強(qiáng) Pandas 生態(tài)的易用性和兼容性。

          • Table & SQL 中可以定義和使用 Python UDTF(FLINK-14500),不再必需 Java/Scala UDTF。

          • Cython 優(yōu)化 Python UDF 的性能(FLIP-121),對比 1.10.0 可以提升 30 倍。

          • Python UDF 中用戶自定義 metric(FLIP-112),方便監(jiān)控和調(diào)試 UDF 的執(zhí)行。

          上述解讀的都是側(cè)重 API 層面,用戶開發(fā)作業(yè)可以直接感知到的易用性的提升。下面我們看看執(zhí)行引擎層在 1.11.0 中都有哪些值得關(guān)注的變化。

          6.6. 生產(chǎn)可用性和穩(wěn)定性提升

          6.6.1 支持 application 模式和 Kubernetes 增強(qiáng)

          1.11.0 版本前,F(xiàn)link 主要支持如下兩種模式運(yùn)行:

          Session 模式:提前啟動一個集群,所有作業(yè)都共享這個集群的資源運(yùn)行。優(yōu)勢是避免每個作業(yè)單獨(dú)啟動集群帶來的額外開銷,缺點(diǎn)是隔離性稍差。如果一個作業(yè)把某個 Task Manager(TM)容器搞掛,會導(dǎo)致這個容器內(nèi)的所有作業(yè)都跟著重啟。雖然每個作業(yè)有自己獨(dú)立的 Job Manager(JM)來管理,但是這些 JM 都運(yùn)行在一個進(jìn)程中,容易帶來負(fù)載上的瓶頸。

          Per-job 模式:為了解決 session 模式隔離性差的問題,每個作業(yè)根據(jù)資源需求啟動獨(dú)立的集群,每個作業(yè)的 JM 也是運(yùn)行在獨(dú)立的進(jìn)程中,負(fù)載相對小很多。

          以上兩種模式的共同問題是需要在客戶端執(zhí)行用戶代碼,編譯生成對應(yīng)的 Job Graph 提交到集群運(yùn)行。在這個過程需要下載相關(guān) jar 包并上傳到集群,客戶端和網(wǎng)絡(luò)負(fù)載壓力容易成為瓶頸,尤其當(dāng)一個客戶端被多個用戶共享使用。

          1.11.0 中引入了 application 模式(FLIP-85)來解決上述問題,按照 application 粒度來啟動一個集群,屬于這個 application 的所有 job 在這個集群中運(yùn)行。核心是 Job Graph 的生成以及作業(yè)的提交不在客戶端執(zhí)行,而是轉(zhuǎn)移到 JM 端執(zhí)行,這樣網(wǎng)絡(luò)下載上傳的負(fù)載也會分散到集群中,不再有上述 client 單點(diǎn)上的瓶頸。

          用戶可以通過 bin/flink run-application 來使用 application 模式,目前 Yarn 和 Kubernetes(K8s)都已經(jīng)支持這種模式。Yarn application 會在客戶端將運(yùn)行作業(yè)需要的依賴都通過 Yarn Local Resource 傳遞到 JM。K8s application 允許用戶構(gòu)建包含用戶 jar 與依賴的鏡像,同時(shí)會根據(jù)作業(yè)自動創(chuàng)建 TM,并在結(jié)束后銷毀整個集群,相比 session 模式具有更好的隔離性。K8s 不再有嚴(yán)格意義上的 per-job 模式,application 模式相當(dāng)于 per-job 在集群進(jìn)行提交作業(yè)的實(shí)現(xiàn)。

          除了支持 application 模式,F(xiàn)link 原生 K8s 在 1.11.0 中還完善了很多基礎(chǔ)的功能特性(FLINK-14460),以達(dá)到生產(chǎn)可用性的標(biāo)準(zhǔn)。例如 Node Selector、Label、Annotation、Toleration 等。為了更方便的與 Hadoop 集成,也支持根據(jù)環(huán)境變量自動掛載 Hadoop 配置的功能。

          6.6.2 Checkpoint & Savepoint 優(yōu)化

          checkpoint 和 savepoint 機(jī)制一直是 Flink 保持先進(jìn)性的核心競爭力之一,社區(qū)在這個領(lǐng)域的改動很謹(jǐn)慎,最近的幾個大版本中幾乎沒有大的功能和架構(gòu)上的調(diào)整。在用戶郵件列表中,我們經(jīng)常能看到用戶反饋和抱怨的相關(guān)問題:比如 checkpoint 長時(shí)間做不出來失敗,savepoint 在作業(yè)重啟后不可用等等。1.11.0 有選擇的解決了一些這方面的常見問題,提高生產(chǎn)可用性和穩(wěn)定性。

          1.11.0 之前, savepoint 中 meta 數(shù)據(jù)和 state 數(shù)據(jù)分別保存在兩個不同的目錄中,這樣如果想遷移 state 目錄很難識別這種映射關(guān)系,也可能導(dǎo)致目錄被誤刪除,對于目錄清理也同樣有麻煩。1.11.0 把兩部分?jǐn)?shù)據(jù)整合到一個目錄下,這樣方便整體轉(zhuǎn)移和復(fù)用。另外,之前 meta 引用 state 采用的是絕對路徑,這樣 state 目錄遷移后路徑發(fā)生變化也不可用,1.11.0 把 state 引用改成了相對路徑解決了這個問題(FLINK-5763),這樣 savepoint 的管理維護(hù)、復(fù)用更加靈活方便。

          實(shí)際生產(chǎn)環(huán)境中,用戶經(jīng)常遭遇 checkpoint 超時(shí)失敗、長時(shí)間不能完成帶來的困擾。一旦作業(yè) failover 會造成回放大量的歷史數(shù)據(jù),作業(yè)長時(shí)間沒有進(jìn)度,端到端的延遲增加。1.11.0 從不同維度對 checkpoint 的優(yōu)化和提速做了改進(jìn),目標(biāo)實(shí)現(xiàn)分鐘甚至秒級的輕量型 checkpoint。

          首先,增加了 Checkpoint Coordinator 通知 task 取消 checkpoint 的機(jī)制(FLINK-8871),這樣避免 task 端還在執(zhí)行已經(jīng)取消的 checkpoint 而對系統(tǒng)帶來不必要的壓力。同時(shí) task 端放棄已經(jīng)取消的 checkpoint,可以更快的參與執(zhí)行 coordinator 新觸發(fā)的 checkpoint,某種程度上也可以避免新 checkpoint 再次執(zhí)行超時(shí)而失敗。這個優(yōu)化也對后面默認(rèn)開啟 local recovery 提供了便利,task 端可以及時(shí)清理失效 checkpoint 的資源。

          • 在反壓場景下,整個數(shù)據(jù)鏈路堆積了大量 buffer,導(dǎo)致 checkpoint barrier 排在數(shù)據(jù) buffer 后面,不能被 task 及時(shí)處理對齊,也就導(dǎo)致了 checkpoint 長時(shí)間不能執(zhí)行。1.11.0 中從兩個維度對這個問題進(jìn)行解決:

          1)嘗試減少數(shù)據(jù)鏈路中的 buffer 總量(FLINK-16428),這樣 checkpoint barrier 可以盡快被處理對齊。

          上游輸出端控制單個 sub partition 堆積 buffer 的最大閾值(backlog),避免負(fù)載不均場景下單個鏈路上堆積大量 buffer。在不影響網(wǎng)絡(luò)吞吐性能的情況下合理修改上下游默認(rèn)的 buffer 配置。上下游數(shù)據(jù)傳輸?shù)幕A(chǔ)協(xié)議進(jìn)行了調(diào)整,允許單個數(shù)據(jù)鏈路可以配置 0 個獨(dú)占 buffer 而不死鎖,這樣總的 buffer 數(shù)量和作業(yè)并發(fā)規(guī)模解耦。根據(jù)實(shí)際需求在吞吐性能和 checkpoint 速度兩者之間權(quán)衡,自定義 buffer 配比。這個優(yōu)化有一部分工作已經(jīng)在 1.11.0 中完成,剩余部分會在下個版本繼續(xù)推進(jìn)完成。

          2)實(shí)現(xiàn)了全新的 unaligned checkpoint 機(jī)制(FLIP-76)從根本上解決了反壓場景下 checkpoint barrier 對齊的問題。

          實(shí)際上這個想法早在 1.10.0 版本之前就開始醞釀設(shè)計(jì),由于涉及到很多模塊的大改動,實(shí)現(xiàn)機(jī)制和線程模型也很復(fù)雜。我們實(shí)現(xiàn)了兩種不同方案的原型 POC 進(jìn)行了測試、性能對比,確定了最終的方案,因此直到 1.11.0 才完成了 MVP 版本,這也是 1.11.0 中執(zhí)行引擎層唯一的一個重量級 feature。其基本思想可以概括為:

          Checkpoint barrier 跨數(shù)據(jù) buffer 傳輸,不在輸入輸出隊(duì)列排隊(duì)等待處理,這樣就和算子的計(jì)算能力解耦,barrier 在節(jié)點(diǎn)之間的傳輸只有網(wǎng)絡(luò)延時(shí),可以忽略不計(jì)。每個算子多個輸入鏈路之間不需要等待 barrier 對齊來執(zhí)行 checkpoint,第一個到的 barrier 就可以提前觸發(fā) checkpoint,這樣可以進(jìn)一步提速 checkpoint,不會因?yàn)閭€別鏈路的延遲而影響整體。

          為了和之前 aligned checkpoint 的語義保持一致,所有未被處理的輸入輸出數(shù)據(jù) buffer 都將作為 channel state 在 checkpoint 執(zhí)行時(shí)進(jìn)行快照持久化,在 failover 時(shí)連同 operator state 一同進(jìn)行恢復(fù)。

          換句話說,aligned 機(jī)制保證的是 barrier 前面所有數(shù)據(jù)必須被處理完,狀態(tài)實(shí)時(shí)體現(xiàn)到 operator state 中;而 unaligned 機(jī)制把 barrier 前面的未處理數(shù)據(jù)所反映的 operator state 延后到 failover restart 時(shí)通過 channel state 回放進(jìn)行體現(xiàn),從狀態(tài)恢復(fù)的角度來說最終都是一致的。 注意這里雖然引入了額外的 in-flight buffer 的持久化,但是這個過程實(shí)際是在 checkpoint 的異步階段完成的,同步階段只是進(jìn)行了輕量級的 buffer 引用,所以不會過多占用算子的計(jì)算時(shí)間而影響吞吐性能。

          Unaligned checkpoint 在反壓嚴(yán)重的場景下可以明顯加速 checkpoint 的完成時(shí)間,因?yàn)樗辉僖蕾囉谡w的計(jì)算吞吐能力,而和系統(tǒng)的存儲性能更加相關(guān),相當(dāng)于計(jì)算和存儲的解耦。但是它的使用也有一定的局限性,它會增加整體 state 的大小,對存儲 IO 帶來額外的開銷,因此在 IO 已經(jīng)是瓶頸的場景下就不太適合使用 unaligned checkpoint 機(jī)制。

          1.11.0 中 unaligned checkpoint 還沒有作為默認(rèn)模式,需要用戶手動配置來開啟,并且只在 exactly-once 模式下生效。但目前還不支持 savepoint 模式,因?yàn)?savepoint 涉及到作業(yè)的 rescale 場景,channel state 目前還不支持 state 拆分,在后面的版本會進(jìn)一步支持,所以 savepoint 目前還是會使用之前的 aligned 模式,在反壓場景下有可能需要很長時(shí)間才能完成。

          引用文章: https://developer.aliyun.com/article/767711

          七 .Flink 1.12 版本 [重要版本]

          • 在 DataStream API 上添加了高效的批執(zhí)行模式的支持。這是批處理和流處理實(shí)現(xiàn)真正統(tǒng)一的運(yùn)行時(shí)的一個重要里程碑。

          • 實(shí)現(xiàn)了基于Kubernetes的高可用性(HA)方案,作為生產(chǎn)環(huán)境中,ZooKeeper方案之外的另外一種選擇。

          • 擴(kuò)展了 Kafka SQL connector,使其可以在 upsert 模式下工作,并且支持在 SQL DDL 中處理 connector 的 metadata?,F(xiàn)在,時(shí)態(tài)表 Join 可以完全用 SQL 來表示,不再依賴于 Table API 了。

          • PyFlink 中添加了對于 DataStream API 的支持,將 PyFlink 擴(kuò)展到了更復(fù)雜的場景,比如需要對狀態(tài)或者定時(shí)器 timer 進(jìn)行細(xì)粒度控制的場景。除此之外,現(xiàn)在原生支持將 PyFlink 作業(yè)部署到 Kubernetes上。

          7.1. DataStream API 支持批執(zhí)行模式

          Flink 的核心 API 最初是針對特定的場景設(shè)計(jì)的,盡管 Table API / SQL 針對流處理和批處理已經(jīng)實(shí)現(xiàn)了統(tǒng)一的 API,但當(dāng)用戶使用較底層的 API 時(shí),仍然需要在批處理(DataSet API)和流處理(DataStream API)這兩種不同的 API 之間進(jìn)行選擇。鑒于批處理是流處理的一種特例,將這兩種 API 合并成統(tǒng)一的 API,有一些非常明顯的好處,比如:

          • 可復(fù)用性:作業(yè)可以在流和批這兩種執(zhí)行模式之間自由地切換,而無需重寫任何代碼。因此,用戶可以復(fù)用同一個作業(yè),來處理實(shí)時(shí)數(shù)據(jù)和歷史數(shù)據(jù)。

          • 維護(hù)簡單:統(tǒng)一的 API 意味著流和批可以共用同一組 connector,維護(hù)同一套代碼,并能夠輕松地實(shí)現(xiàn)流批混合執(zhí)行,例如 backfilling 之類的場景。

          考慮到這些優(yōu)點(diǎn),社區(qū)已朝著流批統(tǒng)一的 DataStream API 邁出了第一步:支持高效的批處理(FLIP-134)。從長遠(yuǎn)來看,這意味著 DataSet API 將被棄用(FLIP-131),其功能將被包含在 DataStream API 和 Table API / SQL 中。

          • 有限流上的批處理

          您已經(jīng)可以使用 DataStream API 來處理有限流(例如文件)了,但需要注意的是,運(yùn)行時(shí)并不“知道”作業(yè)的輸入是有限的。為了優(yōu)化在有限流情況下運(yùn)行時(shí)的執(zhí)行性能,新的 BATCH 執(zhí)行模式,對于聚合操作,全部在內(nèi)存中進(jìn)行,且使用 sort-based shuffle(FLIP-140)和優(yōu)化過的調(diào)度策略(請參見 Pipelined Region Scheduling 了解更多詳細(xì)信息)。因此,DataStream API 中的 BATCH 執(zhí)行模式已經(jīng)非常接近 Flink 1.12 中 DataSet API 的性能。有關(guān)性能的更多詳細(xì)信息,請查看 FLIP-140。

          在 Flink 1.12 中,默認(rèn)執(zhí)行模式為 STREAMING,要將作業(yè)配置為以 BATCH 模式運(yùn)行,可以在提交作業(yè)的時(shí)候,設(shè)置參數(shù) execution.runtime-mode:

          $ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar

          或者通過編程的方式:

          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setRuntimeMode(RuntimeMode.BATCH);

          注意:盡管 DataSet API 尚未被棄用,但我們建議用戶優(yōu)先使用具有 BATCH 執(zhí)行模式的 DataStream API 來開發(fā)新的批作業(yè),并考慮遷移現(xiàn)有的 DataSet 作業(yè)。

          7.2. 新的 Data Sink API (Beta)

          之前發(fā)布的 Flink 版本中[1],已經(jīng)支持了 source connector 工作在流批兩種模式下,因此在 Flink 1.12 中,社區(qū)著重實(shí)現(xiàn)了統(tǒng)一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 協(xié)議和一個更加模塊化的接口。Sink 的實(shí)現(xiàn)者只需要定義 what 和 how:SinkWriter,用于寫數(shù)據(jù),并輸出需要 commit 的內(nèi)容(例如,committables);Committer 和 GlobalCommitter,封裝了如何處理 committables。框架會負(fù)責(zé) when 和 where:即在什么時(shí)間,以及在哪些機(jī)器或進(jìn)程中 commit。

          這種模塊化的抽象允許為 BATCH 和 STREAMING 兩種執(zhí)行模式,實(shí)現(xiàn)不同的運(yùn)行時(shí)策略,以達(dá)到僅使用一種 sink 實(shí)現(xiàn),也可以使兩種模式都可以高效執(zhí)行。Flink 1.12 中,提供了統(tǒng)一的 FileSink connector,以替換現(xiàn)有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也將逐步遷移到新的接口。

          7.3. 基于 Kubernetes 的高可用 (HA) 方案

          Flink 可以利用 Kubernetes 提供的內(nèi)置功能來實(shí)現(xiàn) JobManager 的 failover,而不用依賴 ZooKeeper。為了實(shí)現(xiàn)不依賴于 ZooKeeper 的高可用方案,社區(qū)在 Flink 1.12(FLIP-144)中實(shí)現(xiàn)了基于 Kubernetes 的高可用方案。該方案與 ZooKeeper 方案基于相同的接口[3],并使用 Kubernetes 的 ConfigMap[4] 對象來處理從 JobManager 的故障中恢復(fù)所需的所有元數(shù)據(jù)。關(guān)于如何配置高可用的 standalone 或原生 Kubernetes 集群的更多詳細(xì)信息和示例,請查閱文檔[5]。

          注意:需要注意的是,這并不意味著 ZooKeeper 將被刪除,這只是為 Kubernetes 上的 Flink 用戶提供了另外一種選擇。

          7.4. 其它功能改進(jìn)

          • 將現(xiàn)有的 connector 遷移到新的 Data Source API

          在之前的版本中,F(xiàn)link 引入了新的 Data Source API(FLIP-27),以允許實(shí)現(xiàn)同時(shí)適用于有限數(shù)據(jù)(批)作業(yè)和無限數(shù)據(jù)(流)作業(yè)使用的 connector 。在 Flink 1.12 中,社區(qū)從 FileSystem connector(FLINK-19161)出發(fā),開始將現(xiàn)有的 source connector 移植到新的接口。

          注意: 新的 source 實(shí)現(xiàn),是完全不同的實(shí)現(xiàn),與舊版本的實(shí)現(xiàn)不兼容。

          • Pipelined Region 調(diào)度 (FLIP-119)

          在之前的版本中,F(xiàn)link 對于批作業(yè)和流作業(yè)有兩套獨(dú)立的調(diào)度策略。Flink 1.12 版本中,引入了統(tǒng)一的調(diào)度策略, 該策略通過識別 blocking 數(shù)據(jù)傳輸邊,將 ExecutionGraph 分解為多個 pipelined region。這樣一來,對于一個 pipelined region 來說,僅當(dāng)有數(shù)據(jù)時(shí)才調(diào)度它,并且僅在所有其所需的資源都被滿足時(shí)才部署它;同時(shí)也可以支持獨(dú)立地重啟失敗的 region。對于批作業(yè)來說,新策略可顯著地提高資源利用率,并消除死鎖。

          • 支持 Sort-Merge Shuffle (FLIP-148)

          為了提高大規(guī)模批作業(yè)的穩(wěn)定性、性能和資源利用率,社區(qū)引入了 sort-merge shuffle,以替代 Flink 現(xiàn)有的實(shí)現(xiàn)。這種方案可以顯著減少 shuffle 的時(shí)間,并使用較少的文件句柄和文件寫緩存(這對于大規(guī)模批作業(yè)的執(zhí)行非常重要)。在后續(xù)版本中(FLINK-19614),F(xiàn)link 會進(jìn)一步優(yōu)化相關(guān)性能。

          注意:該功能是實(shí)驗(yàn)性的,在 Flink 1.12 中默認(rèn)情況下不啟用。要啟用 sort-merge shuffle,需要在 TaskManager 的網(wǎng)絡(luò)配置[6]中設(shè)置合理的最小并行度。

          • Flink WebUI 的改進(jìn) (FLIP-75)

          作為對上一個版本中,F(xiàn)link WebUI 一系列改進(jìn)的延續(xù),F(xiàn)link 1.12 在 WebUI 上暴露了 JobManager 內(nèi)存相關(guān)的指標(biāo)和配置參數(shù)(FLIP-104)。對于 TaskManager 的指標(biāo)頁面也進(jìn)行了更新,為 Managed Memory、Network Memory 和 Metaspace 添加了新的指標(biāo),以反映自 Flink 1.10(FLIP-102)開始引入的 TaskManager 內(nèi)存模型的更改[7]。

          7.5. Table API/SQL 變更

          7.5.1. SQL Connectors 中的 Metadata 處理

          如果可以將某些 source(和 format)的元數(shù)據(jù)作為額外字段暴露給用戶,對于需要將元數(shù)據(jù)與記錄數(shù)據(jù)一起處理的用戶來說很有意義。一個常見的例子是 Kafka,用戶可能需要訪問 offset、partition 或 topic 信息、讀寫 kafka 消息中的 key 或 使用消息 metadata中的時(shí)間戳進(jìn)行時(shí)間相關(guān)的操作。

          在 Flink 1.12 中,F(xiàn)link SQL 支持了元數(shù)據(jù)列用來讀取和寫入每行數(shù)據(jù)中 connector 或 format 相關(guān)的列(FLIP-107)。這些列在 CREATE TABLE 語句中使用 METADATA(保留)關(guān)鍵字來聲明。

          CREATE TABLE kafka_table (
          id BIGINT,
          name STRING,
          event_time TIMESTAMP(3) METADATA FROM 'timestamp', -- access Kafka 'timestamp' metadata
          headers MAP METADATA -- access Kafka 'headers' metadata
          ) WITH (
          'connector' = 'kafka',
          'topic' = 'test-topic',
          'format' = 'avro'
          );

          在 Flink 1.12 中,已經(jīng)支持 Kafka 和 Kinesis connector 的元數(shù)據(jù),并且 FileSystem connector 上的相關(guān)工作也已經(jīng)在計(jì)劃中(FLINK-19903)。由于 Kafka record 的結(jié)構(gòu)比較復(fù)雜,社區(qū)還專門為 Kafka connector 實(shí)現(xiàn)了新的屬性[8],以控制如何處理鍵/值對。關(guān)于 Flink SQL 中元數(shù)據(jù)支持的完整描述,請查看每個 connector 的文檔[9]以及 FLIP-107 中描述的用例。

          7.5.2. Upsert Kafka Connector

          在某些場景中,例如讀取 compacted topic 或者輸出(更新)聚合結(jié)果的時(shí)候,需要將 Kafka 消息記錄的 key 當(dāng)成主鍵處理,用來確定一條數(shù)據(jù)是應(yīng)該作為插入、刪除還是更新記錄來處理。為了實(shí)現(xiàn)該功能,社區(qū)為 Kafka 專門新增了一個 upsert connector(upsert-kafka),該 connector 擴(kuò)展自現(xiàn)有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作為 source 使用,也可以作為 sink 使用,并且提供了與現(xiàn)有的 kafka connector 相同的基本功能和持久性保證,因?yàn)閮烧咧g復(fù)用了大部分代碼。

          要使用 upsert-kafka connector,必須在創(chuàng)建表時(shí)定義主鍵,并為鍵(key.format)和值(value.format)指定序列化反序列化格式。完整的示例,請查看最新的文檔[10]。

          7.5.3. SQL 中 支持 Temporal Table Join

          在之前的版本中,用戶需要通過創(chuàng)建時(shí)態(tài)表函數(shù)(temporal table function) 來支持時(shí)態(tài)表 join(temporal table join) ,而在 Flink 1.12 中,用戶可以使用標(biāo)準(zhǔn)的 SQL 語句 FOR SYSTEM_TIME AS OF(SQL:2011)來支持 join。此外,現(xiàn)在任意包含時(shí)間列和主鍵的表,都可以作為時(shí)態(tài)表,而不僅僅是 append-only 表。這帶來了一些新的應(yīng)用場景,比如將 Kafka compacted topic 或數(shù)據(jù)庫變更日志(來自 Debezium 等)作為時(shí)態(tài)表。

          CREATE TABLE orders (
          order_id STRING,
          currency STRING,
          amount INT,
          order_time TIMESTAMP(3),
          WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
          ) WITH (

          );

          -- Table backed by a Kafka compacted topic
          CREATE TABLE latest_rates (
          currency STRING,
          rate DECIMAL(38, 10),
          currency_time TIMESTAMP(3),
          WATERMARK FOR currency_time AS currency_time - INTERVAL ‘5’ SECOND,
          PRIMARY KEY (currency) NOT ENFORCED
          ) WITH (
          'connector' = 'upsert-kafka',

          );

          -- Event-time temporal table join
          SELECT
          o.order_id,
          o.order_time,
          o.amount * r.rate AS amount,
          r.currency
          FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time r
          ON o.currency = r.currency;

          上面的示例同時(shí)也展示了如何在 temporal table join 中使用 Flink 1.12 中新增的 upsert-kafka connector。

          • 使用 Hive 表進(jìn)行 Temporal Table Join

          用戶也可以將 Hive 表作為時(shí)態(tài)表來使用,F(xiàn)link 既支持自動讀取 Hive 表的最新分區(qū)作為時(shí)態(tài)表(FLINK-19644),也支持在作業(yè)執(zhí)行時(shí)追蹤整個 Hive 表的最新版本作為時(shí)態(tài)表。請參閱文檔,了解更多關(guān)于如何在 temporal table join 中使用 Hive 表的示例。

          7.5.4. Table API/SQL 中的其它改進(jìn)

          • Kinesis Flink SQL Connector (FLINK-18858)

          從 Flink 1.12 開始,Table API / SQL 原生支持將 Amazon Kinesis Data Streams(KDS)作為 source 和 sink 使用。新的 Kinesis SQL connector 提供了對于增強(qiáng)的Fan-Out(EFO)以及 Sink Partition 的支持。如需了解 Kinesis SQL connector 所有支持的功能、配置選項(xiàng)以及對外暴露的元數(shù)據(jù)信息,請查看最新的文檔。

          • 在 FileSystem/Hive connector 的流式寫入中支持小文件合并 (FLINK-19345)

          很多 bulk format,例如 Parquet,只有當(dāng)寫入的文件比較大時(shí),才比較高效。當(dāng) checkpoint 的間隔比較小時(shí),這會成為一個很大的問題,因?yàn)闀?chuàng)建大量的小文件。在 Flink 1.12 中,F(xiàn)ile Sink 增加了小文件合并功能,從而使得即使作業(yè) checkpoint 間隔比較小時(shí),也不會產(chǎn)生大量的文件。要開啟小文件合并,可以按照文檔[11]中的說明在 FileSystem connector 中設(shè)置 auto-compaction = true 屬性。

          • Kafka Connector 支持 Watermark 下推 (FLINK-20041)

          為了確保使用 Kafka 的作業(yè)的結(jié)果的正確性,通常來說,最好基于分區(qū)來生成 watermark,因?yàn)榉謪^(qū)內(nèi)數(shù)據(jù)的亂序程度通常來說比分區(qū)之間數(shù)據(jù)的亂序程度要低很多。Flink 現(xiàn)在允許將 watermark 策略下推到 Kafka connector 里面,從而支持在 Kafka connector 內(nèi)部構(gòu)造基于分區(qū)的 watermark[12]。一個 Kafka source 節(jié)點(diǎn)最終所產(chǎn)生的 watermark 由該節(jié)點(diǎn)所讀取的所有分區(qū)中的 watermark 的最小值決定,從而使整個系統(tǒng)可以獲得更好的(即更接近真實(shí)情況)的 watermark。該功能也允許用戶配置基于分區(qū)的空閑檢測策略,以防止空閑分區(qū)阻礙整個作業(yè)的 event time 增長。

          新增的 Formats

          利用 Multi-input 算子進(jìn)行 Join 優(yōu)化 (FLINK-19621)

          Shuffling 是一個 Flink 作業(yè)中最耗時(shí)的操作之一。為了消除不必要的序列化反序列化開銷、數(shù)據(jù) spilling 開銷,提升 Table API / SQL 上批作業(yè)和流作業(yè)的性能, planner 當(dāng)前會利用上一個版本中已經(jīng)引入的N元算子(FLIP-92),將由 forward 邊所連接的多個算子合并到一個 Task 里執(zhí)行。

          Type Inference for Table API UDAFs (FLIP-65)

          Flink 1.12 完成了從 Flink 1.9 開始的,針對 Table API 上的新的類型系統(tǒng)[2]的工作,并在聚合函數(shù)(UDAF)上支持了新的類型系統(tǒng)。從 Flink 1.12 開始,與標(biāo)量函數(shù)和表函數(shù)類似,聚合函數(shù)也支持了所有的數(shù)據(jù)類型。

          7.6. PyFlink: Python DataStream API

          為了擴(kuò)展 PyFlink 的可用性,F(xiàn)link 1.12 提供了對于 Python DataStream API(FLIP-130)的初步支持,該版本支持了無狀態(tài)類型的操作(例如 Map,F(xiàn)latMap,F(xiàn)ilter,KeyBy 等)。如果需要嘗試 Python DataStream API,可以安裝PyFlink,然后按照該文檔[14]進(jìn)行操作,文檔中描述了如何使用 Python DataStream API 構(gòu)建一個簡單的流應(yīng)用程序。

          from pyflink.common.typeinfo import Types
          from pyflink.datastream import MapFunction, StreamExecutionEnvironment
          class MyMapFunction(MapFunction):
          def map(self, value):
          return value + 1
          env = StreamExecutionEnvironment.get_execution_environment()
          data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
          mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
          mapped_stream.print()
          env.execute("datastream job")

          7.7.PyFlink 中的其它改進(jìn)

          • PyFlink Jobs on Kubernetes (FLINK-17480)

          除了 standalone 部署和 YARN 部署之外,現(xiàn)在也原生支持將 PyFlink 作業(yè)部署在 Kubernetes 上。最新的文檔中詳細(xì)描述了如何在 Kubernetes 上啟動 session 或 application 集群。

          • 用戶自定義聚合函數(shù) (UDAFs)

          從 Flink 1.12 開始,您可以在 PyFlink 作業(yè)中定義和使用 Python UDAF 了(FLIP-139)。普通的 UDF(標(biāo)量函數(shù))每次只能處理一行數(shù)據(jù),而 UDAF(聚合函數(shù))則可以處理多行數(shù)據(jù),用于計(jì)算多行數(shù)據(jù)的聚合值。您也可以使用 Pandas UDAF[15](FLIP-137),來進(jìn)行向量化計(jì)算(通常來說,比普通 Python UDAF 快10倍以上)。

          注意: 普通 Python UDAF,當(dāng)前僅支持在 group aggregations 以及流模式下使用。如果需要在批模式或者窗口聚合中使用,建議使用 Pandas UDAF。

          原文: https://developer.aliyun.com/article/780123

          官方原文: https://flink.apache.org/news/2020/12/10/release-1.12.0.html


          八千里路云和月 | 從零到大數(shù)據(jù)專家學(xué)習(xí)路徑指南

          我們在學(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?

          193篇文章暴揍Flink,這個合集你需要關(guān)注一下

          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS

          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點(diǎn)

          我們在學(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?

          在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!

          硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)

          數(shù)據(jù)治理方法論和實(shí)踐小百科全書

          標(biāo)簽體系下的用戶畫像建設(shè)小指南

          4萬字長文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析

          【面試&個人成長】2021年過半,社招和校招的經(jīng)驗(yàn)之談

          大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)

          我寫過的關(guān)于成長/面試/職場進(jìn)階的文章

          當(dāng)我們在學(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」


          你好,我是王知無,一個大數(shù)據(jù)領(lǐng)域的硬核原創(chuàng)作者。

          做過后端架構(gòu)、數(shù)據(jù)中間件、數(shù)據(jù)平臺&架構(gòu)、算法工程化。

          專注大數(shù)據(jù)領(lǐng)域?qū)崟r(shí)動態(tài)&技術(shù)提升&個人成長&職場進(jìn)階,歡迎關(guān)注。

          瀏覽 63
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线的欧美成网站 | 电影一级黄色级网站 | 西西4444WWW无码精品 | 九九久久精品 | 欧洲色综合 |