Flink 原理 | Flink 執(zhí)行引擎:流批一體的融合之路
背景 流批一體的分層架構(gòu) 流批一體DataStream 流批一體DAG Scheduler 流批一體的Shuffle架構(gòu) 流批一體的容錯策略 未來展望
一、背景


人力成本比較高。由于流和批是兩套系統(tǒng),相同的邏輯需要兩個團(tuán)隊開發(fā)兩遍。 數(shù)據(jù)鏈路冗余。在很多的場景下,流和批計算內(nèi)容其實是一致,但是由于是兩套系統(tǒng),所以相同邏輯還是需要運(yùn)行兩遍,產(chǎn)生一定的資源浪費(fèi)。 數(shù)據(jù)口徑不一致。這個是用戶遇到的最重要的問題。兩套系統(tǒng)、兩套算子,兩套 UDF,一定會產(chǎn)生不同程度的誤差,這些誤差給業(yè)務(wù)方帶來了非常大的困擾。這些誤差不是簡單依靠人力或者資源的投入就可以解決的。


二、流批一體的分層架構(gòu)

SDK 層。Flink 的 SDK 主要有兩類,第一類是關(guān)系型 Relational SDK 也就是 SQL/Table,第二類是物理型 Physical SDK 也就是 DataStream。這兩類 SDK 都是流批統(tǒng)一,即不管是 SQL 還是 DataStream,用戶的業(yè)務(wù)邏輯只要開發(fā)一遍,就可以同時在流和批的兩種場景下使用; 執(zhí)行引擎層。執(zhí)行引擎提供了統(tǒng)一的 DAG,用來描述數(shù)據(jù)處理流程 Data Processing Pipeline(Logical Plan)。不管是流任務(wù)還是批任務(wù),用戶的業(yè)務(wù)邏輯在執(zhí)行前,都會先轉(zhuǎn)化為此 DAG 圖。執(zhí)行引擎通過 Unified DAG Scheduler 把這個邏輯 DAG 轉(zhuǎn)化成在分布式環(huán)境下執(zhí)行的Task。Task 之間通過 Shuffle 傳輸數(shù)據(jù),我們通過 Pluggable Unified Shuffle 架構(gòu),同時支持流批兩種 Shuffle 方式; 狀態(tài)存儲。狀態(tài)存儲層負(fù)責(zé)存儲算子的狀態(tài)執(zhí)行狀態(tài)。針對流作業(yè)有開源 RocksdbStatebackend、MemoryStatebackend,也有商業(yè)化的版本的GemniStateBackend;針對批作業(yè)我們在社區(qū)版本引入了 BatchStateBackend。
流批一體的 DataStream 介紹了如何通過流批一體的 DataStream 來解決 Flink SDK 當(dāng)前面臨的挑戰(zhàn); 流批一體的 DAG Scheduler 介紹了如何通過統(tǒng)一的 Pipeline Region 機(jī)制充分挖掘流式引擎的性能優(yōu)勢;如何通過動態(tài)調(diào)整執(zhí)行計劃的方式來改善引擎的易用性,提高系統(tǒng)的資源利用率; 流批一體的 Shuffle 架構(gòu)介紹如何通過一套統(tǒng)一的 Shuffle 架構(gòu)既可以滿足不同 Shuffle 在策略上的定制化需求,同時還能避免在共性需求上的重復(fù)開發(fā); 流批一體的容錯策略介紹了如何通過統(tǒng)一的容錯策略既滿足批場景下容錯又可以提升流場景下的容錯效果。
三、流批一體 DataStream
SDK 分析以及面臨的挑戰(zhàn)

Table/SQL 是一種 Relational 的高級 SDK,主要用在一些數(shù)據(jù)分析的場景中,既可以支持 Bounded 也可以支持 Unbounded 的輸入。由于 Table/SQL 是 Declarative 的,所以系統(tǒng)可以幫助用戶進(jìn)行很多優(yōu)化,例如根據(jù)用戶提供的Schema,可以進(jìn)行 Filter Push Down 謂詞下推、按需反序列二進(jìn)制數(shù)據(jù)等優(yōu)化。目前 Table/SQL 可以支持 Batch 和 Streaming 兩種執(zhí)行模式。[1] DataStream 屬于一種 Physical SDK。Relatinal SDK 功能雖然強(qiáng)大,但也存在一些局限:不支持對 State、Timer 的操作;由于 Optimizer 的升級,可能導(dǎo)致用相同的 SQL 在兩個版本中出現(xiàn)物理執(zhí)行計劃不兼容的情況。而 DataStream SDK,既可以支持 State、Timer 維度 Low Level 的操作,同時由于 DataStream 是一種 Imperative SDK,所以對物理執(zhí)行計劃有很好的“掌控力”,從而也不存在版本升級導(dǎo)致的不兼容。DataStream 目前在社區(qū)仍有很大用戶群,例如目前未 Closed 的 DataStream issue 依然有近 500 個左右。雖然 DataStream 即可以支持 Bounded 又可以支持 Unbounded Input 用 DataStream 寫的 Application,但是在 Flink-1.12 之前只支持 Streaming 的執(zhí)行模式。 DataSet 是一種僅支持 Bounded 輸入的 Physical SDK,會根據(jù) Bounded 的特性對某些算子進(jìn)行做一定的優(yōu)化,但是不支持 EventTime 和 State 等操作。雖然 DataSet 是 Flink 提供最早的一種 SDK,但是隨著實時化和數(shù)據(jù)分析場景的不斷發(fā)展,相比于 DataStream 和 SQL,DataSet 在社區(qū)的影響力在逐步下降。
利用已有 Physical SDK 無法寫出一個真正生產(chǎn)可以用的流批一體的 Application。例如用戶寫一個程序用來處理 Kafka 中的實時數(shù)據(jù),那么利用相同的程序來處理存儲在 OSS/S3/HDFS 上的歷史數(shù)據(jù)也是非常自然的事情。但是目前不管是 DataSet 還是 DataStream 都無法滿足用戶這個“簡單”的訴求。大家可能覺得奇怪,DataStream 不是既支持 Bounded 的 Input 又支持 Unbounded 的 Input,為什么還會有問題呢?其實“魔鬼藏在細(xì)節(jié)中”,我會在 Unified DataStream 這一節(jié)中會做進(jìn)一的闡述。 學(xué)習(xí)和理解的成本比較高。隨著 Flink 不斷壯大,越來越多的新用戶加入 Flink 社區(qū),但是對于這些新用戶來說就要學(xué)習(xí)兩種 Physical SDK。和其他引擎相比,用戶入門的學(xué)習(xí)成本是相對比較高的;兩種 SDK 在語義上有不同的地方,例如 DataStream 上有 Watermark、EventTime,而 DataSet 卻沒有,對于用戶來說,理解兩套機(jī)制的門檻也不??;由于這兩 SDK 還不兼容,一個新用戶一旦選擇錯誤,將會面臨很大的切換成本。
Unified Physical SDK

為什么選擇 DataStream 作為 Unified Physical SDK?
Unified DataStream 比“老”的 DataStream 提供了哪些能力讓用戶可以寫出一個真正生產(chǎn)可以用的流批一體 Application?
為什么不是 Unified DataSet
用戶收益。在前邊已經(jīng)分析過,隨著 Flink 社區(qū)的發(fā)展,目前 DataSet 在社區(qū)的影響力逐漸下降。如果選擇使用 DataSet 作為 Unified Physical SDK,那么用戶之前在 DataStream 大量“投資”就會作廢。而選擇 DataStream,可以讓許多用戶的已有 DataStream “投資”得到額外的回報; 開發(fā)成本。DataSet 過于古老,缺乏大量對于現(xiàn)代實時計算引擎基本概念的支持,例如 EventTime、Watermark、State、Unbounded Source 等。另外一個更深層的原因是現(xiàn)有 DataSet 算子的實現(xiàn),在流的場景完全無法復(fù)用,例如 Join 等。而對于 DataStream 則不然,可以進(jìn)行大量的復(fù)用。那么如何在流批兩種場景下復(fù)用 DataStream 的算子呢?
Unified DataStream
效率

一致性
資源消耗大: 使用 Streaming 方式,需要同時拿到所有的資源。在某些情況下,用戶可能沒有這么多資源; 容錯成本高: 在 Bounded 場景下,為了效率一些算子可能無法支持 Snapshot 操作,一旦出錯可能需要重新執(zhí)行整個作業(yè)。
What to commit? How to commit? Where to commit? When to commit?
四、流批一體 DAG Scheduler
Unified DAG Scheduler 要解決什么問題
一種是流的調(diào)度模式,在這種模式下,Scheduler 會申請到一個作業(yè)所需要的全部資源,然后同時調(diào)度這個作業(yè)的全部 Task,所有的 Task 之間采取 Pipeline 的方式進(jìn)行通信。批作業(yè)也可以采取這種方式,并且在性能上也會有很大的提升。但是對于運(yùn)行比較長的 Batch 作業(yè)來說來說,這種模式還是存在一定的問題:規(guī)模比較大的情況下,同時消耗的資源比較多,對于某些用戶來說,他可能沒有這么多的資源;容錯代價比較高,例如一旦發(fā)生錯誤,整個作業(yè)都需要重新運(yùn)行。 一種是批的調(diào)度模式。這種模式和傳統(tǒng)的批引擎類似,所有 Task 都是可以獨(dú)立申請資源,Task 之間都是通過 Batch Shuffle 進(jìn)行通訊。這種方式的好處是容錯代價比較小。但是這種運(yùn)行方式也存在一些短板。例如,Task 之間的數(shù)據(jù)都是通過磁盤來進(jìn)行交互,引發(fā)了大量的磁盤 IO。
架構(gòu)不一致、維護(hù)成本高。調(diào)度的本質(zhì)就是進(jìn)行資源的分配,換句話說就是要解決 When to deploy which tasks to where 的問題。原有兩種調(diào)度模式,在資源分配的時機(jī)和粒度上都有一定的差異,最終導(dǎo)致了調(diào)度架構(gòu)上無法完全統(tǒng)一,需要開發(fā)人員維護(hù)兩套邏輯。例如,流的調(diào)度模式,資源分配的粒度是整個物理執(zhí)行計劃的全部 Task;批的調(diào)度模式,資源分配的粒度是單個任務(wù),當(dāng) Scheduler 拿到一個資源的時候,就需要根據(jù)作業(yè)類型走兩套不同的處理邏輯; 性能。傳統(tǒng)的批調(diào)度方式,雖然容錯代價比較小,但是引入大量的磁盤 I/O,并且性能也不是最佳,無法發(fā)揮出 Flink 流式引擎的優(yōu)勢。實際上在資源相對充足的場景下,可以采取“流”的調(diào)度方式來運(yùn)行 Batch 作業(yè),從而避免額外的磁盤 I/O,提高作業(yè)的執(zhí)行效率。尤其是在夜間,流作業(yè)可以釋放出一定資源,這就為批作業(yè)按照“Streaming”的方式運(yùn)行提供了可能。 自適應(yīng)。目前兩種調(diào)度方式的物理執(zhí)行計劃是靜態(tài)的,靜態(tài)生成物理執(zhí)行計劃存在調(diào)優(yōu)人力成本高、資源利用率低等問題。
基于 Pipeline Region 的統(tǒng)一調(diào)度



自適應(yīng)調(diào)度
配置人力成本高。對于批作業(yè)來說,雖然理論上可以根據(jù)統(tǒng)計信息推斷出物理執(zhí)行計劃中每個階段的并發(fā)度,但是由于存在大量的 UDF 或者統(tǒng)計信息的缺失等問題,導(dǎo)致靜態(tài)決策結(jié)果可能會出現(xiàn)嚴(yán)重不準(zhǔn)確的情況;為了保障業(yè)務(wù)作業(yè)的 SLA,在大促期間,業(yè)務(wù)的同學(xué)需要根據(jù)大促的流量估計,手動調(diào)整高優(yōu)批作業(yè)的并發(fā)度,由于業(yè)務(wù)變化快,一旦業(yè)務(wù)邏輯發(fā)生變化,又要不斷的重復(fù)這個過程。整個調(diào)優(yōu)過程都需要業(yè)務(wù)的同學(xué)手動操作,人力成本比較高,即便這樣也可能會出現(xiàn)誤判的情況導(dǎo)致無法滿足用戶 SLA; 資源利用率低。由于人工配置并發(fā)度成本比較高,所以不可能對所有的作業(yè)都手動配置并發(fā)度。對于中低優(yōu)先級的作業(yè),業(yè)務(wù)同學(xué)會選取一些默認(rèn)值作為并發(fā)度,但是在大多數(shù)情況下這些默認(rèn)值都偏大,造成資源的浪費(fèi);而且雖然高優(yōu)先級的作業(yè)可以進(jìn)行手工并發(fā)配置,由于配置方式比較繁瑣,所以大促過后,雖然流量已經(jīng)下降但是業(yè)務(wù)方仍然會使用大促期間的配置,也造成大量的資源浪費(fèi)現(xiàn)象; 穩(wěn)定性差。資源浪費(fèi)的情況最終導(dǎo)致資源的超額申請現(xiàn)象。目前大多數(shù)批作業(yè)都是采取和流作業(yè)集群混跑的方式,具體來說申請的資源都是非保障資源,一旦資源緊張或者出現(xiàn)機(jī)器熱點(diǎn),這些非保障資源都是優(yōu)先被調(diào)整的對象。

五、流批一體的 Shuffle 架構(gòu)

流批 Shuffle 之間的差異
Shuffle 數(shù)據(jù)的生命周期。流作業(yè)的 Shuffle 數(shù)據(jù)和 Task 的生命周期基本是一致的;而批作業(yè)的 Shuffle 數(shù)據(jù)和 Task 生命周期是解耦的; Shuffle 數(shù)據(jù)的存儲介質(zhì)。因為流作業(yè)的 Shuffle 數(shù)據(jù)生命周期比較短,所以可以把流作業(yè)的 Shuffle 數(shù)據(jù)存儲在內(nèi)存中;而批作業(yè)的 Shuffle 數(shù)據(jù)生命周期有一定的不確定性,所以需要把批作業(yè)的 Shuffle 數(shù)據(jù)存儲在磁盤中; Shuffle 部署方式[7]。把 Shuffle 服務(wù)和計算節(jié)點(diǎn)部署在一起,對流作業(yè)來說這種部署方式是有優(yōu)勢的,因為這樣會減少不必要網(wǎng)絡(luò)開銷,從而減少 Latency。但對于批作業(yè)來說,這種部署方式在資源利用率、性能、穩(wěn)定性上都存在一定的問題。[8]
流批 Shuffle 之間的共性
數(shù)據(jù)的 Meta 管理。所謂 Shuffle Meta 是指邏輯數(shù)據(jù)劃分到數(shù)據(jù)物理位置的映射。不管是流還是批的場景,在正常情況下都需要從 Meta 中找出自己的讀取或者寫入數(shù)據(jù)的物理位置;在異常情況下,為了減少容錯代價,通常也會對 Shuffle Meta 數(shù)據(jù)進(jìn)行持久化; 數(shù)據(jù)傳輸。從邏輯上講,流作業(yè)和批作業(yè)的 Shuffle 都是為了對數(shù)據(jù)進(jìn)行重新劃分(re-partition/re-distribution)。在分布式系統(tǒng)中,對數(shù)據(jù)的重新劃分都涉及到跨線程、進(jìn)程、機(jī)器的數(shù)據(jù)傳輸。
流批一體的 Shuffle 架構(gòu)

Shuffle Master 資源申請和資源釋放。也就是說插件需要通知框架 How to request/release resource。而由 Flink 來決定 When to call it; Shuffle Writer 上游的算子利用 Writer 把數(shù)據(jù)寫入 Shuffle Service——Streaming Shuffle 會把數(shù)據(jù)寫入內(nèi)存;External/Remote Batch Shuffle 可以把數(shù)據(jù)寫入到外部存儲中; Shuffle Reader 下游的算子可以通過 Reader 讀取 Shuffle 數(shù)據(jù);
六、流批一體的容錯策略
Pipeline Region Failover

JM Failover

七、未來展望
戳我,查看更多技術(shù)干貨!評論
圖片
表情
