Presto 實踐 | Presto on Spark:通過 Spark 來擴展 Presto
概述
Presto 最初設(shè)計是對數(shù)據(jù)倉庫中的數(shù)據(jù)運行交互式查詢,但現(xiàn)在它已經(jīng)發(fā)展成為一個位于開放數(shù)據(jù)湖分析之上的統(tǒng)一 SQL 引擎,用于交互式和批處理工作負載,數(shù)據(jù)湖上的流行工作負載包括:
?報告和儀表盤:這包括為內(nèi)部和外部開發(fā)人員提供自定義報告以獲取業(yè)務(wù)洞察力,以及許多使用 Presto 進行交互式 A/B 測試分析的組織。這個用例的典型特征是要求低延遲。它在非常高的 QPS 下需要數(shù)十到數(shù)百毫秒,毫不奇怪,這個場景幾乎完全使用 Presto,而這正是 Presto 的設(shè)計目的。?使用 SQL notebooks 的數(shù)據(jù)科學(xué)家:這個用例是一種 ad-hoc 分析,通常需要從幾秒到幾分鐘的中等延遲。這些是數(shù)據(jù)科學(xué)家和業(yè)務(wù)分析師的查詢,他們希望執(zhí)行緊湊的臨時分析以了解產(chǎn)品使用情況,例如用戶趨勢和如何改進產(chǎn)品。QPS 相對較低,因為用戶必須手動啟動這些查詢。?用于大型數(shù)據(jù)管道的批處理:這些是每天、每小時或在數(shù)據(jù)準備就緒時運行的計劃作業(yè)(scheduled jobs)。它們通常包含對大量數(shù)據(jù)的查詢,延遲可達數(shù)十小時,處理的數(shù)據(jù)從 TB 到 PB 不等。
Presto 對于 ad-hoc 或交互式查詢,甚至一些批處理查詢都非常有效,其約束條件是整個查詢必須適合內(nèi)存并且運行速度足夠快,不需要容錯。大多數(shù)不適合此框的 ETL 批處理工作負載運行在像 Apache Spark 這種“大數(shù)據(jù)”計算引擎上。使用不同 SQL 方言和 API 的多個計算引擎使得數(shù)據(jù)平臺團隊管理和擴展這些工作負載變得復(fù)雜。因此,F(xiàn)acebook 決定使用 Presto on Spark 這種方式來簡化和進一步擴展 Presto。在我們介紹 Presto on Spark 之前,我們先來看下這兩種流行計算引擎的框架。
Presto 的架構(gòu)

Presto 專為低延遲而設(shè)計,遵循經(jīng)典的 MPP 架構(gòu);它使用內(nèi)存 streaming shuffle 來實現(xiàn)低延遲。Presto 每個集群有一個共享的 coordinator,并帶有許多的 workers。Presto 嘗試在同一個 Presto worker(共享執(zhí)行程序)上安排盡可能多的查詢,以支持多租戶。
這種體系結(jié)構(gòu)提供了非常低的任務(wù)延遲調(diào)度,并允許對查詢的多個 stages 進行并發(fā)處理,但其代價是 coordinator 是一個單點故障(SPOF)和瓶頸,查詢在整個集群中隔離得很差。
此外,streaming shuffle 不允許太多的容錯,從而進一步影響長時間運行的查詢的可靠性。
Spark 的架構(gòu)

另一方面,Apache Spark 從一開始就是為可擴展性而設(shè)計的,它實現(xiàn)了 Map-Reduce 架構(gòu)。Shuffle 在執(zhí)行階段之間完全具體化到磁盤,具有搶占或重新啟動任何任務(wù)的能力。Spark 維護一個獨立的 Driver 來協(xié)調(diào)每個查詢,并在按需調(diào)度的隔離容器(containers)中運行任務(wù)。這些差異提高了可靠性并降低了整體操作開銷。
為什么 Presto 不適用于批處理
眾所周知,將 MPP 架構(gòu)的數(shù)據(jù)庫擴展到處理 Internet 規(guī)模數(shù)據(jù)集的批處理是一個極其困難的問題 [1]。為了簡化這個問題,讓我們看下以下聚合查詢。本質(zhì)上,此查詢遍歷 TPCH 中的訂單表,對 custkey 列進行聚合分組,并對總價求和。Presto 利用內(nèi)存中的 shuffle 并在每個 worker 上讀取數(shù)據(jù)并為相同的 key 進行聚合。

在內(nèi)存中進行 shuffle 意味著生產(chǎn)者將在內(nèi)存中緩沖數(shù)據(jù),并等待消費者讀取數(shù)據(jù)。我們必須在交換前后同時執(zhí)行所有任務(wù)。在 mapreduce 世界中,所有的 mapper 和 reducer 必須同時運行。這使得 in-memory shuffle 成為一種全有或全無的排除模型(all-or-nothing exclusion model)。
這將導(dǎo)致不靈活的調(diào)度和查詢大小的擴展變得更加困難,因為所有操作都是并發(fā)運行的。在聚合階段,查詢可能會超過內(nèi)存限制,因為為了跟蹤每個組(custkey),所有內(nèi)容都必須以散列表的形式保存在內(nèi)存中。
此外,我們還受到集群大小的限制,即我們可以跨多少個節(jié)點對數(shù)據(jù)進行 hash partition,以避免將所有數(shù)據(jù)都放入內(nèi)存中。使用分布式磁盤(Presto-on-Spark、Presto Unlimited),我們可以進一步對數(shù)據(jù)進行分區(qū),并且僅受打開文件數(shù)量的限制,即使是這個限制,也可以通過 shuffle 服務(wù)進行相當大的擴展。
出于這個原因,Presto 很難擴展到非常大和復(fù)雜的批處理管道。這樣的管道會持續(xù)運行數(shù)小時,所有這些都是為了 join 和聚合大量數(shù)據(jù)。這推動了 Presto Unlimited 的開發(fā),它使 Presto 的 MPP 設(shè)計適應(yīng)大型 ETL 工作負載,并大規(guī)模改善用戶體驗。

雖然 Presto Unlimited 通過允許在分布式磁盤上對分區(qū)進行 shuffle 解決了部分問題,但它并沒有完全解決容錯問題,也沒有改善隔離和資源管理。
Presto on Spark
Presto on Spark 是 Presto 和 Spark 之間的集成,它利用 Presto 的 compiler/evaluation 作為類庫,并使用 Spark 的 RDD API 來管理 Presto embedded evaluation 的執(zhí)行。這類似于 Google 選擇將 F1 Query 嵌入其 MapReduce 框架的方式。
高級目標是為 Presto 的 MPP 運行時帶來完全分解的 shuffle,我們通過在 shuffle 后立即添加一個 materialization 步驟來實現(xiàn)這一點。物化后的 shuffle 被建模為臨時分區(qū)表,在 shuffle 后帶來更靈活的執(zhí)行,并允許分區(qū)級別重試。使用 Presto on Spark,我們可以在 mapper 和 reducer 端對上述查詢的 custkey 進行完全分解的 shuffle,這意味著所有 mapper 和 reducer 都可以獨立調(diào)度并且可以獨立重試。

如果想及時了解Spark、Hadoop或者HBase相關(guān)的文章,歡迎關(guān)注微信公眾號:過往記憶大數(shù)據(jù)
Presto On Spark 在 Intuit 的使用
Superglue 是 Intuit 自主研發(fā)的工具,可以幫助用戶構(gòu)建、管理和監(jiān)控數(shù)據(jù)管道。Superglue 的建立是為了讓分析師和數(shù)據(jù)科學(xué)家的數(shù)據(jù)民主化(democratize data)。Superglue 最大限度地減少了開發(fā)和調(diào)試數(shù)據(jù)管道的時間,并最大限度地增加了構(gòu)建業(yè)務(wù)洞察力和 AI/ML 的時間。
Intuit 的許多分析師使用 Presto (AWS Athena) 來探索 Data Lake/S3 中的數(shù)據(jù)。這些分析師會花幾個小時將這些為 Presto 編寫的探索 SQL 轉(zhuǎn)換為 Spark SQL,以便將它們作為 Superglue 中的數(shù)據(jù)管道進行操作/調(diào)度。為了最大限度地減少 SQL 方言轉(zhuǎn)換問題和分析師的相關(guān)生產(chǎn)力損失,Intuit 團隊開始探索各種選項,包括查詢翻譯、查詢虛擬化(query virtualization)和 presto on spark。在快速 POC 之后,Intuit 決定使用 presto on spark 模式,因為它利用 Presto 的編譯器/評估作為庫(不需要查詢轉(zhuǎn)換)和 Spark 的可擴展數(shù)據(jù)處理能力。
Presto on Spark 現(xiàn)已在 Intuit 投入生產(chǎn)。在三個月內(nèi),有數(shù)百個關(guān)鍵管道通過 Superglue 在 Presto On Spark 上運行了數(shù)千個作業(yè)。
Presto on Spark 作為庫運行,該庫通過 Spark 集群上的 spark-submit 或 Jar Task 方式提交。在臨時集群上啟動預(yù)定的批處理數(shù)據(jù)管道,以利用資源隔離、管理成本并最大限度地減少運營開銷。DDL 語句在 Hive 上執(zhí)行,DML 語句在 Presto 上執(zhí)行。這使分析師能夠編寫與 Hive 兼容的 DDL,并且用戶體驗保持不變。
該解決方案幫助實現(xiàn)了一個具有無縫端到端體驗的高性能和可擴展平臺,供分析師探索和處理數(shù)據(jù)。因此,它提高了分析師的工作效率,并使他們能夠高速提供洞察力。
何時在 Presto 中使用 Spark 的執(zhí)行引擎
Spark 是整個行業(yè)運行大規(guī)模復(fù)雜批處理 ETL 管道的首選工具。Presto on Spark 極大地受益于用 Presto 編寫的管道,這些管道可以處理 TB/PB 的數(shù)據(jù),因為它利用了 Spark 的大規(guī)模處理能力。這里最大的好處是不需要查詢轉(zhuǎn)換,在以下的場景中你可以利用 Spark:
?處理更大范圍的數(shù)據(jù)?將 Presto 的資源管理擴展到更大的集群?提高 Presto 作為計算引擎的可靠性和彈性
為什么 Presto on Spark 很重要
我們嘗試實現(xiàn)以下目標,以使 Presto on Spark 適應(yīng)互聯(lián)網(wǎng)規(guī)模的批處理工作負載:
?完全分解的 shuffle(disaggregated shuffles)?Isolated executors?Presto 資源管理、不同調(diào)度器、推測執(zhí)行等。
批量數(shù)據(jù)和 ad hoc 處理的統(tǒng)一對于創(chuàng)建可擴展的查詢體驗非常重要,其不需要在不同的 SQL 方言之間重寫。我們相信,這只是 Spark 和 Presto 社區(qū)進一步融合的第一步,也是實現(xiàn)交互用例和批處理用例之間統(tǒng)一 SQL 體驗的重要一步。今天,許多互聯(lián)網(wǎng)巨頭,如 Facebook 等,都已轉(zhuǎn)向 Presto on Spark,我們已經(jīng)看到許多組織,包括 Intuit,開始使用 Presto on Spark 運行他們復(fù)雜的數(shù)據(jù)管道。
本文翻譯自《Scaling with Presto on Spark》:https://prestodb.io/blog/2021/10/26/Scaling-with-Presto-on-Spark
