<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Paimon 與 Spark 的集成(二):查詢優(yōu)化

          共 11428字,需瀏覽 23分鐘

           ·

          2024-04-11 12:08

          Paimon

          Apache Paimon (incubating) 是一項流式數(shù)據(jù)湖存儲技術(shù),可以為用戶提供高吞吐、低延遲的數(shù)據(jù)攝入、流式訂閱以及實時查詢能力。Paimon 采用開放的數(shù)據(jù)格式和技術(shù)理念,可以與 Flink / Spark / Trino 等諸多業(yè)界主流計算引擎進(jìn)行對接,共同推進(jìn) Streaming Lakehouse 架構(gòu)的普及和發(fā)展。

          Paimon x Spark

          ??Apache Spark,作為大數(shù)據(jù)處理的統(tǒng)一計算分析引擎,不僅支持多種語言的高級API使用,也支持了豐富的大數(shù)據(jù)場景應(yīng)用,包括結(jié)構(gòu)化數(shù)據(jù)處理的Spark SQL、用于機(jī)器學(xué)習(xí)的MLlib,用于圖形處理的GraphX,以及用于增量計算和流處理的Structured Streaming。Spark已經(jīng)成為了大數(shù)據(jù)領(lǐng)域軟件棧中必不可少的組成部分。對 Paimon 來說,為了在準(zhǔn)實時和離線湖倉場景更加便利的落地,與 Spark 深度、全面的集成勢在必行。

          在之前的Paimon Release版本,我們著重豐富Paimon在功能上和Spark SQL生態(tài)的集成,包括Schema Evolution,Structured Streaming Read/Write,Dynamic Insert Overwrite Partition,Update/Merge Into等等。在最近發(fā)布的0.6和0.7版本,我們開始在Paimon基于Spark SQL查詢性能上做一些工作。在初期我們會結(jié)合Spark SQL已有的優(yōu)化規(guī)則和框架,讓Paimon充分利用到這些。通過一系列優(yōu)化,我們將 Paimon x Spark 在 TpcDS 上的性能提高了37+%,已基本和 Parquet x Spark 持平。下文將對其中的關(guān)鍵優(yōu)化點進(jìn)行詳細(xì)介紹。??

          動態(tài)分區(qū)裁剪

          動態(tài)分區(qū)裁剪(Dynamic Partition Prunning,DPP)在SQL優(yōu)化中是常見的優(yōu)化點,本質(zhì)上是謂詞下推(Predicate PushDown)的一種拓展,其目的是最小化從數(shù)據(jù)源中讀取數(shù)據(jù)的IO成本,也進(jìn)而減少了計算成本。

          在數(shù)倉中,常常將較大的事實表和很小的維度表關(guān)聯(lián)查詢,且事實表需要根據(jù)維表中的字段信息來進(jìn)行過濾,如下面TpcDS Q14中的SQL片段:

                  
                    select ss_quantity quantity ,ss_list_price list_price
                  
                  
                    from store_sales, date_dim
                  
                  
                    where ss_sold_date_sk = d_date_sk and d_year between 1999 and 1999 + 2
                  
                  
                    order by quantity limit 10;
                  
                
          在不支持DPP的情況下的執(zhí)行計劃簡化如下:

          ef558e4c94fffb53e762c2ccdc9b7967.webp

          Paimon應(yīng)用的是Spark DataSource V2的查詢框架,該框架在Spark3.2后提供了 SupportsRuntimeFiltering 接口用于V2表實現(xiàn)運(yùn)行時的動態(tài)過濾。理論上,任何字段(包括普通數(shù)據(jù)字段和分區(qū)字段)的過濾條件都能被應(yīng)用,但一般而言僅分區(qū)字段的過濾條件能夠被完全應(yīng)用,即無需上層的Filter的節(jié)點再使用該過濾條件去選擇數(shù)據(jù)。Paimon表通過該接口實現(xiàn)了動態(tài)分區(qū)裁剪的能力。在支持DPP后執(zhí)行計劃如下所示:

          ed7327687cc92a2f1f139d1431e1e66d.webp

          在1T的TpcDS數(shù)據(jù)集下,應(yīng)用DPP后  store_sales  表參與join的數(shù)據(jù)量從27億 減少到16億。僅應(yīng)用到該優(yōu)化后,Q14運(yùn)行時間減少到原來的~55%,1T TpcDS數(shù)據(jù)集的查詢性能整體提升20+%;
          相關(guān)代碼: https://github.com/apache/incubator-paimon/pull/2411 https://github.com/apache/incubator-paimon/pull/2421

          Exchange復(fù)用

          Exchange是Spark中物理計劃中一個關(guān)鍵的操作,對應(yīng)邏輯計劃中的Shuffle。在執(zhí)行階段,Exchange可以代表某個SQL中部分Plan輸出的數(shù)據(jù)。在復(fù)雜的SQL中,我們可以通過公共表表達(dá)式(Common Table Expression,CTE)語法定義一個SQL片段,用于簡化整個SQL或者被多次使用。以下面簡化的TpcDS Q23為例,定義的其中一個CTE frequent_ss_items 在整個SQL中被兩次使用。
                  
                    with frequent_ss_items as (
                  
                  
                      select substr(i_item_desc,1,30) itemdesc, i_item_sk item_sk, d_date solddate, count(*) cnt
                  
                  
                      from store_sales, date_dim, item
                  
                  
                      where ss_sold_date_sk = d_date_sk and ss_item_sk = i_item_sk and d_year in (2000,2000+1,2000+2,2000+3)
                  
                  
                      group by substr(i_item_desc,1,30),i_item_sk,d_date
                  
                  
                      having count(*) >4
                  
                  
                    ),
                  
                  
                    max_store_sales as (...),
                  
                  
                    best_ss_customer as (...)
                  
                  
                    select sum(sales)
                  
                  
                    from (
                  
                  
                      select cs_quantity*cs_list_price sales
                  
                  
                           from catalog_sales
                  
                  
                               ,date_dim
                  
                  
                           where d_year = 2000
                  
                  
                             and d_moy = 2
                  
                  
                             and cs_sold_date_sk = d_date_sk
                  
                  
                             and cs_item_sk in (select item_sk from frequent_ss_items)
                  
                  
                             and cs_bill_customer_sk in (select c_customer_sk from best_ss_customer)
                  
                  
                          union all
                  
                  
                          select ws_quantity*ws_list_price sales
                  
                  
                           from web_sales
                  
                  
                               ,date_dim
                  
                  
                           where d_year = 2000
                  
                  
                             and d_moy = 2
                  
                  
                             and ws_sold_date_sk = d_date_sk
                  
                  
                             and ws_item_sk in (select item_sk from frequent_ss_items)
                  
                  
                             and ws_bill_customer_sk in (select c_customer_sk from best_ss_customer)
                  
                  
                    ) y
                  
                  
                    limit 100;
                  
                

          顯然在執(zhí)行階段,我們希望 frequent_ss_items 僅被執(zhí)行一次,執(zhí)行后的數(shù)據(jù)可以緩存,然后分別執(zhí)行后續(xù)和 catalog_sales 以及 web_sales 表的Join操作。針對這個場景,Spark提供了Exchange復(fù)用的優(yōu)化,期待的執(zhí)行計劃簡化如下所示:

          6263b700ce56a73937d6c06f9a434914.webp但該優(yōu)化依賴算子Plan中各個物理操作的 hashCode 來確定實際運(yùn)行時是否可以復(fù)用。我們定位并解決了Paimon中存在的實現(xiàn)問題,使得Paimon可以使用到Spark提供的Exchange復(fù)用的優(yōu)化,從而減少不必要的冗余計算,也降低了IO和網(wǎng)絡(luò)的開銷。僅應(yīng)用到該優(yōu)化后,Q23運(yùn)行時間減少到原來的~50%,1T TpcDS數(shù)據(jù)集的查詢性能整體提升13+%;


          相關(guān)代碼:

          https://github.com/apache/incubator-paimon/pull/2488

          動態(tài)調(diào)整Scan并發(fā)

          任務(wù)實際執(zhí)行時的并發(fā)度是影響作業(yè)運(yùn)行性能的關(guān)鍵之一。Spark提供了 spark.sql.shuffle.partitions 參數(shù)來調(diào)整Join或者Agg等算子的并發(fā),也提供了自適應(yīng)查詢執(zhí)行(Adative Query Execution,AQE)框架動態(tài)調(diào)整并發(fā),但這些都無法影響到讀取數(shù)據(jù)源Scan階段的并發(fā)。

          在DataSource V2的框架下,數(shù)據(jù)源的Scan方式包括并發(fā)完全由DataSource自己決定。我們以TpcDS Q19為例:

                  
                    select  i_brand_id brand_id, i_brand brand, i_manufact_id, i_manufact,
                  
                  
                       sum(ss_ext_sales_price) ext_price
                  
                  
                     from date_dim, store_sales, item,customer,customer_address,store
                  
                  
                     where d_date_sk = ss_sold_date_sk
                  
                  
                       and ss_item_sk = i_item_sk
                  
                  
                       and i_manager_id=8
                  
                  
                       and d_moy=11
                  
                  
                       and d_year=1998
                  
                  
                       and ss_customer_sk = c_customer_sk 
                  
                  
                       and c_current_addr_sk = ca_address_sk
                  
                  
                       and substr(ca_zip,1,5) <> substr(s_zip,1,5) 
                  
                  
                       and ss_store_sk = s_store_sk 
                  
                  
                     group by i_brand
                  
                  
                          ,i_brand_id
                  
                  
                          ,i_manufact_id
                  
                  
                          ,i_manufact
                  
                  
                     order by ext_price desc
                  
                  
                             ,i_brand
                  
                  
                             ,i_brand_id
                  
                  
                             ,i_manufact_id
                  
                  
                             ,i_manufact
                  
                  
                    limit 100 ;
                  
                

          其中 customer_address store 基于 substr(ca_zip,1,5) <> substr(s_zip,1,5) 條件Join。

          在未引入CBO對join重排序的情況下,這兩張表通過BroadcastNestedLoopJoin來實現(xiàn),沒有引入Exchange調(diào)整Join的并發(fā)。執(zhí)行計劃如下圖所示:

          c595a2ff64a7302d654a63855b2f6ee3.webp

          在未引入優(yōu)化之前,由于 customer_address 表的數(shù)據(jù)分片較小,但任務(wù)計算負(fù)載較高(數(shù)據(jù)Join后嚴(yán)重膨脹),整體執(zhí)行性能很差。

          92d713afa17e3f5bc4f8fa74c48bdf6c.webpPaimon根據(jù)這種問題提供了基于當(dāng)前作業(yè)的可用core數(shù)來動態(tài)調(diào)整數(shù)據(jù)源的數(shù)據(jù)分片的能力,也進(jìn)而調(diào)整并發(fā),從而提升查詢效率。 c60f5a8573689e80d5ad7aac490d53a0.webp僅應(yīng)用該優(yōu)化后,Q19運(yùn)行時間減少到原來的~25%,1T TpcDS數(shù)據(jù)集的查詢性能整體提升14+%;


          相關(guān)代碼: https://github.com/apache/incubator-paimon/pull/2482

          合并標(biāo)量子查詢

          類似于Exchange復(fù)用,合并標(biāo)量子查詢優(yōu)化會遍歷整個SQL邏輯執(zhí)行計劃,提取出標(biāo)量子查詢(ScalarSubQuery),嘗試將多個標(biāo)量子查詢合并起來,使得僅執(zhí)行一次子查詢得到多個標(biāo)量值。

          我們以TpcDS Q9的片段為例,整個Q9由5個case-when語句構(gòu)成。

                  
                    select case when (select count(*) 
                  
                  
                                      from store_sales 
                  
                  
                                      where ss_quantity between 1 and 20) > 74129
                  
                  
                                then (select avg(ss_ext_discount_amt) 
                  
                  
                                      from store_sales 
                  
                  
                                      where ss_quantity between 1 and 20) 
                  
                  
                                else (select avg(ss_net_paid)
                  
                  
                                      from store_sales
                  
                  
                                      where ss_quantity between 1 and 20) end bucket1
                  
                  
                    from reason
                  
                  
                    where r_reason_sk = 1;
                  
                
          在該SQL中, case when 的條件, then else 語句三個部分使用同樣的過濾條件讀取同一張表,僅聚合表達(dá)式不同。在沒有應(yīng)用到這個優(yōu)化的情況下,執(zhí)行計劃如下所示:

          1a064989b52d0c0874be8f39be84578e.webp

          Spark本身提供了 MergeScalarSubQueries 的優(yōu)化規(guī)則,但從實現(xiàn)上沒法更好的對接到Paimon這樣的DataSource V2表,因此我們在Paimon側(cè)單獨實現(xiàn),并通過Spark提供的Extensions的接口將Paimon自實現(xiàn)的優(yōu)化注入到了Spark優(yōu)化器中。在應(yīng)用該優(yōu)化后,執(zhí)行計劃如下所示:

          117bbbc58e212d8a6bb7a8d7d99a6d19.webp

          由此可見,合并標(biāo)量子查詢優(yōu)化有效的減少了冗余的計算,提升了Paimon在該場景下的查詢性能。僅應(yīng)用該優(yōu)化后,Q9運(yùn)行時間減少到原來的~57%。
          相關(guān)代碼: https://github.com/apache/incubator-paimon/pull/2657

          Cost-Based優(yōu)化

          Spark SQL允許使用基于成本的優(yōu)化(Cost-Based Optimizer,CBO)來提升查詢性能,主要用于多路Join的場景,使用動態(tài)規(guī)劃算法來選擇Cost最低的Join順序。要想使得這個優(yōu)化能更有效,依賴于計算Cost的模型,以及表的表級和列級統(tǒng)計信息的收集,而其中列級統(tǒng)計信息在評估Plan算子節(jié)點的運(yùn)行時統(tǒng)計信息中尤為重要。

          新版本的Paimon在元數(shù)據(jù)中增加了statistics的信息,可以通過原生的Spark Analyze命令完成收集,并對接到了Spark SQL,使得Spark SQL可以利用Paimon的表級/列級信息進(jìn)行查詢優(yōu)化。我們以TpcDS Q24a為例:
                  
                    with ssales as
                  
                  
                    (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
                  
                  
                    from store_sales, store_returns, store, item, customer, customer_address
                  
                  
                    where ss_ticket_number = sr_ticket_number
                  
                  
                      and ss_item_sk = sr_item_sk
                  
                  
                      and ss_customer_sk = c_customer_sk
                  
                  
                      and ss_item_sk = i_item_sk
                  
                  
                      and ss_store_sk = s_store_sk
                  
                  
                      and c_current_addr_sk = ca_address_sk
                  
                  
                      and c_birth_country <> upper(ca_country)
                  
                  
                      and s_zip = ca_zip
                  
                  
                    and s_market_id=8
                  
                  
                    group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, i_current_price, i_manager_id, i_units, i_size)
                  
                  
                    select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
                  
                  
                    from ssales
                  
                  
                    where i_color = 'peach'
                  
                  
                    group by c_last_name, c_first_name, s_store_name
                  
                  
                    having sum(netpaid) > (select 0.05 * avg(netpaid) from ssales)
                  
                  
                    order by c_last_name, c_first_name, s_store_name;
                  
                
          其中CTE ssales 的部分,僅提供表級統(tǒng)計信息的情況下的執(zhí)行計劃大致如下所示,包括兩個SortMergeJoin,其中左側(cè)虛線框更是大數(shù)據(jù)量間的Join操作,嚴(yán)重影響性能。 d5cfeaba5815981b67c7f5519f5ca1bd.webp

          而執(zhí)行Analyze提供了列級統(tǒng)計信息后執(zhí)行計劃大致如下所示,對參與Join的表進(jìn)行了重排序,且所有Join都是BroadcastHashJoin的方式執(zhí)行。

          8c38d2ecca8a04b23f5837a9c1748005.webpPaimon提供了完整的statistics,借助于CBO框架,不僅可以提升相應(yīng)的查詢性能,也可以使得在正常資源配置下無法跑通的SQL能夠正常運(yùn)行,比如TpcDS Q72。在之前優(yōu)化項基礎(chǔ)上疊加應(yīng)用該優(yōu)化后,Q24運(yùn)行時間減少到原來的~23%,1T TpcDS數(shù)據(jù)集的查詢性能整體提升~30%;
          相關(guān)代碼: https://github.com/apache/incubator-paimon/pull/2677 https://github.com/apache/incubator-paimon/pull/2752

          https://github.com/apache/incubator-paimon/pull/2798

          優(yōu) 化 效 果

          本文使用阿里云 EMR 5.16.0版本,集群節(jié)點的屬性如下:

          • master: 1 * ecs.g7.8xlarge 32 vCPU 128 GiB
          • core: 6 * ecs.g7.8xlarge 32 vCPU 128 GiB

          使用的組件及版本如下:
          • Paimon: 0.8-SNAPSHOT (對應(yīng)到commit:193df7345aa520f8b45125cdd85588a91a3fc3a9)
          • Spark: 3.3.1 (額外cherry-pick SPARK-41378 ,以支持DataSource V2下的stats相關(guān)功能)

          啟用的Spark相關(guān)配置:
          spark.executor.cores 4
          spark.executor.memory 14g
          spark.executor.memoryOverhead 2g
          spark.dynamicAllocation.enabled true
          spark.sql.cbo.enabled true
          spark.sql.cbo.joinReorder.enabled true
          spark.sql.autoBroadcastJoinThreshold 128m

          Paimon表選用append表(無主鍵表),使用parquet作為文件格式,設(shè)置bucket=-1(最新代碼已經(jīng)默認(rèn)設(shè)置: PAIMON-2829 ),這樣便于和Spark parquet表進(jìn)行對比。

          f8620ec3e11d5ed0ad7d9b2e96ef6401.webp 上圖為我們使用parquet表(帶有表級統(tǒng)計信息,即rowCount和sizeInByte兩個指標(biāo))作為基準(zhǔn),以此向右分別為優(yōu)化前和應(yīng)用這些優(yōu)化后的Paimon表(僅帶表統(tǒng)計信息),以及Parquet表和Paimon表在收集到Column級別統(tǒng)計信息時的查詢較基準(zhǔn)的性能對比。 對比可見,在一般情況下(無column級統(tǒng)計信息)優(yōu)化后的Paimon和Parquet已經(jīng)基本持平。開啟column級統(tǒng)計信息后,Paimon較Parquet慢~8%,這中間的差距也將是性能優(yōu)化繼續(xù)跟進(jìn)的方向之一。

          后 續(xù) 規(guī) 劃 


          在湖倉體系下,我們認(rèn)為讀寫查詢優(yōu)化一直是一項任重而道遠(yuǎn)的事情。當(dāng)前的優(yōu)化主要集中在讓Paimon充分利用到Spark SQL現(xiàn)有的優(yōu)化規(guī)則或者優(yōu)化框架。在繼續(xù)推進(jìn)的同時,我們也會利用Paimon自身的特性,比如Index或者Clustering等,以及優(yōu)化Scan等進(jìn)一步提升Paimon性能。

          另外,在當(dāng)前湖倉場景下,依然有很多無主鍵表的使用,后續(xù)對append表支持Upsert能力也是重要的規(guī)劃之一。

          < 往 期 精 彩 推 薦 >

          bdc15d9d9f03943001f406a300b186c8.webp



          ▼ 關(guān)注「 Apache Spark 技術(shù)交流社區(qū) 」,獲取更多技術(shù)干貨 
          瀏覽 102
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人黄色AV网站 | 天天干天天干天天操 | 天天日天天干天天插天天操 | 亚洲一区在线免费 | 人人摸人人操人人爽 |