摘要:本文由社區(qū)志愿者苗文婷整理,內(nèi)容來源自阿里巴巴技術(shù)專家高赟(云騫) 5 月 22 日在北京站 Flink Meetup 分享的《面向流批一體的 Flink 運行時與 DataStream API 優(yōu)化》。內(nèi)容包括:
回顧 Flink 流批一體的設(shè)計
介紹針對運行時的優(yōu)化點
介紹針對 DataStream API 的優(yōu)化點
總結(jié)以及后續(xù)的一些規(guī)劃
Tips:點擊文末「閱讀原文」即可查看更多技術(shù)干貨~
GitHub 地址 
1.1 架構(gòu)介紹
首先看下 Flink 流批一體的整體邏輯。Flink 在早期的時候,雖然是一個可以同時支持流處理和批處理的框架,但是它的流處理和批處理的實現(xiàn),不管是在 API 層,還是在底下的 Shuffle、調(diào)度、算子層,都是單獨的兩套。這兩套實現(xiàn)是完全獨立的,沒有特別緊密的關(guān)聯(lián)。在流批一體這一目標(biāo)的引導(dǎo)下,F(xiàn)link 現(xiàn)在已經(jīng)對底層的算子、調(diào)度、Shuffle 進(jìn)行了統(tǒng)一的抽象,以統(tǒng)一的方式向上支持 DataStream API 和 Table API 兩套接口。DataStream API 是一種比較偏物理層的接口,Table API 是一種 Declearetive 的接口,這兩套接口對流和批來說都是統(tǒng)一的。1.2 優(yōu)點
基于 DataStream API 和 Table API,用戶可以寫同一套代碼來同時處理歷史的數(shù)據(jù)和實時的數(shù)據(jù),例如數(shù)據(jù)回流的場景。
統(tǒng)一的 Connector 和算子實現(xiàn),減少開發(fā)和維護(hù)的成本。
減少學(xué)習(xí)成本,避免學(xué)習(xí)兩套相似接口。
使用同一系統(tǒng)支持流作業(yè)和批作業(yè),減少維護(hù)成本。1.3 數(shù)據(jù)處理過程
下面簡單介紹 Flink 是怎么抽象流批一體的,F(xiàn)link 把作業(yè)拆成了兩種:這種作業(yè)就是我們平時所認(rèn)知的流作業(yè),對于這種作業(yè),F(xiàn)link 采用一個標(biāo)準(zhǔn)流的執(zhí)行模式,需要考慮記錄的時間,通過 Watermark 對齊的方式推進(jìn)整個系統(tǒng)的時間以達(dá)到一些數(shù)據(jù)聚合和輸出的目的,中間通過 State 來維護(hù)中間狀態(tài)。
數(shù)據(jù)可能是保存在文件中,或者是以其他方式提前保留下來的一個有限數(shù)據(jù)集。此時可以把有限數(shù)據(jù)集看作是無限數(shù)據(jù)集的一個特例,所以它可以自然的跑在之前的流處理模式之上,無需經(jīng)過代碼修改,可以直接支持。
但這里可能會忽略掉有限數(shù)據(jù)集數(shù)據(jù)有限的特點,在接口上還需要處理更細(xì)粒度的時間、Watermark 等語義,可能會引入額外的復(fù)雜性。另外,在性能方面,因為是按流的方式處理,在一開始就需要把所有的任務(wù)拉起來,可能需要占用更多的資源,如果采用的是 RocksDB backend,相當(dāng)于是一個大的 Hash 表,在 key 比較多的情況下,可能會有隨機(jī) IO 訪問的問題。但是在批執(zhí)行模式下,可以通過排序的方式,用一種 IO 更加友好的方式來實現(xiàn)整個數(shù)據(jù)處理的流程。所以說,批處理模式在考慮數(shù)據(jù)有限的前提下,在調(diào)度、Shuffle、算子的實現(xiàn)上都給我們提供了更大的選擇空間。最后,針對有限數(shù)據(jù)流,不管是采用哪種處理模式,我們希望最終的處理結(jié)果都是一致的。
1.4 近期演進(jìn)
Flink 在最近的幾個版本中,在 API 和實現(xiàn)層都朝著流批一體的目標(biāo)做了很多的努力。Flink 統(tǒng)一了 Table/SQL API,并引入了統(tǒng)一的 blink planner,blink planner 對流和批都會翻譯到 DataStream 算子之上。此外,對流和批還引入了統(tǒng)一的 shuffle 架構(gòu)。
針對批的 shuffle 引入了一種新的基于 Sort-Merge 的 shuffle 模式,相對于之前 Flink 內(nèi)置的 Hash shuffle,性能會有很大提升。在調(diào)度方面,F(xiàn)link 引入了一種基于 Pipeline Region 的流批一體的調(diào)度器。
完善了 Sort-Merge Shuffle,并對 Pipeline Region scheduler 在大規(guī)模作業(yè)下進(jìn)行了性能優(yōu)化。另外,前面提到過,對于有限流的兩種執(zhí)行模式,我們預(yù)期它的執(zhí)行結(jié)果應(yīng)該是一致的。但是現(xiàn)在 Flink 在作業(yè)執(zhí)行結(jié)束的時候還有一些問題,導(dǎo)致它并不能完全達(dá)到一致。
所以在 1.13 中,還有一部分的工作是針對有限數(shù)據(jù)集作業(yè),怎么在流批,尤其是在流的模式下,使它的結(jié)果和預(yù)期的結(jié)果保持一致。
需要繼續(xù)完成有限作業(yè)一致性保證、批流切換 Source、逐步廢棄 DataSet API 等工作。
2.1 大規(guī)模作業(yè)調(diào)度優(yōu)化
■ 1. 邊的時間復(fù)雜度問題
Flink 提交作業(yè)時會生成一個作業(yè)的 DAG 圖,由多個頂點組成,頂點對應(yīng)著我們實際的處理節(jié)點,如 Map。每個處理節(jié)點都會有并發(fā)度,此前 Flink 的實現(xiàn)里,當(dāng)我們把作業(yè)提交到 JM 之后,JM 會對作業(yè)展開,生成一個 Execution Graph。如下圖,作業(yè)有兩個節(jié)點,并發(fā)度分別為 2 和 3。在 JM 中實際維護(hù)的數(shù)據(jù)結(jié)構(gòu)里,會分別維護(hù) 2 個 task 和 3 個 task,并由 6 條執(zhí)行邊組成,F(xiàn)link 基于此數(shù)據(jù)結(jié)構(gòu)來維護(hù)整個作業(yè)的拓?fù)湫畔?。在這個拓?fù)湫畔⒌幕A(chǔ)上,F(xiàn)link 可以單獨維護(hù)每個 task 的狀態(tài),當(dāng)任務(wù)掛了之后以識別需要拉起的 task。如果以這種 all-to-all 的通信,也就是每兩個上下游 task 之間都有邊的情況下,上游并發(fā) * 下游并發(fā),將出現(xiàn) O(N^2) 的數(shù)據(jù)結(jié)構(gòu)。這種情況下,內(nèi)存的占用是非常驚人的,如果是 10k * 10k 的邊,JM 的內(nèi)存占用將達(dá)到 4.18G。此外,作業(yè)很多的計算復(fù)雜度都是和邊的數(shù)量相關(guān)的,此時的空間復(fù)雜度為 O(N^2) 或 O(N^3),如果是 10k * 10k 的邊,作業(yè)初次調(diào)度時間將達(dá)到 62s。可以看出,除了初始調(diào)度之外,對于批作業(yè)來說,有可能是上游執(zhí)行完之后繼續(xù)執(zhí)行下游,中間的調(diào)度復(fù)雜度都是 O(N^2) 或 O(N^3),這樣就會導(dǎo)致很大的性能開銷。另外,內(nèi)存占用很大的話,GC 的性能也不會特別好。■ 2. Execution Graph 的對稱性
針對 Flink 在大規(guī)模作業(yè)下內(nèi)存和性能方面存在的一些問題,經(jīng)過一些深入分析,可以看出上述例子中上下游節(jié)點之間是有一定對稱性的。一種是 Pointwise 型,上游和下游是一一對應(yīng)的,或者上游一個對應(yīng)下游幾個,不是全部相連的,這種情況下,邊的數(shù)量基本是線性的 O(N), 和算子數(shù)在同一個量級。
另一種是 All-to-all 型,上游每一個 task 都要和下游的每一個 task 相連,在這種情況下可以看出,每一個上游的 task 產(chǎn)生的數(shù)據(jù)集都要被下游所有的 task 消費,實際上是一個對稱的關(guān)系。只要記住上游的數(shù)據(jù)集會被下游的所有 task 來消費,就不用再單獨存中間的邊了。
所以,F(xiàn)link 在 1.13 中對上游的數(shù)據(jù)集和下游的節(jié)點分別引入了 ResultPartitionGroup 和 VertexGroup 的概念。尤其是對于 All-to-all 的邊,因為上下游之間是對稱的,可以把所有上游產(chǎn)生的數(shù)據(jù)集放到一個 Group 里,把下游所有的節(jié)點也放到一個 Group 里,在實際維護(hù)時不需要存中間的邊的關(guān)系,只需要知道上游的哪個數(shù)據(jù)集是被下游的哪個 Group 消費,或下游的哪個頂點是消費上游哪個 Group 的數(shù)據(jù)集。通過這種方式,減少了內(nèi)存的占用。另外,在實際做一些調(diào)度相關(guān)計算的時候,比如在批處理里,假如所有的邊都是 blocking 邊的情況下,每個節(jié)點都屬于一個單獨的 region。之前計算 region 之間的上下游關(guān)系,對上游的每個頂點,都需要遍歷其下游的所有頂點,所以是一個 O(N^2) 的操作。而引入 ConsumerGroup 之后,就會變成一個 O(N) 的線性操作。■ 3. 優(yōu)化結(jié)果
經(jīng)過以上數(shù)據(jù)結(jié)構(gòu)的優(yōu)化,在 10k * 10k 邊的情況下,可以將 JM 內(nèi)存占用從 4.18G 縮小到 12.08M, 初次調(diào)度時間長從 62s 縮減到 12s。這個優(yōu)化其實是非常顯著的,對用戶來說,只要升級到 Flink 1.13 就可以獲得收益,不需要做任何額外的配置。2.2 Sort-Merge Shuffle
另外一個優(yōu)化,是針對批的作業(yè)在數(shù)據(jù) shuffle 方面做的優(yōu)化。一般情況下,批的作業(yè)是在上游跑完之后,會先把結(jié)果寫到一個中間文件里,然后下游再從中間文件里拉取數(shù)據(jù)進(jìn)行處理。這種方式的好處就是可以節(jié)省資源,不需要上游和下游同時起來,在失敗的情況下,也不需要從頭執(zhí)行。這是批處理的常用執(zhí)行方式。■ Hash Shuffle
那么,shuffle 過程中,中間結(jié)果是如何保存到中間文件,下游再拉取的? 之前 Flink 引入的是 Hash shuffle,再以 All-to-all 的邊舉例,上游 task 產(chǎn)生的數(shù)據(jù)集,會給下游的每個 task 寫一個單獨的文件,這樣系統(tǒng)可能會產(chǎn)生大量的小文件。并且不管是使用文件 IO 還是 mmap 的方式,寫每個文件都至少使用一塊緩沖區(qū),會造成內(nèi)存浪費。下游 task 隨機(jī)讀取的上游數(shù)據(jù)文件,也會產(chǎn)生大量隨機(jī) IO。所以,之前 Flink 的 Hash shuffle 應(yīng)用在批處理中,只能在規(guī)模比較小或者在用 SSD 的時候,在生產(chǎn)上才能比較 work。在規(guī)模比較大或者 SATA 盤上是有較大的問題的。■ Sort Shuffle
所以,在 Flink 1.12 和 Flink 1.13 中,經(jīng)過兩個版本,引入了一種新的基于 Sort Merge 的 shuffle。這個 Sort 并不是指對數(shù)據(jù)進(jìn)行 Sort,而是對下游所寫的 task 目標(biāo)進(jìn)行 Sort。大致的原理是,上游在輸出數(shù)據(jù)時,會使用一個固定大小的緩沖區(qū),避免緩沖區(qū)的大小隨著規(guī)模的增大而增大,所有的數(shù)據(jù)都寫到緩沖區(qū)里,當(dāng)緩沖區(qū)滿時,會做一次排序并寫到一個單獨文件里,后面的數(shù)據(jù)還是基于此緩存區(qū)繼續(xù)寫,續(xù)寫的一段會拼到原來的文件后面。最后單個的上游任務(wù)會產(chǎn)生一個中間文件,由很多段組成,每個段都是有序的結(jié)構(gòu)。和其他的批處理的框架不太一樣,這邊并不是基于普通的外排序。一般的外排序是指會把這些段再做一次單獨的 merge,形成一個整體有序的文件,這樣下游來讀的時候會有更好的 IO 連續(xù)性,防止每一段每一個 task 要讀取的數(shù)據(jù)段都很小。但是,這種 merge 本身也是要消耗大量的 IO 資源的,有可能 merge 的時間帶來的開銷會遠(yuǎn)超過下游順序讀帶來的收益。所以,這里采用了另外一種方式:在下游來請求數(shù)據(jù)的時候,比如下圖中的 3 個下游都要來讀上游的中間文件,會有一個調(diào)度器對下游請求要讀取的文件位置做一個排序,通過在上層增加 IO 調(diào)度的方式,來實現(xiàn)整個文件 IO 讀取的連續(xù)性,防止在 SATA 盤上產(chǎn)生大量的隨機(jī) IO。在 SATA 盤上,相對于 Hash shuffle,Sort shuffle 的 IO 性能可以提高 2~8 倍。通過 Sort shuffle,使得 Flink 批處理基本達(dá)到了生產(chǎn)可用的狀態(tài),在 SATA 盤上 IO 性能可以把磁盤打到 100 多M,而 SATA 盤最高也就能達(dá)到 200M 的讀寫速度。為了保持兼容性,Sort shuffle 并不是默認(rèn)啟用的,用戶可以控制下游并發(fā)達(dá)到多少來啟用 Sort Merge Shuffle。并且可以通過啟用壓縮來進(jìn)一步提高批處理的性能。Sort Merge shuffle 并沒有額外占用內(nèi)存,現(xiàn)在占用的上游讀寫的緩存區(qū),是從 framework.off-heap 中抽出的一塊。
三. DataStream API 優(yōu)化
3.1 2PC & 端到端一致性
為了保證端到端的一致性,對于 Flink 流作業(yè)來說,是通過兩階段提交的機(jī)制來實現(xiàn)的,結(jié)合了 Flink 的 checkpoint、failover 機(jī)制和外部系統(tǒng)的一些特性。大概的邏輯是,當(dāng)我想做端到端的一致性,比如讀取 Kafka 再寫到 Kafka,在正常處理時會把數(shù)據(jù)先寫到一個 Kafka 的事務(wù)里,當(dāng)做 checkpoint 時進(jìn)行 preCommit,這樣數(shù)據(jù)就不會再丟了。如果 checkpoint 成功的話,會進(jìn)行一次正式的 commit。這樣就保證了外部系統(tǒng)的事務(wù)和 Flink 內(nèi)部的 failover 是一致的,比如 Flink 發(fā)生了 failover 需要回滾到上一個 checkpoint , 外部系統(tǒng)中跟這一部分對應(yīng)的事務(wù)也會被 abort 掉,如果 checkpoint 成功了,外部事務(wù)的 commit 也會成功。Flink 端到端的一致性依賴于 checkpoint 機(jī)制。但是,在遇到有限流時,就會有一些問題:具有有限流的作業(yè),task 結(jié)束之后,F(xiàn)link 是不支持做 checkpoint 的,比如流批混合的作業(yè),其中有一部分會結(jié)束,之后 Flink 就沒辦法再做 checkpoint,數(shù)據(jù)也就不會再提交了。
在有限流數(shù)據(jù)結(jié)束時,因為 checkpoint 是定時執(zhí)行的,不能保證最后一個 checkpoint 一定能在處理完所有數(shù)據(jù)后執(zhí)行,可能導(dǎo)致最后一部分?jǐn)?shù)據(jù)無法提交。
以上就會導(dǎo)致在流模式下,有限流作業(yè)流/批執(zhí)行模式結(jié)果不一致。 3.2 支持部分 Task 結(jié)束后的 Checkpoint (進(jìn)行中)
從 Flink 1.13 開始,支持在一部分 task 結(jié)束之后,也能做 checkpoint。checkpoint 實際上是維護(hù)了每個算子的所有 task 的狀態(tài)列表。在有一部分 task 結(jié)束之后,如下圖的虛線部分。Flink 會把結(jié)束的 task 分為兩種:如果一個算子的所有 subtask 都已經(jīng)結(jié)束了,就會為這個算子存一個 finished 標(biāo)記。
如果一個算子只有部分 task 結(jié)束,就只存儲未結(jié)束的 task 狀態(tài)。
基于這個 checkpoint ,當(dāng) failover 之后還是會拉起所有算子,如果識別到算子的上一次執(zhí)行已經(jīng)結(jié)束,即 finsihed = true,就會跳過這個算子的執(zhí)行。尤其是針對 Source 算子來說,如果已經(jīng)結(jié)束,后面就不會再重新執(zhí)行發(fā)送數(shù)據(jù)了。通過上述方式就可以保證整個狀態(tài)的一致性,即使有一部分 task 結(jié)束,還是照樣走 checkpoint。
Flink 也重新整理了結(jié)束語義?,F(xiàn)在 Flink 作業(yè)結(jié)束有幾種可能:作業(yè)結(jié)束:數(shù)據(jù)是有限的,有限流作業(yè)正常結(jié)束;
stop-with-savepoint ,采一個 savepoint 結(jié)束;
stop-with-savepoint --drain ,采一個 savepoint 結(jié)束,并會將 watermark 推進(jìn)到正無窮大。
之前這邊是兩種不同的實現(xiàn)邏輯,并且都有最后一部分?jǐn)?shù)據(jù)無法提交的問題。
在 Flink 的整個目標(biāo)里,其中有一點是期望做一個對有限數(shù)據(jù)集和無限數(shù)據(jù)集高效處理的統(tǒng)一平臺。目前基本上已經(jīng)有了一個初步的雛形,不管是在 API 方面,還是在 runtime 方面。下面來舉個例子說明流批一體的好處。
針對用戶的回流作業(yè),平時是處理無限流的作業(yè),如果某一天想改個邏輯,用 stop-with-savepoint 方式把流停掉,但是這個變更邏輯還需要追回到前兩個月之內(nèi)的數(shù)據(jù)來保證結(jié)果的一致性。此時,就可以啟一個批的作業(yè):作業(yè)不加修改,跑到提前緩存下來的輸入數(shù)據(jù)上,用批的模式可以盡快地訂正前兩個月的數(shù)據(jù)。另外,基于新的邏輯,使用前面保存的 savepoint,可以重啟一個新的流作業(yè)。可以看出,在上述整個流程中,如果是之前流批分開的情況,是需要單獨開發(fā)作業(yè)進(jìn)行數(shù)據(jù)訂正的。但在流批一體的情況下,可以基于流的作業(yè)自然的進(jìn)行數(shù)據(jù)訂正,不需要用戶再做額外的開發(fā)。在 Flink 后續(xù)的版本中,還會進(jìn)一步考慮更多流批結(jié)合的場景,比如用戶先做一個批的處理,對狀態(tài)進(jìn)行初始化之后,再切到無限流上的場景。當(dāng)然,在流和批單獨的功能上,也會做進(jìn)一步的優(yōu)化和完善,使得 Flink 在流批方面都是具有競爭力的計算框架。
更多 Flink 相關(guān)技術(shù)交流,可掃碼加入社區(qū)釘釘大群~
▼ 關(guān)注「Flink 中文社區(qū)」,獲取更多技術(shù)干貨 ▼
戳我,立即報名!