在詳細介紹Runtime Filter Join特性之前,先對Spark SQL 的整體架構(gòu)做一下概述, 由下圖3可知,無論使用DataFrame、 SQL 語句還是使用DataSet ,都會經(jīng)過如下步驟轉(zhuǎn)換成 DAG 對 RDD 的操作:

圖 3 Spark SQL 的整體架構(gòu)
1. 先解析 SQL,生成 Unresolved Logical Query Plan
2. 由 Analyzer 結(jié)合 Catalog 信息生成 Resolved Logical Plan
3. Optimizer根據(jù)預(yù)先定義好的規(guī)則對 Resolved Logical Plan 進行優(yōu)化并生成 Optimized Logical Plan
4. Query Planner 將 Optimized Logical Plan 轉(zhuǎn)換成多個 Physical Plan
5. 根據(jù) Cost Model 算出每個 Physical Plan 的代價并選取代價最小的 Physical Plan 作為最終的 Physical Plan
6. Spark 以 DAG 的方法執(zhí)行上述 Physical Plan
7. 在Spark 3.0版本以后,引入AQE(Adaptive Query Execution),其在執(zhí)行 DAG 的過程中,會根據(jù)運行時信息動態(tài)調(diào)整執(zhí)行計劃從而提高執(zhí)行效率。
JD Spark Runtime Filter Join的技術(shù)實現(xiàn)包括:基于邏輯執(zhí)行計劃(Logical Plan)數(shù)據(jù)裁剪和基于物理執(zhí)行計劃(Physical Plan)數(shù)據(jù)裁剪,其中前者是在邏輯執(zhí)行計劃(Logical Plan)優(yōu)化過程中(上述步驟3),動態(tài)基于小表數(shù)據(jù)構(gòu)建BloomFilter,并將該BloomFilter 算子插入到大表側(cè)對其進行過濾;后者是對前者場景的補充,如果未能在邏輯執(zhí)行計劃優(yōu)化中動態(tài)插入BloomFilter算子對大表進行過濾,在物理執(zhí)行計劃優(yōu)化中(上述步驟7),通過AQE基于小表構(gòu)建BloomFilter并對大表進行過濾,二者的主要區(qū)別是前者是在大表shuffle 前進行過濾,后者是在大表shuffle后進行過濾,其具體實現(xiàn)架構(gòu)如圖4所示,為了描述方便我們這里結(jié)合一個case進行闡述,下面以Table-A Inner Join Table-B為例,其中Table-A是一張小表:

圖 4 Runtime Filter Join架構(gòu)圖
基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制的執(zhí)行流程如下:
1. 在邏輯執(zhí)行計劃過程中,我們實現(xiàn)了相應(yīng)的Rule,首先判斷小表Table-A 是否可以廣播,如果可以廣播會走BroadcastHashJoin,因BroadcastHashJoin不會引入shuffle、sort,往往性能表現(xiàn)良好,因此對于滿足BroadcastHashJoin的場景,保持Spark原生計算邏輯;否則,會默認走SortMergeJoin;
2. 對于SortMergeJoin,上述Rule會根據(jù)表數(shù)據(jù)文件大小和schema信息對兩個表的Row count進行評估,并基于兩表的Row count信息進行代價評估(例如:Table-A-Row-count/Table-B-Row-count小于一定閾值等),當(dāng)小表Row count與大表Row count滿足一定條件,上述Rule會基于Table-A的join keys動態(tài)構(gòu)建BloomFilter,我們定義為RuntimeBloomFilter,并作為Filter算子動態(tài)插入到Table-B的過濾條件中,因RuntimeBloomFilter會在大表shuffle前進行過濾,提前過濾掉大表側(cè)join時不會被命中的數(shù)據(jù),從而減少大表的shuffle量;
3. 在物理執(zhí)行計劃過程中,同樣實現(xiàn)了相應(yīng)的Rule,該Rule會根據(jù)Join策略的不同,檢查SortMergeJoin的Left或Right節(jié)點中,是否在邏輯執(zhí)行計劃中已基于小表join keys構(gòu)建了BloomFilter對大表側(cè)進行過濾,如果存在,該步驟會自動跳過;否則,我們會基于AQE在執(zhí)行計劃中,基于小表join keys動態(tài)構(gòu)建BloomFilter,這里定義其為ShuffleBloomFilter,并作為Filter算子動態(tài)插入讀取大表shuffle數(shù)據(jù)后面,對shuffle數(shù)據(jù)過濾,從而減少大表join 前sort的數(shù)據(jù)量、減少spill數(shù)據(jù)量。這里與以上優(yōu)化的不同點在于,其作為AQE的一條規(guī)則且依賴于AQE功能的開啟,該項優(yōu)化主要是用于優(yōu)化中間Stage。
基于上述架構(gòu)設(shè)計在實現(xiàn)中面臨的挑戰(zhàn)及技術(shù)攻堅:
1. BloomFilter算子支持codegen
為了實現(xiàn)上述架構(gòu)設(shè)計,我們內(nèi)部自定義了BloomFilter算子,為了提升該算子的處理性能,內(nèi)部實現(xiàn)的BloomFilter算子支持codegen。
2. 支持多join keys場景且多個join keys只需一個BloomFilter
兩張表join時,可能存在多個關(guān)聯(lián)鍵,例如:Table-A a Inner Join Table-B b on a.col1=b.col1 and a.col2=b.col2,如果基于小表每個join key都構(gòu)建一個BloomFilter并分別作為Filter算子對大表側(cè)進行過濾,會導(dǎo)致多次讀取小表,增加IO,另外由于BloomFilter是以廣播向量的方式存在,如果產(chǎn)生過多的BloomFilter會帶來driver和executor的內(nèi)存壓力,為此我們采用XxHash64 對join keys 進行處理,以獲取一個新的散列值,基于新的散列值構(gòu)建BloomFilter并對大表側(cè)過濾,這樣能夠有效提升性能及減少OOM發(fā)生。
選取XxHash64 處理多join keys的原因:首先,XxHash64支持codegen,能夠與上述實現(xiàn)的BloomFilter算子有機結(jié)合,另外基于HashBenchmark驗證可知,XxHash64在處理基本數(shù)據(jù)類型相對Murmur3Hash、HiveHash均有不錯的表現(xiàn),且京東線上的join keys一般是string類型,下面是驗證效果:

? ??
3. 設(shè)計構(gòu)建BloomFilter的timeout fallback機制
結(jié)合上面的Runtime Filter Join的執(zhí)行流程可知,兩表join時如果觸發(fā)了RuntimeBloomFilter或ShuffleBloomFilter特性,均會先基于小表構(gòu)建相應(yīng)的BloomFilter,然后再作為Filter算子對大表進行過濾,這就要求對大表的處理需要等待基于小表構(gòu)建BloomFilter完后才執(zhí)行,極端情況下可能會導(dǎo)致任務(wù)延遲問題,這在京東復(fù)雜、多變且SLA要求嚴格的背景下,如果想全場景鋪開Runtime Filter Join特性會面臨很大的挑戰(zhàn)。為了解決上述問題,我們設(shè)計構(gòu)建BloomFilter的timeout fallback機制,即在規(guī)定時間內(nèi)未能完成基于“小表”構(gòu)建BloomFilter,Spark執(zhí)行計劃會自動回退到原始處理邏輯,這樣能夠有效規(guī)避因大表被誤判成小表,導(dǎo)致構(gòu)建BloomFilter耗時過長所引起的性能回歸問題等。
4. 設(shè)計Rule 支持BloomFilter 謂詞下推
Spark SQL在多個join場景的一個典型優(yōu)化就是謂詞下推,即可以通過內(nèi)部Rule優(yōu)化,將某一組join的謂詞下推到其他join,從而過濾掉其他join的無效數(shù)據(jù),提升性能。通過分析京東線上業(yè)務(wù)場景,發(fā)現(xiàn)大部分任務(wù)存在多個join場景,例如:A as a join B as b on a.col1=b.col1 join C as c on a.col1=c.col1。然而,本文引入的BloomFilter算子是一種基于子查詢的封裝,基于Spark SQL 現(xiàn)有的內(nèi)置Rule不能將其進行謂詞下推,為此我們內(nèi)部實現(xiàn)了相應(yīng)的Rule支持BloomFilter算子的謂詞下推,在滿足多個join且關(guān)聯(lián)鍵相同的情況下,該Rule能夠基于某一組join構(gòu)建的BloomFilter下推到其他join,最終實現(xiàn)基于一個BloomFilter過濾多組join的能力,下面結(jié)合一個案例進行展示,其中 tb1 和 tb2是兩個大表,tb3是小表,其中tb1數(shù)據(jù)量10000條,tb2數(shù)據(jù)量是9000條,tb3數(shù)據(jù)量是10條。spark.range(100000) .select(col("id").as("a"), col("id").as("b"), col("id").as("c")) .write.format(tableFormat).mode(SaveMode.Overwrite) .saveAsTable("tb1")spark.range(9000) .select(col("id").as("a"), col("id").as("b"), col("id").as("c")) .write.format(tableFormat).mode(SaveMode.Overwrite) .saveAsTable("tb2")spark.range(10) .select(col("id").as("a"), col("id").as("b"), col("id").as("c")) .write.format(tableFormat).mode(SaveMode.Overwrite) .saveAsTable("tb3")set spark.sql.autoBroadcastJoinThreshold=-1;sql(s""" |SELECT tb1.a, | tb2.b |FROM tb1 | Inner JOIN tb3 | ON tb1.a = tb3.a AND tb3.b < 4 | Inner JOIN tb2 | ON tb1.a = tb2.a |""".stripMargin)
為了驗證效果,我們這里關(guān)閉BroadcastHashJoin (spark.sql.autoBroadcastJoinThreshold=-1),執(zhí)行上面語句,Spark會優(yōu)先處理 tb1 Inner JOIN tb3 ON tb1.a = tb3.a AND tb3.b < 4,基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制會自動識別出tb3是一張小表,并基于tb3構(gòu)建RuntimeBloomFilter對大表tb1進行過濾,然后進行SortMergeJoin,其DAG如圖5所示,由圖5可知tb1的數(shù)據(jù)量得到有效過濾,只保留滿足join條件的4條數(shù)據(jù),這在生產(chǎn)環(huán)境中的收益是顯著的,具體上線效果在京東實際場景中的收益中會闡述。

圖 5 基于Runtime Filter Join優(yōu)化效果
如果不引入BloomFilter謂詞下推優(yōu)化,我們的優(yōu)化到此已經(jīng)結(jié)束,上述計算結(jié)果最終再與tb2進行SortMergeJoin,這時我們發(fā)現(xiàn)tb2的數(shù)據(jù)會全部參與計算,其中9000條數(shù)據(jù)均會進行shuffle并最終與第一組join的結(jié)果進行join匹配,但滿足join條件的數(shù)據(jù)也僅4條,即tb2存在大量無效數(shù)據(jù)參與shuffle、sort,對于上述場景我們自定義的Rule能夠有效將基于tb3構(gòu)建RuntimeBloomFilter下推到第二組join里,并對tb2進行謂詞下推,其具體的優(yōu)化效果如圖6所示,由圖可知tb2的數(shù)據(jù)量也得到有效過濾,只保留了滿足join條件的4條數(shù)據(jù),最終實現(xiàn)了基于一組join的BloomFilter謂詞下推到其他join,從而減少shuffle量,減少集群負載,提升任務(wù)時效,同時節(jié)省大量的計算資源。

圖 6 基于BloomFilter謂詞下推的優(yōu)化效果
基于TPC-DS 10TB 基準(zhǔn)測試驗證,如圖7所示,99個查詢中有9個被Runtime Filter Join特性命中,其中shuffle量減少1.5%~73.7%。
圖 7 基于TPC-DS基準(zhǔn)測試驗證
下面是京東線上的優(yōu)化案例:其為某重要業(yè)務(wù)線的某商品SKU屬性表(小表)與某商品SKU擴展屬性模型表(大表)進行Left Outer Join,因業(yè)務(wù)需要兩張表在join前會根據(jù)相關(guān)字段進行g(shù)roup by,最終再通過商品SKU編號進行關(guān)聯(lián),涉及業(yè)務(wù)較為復(fù)雜,這里不展開描述,該業(yè)務(wù)場景是非常符合本文的Runtime Filter Join優(yōu)化,下面是優(yōu)化前后的對比效果:

圖 8 某核心任務(wù)優(yōu)化前效果

圖 9 某核心任務(wù)優(yōu)化后效果
由上面優(yōu)化前后的對比效果圖8和圖9可知,1)優(yōu)化前,該線上任務(wù)的shuffle量大概是:16TB+,任務(wù)運行時間:4.4小時;2)基于Runtime Filter Join優(yōu)化后,shuffle量減少至:200MB+,任務(wù)運行時間縮短至:4分鐘,計算性能提升:95%+,同時節(jié)省了大量的計算資源。基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制在京東的線上優(yōu)化案例不勝枚舉,限于篇幅原因這里不一一展開。目前,基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制已全面上線(默認關(guān)閉),基于京東自研Spark版本相對Spark社區(qū)版本,命中任務(wù)平均處理數(shù)據(jù)量(shuffle量)減少72%、性能提升53%。
本文討論京東Spark計算引擎研發(fā)團隊基于Bloom Filter算法的Runtime Filter Join優(yōu)化機制,助力京東大促場景的探索和實踐。目前上線效果顯著,但仍有一些不足及待提升項:1、目前表Row count主要是通過表文件大小及其schema信息進行判斷,存在一定誤差,后續(xù)會嘗試開啟CBO進行精準(zhǔn)判斷;
2、目前是基于小表動態(tài)構(gòu)建BloomFilter,需要讀取小表的數(shù)據(jù)信息,后續(xù)會考慮基于列式文件存儲的一些特性來豐富整個架構(gòu),例如:如果要基于小表的全量數(shù)據(jù)構(gòu)建BloomFilter,且存儲格式是ORC或Parquet等,可直接基于其Data文件內(nèi)部的Index BloomFilter構(gòu)建全表的BloomFilter,這樣可以進一步提升性能;
3、目前優(yōu)化的主要是兩表關(guān)聯(lián)的場景,后續(xù)需要增強對多表關(guān)聯(lián)等復(fù)雜場景的支持。
我們后續(xù)計劃將Spark Runtime Filter Join 技術(shù)與數(shù)據(jù)湖技術(shù)相結(jié)合,助力京東湖倉一體相關(guān)場景的落地及實踐。同時,我們會繼續(xù)加強行業(yè)內(nèi)技術(shù)交流,在分享內(nèi)部技術(shù)實踐與經(jīng)驗的同時,聆聽大家的反饋和建議,在滿足內(nèi)部業(yè)務(wù)增長需要的同時,相關(guān)技術(shù)會反饋給社區(qū),共同建設(shè)Spark生態(tài)。