<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink 大規(guī)模作業(yè)調(diào)度性能優(yōu)化

          共 7591字,需瀏覽 16分鐘

           ·

          2022-01-14 19:57

          摘要:本文作者洪志龍(柏星)& 朱翥(長(zhǎng)耕),分享了如何在 Flink 1.13 版本和 1.14 版本中對(duì) Flink 調(diào)度大規(guī)模作業(yè)的性能進(jìn)行了優(yōu)化。主要內(nèi)容包括:

          1. 性能測(cè)評(píng)結(jié)果
          2. 基于拓?fù)浣Y(jié)構(gòu)的優(yōu)化
          3. 優(yōu)化任務(wù)部署
          4. 針對(duì) Pipelined Region 構(gòu)建的優(yōu)化


          Tips:點(diǎn)擊「閱讀原文」查看?FFA 2021 視頻回放~

          隨著 Flink 流批一體架構(gòu)不斷演進(jìn)和升級(jí),越來(lái)越多的用戶開(kāi)始選擇用 Flink 來(lái)同時(shí)承載實(shí)時(shí)和離線的業(yè)務(wù)。離線業(yè)務(wù)和實(shí)時(shí)業(yè)務(wù)有一定差異性,其中比較關(guān)鍵的一點(diǎn)是 ——?離線作業(yè)的規(guī)模通常都遠(yuǎn)遠(yuǎn)大于實(shí)時(shí)作業(yè)。超大規(guī)模的流批作業(yè)對(duì) Flink 的調(diào)度性能提出了新的挑戰(zhàn)。在基于 Flink 1.12 版本部署大規(guī)模流批作業(yè)時(shí),用戶可能會(huì)遇到以下瓶頸:


          1. 需要很長(zhǎng)時(shí)間才能完成作業(yè)的調(diào)度和部署;

          2. 需要大量?jī)?nèi)存來(lái)存儲(chǔ)作業(yè)的執(zhí)行拓?fù)鋱D以及部署時(shí)所需的臨時(shí)變量,并且在運(yùn)行過(guò)程中會(huì)出現(xiàn)頻繁的長(zhǎng)時(shí)間 GC,影響集群穩(wěn)定性;

          經(jīng)測(cè)試,對(duì)于一個(gè)并發(fā)度為 10k 的 word count 作業(yè),在其部署時(shí) JobManager 需要 30 GiB 內(nèi)存,并且從提交作業(yè)到所有任務(wù)節(jié)點(diǎn)部署完畢所需的總時(shí)間長(zhǎng)達(dá) 4 分鐘。

          此外,對(duì)于大規(guī)模作業(yè),任務(wù)部署的過(guò)程可能會(huì)長(zhǎng)時(shí)間阻塞 JobManager 的主線程。當(dāng)主線程阻塞時(shí),JobManager 無(wú)法響應(yīng)任何來(lái)自 TaskManager 的請(qǐng)求。這會(huì)使得 TaskManager 心跳超時(shí)進(jìn)而導(dǎo)致作業(yè)出錯(cuò)失敗。在最壞的情況下,作業(yè)從故障恢復(fù) (Failover) 并進(jìn)行新一輪部署時(shí)又會(huì)出現(xiàn)心跳超時(shí),從而導(dǎo)致作業(yè)一直卡在部署階段無(wú)法正常運(yùn)行。

          為了優(yōu)化 Flink 調(diào)度大規(guī)模作業(yè)的性能,我們?cè)?Flink 1.13 版本和 1.14 版本進(jìn)行了以下優(yōu)化:


          1. 針對(duì)拓?fù)浣Y(jié)構(gòu)引入分組概念,優(yōu)化與拓?fù)湎嚓P(guān)的計(jì)算邏輯,主要包括作業(yè)初始化、Task 調(diào)度以及故障恢復(fù)時(shí)計(jì)算需要重啟的 Task 節(jié)點(diǎn)等等。與此同時(shí),該優(yōu)化降低了執(zhí)行拓?fù)湔加玫膬?nèi)存空間;

          2. 引入緩存機(jī)制優(yōu)化任務(wù)部署,優(yōu)化后部署速度更快且所需內(nèi)存更少;

          3. 基于邏輯拓?fù)浜蛨?zhí)行拓?fù)涞奶匦赃M(jìn)行優(yōu)化以加快 Pipelined Region 的構(gòu)建速度,從而降低作業(yè)初始化所需的時(shí)間。

          一、性能評(píng)測(cè)結(jié)果


          為了評(píng)估優(yōu)化的效果,我們對(duì) Flink 1.12 (優(yōu)化前) 和 Flink 1.14 (優(yōu)化后) 進(jìn)行了對(duì)比測(cè)試。測(cè)試作業(yè)包含兩個(gè)節(jié)點(diǎn),由全連接邊相連,并發(fā)度均為 10k。為了通過(guò) blob 服務(wù)器分發(fā) ShuffleDescriptor,我們將配置項(xiàng) blob.offload.minsize 的值修改為 100 KiB。該配置項(xiàng)指定了通過(guò) blob 服務(wù)器傳輸數(shù)據(jù)的最小閾值,大小超過(guò)該閾值的數(shù)據(jù)將會(huì)通過(guò) Blob 服務(wù)器進(jìn)行傳輸。該配置項(xiàng)的默認(rèn)值為 1 MiB,而測(cè)試作業(yè)中節(jié)點(diǎn)的 ShuffleDescriptor 大小約為 270 KiB。測(cè)試結(jié)果如表 1 所示:

          表 1 Flink 1.12 和 1.14 各流程時(shí)間對(duì)比


          1.12

          1.14

          時(shí)間降低百分比(%)

          作業(yè)初始化

          11,431ms

          627ms

          94.51%

          任務(wù)部署

          63,118ms

          17,183ms

          72.78%

          故障恢復(fù)時(shí)計(jì)算重啟節(jié)點(diǎn)

          37,195ms

          170ms

          99.55%


          除了時(shí)間大幅縮短以外,內(nèi)存占用也明顯降低。在 Flink 1.12 版本上運(yùn)行測(cè)試作業(yè)時(shí),JobManager 需要 30 GiB 內(nèi)存才能保證作業(yè)穩(wěn)定運(yùn)行,而在 Flink 1.14 版本上只需要 2 GiB 即可。與此同時(shí),GC 情況也得以改善。在 1.12 版本上,測(cè)試作業(yè)在初始化和 Task 部署的過(guò)程中都會(huì)出現(xiàn)超過(guò) 10 秒的長(zhǎng) GC,而在 1.14 版本上均未出現(xiàn),這意味著心跳超時(shí)等問(wèn)題出現(xiàn)的概率更低,作業(yè)運(yùn)行更為穩(wěn)定。

          在 1.12 版本上,除去申請(qǐng)資源的時(shí)間,測(cè)試作業(yè)需要至少 4 分鐘才能部署完成。而作為對(duì)比,在 1.14 版本上,除去申請(qǐng)資源的時(shí)間,測(cè)試作業(yè)在 30 秒內(nèi)即可完成部署并開(kāi)始運(yùn)行。整體所需時(shí)間降低了 87%。鑒于此,對(duì)于需要部署運(yùn)行大規(guī)模作業(yè)的用戶,建議將 Flink 版本升級(jí)至 1.14 以提升作業(yè)調(diào)度和部署性能。

          在接下來(lái)的部分中我們將進(jìn)一步介紹各項(xiàng)優(yōu)化的細(xì)節(jié)。

          二、基于拓?fù)浣Y(jié)構(gòu)的優(yōu)化


          在 Flink 中,分發(fā)模式 (Distribution Pattern) 描述了上游節(jié)點(diǎn)與下游節(jié)點(diǎn)連接的方式,上游節(jié)點(diǎn)計(jì)算的結(jié)果會(huì)按照連邊分發(fā)到下游節(jié)點(diǎn)。目前 Flink 中有兩種分發(fā)模式:點(diǎn)對(duì)點(diǎn) (Pointwise) 和全連接 (All-to-all)。如圖 1 所示,當(dāng)分發(fā)模式為點(diǎn)對(duì)點(diǎn)時(shí),遍歷所有邊的計(jì)算復(fù)雜度為 O(N);當(dāng)分發(fā)模式為全連接時(shí),所有下游節(jié)點(diǎn)與上游節(jié)點(diǎn)都有連邊,遍歷所有邊的計(jì)算復(fù)雜度為 O(N2),所需時(shí)間會(huì)隨著規(guī)模增大而迅速增長(zhǎng)。


          圖 1 目前 Flink 的兩種分發(fā)模式


          Flink 1.12 版本使用執(zhí)行拓?fù)溥?(ExecutionEdge) 存儲(chǔ)任務(wù)節(jié)點(diǎn)間連接的信息。當(dāng)分發(fā)模式為全連接模式時(shí),節(jié)點(diǎn)間一共會(huì)有 O(N2) 條邊相連,當(dāng)作業(yè)規(guī)模較大時(shí)會(huì)占用大量?jī)?nèi)存。對(duì)于兩個(gè)全連接邊相連且并發(fā)度為 10k 的節(jié)點(diǎn),其連邊數(shù)量為 1 億,總共需要超過(guò) 4 GiB 內(nèi)存來(lái)存儲(chǔ)這些連邊。在生產(chǎn)作業(yè)中可能會(huì)有多個(gè)全連接邊相連的節(jié)點(diǎn),這也就意味著隨著作業(yè)規(guī)模的增長(zhǎng),所需內(nèi)存也會(huì)大幅增長(zhǎng)。

          從圖 1 可以看到,對(duì)于全連接邊相連的任務(wù)節(jié)點(diǎn),所有上游節(jié)點(diǎn)所產(chǎn)生的結(jié)果分區(qū) (Result Partition) 都是同構(gòu)的,也就是說(shuō)這些結(jié)果分區(qū)所連接的下游任務(wù)節(jié)點(diǎn)都是完全相同的。全連接邊相連的所有下游節(jié)點(diǎn)也都是同構(gòu)的,因?yàn)槠渌M(fèi)的上游分區(qū)都是相同的。鑒于節(jié)點(diǎn)間的 JobEdge 只有一種分發(fā)模式,我們可以按照分發(fā)模式對(duì)上游分區(qū)以及下游節(jié)點(diǎn)進(jìn)行分組。

          對(duì)于全連接邊,由于其所有下游節(jié)點(diǎn)都是同構(gòu)的,我們可以將這些下游節(jié)點(diǎn)劃分為一組,稱為節(jié)點(diǎn)組 (ConsumerVertexGroup),全連接邊相連的所有上游分區(qū)都與這個(gè)組連接。同樣,所有同構(gòu)的上游分區(qū)也被劃分為同一組,稱為分區(qū)組 (ConsumedPartitionGroup),全連接邊相連的所有下游節(jié)點(diǎn)都與這個(gè)組相連。優(yōu)化方案的基本思路為:將所有消費(fèi)相同結(jié)果分區(qū)的下游節(jié)點(diǎn)放入同一個(gè)節(jié)點(diǎn)組中,同時(shí)將所有與相同下游節(jié)點(diǎn)相連的結(jié)果分區(qū)放入同一個(gè)分區(qū)組中,如圖 2 所示。


          圖 2 兩種分發(fā)模式下如何對(duì)結(jié)果分區(qū)和任務(wù)節(jié)點(diǎn)進(jìn)行分組

          在調(diào)度任務(wù)節(jié)點(diǎn)時(shí),F(xiàn)link 需要遍歷每一個(gè)上游分區(qū)和下游節(jié)點(diǎn)間的所有連邊。在優(yōu)化前,由于連邊的總數(shù)量為 O(N2),因此將所有邊遍歷一遍的總時(shí)間復(fù)雜度為 O(N2)。優(yōu)化后,執(zhí)行拓?fù)溥叡环纸M的概念所替代。鑒于所有同構(gòu)的分區(qū)都連接到同一個(gè)下游節(jié)點(diǎn)組,當(dāng) Flink 需要遍歷所有連邊時(shí),只需要將該節(jié)點(diǎn)組遍歷一遍即可,不需要重復(fù)遍歷所有節(jié)點(diǎn),這樣就使得計(jì)算復(fù)雜度從 O(N2) 降到 O(N)。

          對(duì)于點(diǎn)對(duì)點(diǎn)的分發(fā)模式,上游結(jié)果分區(qū)與下游節(jié)點(diǎn)逐一相連,因此分區(qū)組和節(jié)點(diǎn)組之間也是點(diǎn)對(duì)點(diǎn)相連,分組的數(shù)量級(jí)和執(zhí)行拓?fù)溥叺臄?shù)量級(jí)是一樣的,也就是說(shuō),遍歷所有連邊的計(jì)算復(fù)雜度依舊是 O(N)。

          對(duì)于上文我們提到的 word count 作業(yè),采用上述的分組方式取代執(zhí)行拓?fù)溥吙梢詫?zhí)行拓?fù)涞膬?nèi)存占用從 4 GiB 降至 12 MiB 左右?;诜纸M的概念,我們對(duì)作業(yè)初始化、任務(wù)調(diào)度以及故障恢復(fù)時(shí)計(jì)算需要重啟的節(jié)點(diǎn)等耗時(shí)較長(zhǎng)的計(jì)算邏輯進(jìn)行了優(yōu)化。這些計(jì)算邏輯都涉及到對(duì)上下游之間所有連邊進(jìn)行遍歷的操作。在優(yōu)化后,其計(jì)算復(fù)雜度都從 O(N2) 降為 O(N)。

          三、優(yōu)化任務(wù)部署


          對(duì)于 Flink 1.12 版本,當(dāng)大規(guī)模作業(yè)內(nèi)包含全連接邊時(shí),部署所有節(jié)點(diǎn)需要花費(fèi)很長(zhǎng)時(shí)間。此外,在部署過(guò)程中容易出現(xiàn) TaskManager 心跳超時(shí)的情況,進(jìn)而導(dǎo)致集群不穩(wěn)定。

          目前任務(wù)部署包含以下幾個(gè)階段:


          1. JobManager 在主線程內(nèi)為每一個(gè) Task 創(chuàng)建任務(wù)部署描述符 (TaskDeploymentDescriptor,以下簡(jiǎn)稱 TDD);

          2. JobManager 在異步線程內(nèi)將這些 TDD 進(jìn)行序列化;

          3. JobManager 通過(guò) RPC 通信將序列化后的 TDD 發(fā)送至 TaskManager;

          4. TaskManager 基于 TDD 創(chuàng)建任務(wù)并執(zhí)行。

          TDD 包含了 TaskManager 創(chuàng)建任務(wù) (Task) 時(shí)所需的所有信息。當(dāng)任務(wù)部署開(kāi)始時(shí),JobManager 會(huì)在主線程內(nèi)為所有任務(wù)節(jié)點(diǎn)創(chuàng)建 TDD。在創(chuàng)建過(guò)程中 JobManager 無(wú)法響應(yīng)任何其他請(qǐng)求。對(duì)于大規(guī)模作業(yè),這一過(guò)程可能會(huì)導(dǎo)致 JobManager 主線程長(zhǎng)時(shí)間被阻塞,進(jìn)一步導(dǎo)致心跳超時(shí),從而觸發(fā)作業(yè)故障。

          鑒于任務(wù)部署時(shí)所有 TDD 都是由 JobManager 負(fù)責(zé)發(fā)送至各 TaskManager,這導(dǎo)致 JobManager 可能會(huì)成為性能瓶頸。尤其是對(duì)于大規(guī)模作業(yè),部署時(shí)產(chǎn)生的 TDD 會(huì)占用大量?jī)?nèi)存空間,導(dǎo)致頻繁的長(zhǎng)時(shí)間 GC,進(jìn)一步加重 JobManager 的負(fù)擔(dān)。

          因此,我們需要縮短創(chuàng)建 TDD 所需的時(shí)間,避免心跳超時(shí)的發(fā)生。此外,如果能夠縮減 TDD 的大小,網(wǎng)絡(luò)傳輸所需的時(shí)間也會(huì)縮短,這樣可以進(jìn)一步加快任務(wù)部署的速度。

          3.1 為 ShuffleDescriptor 添加緩存機(jī)制


          ShuffleDescriptor 用于描述任務(wù)在運(yùn)行時(shí)需要消費(fèi)的上游結(jié)果分區(qū)的所有信息。當(dāng)作業(yè)規(guī)模較大時(shí),ShuffleDescriptor 可能是 TDD 中所占空間最大的一部分。對(duì)于全連接邊相連的節(jié)點(diǎn),當(dāng)上游節(jié)點(diǎn)和下游節(jié)點(diǎn)的并發(fā)度都是 N 時(shí),每一個(gè)下游節(jié)點(diǎn)需要消費(fèi) N 個(gè)上游結(jié)果分區(qū),此時(shí) ShuffleDescriptor 的總數(shù)量是 N2。也就是說(shuō),計(jì)算所有節(jié)點(diǎn)的 ShuffleDescriptor 的時(shí)間復(fù)雜度為 O(N2)。

          然而,對(duì)于同構(gòu)的下游節(jié)點(diǎn)來(lái)說(shuō),他們所消費(fèi)的上游結(jié)果分區(qū)是完全一樣的,因此部署時(shí)所需要的 ShuffleDescriptor 內(nèi)容也是一樣的。鑒于此,在部署時(shí)不需要為每一個(gè)下游節(jié)點(diǎn)重復(fù)計(jì)算 ShuffleDescriptor,只需要將計(jì)算好的 ShuffleDescriptor 放入緩存以供復(fù)用即可。這樣計(jì)算 TDD 的時(shí)間復(fù)雜度就可以從 O(N2) 降至 O(N)。

          為了縮減 RPC 消息的大小,進(jìn)而降低網(wǎng)絡(luò)傳輸?shù)拈_(kāi)銷(xiāo),我們可以對(duì) ShuffleDescriptor 進(jìn)行壓縮。對(duì)于上文我們提到的 word count 作業(yè),當(dāng)節(jié)點(diǎn)并發(fā)度為 10k 時(shí),每一個(gè)下游節(jié)點(diǎn)都會(huì)有 10k 個(gè) ShuffleDescriptor,在壓縮后其序列化值的總大小降低了 72%。

          3.2 通過(guò) Blob 服務(wù)器分發(fā) ShuffleDescriptor


          Blob (Binary Large Object) 以二進(jìn)制數(shù)據(jù)的形式存儲(chǔ)大型文件。Flink 通過(guò) blob 服務(wù)器在 JobManager 和 TaskManager 之間傳輸體積較大的文件。當(dāng) JobManager 需要將大文件傳輸至 TaskManager 時(shí),它可以將文件傳輸至 blob 服務(wù)器 (同時(shí)會(huì)將文件傳輸至分布式文件系統(tǒng)),并且獲得訪問(wèn)文件所需的 token。當(dāng) TaskManager 獲取到 token 時(shí),它們會(huì)從分布式文件系統(tǒng) (Distributed File System,DFS) 下載文件。TaskManager 會(huì)同時(shí)將文件存儲(chǔ)到本地 blob 緩存中方便之后重復(fù)讀取。

          在任務(wù)部署的過(guò)程中,JobManager 負(fù)責(zé)將 ShuffleDescriptor 通過(guò) RPC 消息分發(fā)到對(duì)應(yīng)的 TaskManager 中。在發(fā)送完成后,RPC 消息會(huì)被垃圾回收器回收處理。但當(dāng) JobManager 創(chuàng)建 RPC 消息的速度大于發(fā)送的速度時(shí),RPC 消息會(huì)逐漸堆積在內(nèi)存中并且對(duì) GC 造成影響,頻繁觸發(fā)長(zhǎng)時(shí)間的 GC。這些 GC 會(huì)導(dǎo)致 JobManager 停擺,進(jìn)一步拖慢任務(wù)部署的速度。

          為了解決這個(gè)問(wèn)題,F(xiàn)link 可以通過(guò) blob 服務(wù)器來(lái)分發(fā)大體積的 ShuffleDescriptor。首先 JobManager 將 ShuffleDescriptor 發(fā)送至 blob 服務(wù)器,而 blob 服務(wù)器會(huì)將 ShuffleDescriptor 存儲(chǔ)至 DFS 中,TaskManager 在開(kāi)始處理 TDD 時(shí)會(huì)從 DFS 下載數(shù)據(jù)。這樣 JobManager 不需要將所有 ShuffleDescriptor 始終存儲(chǔ)在內(nèi)存中直至對(duì)應(yīng)的 RPC 消息發(fā)出。經(jīng)過(guò)優(yōu)化后,在部署大規(guī)模作業(yè)時(shí)長(zhǎng)時(shí)間 GC 的頻率會(huì)明顯降低。且鑒于 DFS 為 TaskManager 提供了多個(gè)分布式節(jié)點(diǎn)下載數(shù)據(jù),JobManager 網(wǎng)絡(luò)傳輸?shù)膲毫σ驳靡跃徑?,不再成為瓶頸,這樣可以加快任務(wù)部署的速度。


          圖 3 JobManager 將 ShuffleDescriptor 分發(fā)至 TaskManager

          為了避免緩存過(guò)多導(dǎo)致本地磁盤(pán)空間不足,當(dāng) ShuffleDescriptor 所對(duì)應(yīng)的結(jié)果分區(qū)被釋放時(shí),在 blob 服務(wù)器上存儲(chǔ)的對(duì)應(yīng)緩存會(huì)被清理。此外我們?yōu)?TaskManager 上 ShuffleDescriptor 的緩存添加了總大小的限制。當(dāng)緩存超過(guò)一定大小時(shí),緩存會(huì)按照最近最少使用 (LRU) 的順序移除。這樣可以保證本地磁盤(pán)不會(huì)被緩存占滿,特別是對(duì)于 session 模式運(yùn)行的集群。

          四、針對(duì) Pipelined Region 構(gòu)建的優(yōu)化


          目前 Flink 中節(jié)點(diǎn)間有兩種數(shù)據(jù)交換類(lèi)型:pipelined 和 blocking。對(duì)于 blocking 的數(shù)據(jù)交換方式,結(jié)果分區(qū)會(huì)在上游全部計(jì)算完成后再交由下游進(jìn)行消費(fèi),數(shù)據(jù)會(huì)持久化到本地,支持多次消費(fèi)。對(duì)于 pipelined 數(shù)據(jù)交換,上游結(jié)果分區(qū)的產(chǎn)出和下游任務(wù)節(jié)點(diǎn)的消費(fèi)是同時(shí)進(jìn)行的,所有數(shù)據(jù)不會(huì)被持久化且只能讀取一次。

          鑒于 pipelined 的數(shù)據(jù)流產(chǎn)出和消費(fèi)同時(shí)發(fā)生,F(xiàn)link 需要保證 pipelined 邊相連的上下游節(jié)點(diǎn)同時(shí)運(yùn)行。由 pipelined 邊相連的節(jié)點(diǎn)構(gòu)成了一個(gè) region,被稱為 Pipelined Region (以下簡(jiǎn)稱 region)。在 Flink 中,region 是任務(wù)調(diào)度和 Failover 的基本單位。在調(diào)度的過(guò)程中,同一 region 內(nèi)的所有 Task 節(jié)點(diǎn)都會(huì)被同時(shí)調(diào)度,而整個(gè)拓?fù)渲兴?region 會(huì)按照拓?fù)漤樞蛑鹨贿M(jìn)行調(diào)度。

          目前在 Flink 的調(diào)度層面有兩種 region:邏輯層面的 Logical Pipelined Region 以及執(zhí)行調(diào)度層面的 Scheduling Pipelined Region。邏輯 region 由邏輯拓?fù)?(JobGraph) 中的節(jié)點(diǎn) JobVertex 構(gòu)成,而執(zhí)行 region 則由執(zhí)行拓?fù)?(ExecutionGraph) 中的節(jié)點(diǎn) ExecutionVertex 構(gòu)成。類(lèi)似于 ExecutionVertex 基于 JobVertex 計(jì)算產(chǎn)生,執(zhí)行 region 是由邏輯 region 計(jì)算得到的,如圖 4 所示。


          圖 4 邏輯 region 以及執(zhí)行 region

          在構(gòu)建 region 的過(guò)程中會(huì)遇到一個(gè)問(wèn)題:region 之間可能存在環(huán)形依賴。對(duì)于當(dāng)前 region,當(dāng)且僅當(dāng)其所消費(fèi)的上游 region 都產(chǎn)出全部數(shù)據(jù)后才能進(jìn)行調(diào)度。如果兩個(gè)?region?之間存在環(huán)形依賴,那么就會(huì)出現(xiàn)調(diào)度死鎖:兩個(gè)?region?都需要等對(duì)方完成才能調(diào)度,最終兩個(gè)?region?都無(wú)法被調(diào)度起來(lái)。因此,F(xiàn)link 通過(guò) Tarjan 強(qiáng)連通分量算法來(lái)發(fā)現(xiàn)環(huán)形依賴,并將具有環(huán)形依賴的?region?合并成一個(gè)?region,這樣就能解決調(diào)度死鎖的問(wèn)題。Tarjan 強(qiáng)連通分量算法需要遍歷拓?fù)鋬?nèi)的所有邊,而對(duì)于全連接的分發(fā)模式來(lái)說(shuō),其邊的數(shù)量為 O(N2),因此算法整體的計(jì)算復(fù)雜度為 O(N2),隨著規(guī)模變大會(huì)顯著增長(zhǎng),從而影響大規(guī)模作業(yè)初始化的時(shí)間。


          圖 5 具有調(diào)度死鎖的拓?fù)?/span>

          為了加快 region 的構(gòu)建速度,我們可以基于邏輯拓?fù)浜蛨?zhí)行拓?fù)渲g的關(guān)聯(lián)進(jìn)行優(yōu)化。鑒于一個(gè)執(zhí)行 region 只能由一個(gè)邏輯 region 中的節(jié)點(diǎn)派生,不會(huì)出現(xiàn)跨?region?的情況,F(xiàn)link 在初始化作業(yè)時(shí)只需要遍歷所有邏輯 region 并逐一轉(zhuǎn)換成執(zhí)行 region 即可。轉(zhuǎn)換的方式跟分發(fā)模式相關(guān)。如果邏輯 region 內(nèi)的節(jié)點(diǎn)間有任何全連接邊,則整個(gè)邏輯 region 可以直接轉(zhuǎn)換成一個(gè)執(zhí)行 region。


          圖 6 如何將邏輯 region 轉(zhuǎn)換成執(zhí)行 region

          如果全連接邊采用的是 pipelined 數(shù)據(jù)交換,所有與之相連的上下游節(jié)點(diǎn)都必須同時(shí)運(yùn)行,也就是說(shuō)全連接邊所連接的所有?region?都要合并成一個(gè)?region。如果全連接邊采用的是 blocking 數(shù)據(jù)交換,則會(huì)引入環(huán)形依賴,如圖 5 所示。在這種情況下所有與之相連的?region?都必須合并以避免調(diào)度死鎖,如圖 6 所示。鑒于只要有全連接邊就直接生成一整個(gè)執(zhí)行 region,在這種情況下不需要用 Tarjan 算法,整體計(jì)算復(fù)雜度只需要 O(N) 即可。

          如果在邏輯 region 內(nèi),所有節(jié)點(diǎn)間都只有點(diǎn)對(duì)點(diǎn)的分發(fā)模式,那么 Flink 依舊直接用 Tarjan 算法來(lái)檢測(cè)環(huán)形依賴,鑒于點(diǎn)對(duì)點(diǎn)的分發(fā)模式其邊數(shù)為 O(N),算法的時(shí)間復(fù)雜度也只有 O(N)。

          在優(yōu)化后,將邏輯 region 轉(zhuǎn)換成執(zhí)行 region 的整體計(jì)算復(fù)雜度從 O(N2) 降為 O(N)。經(jīng)測(cè)試,對(duì)于上文提到的 word count 作業(yè),當(dāng)兩個(gè)節(jié)點(diǎn)間的連邊為全連接邊且數(shù)據(jù)交換方式為 blocking 時(shí),構(gòu)建 region 的總時(shí)間降低了 99%,從 8,257ms 降至 120ms。

          猜你喜歡
          數(shù)倉(cāng)建?!笜?biāo)體系
          數(shù)倉(cāng)建?!獙挶淼脑O(shè)計(jì)
          Spark SQL知識(shí)點(diǎn)與實(shí)戰(zhàn)
          Hive計(jì)算最大連續(xù)登陸天數(shù)
          Flink計(jì)算pv和uv的通用方法

          交流群

          加我微信:ddxygq,回復(fù)“加群”,我拉你進(jìn)大數(shù)據(jù)交流群。

          瀏覽 69
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产一区久久久 | 精品久久久久久久久久久久久 | 成人福利电影 | 色逼逼免费视频网站 | 欧日韩一级 |