DDIA:數(shù)倉和大數(shù)據(jù)的雙向奔赴
DDIA 讀書分享會,會逐章進行分享,結(jié)合我在工業(yè)界分布式存儲和數(shù)據(jù)庫的一些經(jīng)驗,補充一些細節(jié)。每兩周左右分享一次,歡迎加入,Schedule 在這里[1]。我們有個對應的分布式&數(shù)據(jù)庫討論群,每次分享前會在群里通知。如想加入,可以加我的微信號:qtmuniao,簡單自我介紹下,并注明:分布式系統(tǒng)群。
在 MapReduce 流行這些年之后,針對大數(shù)據(jù)集的分布式批處理執(zhí)行引擎已經(jīng)逐漸成熟。到現(xiàn)在(2017年)已經(jīng)有比較成熟的基礎設施可以在上千臺機器上處理 PB 量級的數(shù)據(jù)。因此,針對這個量級的基本數(shù)據(jù)處理問題可以認為已經(jīng)被解決,大家的注意力開始轉(zhuǎn)到其他問題上:
- 完善編程模型
- 提升處理性能
- 擴大處理領域
之前我們討論過,由于 MapReduce 提供的編程接口實在太過難用,像 Hive, Pig,Cascading 和 Crunch 等處理 API 和框架逐漸流行。Apache Tez 更進一步,可以讓原來的代碼不做過多改動就可以遷移。Spark 和 Flink 也各自有其高層的數(shù)據(jù)流 API,基本借鑒自 FlumeJava。
這些數(shù)據(jù)流工具基本都是用關系型的算子來表達計算過程:
- 基于某些字段對數(shù)據(jù)集進行連接的 Join 算子
- 基于關鍵字對元組進行聚類的 Group 算子
- 基于條件對元組進行過濾的 Filter 算子
- 對元素進行聚合和統(tǒng)計的 Aggregate 算子
等等。這些算子內(nèi)部實現(xiàn)時,會用到我們本章之前提到的各種 join 和 group 算法。
除了能夠顯著降低使用方的代碼量外,這些高層的框架通常還支持交互式的使用。因此,你可以在 shell 中增量式的構(gòu)建分析代碼,且能夠方便的多次跑以查看運行結(jié)果。當我們拿到一個新的數(shù)據(jù)集,需要做實驗探索該如何對其進行分析時,這種交互式的方式非常方便。這其實也是我們之前討論過的 Unix 編程哲學的一個體現(xiàn)。
這些高層的 API 不僅讓用戶可以更高效的使用體驗,還能夠提升任務在物理層面的執(zhí)行效率。
向聲明式方向靠攏
相比直接實現(xiàn)代碼進行 Join,使用關系型的 Join 算子給了處理框架分析數(shù)據(jù)集特點、選擇最高效 Join 算的優(yōu)化空間。Hive,Spark 和 Flink 都有基于代價的優(yōu)化器,可以對執(zhí)行路徑進行優(yōu)化。甚至,可以交換 Join 的順序,來最小化中間數(shù)據(jù)集的物化。
不同 Join 算法的選擇對批處理任務的性能影響極大,但我們最好避免將選擇的心智負擔推給用戶,而可以自動地根據(jù)情況進行優(yōu)化。使用聲明式風格的接口使這種自動優(yōu)化稱為可能:用戶側(cè)僅需要指定哪些數(shù)據(jù)集需要 Join,而查詢優(yōu)化器會根據(jù)數(shù)據(jù)特點動態(tài)的決定其最優(yōu) Join 方式。我們在數(shù)據(jù)查詢語言一節(jié)中討論過這種思想。
但從另一方面來說,MapReduce 和其后繼的數(shù)據(jù)流框架和 SQL 這種完全的聲明式語言又不一樣。MapReduce 是基于回調(diào)函數(shù)來構(gòu)建的:對于任意的一條或一批數(shù)據(jù),用戶可以自定義處理函數(shù)(Mapper 或者 Reducer),調(diào)用任何庫代碼、決定其輸出格式。這種方式的優(yōu)點是,你可以復用很多現(xiàn)成的庫來減少開發(fā)工作量,比如 Parsing、自然語言分析、圖像分析和一些數(shù)理統(tǒng)計算法方面的庫。
在很長一段時間內(nèi),能夠自由地跑任意的代碼是批處理系統(tǒng)和 MPP 數(shù)據(jù)庫的一個重要區(qū)分點。盡管數(shù)據(jù)庫也支持 UDF(user defined function),但使用起來較為復雜,且不能很好的和編程語言的包管理工具(比如 Maven 之于 Java,npm 之于 JavaScript,Rubygems 之于 Ruby)相整合。
然而,在 Join 之外,更進一步地引入聲明式功能也對數(shù)據(jù)流工具有諸多好處。例如,一個過濾函數(shù)只有很簡單的過濾條件(過濾行)、或只是從原數(shù)據(jù)集中選擇幾列(過濾列),則針對每條數(shù)據(jù)都調(diào)用一遍回調(diào)函數(shù)會有很大的額外性能損耗。如果這些簡單的過濾和投影能夠用聲明式的方式表達,則優(yōu)化器可以充分利用面向列的存儲格式(參見列存),只讀取需要的列。Hive,Spark DataFrames 和 Impala 還使用了列式執(zhí)行引擎(vectorized execution):
以一種 CPU 緩存友好的方式,緊湊地進行迭代(每次取一個 Cache Line,使用 SIMD 指令進行運算),以減少函數(shù)調(diào)用次數(shù)。
Spark 使用 JVM 字節(jié)碼、Impala 使用 LLVM 來通過生成代碼的方式優(yōu)化這些 Join 內(nèi)層循環(huán)。
通過在高層 API 中注入聲明式的特性、在運行時使用優(yōu)化器動態(tài)地優(yōu)化,批處理框架長得越來越像 MPP 數(shù)據(jù)庫(也獲得了類似性能)。但同時,仍然保持原來允許運行任意庫代碼、讀取任意格式數(shù)據(jù)的擴展性,讓這些框架仍然可以保持原有的靈活性。
不同領域的特化
保留運行任意代碼的自由度很有必要,但對于很多非常通用、反復出現(xiàn)的處理模式,我們有必要提供系統(tǒng)實現(xiàn)以方便用戶復用。傳統(tǒng)上,MPP 數(shù)據(jù)庫通常充當商業(yè)智能(BI)分析和商業(yè)匯報領域的生態(tài)位,但這個方向只是批處理眾多應用方向的一個。
另外一個越來越重要的方向是數(shù)值統(tǒng)計算法,其在推薦和分類的機器學習算法中常常用到。可復用的實現(xiàn)逐漸多了起來:例如 Mahout 在 MapReduce、Spark 和 Flink 之上實現(xiàn)了很多機器學習算法;MADlib 也在 MPP 數(shù)據(jù)庫之上實現(xiàn)了類似的功能模塊。
其他有用的算法還有—— k 最近鄰算法(k-nearest neighbors)——一種在多維空間中搜索與給定數(shù)據(jù)條目相似度最高的數(shù)據(jù)算法,是一種近似性搜索算法。近似搜索對于基因組分析算法也很重要,因為在基因分析中,常需要找不同但類似的基因片段。近年來較火的向量數(shù)據(jù)庫也是主要基于該算法。
批處理引擎被越來越多的用到不同領域算法的分布式執(zhí)行上。隨著批處理系統(tǒng)越來越多支持內(nèi)置函數(shù)和高層聲明式算子、MPP 數(shù)據(jù)庫變的越來越可編程和靈活度高,他們開始長的越來越像——說到底,本質(zhì)上他們都是用于存儲和處理數(shù)據(jù)的系統(tǒng)。
小結(jié)在本章,我們探討了批處理的話題。我們從 Unix 的命令行工具 awk、grep 和 sort 開始,探討其背后的思想被如何應用到 MapReduce 框架和更近的數(shù)據(jù)流框架中。這些核心設計原則包括:
- 輸入數(shù)據(jù)不可變
- 一個組件的輸出可以喂給另一個組件成為輸入
- 通過組合“解決好一件事的小工具”來解決復雜問題
在 Unix 世界中,讓所有命令行具有可組合性的統(tǒng)一抽象是——文件和管道,在 MapReduce 中,這個抽象是分布式文件系統(tǒng)。之后我們注意到,數(shù)據(jù)流工具通過增加各自的“類管道”的數(shù)據(jù)傳輸方式,避免了將中間結(jié)果物化到分布式文件系統(tǒng)中的額外損耗,但最外側(cè)的輸入和輸出仍然是在 HDFS 上。
分布式處理框架最主要解決的兩個問題是:
-
分片
在 MapReduce 中,會根據(jù)輸入數(shù)據(jù)的文件塊(file chunk)的數(shù)量來調(diào)度 mappers。mappers 的輸出會在二次分片、排序、合并(我們通常稱之為 shuffle)到用戶指定數(shù)量的 Reducer 中。該過程是為了將所有相關的數(shù)據(jù)(如具有相同 key)集結(jié)到一塊。
后 MapReduce 時代的數(shù)據(jù)流工具會盡量避免不必要的排序(因為代價太高了),但他們?nèi)匀皇褂昧撕?MapReduce 類似的分區(qū)方式。
-
容錯
MapReduce 通過頻繁的(每次 MapReduce 后)刷盤,從而可以避免重啟整個任務,而只重新運行相關子任務就可以從其故障中快速恢復過來。但在錯誤頻率很低的情況下,這種頻繁刷盤做法代價很高。數(shù)據(jù)流工具通過盡可能的減少中間狀態(tài)的刷盤(當然,shuffle 之后還是要刷的),并將其盡可能的保存在內(nèi)存中,但這意味著一旦出現(xiàn)故障就要從頭重算。算子的確定性可以減少重算的數(shù)據(jù)范圍(確定性能保證只需要算失敗分區(qū),并且結(jié)果和其他分區(qū)仍然一致)。
接下來我們討論了幾種基于 MapReduce 的 Join 算法,這些算法也常被用在各種數(shù)據(jù)流工具和 MPP 數(shù)據(jù)庫里。他們很好的說明了基于數(shù)據(jù)分區(qū)的算法的工作原理:
-
Sort-merge joins
分桶排序。將多個待 join 的輸入數(shù)據(jù)使用一個 MapReduce 處理,在 Mapper 中提取待 join key ,然后通過再分區(qū)、排序和合并,會將具有相同 join key 的 records 送到同一個 Reducer 中進行 join。然后 Reducer 函數(shù)會將 join 結(jié)果進行輸出。
-
Broadcast hash joins
小表廣播。如果 join 中的一個表數(shù)據(jù)量很小,可以完全加載進內(nèi)存的哈希表里,則不用對其進行分片。我們可以將大表進行分片,分發(fā)給各個 mapper,每個 Mapper 將小表加載到內(nèi)存里,然后逐個遍歷大表每個 record,提取相應 join key,再與小表中的記錄值進行 Join。
-
Partitioned hash joins
分桶哈希。如果兩個待 join 輸入使用相同的方式進行分片(相同的 key、相同的哈希函數(shù)和分區(qū)數(shù)),則廣播哈希算法可以在每個分區(qū)內(nèi)單獨應用。
分布式批處理引擎使用了受限的編程模型:回調(diào)函數(shù)需要是無狀態(tài)的,且除了輸出之外沒有其他的副作用。在此設定下,框架可以向應用層屏蔽很多分布式系統(tǒng)的實現(xiàn)細節(jié):當遇到宕機或者網(wǎng)絡問題時,子任務可以安全的進行重試;失敗任務的輸出可以自由拋棄;如果有多個冗余計算過程都成功了,則只有其中一個可以作為輸出對后面可見。
由于框架的存在,用戶側(cè)的批處理代碼無需關心容錯機制的實現(xiàn)細節(jié):即使在物理上有大量錯誤重試的情況下,框架可以保證在邏輯上最終的輸出和沒有任何故障發(fā)生是一致的。這種可靠性語義保證(reliable semantics)通常遠強于我們在在線服務中常見到的、將用戶的請求寫到數(shù)據(jù)庫中的容錯性。
批處理任務的基本特點是——讀取輸入,進行處理,產(chǎn)生輸出的過程中,不會修改原數(shù)據(jù)。換句話說,輸出是輸入的衍生數(shù)據(jù)。其中一個重要特點是,輸入數(shù)據(jù)是有界的(bounded):輸入的大小是固定的、事先確定的(比如輸入是包含一組日志的數(shù)據(jù)或者一個快照點的數(shù)據(jù))。唯其有界,處理任務才能知道什么時候輸入讀取結(jié)束了、什么時候計算完成了。
但在下一章中,我們將會轉(zhuǎn)到流處理(stream processing)上,其中,輸入是無界的(unbounded)——你的任務面對的是不知道何時結(jié)束的無限數(shù)據(jù)流。在這種情況下,任何時刻都有可能有新的數(shù)據(jù)流入,任務會永不結(jié)束。我們之后可以看到,雖然批處理和流處理在某些方面有相似之處,但對于輸入的無界假設,會在構(gòu)建系統(tǒng)時對我們的設計產(chǎn)生諸多影響。
參考資料
[1]DDIA 讀書分享會: https://ddia.qtmuniao.com/
DDIA 學習會
這本書由于涵蓋知識點實在太多,沒有一定基礎讀的時候會有很多問題,如果沒有人交流和解惑,往往堅持一半就容易棄掉。 因此我依托小報童平臺建立了一個針對本書的學習和答疑的學習會。 主要目的,是結(jié)合我的一些工業(yè)經(jīng)驗,給大家提供一些細節(jié)的答疑,但能力所限,也難免有疏漏和謬誤之處,如果你覺得我的回答有問題,歡迎 challenge,交流才能進步! 次要目的,就是為所有喜歡這本書的同學創(chuàng)造一個交流討論環(huán)境。

入會方式:訂閱專欄:https://xiaobot.net/p/large-scale-sys (最下方,點閱讀原文即可達)。 如果有任何疑問和建議,訂購前可以加我微信:qtmuniao。
加我微信(vx: qtmuniao)拉你入學習會的群,進行答疑、討論和信息發(fā)布。大致形式:每兩周過一章,發(fā)一篇以章節(jié)標題+時間段的空白文章。
-
留言答疑:大家在上述文章下面打卡和提問。
-
文字答疑:微信群,每天我會抽時間集中回復。
-
視頻答疑:騰訊會議,每周一次,每次一小時。
-
答疑匯總:將大家的問題和我的回答整理處理,以文章形式發(fā)在小報童專欄中;答疑錄音會發(fā)到小宇宙上。
最后歡迎關注我的公眾號: 木鳥雜記 ,專注分布式系統(tǒng)、數(shù)據(jù)庫和存儲等大規(guī)模數(shù)據(jù)系統(tǒng),關注后可回復“資料”領取一份我總結(jié)的分布式系統(tǒng)和數(shù)據(jù)庫的入門資料大全。
