<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spark 優(yōu)化 | 京東 Spark 基于 Bloom Filter 算法的 Runtime Filter Join 優(yōu)化機制

          共 7717字,需瀏覽 16分鐘

           ·

          2022-02-15 18:20

          前言


          京東Spark作為京東內(nèi)部的統(tǒng)一分析計算引擎,目前服務(wù)于京東零售、京東物流、京東數(shù)字科技、京喜事業(yè)群、京東國際、京東安聯(lián)、京東工業(yè)等各子集團,其承載著諸多核心業(yè)務(wù)線,其任務(wù)量逐年攀升,其任務(wù)增長趨勢如圖1所示,面對日益增長的業(yè)務(wù)量以及復(fù)雜、多變的業(yè)務(wù)場景,京東Spark計算引擎研發(fā)團隊面臨著諸多挑戰(zhàn),其中典型問題包括:穩(wěn)定性挑戰(zhàn)、計算效率提升挑戰(zhàn)、降本增效挑戰(zhàn)、滿足業(yè)務(wù)個性化需求挑戰(zhàn)。京東Spark計算引擎研發(fā)團隊通過對Apache Spark的技術(shù)改進及架構(gòu)升級,提供了一套高可用、高性能服務(wù),以滿足內(nèi)部大量離線、實時分析型的查詢需求。


          圖 1 任務(wù)增長趨勢


          問題及現(xiàn)狀


          在京東Spark任務(wù)中,Spark SQL的任務(wù)量占比較高(90%+),在生產(chǎn)場景中,我們發(fā)現(xiàn)有很多SQL業(yè)務(wù)場景需要一張小表(相對小)Inner / Left Outer Join 一張大表。這里的小表是指不會觸發(fā)BroadcastHashJoin,默認走SortMergeJoin,了解Spark的同學(xué)會知道,SortMergeJoin 的實現(xiàn)邏輯:首先,兩張表會分別根據(jù)join條件確定每條記錄的key,基于該key做shuffle write將可能join到一起的記錄分到同一個分區(qū)中;其次,在shuffle read階段將兩個表中相同分區(qū)的數(shù)據(jù)進行拉取并進行sort merge;最終,將滿足join條件的數(shù)據(jù)進行join輸出,其中滿足join條件與具體SQL join策略相關(guān),對于上面提及的場景,滿足join條件是以左側(cè)小表為基準(zhǔn)的,即在兩表join時只會將大表側(cè)滿足與小表側(cè)數(shù)據(jù)相等條件的數(shù)據(jù)保留,對于大表側(cè)不符合join條件的數(shù)據(jù),我們認為是無效數(shù)據(jù),由于無效數(shù)據(jù)參與shuffle、sort merge計算,一方面影響任務(wù)時效,另一方面無效的shuffle數(shù)據(jù)會增加集群磁盤IO、網(wǎng)絡(luò)IO等負載,同時浪費大量的計算資源。
          下面是京東某重要業(yè)務(wù)線的一個真實案例,如圖2所示,其中左側(cè)某商品SKU表(小表)與右側(cè)某商品SKU類目表(大表)進行Inner Join,通過Spark DAQ執(zhí)行計劃我們可以看出,二者走的是SortMergeJoin,其中左側(cè)小表過濾后參與計算的數(shù)據(jù)量大概15億條、shuffle量大概是168MB;而右側(cè)大表的數(shù)據(jù)量大概是1558億條、shuffle量高達2.5TB,通過最終join輸出,可以看到滿足join條件的數(shù)據(jù)僅7000多萬,結(jié)合上面的分析, 說明大表側(cè)有大部分數(shù)據(jù)是不滿足join條件的,但這些無效數(shù)據(jù)都參與shuffle和sort merge環(huán)節(jié),不但影響任務(wù)時效,還會增加集群負載等。本文基于上述問題進行深度分析,并重點介紹我們在Spark內(nèi)核所做的優(yōu)化。


          ? ??圖 2 京東某重要業(yè)務(wù)線的真實案例


          Runtime Filter Join優(yōu)化機制


          本文討論京東Spark計算引擎研發(fā)團隊基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制,助力京東大促場景的探索和實踐。
          Runtime Filter Join是京東Spark計算引擎研發(fā)團隊最近推出的新特性,其原理是兩個表進行join時,動態(tài)基于小表(相對小)構(gòu)建BloomFilter來對大表側(cè)進行過濾,過濾掉大表一側(cè)不會被命中(不滿足join條件)的輸入數(shù)據(jù),從而大幅減少磁盤IO、網(wǎng)絡(luò)IO和計算的技術(shù),并能夠提升任務(wù)的整體執(zhí)行時間。

          在詳細介紹Runtime Filter Join特性之前,先對Spark SQL 的整體架構(gòu)做一下概述, 由下圖3可知,無論使用DataFrame、 SQL 語句還是使用DataSet ,都會經(jīng)過如下步驟轉(zhuǎn)換成 DAG 對 RDD 的操作:


          圖 3 Spark SQL 的整體架構(gòu)


          1. 先解析 SQL,生成 Unresolved Logical Query Plan

          2. 由 Analyzer 結(jié)合 Catalog 信息生成 Resolved Logical Plan

          3. Optimizer根據(jù)預(yù)先定義好的規(guī)則對 Resolved Logical Plan 進行優(yōu)化并生成 Optimized Logical Plan

          4. Query Planner 將 Optimized Logical Plan 轉(zhuǎn)換成多個 Physical Plan

          5. 根據(jù) Cost Model 算出每個 Physical Plan 的代價并選取代價最小的 Physical Plan 作為最終的 Physical Plan

          6. Spark 以 DAG 的方法執(zhí)行上述 Physical Plan

          7. 在Spark 3.0版本以后,引入AQE(Adaptive Query Execution),其在執(zhí)行 DAG 的過程中,會根據(jù)運行時信息動態(tài)調(diào)整執(zhí)行計劃從而提高執(zhí)行效率。

          JD Spark Runtime Filter Join的技術(shù)實現(xiàn)包括:基于邏輯執(zhí)行計劃(Logical Plan)數(shù)據(jù)裁剪和基于物理執(zhí)行計劃(Physical Plan)數(shù)據(jù)裁剪,其中前者是在邏輯執(zhí)行計劃(Logical Plan)優(yōu)化過程中(上述步驟3),動態(tài)基于小表數(shù)據(jù)構(gòu)建BloomFilter,并將該BloomFilter 算子插入到大表側(cè)對其進行過濾;后者是對前者場景的補充,如果未能在邏輯執(zhí)行計劃優(yōu)化中動態(tài)插入BloomFilter算子對大表進行過濾,在物理執(zhí)行計劃優(yōu)化中(上述步驟7),通過AQE基于小表構(gòu)建BloomFilter并對大表進行過濾,二者的主要區(qū)別是前者是在大表shuffle 前進行過濾,后者是在大表shuffle后進行過濾,其具體實現(xiàn)架構(gòu)如圖4所示,為了描述方便我們這里結(jié)合一個case進行闡述,下面以Table-A Inner Join Table-B為例,其中Table-A是一張小表:


          圖 4 Runtime Filter Join架構(gòu)圖


          基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制的執(zhí)行流程如下:

          1. 在邏輯執(zhí)行計劃過程中,我們實現(xiàn)了相應(yīng)的Rule,首先判斷小表Table-A 是否可以廣播,如果可以廣播會走BroadcastHashJoin,因BroadcastHashJoin不會引入shuffle、sort,往往性能表現(xiàn)良好,因此對于滿足BroadcastHashJoin的場景,保持Spark原生計算邏輯;否則,會默認走SortMergeJoin;

          2. 對于SortMergeJoin,上述Rule會根據(jù)表數(shù)據(jù)文件大小和schema信息對兩個表的Row count進行評估,并基于兩表的Row count信息進行代價評估(例如:Table-A-Row-count/Table-B-Row-count小于一定閾值等),當(dāng)小表Row count與大表Row count滿足一定條件,上述Rule會基于Table-A的join keys動態(tài)構(gòu)建BloomFilter,我們定義為RuntimeBloomFilter,并作為Filter算子動態(tài)插入到Table-B的過濾條件中,因RuntimeBloomFilter會在大表shuffle前進行過濾,提前過濾掉大表側(cè)join時不會被命中的數(shù)據(jù),從而減少大表的shuffle量;

          3. 在物理執(zhí)行計劃過程中,同樣實現(xiàn)了相應(yīng)的Rule,該Rule會根據(jù)Join策略的不同,檢查SortMergeJoin的Left或Right節(jié)點中,是否在邏輯執(zhí)行計劃中已基于小表join keys構(gòu)建了BloomFilter對大表側(cè)進行過濾,如果存在,該步驟會自動跳過;否則,我們會基于AQE在執(zhí)行計劃中,基于小表join keys動態(tài)構(gòu)建BloomFilter,這里定義其為ShuffleBloomFilter,并作為Filter算子動態(tài)插入讀取大表shuffle數(shù)據(jù)后面,對shuffle數(shù)據(jù)過濾,從而減少大表join 前sort的數(shù)據(jù)量、減少spill數(shù)據(jù)量。這里與以上優(yōu)化的不同點在于,其作為AQE的一條規(guī)則且依賴于AQE功能的開啟,該項優(yōu)化主要是用于優(yōu)化中間Stage。


          基于上述架構(gòu)設(shè)計在實現(xiàn)中面臨的挑戰(zhàn)及技術(shù)攻堅:


          1. BloomFilter算子支持codegen

          為了實現(xiàn)上述架構(gòu)設(shè)計,我們內(nèi)部自定義了BloomFilter算子,為了提升該算子的處理性能,內(nèi)部實現(xiàn)的BloomFilter算子支持codegen。


          2. 支持多join keys場景且多個join keys只需一個BloomFilter

          兩張表join時,可能存在多個關(guān)聯(lián)鍵,例如:Table-A a Inner Join Table-B b on a.col1=b.col1 and a.col2=b.col2,如果基于小表每個join key都構(gòu)建一個BloomFilter并分別作為Filter算子對大表側(cè)進行過濾,會導(dǎo)致多次讀取小表,增加IO,另外由于BloomFilter是以廣播向量的方式存在,如果產(chǎn)生過多的BloomFilter會帶來driver和executor的內(nèi)存壓力,為此我們采用XxHash64 對join keys 進行處理,以獲取一個新的散列值,基于新的散列值構(gòu)建BloomFilter并對大表側(cè)過濾,這樣能夠有效提升性能及減少OOM發(fā)生。
          選取XxHash64 處理多join keys的原因:首先,XxHash64支持codegen,能夠與上述實現(xiàn)的BloomFilter算子有機結(jié)合,另外基于HashBenchmark驗證可知,XxHash64在處理基本數(shù)據(jù)類型相對Murmur3Hash、HiveHash均有不錯的表現(xiàn),且京東線上的join keys一般是string類型,下面是驗證效果:


          ? ??

          3. 設(shè)計構(gòu)建BloomFilter的timeout fallback機制

          結(jié)合上面的Runtime Filter Join的執(zhí)行流程可知,兩表join時如果觸發(fā)了RuntimeBloomFilter或ShuffleBloomFilter特性,均會先基于小表構(gòu)建相應(yīng)的BloomFilter,然后再作為Filter算子對大表進行過濾,這就要求對大表的處理需要等待基于小表構(gòu)建BloomFilter完后才執(zhí)行,極端情況下可能會導(dǎo)致任務(wù)延遲問題,這在京東復(fù)雜、多變且SLA要求嚴格的背景下,如果想全場景鋪開Runtime Filter Join特性會面臨很大的挑戰(zhàn)。
          為了解決上述問題,我們設(shè)計構(gòu)建BloomFilter的timeout fallback機制,即在規(guī)定時間內(nèi)未能完成基于“小表”構(gòu)建BloomFilter,Spark執(zhí)行計劃會自動回退到原始處理邏輯,這樣能夠有效規(guī)避因大表被誤判成小表,導(dǎo)致構(gòu)建BloomFilter耗時過長所引起的性能回歸問題等。


          4. 設(shè)計Rule 支持BloomFilter 謂詞下推

          Spark SQL在多個join場景的一個典型優(yōu)化就是謂詞下推,即可以通過內(nèi)部Rule優(yōu)化,將某一組join的謂詞下推到其他join,從而過濾掉其他join的無效數(shù)據(jù),提升性能。通過分析京東線上業(yè)務(wù)場景,發(fā)現(xiàn)大部分任務(wù)存在多個join場景,例如:A as a join B as b on a.col1=b.col1 join C as c on a.col1=c.col1。
          然而,本文引入的BloomFilter算子是一種基于子查詢的封裝,基于Spark SQL 現(xiàn)有的內(nèi)置Rule不能將其進行謂詞下推,為此我們內(nèi)部實現(xiàn)了相應(yīng)的Rule支持BloomFilter算子的謂詞下推,在滿足多個join且關(guān)聯(lián)鍵相同的情況下,該Rule能夠基于某一組join構(gòu)建的BloomFilter下推到其他join,最終實現(xiàn)基于一個BloomFilter過濾多組join的能力,下面結(jié)合一個案例進行展示,其中 tb1 和 tb2是兩個大表,tb3是小表,其中tb1數(shù)據(jù)量10000條,tb2數(shù)據(jù)量是9000條,tb3數(shù)據(jù)量是10條。
          spark.range(100000)  .select(col("id").as("a"), col("id").as("b"), col("id").as("c"))  .write.format(tableFormat).mode(SaveMode.Overwrite)  .saveAsTable("tb1")spark.range(9000)  .select(col("id").as("a"), col("id").as("b"), col("id").as("c"))  .write.format(tableFormat).mode(SaveMode.Overwrite)  .saveAsTable("tb2")spark.range(10)  .select(col("id").as("a"), col("id").as("b"), col("id").as("c"))  .write.format(tableFormat).mode(SaveMode.Overwrite)  .saveAsTable("tb3")set spark.sql.autoBroadcastJoinThreshold=-1;sql(s"""     |SELECT tb1.a,     |       tb2.b     |FROM tb1     |      Inner JOIN tb3     |          ON tb1.a = tb3.a AND tb3.b < 4     |      Inner JOIN tb2     |       ON tb1.a = tb2.a     |""".stripMargin)


          為了驗證效果,我們這里關(guān)閉BroadcastHashJoin (spark.sql.autoBroadcastJoinThreshold=-1),執(zhí)行上面語句,Spark會優(yōu)先處理 tb1 Inner JOIN tb3 ON tb1.a = tb3.a AND tb3.b < 4,基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制會自動識別出tb3是一張小表,并基于tb3構(gòu)建RuntimeBloomFilter對大表tb1進行過濾,然后進行SortMergeJoin,其DAG如圖5所示,由圖5可知tb1的數(shù)據(jù)量得到有效過濾,只保留滿足join條件的4條數(shù)據(jù),這在生產(chǎn)環(huán)境中的收益是顯著的,具體上線效果在京東實際場景中的收益中會闡述。


          圖 5 基于Runtime Filter Join優(yōu)化效果


          如果不引入BloomFilter謂詞下推優(yōu)化,我們的優(yōu)化到此已經(jīng)結(jié)束,上述計算結(jié)果最終再與tb2進行SortMergeJoin,這時我們發(fā)現(xiàn)tb2的數(shù)據(jù)會全部參與計算,其中9000條數(shù)據(jù)均會進行shuffle并最終與第一組join的結(jié)果進行join匹配,但滿足join條件的數(shù)據(jù)也僅4條,即tb2存在大量無效數(shù)據(jù)參與shuffle、sort,對于上述場景我們自定義的Rule能夠有效將基于tb3構(gòu)建RuntimeBloomFilter下推到第二組join里,并對tb2進行謂詞下推,其具體的優(yōu)化效果如圖6所示,由圖可知tb2的數(shù)據(jù)量也得到有效過濾,只保留了滿足join條件的4條數(shù)據(jù),最終實現(xiàn)了基于一組join的BloomFilter謂詞下推到其他join,從而減少shuffle量,減少集群負載,提升任務(wù)時效,同時節(jié)省大量的計算資源。


          圖 6 基于BloomFilter謂詞下推的優(yōu)化效果


          在TPC-DS基準(zhǔn)場景中的收益


          基于TPC-DS 10TB 基準(zhǔn)測試驗證,如圖7所示,99個查詢中有9個被Runtime Filter Join特性命中,其中shuffle量減少1.5%~73.7%。

          圖 7 基于TPC-DS基準(zhǔn)測試驗證


          在京東實際場景中的收益


          下面是京東線上的優(yōu)化案例:其為某重要業(yè)務(wù)線的某商品SKU屬性表(小表)與某商品SKU擴展屬性模型表(大表)進行Left Outer Join,因業(yè)務(wù)需要兩張表在join前會根據(jù)相關(guān)字段進行g(shù)roup by,最終再通過商品SKU編號進行關(guān)聯(lián),涉及業(yè)務(wù)較為復(fù)雜,這里不展開描述,該業(yè)務(wù)場景是非常符合本文的Runtime Filter Join優(yōu)化,下面是優(yōu)化前后的對比效果:


          圖 8 某核心任務(wù)優(yōu)化前效果


          圖 9 某核心任務(wù)優(yōu)化后效果


          由上面優(yōu)化前后的對比效果圖8和圖9可知,1)優(yōu)化前,該線上任務(wù)的shuffle量大概是:16TB+,任務(wù)運行時間:4.4小時;2)基于Runtime Filter Join優(yōu)化后,shuffle量減少至:200MB+,任務(wù)運行時間縮短至:4分鐘,計算性能提升:95%+,同時節(jié)省了大量的計算資源。基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制在京東的線上優(yōu)化案例不勝枚舉,限于篇幅原因這里不一一展開。
          目前,基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制已全面上線(默認關(guān)閉),基于京東自研Spark版本相對Spark社區(qū)版本,命中任務(wù)平均處理數(shù)據(jù)量(shuffle量)減少72%、性能提升53%。


          計劃與展望


          本文討論京東Spark計算引擎研發(fā)團隊基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制,助力京東大促場景的探索和實踐。目前上線效果顯著,但仍有一些不足及待提升項:

          1、目前表Row count主要是通過表文件大小及其schema信息進行判斷,存在一定誤差,后續(xù)會嘗試開啟CBO進行精準(zhǔn)判斷;

          2、目前是基于小表動態(tài)構(gòu)建BloomFilter,需要讀取小表的數(shù)據(jù)信息,后續(xù)會考慮基于列式文件存儲的一些特性來豐富整個架構(gòu),例如:如果要基于小表的全量數(shù)據(jù)構(gòu)建BloomFilter,且存儲格式是ORC或Parquet等,可直接基于其Data文件內(nèi)部的Index BloomFilter構(gòu)建全表的BloomFilter,這樣可以進一步提升性能;

          3、目前優(yōu)化的主要是兩表關(guān)聯(lián)的場景,后續(xù)需要增強對多表關(guān)聯(lián)等復(fù)雜場景的支持。

          我們后續(xù)計劃將Spark Runtime Filter Join 技術(shù)與數(shù)據(jù)湖技術(shù)相結(jié)合,助力京東湖倉一體相關(guān)場景的落地及實踐。同時,我們會繼續(xù)加強行業(yè)內(nèi)技術(shù)交流,在分享內(nèi)部技術(shù)實踐與經(jīng)驗的同時,聆聽大家的反饋和建議,在滿足內(nèi)部業(yè)務(wù)增長需要的同時,相關(guān)技術(shù)會反饋給社區(qū),共同建設(shè)Spark生態(tài)。
          瀏覽 345
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  9999久久久久 | 无码免费在线观看视频 | 国产精品国内自产 | 肏逼视频在线免费观看 | 亚洲免费在线视频观看 |