<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink 原理 | Flink Sort-Shuffle 實(shí)現(xiàn)簡(jiǎn)介

          共 7912字,需瀏覽 16分鐘

           ·

          2021-11-14 08:21

          公眾號(hào)更名公告


          「Flink 中文社區(qū)」更名為「Apache Flink


          感謝你們的關(guān)注

          摘要:本文介紹 Sort-Shuffle 如何幫助 Flink 在應(yīng)對(duì)大規(guī)模批數(shù)據(jù)處理任務(wù)時(shí)更加游刃有余。主要內(nèi)容包括:

          1. 數(shù)據(jù) Shuffle 簡(jiǎn)介
          2. 引入 Sort-Shuffle 的意義
          3. Flink Sort-Shuffle 實(shí)現(xiàn)
          4. 測(cè)試結(jié)果
          5. 調(diào)優(yōu)參數(shù)
          6. 未來(lái)展望

          Tips:FFA 峰會(huì)以及 Hackathon 比賽重磅開(kāi)啟,點(diǎn)擊「閱讀原文」即可報(bào)名~

           GitHub 地址 
          歡迎大家關(guān)注 Flink ~


          Flink 作為批流一體的大數(shù)據(jù)計(jì)算引擎,大規(guī)模批數(shù)據(jù)處理也是 Flink 數(shù)據(jù)處理能力的重要組成部分。隨著 Flink 的版本迭代,其批數(shù)據(jù)處理能力也在不斷增強(qiáng),sort-shuffle 的引入,使得 Flink 在應(yīng)對(duì)大規(guī)模批數(shù)據(jù)處理任務(wù)時(shí)更加游刃有余。

          一、數(shù)據(jù) Shuffle 簡(jiǎn)介


          數(shù)據(jù) shuffle 是批數(shù)據(jù)處理作業(yè)的一個(gè)重要階段,在這一階段中,上游處理節(jié)點(diǎn)的輸出數(shù)據(jù)會(huì)被持久化到外部存儲(chǔ)中,之后下游的計(jì)算節(jié)點(diǎn)會(huì)讀取這些數(shù)據(jù)并進(jìn)行處理。這些持久化的數(shù)據(jù)不僅僅是一種計(jì)算節(jié)點(diǎn)間的數(shù)據(jù)交換形式,還在錯(cuò)誤恢復(fù)中發(fā)揮著重要作用。

          目前,有兩種批數(shù)據(jù) shuffle 模型被現(xiàn)有的大規(guī)模分布式計(jì)算系統(tǒng)采用,分別是基于 hash 的方式以及基于 sort 的方式:


          1. 基于 hash 方式的核心思路是將發(fā)送給下游不同并發(fā)消費(fèi)任務(wù)的數(shù)據(jù)寫(xiě)到單獨(dú)的文件中,這樣文件本身就成了一個(gè)自然的區(qū)分不同數(shù)據(jù)分區(qū)的邊界;

          2. 基于 sort 方式的核心思路是先將所有分區(qū)的數(shù)據(jù)寫(xiě)在一起,然后通過(guò) sort 來(lái)區(qū)分不同數(shù)據(jù)分區(qū)的邊界。

          我們?cè)?Flink 1.12 版本將基于 sort 的批處理 shuffle 實(shí)現(xiàn)引入了 Flink 并在后續(xù)進(jìn)行了持續(xù)的性能與穩(wěn)定性?xún)?yōu)化;到 Flink 1.13 版本,sort-shuffle 已經(jīng)實(shí)現(xiàn)生產(chǎn)可用。

          二、引入 Sort-Shuffle 的意義


          我們之所以要在 Flink 中引入 sort-shuffle 的實(shí)現(xiàn),一個(gè)重要的原因是 Flink 原本的基于 hash 的實(shí)現(xiàn)對(duì)大規(guī)模批作業(yè)不可用。這個(gè)也是被現(xiàn)有的其他大規(guī)模分布式計(jì)算系統(tǒng)所證明的:


          1. 穩(wěn)定性方面:對(duì)于高并發(fā)批作業(yè),基于 hash 的實(shí)現(xiàn)會(huì)產(chǎn)生大量的文件,并且會(huì)對(duì)這些文件進(jìn)行并發(fā)讀寫(xiě),這會(huì)消耗很多資源并對(duì)文件系統(tǒng)會(huì)產(chǎn)生較大的壓力。文件系統(tǒng)需要維護(hù)大量的文件元數(shù)據(jù),會(huì)產(chǎn)生文件句柄以及 inode 耗盡等不穩(wěn)定風(fēng)險(xiǎn)。

          2. 性能方面:對(duì)于高并發(fā)批作業(yè),并發(fā)讀寫(xiě)大量的文件意味著大量的隨機(jī) IO,并且每次 IO 實(shí)際讀寫(xiě)的數(shù)據(jù)量可能是非常少的,這對(duì)于 IO 性能是一個(gè)巨大的挑戰(zhàn),在機(jī)械硬盤(pán)上,這使得數(shù)據(jù) shuffle 很容易成為批處理作業(yè)的性能瓶頸。

          通過(guò)引入基于 sort 的批數(shù)據(jù) shuffle 實(shí)現(xiàn),并發(fā)讀寫(xiě)的文件數(shù)量可以大大降低,有利于實(shí)現(xiàn)更好的數(shù)據(jù)順序讀寫(xiě),從而能夠提高 Flink 大規(guī)模批處理作業(yè)的穩(wěn)定性與性能。除此之外,新的 sort-shuffle 實(shí)現(xiàn)還可以減小內(nèi)存緩沖區(qū)的消耗。對(duì)于基于 hash 的實(shí)現(xiàn),每個(gè)數(shù)據(jù)分區(qū)都需要一塊讀寫(xiě)緩沖區(qū),內(nèi)存緩沖區(qū)消耗和并發(fā)成正比。而基于 sort 的實(shí)現(xiàn)則可以做到內(nèi)存緩沖區(qū)消耗和作業(yè)并發(fā)解耦(盡管更大的內(nèi)存可能會(huì)帶來(lái)更高的性能)。

          更為重要的一點(diǎn)是我們實(shí)現(xiàn)了新的存儲(chǔ)結(jié)構(gòu)與讀寫(xiě) IO 優(yōu)化,這使得 Flink 的批數(shù)據(jù) shuffle 相比于其他的大規(guī)模分布式數(shù)據(jù)處理系統(tǒng)更具優(yōu)勢(shì)。下面的章節(jié)會(huì)更為詳細(xì)的介紹 Flink 的 sort-shuffle 實(shí)現(xiàn)以及所取得的結(jié)果。

          三、Flink Sort-Shuffle 實(shí)現(xiàn)


          和其他分布式系統(tǒng)的批數(shù)據(jù) sort-shuffle 實(shí)現(xiàn)類(lèi)似,F(xiàn)link 的整個(gè) shuffle 過(guò)程分為幾個(gè)重要的階段,包括寫(xiě)數(shù)據(jù)到內(nèi)存緩沖區(qū)、對(duì)內(nèi)存緩沖區(qū)進(jìn)行排序、將排好序的數(shù)據(jù)寫(xiě)出到文件以及從文件中讀取 shuffle 數(shù)據(jù)并發(fā)送給下游。但是,與其他系統(tǒng)相比,F(xiàn)link 的實(shí)現(xiàn)有一些根本性的不同,包括多段數(shù)據(jù)存儲(chǔ)格式、省掉數(shù)據(jù)合并流程以及數(shù)據(jù)讀取 IO 調(diào)度等。這些都使得 Flink 的實(shí)現(xiàn)有著更優(yōu)秀的表現(xiàn)。

          1. 設(shè)計(jì)目標(biāo)


          在 Flink sort-shuffle 的整個(gè)實(shí)現(xiàn)過(guò)程中,我們把下面這些點(diǎn)作為主要的設(shè)計(jì)目標(biāo)加以考量:


          ■ 1.1 減少文件數(shù)量


          正如上面所討論的,基于 hash 的實(shí)現(xiàn)會(huì)產(chǎn)生大量的文件,而減少文件的數(shù)量有利于提高穩(wěn)定性和性能。Sort-Spill-Merge 的方式被分布式計(jì)算系統(tǒng)廣泛采納以達(dá)到這一目標(biāo),首先將數(shù)據(jù)寫(xiě)入內(nèi)存緩沖區(qū),當(dāng)內(nèi)存緩沖區(qū)填滿(mǎn)后對(duì)數(shù)據(jù)進(jìn)行排序,排序后的數(shù)據(jù)被寫(xiě)出到一個(gè)文件中,這樣總的文件數(shù)量是:(總數(shù)據(jù)量 / 內(nèi)存緩沖區(qū)大?。瑥亩募?shù)量被減少。當(dāng)所有數(shù)據(jù)寫(xiě)出完成后,將產(chǎn)生的文件合并成一個(gè)文件,從而進(jìn)一步減少文件數(shù)量并增大每個(gè)數(shù)據(jù)分區(qū)的大?。ㄓ欣陧樞蜃x?。?。


          相比于其他系統(tǒng)的實(shí)現(xiàn),F(xiàn)link 的實(shí)現(xiàn)有一個(gè)重要的不同,即 Flink 始終向同一個(gè)文件中不斷追加數(shù)據(jù),而不會(huì)寫(xiě)多個(gè)文件再進(jìn)行合并,這樣的好處始終只有一個(gè)文件,文件數(shù)量實(shí)現(xiàn)了最小化。

          ■ 1.2 打開(kāi)更少的文件


          同時(shí)打開(kāi)的文件過(guò)多會(huì)消耗更多的資源,同時(shí)容易導(dǎo)致文件句柄不夠用的問(wèn)題,導(dǎo)致穩(wěn)定性變差。因此,打開(kāi)更少的文件有利于提升系統(tǒng)的穩(wěn)定性。對(duì)于數(shù)據(jù)寫(xiě)出,如上所述,通過(guò)始終向同一個(gè)文件中追加數(shù)據(jù),每個(gè)并發(fā)任務(wù)始終只打開(kāi)一個(gè)文件。對(duì)于數(shù)據(jù)讀取,雖然每個(gè)文件都需要被大量下游的并發(fā)任務(wù)讀取,F(xiàn)link 依然通過(guò)只打開(kāi)文件一次,并在這些并發(fā)讀取任務(wù)間共享文件句柄實(shí)現(xiàn)了每個(gè)文件只打開(kāi)一次的目標(biāo)。

          ■ 1.3 最大化順序讀寫(xiě)


          文件的順序讀寫(xiě)對(duì)文件的 IO 性能至關(guān)重要。通過(guò)減少 shuffle 文件數(shù)量,我們已經(jīng)在一定程度上減少了隨機(jī)文件 IO。除此之外,F(xiàn)link 的批數(shù)據(jù) sort-shuffle 還實(shí)現(xiàn)了更多 IO 優(yōu)化來(lái)最大化文件的順序讀寫(xiě)。在數(shù)據(jù)寫(xiě)階段,通過(guò)將要寫(xiě)出的數(shù)據(jù)緩沖區(qū)聚合成更大的批并通過(guò) wtitev 系統(tǒng)調(diào)用寫(xiě)出從而實(shí)現(xiàn)了更好的順序?qū)憽T跀?shù)據(jù)讀取階段,通過(guò)引入讀取 IO 調(diào)度,總是按照文件的偏移順序服務(wù)數(shù)據(jù)讀取請(qǐng)求從而最大限度的實(shí)現(xiàn)的文件的順序讀。實(shí)驗(yàn)表明這些優(yōu)化極大的提升了批數(shù)據(jù) shuffle 的性能。

          ■ 1.4 減少讀寫(xiě) IO 放大


          傳統(tǒng)的 sort-spill-merge 方式通過(guò)將生成的多個(gè)文件合并成一個(gè)更大的文件從增大讀取數(shù)據(jù)塊的大小。這種實(shí)現(xiàn)方案雖然帶來(lái)了好處,但也有一些不足,最終要的一點(diǎn)便是讀寫(xiě) IO 放大,對(duì)于計(jì)算節(jié)點(diǎn)間的數(shù)據(jù) shuffle 而言,在不發(fā)生錯(cuò)誤的情況下,本身只需要寫(xiě)入和讀取數(shù)據(jù)一次,但是數(shù)據(jù)合并使得相同的數(shù)據(jù)被讀寫(xiě)多次,從而導(dǎo)致 IO 總量變多,并且存儲(chǔ)空間的消耗也會(huì)變大。

          Flink 的實(shí)現(xiàn)通過(guò)不斷向同一個(gè)文件中追加數(shù)據(jù)以及獨(dú)特的存儲(chǔ)結(jié)構(gòu)規(guī)避了文件和并的過(guò)程,雖然單個(gè)數(shù)據(jù)塊的大小小于和并后的大小,但由于規(guī)避了文件合并的開(kāi)銷(xiāo)再結(jié)合 Flink 獨(dú)有的 IO 調(diào)度,最終可以實(shí)現(xiàn)比 sort-spill-merge 方案更高的性能。

          ■ 1.5 減少內(nèi)存緩沖區(qū)消耗


          類(lèi)似于其他分布式計(jì)算系統(tǒng)中 sort-shuffle 的實(shí)現(xiàn),F(xiàn)link 利用一塊固定大小的內(nèi)存緩沖區(qū)進(jìn)行數(shù)據(jù)的緩存與排序。這塊內(nèi)存緩沖區(qū)的大小是與并發(fā)無(wú)關(guān)的,從而使得上游 shuffle 數(shù)據(jù)寫(xiě)所需要的內(nèi)存緩沖區(qū)大小與并發(fā)解耦。結(jié)合另一個(gè)內(nèi)存管理方面的優(yōu)化 FLINK-16428 可以同時(shí)實(shí)現(xiàn)下游 shuffle 數(shù)據(jù)讀取的內(nèi)存緩沖區(qū)消耗并發(fā)無(wú)關(guān)化,從而可以減少大規(guī)模批作業(yè)的內(nèi)存緩沖區(qū)消耗。(注:FLINK-16428 同時(shí)適用于批作業(yè)與流作業(yè))

          2. 實(shí)現(xiàn)細(xì)節(jié)


          ■ 2.1 內(nèi)存數(shù)據(jù)排序


          在 shuffle 數(shù)據(jù)的 sort-spill 階段,每條數(shù)據(jù)被首先序列化并寫(xiě)入到排序緩沖區(qū)中,當(dāng)緩沖區(qū)被填滿(mǎn)后,會(huì)對(duì)緩沖區(qū)中的所有二進(jìn)制數(shù)據(jù)按照數(shù)據(jù)分區(qū)的順序進(jìn)行排序。此后,排好序的數(shù)據(jù)會(huì)按照數(shù)據(jù)分區(qū)的順序被寫(xiě)出到文件中。雖然,目前并沒(méi)有對(duì)數(shù)據(jù)本身進(jìn)行排序,但是排序緩沖區(qū)的接口足夠的泛化,可以實(shí)現(xiàn)后續(xù)潛在的更為復(fù)雜的排序要求。排序緩沖區(qū)的接口定義如下:

          public interface SortBuffer {
          */** Appends data of the specified channel to this SortBuffer. \*/* boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType) throws IOException;
          */** Copies data in this SortBuffer to the target MemorySegment. \*/* BufferWithChannel copyIntoSegment(MemorySegment target);
          long numRecords();
          long numBytes();
          boolean hasRemaining();
          void finish();
          boolean isFinished();
          void release();
          boolean isReleased(); }

          在排序算法上,我們選擇了復(fù)雜度較低的 bucket-sort。具體而言,每條序列化后的數(shù)據(jù)前面都會(huì)被插入一個(gè) 16 字節(jié)的元數(shù)據(jù)。包括 4 字節(jié)的長(zhǎng)度、4 字節(jié)的數(shù)據(jù)類(lèi)型以及 8 字節(jié)的指向同一數(shù)據(jù)分區(qū)中下一條數(shù)據(jù)的指針。結(jié)構(gòu)如下圖所示:


          當(dāng)從緩沖區(qū)中讀取數(shù)據(jù)時(shí),只需要按照每個(gè)數(shù)據(jù)分區(qū)的鏈?zhǔn)剿饕Y(jié)構(gòu)就可以讀取到屬于這個(gè)數(shù)據(jù)分區(qū)的所有數(shù)據(jù),并且這些數(shù)據(jù)保持了數(shù)據(jù)寫(xiě)入時(shí)的順序。這樣按照數(shù)據(jù)分區(qū)的順序讀取所有的數(shù)據(jù)就可以達(dá)到按照數(shù)據(jù)分區(qū)排序的目標(biāo)。

          ■ 2.2 文件存儲(chǔ)結(jié)構(gòu)


          如前所述,每個(gè)并行任務(wù)產(chǎn)生的 shuffle 數(shù)據(jù)會(huì)被寫(xiě)到一個(gè)物理文件中。每個(gè)物理文件包含多個(gè)數(shù)據(jù)區(qū)塊(data region),每個(gè)數(shù)據(jù)區(qū)塊由數(shù)據(jù)緩沖區(qū)的一次 sort-spill 生成。在每個(gè)數(shù)據(jù)區(qū)塊中,所有屬于不同數(shù)據(jù)分區(qū)(data partition,由下游計(jì)算節(jié)點(diǎn)不同并行任務(wù)消費(fèi))的數(shù)據(jù)按照數(shù)據(jù)分區(qū)的序號(hào)順序進(jìn)行排序聚合。下圖展示了 shuffle 數(shù)據(jù)文件的詳細(xì)結(jié)構(gòu)。其中(R1,R2,R3)是 3 個(gè)不同的數(shù)據(jù)區(qū)塊,分別對(duì)應(yīng) 3 次數(shù)據(jù)的 sort-spill 寫(xiě)出。每個(gè)數(shù)據(jù)塊中有 3 個(gè)不同的數(shù)據(jù)分區(qū),分別將由(C1,C2,C3)3 個(gè)不同的并行消費(fèi)任務(wù)進(jìn)行讀取。也就是說(shuō)數(shù)據(jù) B1.1,B2.1 及 B3.1 將由 C1 處理,數(shù)據(jù) B1.2,B2.2 及 B3.2 將由 C2 處理,而數(shù)據(jù) B1.3,B2.3 及 B3.3 將由 C3 處理。


          類(lèi)似于其他的分布式處理系統(tǒng)實(shí)現(xiàn),在 Flink 中,每個(gè)數(shù)據(jù)文件還對(duì)應(yīng)一個(gè)索引文件。索引文件用來(lái)在讀取時(shí)為每個(gè)消費(fèi)者索引屬于它的數(shù)據(jù)(data partition)。索引文件包含和數(shù)據(jù)文件相同的 data region,在每個(gè) data region 中有與 data partition 相同數(shù)量的索引項(xiàng),每個(gè)索引項(xiàng)包含兩個(gè)部分,分別對(duì)應(yīng)到數(shù)據(jù)文件的偏移量以及數(shù)據(jù)的長(zhǎng)度。作為一個(gè)優(yōu)化。Flink 為每個(gè)索引文件緩存最多 4M 的索引數(shù)據(jù)。數(shù)據(jù)文件與索引文件的對(duì)應(yīng)關(guān)系如下:


          ■ 2.3 讀取 IO 調(diào)度


          為了進(jìn)一步提高文件 IO 性能,基于上面的存儲(chǔ)結(jié)構(gòu),F(xiàn)link 進(jìn)一步引入了 IO 調(diào)度機(jī)制,類(lèi)似于磁盤(pán)調(diào)度的電梯算法,F(xiàn)link 的 IO 調(diào)度總是按照 IO 請(qǐng)求的文件偏移順序進(jìn)行調(diào)度。更具體來(lái)說(shuō),如果數(shù)據(jù)文件有 n 個(gè) data region,每個(gè) data region 有 m 個(gè) data partition,同時(shí)有 m 個(gè)下游計(jì)算任務(wù)讀取這一數(shù)據(jù)文件,那么下面的偽代碼展示了 Flink 的 IO 調(diào)度算法的工作流程:

          *// let data_regions as the data region list indexed from 0 to n - 1* *// let data_readers as the concurrent downstream data readers queue indexed from 0 to m - 1* for (data_region in data_regions) {   data_reader = poll_reader_of_the_smallest_file_offset(data_readers);   if (data_reader == null)     break;   reading_buffers = request_reading_buffers();   if (reading_buffers.isEmpty())     break;   read_data(data_region, data_reader, reading_buffers); }

          ■ 2.4 數(shù)據(jù)廣播優(yōu)化


          數(shù)據(jù)廣播是指發(fā)送相同的數(shù)據(jù)給下游計(jì)算節(jié)點(diǎn)的所有并行任務(wù),一個(gè)常見(jiàn)的應(yīng)用場(chǎng)景是 broadcast-join。Flink 的 sort-shuffle 實(shí)現(xiàn)對(duì)這一過(guò)程進(jìn)行了優(yōu)化,使得在包括內(nèi)存排序緩沖區(qū)和 shuffle 文件中,廣播數(shù)據(jù)只保存一份,這可以大大提升數(shù)據(jù)廣播的性能。更具體來(lái)說(shuō),當(dāng)寫(xiě)入一條廣播數(shù)據(jù)到排序緩沖區(qū)時(shí),這條數(shù)據(jù)只會(huì)被序列化并且拷貝一次,同樣在將數(shù)據(jù)寫(xiě)出到 shuffle 文件時(shí),也只會(huì)寫(xiě)一份數(shù)據(jù)。在索引文件中,對(duì)于不同 data partition 的數(shù)據(jù)索引項(xiàng),他們均指向數(shù)據(jù)文件中的同一塊數(shù)據(jù)。下圖展示了數(shù)據(jù)廣播優(yōu)化的所有細(xì)節(jié):


          ■ 2.5 數(shù)據(jù)壓縮


          數(shù)據(jù)壓縮是一個(gè)簡(jiǎn)單而有效的優(yōu)化手段,測(cè)試結(jié)果顯示數(shù)據(jù)壓縮可以提高 TPC-DS 總體性能超過(guò) 30%。類(lèi)似于 Flink 的基于 hash 的批處理 shuffle 實(shí)現(xiàn),數(shù)據(jù)壓縮是以網(wǎng)絡(luò)緩沖區(qū)(network buffer)為單位進(jìn)行的,數(shù)據(jù)壓縮不跨 data partition,也就是說(shuō)發(fā)給不同下游并行任務(wù)的數(shù)據(jù)分開(kāi)壓縮,壓縮發(fā)生在數(shù)據(jù)排序后寫(xiě)出前,下游消費(fèi)任務(wù)在收到數(shù)據(jù)后進(jìn)行解壓。下圖展示了數(shù)據(jù)壓縮的整個(gè)流程:


          四、測(cè)試結(jié)果


          1. 穩(wěn)定性


          新的 sort-shuffle 的實(shí)現(xiàn)極大的提高 Flink 運(yùn)行批處理作業(yè)的穩(wěn)定性。除了解決了潛在的文件句柄以及 inode 耗盡的不穩(wěn)定問(wèn)題外,還解決了一些 Flink 原有 hash-shuffle 存在的已知問(wèn)題,如 FLINK-21201(創(chuàng)建過(guò)多文件導(dǎo)致主線(xiàn)程阻塞),F(xiàn)LINK-19925(在網(wǎng)絡(luò) netty 線(xiàn)程中執(zhí)行 IO 操作導(dǎo)致網(wǎng)絡(luò)穩(wěn)定性受到影響)等。

          2. 性能


          我們?cè)?1000 規(guī)模的并發(fā)下運(yùn)行了 TPC-DS 10T 數(shù)據(jù)規(guī)模的測(cè)試,結(jié)果表明,相比于 Flink 原本的批數(shù)據(jù) shuffle 實(shí)現(xiàn),新的數(shù)據(jù) shuffle 實(shí)現(xiàn)可以實(shí)現(xiàn) 2-6 倍的性能提升,如果排除計(jì)算時(shí)間,只統(tǒng)計(jì)數(shù)據(jù) shuffle 時(shí)間可以是先最高 10 倍的性能提升。下表展示了性能提升的詳細(xì)數(shù)據(jù):


          在我們的測(cè)試集群上,每塊機(jī)械硬盤(pán)的數(shù)據(jù)讀取以及寫(xiě)入帶寬可以達(dá)到 160MB/s:



          注:我們的測(cè)試環(huán)境配置如下,由于我們有較大的內(nèi)存,所以一些 shuffle 數(shù)據(jù)量小的作業(yè)實(shí)際數(shù)據(jù) shuffle 僅為讀寫(xiě)內(nèi)存,因此上面的表格僅列出了一些 shuffle 數(shù)據(jù)量大,性能提升明顯的查詢(xún):


          五、調(diào)優(yōu)參數(shù)


          在 Flink 中,sort-shuffle 默認(rèn)是不開(kāi)啟的,想要開(kāi)啟需要調(diào)小這個(gè)參數(shù)的配置taskmanager.network.sort-shuffle.min-parallelism這個(gè)參數(shù)的含義是如果數(shù)據(jù)分區(qū)的個(gè)數(shù)(一個(gè)計(jì)算任務(wù)并發(fā)需要發(fā)送數(shù)據(jù)給幾個(gè)下游計(jì)算節(jié)點(diǎn))低于這個(gè)值,則走 hash-shuffle 的實(shí)現(xiàn),如果高于這個(gè)值則啟用 sort-shuffle。實(shí)際應(yīng)用時(shí),在機(jī)械硬盤(pán)上,可以配置為 1,即使用 sort-shuffle。


          Flink 沒(méi)有默認(rèn)開(kāi)啟數(shù)據(jù)壓縮,對(duì)于批處理作業(yè),大部分場(chǎng)景下是建議開(kāi)啟的,除非數(shù)據(jù)壓縮率低。開(kāi)啟的參數(shù)taskmanager.network.blocking-shuffle.compression.enabled。

          對(duì)于 shuffle 數(shù)據(jù)寫(xiě)和數(shù)據(jù)讀,都需要占用內(nèi)存緩沖區(qū)。其中,數(shù)據(jù)寫(xiě)緩沖區(qū)的大小taskmanager.network.sort-shuffle.min-buffers制,數(shù)據(jù)讀緩沖區(qū)taskmanager.network.sort-shuffle.min-buffers制。數(shù)據(jù)寫(xiě)緩沖區(qū)從網(wǎng)絡(luò)內(nèi)存中切分出來(lái),如果要增大數(shù)據(jù)寫(xiě)緩沖區(qū)可能還需要增大網(wǎng)絡(luò)內(nèi)存總大小,以避免出現(xiàn)網(wǎng)絡(luò)內(nèi)存不足的錯(cuò)誤。數(shù)據(jù)讀緩沖區(qū)從框架的 off-heap 內(nèi)存中切分出來(lái),如果要增大數(shù)據(jù)讀緩沖區(qū),可能還需要增大框架的 off-heap 內(nèi)存,以避免出現(xiàn) direct 內(nèi)存 OOM 錯(cuò)誤。一般而言更大的內(nèi)存緩沖區(qū)可以帶來(lái)更好的性能,對(duì)于大規(guī)模批作業(yè),幾百兆的數(shù)據(jù)寫(xiě)緩沖區(qū)與讀緩沖區(qū)是足夠的。

          六、未來(lái)展望


          還有一些后續(xù)的優(yōu)化工作,包括但不限于:


          1. 網(wǎng)絡(luò)連接復(fù)用,這可以提高網(wǎng)絡(luò)的建立的性能與穩(wěn)定性,相關(guān) Jira 包括 FLINK-22643 以及 FLINK-15455;

          2. 多磁盤(pán)負(fù)載均衡,這有利于解決負(fù)載不均的問(wèn)題,相關(guān) Jira 包括 FLINK-21790 以及 FLINK-21789;

          3. 實(shí)現(xiàn)遠(yuǎn)程數(shù)據(jù) shuffle 服務(wù),這有利于進(jìn)一步提升批數(shù)據(jù) shuffle 的性能與穩(wěn)定性;

          4. 允許用戶(hù)選擇磁盤(pán)類(lèi)型,這可以提高易用性,用戶(hù)可以根據(jù)作業(yè)的優(yōu)先級(jí)選擇使用 HDD 或者 SSD。

          英文原文鏈接:
          https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
          https://flink.apache.org/2021/10/26/sort-shuffle-part2.html




          12 月 4-5 日,Flink Forward Asia 2021 重磅開(kāi)啟,全球 40+ 多行業(yè)一線(xiàn)廠商,80+ 干貨議題,帶來(lái)專(zhuān)屬于開(kāi)發(fā)者的技術(shù)盛宴;

          另有首屆 Flink Forward Asia Hackathon 正式啟動(dòng),10W 獎(jiǎng)金等你來(lái)!

          點(diǎn)擊文末「閱讀原文」即可免費(fèi)報(bào)名~


          ▼ 關(guān)注「ApacheFlink」視頻號(hào),遇見(jiàn)更多大咖 
          更多 Flink 相關(guān)技術(shù)問(wèn)題,可掃碼加入社區(qū)釘釘交流群~
             戳我,報(bào)名 FFA 2021 大會(huì)!
          瀏覽 55
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日本成人中文在线 | 成人午夜影院中文 | 无码不卡在线播放 | 无码操逼视频免费有声音 | 中国一级免费毛片 |