<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink SQL 性能優(yōu)化:multiple input 詳解

          共 5858字,需瀏覽 12分鐘

           ·

          2021-03-17 17:28


          執(zhí)行效率的優(yōu)化一直是 Flink 追尋的目標。在大多數(shù)作業(yè),特別是批作業(yè)中,數(shù)據(jù)通過網(wǎng)絡(luò)在 task 之間傳遞(稱為數(shù)據(jù) shuffle)的代價較大。正常情況下一條數(shù)據(jù)經(jīng)過網(wǎng)絡(luò)需要經(jīng)過序列化、磁盤讀寫、socket 讀寫與反序列化等艱難險阻,才能從上游 task 傳輸?shù)较掠?;而相同?shù)據(jù)在內(nèi)存中的傳輸,僅需要耗費幾個 CPU 周期傳輸一個八字節(jié)指針即可。

          Flink 在早期版本中已經(jīng)通過 operator chaining 機制,將并發(fā)相同的相鄰單輸入算子整合進同一個 task 中,消除了單輸入算子之間不必要的網(wǎng)絡(luò)傳輸。然而,join 等多輸入算子之間同樣存在額外的數(shù)據(jù) shuffle 問題,shuffle 數(shù)據(jù)量最大的 source 節(jié)點與多輸入算子之間的數(shù)據(jù)傳輸也無法利用 operator chaining 機制進行優(yōu)化。

          在 Flink 1.12 中,我們針對目前 operator chaining 無法覆蓋的場景,推出了 multiple input operator 與 source chaining 優(yōu)化。該優(yōu)化將消除 Flink 作業(yè)中大多數(shù)冗余 shuffle,進一步提高作業(yè)的執(zhí)行效率。本文將以一個 SQL 作業(yè)為例介紹上述優(yōu)化,并展示 Flink 1.12 在 TPC-DS 測試集上取得的成果。



          優(yōu)化案例解析:訂單量統(tǒng)計



          我們將以 TPC-DS q96 為例子詳細介紹如何消除冗余 shuffle,該 SQL 意在通過多路 join 篩選并統(tǒng)計符合特定條件的訂單量。


          select count(*) from store_sales    ,household_demographics     ,time_dim, storewhere ss_sold_time_sk = time_dim.t_time_sk       and ss_hdemo_sk = household_demographics.hd_demo_sk     and ss_store_sk = s_store_sk    and time_dim.t_hour = 8    and time_dim.t_minute >= 30    and household_demographics.hd_dep_count = 5    and store.s_store_name = 'ese'


          圖 1 - 初始執(zhí)行計劃



          冗余 Shuffle 是如何產(chǎn)生的?



          由于部分算子對輸入數(shù)據(jù)的分布有要求(如 hash join 算子要求同一并發(fā)內(nèi)數(shù)據(jù) join key 的 hash 值相同),數(shù)據(jù)在算子之間傳遞時可能需要經(jīng)過重新排布與整理。與 map-reduce 的 shuffle 過程類似,F(xiàn)link shuffle 將上游 task 產(chǎn)生的中間結(jié)果進行整理,并按需發(fā)送給需要這些中間結(jié)果的下游 task。但在一部分情況下,上游產(chǎn)出的數(shù)據(jù)已經(jīng)滿足了數(shù)據(jù)分布要求(如連續(xù)多個 join key 相同的 hash join 算子),此時對數(shù)據(jù)的整理便不再必要,由此產(chǎn)生的 shuffle 也就成為了冗余 shuffle,在執(zhí)行計劃中以 forward shuffle 表示。


          圖 1 中的 hash join 算子是一種稱為 broadcast hash join 的特殊算子。以 store_sales join time_dim 為例,由于 time_dim 表數(shù)據(jù)量很小,此時通過 broadcast shuffle 將該表的全量數(shù)據(jù)發(fā)送給 hash join 的每個并發(fā),就能讓任何并發(fā)接受 store_sales 表的任意數(shù)據(jù)而不影響 join 結(jié)果的正確性,同時提高 hash join 的執(zhí)行效率。此時 store_sales 表向 join 算子的網(wǎng)絡(luò)傳輸也成為了冗余 shuffle。同理幾個 join 之間的 shuffle 也是不必要的。




          圖 2 - 冗余的shuffle(紅框標記)



          除 hash join 與 broadcast hash join 外,產(chǎn)生冗余 shuffle 的場景還有很多,例如 group key 與 join key 相同的 hash aggregate + hash join、group key 具有包含關(guān)系的多個 hash aggregate 等等,這里不再展開描述。



          Operator Chaining 能解決嗎?



          對 Flink 優(yōu)化過程有一定了解的讀者可能會知道,為了消除不必要的 forward shuffle,F(xiàn)link 在早期就已經(jīng)引入了 operator chaining 機制。該機制將并發(fā)相同的相鄰單輸入算子整合進同一個 task 中,并在同一個線程中一起運算。Operator chaining 機制在圖 1 中其實已經(jīng)在發(fā)揮作用,如果沒有它,做 broadcast shuffle 的三個 Source 節(jié)點名稱中被“->”分隔的算子將會被拆分至多個不同的 task,產(chǎn)生冗余的數(shù)據(jù) shuffle。圖 3 為 Operator chaining 關(guān)閉是的執(zhí)行計劃。


          圖 3 - Operator chaining關(guān)閉后的執(zhí)行計劃



          減少數(shù)據(jù)在 TM 之間通過網(wǎng)絡(luò)和文件傳輸并將算子鏈接合并入 task 是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化與反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,并減少延遲的同時提高整體吞吐量。然而,operator chaining 對算子的整合有非常嚴格的限制,其中一條就是“下游算子的入度為 1”,也就是說下游算子只能有一路輸入。這就將多路輸入的算子(如 join)排除在外。



          多輸入算子的解決方案:

          Multiple Input Operator



          如果我們能仿照 operator chaining 的優(yōu)化思路,引入新的優(yōu)化機制并滿足以下條件:


          1. 該機制可以組合多輸入的算子;

          2. 該機制支持多路輸入(為被組合的算子提供輸入)


          我們就可以將用 forward shuffle 連接的的多輸入算子放到一個 task 里執(zhí)行,從而消除不必要的 shuffle。Flink 社區(qū)很早就關(guān)注到了 operator chaining 的不足,在 Flink 1.11 中引入了 streaming api 層的 MultipleInputTransformation 以及對應(yīng)的 MultipleInputStreamTask。這些 api 滿足了上述條件 2,而 Flink 1.12 在此基礎(chǔ)上在 SQL 層中實現(xiàn)了滿足條件 1 的新算子——multiple input operator,可以參考 FLIP 文檔[1]。


          Multiple input operator 是 table 層一個可插拔的優(yōu)化。它位于 table 層優(yōu)化的最后一步,遍歷生成的執(zhí)行計劃并將不被 exchange 阻隔的相鄰算子整合進一個 multiple input operator 中。圖 4 展示了該優(yōu)化對原本 SQL 優(yōu)化步驟的修改。



          圖 4 - 加入 multiple input operator 后的優(yōu)化步驟


          讀者可能會有疑問:為什么不在現(xiàn)有的 operator chaining 上進行修改,而要另起爐灶呢?實際上,multiple input operator 除了要完成 operator chaining 的工作之外,還需要對各個輸入的優(yōu)先級進行排序。這是因為一部分多輸入算子(如 hash join 與 nested loop join)對輸入有嚴格的順序限制,若輸入優(yōu)先級排序不當很可能造成死鎖。由于算子輸入優(yōu)先級的信息僅在 table 層的算子中有描述,更加自然的方式是在 table 層引入該優(yōu)化機制。


          值得注意的是,multiple input operator 不同于管理多個 operator 的 operator chaining,其本身就是一整個大 operator,而其內(nèi)部運算在外界看來就是一個黑盒。Multiple input operator 的內(nèi)部結(jié)構(gòu)在 operator name 中完全體現(xiàn),讀者在運行包含該 operator 的作業(yè)時,可以從 operator name 看到哪些算子以怎樣的拓撲結(jié)構(gòu)被組合進了 multiple input operator 中。


          圖 5 展示了經(jīng)過 multiple input 優(yōu)化后的算子的拓撲圖以及 multiple input operator 的透視圖。圖中三個 hash join 算子之間的冗余的 shuffle 被移除后,它們可以在一個 task 里執(zhí)行,只不過 operator chaining 沒法處理這種多輸入的情況,將它們放到 multiple input operator 里執(zhí)行,由 multiple input operator 管理各個算子的輸入順序和算子之間的調(diào)用關(guān)系。





          圖 5 - 經(jīng)過 multiple input 優(yōu)化后的算子拓撲圖

          Multiple input operator 的構(gòu)建和運行過程較為復(fù)雜,對此細節(jié)有興趣的讀者可以參考設(shè)計文檔[2]。



          Source 也不能遺漏:Source Chaining



          經(jīng)過 multiple input operator 的優(yōu)化,我們將圖 1 中的執(zhí)行計劃優(yōu)化為圖 6,圖 3 經(jīng)過 operator chaining 優(yōu)化后就變?yōu)閳D 6 的執(zhí)行圖。



          圖 6 - 經(jīng)過 multiple input operator 優(yōu)化后的執(zhí)行計劃


          圖 6 中從 store_sales 表產(chǎn)生的 forward shuffle(如紅框所示)表示我們?nèi)杂袃?yōu)化空間。正如序言中所說,在大部分作業(yè)中,從 source 直接產(chǎn)生的數(shù)據(jù)由于沒有經(jīng)過 join 等算子的篩選和加工,shuffle 的數(shù)據(jù)量是最大的。以 10T 數(shù)據(jù)下的 TPC-DS q96 為例,如果不進行進一步優(yōu)化,包含 store_sales 源表的 task 將向網(wǎng)絡(luò)中傳輸 1.03T 的數(shù)據(jù),而經(jīng)過一次 join 的篩選后,數(shù)據(jù)量急速下降至 16.5G。如果我們能將源表的 forward shuffle 省去,作業(yè)整體執(zhí)行效率又能前進一大步。


          可惜的是,multiple input operator 也不能覆蓋 source shuffle 的場景,這是因為 source 不同于其它任何算子,它沒有任何輸入。Flink 1.12 為此給 operator chaining 新增了 source chaining 功能,將不被 shuffle 阻隔的 source 合并到 operator chaining 中,省去了 source 與下游算子之間的 forward shuffle。


          目前僅有 FLIP-27 source 以及 multiple input operator 可以利用 source chaining 功能,不過這已經(jīng)足夠解決本文中的優(yōu)化場景。


          結(jié)合 multiple input operator 與 source chaining 之后,圖 7 展示了本文優(yōu)化案例的最終執(zhí)行方案。





          圖 7 - 優(yōu)化后的執(zhí)行方案



          TPC-DS 測試結(jié)果



          Multiple input operator 與 source chaining 對大部分作業(yè),特別是批作業(yè)有顯著的優(yōu)化效果。我們利用 TPC-DS 測試集對 Flink 1.12 的整體性能進行了測試,與 Flink 1.10 公布的 12267s 總用時相比,F(xiàn)link 1.12 的總用時僅為 8708s,縮短了近 30% 的運行時間!



          圖 8 - TPC-DS 測試集總用時對比



          圖 9 - TPC-DS 部分測試點用時對比



          未來計劃



          通過 TPC-DS 的測試效果看到,source chaining + multiple input 能夠給我們帶來很大的性能提升。目前整體框架已完成,常用批算子已支持消除冗余 exchange 的推導(dǎo)邏輯,后續(xù)我們將支持更多的批算子和更精細的推導(dǎo)算法。



          流作業(yè)的數(shù)據(jù) shuffle 雖然不需要像批作業(yè)一樣將數(shù)據(jù)寫入磁盤,但將網(wǎng)絡(luò)傳輸變?yōu)閮?nèi)存?zhèn)鬏攷淼男阅芴嵘彩欠浅?捎^的,因此流作業(yè)支持 source chaining + multiple input 也是一個非常令人期待的優(yōu)化。同時,在流作業(yè)上支持該優(yōu)化還需要很多工作,例如流算子上消除冗余 exchange 的推導(dǎo)邏輯暫未支持,一些算子需要重構(gòu)以消除輸入數(shù)據(jù)是 binary 的要求等等,這也是為什么 Flink 1.12 暫未在流作業(yè)中推出推出該優(yōu)化的原因。后續(xù)版本我們將逐步完成這些工作,也希望更多社區(qū)的力量加入我們一起盡早的將更多的優(yōu)化落地。


          另外,阿里云實時計算團隊圍繞 Apache Flink 為核心打造的實時大數(shù)據(jù)平臺,在阿里巴巴內(nèi)部提供全集團范圍的流批一體數(shù)據(jù)分析服務(wù),同時也通過阿里云向外界提供 Flink 企業(yè)級云產(chǎn)品,服務(wù)廣大中小企業(yè)。我們的技術(shù)團隊圍繞開源大數(shù)據(jù)技術(shù)體系構(gòu)建,包括來自 Apache Flink/Hadoop/HBase/Kafka/Hive/Druid  等多個頂級開源項目的眾多 PMC/Committer 成員,加入實時計算團隊將可以與眾多技術(shù)大神共同探索大數(shù)據(jù)技術(shù)世界,感興趣的同學(xué)請速聯(lián)系:[email protected]。



          參考鏈接:

          [1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink
          [2]https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI/

          瀏覽 68
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线视频亚洲无码 | 视频二区在线 | 一区二区三区四区无码在线 | 久久精品国产青青草 | 日韩黄色免费网站 |