<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink從1.7到1.14版本升級(jí)匯總

          共 50663字,需瀏覽 102分鐘

           ·

          2021-10-13 10:24

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

          回復(fù)”面試“獲取更多驚喜

          ? ?

          ? ?一 .前言

          官方發(fā)布了Flink1.14版本,但是遺憾的是,中文官網(wǎng)中的案例和資料還都是基于很古老的版本。所以大家照著官網(wǎng)資料跑不通基本代碼也是很正常的。

          所以整理一下從1.7 版本到1.14版本之間的相對(duì)大的變動(dòng). 做到在學(xué)習(xí)的過(guò)程中可以做到心里有數(shù)。

          二 .Flink 1.7 版本

          在 Flink 1.7.0,我們更關(guān)注實(shí)現(xiàn)快速數(shù)據(jù)處理以及以無(wú)縫方式為 Flink 社區(qū)構(gòu)建數(shù)據(jù)密集型應(yīng)用程序。我們最新版本包括一些令人興奮的新功能和改進(jìn),例如對(duì) Scala 2.12 的支持,Exactly-Once 語(yǔ)義的 S3 文件接收器,復(fù)雜事件處理與流SQL的集成.

          2.1. Flink中的Scala 2.12支持

          Flink 1.7.0 是第一個(gè)完全支持 Scala 2.12 的版本。這可以讓用戶使用新的 Scala 版本編寫 Flink 應(yīng)用程序以及利用 Scala 2.12 的生態(tài)系統(tǒng)。

          2.2. 狀態(tài)變化

          在許多情況下,由于需求的變化,長(zhǎng)期運(yùn)行的 Flink 應(yīng)用程序會(huì)在其生命周期內(nèi)發(fā)生變化。在不丟失當(dāng)前應(yīng)用程序進(jìn)度狀態(tài)的情況下更改用戶狀態(tài)是應(yīng)用程序變化的關(guān)鍵要求。Flink 1.7.0 版本中社區(qū)添加了狀態(tài)變化,允許我們靈活地調(diào)整長(zhǎng)時(shí)間運(yùn)行的應(yīng)用程序的用戶狀態(tài)模式,同時(shí)保持與先前保存點(diǎn)的兼容。通過(guò)狀態(tài)變化,我們可以在狀態(tài)模式中添加或刪除列。當(dāng)使用 Avro 生成類作為用戶狀態(tài)時(shí),狀態(tài)模式變化可以開(kāi)箱即用,這意味著狀態(tài)模式可以根據(jù) Avro 的規(guī)范進(jìn)行變化。雖然 Avro 類型是 Flink 1.7 中唯一支持模式變化的內(nèi)置類型,但社區(qū)仍在繼續(xù)致力于在未來(lái)的 Flink 版本中進(jìn)一步擴(kuò)展對(duì)其他類型的支持。

          2.3. Exactly-once語(yǔ)義的S3 StreamingFileSink

          Flink 1.6.0 中引入的 StreamingFileSink 現(xiàn)在已經(jīng)擴(kuò)展到 S3 文件系統(tǒng),并保證 Exactly-once 語(yǔ)義。使用此功能允許所有 S3 用戶構(gòu)建寫入 S3 的 Exactly-once 語(yǔ)義端到端管道。

          2.4. Streaming SQL中支持MATCH_RECOGNIZE

          這是 Apache Flink 1.7.0 的一個(gè)重要補(bǔ)充,它為 Flink SQL 提供了 MATCH_RECOGNIZE 標(biāo)準(zhǔn)的初始支持。此功能融合了復(fù)雜事件處理(CEP)和SQL,可以輕松地對(duì)數(shù)據(jù)流進(jìn)行模式匹配,從而實(shí)現(xiàn)一整套新的用例。此功能目前處于測(cè)試階段。

          2.5. Streaming SQL中的 Temporal Tables 和 Temporal Joins

          Temporal Tables 是 Apache Flink 中的一個(gè)新概念,它為表的更改歷史記錄提供(參數(shù)化)視圖,可以返回表在任何時(shí)間點(diǎn)的內(nèi)容。例如,我們可以使用具有歷史貨幣匯率的表。隨著時(shí)間的推移,表會(huì)不斷發(fā)生變化,并增加更新的匯率。Temporal Table 是一種視圖,可以返回匯率在任何時(shí)間點(diǎn)的實(shí)際狀態(tài)。通過(guò)這樣的表,可以使用正確的匯率將不同貨幣的訂單流轉(zhuǎn)換為通用貨幣。

          Temporal Joins 允許 Streaming 數(shù)據(jù)與不斷變化/更新的表的內(nèi)存和計(jì)算效率的連接,使用處理時(shí)間或事件時(shí)間,同時(shí)符合ANSI SQL。

          流式 SQL 的其他功能除了上面提到的主要功能外,F(xiàn)link 的 Table&SQL API 已經(jīng)擴(kuò)展到更多用例。以下內(nèi)置函數(shù)被添加到API:TO_BASE64,LOG2,LTRIM,REPEAT,REPLACE,COSH,SINH,TANH。SQL Client 現(xiàn)在支持在環(huán)境文件和 CLI 會(huì)話中自定義視圖。此外,CLI 中還添加了基本的 SQL 語(yǔ)句自動(dòng)完成功能。社區(qū)添加了一個(gè) Elasticsearch 6 table sink,允許存儲(chǔ)動(dòng)態(tài)表的更新結(jié)果。

          2.6. 版本化REST API

          從 Flink 1.7.0 開(kāi)始,REST API 已經(jīng)版本化。這保證了 Flink REST API 的穩(wěn)定性,因此可以在 Flink 中針對(duì)穩(wěn)定的 API開(kāi)發(fā)第三方應(yīng)用程序。因此,未來(lái)的 Flink 升級(jí)不需要更改現(xiàn)有的第三方集成。

          2.7. Kafka 2.0 Connector

          Apache Flink 1.7.0 繼續(xù)添加更多的連接器,使其更容易與更多外部系統(tǒng)進(jìn)行交互。在此版本中,社區(qū)添加了 Kafka 2.0 連接器,可以從 Kafka 2.0 讀寫數(shù)據(jù)時(shí)保證 Exactly-Once 語(yǔ)義。

          2.8. 本地恢復(fù)

          Apache Flink 1.7.0 通過(guò)擴(kuò)展 Flink 的調(diào)度來(lái)完成本地恢復(fù)功能,以便在恢復(fù)時(shí)考慮之前的部署位置。如果啟用了本地恢復(fù),F(xiàn)link 將在運(yùn)行任務(wù)的機(jī)器上保留一份最新檢查點(diǎn)的本地副本。將任務(wù)調(diào)度到之前的位置,F(xiàn)link 可以通過(guò)從本地磁盤讀取檢查點(diǎn)狀態(tài)來(lái)最小化恢復(fù)狀態(tài)的網(wǎng)絡(luò)流量。此功能大大提高了恢復(fù)速度。

          2.9. 刪除Flink的傳統(tǒng)模式

          Apache Flink 1.7.0 標(biāo)志著 Flip-6 工作已經(jīng)完全完成并且與傳統(tǒng)模式達(dá)到功能奇偶校驗(yàn)。因此,此版本刪除了對(duì)傳統(tǒng)模式的支持。

          三 .Flink 1.8 版本

          新特性和改進(jìn):

          • Schema Evolution Story 最終版

          • 基于 TTL 持續(xù)清除舊狀態(tài)

          • 使用用戶定義的函數(shù)和聚合進(jìn)行 SQL 模式檢測(cè)

          • 符合 RFC 的 CSV 格式

          • 新的 KafkaDeserializationSchema,可以直接訪問(wèn) ConsumerRecord

          • FlinkKinesisConsumer 中的分片水印選項(xiàng)

          • DynamoDB Streams 的新用戶捕獲表更改

          • 支持用于子任務(wù)協(xié)調(diào)的全局聚合

          重要變化:

          • 使用 Flink 捆綁 Hadoop 庫(kù)的更改:不再發(fā)布包含 hadoop 的便捷二進(jìn)制文件

          • FlinkKafkaConsumer 現(xiàn)在將根據(jù)主題規(guī)范過(guò)濾已恢復(fù)的分區(qū)

          • 表 API 的 Maven 依賴更改:之前具有flink-table依賴關(guān)系的用戶需要將依賴關(guān)系從flink-table-planner更新為正確的依賴關(guān)系 flink-table-api-,具體取決于是使用 Java 還是 Scala:flink-table-api-java-bridge或者flink-table-api-scala-bridge

          3.1. 使用TTL(生存時(shí)間)連續(xù)增量清除舊的Key狀態(tài)

          我們?cè)贔link 1.6(FLINK-9510)中為Key狀態(tài)引入了TTL(生存時(shí)間)。此功能允許在訪問(wèn)時(shí)清理并使Key狀態(tài)條目無(wú)法訪問(wèn)。另外,在編寫保存點(diǎn)/檢查點(diǎn)時(shí),現(xiàn)在也將清理狀態(tài)。Flink 1.8引入了對(duì)RocksDB狀態(tài)后端(FLINK-10471)和堆狀態(tài)后端(FLINK-10473)的舊條數(shù)的連續(xù)清理。這意味著舊的條數(shù)將(根據(jù)TTL設(shè)置)不斷被清理掉。

          3.2. 恢復(fù)保存點(diǎn)時(shí)對(duì)模式遷移的新支持

          使用Flink 1.7.0,我們?cè)谑褂肁vroSerializer時(shí)添加了對(duì)更改狀態(tài)模式的支持。使用Flink 1.8.0,我們?cè)赥ypeSerializers將所有內(nèi)置遷移到新的序列化器快照抽象方面取得了很大進(jìn)展,該抽象理論上允許模式遷移。在Flink附帶的序列化程序中,我們現(xiàn)在支持PojoSerializer (FLINK-11485)和Java EnumSerializer (FLINK-11334)以及有限情況下的Kryo(FLINK-11323)的模式遷移格式。

          3.3. 保存點(diǎn)兼容性

          TraversableSerializer 此序列化程序(FLINK-11539)中的更新,包含Scala的Flink 1.2中的保存點(diǎn)將不再與Flink 1.8兼容??梢酝ㄟ^(guò)升級(jí)到Flink 1.3和Flink 1.7之間的版本,然后再更新至Flink 1.8來(lái)解決此限制。

          3.4. RocksDB版本沖突并切換到FRocksDB(FLINK-10471)

          需要切換到名為FRocksDB的RocksDB的自定義構(gòu)建,因?yàn)樾枰猂ocksDB中的某些更改來(lái)支持使用TTL進(jìn)行連續(xù)狀態(tài)清理。FRocksDB的已使用版本基于RocksDB的升級(jí)版本5.17.2。對(duì)于Mac OS X,僅支持OS X版本> =10.13的RocksDB版本5.17.2。

          另見(jiàn):https://github.com/facebook/rocksdb/issues/4862

          3.5. Maven 依賴

          使用Flink捆綁Hadoop庫(kù)的更改(FLINK-11266)

          包含hadoop的便捷二進(jìn)制文件不再發(fā)布。

          如果部署依賴于flink-shaded-hadoop2包含 flink-dist,則必須從下載頁(yè)面的可選組件部分手動(dòng)下載并打包Hadoop jar并將其復(fù)制到/lib目錄中。另外一種方法,可以通過(guò)打包flink-dist和激活 include-hadoopmaven配置文件來(lái)構(gòu)建包含hadoop的Flink分發(fā)。

          由于hadoop flink-dist默認(rèn)不再包含在內(nèi),因此指定-DwithoutHadoop何時(shí)打包flink-dist將不再影響構(gòu)建。

          3.6. TaskManager配置(FLINK-11716)

          TaskManagers現(xiàn)在默認(rèn)綁定到主機(jī)IP地址而不是主機(jī)名??梢酝ㄟ^(guò)配置選項(xiàng)控制此行為taskmanager.network.bind-policy。如果你的Flink集群在升級(jí)后遇到莫名其妙的連接問(wèn)題,嘗試設(shè)置taskmanager.network.bind-policy: name在flink-conf.yaml 返回前的1.8的設(shè)置行為。

          3.7. Table API 的變動(dòng)

          • 直接表構(gòu)造函數(shù)使用的取消預(yù)測(cè)(FLINK-11447) Flink 1.8不贊成Table在Table API中直接使用該類的構(gòu)造函數(shù)。此構(gòu)造函數(shù)以前將用于執(zhí)行與橫向表的連接。你現(xiàn)在應(yīng)該使用table.joinLateral()或 table.leftOuterJoinLateral()代替。這種更改對(duì)于將Table類轉(zhuǎn)換為接口是必要的,這將使Table API在未來(lái)更易于維護(hù)和更清潔。

          • 引入新的CSV格式符(FLINK-9964)

          此版本為符合RFC4180的CSV文件引入了新的格式符。新描述符可用作 org.apache.flink.table.descriptors.Csv。

          目前,這只能與Kafka一起使用。舊描述符可org.apache.flink.table.descriptors.OldCsv用于文件系統(tǒng)連接器。

          靜態(tài)生成器方法在TableEnvironment(FLINK-11445)上的棄用,為了將API與實(shí)際實(shí)現(xiàn)分開(kāi):TableEnvironment.getTableEnvironment()。

          不推薦使用靜態(tài)方法。你現(xiàn)在應(yīng)該使用Batch/StreamTableEnvironment.create()。

          • 表API Maven模塊中的更改(FLINK-11064)

          之前具有flink-table依賴關(guān)系的用戶需要更新其依賴關(guān)系flink-table-planner以及正確的依賴關(guān)系flink-table-api-?,具體取決于是使用Java還是Scala:flink-table-api-java-bridge或者flink-table-api-scala-bridge。

          • 更改為外部目錄表構(gòu)建器(FLINK-11522)

          ExternalCatalogTable.builder()不贊成使用ExternalCatalogTableBuilder()。

          • 更改為表API連接器jar的命名(FLINK-11026)

          Kafka/elasticsearch6 sql-jars的命名方案已經(jīng)更改。在maven術(shù)語(yǔ)中,它們不再具有sql-jar限定符,而artifactId現(xiàn)在以前綴為例,flink-sql而不是flink例如flink-sql-connector-kafka。

          • 更改為指定Null的方式(FLINK-11785)

          現(xiàn)在Table API中的Null需要定義nullof(type)而不是Null(type)。舊方法已被棄用。

          3.8. 連接器變動(dòng)

          • 引入可直接訪問(wèn)ConsumerRecord的新KafkaDeserializationSchema(FLINK-8354)

          對(duì)于FlinkKafkaConsumers,我們推出了一個(gè)新的KafkaDeserializationSchema ,可以直接訪問(wèn)KafkaConsumerRecord。這包含了該 KeyedSerializationSchema功能,該功能已棄用但目前仍可以使用。

          • FlinkKafkaConsumer現(xiàn)在將根據(jù)主題規(guī)范過(guò)濾恢復(fù)的分區(qū)(FLINK-10342)

          從Flink 1.8.0開(kāi)始,現(xiàn)在FlinkKafkaConsumer總是過(guò)濾掉已恢復(fù)的分區(qū),這些分區(qū)不再與要在還原的執(zhí)行中訂閱的指定主題相關(guān)聯(lián)。此行為在以前的版本中不存在FlinkKafkaConsumer。

          如果您想保留以前的行為。請(qǐng)使用上面的

          disableFilterRestoredPartitionsWithSubscribedTopics()

          配置方法FlinkKafkaConsumer。

          考慮這個(gè)例子:如果你有一個(gè)正在消耗topic的Kafka Consumer A,你做了一個(gè)保存點(diǎn),然后改變你的Kafka消費(fèi)者而不是從topic消費(fèi)B,然后從保存點(diǎn)重新啟動(dòng)你的工作。在此更改之前,您的消費(fèi)者現(xiàn)在將使用這兩個(gè)主題A,B因?yàn)樗鎯?chǔ)在消費(fèi)者正在使用topic消費(fèi)的狀態(tài)A。通過(guò)此更改,您的使用者將僅B在還原后使用topic,因?yàn)槲覀兪褂门渲玫膖opic過(guò)濾狀態(tài)中存儲(chǔ)的topic。

          其它接口改變:

          1、從TypeSerializer接口(FLINK-9803)中刪除了canEqual()方法

          這些canEqual()方法通常用于跨類型層次結(jié)構(gòu)進(jìn)行適當(dāng)?shù)南嗟刃詸z查。在TypeSerializer實(shí)際上并不需要這個(gè)屬性,因此該方法現(xiàn)已刪除。

          2、刪除CompositeSerializerSnapshot實(shí)用程序類(FLINK-11073)

          該CompositeSerializerSnapshot實(shí)用工具類已被刪除。

          現(xiàn)在CompositeTypeSerializerSnapshot,你應(yīng)該使用復(fù)合序列化程序的快照,該序列化程序?qū)⑿蛄谢山o多個(gè)嵌套的序列化程序。有關(guān)使用的說(shuō)明,請(qǐng)參閱此處CompositeTypeSerializerSnapshot。

          四 .Flink 1.9 版本

          2019年 8月22日,Apache Flink 1.9.0 版本正式發(fā)布,這也是阿里內(nèi)部版本 Blink 合并入 Flink 后的首次版本發(fā)布。

          此次版本更新帶來(lái)的重大功能包括批處理作業(yè)的批式恢復(fù),以及 Table API 和 SQL 的基于 Blink 的新查詢引擎(預(yù)覽版)。同時(shí),這一版本還推出了 State Processor API,這是社區(qū)最迫切需求的功能之一,該 API 使用戶能夠用 Flink DataSet 作業(yè)靈活地讀寫保存點(diǎn)。此外,F(xiàn)link 1.9 還包括一個(gè)重新設(shè)計(jì)的 WebUI 和新的 Python Table API (預(yù)覽版)以及與 Apache Hive 生態(tài)系統(tǒng)的集成(預(yù)覽版)。

          新功能和改進(jìn)

          • 細(xì)粒度批作業(yè)恢復(fù) (FLIP-1)

          • State Processor API (FLIP-43)

          • Stop-with-Savepoint (FLIP-34)

          • 重構(gòu) Flink WebUI

          • 預(yù)覽新的 Blink SQL 查詢處理器

          • Table API / SQL 的其他改進(jìn)

          • 預(yù)覽 Hive 集成 (FLINK-10556)

          • 預(yù)覽新的 Python Table API (FLIP-38)

          4.1. 細(xì)粒度批作業(yè)恢復(fù) (FLIP-1)

          批作業(yè)(DataSet、Table API 和 SQL)從 task 失敗中恢復(fù)的時(shí)間被顯著縮短了。在 Flink 1.9 之前,批處理作業(yè)中的 task 失敗是通過(guò)取消所有 task 并重新啟動(dòng)整個(gè)作業(yè)來(lái)恢復(fù)的,即作業(yè)從頭開(kāi)始,所有進(jìn)度都會(huì)廢棄。在此版本中,F(xiàn)link 將中間結(jié)果保留在網(wǎng)絡(luò) shuffle 的邊緣,并使用此數(shù)據(jù)去恢復(fù)那些僅受故障影響的 task。所謂 task 的 “failover regions” (故障區(qū))是指通過(guò) pipelined 方式連接的數(shù)據(jù)交換方式,定義了 task 受故障影響的邊界。

          要使用這個(gè)新的故障策略,需要確保 flink-conf.yaml 中有 jobmanager.execution.failover-strategy: region 的配置。

          注意:1.9 發(fā)布包中默認(rèn)就已經(jīng)包含了該配置項(xiàng),不過(guò)當(dāng)從之前版本升級(jí)上來(lái)時(shí),如果要復(fù)用之前的配置的話,需要手動(dòng)加上該配置。

          “Region” 的故障策略也能同時(shí)提升 “embarrassingly parallel” 類型的流作業(yè)的恢復(fù)速度,也就是沒(méi)有任何像 keyBy() 和 rebalance 的 shuffle 的作業(yè)。當(dāng)這種作業(yè)在恢復(fù)時(shí),只有受影響的故障區(qū)的 task 需要重啟。對(duì)于其他類型的流作業(yè),故障恢復(fù)行為與之前的版本一樣。

          4.2. State Processor API (FLIP-43)

          直到 Flink 1.9,從外部訪問(wèn)作業(yè)的狀態(tài)僅局限于:Queryable State(可查詢狀態(tài))實(shí)驗(yàn)性功能。此版本中引入了一種新的、強(qiáng)大的類庫(kù),基于 DataSet 支持讀取、寫入、和修改狀態(tài)快照。在實(shí)踐上,這意味著:

          • Flink 作業(yè)的狀態(tài)可以自主構(gòu)建了,可以通過(guò)讀取外部系統(tǒng)的數(shù)據(jù)(例如外部數(shù)據(jù)庫(kù)),然后轉(zhuǎn)換成 savepoint。

          • Savepoint 中的狀態(tài)可以使用任意的 Flink 批處理 API 查詢(DataSet、Table、SQL)。例如,分析相關(guān)的狀態(tài)模式或檢查狀態(tài)差異以支持應(yīng)用程序?qū)徍嘶蚬收吓挪椤?/span>

          • Savepoint 中的狀態(tài) schema 可以離線遷移了,而之前的方案只能在訪問(wèn)狀態(tài)時(shí)進(jìn)行,是一種在線遷移。

          • Savepoint 中的無(wú)效數(shù)據(jù)可以被識(shí)別出來(lái)并糾正。

          新的 State Processor API 覆蓋了所有類型的快照:savepoint,full checkpoint 和 incremental checkpoint。

          4.3. Stop-with-Savepoint (FLIP-34)

          “Cancel-with-savepoint” 是停止、重啟、fork、或升級(jí) Flink 作業(yè)的一個(gè)常用操作。然而,當(dāng)前的實(shí)現(xiàn)并沒(méi)有保證輸出到 exactly-once sink 的外部存儲(chǔ)的數(shù)據(jù)持久化。為了改進(jìn)停止作業(yè)時(shí)的端到端語(yǔ)義,F(xiàn)link 1.9 引入了一種新的 SUSPEND 模式,可以帶 savepoint 停止作業(yè),保證了輸出數(shù)據(jù)的一致性。你可以使用 Flink CLI 來(lái) suspend 一個(gè)作業(yè):

          bin/flink stop -p [:targetSavepointDirectory] :jobId

          4.4. 重構(gòu) Flink WebUI

          社區(qū)討論了現(xiàn)代化 Flink WebUI 的提案,決定采用 Angular 的最新穩(wěn)定版來(lái)重構(gòu)這個(gè)組件。從 Angular 1.x 躍升到了 7.x 。重新設(shè)計(jì)的 UI 是 1.9.0 的默認(rèn)版本,不過(guò)有一個(gè)按鈕可以切換到舊版的 WebUI。

          4.5. 新 Blink SQL 查詢處理器預(yù)覽

          在 Blink 捐贈(zèng)給 Apache Flink 之后,社區(qū)就致力于為 Table API 和 SQL 集成 Blink 的查詢優(yōu)化器和 runtime。第一步,我們將 flink-table 單模塊重構(gòu)成了多個(gè)小模塊(FLIP-32)。這對(duì)于 Java 和 Scala API 模塊、優(yōu)化器、以及 runtime 模塊來(lái)說(shuō),有了一個(gè)更清晰的分層和定義明確的接口。

          緊接著,我們擴(kuò)展了 Blink 的 planner 以實(shí)現(xiàn)新的優(yōu)化器接口,所以現(xiàn)在有兩個(gè)插件化的查詢處理器來(lái)執(zhí)行 Table API 和 SQL:1.9 以前的 Flink 處理器和新的基于 Blink 的處理器?;?Blink 的查詢處理器提供了更好地 SQL 覆蓋率(1.9 完整支持 TPC-H,TPC-DS 的支持在下一個(gè)版本的計(jì)劃中)并通過(guò)更廣泛的查詢優(yōu)化(基于成本的執(zhí)行計(jì)劃選擇和更多的優(yōu)化規(guī)則)、改進(jìn)的代碼生成機(jī)制、和調(diào)優(yōu)過(guò)的算子實(shí)現(xiàn)來(lái)提升批處理查詢的性能。除此之外,基于 Blink 的查詢處理器還提供了更強(qiáng)大的流處理能力,包括一些社區(qū)期待已久的新功能(如維表 Join,TopN,去重)和聚合場(chǎng)景緩解數(shù)據(jù)傾斜的優(yōu)化,以及內(nèi)置更多常用的函數(shù)。

          注:兩個(gè)查詢處理器之間的語(yǔ)義和功能大部分是一致的,但并未完全對(duì)齊。具體請(qǐng)查看發(fā)布日志。

          不過(guò), Blink 的查詢處理器的集成還沒(méi)有完全完成。因此,1.9 之前的 Flink 處理器仍然是1.9 版本的默認(rèn)處理器,建議用于生產(chǎn)設(shè)置。你可以在創(chuàng)建 TableEnvironment 時(shí)通過(guò) EnvironmentSettings 配置啟用 Blink 處理器。被選擇的處理器必須要在正在執(zhí)行的 Java 進(jìn)程的類路徑中。對(duì)于集群設(shè)置,默認(rèn)兩個(gè)查詢處理器都會(huì)自動(dòng)地加載到類路徑中。當(dāng)從 IDE 中運(yùn)行一個(gè)查詢時(shí),需要在項(xiàng)目中顯式地增加一個(gè)處理器的依賴。

          4.6. Table API / SQL 的其他改進(jìn)

          除了圍繞 Blink Planner 令人興奮的進(jìn)展外,社區(qū)還做了一系列的改進(jìn),包括:

          • 為 Table API / SQL 的 Java 用戶去除 Scala 依賴 (FLIP-32) 作為重構(gòu)和拆分 flink-table 模塊工作的一部分,我們?yōu)?Java 和 Scala 創(chuàng)建了兩個(gè)單獨(dú)的 API 模塊。對(duì)于 Scala 用戶來(lái)說(shuō),沒(méi)有什么改變。不過(guò)現(xiàn)在 Java 用戶在使用 Table API 和 SQL 時(shí),可以不用引入一堆 Scala 依賴了。

          • 重構(gòu) Table API / SQL 的類型系統(tǒng)(FLIP-37) 我們實(shí)現(xiàn)了一個(gè)新的數(shù)據(jù)類型系統(tǒng),以便從 Table API 中移除對(duì) Flink TypeInformation 的依賴,并提高其對(duì) SQL 標(biāo)準(zhǔn)的遵從性。不過(guò)還在進(jìn)行中,預(yù)計(jì)將在下一版本完工,在 Flink 1.9 中,UDF 尚未移植到新的類型系統(tǒng)上。

          • Table API 的多行多列轉(zhuǎn)換(FLIP-29) Table API 擴(kuò)展了一組支持多行和多列、輸入和輸出的轉(zhuǎn)換的功能。這些轉(zhuǎn)換顯著簡(jiǎn)化了處理邏輯的實(shí)現(xiàn),同樣的邏輯使用關(guān)系運(yùn)算符來(lái)實(shí)現(xiàn)是比較麻煩的。

          • 嶄新的統(tǒng)一的 Catalog API Catalog 已有的一些接口被重構(gòu)和(某些)被替換了,從而統(tǒng)一了內(nèi)部和外部 catalog 的處理。這項(xiàng)工作主要是為了 Hive 集成(見(jiàn)下文)而啟動(dòng)的,不過(guò)也改進(jìn)了 Flink 在管理 catalog 元數(shù)據(jù)的整體便利性。

          • SQL API 中的 DDL 支持 (FLINK-10232) 到目前為止,F(xiàn)link SQL 已經(jīng)支持 DML 語(yǔ)句(如 SELECT,INSERT)。但是外部表(table source 和 table sink)必須通過(guò) Java/Scala 代碼的方式或配置文件的方式注冊(cè)。1.9 版本中,我們支持 SQL DDL 語(yǔ)句的方式注冊(cè)和刪除表(CREATE TABLE,DROP TABLE)。然而,我們還沒(méi)有增加流特定的語(yǔ)法擴(kuò)展來(lái)定義時(shí)間戳抽取和 watermark 生成策略等。流式的需求將會(huì)在下一版本完整支持。

          五 .Flink 1.10 版本 [重要版本 : Blink 整合完成]

          作為 Flink 社區(qū)迄今為止規(guī)模最大的一次版本升級(jí),F(xiàn)link 1.10 容納了超過(guò) 200 位貢獻(xiàn)者對(duì)超過(guò) 1200 個(gè) issue 的開(kāi)發(fā)實(shí)現(xiàn),包含對(duì) Flink 作業(yè)的整體性能及穩(wěn)定性的顯著優(yōu)化、對(duì)原生 Kubernetes 的初步集成(beta 版本)以及對(duì) Python 支持(PyFlink)的重大優(yōu)化。

          Flink 1.10 同時(shí)還標(biāo)志著對(duì) Blink[1] 的整合宣告完成,隨著對(duì) Hive 的生產(chǎn)級(jí)別集成及對(duì) TPC-DS 的全面覆蓋,F(xiàn)link 在增強(qiáng)流式 SQL 處理能力的同時(shí)也具備了成熟的批處理能力。

          5.1. 內(nèi)存管理及配置優(yōu)化

          Flink 目前的 TaskExecutor 內(nèi)存模型存在著一些缺陷,導(dǎo)致優(yōu)化資源利用率比較困難,例如:

          流和批處理內(nèi)存占用的配置模型不同;流處理中的 RocksDB state backend 需要依賴用戶進(jìn)行復(fù)雜的配置。為了讓內(nèi)存配置變的對(duì)于用戶更加清晰、直觀,F(xiàn)link 1.10 對(duì) TaskExecutor 的內(nèi)存模型和配置邏輯進(jìn)行了較大的改動(dòng) (FLIP-49 [7])。這些改動(dòng)使得 Flink 能夠更好地適配所有部署環(huán)境(例如 Kubernetes, Yarn, Mesos),讓用戶能夠更加嚴(yán)格的控制其內(nèi)存開(kāi)銷。

          • Managed 內(nèi)存擴(kuò)展

          Managed 內(nèi)存的范圍有所擴(kuò)展,還涵蓋了 RocksDB state backend 使用的內(nèi)存。盡管批處理作業(yè)既可以使用堆內(nèi)內(nèi)存也可以使用堆外內(nèi)存,使用 RocksDB state backend 的流處理作業(yè)卻只能利用堆外內(nèi)存。因此為了讓用戶執(zhí)行流和批處理作業(yè)時(shí)無(wú)需更改集群的配置,我們規(guī)定從現(xiàn)在起 managed 內(nèi)存只能在堆外。

          • 簡(jiǎn)化 RocksDB 配置

          此前,配置像 RocksDB 這樣的堆外 state backend 需要進(jìn)行大量的手動(dòng)調(diào)試,例如減小 JVM 堆空間、設(shè)置 Flink 使用堆外內(nèi)存等?,F(xiàn)在,F(xiàn)link 的開(kāi)箱配置即可支持這一切,且只需要簡(jiǎn)單地改變 managed 內(nèi)存的大小即可調(diào)整 RocksDB state backend 的內(nèi)存預(yù)算。

          另一個(gè)重要的優(yōu)化是,F(xiàn)link 現(xiàn)在可以限制 RocksDB 的 native 內(nèi)存占用(FLINK-7289 [8]),以避免超過(guò)總的內(nèi)存預(yù)算——這對(duì)于 Kubernetes 等容器化部署環(huán)境尤為重要。關(guān)于如何開(kāi)啟、調(diào)試該特性,請(qǐng)參考 RocksDB 調(diào)試[9]。

          注:FLIP-49 改變了集群的資源配置過(guò)程,因此從以前的 Flink 版本升級(jí)時(shí)可能需要對(duì)集群配置進(jìn)行調(diào)整。詳細(xì)的變更日志及調(diào)試指南請(qǐng)參考文檔[10]。

          5.2. 統(tǒng)一的作業(yè)提交邏輯

          在此之前,提交作業(yè)是由執(zhí)行環(huán)境負(fù)責(zé)的,且與不同的部署目標(biāo)(例如 Yarn, Kubernetes, Mesos)緊密相關(guān)。這導(dǎo)致用戶需要針對(duì)不同環(huán)境保留多套配置,增加了管理的成本。

          在 Flink 1.10 中,作業(yè)提交邏輯被抽象到了通用的 Executor 接口(FLIP-73 [11])。新增加的 ExecutorCLI (FLIP-81 [12])引入了為任意執(zhí)行目標(biāo)[13]指定配置參數(shù)的統(tǒng)一方法。此外,隨著引入 JobClient(FLINK-74 [14])負(fù)責(zé)獲取 JobExecutionResult,獲取作業(yè)執(zhí)行結(jié)果的邏輯也得以與作業(yè)提交解耦。

          5.3. 原生 Kubernetes 集成(Beta)

          對(duì)于想要在容器化環(huán)境中嘗試 Flink 的用戶來(lái)說(shuō),想要在 Kubernetes 上部署和管理一個(gè) Flink standalone 集群,首先需要對(duì)容器、算子及像 kubectl 這樣的環(huán)境工具有所了解。

          在 Flink 1.10 中,我們推出了初步的支持 session 模式的主動(dòng) Kubernetes 集成(FLINK-9953 [15])。其中,“主動(dòng)”指 Flink ResourceManager (K8sResMngr) 原生地與 Kubernetes 通信,像 Flink 在 Yarn 和 Mesos 上一樣按需申請(qǐng) pod。用戶可以利用 namespace,在多租戶環(huán)境中以較少的資源開(kāi)銷啟動(dòng) Flink。這需要用戶提前配置好 RBAC 角色和有足夠權(quán)限的服務(wù)賬號(hào)。

          正如在統(tǒng)一的作業(yè)提交邏輯一節(jié)中提到的,F(xiàn)link 1.10 將命令行參數(shù)映射到了統(tǒng)一的配置。因此,用戶可以參閱 Kubernetes 配置選項(xiàng),在命令行中使用以下命令向 Kubernetes 提交 Flink 作業(yè)。

          ./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id= examples/streaming/WindowJoin.jar

          5.4. Table API/SQL: 生產(chǎn)可用的 Hive 集成

          Flink 1.9 推出了預(yù)覽版的 Hive 集成。該版本允許用戶使用 SQL DDL 將 Flink 特有的元數(shù)據(jù)持久化到 Hive Metastore、調(diào)用 Hive 中定義的 UDF 以及讀、寫 Hive 中的表。Flink 1.10 進(jìn)一步開(kāi)發(fā)和完善了這一特性,帶來(lái)了全面兼容 Hive 主要版本[17]的生產(chǎn)可用的 Hive 集成。

          • Batch SQL 原生分區(qū)支持

          此前,F(xiàn)link 只支持寫入未分區(qū)的 Hive 表。在 Flink 1.10 中,F(xiàn)link SQL 擴(kuò)展支持了 INSERT OVERWRITE 和 PARTITION 的語(yǔ)法(FLIP-63 [18]),允許用戶寫入 Hive 中的靜態(tài)和動(dòng)態(tài)分區(qū)。

          寫入靜態(tài)分區(qū)

          INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 …)] select_statement1 FROM from_statement;

          寫入動(dòng)態(tài)分區(qū)

          INSERT { INTO | OVERWRITE } TABLE tablename1 select_statement1 FROM from_statement;

          對(duì)分區(qū)表的全面支持,使得用戶在讀取數(shù)據(jù)時(shí)能夠受益于分區(qū)剪枝,減少了需要掃描的數(shù)據(jù)量,從而大幅提升了這些操作的性能。

          • 其他優(yōu)化

          除了分區(qū)剪枝,F(xiàn)link 1.10 的 Hive 集成還引入了許多數(shù)據(jù)讀取[19]方面的優(yōu)化,例如:

          投影下推:Flink 采用了投影下推技術(shù),通過(guò)在掃描表時(shí)忽略不必要的域,最小化 Flink 和 Hive 表之間的數(shù)據(jù)傳輸量。這一優(yōu)化在表的列數(shù)較多時(shí)尤為有效。

          LIMIT 下推:對(duì)于包含 LIMIT 語(yǔ)句的查詢,F(xiàn)link 在所有可能的地方限制返回的數(shù)據(jù)條數(shù),以降低通過(guò)網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量。讀取數(shù)據(jù)時(shí)的 ORC 向量化:為了提高讀取 ORC 文件的性能,對(duì)于 Hive 2.0.0 及以上版本以及非復(fù)合數(shù)據(jù)類型的列,F(xiàn)link 現(xiàn)在默認(rèn)使用原生的 ORC 向量化讀取器。

          • 將可插拔模塊作為 Flink 內(nèi)置對(duì)象(Beta)

          Flink 1.10 在 Flink table 核心引入了通用的可插拔模塊機(jī)制,目前主要應(yīng)用于系統(tǒng)內(nèi)置函數(shù)(FLIP-68 [20])。通過(guò)模塊,用戶可以擴(kuò)展 Flink 的系統(tǒng)對(duì)象,例如像使用 Flink 系統(tǒng)函數(shù)一樣使用 Hive 內(nèi)置函數(shù)。新版本中包含一個(gè)預(yù)先實(shí)現(xiàn)好的 HiveModule,能夠支持多個(gè) Hive 版本,當(dāng)然用戶也可以選擇編寫自己的可插拔模塊。

          5.5. 其他 Table API/SQL 優(yōu)化

          • SQL DDL 中的 watermark 和計(jì)算列

          Flink 1.10 在 SQL DDL 中增加了針對(duì)流處理定義時(shí)間屬性及產(chǎn)生 watermark 的語(yǔ)法擴(kuò)展(FLIP-66 [22])。這使得用戶可以在用 DDL 語(yǔ)句創(chuàng)建的表上進(jìn)行基于時(shí)間的操作(例如窗口)以及定義 watermark 策略。

          CREATE TABLE table_name (

          WATERMARK FOR columnName AS

          ) WITH (
          ...
          )
          • 其他 SQL DDL 擴(kuò)展

          Flink 現(xiàn)在嚴(yán)格區(qū)分臨時(shí)/持久、系統(tǒng)/目錄函數(shù)(FLIP-57 [24])。這不僅消除了函數(shù)引用中的歧義,還帶來(lái)了確定的函數(shù)解析順序(例如,當(dāng)存在命名沖突時(shí),比起目錄函數(shù)、持久函數(shù) Flink 會(huì)優(yōu)先使用系統(tǒng)函數(shù)、臨時(shí)函數(shù))。

          在 FLIP-57 的基礎(chǔ)上,我們擴(kuò)展了 SQL DDL 的語(yǔ)法,支持創(chuàng)建目錄函數(shù)、臨時(shí)函數(shù)以及臨時(shí)系統(tǒng)函數(shù)(FLIP-79):

          CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION

          [IF NOT EXISTS] [catalog_name.][db_name.]function_name

          AS identifier [LANGUAGE JAVA|SCALA]

          關(guān)于目前完整的 Flink SQL DDL 支持,請(qǐng)參考最新的文檔[26]。

          注:為了今后正確地處理和保證元對(duì)象(表、視圖、函數(shù))上的行為一致性,F(xiàn)link 廢棄了 Table API 中的部分對(duì)象申明方法,以使留下的方法更加接近標(biāo)準(zhǔn)的 SQL DDL(FLIP-64)。

          • 批處理完整的 TPC-DS 覆蓋

          TPC-DS 是廣泛使用的業(yè)界標(biāo)準(zhǔn)決策支持 benchmark,用于衡量基于 SQL 的數(shù)據(jù)處理引擎性能。Flink 1.10 端到端地支持所有 TPC-DS 查詢(FLINK-11491 [28]),標(biāo)志著 Flink SQL 引擎已經(jīng)具備滿足現(xiàn)代數(shù)據(jù)倉(cāng)庫(kù)及其他類似的處理需求的能力。

          5.6. PyFlink: 支持原生用戶自定義函數(shù)(UDF)

          作為 Flink 全面支持 Python 的第一步,在之前版本中我們發(fā)布了預(yù)覽版的 PyFlink。在新版本中,我們專注于讓用戶在 Table API/SQL 中注冊(cè)并使用自定義函數(shù)(UDF,另 UDTF / UDAF 規(guī)劃中)(FLIP-58)。

          如果你對(duì)這一特性的底層實(shí)現(xiàn)(基于 Apache Beam 的可移植框架)感興趣,請(qǐng)參考 FLIP-58 的 Architecture 章節(jié)以及 FLIP-78。這些數(shù)據(jù)結(jié)構(gòu)為支持 Pandas 以及今后將 PyFlink 引入到 DataStream API 奠定了基礎(chǔ)。

          從 Flink 1.10 開(kāi)始,用戶只要執(zhí)行以下命令就可以輕松地通過(guò) pip 安裝 PyFlink:

          pip install apache-flink

          5.7. 重要變更

          • FLINK-10725[34]:Flink 現(xiàn)在可以使用 Java 11 編譯和運(yùn)行。

          • FLINK-15495[35]:SQL 客戶端現(xiàn)在默認(rèn)使用 Blink planner,向用戶提供最新的特性及優(yōu)化。Table API 同樣計(jì)劃在下個(gè)版本中從舊的 planner 切換到 Blink planner,我們建議用戶現(xiàn)在就開(kāi)始嘗試和熟悉 Blink planner。

          • FLINK-13025[36]:新的 Elasticsearch sink connector[37] 全面支持 Elasticsearch 7.x 版本。

          • FLINK-15115[38]:Kafka 0.8 和 0.9 的 connector 已被標(biāo)記為廢棄并不再主動(dòng)支持。如果你還在使用這些版本或有其他相關(guān)問(wèn)題,請(qǐng)通過(guò) @dev 郵件列表聯(lián)系我們。

          • FLINK-14516[39]:非基于信用的網(wǎng)絡(luò)流控制已被移除,同時(shí)移除的還有配置項(xiàng)“taskmanager.network.credit.model”。今后,F(xiàn)link 將總是使用基于信用的網(wǎng)絡(luò)流控制。

          • FLINK-12122[40]:在 Flink 1.5.0 中,F(xiàn)LIP-6[41] 改變了 slot 在 TaskManager 之間的分布方式。要想使用此前的調(diào)度策略,既盡可能將負(fù)載分散到所有當(dāng)前可用的 TaskManager,用戶可以在 flink-conf.yaml 中設(shè)置 “cluster.evenly-spread-out-slots: true”。

          • FLINK-11956[42]:s3-hadoop 和 s3-presto 文件系統(tǒng)不再使用類重定位加載方式,而是使用插件方式加載,同時(shí)無(wú)縫集成所有認(rèn)證提供者。我們強(qiáng)烈建議其他文件系統(tǒng)也只使用插件加載方式,并將陸續(xù)移除重定位加載方式。

          Flink 1.9 推出了新的 Web UI,同時(shí)保留了原來(lái)的 Web UI 以備不時(shí)之需。截至目前,我們沒(méi)有收到關(guān)于新的 UI 存在問(wèn)題的反饋,因此社區(qū)投票決定在 Flink 1.10 中移除舊的 Web UI。

          原文: https://developer.aliyun.com/article/744734

          官方地址: https://flink.apache.org/news/2020/02/11/release-1.10.0.html

          5.7. 重要變更 FLINK-10725[34]:Flink 現(xiàn)在可以使用 Java 11 編譯和運(yùn)行。FLINK-15495[35]:SQL 客戶端現(xiàn)在默認(rèn)使用 Blink planner,向用戶提供最新的特性及優(yōu)化。Table API 同樣計(jì)劃在下個(gè)版本中從舊的 planner 切換到 Blink planner,我們建議用戶現(xiàn)在就開(kāi)始嘗試和熟悉 Blink planner。FLINK-13025[36]:新的 Elasticsearch sink connector[37] 全面支持 Elasticsearch 7.x 版本。FLINK-15115[38]:Kafka 0.8 和 0.9 的 connector 已被標(biāo)記為廢棄并不再主動(dòng)支持。如果你還在使用這些版本或有其他相關(guān)問(wèn)題,請(qǐng)通過(guò) @dev 郵件列表聯(lián)系我們。FLINK-14516[39]:非基于信用的網(wǎng)絡(luò)流控制已被移除,同時(shí)移除的還有配置項(xiàng)“taskmanager.network.credit.model”。今后,F(xiàn)link 將總是使用基于信用的網(wǎng)絡(luò)流控制。FLINK-12122[40]:在 Flink 1.5.0 中,F(xiàn)LIP-6[41] 改變了 slot 在 TaskManager 之間的分布方式。要想使用此前的調(diào)度策略,既盡可能將負(fù)載分散到所有當(dāng)前可用的 TaskManager,用戶可以在 flink-conf.yaml 中設(shè)置 “cluster.evenly-spread-out-slots: true”。FLINK-11956[42]:s3-hadoop 和 s3-presto 文件系統(tǒng)不再使用類重定位加載方式,而是使用插件方式加載,同時(shí)無(wú)縫集成所有認(rèn)證提供者。我們強(qiáng)烈建議其他文件系統(tǒng)也只使用插件加載方式,并將陸續(xù)移除重定位加載方式。Flink 1.9 推出了新的 Web UI,同時(shí)保留了原來(lái)的 Web UI 以備不時(shí)之需。截至目前,我們沒(méi)有收到關(guān)于新的 UI 存在問(wèn)題的反饋,因此社區(qū)投票決定[43]在 Flink 1.10 中移除舊的 Web UI。

          原文: https://developer.aliyun.com/article/744734 官方地址: https://flink.apache.org/news/2020/02/11/release-1.10.0.html?spm=a2c6h.12873639.0.0.749e4fc0c1D14m

          六 .Flink 1.11 版本 [重要版本]

          Flink 1.11.0 正式發(fā)布。歷時(shí)近 4 個(gè)月,F(xiàn)link 在生態(tài)、易用性、生產(chǎn)可用性、穩(wěn)定性等方面都進(jìn)行了增強(qiáng)和改善。

          core engine 引入了 unaligned checkpoints,這是對(duì) Flink 的容錯(cuò)機(jī)制的重大更改,該機(jī)制可改善在高背壓下的檢查點(diǎn)性能。

          一個(gè)新的 Source API 通過(guò)統(tǒng)一批處理和 streaming 執(zhí)行以及將內(nèi)部組件(例如事件時(shí)間處理、水印生成或空閑檢測(cè))卸載到 Flink 來(lái)簡(jiǎn)化(自定義)sources 的實(shí)現(xiàn)。

          Flink SQL 引入了對(duì)變更數(shù)據(jù)捕獲(CDC)的支持,以輕松使用和解釋來(lái)自 Debezium 之類的工具的數(shù)據(jù)庫(kù)變更日志。更新的 FileSystem 連接器還擴(kuò)展了 Table API/SQL 支持的用例和格式集,從而實(shí)現(xiàn)了直接啟用從 Kafka 到 Hive 的 streaming 數(shù)據(jù)傳輸?shù)确桨浮?/span>

          PyFlink 的多項(xiàng)性能優(yōu)化,包括對(duì)矢量化用戶定義函數(shù)(Pandas UDF)的支持。這改善了與 Pandas 和 NumPy 之類庫(kù)的互操作性,使 Flink 在數(shù)據(jù)科學(xué)和 ML 工作負(fù)載方面更強(qiáng)大。

          重要變化

          • [FLINK-17339] 從 Flink 1.11 開(kāi)始,Blink planner 是 Table API/SQL中的默認(rèn)設(shè)置。自 Flink 1.10 起,SQL 客戶端已經(jīng)存在這種情況。仍支持舊的 Flink 規(guī)劃器,但未積極開(kāi)發(fā)。

          • [FLINK-5763] Savepoints 現(xiàn)在將其所有狀態(tài)包含在一個(gè)目錄中(元數(shù)據(jù)和程序狀態(tài))。這樣可以很容易地找出組成 savepoint 狀態(tài)的文件,并允許用戶通過(guò)簡(jiǎn)單地移動(dòng)目錄來(lái)重新定位 savepoint。

          • [FLINK-16408] 為了減輕對(duì) JVM metaspace 的壓力,只要任務(wù)分配了至少一個(gè)插槽,TaskExecutor就會(huì)重用用戶代碼類加載器。這會(huì)稍微改變 Flink 的恢復(fù)行為,從而不會(huì)重新加載靜態(tài)字段。

          • [FLINK-11086] Flink 現(xiàn)在支持 Hadoop 3.0.0 以上的 Hadoop 版本。請(qǐng)注意,F(xiàn)link 項(xiàng)目不提供任何更新的flink-shaded-hadoop-x jars。用戶需要通過(guò)HADOOP_CLASSPATH環(huán)境變量(推薦)或 lib/ folder 提供 Hadoop 依賴項(xiàng)。

          • [FLINK-16963] Flink 隨附的所有MetricReporters均已轉(zhuǎn)換為插件。這些不再應(yīng)該放在/lib中(可能導(dǎo)致依賴沖突),而應(yīng)該放在/plugins/< some_directory>中。

          • [FLINK-12639] Flink 文檔正在做一些返工,因此從 Flink 1.11 開(kāi)始,內(nèi)容的導(dǎo)航和組織會(huì)有所變化。

          官方原文: https://flink.apache.org/news/2020/07/06/release-1.11.0.html

          6.1. Table & SQL 支持 Change Data Capture(CDC)

          CDC 被廣泛使用在復(fù)制數(shù)據(jù)、更新緩存、微服務(wù)間同步數(shù)據(jù)、審計(jì)日志等場(chǎng)景,很多公司都在使用開(kāi)源的 CDC 工具,如 MySQL CDC。通過(guò) Flink 支持在 Table & SQL 中接入和解析 CDC 是一個(gè)強(qiáng)需求,在過(guò)往的很多討論中都被提及過(guò),可以幫助用戶以實(shí)時(shí)的方式處理 changelog 流,進(jìn)一步擴(kuò)展 Flink 的應(yīng)用場(chǎng)景,例如把 MySQL 中的數(shù)據(jù)同步到 PG 或 ElasticSearch 中,低延時(shí)的 temporal join 一個(gè) changelog 等。

          除了考慮到上面的真實(shí)需求,F(xiàn)link 中定義的“Dynamic Table”概念在流上有兩種模型:append 模式和 update 模式。通過(guò) append 模式把流轉(zhuǎn)化為“Dynamic Table”在之前的版本中已經(jīng)支持,因此在 1.11.0 中進(jìn)一步支持 update 模式也從概念層面完整的實(shí)現(xiàn)了“Dynamic Table”。

          為了支持解析和輸出 changelog,如何在外部系統(tǒng)和 Flink 系統(tǒng)之間編解碼這些更新操作是首要解決的問(wèn)題??紤]到 source 和 sink 是銜接外部系統(tǒng)的一個(gè)橋梁,因此 FLIP-95 在定義全新的 Table source 和 Table sink 接口時(shí)解決了這個(gè)問(wèn)題。

          在公開(kāi)的 CDC 調(diào)研報(bào)告中,Debezium 和 Canal 是用戶中最流行使用的 CDC 工具,這兩種工具用來(lái)同步 changelog 到其它的系統(tǒng)中,如消息隊(duì)列。據(jù)此,F(xiàn)LIP-105 首先支持了 Debezium 和 Canal 這兩種格式,而且 Kafka source 也已經(jīng)可以支持解析上述格式并輸出更新事件,在后續(xù)的版本中會(huì)進(jìn)一步支持 Avro(Debezium) 和 Protobuf(Canal)。

          CREATE TABLE my_table (  
          ...) WITH (
          'connector'='...', -- e.g. 'kafka'
          'format'='debezium-json',
          'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)
          'debezium-json.ignore-parse-errors'='true' -- default: false
          );

          6.2. Table & SQL 支持 JDBC Catalog

          1.11.0 之前,用戶如果依賴 Flink 的 source/sink 讀寫關(guān)系型數(shù)據(jù)庫(kù)或讀取 changelog 時(shí),必須要手動(dòng)創(chuàng)建對(duì)應(yīng)的 schema。而且當(dāng)數(shù)據(jù)庫(kù)中的 schema 發(fā)生變化時(shí),也需要手動(dòng)更新對(duì)應(yīng)的 Flink 作業(yè)以保持一致和類型匹配,任何不匹配都會(huì)造成運(yùn)行時(shí)報(bào)錯(cuò)使作業(yè)失敗。用戶經(jīng)常抱怨這個(gè)看似冗余且繁瑣的流程,體驗(yàn)極差。

          實(shí)際上對(duì)于任何和 Flink 連接的外部系統(tǒng)都可能有類似的上述問(wèn)題,在 1.11.0 中重點(diǎn)解決了和關(guān)系型數(shù)據(jù)庫(kù)對(duì)接的這個(gè)問(wèn)題。FLIP-93 提供了 JDBC catalog 的基礎(chǔ)接口以及 Postgres catalog 的實(shí)現(xiàn),這樣方便后續(xù)實(shí)現(xiàn)與其它類型的關(guān)系型數(shù)據(jù)庫(kù)的對(duì)接。

          1.11.0 版本后,用戶使用 Flink SQL 時(shí)可以自動(dòng)獲取表的 schema 而不再需要輸入 DDL。除此之外,任何 schema 不匹配的錯(cuò)誤都會(huì)在編譯階段提前進(jìn)行檢查報(bào)錯(cuò),避免了之前運(yùn)行時(shí)報(bào)錯(cuò)造成的作業(yè)失敗。這是提升易用性和用戶體驗(yàn)的一個(gè)典型例子。

          6.3. Hive 實(shí)時(shí)數(shù)倉(cāng)

          從 1.9.0 版本開(kāi)始 Flink 從生態(tài)角度致力于集成 Hive,目標(biāo)打造批流一體的 Hive 數(shù)倉(cāng)。經(jīng)過(guò)前兩個(gè)版本的迭代,已經(jīng)達(dá)到了 batch 兼容且生產(chǎn)可用,在 TPC-DS 10T benchmark 下性能達(dá)到 Hive 3.0 的 7 倍以上。

          1.11.0 在 Hive 生態(tài)中重點(diǎn)實(shí)現(xiàn)了實(shí)時(shí)數(shù)倉(cāng)方案,改善了端到端流式 ETL 的用戶體驗(yàn),達(dá)到了批流一體 Hive 數(shù)倉(cāng)的目標(biāo)。同時(shí)在兼容性、性能、易用性方面也進(jìn)一步進(jìn)行了加強(qiáng)。

          在實(shí)時(shí)數(shù)倉(cāng)的解決方案中,憑借 Flink 的流式處理優(yōu)勢(shì)做到實(shí)時(shí)讀寫 Hive:

          • Hive 寫入:FLIP-115 完善擴(kuò)展了 FileSystem connector 的基礎(chǔ)能力和實(shí)現(xiàn),Table/SQL 層的 sink 可以支持各種格式(CSV、Json、Avro、Parquet、ORC),而且支持 Hive table 的所有格式。

          • Partition 支持:數(shù)據(jù)導(dǎo)入 Hive 引入 partition 提交機(jī)制來(lái)控制可見(jiàn)性,通過(guò)sink.partition-commit.trigger 控制 partition 提交的時(shí)機(jī),通過(guò) sink.partition-commit.policy.kind 選擇提交策略,支持 SUCCESS 文件和 metastore 提交。

          • Hive 讀?。簩?shí)時(shí)化的流式讀取 Hive,通過(guò)監(jiān)控 partition 生成增量讀取新 partition,或者監(jiān)控文件夾內(nèi)新文件生成來(lái)增量讀取新文件。在 Hive 可用性方面的提升:

          • FLIP-123 通過(guò) Hive Dialect 為用戶提供語(yǔ)法兼容,這樣用戶無(wú)需在 Flink 和 Hive 的 CLI 之間切換,可以直接遷移 Hive 腳本到 Flink 中執(zhí)行。

          • 提供 Hive 相關(guān)依賴的內(nèi)置支持,避免用戶自己下載所需的相關(guān)依賴?,F(xiàn)在只需要單獨(dú)下載一個(gè)包,配置 HADOOP_CLASSPATH 就可以運(yùn)行。

          • 在 Hive 性能方面,1.10.0 中已經(jīng)支持了 ORC(Hive 2+)的向量化讀取,1.11.0 中我們補(bǔ)全了所有版本的 Parquet 和 ORC 向量化支持來(lái)提升性能。

          6.4. 全新 Source API

          前面也提到過(guò),source 和 sink 是 Flink 對(duì)接外部系統(tǒng)的一個(gè)橋梁,對(duì)于完善生態(tài)、可用性及端到端的用戶體驗(yàn)是很重要的環(huán)節(jié)。社區(qū)早在一年前就已經(jīng)規(guī)劃了 source 端的徹底重構(gòu),從 FLIP-27 的 ID 就可以看出是很早的一個(gè) feature。但是由于涉及到很多復(fù)雜的內(nèi)部機(jī)制和考慮到各種 source connector 的實(shí)現(xiàn),設(shè)計(jì)上需要考慮的很全面。從 1.10.0 就開(kāi)始做 POC 的實(shí)現(xiàn),最終趕上了 1.11.0 版本的發(fā)布。

          先簡(jiǎn)要回顧下 source 之前的主要問(wèn)題:

          對(duì)用戶而言,在 Flink 中改造已有的 source 或者重新實(shí)現(xiàn)一個(gè)生產(chǎn)級(jí)的 source connector 不是一件容易的事情,具體體現(xiàn)在沒(méi)有公共的代碼可以復(fù)用,而且需要理解很多 Flink 內(nèi)部細(xì)節(jié)以及實(shí)現(xiàn)具體的 event time 分配、watermark 產(chǎn)出、idleness 監(jiān)測(cè)、線程模型等。

          批和流的場(chǎng)景需要實(shí)現(xiàn)不同的 source。

          partitions/splits/shards 概念在接口中沒(méi)有顯式表達(dá),比如 split 的發(fā)現(xiàn)邏輯和數(shù)據(jù)消費(fèi)都耦合在 source function 的實(shí)現(xiàn)中,這樣在實(shí)現(xiàn) Kafka 或 Kinesis 類型的 source 時(shí)增加了復(fù)雜性。

          在 runtime 執(zhí)行層,checkpoint 鎖被 source function 搶占會(huì)帶來(lái)一系列問(wèn)題,框架很難進(jìn)行優(yōu)化。

          FLIP-27 在設(shè)計(jì)時(shí)充分考慮了上述的痛點(diǎn):

          • 首先在 Job Manager 和 Task Manager 中分別引入兩種不同的組件 Split Enumerator 和 Source reader,解耦 split 發(fā)現(xiàn)和對(duì)應(yīng)的消費(fèi)處理,同時(shí)方便隨意組合不同的策略。比如現(xiàn)有的 Kafka connector 中有多種不同的 partition 發(fā)現(xiàn)策略和實(shí)現(xiàn)耦合在一起,在新的架構(gòu)下,我們只需要實(shí)現(xiàn)一種 source reader,就可以適配多種 split enumerator 的實(shí)現(xiàn)來(lái)對(duì)應(yīng)不同的 partition 發(fā)現(xiàn)策略。

          • 在新架構(gòu)下實(shí)現(xiàn)的 source connector 可以做到批流統(tǒng)一,唯一的小區(qū)別是對(duì)批場(chǎng)景的有限輸入,split enumerator 會(huì)產(chǎn)出固定數(shù)量的 split 集合并且每個(gè) split 都是有限數(shù)據(jù)集;對(duì)于流場(chǎng)景的無(wú)限輸入,split enumerator 要么產(chǎn)出無(wú)限多的 split 或者 split 自身是無(wú)限數(shù)據(jù)集。

          • 復(fù)雜的 timestamp assigner 以及 watermark generator 透明的內(nèi)置在 source reader 模塊內(nèi)運(yùn)行,對(duì)用戶來(lái)說(shuō)是無(wú)感知的。這樣用戶如果想實(shí)現(xiàn)新的 source connector,一般不再需要重復(fù)實(shí)現(xiàn)這部分功能。

          目前 Flink 已有的 source connector 會(huì)在后續(xù)的版本中基于新架構(gòu)來(lái)重新實(shí)現(xiàn),legacy source 也會(huì)繼續(xù)維護(hù)幾個(gè)版本保持兼容性,用戶也可以按照 release 文檔中的說(shuō)明來(lái)嘗試體驗(yàn)新 source 的開(kāi)發(fā)。

          6.5. PyFlink 生態(tài)

          眾所周知,Python 語(yǔ)言在機(jī)器學(xué)習(xí)和數(shù)據(jù)分析領(lǐng)域有著廣泛的使用。Flink 從 1.9.0 版本開(kāi)始發(fā)力兼容 Python 生態(tài),Python 和 Flink 合力為 PyFlink,把 Flink 的實(shí)時(shí)分布式處理能力輸出給 Python 用戶。前兩個(gè)版本 PyFlink 已經(jīng)支持了 Python Table API 和 UDF,在 1.11.0 中擴(kuò)大對(duì) Python 生態(tài)庫(kù) Pandas 的支持以及和 SQL DDL/Client 的集成,同時(shí) Python UDF 性能有了極大的提升。

          具體來(lái)說(shuō),之前普通的 Python UDF 每次調(diào)用只能處理一條數(shù)據(jù),而且在 Java 端和 Python 端都需要序列化/反序列化,開(kāi)銷很大。1.11.0 中 Flink 支持在 Table & SQL 作業(yè)中自定義和使用向量化 Python UDF,用戶只需要在 UDF 修飾中額外增加一個(gè)參數(shù) udf_type=“pandas” 即可。這樣帶來(lái)的好處是:

          • 每次調(diào)用可以處理 N 條數(shù)據(jù)。

          • 數(shù)據(jù)格式基于 Apache Arrow,大大降低了 Java、Python 進(jìn)程之間的序列化/反序列化開(kāi)銷。

          • 方便 Python 用戶基于 Numpy 和 Pandas 等數(shù)據(jù)分析領(lǐng)域常用的 Python 庫(kù),開(kāi)發(fā)高性能的 Python UDF。

          除此之外,1.11.0 中 PyFlink 還支持:

          • PyFlink table 和 Pandas DataFrame 之間無(wú)縫切換(FLIP-120),增強(qiáng) Pandas 生態(tài)的易用性和兼容性。

          • Table & SQL 中可以定義和使用 Python UDTF(FLINK-14500),不再必需 Java/Scala UDTF。

          • Cython 優(yōu)化 Python UDF 的性能(FLIP-121),對(duì)比 1.10.0 可以提升 30 倍。

          • Python UDF 中用戶自定義 metric(FLIP-112),方便監(jiān)控和調(diào)試 UDF 的執(zhí)行。

          上述解讀的都是側(cè)重 API 層面,用戶開(kāi)發(fā)作業(yè)可以直接感知到的易用性的提升。下面我們看看執(zhí)行引擎層在 1.11.0 中都有哪些值得關(guān)注的變化。

          6.6. 生產(chǎn)可用性和穩(wěn)定性提升

          6.6.1 支持 application 模式和 Kubernetes 增強(qiáng)

          1.11.0 版本前,F(xiàn)link 主要支持如下兩種模式運(yùn)行:

          Session 模式:提前啟動(dòng)一個(gè)集群,所有作業(yè)都共享這個(gè)集群的資源運(yùn)行。優(yōu)勢(shì)是避免每個(gè)作業(yè)單獨(dú)啟動(dòng)集群帶來(lái)的額外開(kāi)銷,缺點(diǎn)是隔離性稍差。如果一個(gè)作業(yè)把某個(gè) Task Manager(TM)容器搞掛,會(huì)導(dǎo)致這個(gè)容器內(nèi)的所有作業(yè)都跟著重啟。雖然每個(gè)作業(yè)有自己獨(dú)立的 Job Manager(JM)來(lái)管理,但是這些 JM 都運(yùn)行在一個(gè)進(jìn)程中,容易帶來(lái)負(fù)載上的瓶頸。

          Per-job 模式:為了解決 session 模式隔離性差的問(wèn)題,每個(gè)作業(yè)根據(jù)資源需求啟動(dòng)獨(dú)立的集群,每個(gè)作業(yè)的 JM 也是運(yùn)行在獨(dú)立的進(jìn)程中,負(fù)載相對(duì)小很多。

          以上兩種模式的共同問(wèn)題是需要在客戶端執(zhí)行用戶代碼,編譯生成對(duì)應(yīng)的 Job Graph 提交到集群運(yùn)行。在這個(gè)過(guò)程需要下載相關(guān) jar 包并上傳到集群,客戶端和網(wǎng)絡(luò)負(fù)載壓力容易成為瓶頸,尤其當(dāng)一個(gè)客戶端被多個(gè)用戶共享使用。

          1.11.0 中引入了 application 模式(FLIP-85)來(lái)解決上述問(wèn)題,按照 application 粒度來(lái)啟動(dòng)一個(gè)集群,屬于這個(gè) application 的所有 job 在這個(gè)集群中運(yùn)行。核心是 Job Graph 的生成以及作業(yè)的提交不在客戶端執(zhí)行,而是轉(zhuǎn)移到 JM 端執(zhí)行,這樣網(wǎng)絡(luò)下載上傳的負(fù)載也會(huì)分散到集群中,不再有上述 client 單點(diǎn)上的瓶頸。

          用戶可以通過(guò) bin/flink run-application 來(lái)使用 application 模式,目前 Yarn 和 Kubernetes(K8s)都已經(jīng)支持這種模式。Yarn application 會(huì)在客戶端將運(yùn)行作業(yè)需要的依賴都通過(guò) Yarn Local Resource 傳遞到 JM。K8s application 允許用戶構(gòu)建包含用戶 jar 與依賴的鏡像,同時(shí)會(huì)根據(jù)作業(yè)自動(dòng)創(chuàng)建 TM,并在結(jié)束后銷毀整個(gè)集群,相比 session 模式具有更好的隔離性。K8s 不再有嚴(yán)格意義上的 per-job 模式,application 模式相當(dāng)于 per-job 在集群進(jìn)行提交作業(yè)的實(shí)現(xiàn)。

          除了支持 application 模式,F(xiàn)link 原生 K8s 在 1.11.0 中還完善了很多基礎(chǔ)的功能特性(FLINK-14460),以達(dá)到生產(chǎn)可用性的標(biāo)準(zhǔn)。例如 Node Selector、Label、Annotation、Toleration 等。為了更方便的與 Hadoop 集成,也支持根據(jù)環(huán)境變量自動(dòng)掛載 Hadoop 配置的功能。

          6.6.2 Checkpoint & Savepoint 優(yōu)化

          checkpoint 和 savepoint 機(jī)制一直是 Flink 保持先進(jìn)性的核心競(jìng)爭(zhēng)力之一,社區(qū)在這個(gè)領(lǐng)域的改動(dòng)很謹(jǐn)慎,最近的幾個(gè)大版本中幾乎沒(méi)有大的功能和架構(gòu)上的調(diào)整。在用戶郵件列表中,我們經(jīng)常能看到用戶反饋和抱怨的相關(guān)問(wèn)題:比如 checkpoint 長(zhǎng)時(shí)間做不出來(lái)失敗,savepoint 在作業(yè)重啟后不可用等等。1.11.0 有選擇的解決了一些這方面的常見(jiàn)問(wèn)題,提高生產(chǎn)可用性和穩(wěn)定性。

          1.11.0 之前, savepoint 中 meta 數(shù)據(jù)和 state 數(shù)據(jù)分別保存在兩個(gè)不同的目錄中,這樣如果想遷移 state 目錄很難識(shí)別這種映射關(guān)系,也可能導(dǎo)致目錄被誤刪除,對(duì)于目錄清理也同樣有麻煩。1.11.0 把兩部分?jǐn)?shù)據(jù)整合到一個(gè)目錄下,這樣方便整體轉(zhuǎn)移和復(fù)用。另外,之前 meta 引用 state 采用的是絕對(duì)路徑,這樣 state 目錄遷移后路徑發(fā)生變化也不可用,1.11.0 把 state 引用改成了相對(duì)路徑解決了這個(gè)問(wèn)題(FLINK-5763),這樣 savepoint 的管理維護(hù)、復(fù)用更加靈活方便。

          實(shí)際生產(chǎn)環(huán)境中,用戶經(jīng)常遭遇 checkpoint 超時(shí)失敗、長(zhǎng)時(shí)間不能完成帶來(lái)的困擾。一旦作業(yè) failover 會(huì)造成回放大量的歷史數(shù)據(jù),作業(yè)長(zhǎng)時(shí)間沒(méi)有進(jìn)度,端到端的延遲增加。1.11.0 從不同維度對(duì) checkpoint 的優(yōu)化和提速做了改進(jìn),目標(biāo)實(shí)現(xiàn)分鐘甚至秒級(jí)的輕量型 checkpoint。

          首先,增加了 Checkpoint Coordinator 通知 task 取消 checkpoint 的機(jī)制(FLINK-8871),這樣避免 task 端還在執(zhí)行已經(jīng)取消的 checkpoint 而對(duì)系統(tǒng)帶來(lái)不必要的壓力。同時(shí) task 端放棄已經(jīng)取消的 checkpoint,可以更快的參與執(zhí)行 coordinator 新觸發(fā)的 checkpoint,某種程度上也可以避免新 checkpoint 再次執(zhí)行超時(shí)而失敗。這個(gè)優(yōu)化也對(duì)后面默認(rèn)開(kāi)啟 local recovery 提供了便利,task 端可以及時(shí)清理失效 checkpoint 的資源。

          • 在反壓場(chǎng)景下,整個(gè)數(shù)據(jù)鏈路堆積了大量 buffer,導(dǎo)致 checkpoint barrier 排在數(shù)據(jù) buffer 后面,不能被 task 及時(shí)處理對(duì)齊,也就導(dǎo)致了 checkpoint 長(zhǎng)時(shí)間不能執(zhí)行。1.11.0 中從兩個(gè)維度對(duì)這個(gè)問(wèn)題進(jìn)行解決:

          1)嘗試減少數(shù)據(jù)鏈路中的 buffer 總量(FLINK-16428),這樣 checkpoint barrier 可以盡快被處理對(duì)齊。

          上游輸出端控制單個(gè) sub partition 堆積 buffer 的最大閾值(backlog),避免負(fù)載不均場(chǎng)景下單個(gè)鏈路上堆積大量 buffer。在不影響網(wǎng)絡(luò)吞吐性能的情況下合理修改上下游默認(rèn)的 buffer 配置。上下游數(shù)據(jù)傳輸?shù)幕A(chǔ)協(xié)議進(jìn)行了調(diào)整,允許單個(gè)數(shù)據(jù)鏈路可以配置 0 個(gè)獨(dú)占 buffer 而不死鎖,這樣總的 buffer 數(shù)量和作業(yè)并發(fā)規(guī)模解耦。根據(jù)實(shí)際需求在吞吐性能和 checkpoint 速度兩者之間權(quán)衡,自定義 buffer 配比。這個(gè)優(yōu)化有一部分工作已經(jīng)在 1.11.0 中完成,剩余部分會(huì)在下個(gè)版本繼續(xù)推進(jìn)完成。

          2)實(shí)現(xiàn)了全新的 unaligned checkpoint 機(jī)制(FLIP-76)從根本上解決了反壓場(chǎng)景下 checkpoint barrier 對(duì)齊的問(wèn)題。

          實(shí)際上這個(gè)想法早在 1.10.0 版本之前就開(kāi)始醞釀設(shè)計(jì),由于涉及到很多模塊的大改動(dòng),實(shí)現(xiàn)機(jī)制和線程模型也很復(fù)雜。我們實(shí)現(xiàn)了兩種不同方案的原型 POC 進(jìn)行了測(cè)試、性能對(duì)比,確定了最終的方案,因此直到 1.11.0 才完成了 MVP 版本,這也是 1.11.0 中執(zhí)行引擎層唯一的一個(gè)重量級(jí) feature。其基本思想可以概括為:

          Checkpoint barrier 跨數(shù)據(jù) buffer 傳輸,不在輸入輸出隊(duì)列排隊(duì)等待處理,這樣就和算子的計(jì)算能力解耦,barrier 在節(jié)點(diǎn)之間的傳輸只有網(wǎng)絡(luò)延時(shí),可以忽略不計(jì)。每個(gè)算子多個(gè)輸入鏈路之間不需要等待 barrier 對(duì)齊來(lái)執(zhí)行 checkpoint,第一個(gè)到的 barrier 就可以提前觸發(fā) checkpoint,這樣可以進(jìn)一步提速 checkpoint,不會(huì)因?yàn)閭€(gè)別鏈路的延遲而影響整體。

          為了和之前 aligned checkpoint 的語(yǔ)義保持一致,所有未被處理的輸入輸出數(shù)據(jù) buffer 都將作為 channel state 在 checkpoint 執(zhí)行時(shí)進(jìn)行快照持久化,在 failover 時(shí)連同 operator state 一同進(jìn)行恢復(fù)。

          換句話說(shuō),aligned 機(jī)制保證的是 barrier 前面所有數(shù)據(jù)必須被處理完,狀態(tài)實(shí)時(shí)體現(xiàn)到 operator state 中;而 unaligned 機(jī)制把 barrier 前面的未處理數(shù)據(jù)所反映的 operator state 延后到 failover restart 時(shí)通過(guò) channel state 回放進(jìn)行體現(xiàn),從狀態(tài)恢復(fù)的角度來(lái)說(shuō)最終都是一致的。 注意這里雖然引入了額外的 in-flight buffer 的持久化,但是這個(gè)過(guò)程實(shí)際是在 checkpoint 的異步階段完成的,同步階段只是進(jìn)行了輕量級(jí)的 buffer 引用,所以不會(huì)過(guò)多占用算子的計(jì)算時(shí)間而影響吞吐性能。

          Unaligned checkpoint 在反壓嚴(yán)重的場(chǎng)景下可以明顯加速 checkpoint 的完成時(shí)間,因?yàn)樗辉僖蕾囉谡w的計(jì)算吞吐能力,而和系統(tǒng)的存儲(chǔ)性能更加相關(guān),相當(dāng)于計(jì)算和存儲(chǔ)的解耦。但是它的使用也有一定的局限性,它會(huì)增加整體 state 的大小,對(duì)存儲(chǔ) IO 帶來(lái)額外的開(kāi)銷,因此在 IO 已經(jīng)是瓶頸的場(chǎng)景下就不太適合使用 unaligned checkpoint 機(jī)制。

          1.11.0 中 unaligned checkpoint 還沒(méi)有作為默認(rèn)模式,需要用戶手動(dòng)配置來(lái)開(kāi)啟,并且只在 exactly-once 模式下生效。但目前還不支持 savepoint 模式,因?yàn)?savepoint 涉及到作業(yè)的 rescale 場(chǎng)景,channel state 目前還不支持 state 拆分,在后面的版本會(huì)進(jìn)一步支持,所以 savepoint 目前還是會(huì)使用之前的 aligned 模式,在反壓場(chǎng)景下有可能需要很長(zhǎng)時(shí)間才能完成。

          引用文章: https://developer.aliyun.com/article/767711

          七 .Flink 1.12 版本 [重要版本]

          • 在 DataStream API 上添加了高效的批執(zhí)行模式的支持。這是批處理和流處理實(shí)現(xiàn)真正統(tǒng)一的運(yùn)行時(shí)的一個(gè)重要里程碑。

          • 實(shí)現(xiàn)了基于Kubernetes的高可用性(HA)方案,作為生產(chǎn)環(huán)境中,ZooKeeper方案之外的另外一種選擇。

          • 擴(kuò)展了 Kafka SQL connector,使其可以在 upsert 模式下工作,并且支持在 SQL DDL 中處理 connector 的 metadata?,F(xiàn)在,時(shí)態(tài)表 Join 可以完全用 SQL 來(lái)表示,不再依賴于 Table API 了。

          • PyFlink 中添加了對(duì)于 DataStream API 的支持,將 PyFlink 擴(kuò)展到了更復(fù)雜的場(chǎng)景,比如需要對(duì)狀態(tài)或者定時(shí)器 timer 進(jìn)行細(xì)粒度控制的場(chǎng)景。除此之外,現(xiàn)在原生支持將 PyFlink 作業(yè)部署到 Kubernetes上。

          7.1. DataStream API 支持批執(zhí)行模式

          Flink 的核心 API 最初是針對(duì)特定的場(chǎng)景設(shè)計(jì)的,盡管 Table API / SQL 針對(duì)流處理和批處理已經(jīng)實(shí)現(xiàn)了統(tǒng)一的 API,但當(dāng)用戶使用較底層的 API 時(shí),仍然需要在批處理(DataSet API)和流處理(DataStream API)這兩種不同的 API 之間進(jìn)行選擇。鑒于批處理是流處理的一種特例,將這兩種 API 合并成統(tǒng)一的 API,有一些非常明顯的好處,比如:

          • 可復(fù)用性:作業(yè)可以在流和批這兩種執(zhí)行模式之間自由地切換,而無(wú)需重寫任何代碼。因此,用戶可以復(fù)用同一個(gè)作業(yè),來(lái)處理實(shí)時(shí)數(shù)據(jù)和歷史數(shù)據(jù)。

          • 維護(hù)簡(jiǎn)單:統(tǒng)一的 API 意味著流和批可以共用同一組 connector,維護(hù)同一套代碼,并能夠輕松地實(shí)現(xiàn)流批混合執(zhí)行,例如 backfilling 之類的場(chǎng)景。

          考慮到這些優(yōu)點(diǎn),社區(qū)已朝著流批統(tǒng)一的 DataStream API 邁出了第一步:支持高效的批處理(FLIP-134)。從長(zhǎng)遠(yuǎn)來(lái)看,這意味著 DataSet API 將被棄用(FLIP-131),其功能將被包含在 DataStream API 和 Table API / SQL 中。

          • 有限流上的批處理

          您已經(jīng)可以使用 DataStream API 來(lái)處理有限流(例如文件)了,但需要注意的是,運(yùn)行時(shí)并不“知道”作業(yè)的輸入是有限的。為了優(yōu)化在有限流情況下運(yùn)行時(shí)的執(zhí)行性能,新的 BATCH 執(zhí)行模式,對(duì)于聚合操作,全部在內(nèi)存中進(jìn)行,且使用 sort-based shuffle(FLIP-140)和優(yōu)化過(guò)的調(diào)度策略(請(qǐng)參見(jiàn) Pipelined Region Scheduling 了解更多詳細(xì)信息)。因此,DataStream API 中的 BATCH 執(zhí)行模式已經(jīng)非常接近 Flink 1.12 中 DataSet API 的性能。有關(guān)性能的更多詳細(xì)信息,請(qǐng)查看 FLIP-140。

          在 Flink 1.12 中,默認(rèn)執(zhí)行模式為 STREAMING,要將作業(yè)配置為以 BATCH 模式運(yùn)行,可以在提交作業(yè)的時(shí)候,設(shè)置參數(shù) execution.runtime-mode:

          $ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar

          或者通過(guò)編程的方式:

          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          env.setRuntimeMode(RuntimeMode.BATCH);

          注意:盡管 DataSet API 尚未被棄用,但我們建議用戶優(yōu)先使用具有 BATCH 執(zhí)行模式的 DataStream API 來(lái)開(kāi)發(fā)新的批作業(yè),并考慮遷移現(xiàn)有的 DataSet 作業(yè)。

          7.2. 新的 Data Sink API (Beta)

          之前發(fā)布的 Flink 版本中[1],已經(jīng)支持了 source connector 工作在流批兩種模式下,因此在 Flink 1.12 中,社區(qū)著重實(shí)現(xiàn)了統(tǒng)一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 協(xié)議和一個(gè)更加模塊化的接口。Sink 的實(shí)現(xiàn)者只需要定義 what 和 how:SinkWriter,用于寫數(shù)據(jù),并輸出需要 commit 的內(nèi)容(例如,committables);Committer 和 GlobalCommitter,封裝了如何處理 committables。框架會(huì)負(fù)責(zé) when 和 where:即在什么時(shí)間,以及在哪些機(jī)器或進(jìn)程中 commit。

          這種模塊化的抽象允許為 BATCH 和 STREAMING 兩種執(zhí)行模式,實(shí)現(xiàn)不同的運(yùn)行時(shí)策略,以達(dá)到僅使用一種 sink 實(shí)現(xiàn),也可以使兩種模式都可以高效執(zhí)行。Flink 1.12 中,提供了統(tǒng)一的 FileSink connector,以替換現(xiàn)有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也將逐步遷移到新的接口。

          7.3. 基于 Kubernetes 的高可用 (HA) 方案

          Flink 可以利用 Kubernetes 提供的內(nèi)置功能來(lái)實(shí)現(xiàn) JobManager 的 failover,而不用依賴 ZooKeeper。為了實(shí)現(xiàn)不依賴于 ZooKeeper 的高可用方案,社區(qū)在 Flink 1.12(FLIP-144)中實(shí)現(xiàn)了基于 Kubernetes 的高可用方案。該方案與 ZooKeeper 方案基于相同的接口[3],并使用 Kubernetes 的 ConfigMap[4] 對(duì)象來(lái)處理從 JobManager 的故障中恢復(fù)所需的所有元數(shù)據(jù)。關(guān)于如何配置高可用的 standalone 或原生 Kubernetes 集群的更多詳細(xì)信息和示例,請(qǐng)查閱文檔[5]。

          注意:需要注意的是,這并不意味著 ZooKeeper 將被刪除,這只是為 Kubernetes 上的 Flink 用戶提供了另外一種選擇。

          7.4. 其它功能改進(jìn)

          • 將現(xiàn)有的 connector 遷移到新的 Data Source API

          在之前的版本中,F(xiàn)link 引入了新的 Data Source API(FLIP-27),以允許實(shí)現(xiàn)同時(shí)適用于有限數(shù)據(jù)(批)作業(yè)和無(wú)限數(shù)據(jù)(流)作業(yè)使用的 connector 。在 Flink 1.12 中,社區(qū)從 FileSystem connector(FLINK-19161)出發(fā),開(kāi)始將現(xiàn)有的 source connector 移植到新的接口。

          注意: 新的 source 實(shí)現(xiàn),是完全不同的實(shí)現(xiàn),與舊版本的實(shí)現(xiàn)不兼容。

          • Pipelined Region 調(diào)度 (FLIP-119)

          在之前的版本中,F(xiàn)link 對(duì)于批作業(yè)和流作業(yè)有兩套獨(dú)立的調(diào)度策略。Flink 1.12 版本中,引入了統(tǒng)一的調(diào)度策略, 該策略通過(guò)識(shí)別 blocking 數(shù)據(jù)傳輸邊,將 ExecutionGraph 分解為多個(gè) pipelined region。這樣一來(lái),對(duì)于一個(gè) pipelined region 來(lái)說(shuō),僅當(dāng)有數(shù)據(jù)時(shí)才調(diào)度它,并且僅在所有其所需的資源都被滿足時(shí)才部署它;同時(shí)也可以支持獨(dú)立地重啟失敗的 region。對(duì)于批作業(yè)來(lái)說(shuō),新策略可顯著地提高資源利用率,并消除死鎖。

          • 支持 Sort-Merge Shuffle (FLIP-148)

          為了提高大規(guī)模批作業(yè)的穩(wěn)定性、性能和資源利用率,社區(qū)引入了 sort-merge shuffle,以替代 Flink 現(xiàn)有的實(shí)現(xiàn)。這種方案可以顯著減少 shuffle 的時(shí)間,并使用較少的文件句柄和文件寫緩存(這對(duì)于大規(guī)模批作業(yè)的執(zhí)行非常重要)。在后續(xù)版本中(FLINK-19614),F(xiàn)link 會(huì)進(jìn)一步優(yōu)化相關(guān)性能。

          注意:該功能是實(shí)驗(yàn)性的,在 Flink 1.12 中默認(rèn)情況下不啟用。要啟用 sort-merge shuffle,需要在 TaskManager 的網(wǎng)絡(luò)配置[6]中設(shè)置合理的最小并行度。

          • Flink WebUI 的改進(jìn) (FLIP-75)

          作為對(duì)上一個(gè)版本中,F(xiàn)link WebUI 一系列改進(jìn)的延續(xù),F(xiàn)link 1.12 在 WebUI 上暴露了 JobManager 內(nèi)存相關(guān)的指標(biāo)和配置參數(shù)(FLIP-104)。對(duì)于 TaskManager 的指標(biāo)頁(yè)面也進(jìn)行了更新,為 Managed Memory、Network Memory 和 Metaspace 添加了新的指標(biāo),以反映自 Flink 1.10(FLIP-102)開(kāi)始引入的 TaskManager 內(nèi)存模型的更改[7]。

          7.5. Table API/SQL 變更

          7.5.1. SQL Connectors 中的 Metadata 處理

          如果可以將某些 source(和 format)的元數(shù)據(jù)作為額外字段暴露給用戶,對(duì)于需要將元數(shù)據(jù)與記錄數(shù)據(jù)一起處理的用戶來(lái)說(shuō)很有意義。一個(gè)常見(jiàn)的例子是 Kafka,用戶可能需要訪問(wèn) offset、partition 或 topic 信息、讀寫 kafka 消息中的 key 或 使用消息 metadata中的時(shí)間戳進(jìn)行時(shí)間相關(guān)的操作。

          在 Flink 1.12 中,F(xiàn)link SQL 支持了元數(shù)據(jù)列用來(lái)讀取和寫入每行數(shù)據(jù)中 connector 或 format 相關(guān)的列(FLIP-107)。這些列在 CREATE TABLE 語(yǔ)句中使用 METADATA(保留)關(guān)鍵字來(lái)聲明。

          CREATE TABLE kafka_table (
          id BIGINT,
          name STRING,
          event_time TIMESTAMP(3) METADATA FROM 'timestamp', -- access Kafka 'timestamp' metadata
          headers MAP METADATA -- access Kafka 'headers' metadata
          ) WITH (
          'connector' = 'kafka',
          'topic' = 'test-topic',
          'format' = 'avro'
          );

          在 Flink 1.12 中,已經(jīng)支持 Kafka 和 Kinesis connector 的元數(shù)據(jù),并且 FileSystem connector 上的相關(guān)工作也已經(jīng)在計(jì)劃中(FLINK-19903)。由于 Kafka record 的結(jié)構(gòu)比較復(fù)雜,社區(qū)還專門為 Kafka connector 實(shí)現(xiàn)了新的屬性[8],以控制如何處理鍵/值對(duì)。關(guān)于 Flink SQL 中元數(shù)據(jù)支持的完整描述,請(qǐng)查看每個(gè) connector 的文檔[9]以及 FLIP-107 中描述的用例。

          7.5.2. Upsert Kafka Connector

          在某些場(chǎng)景中,例如讀取 compacted topic 或者輸出(更新)聚合結(jié)果的時(shí)候,需要將 Kafka 消息記錄的 key 當(dāng)成主鍵處理,用來(lái)確定一條數(shù)據(jù)是應(yīng)該作為插入、刪除還是更新記錄來(lái)處理。為了實(shí)現(xiàn)該功能,社區(qū)為 Kafka 專門新增了一個(gè) upsert connector(upsert-kafka),該 connector 擴(kuò)展自現(xiàn)有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作為 source 使用,也可以作為 sink 使用,并且提供了與現(xiàn)有的 kafka connector 相同的基本功能和持久性保證,因?yàn)閮烧咧g復(fù)用了大部分代碼。

          要使用 upsert-kafka connector,必須在創(chuàng)建表時(shí)定義主鍵,并為鍵(key.format)和值(value.format)指定序列化反序列化格式。完整的示例,請(qǐng)查看最新的文檔[10]。

          7.5.3. SQL 中 支持 Temporal Table Join

          在之前的版本中,用戶需要通過(guò)創(chuàng)建時(shí)態(tài)表函數(shù)(temporal table function) 來(lái)支持時(shí)態(tài)表 join(temporal table join) ,而在 Flink 1.12 中,用戶可以使用標(biāo)準(zhǔn)的 SQL 語(yǔ)句 FOR SYSTEM_TIME AS OF(SQL:2011)來(lái)支持 join。此外,現(xiàn)在任意包含時(shí)間列和主鍵的表,都可以作為時(shí)態(tài)表,而不僅僅是 append-only 表。這帶來(lái)了一些新的應(yīng)用場(chǎng)景,比如將 Kafka compacted topic 或數(shù)據(jù)庫(kù)變更日志(來(lái)自 Debezium 等)作為時(shí)態(tài)表。

          CREATE TABLE orders (
          order_id STRING,
          currency STRING,
          amount INT,
          order_time TIMESTAMP(3),
          WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
          ) WITH (

          );

          -- Table backed by a Kafka compacted topic
          CREATE TABLE latest_rates (
          currency STRING,
          rate DECIMAL(38, 10),
          currency_time TIMESTAMP(3),
          WATERMARK FOR currency_time AS currency_time - INTERVAL ‘5’ SECOND,
          PRIMARY KEY (currency) NOT ENFORCED
          ) WITH (
          'connector' = 'upsert-kafka',

          );

          -- Event-time temporal table join
          SELECT
          o.order_id,
          o.order_time,
          o.amount * r.rate AS amount,
          r.currency
          FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time r
          ON o.currency = r.currency;

          上面的示例同時(shí)也展示了如何在 temporal table join 中使用 Flink 1.12 中新增的 upsert-kafka connector。

          • 使用 Hive 表進(jìn)行 Temporal Table Join

          用戶也可以將 Hive 表作為時(shí)態(tài)表來(lái)使用,F(xiàn)link 既支持自動(dòng)讀取 Hive 表的最新分區(qū)作為時(shí)態(tài)表(FLINK-19644),也支持在作業(yè)執(zhí)行時(shí)追蹤整個(gè) Hive 表的最新版本作為時(shí)態(tài)表。請(qǐng)參閱文檔,了解更多關(guān)于如何在 temporal table join 中使用 Hive 表的示例。

          7.5.4. Table API/SQL 中的其它改進(jìn)

          • Kinesis Flink SQL Connector (FLINK-18858)

          從 Flink 1.12 開(kāi)始,Table API / SQL 原生支持將 Amazon Kinesis Data Streams(KDS)作為 source 和 sink 使用。新的 Kinesis SQL connector 提供了對(duì)于增強(qiáng)的Fan-Out(EFO)以及 Sink Partition 的支持。如需了解 Kinesis SQL connector 所有支持的功能、配置選項(xiàng)以及對(duì)外暴露的元數(shù)據(jù)信息,請(qǐng)查看最新的文檔。

          • 在 FileSystem/Hive connector 的流式寫入中支持小文件合并 (FLINK-19345)

          很多 bulk format,例如 Parquet,只有當(dāng)寫入的文件比較大時(shí),才比較高效。當(dāng) checkpoint 的間隔比較小時(shí),這會(huì)成為一個(gè)很大的問(wèn)題,因?yàn)闀?huì)創(chuàng)建大量的小文件。在 Flink 1.12 中,F(xiàn)ile Sink 增加了小文件合并功能,從而使得即使作業(yè) checkpoint 間隔比較小時(shí),也不會(huì)產(chǎn)生大量的文件。要開(kāi)啟小文件合并,可以按照文檔[11]中的說(shuō)明在 FileSystem connector 中設(shè)置 auto-compaction = true 屬性。

          • Kafka Connector 支持 Watermark 下推 (FLINK-20041)

          為了確保使用 Kafka 的作業(yè)的結(jié)果的正確性,通常來(lái)說(shuō),最好基于分區(qū)來(lái)生成 watermark,因?yàn)榉謪^(qū)內(nèi)數(shù)據(jù)的亂序程度通常來(lái)說(shuō)比分區(qū)之間數(shù)據(jù)的亂序程度要低很多。Flink 現(xiàn)在允許將 watermark 策略下推到 Kafka connector 里面,從而支持在 Kafka connector 內(nèi)部構(gòu)造基于分區(qū)的 watermark[12]。一個(gè) Kafka source 節(jié)點(diǎn)最終所產(chǎn)生的 watermark 由該節(jié)點(diǎn)所讀取的所有分區(qū)中的 watermark 的最小值決定,從而使整個(gè)系統(tǒng)可以獲得更好的(即更接近真實(shí)情況)的 watermark。該功能也允許用戶配置基于分區(qū)的空閑檢測(cè)策略,以防止空閑分區(qū)阻礙整個(gè)作業(yè)的 event time 增長(zhǎng)。

          新增的 Formats

          利用 Multi-input 算子進(jìn)行 Join 優(yōu)化 (FLINK-19621)

          Shuffling 是一個(gè) Flink 作業(yè)中最耗時(shí)的操作之一。為了消除不必要的序列化反序列化開(kāi)銷、數(shù)據(jù) spilling 開(kāi)銷,提升 Table API / SQL 上批作業(yè)和流作業(yè)的性能, planner 當(dāng)前會(huì)利用上一個(gè)版本中已經(jīng)引入的N元算子(FLIP-92),將由 forward 邊所連接的多個(gè)算子合并到一個(gè) Task 里執(zhí)行。

          Type Inference for Table API UDAFs (FLIP-65)

          Flink 1.12 完成了從 Flink 1.9 開(kāi)始的,針對(duì) Table API 上的新的類型系統(tǒng)[2]的工作,并在聚合函數(shù)(UDAF)上支持了新的類型系統(tǒng)。從 Flink 1.12 開(kāi)始,與標(biāo)量函數(shù)和表函數(shù)類似,聚合函數(shù)也支持了所有的數(shù)據(jù)類型。

          7.6. PyFlink: Python DataStream API

          為了擴(kuò)展 PyFlink 的可用性,F(xiàn)link 1.12 提供了對(duì)于 Python DataStream API(FLIP-130)的初步支持,該版本支持了無(wú)狀態(tài)類型的操作(例如 Map,F(xiàn)latMap,F(xiàn)ilter,KeyBy 等)。如果需要嘗試 Python DataStream API,可以安裝PyFlink,然后按照該文檔[14]進(jìn)行操作,文檔中描述了如何使用 Python DataStream API 構(gòu)建一個(gè)簡(jiǎn)單的流應(yīng)用程序。

          from pyflink.common.typeinfo import Types
          from pyflink.datastream import MapFunction, StreamExecutionEnvironment
          class MyMapFunction(MapFunction):
          def map(self, value):
          return value + 1
          env = StreamExecutionEnvironment.get_execution_environment()
          data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
          mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
          mapped_stream.print()
          env.execute("datastream job")

          7.7.PyFlink 中的其它改進(jìn)

          • PyFlink Jobs on Kubernetes (FLINK-17480)

          除了 standalone 部署和 YARN 部署之外,現(xiàn)在也原生支持將 PyFlink 作業(yè)部署在 Kubernetes 上。最新的文檔中詳細(xì)描述了如何在 Kubernetes 上啟動(dòng) session 或 application 集群。

          • 用戶自定義聚合函數(shù) (UDAFs)

          從 Flink 1.12 開(kāi)始,您可以在 PyFlink 作業(yè)中定義和使用 Python UDAF 了(FLIP-139)。普通的 UDF(標(biāo)量函數(shù))每次只能處理一行數(shù)據(jù),而 UDAF(聚合函數(shù))則可以處理多行數(shù)據(jù),用于計(jì)算多行數(shù)據(jù)的聚合值。您也可以使用 Pandas UDAF[15](FLIP-137),來(lái)進(jìn)行向量化計(jì)算(通常來(lái)說(shuō),比普通 Python UDAF 快10倍以上)。

          注意: 普通 Python UDAF,當(dāng)前僅支持在 group aggregations 以及流模式下使用。如果需要在批模式或者窗口聚合中使用,建議使用 Pandas UDAF。

          原文: https://developer.aliyun.com/article/780123

          官方原文: https://flink.apache.org/news/2020/12/10/release-1.12.0.html


          八 .Flink 1.13 版本

          概要

          這個(gè)版本是一些永久性的更新,幫助用戶更好理解Flink程序的性能。當(dāng)我們的流處理的速度并不是我們希望看到的性能的時(shí)候,這些新特性能幫助我們找到原因:數(shù)據(jù)加載和背壓圖能幫助定位性能瓶頸所在, CPU火焰圖可以定位哪些代碼是程序中的熱點(diǎn)代碼,State Access Latencies可以查看狀態(tài)的保存情況

          除了上述的特征,F(xiàn)link社區(qū)還改進(jìn)了系統(tǒng)的許多地方,其中有一些會(huì)在下面展示。

          主要功能點(diǎn)
          響應(yīng)式伸縮

          響應(yīng)式伸縮是Flink的最新功能,它使流處理應(yīng)用程序和其他應(yīng)用程序一樣自然,一樣管理簡(jiǎn)單。

          Flink的資源管理和部署具有雙重特性,我們可以將Flink應(yīng)用程序部署到K8S或者YARN等資源協(xié)調(diào)器上,這樣Flink會(huì)積極管理和分配資源,并釋放workers. 這對(duì)于快速修改Jobs或者Application的資源要求是非常有用的,比如批處理的應(yīng)用或者ad-hoc的SQL查詢。worker的數(shù)量將遵循Flink應(yīng)用的并行度。

          對(duì)于長(zhǎng)時(shí)間運(yùn)行的無(wú)限流程序,部署模式和其他長(zhǎng)期運(yùn)行的程序一樣:應(yīng)用程序不知道自己是運(yùn)行在K8S,EKS或者YARN平臺(tái)上,也不需要嘗試獲取一定數(shù)量的worker;相反,只需要提供給應(yīng)用程序worker的數(shù)量,應(yīng)用程序會(huì)根據(jù)提供的worker的數(shù)量自動(dòng)調(diào)節(jié)并行度,我們稱這種特性為響應(yīng)式伸縮。

          應(yīng)用程序的部署模式開(kāi)啟了這項(xiàng)工作,類似于其他的程序部署一樣(通過(guò)避開(kāi)兩個(gè)單獨(dú)的步驟部署:1. 開(kāi)啟一個(gè)集群 2. 提交一個(gè)應(yīng)用)。Flink的響應(yīng)式伸縮模型完成了這點(diǎn),使用者不需要使用額外的工具(腳本或者K8S命令)來(lái)保持worder的數(shù)量和程序的并行度的一致。

          現(xiàn)在可以像對(duì)待其他典型應(yīng)用程序一樣,在Flink的應(yīng)用程序中放一個(gè)自動(dòng)伸縮器。同時(shí)在配置自動(dòng)伸縮器的時(shí)候,你需要關(guān)心重新縮放的成本,因?yàn)橛袪顟B(tài)的流處理在伸縮的時(shí)候需要移動(dòng)它的狀態(tài)。

          如果你需要嘗試這個(gè)伸縮器的,需要添加scheduler-mode: reactive 這個(gè)配置到集群(只能是standalone 或者K8S集群)。詳細(xì)參考

          分析應(yīng)用程序性能

          和其他的程序一樣,分析和理解Flink應(yīng)用程序的性能是非常重要的。通常更關(guān)鍵的是,在了解性能的同時(shí)我們希望Flink能夠在(近)實(shí)時(shí)延遲內(nèi)提供結(jié)果,因?yàn)镕link應(yīng)用程序通常是數(shù)據(jù)密集型的應(yīng)用。

          當(dāng)程序處理的速度跟不上數(shù)據(jù)進(jìn)來(lái)的速度,或者應(yīng)用程序占用的資源超過(guò)了預(yù)期,下面的功能能幫助我們追蹤到原因:

          Bottleneck detection, Back Pressure monitoring

          性能分析期間的第一個(gè)問(wèn)題通常是:哪個(gè)Operation是瓶頸?

          為了幫助回答這個(gè)問(wèn)題,F(xiàn)link公開(kāi)了一些指標(biāo)來(lái)描述那些當(dāng)前處于繁忙或者背壓狀態(tài)的tasks的繁忙程度或者被壓程度(背壓是指有能力工作但不能工作,因?yàn)樗鼈兊暮罄m(xù)操作符不能接受更多數(shù)據(jù))。瓶頸所在都是那些繁忙的operators, 它們的上游operator實(shí)際承擔(dān)大數(shù)據(jù)量的壓力。

          Flink 1.13帶來(lái)了一個(gè)改進(jìn)的背壓度量系統(tǒng)(使用任務(wù)郵箱計(jì)時(shí)而不是線程堆棧采樣),以及一個(gè)重新設(shè)計(jì)的作業(yè)數(shù)據(jù)流圖形表示,用顏色編碼和繁忙度和背壓比率表示。

          • CPU flame graphs in Web UI

          性能分析的第二個(gè)問(wèn)題是:在所有有性能瓶頸的operators中,哪些operator的工作開(kāi)銷是最昂貴的?

          回答這個(gè)問(wèn)題,最直觀的就是看CPU的火焰圖:

          1. 當(dāng)前哪些方法在消耗CPU的資源?

          2. 各個(gè)方法消耗的CPU的資源的多少對(duì)比?

          3. 堆棧上的哪些調(diào)用會(huì)導(dǎo)致執(zhí)行特定的方法?

          火焰圖是跟蹤堆棧線程然后重復(fù)多次采樣而生成的。每個(gè)方法的調(diào)用都會(huì)有一個(gè)長(zhǎng)方型表示,長(zhǎng)方型的長(zhǎng)度和它在采樣中出現(xiàn)的次數(shù)成正比。啟用后,可以在Operator UI上查看:

          Access Latency Metrics for State

          還有一個(gè)性能瓶頸的地方可能是backend state, 特別是當(dāng)你的狀態(tài)大小大于Flink當(dāng)前可用的主內(nèi)存并且你使用的是RockDB存儲(chǔ)你的狀態(tài)。

          這并不是說(shuō)RockDB慢,而是它在一定的條件下才能實(shí)現(xiàn)良好的性能。如果在云上使用了錯(cuò)誤的硬盤資源類型,可有可能導(dǎo)致RockDB對(duì)磁盤IOPs的需求不足。

          在CPU火焰圖之上,新的后端狀態(tài)延遲指標(biāo)可以幫助解狀態(tài)是否響應(yīng)。e.g. 如果您看到RocksDB狀態(tài)訪問(wèn)開(kāi)始花費(fèi)幾毫秒的時(shí)間,可能需要查看您的內(nèi)存和I/O配置。這些指標(biāo)可以通過(guò)設(shè)置state.backend.rocksdb.latency-track-enabled可選項(xiàng)來(lái)激活使用。指標(biāo)抽樣收集應(yīng)該對(duì)RocksDB狀態(tài)后端性能有很小的影響。

          Switching State Backend with savepoints

          當(dāng)需要從savepoint中回復(fù)Flink Job的時(shí)候,現(xiàn)在可以更改state backend。 這就意味著Flink的應(yīng)用的狀態(tài)不再鎖定在程序最初啟動(dòng)時(shí)使用的狀態(tài)了。e.g. 基于這個(gè)特性,我們可以在開(kāi)始時(shí)使用HashMap來(lái)記錄狀態(tài)(純粹在JVM中), 然后再狀態(tài)增長(zhǎng)太大的時(shí)候切換到RockDB來(lái)記錄狀態(tài)。

          實(shí)際上,F(xiàn)link現(xiàn)在有了規(guī)范的Savepoint格式,當(dāng)為Savepoint創(chuàng)建數(shù)據(jù)快照時(shí),所有狀態(tài)后端都使用這種格式。

          User-specified pod templates for Kubernetes deployments 在native K8S 部署模式下,用戶可以指定pod模板。

          使用這些模板,用戶可以以Kubernetes-y的方式配置JobManagers和TaskManagers,其靈活性超出了直接內(nèi)置到Flink的Kubernetes集成中的配置選項(xiàng)。

          Unaligned Checkpoints - production-ready

          非對(duì)齊的checkpoint可以在生產(chǎn)中使用了。如果你想在背壓狀態(tài)下看到程序的問(wèn)題,鼓勵(lì)使用unaligned checkpoints.

          下面這些改變使unaligned checkpoints更容易使用:

          1. 在可以從unaligned checkpoints重新調(diào)整應(yīng)用程序。如果您的應(yīng)用程序由于無(wú)法(負(fù)擔(dān)不起)創(chuàng)建Savepoints而需要從checkpoints進(jìn)行擴(kuò)展,那么這將非常方便

          2. 對(duì)于沒(méi)有back-pressured的應(yīng)用程序,啟用unaligned checkpoints成本更低。unaligned checkpoints現(xiàn)在可以通過(guò)超時(shí)自適應(yīng)地觸發(fā),這意味著一個(gè)checkpoint作為一個(gè)對(duì)齊的checkpoint開(kāi)始(不存儲(chǔ)任何飛行中的事件),并回落到一個(gè)未對(duì)齊的checkpoint(存儲(chǔ)一些飛行中的事件),如果對(duì)齊階段花費(fèi)的時(shí)間超過(guò)了一定的時(shí)間.

          Machine Learning Library moving to a separate repository

          為了加快Flink機(jī)器學(xué)習(xí)(流、批處理和統(tǒng)一機(jī)器學(xué)習(xí))的開(kāi)發(fā), 我們把主要經(jīng)歷放在Flink項(xiàng)目下的新庫(kù)Flink-ml。在這里,我們遵循類似于Stateful Functions的方法,通過(guò)允許更多輕量級(jí)貢獻(xiàn)工作流和單獨(dú)的發(fā)布周期,單獨(dú)的存儲(chǔ)庫(kù)幫助加快了開(kāi)發(fā)。

          請(qǐng)繼續(xù)關(guān)注機(jī)器學(xué)習(xí)方面的更多更新,比如與ALink (Flink上許多常見(jiàn)的機(jī)器學(xué)習(xí)算法套件)的相互作用,或者Flink與TensorFlow的集成。

          SQL和表的接口改進(jìn)

          與以前的版本一樣,SQL和Table API仍然需求量最大的部分。

          Windows via Table-valued functions 表值函數(shù)定義窗口

          定義時(shí)間窗口是流SQL查詢中最常見(jiàn)的操作之一。Flink 1.13介紹了一種定義窗口的新方法: 通過(guò)表值函數(shù)。這種方法不僅表達(dá)能力更強(qiáng)(允許您定義新的窗口類型),而且完全符合SQL標(biāo)準(zhǔn)。

          Flink 1.13支持新的語(yǔ)法中的TUMBLE和HOP窗口,后續(xù)的版本中還會(huì)有SESSION窗口。為了演示增加的表達(dá)能力,考慮下面兩個(gè)例子。

          -- 一種新的 CUMULATE 窗函數(shù),它給窗口分配一個(gè)擴(kuò)展步長(zhǎng)直到達(dá)到最大窗口大小:
          SELECT window_time, window_start, window_end, SUM(price) AS total_price
          FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
          GROUP BY window_start, window_end, window_time;

          可以引用表值窗口函數(shù)的窗口開(kāi)始時(shí)間和窗口結(jié)束時(shí)間,從而使新類型的構(gòu)造成為可能。除了常規(guī)的窗口聚合和窗口連接之外,現(xiàn)在可以表示窗口Top-K聚合:

          SELECT window_time, ...
          FROM (
          SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC)
          as rank
          FROM t
          ) WHERE rank <= 100;

          改進(jìn)DataStream API和Table API/SQL之間的互操轉(zhuǎn)換

          這個(gè)版本從根本上簡(jiǎn)化了DataStream API和Table API程序的混合。Table API是開(kāi)發(fā)應(yīng)用程序的好方法,它具有聲明特性和許多內(nèi)置函數(shù)。 但有時(shí)需要轉(zhuǎn)到使用DataStream API,以獲得其表達(dá)性、靈活性和對(duì)狀態(tài)的顯式控制。

          新的方法StreamTableEnvironment.toDataStream()/.fromDataStream() 從DataStream API創(chuàng)建一個(gè)DataStream作為表的Source或者Sink.

          顯著的改進(jìn)有:

          1. DataStream和Table API類型自動(dòng)類型轉(zhuǎn)換

          2. Event Time配置的無(wú)縫集成; 為了高一致性,水印在邊界流動(dòng)

          3. 對(duì)Row類(表示來(lái)自Table API的行事件)的增強(qiáng)已經(jīng)得到了重大改進(jìn)(改進(jìn)了toString()/hashCode()/equals()方法的行為),現(xiàn)在支持通過(guò)名稱訪問(wèn)實(shí)例屬性值,并支持稀疏表示。

          Table table=tableEnv.fromDataStream(
          dataStream,Schema.newBuilder()
          .columnByMetadata("rowtime","TIMESTAMP(3)")
          .watermark("rowtime","SOURCE_WATERMARK()")
          .build());

          DataStream dataStream=tableEnv.toDataStream(table)
          .keyBy(r->r.getField("user"))
          .window(...)
          SQL Client: Init scripts and Statement Sets

          SQL客戶端是一種直接運(yùn)行和部署SQL流作業(yè)和批處理作業(yè)的簡(jiǎn)便方法,不需要命令行編寫代碼,也不需要CI/CD的支持。

          本版本改進(jìn)了許多SQL客戶端的功能,幾乎Java應(yīng)用中可以使用所有的operations都可以在SQL客戶端或者SQL腳本中使用。也就是說(shuō)SQL用戶將寫更少的代碼。

          Easier Configuration and Code Sharing 更簡(jiǎn)單的配置和代碼共享

          SQL客戶端將停止對(duì)YAML文件的支持,轉(zhuǎn)而在執(zhí)行主SQL腳本前接受一個(gè)或者多個(gè)初始化腳本來(lái)配置session.

          這些初始化的腳本通常在軟對(duì)或者部署之間共享,可以用于加載公共的catalogs,應(yīng)用公共配置設(shè)置或者定義標(biāo)準(zhǔn)視圖。

          ./sql-client.sh -i init1.sql init2.sql -f sqljob.sql
          更多的配置選項(xiàng)

          一組更大的可識(shí)別配置選項(xiàng)和改進(jìn)的set/RESET命令使得在SQL客戶端和SQL腳本中定義和控制應(yīng)用程序的執(zhí)行變得更容易。

          在一個(gè)上下文中支持多查詢

          支持在一個(gè)Flink Job中執(zhí)行多個(gè)SQL語(yǔ)句查詢,這對(duì)無(wú)限流中的SQL查詢非常有用。

          Statement Set是將應(yīng)該放在一起執(zhí)行的查詢分組在一起的機(jī)制

          下面是一個(gè)可以通過(guò)Flink SQL命令行客戶端運(yùn)行的SQL腳本例子。 它設(shè)置和配置環(huán)境并執(zhí)行多個(gè)查詢。 該腳本捕獲端到端查詢和所有環(huán)境構(gòu)建和配置工作,使其成為自包含的artifact。

          -- set up a catalog
          CREATE CATALOG hive_catalog WITH ('type' = 'hive');
          USE CATALOG hive_catalog;

          -- or use temporary objects
          CREATE TEMPORARY TABLE clicks (
          user_id BIGINT,
          page_id BIGINT,
          viewtime TIMESTAMP
          ) WITH (
          'connector' = 'kafka',
          'topic' = 'clicks',
          'properties.bootstrap.servers' = '...',
          'format' = 'avro'
          );

          -- set the execution mode for jobs
          SET execution.runtime-mode=streaming;

          -- set the sync/async mode for INSERT INTOs
          SET table.dml-sync=false;

          -- set the job's parallelism
          SET parallism.default=10;

          -- set the job name
          SET pipeline.name = my_flink_job;

          -- restore state from the specific savepoint path
          SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-bb0dab;

          BEGIN STATEMENT SET;

          INSERT INTO pageview_pv_sink
          SELECT page_id, count(1) FROM clicks GROUP BY page_id;

          INSERT INTO pageview_uv_sink
          SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;

          END;
          Hive查詢語(yǔ)法兼容性

          現(xiàn)在可以使用Hive SQL語(yǔ)法編寫針對(duì)Flink的SQL查詢。 除了Hive的DDL方言,F(xiàn)link現(xiàn)在也接受常用的Hive DML和DQL方言。

          要使用Hive SQL方言,設(shè)置 table.sql-dialect 為 hive并加載 HiveModule 。 HiveModule 的加載很重要,因?yàn)镠ive的內(nèi)置函數(shù)需要適當(dāng)?shù)恼Z(yǔ)法和語(yǔ)義兼容性。 下面的例子說(shuō)明了這一點(diǎn):

          CREATE CATALOG myhive WITH ('type' = 'hive'); -- setup HiveCatalog
          USE CATALOG myhive;
          LOAD MODULE hive; -- setup HiveModule
          USE MODULES hive,core;
          SET table.sql-dialect = hive; -- enable Hive dialect
          SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries

          請(qǐng)注意,Hive方言不再支持Flink的SQL語(yǔ)法的DML和DQL語(yǔ)句, 需要切換回Flink語(yǔ)法的默認(rèn)方言。

          改進(jìn)SQL時(shí)間函數(shù)的行為

          處理時(shí)間是任何數(shù)據(jù)處理的關(guān)鍵要素。 但同時(shí),處理包含不同的時(shí)區(qū)、日期和時(shí)間的數(shù)據(jù)時(shí)是一項(xiàng)非常精細(xì)的任務(wù)。

          在Flink 1.13。 官方花了很多精力簡(jiǎn)化與時(shí)間相關(guān)的函數(shù)的使用。 調(diào)整了(更具體地)函數(shù)的返回類型,如 PROCTIME() 、 CURRENT_TIMESTAMP 、 NOW()。

          此外,您現(xiàn)在還可以在 TIMESTAMP_LTZ 列上定義event time屬性,以便在Daylight Saving Time的支持下優(yōu)雅地進(jìn)行窗口處理。

          PyFlink的改進(jìn)

          PyFlink這個(gè)版本的主要的主題是讓Python DataStream API和Table API在特性上更接近Java/Scala API。

          Python DataStream API中的有狀態(tài)操作

          在Flink 1.13中, Python程序員現(xiàn)在也可以充分享受Apache Flink的有狀態(tài)流處理api的潛力。 Flink 1.12中引入的重新架構(gòu)過(guò)的Python DataStream API,現(xiàn)在擁有完整的狀態(tài)功能,允許用戶記住狀態(tài)中的事件的信息,并在以后對(duì)其進(jìn)行操作。

          這種有狀態(tài)處理能力是許多更復(fù)雜的處理操作的基礎(chǔ),這些操作需要記住跨單個(gè)事件的信息(例如,Windowing operations)。

          下面這個(gè)例子展示了一個(gè)自定義計(jì)數(shù)窗口的實(shí)現(xiàn),使用state:

          class CountWindowAverage(FlatMapFunction):
          def __init__(self, window_size):
          self.window_size = window_size

          def open(self, runtime_context: RuntimeContext):
          descriptor = ValueStateDescriptor("average", Types.TUPLE([Types.LONG(), Types.LONG()]))
          self.sum = runtime_context.get_state(descriptor)

          def flat_map(self, value):
          current_sum = self.sum.value()
          if current_sum is None:
          current_sum = (0, 0)
          # update the count
          current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
          # if the count reaches window_size, emit the average and clear the state
          if current_sum[0] >= self.window_size:
          self.sum.clear()
          yield value[0], current_sum[1] // current_sum[0]
          else:
          self.sum.update(current_sum)

          ds = ... # type: DataStream
          ds.key_by(lambda row: row[0]) \
          .flat_map(CountWindowAverage(5))
          PyFlink DataStream API中用戶自定義窗口

          Flink 1.13為PyFlink DataStream API增加了對(duì)用戶定義的窗口的支持。 Flink程序現(xiàn)在可以在標(biāo)準(zhǔn)窗口定義之外使用窗口。

          因?yàn)榇翱谑撬刑幚頍o(wú)限流的程序的核心(通過(guò)將無(wú)限流分割成有大小的“桶”),這大大提高了API的表達(dá)能力。

          PyFlink Table API中基于行的操作

          Python Table API現(xiàn)在支持基于行的操作,例如,自定義的行轉(zhuǎn)換函數(shù)。 這些函數(shù)是在內(nèi)置函數(shù)之外對(duì)表應(yīng)用數(shù)據(jù)轉(zhuǎn)換的一種簡(jiǎn)單方法。

          這是一個(gè)在Python Table API中使用map()操作的例子:

          @udf(result_type=DataTypes.ROW(
          [DataTypes.FIELD("c1", DataTypes.BIGINT()),
          DataTypes.FIELD("c2", DataTypes.STRING())]))
          def increment_column(r: Row) -> Row:
          return Row(r[0] + 1, r[1])

          table = ... # type: Table
          mapped_result = table.map(increment_column)

          除了map()之外,該API還支持flat_map()、aggregate()、flat_aggregate()和其他基于行操作的函數(shù)。 這使得Python Table API與Java Table API在特性上更相近了。

          PyFlink DataStream程序的批處理執(zhí)行模式

          PyFlink DataStream API現(xiàn)在也支持有界流的批處理執(zhí)行模式,這是在Flink 1.12中為Java DataStream API引入的。

          批處理執(zhí)行模式通過(guò)利用有界流的特性,繞過(guò)狀態(tài)后端和檢查點(diǎn),簡(jiǎn)化了有界流上的操作并提高了程序的性能。

          其他改進(jìn)

          使用Hugo查看Flink Documentation

          Flink文檔已經(jīng)從Jekyll遷移到了Hugo。

          Web UI中的歷史異常

          Flink Web UI將顯示多個(gè)(n個(gè))導(dǎo)致作業(yè)失敗的異常。 這有助于調(diào)試由根故障導(dǎo)致后續(xù)故障的場(chǎng)景。 可以在異常歷史記錄中找到失敗的根本原因。

          更好的報(bào)告失敗的checkpoints的異?;蛘呤〉脑?/span>

          Flink現(xiàn)在提供了失敗或被中止的檢查點(diǎn)的統(tǒng)計(jì)信息,以便更容易地確定失敗原因,而不必分析日志

          以前版本的Flink只有在檢查點(diǎn)成功的情況下才會(huì)報(bào)告指標(biāo)(例如,持久化數(shù)據(jù)的大小、觸發(fā)時(shí)間)。

          PyFlink Table API支持用戶在Group Windows自定義聚合函數(shù)

          PyFlink的Table API中的組窗口現(xiàn)在同時(shí)支持一般的Python用戶定義聚合函數(shù)(udaf)和Pandas udaf。 這些功能對(duì)于許多分析和ML訓(xùn)練程序是至關(guān)重要的。

          Flink 1.13對(duì)以前的版本進(jìn)行了改進(jìn),在以前的版本中,這些函數(shù)只支持無(wú)界的Group-by聚合。

          改進(jìn)Batch Execution下的Sort-Merge Shuffle

          Flink 1.13提高了批執(zhí)行程序的內(nèi)存穩(wěn)定性和Sort-Merge Shuffle的性能,F(xiàn)link 1.12最初是通過(guò)FLIP-148引入的。

          具有更高并行度(1000秒)的程序應(yīng)該不再頻繁觸發(fā)OutOfMemoryError: Direct Memory。 通過(guò)更好的I/O調(diào)度和廣播優(yōu)化提高了性能(特別是在旋轉(zhuǎn)磁盤上)。

          HBase連接器支持異步查找和查找緩存

          HBase Lookup Table Source現(xiàn)在支持異步查找模式和查找緩存。 這極大地提高了對(duì)HBase進(jìn)行查找連接的Table/SQL作業(yè)的性能,同時(shí)減少了典型場(chǎng)景下對(duì)HBase的I/O請(qǐng)求。

          在以前的版本中,HBase Lookup Source只進(jìn)行同步通信,導(dǎo)致管道利用率和吞吐量較低。

          九、Flink1.14版本

          批流一體

          流批一體其實(shí)從 Flink 1.9 版本開(kāi)始就受到持續(xù)的關(guān)注,它作為社區(qū) RoadMap 的重要組成部分,是大數(shù)據(jù)實(shí)時(shí)化必然的趨勢(shì)。但是另一方面,傳統(tǒng)離線的計(jì)算需求其實(shí)并不會(huì)被實(shí)時(shí)任務(wù)完全取代,而是會(huì)長(zhǎng)期存在。

          在實(shí)時(shí)和離線的需求同時(shí)存在的狀態(tài)下,以往的流批獨(dú)立技術(shù)方案存在著一些痛點(diǎn),比如:

          • 需要維護(hù)兩套系統(tǒng),相應(yīng)的就需要兩組開(kāi)發(fā)人員,人力的投入成本很高;

          • 另外,兩套數(shù)據(jù)鏈路處理相似內(nèi)容帶來(lái)維護(hù)的風(fēng)險(xiǎn)性和冗余;

          • 最重要的一點(diǎn)是,如果流批使用的不是同一套數(shù)據(jù)處理系統(tǒng),引擎本身差異可能會(huì)存在數(shù)據(jù)口徑不一致的問(wèn)題,從而導(dǎo)致業(yè)務(wù)數(shù)據(jù)存在一定的誤差。這種誤差對(duì)于大數(shù)據(jù)分析會(huì)有比較大的影響。

          在這樣的背景下,F(xiàn)link 社區(qū)認(rèn)定了實(shí)時(shí)離線一體化的技術(shù)路線是比較重要的技術(shù)趨勢(shì)和方向。

          Flink 在過(guò)去的幾個(gè)版本中,在流批一體方面做了很多的工作??梢哉J(rèn)為 Flink 在引擎層面,API 層面和算子的執(zhí)行層面上做到了真正的流與批用同一套機(jī)制運(yùn)行。但是在任務(wù)具體的執(zhí)行模式上會(huì)有 2 種不同的模式:

          對(duì)于無(wú)限的數(shù)據(jù)流,統(tǒng)一采用了流的執(zhí)行模式。流的執(zhí)行模式指的是所有計(jì)算節(jié)點(diǎn)是通過(guò) Pipeline 模式去連接的,Pipeline 是指上游和下游計(jì)算任務(wù)是同時(shí)運(yùn)行的,隨著上游不斷產(chǎn)出數(shù)據(jù),下游同時(shí)在不斷消費(fèi)數(shù)據(jù)。這種全 Pipeline 的執(zhí)行方式可以:

          • 通過(guò) eventTime 表示數(shù)據(jù)是什么時(shí)候產(chǎn)生的;

          • 通過(guò) watermark 得知在哪個(gè)時(shí)間點(diǎn),數(shù)據(jù)已經(jīng)到達(dá)了;

          • 通過(guò) state 來(lái)維護(hù)計(jì)算中間狀態(tài);

          • 通過(guò) Checkpoint 做容錯(cuò)的處理。

          下圖是不同的執(zhí)行模式:

          • 對(duì)于有限的數(shù)據(jù)集有 2 種執(zhí)行模式,我們可以把它看成一個(gè)有限的數(shù)據(jù)流去做處理,也可以把它看成批的執(zhí)行模式。批的執(zhí)行模式雖然也有 eventTime,但是對(duì)于 watermark 來(lái)說(shuō)只支持正無(wú)窮。對(duì)數(shù)據(jù)和 state 排序后,它在任務(wù)的調(diào)度和 shuffle 上會(huì)有更多的選擇。

          流批的執(zhí)行模式是有區(qū)別的,最主要的就是批的執(zhí)行模式會(huì)有落盤的中間過(guò)程,只有當(dāng)前面任務(wù)執(zhí)行完成,下游的任務(wù)才會(huì)觸發(fā),這個(gè)容錯(cuò)機(jī)制是通過(guò) shuffle 進(jìn)行容錯(cuò)的。

          這 2 者也各有各的執(zhí)行優(yōu)勢(shì):

          • 對(duì)于流的執(zhí)行模式來(lái)說(shuō),它沒(méi)有落盤的壓力,同時(shí)容錯(cuò)是基于數(shù)據(jù)的分段,通過(guò)不斷對(duì)數(shù)據(jù)進(jìn)行打點(diǎn) Checkpoint 去保證斷點(diǎn)恢復(fù);

          • 然而在批處理上,因?yàn)橐?jīng)過(guò) shuffle 落盤,所以對(duì)磁盤會(huì)有壓力。但是因?yàn)閿?shù)據(jù)是經(jīng)過(guò)排序的,所以對(duì)批來(lái)說(shuō),后續(xù)的計(jì)算效率可能會(huì)有一定的提升。同時(shí),在執(zhí)行時(shí)候是經(jīng)過(guò)分段去執(zhí)行任務(wù)的,無(wú)需同時(shí)執(zhí)行。在容錯(cuò)計(jì)算方面是根據(jù) stage 進(jìn)行容錯(cuò)。

          這兩種各有優(yōu)劣,可以根據(jù)作業(yè)的具體場(chǎng)景來(lái)進(jìn)行選擇。

          Flink 1.14 的優(yōu)化點(diǎn)主要是針對(duì)在流的執(zhí)行模式下,如何去處理有限數(shù)據(jù)集。之前處理無(wú)限數(shù)據(jù)集,和現(xiàn)在處理有限數(shù)據(jù)集最大的區(qū)別在于引入了 "任務(wù)可能會(huì)結(jié)束" 的概念。在這種情況下帶來(lái)一些新的問(wèn)題,如下圖:

          在流的執(zhí)行模式下的 Checkpoint 機(jī)制

          • 對(duì)于無(wú)限流,它的 Checkpoint 是由所有的 source 節(jié)點(diǎn)進(jìn)行觸發(fā)的,由 source 節(jié)點(diǎn)發(fā)送 Checkpoint Barrier ,當(dāng) Checkpoint Barrier 流過(guò)整個(gè)作業(yè)時(shí)候,同時(shí)會(huì)存儲(chǔ)當(dāng)前作業(yè)所有的 state 狀態(tài)。

          • 而在有限流的 Checkpoint 機(jī)制中,Task 是有可能提早結(jié)束的。上游的 Task 有可能先處理完任務(wù)提早退出了,但下游的 Task 卻還在執(zhí)行中。在同一個(gè) stage 不同并發(fā)下,有可能因?yàn)閿?shù)據(jù)量不一致導(dǎo)致部分任務(wù)提早完成了。這種情況下,在后續(xù)的執(zhí)行作業(yè)中,如何進(jìn)行 Checkpoint?

          在 1.14 中,JobManager 動(dòng)態(tài)根據(jù)當(dāng)前任務(wù)的執(zhí)行情況,去明確 Checkpoint Barrier 是從哪里開(kāi)始觸發(fā)。同時(shí)在部分任務(wù)結(jié)束后,后續(xù)的 Checkpoint 只會(huì)保存仍在運(yùn)行 Task 所對(duì)應(yīng)的 stage,通過(guò)這種方式能夠讓任務(wù)執(zhí)行完成后,還可以繼續(xù)做 Checkpoint ,在有限流執(zhí)行中提供更好的容錯(cuò)保障。

          Task 結(jié)束后的兩階段提交

          我們?cè)诓糠?Sink 使用上,例如下圖的 Kafka Sink 上,涉及到 Task 需要依靠 Checkpoint 機(jī)制,進(jìn)行二階段提交,從而保證數(shù)據(jù)的 Exactly-once 一致性。

          具體可以這樣說(shuō):在 Checkpoint 過(guò)程中,每個(gè)算子只會(huì)進(jìn)行準(zhǔn)備提交的操作。比如數(shù)據(jù)會(huì)提交到外部的臨時(shí)存儲(chǔ)目錄下,所有任務(wù)都完成這次 Checkpoint 后會(huì)收到一個(gè)信號(hào),之后才會(huì)執(zhí)行正式的 commit,把所有分布式的臨時(shí)文件一次性以事務(wù)的方式提交到外部系統(tǒng)。

          這種算法在當(dāng)前有限流的情況下,作業(yè)結(jié)束后并不能保證有 Checkpoint,那么最后一部分?jǐn)?shù)據(jù)如何提交?

          在 1.14 中,這個(gè)問(wèn)題得到了解決。Task 處理完所有數(shù)據(jù)之后,必須等待 Checkpoint 完成后才可以正式的退出,這是流批一體方面針對(duì)有限流任務(wù)結(jié)束的一些改進(jìn)。

          Checkpoint 機(jī)制
          1. 現(xiàn)有 Checkpoint 機(jī)制痛點(diǎn)

          目前 Flink 觸發(fā) Checkpoint 是依靠 barrier 在算子間進(jìn)行流通,barrier 隨著算子一直往下游進(jìn)行發(fā)送,當(dāng)算子下游遇到 barrier 的時(shí)候就會(huì)進(jìn)行快照操作,然后再把 barrier 往下游繼續(xù)發(fā)送。對(duì)于多路的情況我們會(huì)把 barrier 進(jìn)行對(duì)齊,把先到 barrier 的這一路數(shù)據(jù)暫時(shí)性的 block,等到兩路 barrier 都到了之后再做快照,最后才會(huì)去繼續(xù)往下發(fā)送 barrier。

          現(xiàn)有的 Checkpoint 機(jī)制存在以下問(wèn)題:

          • 反壓時(shí)無(wú)法做出 Checkpoint :在反壓時(shí)候 barrier 無(wú)法隨著數(shù)據(jù)往下游流動(dòng),造成反壓的時(shí)候無(wú)法做出 Checkpoint。但是其實(shí)在發(fā)生反壓情況的時(shí)候,我們更加需要去做出對(duì)數(shù)據(jù)的 Checkpoint,因?yàn)檫@個(gè)時(shí)候性能遇到了瓶頸,是更加容易出問(wèn)題的階段;

          • Barrier 對(duì)齊阻塞數(shù)據(jù)處理 :阻塞對(duì)齊對(duì)于性能上存在一定的影響;

          • 恢復(fù)性能受限于 Checkpoint 間隔 :在做恢復(fù)的時(shí)候,延遲受到多大的影響很多時(shí)候是取決于 Checkpoint 的間隔,間隔越大,需要 replay 的數(shù)據(jù)就會(huì)越多,從而造成中斷的影響也就會(huì)越大。但是目前 Checkpoint 間隔受制于持久化操作的時(shí)間,所以沒(méi)辦法做的很快。

          2. Unaligned Checkpoint

          針對(duì)這些痛點(diǎn),F(xiàn)link 在最近幾個(gè)版本一直在持續(xù)的優(yōu)化,Unaligned Checkpoint 就是其中一個(gè)機(jī)制。barrier 算子在到達(dá) input buffer 最前面的時(shí)候,就會(huì)開(kāi)始觸發(fā) Checkpoint 操作。它會(huì)立刻把 barrier 傳到算子的 OutPut Buffer 的最前面,相當(dāng)于它會(huì)立刻被下游的算子所讀取到。通過(guò)這種方式可以使得 barrier 不受到數(shù)據(jù)阻塞,解決反壓時(shí)候無(wú)法進(jìn)行 Checkpoint 的問(wèn)題。

          當(dāng)我們把 barrier 發(fā)下去后,需要做一個(gè)短暫的暫停,暫停的時(shí)候會(huì)把算子的 State 和 input output buffer 中的數(shù)據(jù)進(jìn)行一個(gè)標(biāo)記,以方便后續(xù)隨時(shí)準(zhǔn)備上傳。對(duì)于多路情況會(huì)一直等到另外一路 barrier 到達(dá)之前數(shù)據(jù),全部進(jìn)行標(biāo)注。

          通過(guò)這種方式整個(gè)在做 Checkpoint 的時(shí)候,也不需要對(duì) barrier 進(jìn)行對(duì)齊,唯一需要做的停頓就是在整個(gè)過(guò)程中對(duì)所有 buffer 和 state 標(biāo)注。這種方式可以很好的解決反壓時(shí)無(wú)法做出 Checkpoint ,和 Barrier 對(duì)齊阻塞數(shù)據(jù)影響性能處理的問(wèn)題。

          3. Generalized Incremental Checkpoint

          Generalized Incremental Checkpoint 主要是用于減少 Checkpoint 間隔,如左圖 1 所示,在 Incremental Checkpoint 當(dāng)中,先讓算子寫入 state 的 changelog。寫完后才把變化真正的數(shù)據(jù)寫入到 StateTable 上。state 的 changelog 不斷向外部進(jìn)行持久的存儲(chǔ)化。在這個(gè)過(guò)程中我們其實(shí)無(wú)需等待整個(gè) StateTable 去做一個(gè)持久化操作,我們只需要保證對(duì)應(yīng)的 Checkpoint 這一部分的 changelog 能夠持久化完成,就可以開(kāi)始做下一次 Checkpoint。StateTable 是以一個(gè)周期性的方式,獨(dú)立的去對(duì)外做持續(xù)化的一個(gè)過(guò)程。

          這兩個(gè)過(guò)程進(jìn)行拆分后,就有了從之前的需要做全量持久化 (Per Checkpoint) 變成 增量持久化 (Per Checkpoint) + 后臺(tái)周期性全量持久化,從而達(dá)到同樣容錯(cuò)的效果。在這個(gè)過(guò)程中,每一次 Checkpoint 需要做持久化的數(shù)據(jù)量減少了,從而使得做 Checkpoint 的間隔能夠大幅度減少。

          其實(shí)在 RocksDB 也是能支持 Incremental Checkpoint 。但是有兩個(gè)問(wèn)題:

          • 第一個(gè)問(wèn)題是 RocksDB 的 Incremental Checkpoint 是依賴它自己本身的一些實(shí)現(xiàn),當(dāng)中會(huì)存在一些數(shù)據(jù)壓縮,壓縮所消耗的時(shí)間以及壓縮效果具有不確定性,這個(gè)是和數(shù)據(jù)是相關(guān)的;

          • 第二個(gè)問(wèn)題是只能針對(duì)特定的 StateBackend 來(lái)使用,目前在做的 Generalized Incremental Checkpoint 實(shí)際上能夠保證的是,它與 StateBackend 是無(wú)關(guān)的,從運(yùn)行時(shí)的機(jī)制來(lái)保證了一個(gè)比較穩(wěn)定、更小的 Checkpoint 間隔。 目前 Unaligned Checkpoint 是在 Flink 1.13 就已經(jīng)發(fā)布了,在 1.14 版本主要是針對(duì) bug 的修復(fù)和補(bǔ)充,針對(duì) Generalized Incremental Checkpoint,目前社區(qū)還在做最后的沖刺,比較有希望在 1.14 中和大家見(jiàn)面。

          性能與效率
          1. 大規(guī)模作業(yè)調(diào)度的優(yōu)化

          構(gòu)建 Pipeline Region 的性能提升:所有由 pipline 邊所連接構(gòu)成的子圖 。在 Flink 任務(wù)調(diào)度中需要通過(guò)識(shí)別 Pipeline Region 來(lái)保證由同一個(gè) Pipline 邊所連接的任務(wù)能夠同時(shí)進(jìn)行調(diào)度。否則有可能上游的任務(wù)開(kāi)始調(diào)度,但是下游的任務(wù)并沒(méi)有運(yùn)行。從而導(dǎo)致上游運(yùn)行完的數(shù)據(jù)無(wú)法給下游的節(jié)點(diǎn)進(jìn)行消費(fèi),可能會(huì)造成死鎖的情況 任務(wù)部署階段:每個(gè)任務(wù)都要從哪些上游讀取數(shù)據(jù),這些信息會(huì)生成 Result Partition Deployment Descriptor。 這兩個(gè)構(gòu)建過(guò)程在之前的版本都有 O (n^2) 的時(shí)間復(fù)雜度,主要問(wèn)題需要對(duì)于每個(gè)下游節(jié)點(diǎn)去遍歷每一個(gè)上游節(jié)點(diǎn)的情況。例如去遍歷每一個(gè)上游是不是一個(gè) Pipeline 邊連接的關(guān)系,或者去遍歷它的每一個(gè)上游生成對(duì)應(yīng)的 Result Partition 信息。

          目前通過(guò)引入 group 概念,假設(shè)已知上下游 2 個(gè)任務(wù)的連接方式是 all-to-all,那相當(dāng)于把所有 Pipeline Region 信息或者 Result Partition 信息以 Group 的形式進(jìn)行組合,這樣只需知道下游對(duì)應(yīng)的是上游的哪一個(gè) group,就可以把一個(gè) O (n^2) 的復(fù)雜度優(yōu)化到了 O (n)。我們用 wordcount 任務(wù)做了一下測(cè)試,對(duì)比優(yōu)化前后的性能。

          從表格中可以看到構(gòu)建速度具有大幅度提升,構(gòu)建 Pipeline Region 的性能從秒級(jí)提升至毫秒級(jí)別。任務(wù)部署我們是從第一個(gè)任務(wù)開(kāi)始部署到所有任務(wù)開(kāi)始運(yùn)行的狀態(tài),這邊只統(tǒng)計(jì)了流,因?yàn)榕枰嫌谓Y(jié)束后才能結(jié)束調(diào)度。從整體時(shí)間來(lái)看,整個(gè)任務(wù)初始化,調(diào)度以及部署的階段,大概能夠減少分鐘級(jí)的時(shí)間消耗。

          1. 細(xì)粒度資源管理

          細(xì)粒度資源管理在過(guò)去很多的版本都一直在做,在 Flink1.14 終于可以把這一部分 API 開(kāi)放出來(lái)在 DataSteam 提供給用戶使用了。用戶可以在 DataStream 中自定義 SlotSharingGroup 的劃分情況,如下圖所示的方式去定義 Slot 的資源劃分,實(shí)現(xiàn)了支持 DataStream API,自定義 SSG 劃分方式以及資源配置 TaskManager 動(dòng)態(tài)資源扣減。

          對(duì)于每一個(gè) Slot 可以通過(guò)比較細(xì)粒度的配置,我們?cè)?Runtime 上會(huì)自動(dòng)根據(jù)用戶資源配置進(jìn)行動(dòng)態(tài)的資源切割。

          這樣做的好處是不會(huì)像之前那樣有固定資源的 Slot,而是做資源的動(dòng)態(tài)扣減,通過(guò)這樣的方式希望能夠達(dá)到更加精細(xì)的資源管理和資源的使用率。

          Table / SQL / Python API

          1. Table API / SQL

          Window Table-Valued Function 支持更多算子與窗口類型 ,可以看如下表格的對(duì)比:

          從表格中可以看出對(duì)于原有的三個(gè)窗口類型進(jìn)行加強(qiáng),同時(shí)新增 Session 窗口類型,目前支持 Aggregate 的操作。

          1.1 支持聲明式注冊(cè) Source/Sink

          • Table API 支持使用聲明式的方式注冊(cè) Source / Sink 功能對(duì)齊 SQL DDL;

          • 同時(shí)支持 FLIP-27 新的 Source 接口;

          • new Source 替代舊的 connect() 接口。

          1.2 全新代碼生成器

          解決了大家在生成代碼超過(guò) Java 最長(zhǎng)代碼限制,新的代碼生成器會(huì)對(duì)代碼進(jìn)行拆解,徹底解決代碼超長(zhǎng)的問(wèn)題。

          1.3 移除 Flink Planner

          新版本中,Blink Planner 將成為 Flink Planner 的唯一實(shí)現(xiàn)。

          2. Python API

          在之前的版本中,如果有先后執(zhí)行的兩個(gè) UDF,它的執(zhí)行過(guò)程如下圖左方。在 JVM 上面有 Java 的 Operator,先把數(shù)據(jù)發(fā)給 Python 下面的 UDF 去執(zhí)行,執(zhí)行后又發(fā)回給 Java,然后傳送給下游的 Operator,最后再進(jìn)行一次 Python 的這種跨進(jìn)程的傳輸去處理,會(huì)導(dǎo)致存在很多次冗余的數(shù)據(jù)傳輸。

          在 1.14 版本中,改進(jìn)如右圖,可以把它們連接在一起,只需要一個(gè)來(lái)回的 Java 和 Python 進(jìn)行數(shù)據(jù)通信,通過(guò)減少傳輸數(shù)據(jù)次數(shù)就能夠達(dá)到比較好的性能上的提升。

          3. 支持 LoopBack 模式

          在以往本地執(zhí)行實(shí)際是在 Python 的進(jìn)程中去運(yùn)行客戶端程序,提交 Java 進(jìn)程啟動(dòng)一個(gè)迷你集群去執(zhí)行 Java 部分代碼。Java 部分代碼也會(huì)和生產(chǎn)環(huán)境部分的一樣,去啟動(dòng)一個(gè)新的 Python 進(jìn)程去執(zhí)行對(duì)應(yīng)的 Python UDF,從圖下可以看出新的進(jìn)程其實(shí)在本地調(diào)試中是沒(méi)有必要存在的。

          所以支持 lookback 模式后可以讓 Java 的 opt 直接把 UDF 運(yùn)行在之前 Python client 所運(yùn)行的相同的進(jìn)程內(nèi),通過(guò)這種方式:

          1. 首先是避免了啟動(dòng)額外進(jìn)程所帶來(lái)的開(kāi)銷;

          2. 最重要的是在本地調(diào)試中,我們可以在同一個(gè)進(jìn)程內(nèi)能夠更好利用一些工具進(jìn)行 debug,這個(gè)是對(duì)開(kāi)發(fā)者體驗(yàn)上的一個(gè)提升。

          原文:https://developer.aliyun.com/article/789352

          官方地址:https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/


          八千里路云和月 | 從零到大數(shù)據(jù)專家學(xué)習(xí)路徑指南

          我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?

          193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下

          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS

          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問(wèn)題小盤點(diǎn)

          我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?

          在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!

          硬剛Hive | 4萬(wàn)字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)

          數(shù)據(jù)治理方法論和實(shí)踐小百科全書

          標(biāo)簽體系下的用戶畫像建設(shè)小指南

          4萬(wàn)字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析

          【面試&個(gè)人成長(zhǎng)】2021年過(guò)半,社招和校招的經(jīng)驗(yàn)之談

          大數(shù)據(jù)方向另一個(gè)十年開(kāi)啟 |《硬剛系列》第一版完結(jié)

          我寫過(guò)的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章

          當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」


          你好,我是王知無(wú),一個(gè)大數(shù)據(jù)領(lǐng)域的硬核原創(chuàng)作者。

          做過(guò)后端架構(gòu)、數(shù)據(jù)中間件、數(shù)據(jù)平臺(tái)&架構(gòu)、算法工程化。

          專注大數(shù)據(jù)領(lǐng)域?qū)崟r(shí)動(dòng)態(tài)&技術(shù)提升&個(gè)人成長(zhǎng)&職場(chǎng)進(jìn)階,歡迎關(guān)注。

          瀏覽 142
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  美女黄18禁 | 欧美波多野结衣 | 亚洲免费黄色视频 | 女人裸体一级片久久久 | 91一区二 |