<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink 執(zhí)行引擎:流批一體的融合之路

          共 12706字,需瀏覽 26分鐘

           ·

          2021-04-09 09:59


          摘要:本文由 Apache Flink Committer 馬國維分享,主要介紹 Flink 作為大數(shù)據(jù)計(jì)算引擎的流批一體融合之路。內(nèi)容包括:

          1. 背景
          2. 流批一體的分層架構(gòu)
          3. 流批一體DataStream
          4. 流批一體DAG Scheduler
          5. 流批一體的Shuffle架構(gòu)
          6. 流批一體的容錯(cuò)策略
          7. 未來展望


          Tips:點(diǎn)擊文末閱讀原文可查看更多技術(shù)干貨~
           

          一、背景



          隨著互聯(lián)網(wǎng)和移動(dòng)互聯(lián)網(wǎng)的不斷發(fā)展,各行各業(yè)都積累海量的業(yè)務(wù)數(shù)據(jù)。而企業(yè)為了改善用戶體驗(yàn),提升產(chǎn)品在市場上的競爭力,都采取了實(shí)時(shí)化方式來處理大數(shù)據(jù)。社交媒體的實(shí)時(shí)大屏、電商的實(shí)時(shí)推薦、城市大腦的實(shí)時(shí)交通預(yù)測(cè)、金融行業(yè)的實(shí)時(shí)反欺詐,這些產(chǎn)品的成功都在說明大數(shù)據(jù)處理的實(shí)時(shí)化已經(jīng)成為一個(gè)勢(shì)不可擋的潮流。


          在實(shí)時(shí)化的大趨勢(shì)下,F(xiàn)link 已經(jīng)成為實(shí)時(shí)計(jì)算行業(yè)的事實(shí)標(biāo)準(zhǔn)。我們看到,不光是阿里巴巴,國內(nèi)外各個(gè)領(lǐng)域的頭部廠商,都把 Flink 做為實(shí)時(shí)計(jì)算的技術(shù)底座,國內(nèi)有字節(jié)跳動(dòng)、騰訊、華為,國外有 Netflix、Uber 等等。

          而業(yè)務(wù)實(shí)時(shí)化只是一個(gè)起點(diǎn),F(xiàn)link 的目標(biāo)之一就是給用戶提供實(shí)時(shí)離線一體化的用戶體驗(yàn)。其實(shí)很多用戶不僅需要實(shí)時(shí)的數(shù)據(jù)統(tǒng)計(jì),為了確認(rèn)運(yùn)營或產(chǎn)品的策略的效果,用戶同時(shí)還需要和歷史(昨天,甚至是去年的同期)數(shù)據(jù)比較。而從用戶的角度來看,原有的流、批獨(dú)立方案存在一些痛點(diǎn):


          • 人力成本比較高。由于流和批是兩套系統(tǒng),相同的邏輯需要兩個(gè)團(tuán)隊(duì)開發(fā)兩遍。
          • 數(shù)據(jù)鏈路冗余。在很多的場景下,流和批計(jì)算內(nèi)容其實(shí)是一致,但是由于是兩套系統(tǒng),所以相同邏輯還是需要運(yùn)行兩遍,產(chǎn)生一定的資源浪費(fèi)。
          • 數(shù)據(jù)口徑不一致。這個(gè)是用戶遇到的最重要的問題。兩套系統(tǒng)、兩套算子,兩套 UDF,一定會(huì)產(chǎn)生不同程度的誤差,這些誤差給業(yè)務(wù)方帶來了非常大的困擾。這些誤差不是簡單依靠人力或者資源的投入就可以解決的。




          2020 年的雙十一,在實(shí)時(shí)洪峰到達(dá) 40 億的歷史新高的同時(shí),F(xiàn)link 團(tuán)隊(duì)與 DT 團(tuán)隊(duì)一起推出了基于 Flink 的全鏈路流批一體的數(shù)倉架構(gòu),很好解決了 Lambda 的架構(gòu)所帶來的一系列問題:流批作業(yè)使用同一 SQL,使研發(fā)效率提升了 3~4 倍;一套引擎確保了數(shù)據(jù)口徑天然一致;流批作業(yè)在同一集群運(yùn)行,削峰填谷大幅提升了資源效率。

          Flink 流批一體的成功,離不開 Flink 開源社區(qū)的健康蓬勃發(fā)展。從 Apache 軟件基金會(huì) 2020 年度報(bào)告可以看出,在反映開源社區(qū)繁榮情況的三個(gè)關(guān)鍵指標(biāo)上 Flink 都名列前茅:用戶郵件列表活躍度,F(xiàn)link 排名第一;開發(fā)者提交次數(shù) Flink 排名第二,Github 用戶訪問量排名第二。這些數(shù)據(jù)并不局限于大數(shù)據(jù)領(lǐng)域,而是 Apache 開源基金會(huì)下屬的所有項(xiàng)目。


          2020 年也是 Blink 反哺社區(qū)的第二年,這兩年我們把 Blink 在集團(tuán)內(nèi)積累的經(jīng)驗(yàn)逐步貢獻(xiàn)回社區(qū),讓 Flink 成為真正意義上的流批一體平臺(tái)。我希望通過這篇文章給大家分享下這兩年 Flink 在執(zhí)行引擎流批融合方面做了哪些工作。同時(shí)也希望 Flink 的老用戶和新朋友可以進(jìn)一步了解 Flink 流批一體架構(gòu)的“前世今生”。
           

          二、流批一體的分層架構(gòu)





          總體來說,F(xiàn)link 的核心引擎主要分為如下三層:


          • SDK 層。Flink 的 SDK 主要有兩類,第一類是關(guān)系型 Relational SDK 也就是  SQL/Table,第二類是物理型 Physical SDK 也就是 DataStream。這兩類 SDK 都是流批統(tǒng)一,即不管是 SQL 還是 DataStream,用戶的業(yè)務(wù)邏輯只要開發(fā)一遍,就可以同時(shí)在流和批的兩種場景下使用;
          • 執(zhí)行引擎層。執(zhí)行引擎提供了統(tǒng)一的 DAG,用來描述數(shù)據(jù)處理流程 Data Processing Pipeline(Logical Plan)。不管是流任務(wù)還是批任務(wù),用戶的業(yè)務(wù)邏輯在執(zhí)行前,都會(huì)先轉(zhuǎn)化為此 DAG 圖。執(zhí)行引擎通過 Unified DAG Scheduler 把這個(gè)邏輯 DAG 轉(zhuǎn)化成在分布式環(huán)境下執(zhí)行的Task。Task 之間通過 Shuffle 傳輸數(shù)據(jù),我們通過 Pluggable Unified Shuffle 架構(gòu),同時(shí)支持流批兩種 Shuffle 方式;
          • 狀態(tài)存儲(chǔ)。狀態(tài)存儲(chǔ)層負(fù)責(zé)存儲(chǔ)算子的狀態(tài)執(zhí)行狀態(tài)。針對(duì)流作業(yè)有開源  RocksdbStatebackend、MemoryStatebackend,也有商業(yè)化的版本的GemniStateBackend;針對(duì)批作業(yè)我們?cè)谏鐓^(qū)版本引入了 BatchStateBackend。


           
          本文主要分享如下幾個(gè)方面的內(nèi)容:


          1. 流批一體的 DataStream 介紹了如何通過流批一體的 DataStream 來解決 Flink SDK 當(dāng)前面臨的挑戰(zhàn);
          2. 流批一體的 DAG Scheduler 介紹了如何通過統(tǒng)一的 Pipeline Region 機(jī)制充分挖掘流式引擎的性能優(yōu)勢(shì);如何通過動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃的方式來改善引擎的易用性,提高系統(tǒng)的資源利用率;
          3. 流批一體的 Shuffle 架構(gòu)介紹如何通過一套統(tǒng)一的 Shuffle 架構(gòu)既可以滿足不同  Shuffle 在策略上的定制化需求,同時(shí)還能避免在共性需求上的重復(fù)開發(fā);
          4. 流批一體的容錯(cuò)策略介紹了如何通過統(tǒng)一的容錯(cuò)策略既滿足批場景下容錯(cuò)又可以提升流場景下的容錯(cuò)效果。


           

          三、流批一體 DataStream



          SDK 分析以及面臨的挑戰(zhàn)




          如上圖所示,目前 Flink 提供的 SDK 主要有三類:


          1. Table/SQL 是一種 Relational 的高級(jí) SDK,主要用在一些數(shù)據(jù)分析的場景中,既可以支持 Bounded 也可以支持 Unbounded 的輸入。由于 Table/SQL 是  Declarative 的,所以系統(tǒng)可以幫助用戶進(jìn)行很多優(yōu)化,例如根據(jù)用戶提供的Schema,可以進(jìn)行 Filter Push Down 謂詞下推、按需反序列二進(jìn)制數(shù)據(jù)等優(yōu)化。目前 Table/SQL 可以支持 Batch 和 Streaming 兩種執(zhí)行模式。[1]
          2. DataStream 屬于一種 Physical SDK。Relatinal SDK 功能雖然強(qiáng)大,但也存在一些局限:不支持對(duì) State、Timer 的操作;由于 Optimizer 的升級(jí),可能導(dǎo)致用相同的 SQL 在兩個(gè)版本中出現(xiàn)物理執(zhí)行計(jì)劃不兼容的情況。而 DataStream SDK,既可以支持 State、Timer 維度 Low Level 的操作,同時(shí)由于 DataStream 是一種  Imperative SDK,所以對(duì)物理執(zhí)行計(jì)劃有很好的“掌控力”,從而也不存在版本升級(jí)導(dǎo)致的不兼容。DataStream 目前在社區(qū)仍有很大用戶群,例如目前未 Closed 的 DataStream issue 依然有近 500 個(gè)左右。雖然 DataStream 即可以支持 Bounded  又可以支持 Unbounded Input 用 DataStream 寫的 Application,但是在 Flink-1.12 之前只支持 Streaming 的執(zhí)行模式。
          3. DataSet 是一種僅支持 Bounded 輸入的 Physical SDK,會(huì)根據(jù) Bounded 的特性對(duì)某些算子進(jìn)行做一定的優(yōu)化,但是不支持 EventTime 和 State 等操作。雖然  DataSet 是 Flink 提供最早的一種 SDK,但是隨著實(shí)時(shí)化和數(shù)據(jù)分析場景的不斷發(fā)展,相比于 DataStream 和 SQL,DataSet 在社區(qū)的影響力在逐步下降。


           
          目前 Table/SQL 對(duì)于流批統(tǒng)一的場景支持已經(jīng)比較成熟,但是對(duì)于 Phyiscal SDK 來說還面臨的一些挑戰(zhàn),主要有兩個(gè)方面:


          1. 利用已有 Physical SDK 無法寫出一個(gè)真正生產(chǎn)可以用的流批一體的 Application。例如用戶寫一個(gè)程序用來處理 Kafka 中的實(shí)時(shí)數(shù)據(jù),那么利用相同的程序來處理存儲(chǔ)在 OSS/S3/HDFS 上的歷史數(shù)據(jù)也是非常自然的事情。但是目前不管是 DataSet 還是 DataStream 都無法滿足用戶這個(gè)“簡單”的訴求。大家可能覺得奇怪,DataStream 不是既支持 Bounded 的 Input 又支持 Unbounded 的 Input,為什么還會(huì)有問題呢?其實(shí)“魔鬼藏在細(xì)節(jié)中”,我會(huì)在 Unified DataStream 這一節(jié)中會(huì)做進(jìn)一的闡述。
          2. 學(xué)習(xí)和理解的成本比較高。隨著 Flink 不斷壯大,越來越多的新用戶加入 Flink 社區(qū),但是對(duì)于這些新用戶來說就要學(xué)習(xí)兩種 Physical SDK。和其他引擎相比,用戶入門的學(xué)習(xí)成本是相對(duì)比較高的;兩種 SDK 在語義上有不同的地方,例如 DataStream 上有 Watermark、EventTime,而 DataSet 卻沒有,對(duì)于用戶來說,理解兩套機(jī)制的門檻也不小;由于這兩 SDK 還不兼容,一個(gè)新用戶一旦選擇錯(cuò)誤,將會(huì)面臨很大的切換成本。


           


          Unified Physical SDK


           

          為了解決上述 Physical SDK 所面臨的挑戰(zhàn),我們把 Unified DataStream SDK 作為  Flink 統(tǒng)一的 Physical SDK。這個(gè)部分主要解決兩個(gè)問題:


          1. 為什么選擇 DataStream 作為 Unified Physical SDK?

          2. Unified DataStream 比“老”的 DataStream 提供了哪些能力讓用戶可以寫出一個(gè)真正生產(chǎn)可以用的流批一體 Application?




          為什么不是 Unified DataSet



          為了解決學(xué)習(xí)和理解成本比較高的問題,最自然最簡單的方案就是從 DataStream 和  DataSet 中選擇一個(gè)作為 Flink 的唯一的 Physical SDK。那為什么我們選擇了  DataStream 而不是 DataSet 呢?主要有兩個(gè)原因:


          1. 用戶收益。在前邊已經(jīng)分析過,隨著 Flink 社區(qū)的發(fā)展,目前 DataSet 在社區(qū)的影響力逐漸下降。如果選擇使用 DataSet 作為 Unified Physical SDK,那么用戶之前在 DataStream 大量“投資”就會(huì)作廢。而選擇 DataStream,可以讓許多用戶的已有 DataStream “投資”得到額外的回報(bào);
          2. 開發(fā)成本。DataSet 過于古老,缺乏大量對(duì)于現(xiàn)代實(shí)時(shí)計(jì)算引擎基本概念的支持,例如 EventTime、Watermark、State、Unbounded Source 等。另外一個(gè)更深層的原因是現(xiàn)有 DataSet 算子的實(shí)現(xiàn),在流的場景完全無法復(fù)用,例如 Join 等。而對(duì)于 DataStream 則不然,可以進(jìn)行大量的復(fù)用。那么如何在流批兩種場景下復(fù)用  DataStream 的算子呢?


           


          Unified DataStream



          很多對(duì) Flink 有一定了解的用戶可能會(huì)問:DataStream 是同時(shí)支持 Bounded/Unbounded 的輸入,為什么我們會(huì)說:用 DataStream 無法寫出一個(gè)真正生產(chǎn)可以用的流批一體 Application 呢?簡單來說,DataStream 原本主要設(shè)計(jì)目標(biāo)是給 Unbounded 場景使用的,所以導(dǎo)致在 Bounded 的場景下在效率、可用性、易用性方面和傳統(tǒng)的批引擎還有一定距離。具體來說體現(xiàn)在如下兩個(gè)方面:


          • 效率



          先給大家看一個(gè)例子,下邊是一個(gè)跑同樣規(guī)模的 WordCount 的作業(yè),DataStream 和  DataSet 的性能對(duì)比圖。從這個(gè)例子可以看出,DataSet 的性能是 DataStream 將近 5  倍。


          很明顯,要讓 DataStream 在生產(chǎn)中既可以支持流的場景又要支持批的場景,就一定要大幅提高 DataStream 在 Bounded 場景下效率。那么為什么 DataStream 的效率要比  DataSet 的效率低呢?

          前面我們已經(jīng)提到,DataStream 原本主要設(shè)計(jì)目標(biāo)是給  Unbounded 的場景下使用的,而 Unounded 場景下一個(gè)主要的特點(diǎn)就是亂序,也就是說任何一個(gè) DataStream 的算子無法假設(shè)處理的 Record 是按照什么順序進(jìn)行的,所以許多算子會(huì)用一個(gè) K/V 存儲(chǔ)來緩存這些亂序的數(shù)據(jù),等到合適的時(shí)候再從 K/V 存儲(chǔ)中取出這些數(shù)據(jù)進(jìn)行處理并且進(jìn)行輸出。一般情況下,算子對(duì) K/V 存儲(chǔ)訪問涉及大量的序列化和反序列化,同時(shí)也會(huì)引發(fā)隨機(jī)磁盤 I/O;而在 DataSet 中,假設(shè)數(shù)據(jù)是有界的,也就是可以通過優(yōu)化來避免隨機(jī)的磁盤 I/O 訪問,同時(shí)也對(duì)序列化和反序列化做了相關(guān)優(yōu)化。這也是為什么用 DataSet 寫的 WorkerCount 要比用 DataStream 寫的 WordCount  快 5 倍主要原因。 

          知道到了原因,是不是要把所有的 DataStream 的算子,都重寫一遍就可以了呢?理論上沒問題,但是 DataStream 有大量的算子需要重寫,有些算子還比較復(fù)雜,例如與  Window 相關(guān)的一系列算子。可以想象到,如果都全部重寫,工程量是非常之巨大的。所以我們通過單 Key 的 BatchStateBackend 幾乎完全避免了對(duì)所有算子重寫,同時(shí)還得到了非常不錯(cuò)的效果。


          • 一致性



          對(duì)于 Flink 有一定了解的同學(xué)應(yīng)該都知道,原來用 DataStream 寫的 Application 都采用  Streaming 的執(zhí)行模式,在這種模式下是通過 Checkpoint 的方式保持端到端的 Exactly Once 的語義,具體來說一個(gè)作業(yè)的 Sink 只有當(dāng)全圖的所有算子(包括 Sink 自己)都做完各自的 Snapshot 之后,Sink 才會(huì)把數(shù)據(jù) Commit 到外部系統(tǒng)中,這是一個(gè)典型的依賴  Flink Checkpoint 機(jī)制的 2PC 協(xié)議。
           
          而在 Bounded 的場景下雖然也可以采用 Streaming 的方式但是對(duì)于用戶來說可能會(huì)存在一些問題:


          1. 資源消耗大: 使用 Streaming 方式,需要同時(shí)拿到所有的資源。在某些情況下,用戶可能沒有這么多資源;
          2. 容錯(cuò)成本高: 在 Bounded 場景下,為了效率一些算子可能無法支持 Snapshot 操作,一旦出錯(cuò)可能需要重新執(zhí)行整個(gè)作業(yè)。 


           
          所以在 Bounded 場景下,用戶希望 Application 可以采用 Batch 執(zhí)行模式,因?yàn)槔?nbsp; Batch 執(zhí)行的模式可以非常自然的解決上述兩個(gè)問題。在 Bounded 場景下支持 Batch 的執(zhí)行模式是比較簡單的,但是卻引入了一個(gè)非常棘手的問題——利用已有的 Sink API  無法保證端到端的 Exactly Once 語義。這是由于 Bounded 場景下是沒有 Checkpoint  的,而原有 Sink 都是依賴 Checkpoint 保證端到端的 ExactlyOnce 的。同時(shí)我們不希望開發(fā)者針對(duì) Sink 在不同模式下開發(fā)兩套不同的實(shí)現(xiàn),因?yàn)檫@樣非常不利用 Flink 和其他生態(tài)的對(duì)接。

          實(shí)際上,一個(gè) Transactional 的 Sink 主要解決如下 4 個(gè)問題:


          1. What to commit?
          2. How to commit?
          3. Where to commit?
          4. When to commit?


           
          而 Flink 應(yīng)該讓 Sink 開發(fā)者提供 What to commit 和 How to commit,而系統(tǒng)應(yīng)該根據(jù)不同的執(zhí)行模式,選擇 Where to commit 和 When to commit 來保證端到端的 Exactly Once。最終我們提出了一個(gè)全新 Unified Sink API,從而讓開發(fā)者只開發(fā)一套 Sink 就可以同時(shí)運(yùn)行在 Streaming 和 Batch 執(zhí)行模式下。這里介紹的只是主要思路,在有限流的場景下如何保證 End to End 的一致性;如何對(duì)接 Hive、Iceberg 等外部生態(tài),實(shí)際上還是存在一定挑戰(zhàn)。


          四、流批一體 DAG Scheduler



          Unified DAG Scheduler 要解決什么問題



          原來 Flink 有兩種調(diào)度的模式:


          1. 一種是流的調(diào)度模式,在這種模式下,Scheduler 會(huì)申請(qǐng)到一個(gè)作業(yè)所需要的全部資源,然后同時(shí)調(diào)度這個(gè)作業(yè)的全部 Task,所有的 Task 之間采取 Pipeline 的方式進(jìn)行通信。批作業(yè)也可以采取這種方式,并且在性能上也會(huì)有很大的提升。但是對(duì)于運(yùn)行比較長的 Batch 作業(yè)來說來說,這種模式還是存在一定的問題:規(guī)模比較大的情況下,同時(shí)消耗的資源比較多,對(duì)于某些用戶來說,他可能沒有這么多的資源;容錯(cuò)代價(jià)比較高,例如一旦發(fā)生錯(cuò)誤,整個(gè)作業(yè)都需要重新運(yùn)行。
          2. 一種是批的調(diào)度模式。這種模式和傳統(tǒng)的批引擎類似,所有 Task 都是可以獨(dú)立申請(qǐng)資源,Task 之間都是通過 Batch Shuffle 進(jìn)行通訊。這種方式的好處是容錯(cuò)代價(jià)比較小。但是這種運(yùn)行方式也存在一些短板。例如,Task 之間的數(shù)據(jù)都是通過磁盤來進(jìn)行交互,引發(fā)了大量的磁盤 IO。


           
          總的來說,有了這兩種調(diào)度方式是可以基本滿足流批一體的場景需求,但是也存在著很大的改進(jìn)空間,具體來說體現(xiàn)在三個(gè)方面:


          1. 架構(gòu)不一致、維護(hù)成本高。調(diào)度的本質(zhì)就是進(jìn)行資源的分配,換句話說就是要解決  When to deploy which tasks to where 的問題。原有兩種調(diào)度模式,在資源分配的時(shí)機(jī)和粒度上都有一定的差異,最終導(dǎo)致了調(diào)度架構(gòu)上無法完全統(tǒng)一,需要開發(fā)人員維護(hù)兩套邏輯。例如,流的調(diào)度模式,資源分配的粒度是整個(gè)物理執(zhí)行計(jì)劃的全部 Task;批的調(diào)度模式,資源分配的粒度是單個(gè)任務(wù),當(dāng) Scheduler 拿到一個(gè)資源的時(shí)候,就需要根據(jù)作業(yè)類型走兩套不同的處理邏輯;
          2. 性能。傳統(tǒng)的批調(diào)度方式,雖然容錯(cuò)代價(jià)比較小,但是引入大量的磁盤 I/O,并且性能也不是最佳,無法發(fā)揮出 Flink 流式引擎的優(yōu)勢(shì)。實(shí)際上在資源相對(duì)充足的場景下,可以采取“流”的調(diào)度方式來運(yùn)行 Batch 作業(yè),從而避免額外的磁盤 I/O,提高作業(yè)的執(zhí)行效率。尤其是在夜間,流作業(yè)可以釋放出一定資源,這就為批作業(yè)按照“Streaming”的方式運(yùn)行提供了可能。
          3. 自適應(yīng)。目前兩種調(diào)度方式的物理執(zhí)行計(jì)劃是靜態(tài)的,靜態(tài)生成物理執(zhí)行計(jì)劃存在調(diào)優(yōu)人力成本高、資源利用率低等問題。


           


          基于 Pipeline Region 的統(tǒng)一調(diào)度




          為了既能發(fā)揮流引擎的優(yōu)勢(shì),同時(shí)避免全圖同時(shí)調(diào)度存在的一些短板,我們引入  Pipeline Region 的概念。Unified DAG Scheduler 允許在一個(gè) DAG 圖中,Task 之間既可以通過 Pipeline 通訊,也可以通過 Blocking 方式進(jìn)行通訊。這些由 Pipeline 的數(shù)據(jù)交換方式連接的 Task 被稱為一個(gè) Pipeline Region。基于以上概念,F(xiàn)link 引入 Pipeline Region 的概念,不管是流作業(yè)還是批作業(yè),都是按照 Pipeline Region 粒度來申請(qǐng)資源和調(diào)度任務(wù)。細(xì)心的讀者可以發(fā)現(xiàn),其實(shí)原有的兩種模式都是 Pipeline Region 調(diào)度的特例。


          即便可以資源上滿足“流”的調(diào)度模式,那么哪些任務(wù)可以采取“流”的方式調(diào)度呢?

          有同學(xué)還是會(huì)擔(dān)心采取“流”的調(diào)度方式容錯(cuò)代價(jià)會(huì)比較高,因?yàn)樵凇傲鳌钡恼{(diào)度方式下,一個(gè) Task 發(fā)生錯(cuò)誤,和他聯(lián)通的所有 Task 都會(huì) Fail,然后重新運(yùn)行。

          在 Flink 中,不同 Task 之間有兩種連接方式[2],一種是 All-to-All 的連接方式,上游 Task 會(huì)和下游的所有的 Task 進(jìn)行連接;一種是 PointWise 的鏈接方式,上游的 Task 只會(huì)和下游的部分 Task 進(jìn)行連接。

          如果一個(gè)作業(yè)的所有 Task 之間都是通過 All-to-All 方式進(jìn)行連接,一旦采取“流”的調(diào)度方式,那么整個(gè)物理拓?fù)涠夹枰瑫r(shí)被調(diào)度,那么確實(shí)存在  FailOver 代價(jià)比較高的問題[3]。但是在實(shí)際 Batch 作業(yè)的拓?fù)渲校琓ask 之間不都是通過 All-to-All 的邊連接,Batch 作業(yè)中存在的大量 Task 通過 PointWise 的邊連接,通過“流”的方式調(diào)度由 PointWise 連接的 Task 連通圖,在減少作業(yè)的容錯(cuò)成本的同時(shí),可以提高作業(yè)的執(zhí)行效率,如下圖所示在,在全量的 10T TPC-DS 測(cè)試中,開啟所有  PointWise 邊都采用 Pipeline 的鏈接方式就可以讓整性能有 20% 以上的性能提升。


          上述只是 Schduler 提供的劃分 Pipeline Region 的 4 種策略中的一種[4],實(shí)際上  Planner 可以根據(jù)實(shí)際運(yùn)行場景,定制哪些 Task 之間采取 Pipeline 的傳輸方式,哪些  Task 之間采取 Batch 的傳輸方式方式。


          自適應(yīng)調(diào)度


           
          調(diào)度的本質(zhì)是給物理執(zhí)行計(jì)劃進(jìn)行資源分配的決策過程。Pipeline Region 解決了物理執(zhí)行計(jì)劃確定之后,流作業(yè)和批作業(yè)可以統(tǒng)一按照 Pipeline Region 的粒度進(jìn)行調(diào)度。對(duì)于批作業(yè)來說靜態(tài)生成物理執(zhí)行計(jì)劃存在一些問題[5]:


          1. 配置人力成本高。對(duì)于批作業(yè)來說,雖然理論上可以根據(jù)統(tǒng)計(jì)信息推斷出物理執(zhí)行計(jì)劃中每個(gè)階段的并發(fā)度,但是由于存在大量的 UDF 或者統(tǒng)計(jì)信息的缺失等問題,導(dǎo)致靜態(tài)決策結(jié)果可能會(huì)出現(xiàn)嚴(yán)重不準(zhǔn)確的情況;為了保障業(yè)務(wù)作業(yè)的 SLA,在大促期間,業(yè)務(wù)的同學(xué)需要根據(jù)大促的流量估計(jì),手動(dòng)調(diào)整高優(yōu)批作業(yè)的并發(fā)度,由于業(yè)務(wù)變化快,一旦業(yè)務(wù)邏輯發(fā)生變化,又要不斷的重復(fù)這個(gè)過程。整個(gè)調(diào)優(yōu)過程都需要業(yè)務(wù)的同學(xué)手動(dòng)操作,人力成本比較高,即便這樣也可能會(huì)出現(xiàn)誤判的情況導(dǎo)致無法滿足用戶 SLA;
          2. 資源利用率低。由于人工配置并發(fā)度成本比較高,所以不可能對(duì)所有的作業(yè)都手動(dòng)配置并發(fā)度。對(duì)于中低優(yōu)先級(jí)的作業(yè),業(yè)務(wù)同學(xué)會(huì)選取一些默認(rèn)值作為并發(fā)度,但是在大多數(shù)情況下這些默認(rèn)值都偏大,造成資源的浪費(fèi);而且雖然高優(yōu)先級(jí)的作業(yè)可以進(jìn)行手工并發(fā)配置,由于配置方式比較繁瑣,所以大促過后,雖然流量已經(jīng)下降但是業(yè)務(wù)方仍然會(huì)使用大促期間的配置,也造成大量的資源浪費(fèi)現(xiàn)象;
          3. 穩(wěn)定性差。資源浪費(fèi)的情況最終導(dǎo)致資源的超額申請(qǐng)現(xiàn)象。目前大多數(shù)批作業(yè)都是采取和流作業(yè)集群混跑的方式,具體來說申請(qǐng)的資源都是非保障資源,一旦資源緊張或者出現(xiàn)機(jī)器熱點(diǎn),這些非保障資源都是優(yōu)先被調(diào)整的對(duì)象。




          為了解決靜態(tài)生成物理執(zhí)行存在這些問題,我們?yōu)榕鳂I(yè)引入了自適應(yīng)調(diào)度功能[6],和原來的靜態(tài)物理執(zhí)行計(jì)劃相比,利用這個(gè)特性可以大幅提高用戶資源利用率。 Adaptive Scheduler 可以根據(jù)一個(gè) JobVertex 的上游 JobVertex 的執(zhí)行情況,動(dòng)態(tài)決定當(dāng)前 JobVertex 的并發(fā)度。在未來,我們也可以根據(jù)上游 JobVertex 產(chǎn)出的數(shù)據(jù),動(dòng)態(tài)決定下游采用什么樣的算子。




          五、流批一體的 Shuffle 架構(gòu)




          Flink 是一個(gè)流批一體的平臺(tái),因此引擎對(duì)于不同的執(zhí)行模式要分別提供 Streaming 和Batch 兩種類型的 Shuffle。雖然 Streaming Shuffle 和 Batch Shuffle 在具體的策略上存在一定的差異,但是本質(zhì)上都是為了對(duì)數(shù)據(jù)進(jìn)行重新劃分(re-partition),因此不同的  Shuffle 之間還存在一定的共性。所以我們的目標(biāo)是提供一套統(tǒng)一的 Shuffle 架構(gòu),既可以滿足不同 Shuffle 在策略上的定制,同時(shí)還能避免在共性需求上進(jìn)行重復(fù)開發(fā)。

          總體來說,Shuffle 架構(gòu)可以劃分成如下圖所示的四個(gè)層次。流和批的 Shuffle 需求在各層上有一定差異,也有大量的共性,下邊我做了一些簡要分析。



          流批 Shuffle 之間的差異



          大家都知道,批作業(yè)和流作業(yè)對(duì) Shuffle 的需求是有差異的,具體可以體現(xiàn)在如下 3 個(gè)方面:


          1. Shuffle 數(shù)據(jù)的生命周期。流作業(yè)的 Shuffle 數(shù)據(jù)和 Task 的生命周期基本是一致的;而批作業(yè)的 Shuffle 數(shù)據(jù)和 Task 生命周期是解耦的;
          2. Shuffle 數(shù)據(jù)的存儲(chǔ)介質(zhì)。因?yàn)榱髯鳂I(yè)的 Shuffle 數(shù)據(jù)生命周期比較短,所以可以把流作業(yè)的 Shuffle 數(shù)據(jù)存儲(chǔ)在內(nèi)存中;而批作業(yè)的 Shuffle 數(shù)據(jù)生命周期有一定的不確定性,所以需要把批作業(yè)的 Shuffle 數(shù)據(jù)存儲(chǔ)在磁盤中;
          3. Shuffle 部署方式[7]。把 Shuffle 服務(wù)和計(jì)算節(jié)點(diǎn)部署在一起,對(duì)流作業(yè)來說這種部署方式是有優(yōu)勢(shì)的,因?yàn)檫@樣會(huì)減少不必要網(wǎng)絡(luò)開銷,從而減少 Latency。但對(duì)于批作業(yè)來說,這種部署方式在資源利用率、性能、穩(wěn)定性上都存在一定的問題。[8]




          流批 Shuffle 之間的共性



          批作業(yè)和流作業(yè)的 Shuffle 有差異也有共性,共性主要體現(xiàn)在:


          1. 數(shù)據(jù)的 Meta 管理。所謂 Shuffle Meta 是指邏輯數(shù)據(jù)劃分到數(shù)據(jù)物理位置的映射。不管是流還是批的場景,在正常情況下都需要從 Meta 中找出自己的讀取或者寫入數(shù)據(jù)的物理位置;在異常情況下,為了減少容錯(cuò)代價(jià),通常也會(huì)對(duì) Shuffle Meta 數(shù)據(jù)進(jìn)行持久化;
          2. 數(shù)據(jù)傳輸。從邏輯上講,流作業(yè)和批作業(yè)的 Shuffle 都是為了對(duì)數(shù)據(jù)進(jìn)行重新劃分(re-partition/re-distribution)。在分布式系統(tǒng)中,對(duì)數(shù)據(jù)的重新劃分都涉及到跨線程、進(jìn)程、機(jī)器的數(shù)據(jù)傳輸。




          流批一體的 Shuffle 架構(gòu)


           
           
          Unified Shuffle 架構(gòu)抽象出三個(gè)組件[9]: Shuffle Master、Shuffle Reader、Shuffle Writer。Flink通過和這三個(gè)組件交互完成算子間的數(shù)據(jù)的重新劃分。通過這三個(gè)組件可以滿足不同Shuffle插件在具體策略上的差異:


          1. Shuffle Master 資源申請(qǐng)和資源釋放。也就是說插件需要通知框架 How to request/release resource。而由 Flink 來決定 When to call it;
          2. Shuffle Writer 上游的算子利用 Writer 把數(shù)據(jù)寫入 Shuffle Service——Streaming Shuffle 會(huì)把數(shù)據(jù)寫入內(nèi)存;External/Remote Batch Shuffle 可以把數(shù)據(jù)寫入到外部存儲(chǔ)中;
          3. Shuffle Reader 下游的算子可以通過 Reader 讀取 Shuffle 數(shù)據(jù);


           
          同時(shí),我們也為流批 Shuffle 的共性——Meta 管理、數(shù)據(jù)傳輸、服務(wù)部署[10]——提供了架構(gòu)層面的支持,從而避免對(duì)復(fù)雜組件的重復(fù)開發(fā)。高效穩(wěn)定的數(shù)據(jù)傳輸,是分布式系統(tǒng)最復(fù)雜的子系統(tǒng)之一,例如在傳輸中都要解決上下游反壓、數(shù)據(jù)壓縮、內(nèi)存零拷貝等問題,在新的架構(gòu)中只要開發(fā)一遍,就可以同時(shí)在流和批兩種場景下共同使用,大大減少了開發(fā)和維護(hù)的成本。


          六、流批一體的容錯(cuò)策略


           


          Flink 原有容錯(cuò)策略是以檢查點(diǎn)為前提的,具體來說無論是單個(gè) Task 出現(xiàn)失敗還是JobMaster 失敗,F(xiàn)link 都會(huì)按照最近的檢查點(diǎn)重啟整個(gè)作業(yè)。雖然這種策略存在一定的優(yōu)化空間,但是總的來說對(duì)于流的場景是基本是接受的。目前,F(xiàn)link Batch 運(yùn)行模式下不會(huì)開啟檢查點(diǎn)[11],這也意味一旦出現(xiàn)任何錯(cuò)誤,整個(gè)作業(yè)都要從頭執(zhí)行。

          雖然原有策略在理論上可以保證最終一定會(huì)產(chǎn)出正確的結(jié)果,但是明顯大多數(shù)客戶都無法接受這種容錯(cuò)策略所付出的代價(jià)。為了解決這些問題,我們分別對(duì) Task 和 JM 的容錯(cuò)都做了相應(yīng)的改進(jìn)。


          Pipeline Region Failover



          雖然在 Batch 執(zhí)行模式下沒有定時(shí)的 Checkpoint,但是在 Batch 執(zhí)行模式下,F(xiàn)link允許 Task 之間通過 Blocking Shuffle 進(jìn)行通信。對(duì)于讀取 Blocking Shuffle 的 Task 發(fā)生失敗之后,由于 Blocking Shuffle 中存儲(chǔ)了這個(gè) Task 所需要的全部數(shù)據(jù),所以只需要重啟這個(gè) Task 以及通過 Pipeline Shuffle 與其相連的全部下游任務(wù)即可,而不需要重啟整個(gè)作業(yè)。

          總的來說,Pipeline Region Failover 策略和 Scheduler 在進(jìn)行正常調(diào)度的時(shí)候一樣,都是把一個(gè) DAG 拆分成由若干 Pipeline shuffle 連接的一些 Pipeline Region,每當(dāng)一個(gè) Task 發(fā)生 FailOver 的時(shí)候,只會(huì)重啟這個(gè) Task 所在的 Region 即可。



          JM Failover



          JM 是一個(gè)作業(yè)的控制中心,包含了作業(yè)的各種執(zhí)行狀態(tài)。Flink 利用這些狀態(tài)對(duì)任務(wù)進(jìn)行調(diào)度和部署。一旦 JM 發(fā)生錯(cuò)誤之后,這些狀態(tài)將會(huì)全部丟失。如果沒有這些信息,即便所有的工作節(jié)點(diǎn)都沒有發(fā)生故障,新 JM 仍然無法繼續(xù)調(diào)度原來的作業(yè)。例如,由于任務(wù)的結(jié)束信息都已丟失,一個(gè)任務(wù)結(jié)束之后,新 JM 無法判斷現(xiàn)有的狀態(tài)是否滿足調(diào)度下游任務(wù)的條件——所有的輸入數(shù)據(jù)都已經(jīng)產(chǎn)生。

          從上邊的分析可以看出,JM Failover 的關(guān)鍵就是如何讓一個(gè) JM“恢復(fù)記憶”。在 VVR[12] 中我們通過基于 Operation Log 機(jī)制恢復(fù) JM 的關(guān)鍵狀態(tài)。


          細(xì)心的同學(xué)可能已經(jīng)發(fā)現(xiàn)了,雖然這兩個(gè)改進(jìn)的出發(fā)點(diǎn)是為了批的場景,但是實(shí)際上對(duì)于流的作業(yè)容也同樣有效。上邊只是簡要的介紹了兩種容錯(cuò)策略的思路,實(shí)際上還有很多值得思考的內(nèi)容。例如 Blocking 上游數(shù)據(jù)丟失了我們應(yīng)該如何處理?JM 中有哪些關(guān)鍵的狀態(tài)需要恢復(fù)?
           

          七、未來展望




          為了提供比現(xiàn)在更快、更穩(wěn)的用戶體驗(yàn),我們已經(jīng)開始了下一代流式架構(gòu)的研發(fā);Flink在流批一體的場景下得到了越來越多用戶的認(rèn)可,但是我們也知道業(yè)界還有很多高水平傳統(tǒng)大數(shù)據(jù)系統(tǒng)值得我們學(xué)習(xí)。最后我也希望感興趣的小伙伴可以加入我們,一起打造一個(gè)具有完美用戶體驗(yàn)的流批一體大數(shù)據(jù)計(jì)算引擎。

          瀏覽 70
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大逼逼影院 | 蜜臀av在线观看 午夜高清无码视频 | 99热日韩 | 久久久久久日产精品 | 午夜寂寞人妻 |