分布式場景怎么Join?
背景
最近在閱讀查詢優(yōu)化器的論文,發(fā)現(xiàn)System R中對于Join操作的定義一般分為了兩種,即嵌套循環(huán)、排序-合并聯(lián)接。
考慮到我的領(lǐng)域是在處理分庫分表或者其他的分區(qū)模式,這讓我開始不由得聯(lián)想我們怎么在分布式場景應(yīng)用這個Join邏輯,對于兩個不同庫里面的不同表我們是沒有辦法直接進(jìn)行Join操作的。查閱資料后發(fā)現(xiàn)原來早有定義,即分布式聯(lián)接算法。
分布式聯(lián)接算法
跨界點處理數(shù)據(jù)即分布式聯(lián)接算法,常見的有四種模型:Shuffle Join(洗牌聯(lián)接)、Broadcast Join(廣播聯(lián)接)、MapReduce Join(MapReduce聯(lián)接)、Sort-Merge Join(排序-合并聯(lián)接)。
接下來將進(jìn)行逐一了解與分析,以便后續(xù)開發(fā)的應(yīng)用。
Shuffle Join(洗牌聯(lián)接)
先上原理解釋:
“
Shuffle Join的核心思想是將來自不同節(jié)點的數(shù)據(jù)重新分發(fā)(洗牌),使得可以聯(lián)接的數(shù)據(jù)行最終位于同一個節(jié)點上。
“通常,對于要聯(lián)接的兩個表,會對聯(lián)接鍵應(yīng)用相同的哈希函數(shù),哈希函數(shù)的結(jié)果決定了數(shù)據(jù)行應(yīng)該被發(fā)送到哪個節(jié)點。這樣,所有具有相同哈希值的行都會被送到同一個節(jié)點,然后在該節(jié)點上執(zhí)行聯(lián)接操作。
可能解釋完還是有點模糊,舉個例子,有兩張表,分別以id字段進(jìn)行分庫操作,且哈希算法相同(為了簡單,這里只介紹分庫場景,分庫分表同理。算法有很多種,這里舉例是hash算法),那么這兩張表的分片或許可以在同一個物理庫中,這樣我們不需要做大表維度的處理,我們可以直接下推Join操作到對應(yīng)的物理庫操作即可。
在ShardingSphere中,這種場景類似于綁定表的定義,如果兩張表的算法相同,可以直接配置綁定表的關(guān)系,進(jìn)行相同算法的連接查詢,避免復(fù)雜的笛卡爾積。
這樣做的好處是可以盡量下推到數(shù)據(jù)庫操作,在中間件層面我們可以做并行處理,適合大規(guī)模的數(shù)據(jù)操作。
但是,這很理想,有多少表會采用相同算法處理呢。
Broadcast Join(廣播聯(lián)接)
先上原理解釋:
“當(dāng)一個表的大小相對較小時,可以將這個小表的全部數(shù)據(jù)廣播到所有包含另一個表數(shù)據(jù)的節(jié)點上。
“每個節(jié)點上都有小表的完整副本,因此可以獨立地與本地的大表數(shù)據(jù)進(jìn)行聯(lián)接操作,而不需要跨節(jié)點通信。
舉個例子,有一張非常小的表A,還有一張按照ID分片的表B,我們可以在每一個物理庫中復(fù)制一份表A,這樣我們的Join操作就可以直接下推到每一個數(shù)據(jù)庫操作了。
這種情況比Shuffle Join甚至還有性能高效,這種類似于ShardingSphere中的廣播表的定義,其存在類似于字典表,在每一個數(shù)據(jù)庫都同時存在一份,每次寫入會同步到多個節(jié)點。
這種操作的好處顯而易見,不僅支持并行操作而且性能極佳。
但是缺點也顯而易見,如果小表不夠小數(shù)據(jù)冗余不說,廣播可能會消耗大量的網(wǎng)絡(luò)帶寬和資源。
MapReduce Join(MapReduce聯(lián)接)
先上原理解釋:
MapReduce是一種編程模型,用于處理和生成大數(shù)據(jù)集,其中的聯(lián)接操作可以分為兩個階段:Map階段和Reduce階段。
Map階段:
-
每個節(jié)點讀取其數(shù)據(jù)分片,并對需要聯(lián)接的鍵值對應(yīng)用一個映射函數(shù),生成中間鍵值對。
Reduce階段:
-
中間鍵值對會根據(jù)鍵進(jìn)行排序(在某些實現(xiàn)中排序發(fā)生在Shuffle階段)和分組,然后發(fā)送到Reduce節(jié)點。
-
在Reduce節(jié)點上,具有相同鍵的所有值都會聚集在一起,這時就可以執(zhí)行聯(lián)接操作。
MapReduce Join不直接應(yīng)用于傳統(tǒng)數(shù)據(jù)庫邏輯,而是適用于Hadoop這樣的分布式處理系統(tǒng)中。但是為了方便理解,還是用SQL語言來分析,例如一條SQL:
SELECT orders.order_id, orders.date, customers.name
FROM orders
JOIN customers ON orders.customer_id = customers.customer_id;
會被轉(zhuǎn)換為兩個SQL:
SELECT customer_id, order_id, date FROM orders;
SELECT customer_id, name FROM customers;
這個過程就是Map階段,即讀取orders和customers表的數(shù)據(jù),并為每條記錄輸出鍵值對,鍵是customer_id,值是記錄的其余部分。
下一個階段可有可無,即Shuffle階段。如果不在這里排序可能會在Map階段執(zhí)行SQL時候排序/分組或者在接下來的Reduce階段進(jìn)行額外排序/分組。在這個階段主要將收集到的數(shù)據(jù)按照customer_id排序分組,以確保相同的customer_id的數(shù)據(jù)達(dá)到Reduce階段。
Reduce階段將每個對應(yīng)的customer_id進(jìn)行聯(lián)接操作,輸出并返回最后的結(jié)果。
這種操作普遍應(yīng)用于兩個算法完全不相同的表單,也是一種標(biāo)準(zhǔn)的處理模型,在這個過程中,我們以一張邏輯表的維度進(jìn)行操作。這種算法可能會消耗大量內(nèi)存,甚至導(dǎo)致內(nèi)存溢出,并且在處理大數(shù)據(jù)量時會相當(dāng)耗時,因此不適合需要低延遲的場景。
額外補(bǔ)充
內(nèi)存溢出場景普遍在如下場景:
-
大鍵值對數(shù)量: 如果Map階段產(chǎn)生了大量的鍵值對,這些數(shù)據(jù)需要在內(nèi)存中進(jìn)行緩存以進(jìn)行排序和傳輸,這可能會消耗大量內(nèi)存。
-
數(shù)據(jù)傾斜: 如果某個鍵非常常見,而其他鍵則不那么常見,那么處理這個鍵的Reducer可能會接收到大量的數(shù)據(jù),導(dǎo)致內(nèi)存不足。這種現(xiàn)象稱為數(shù)據(jù)傾斜。
-
大值列表: 在Reduce階段,如果某個鍵對應(yīng)的值列表非常長,處理這些值可能會需要很多內(nèi)存。
-
不合理的并行度: 如果Reduce任務(wù)的數(shù)量設(shè)置得不合適(太少或太多),可能會導(dǎo)致單個任務(wù)處理不均勻,從而導(dǎo)致內(nèi)存問題。
我能想到的相應(yīng)解決方案:
-
內(nèi)存到磁盤的溢寫:當(dāng)Map任務(wù)的輸出緩沖區(qū)滿了,它會將數(shù)據(jù)溢寫到磁盤。這有助于限制內(nèi)存使用,但會增加I/O開銷。
-
通過設(shè)置合適的Map和Reduce任務(wù)數(shù)量,可以更有效地分配資源,避免某些任務(wù)過載。具體操作可以將Map操作的分段比如1~100,100~200,Reduce階段開設(shè)較少的并發(fā)處理。
-
優(yōu)化數(shù)據(jù)分布,比如使用范圍分區(qū)(
range partitioning)或哈希分區(qū)(hash partitioning)來減少數(shù)據(jù)傾斜。
Sort-Merge Join(排序-合并聯(lián)接)
先上原理解釋:
“在分布式環(huán)境中,
Sort-Merge Join首先在每個節(jié)點上對數(shù)據(jù)進(jìn)行局部排序,然后將排序后的數(shù)據(jù)合并起來,最后在合并的數(shù)據(jù)上執(zhí)行聯(lián)接操作。
“這通常涉及到多階段處理,包括局部排序、數(shù)據(jù)洗牌(重新分發(fā)),以及最終的排序和合并。
舉個理解,還是上面的SQL。
SELECT orders.order_id, orders.date, customers.name
FROM orders
JOIN customers ON orders.customer_id = customers.customer_id;
-
對orders表按 customer_id進(jìn)行排序。 -
對customers表按 customer_id進(jìn)行排序。 -
同時遍歷兩個已排序的表,將具有相同 customer_id的行配對。
這個就有點類似于原生的排序-合并聯(lián)接了。也是數(shù)據(jù)庫場景的標(biāo)準(zhǔn)處理辦法。
對于已經(jīng)排序的數(shù)據(jù)集或數(shù)據(jù)分布均勻的情況,這種方法非常有效。如果數(shù)據(jù)未預(yù)先排序,這種方法可能會非常慢,因為它要求數(shù)據(jù)在聯(lián)接之前進(jìn)行排序。
當(dāng)然,這個算法也會造成內(nèi)存溢出的場景,解決方案如下:
-
當(dāng)數(shù)據(jù)集太大而無法一次性加載到內(nèi)存中時,可以使用外部排序算法。外部排序算法會將數(shù)據(jù)分割成多個批次,每個批次單獨排序,然后將排序后的批次合并。這種方法通常涉及到磁盤I/O操作,因此會比內(nèi)存中操作慢。
-
對于合并步驟,可以使用流式處理技術(shù),一次只處理數(shù)據(jù)的一小部分,并持續(xù)將結(jié)果輸出到下一個處理步驟或存儲系統(tǒng)。這樣可以避免一次性加載大量數(shù)據(jù)到內(nèi)存中。
-
當(dāng)內(nèi)存不足以處理數(shù)據(jù)時,可以使用磁盤空間作為臨時存儲。數(shù)據(jù)庫管理系統(tǒng)通常有機(jī)制來處理內(nèi)存溢出,比如創(chuàng)建磁盤上的臨時文件來存儲過程中的數(shù)據(jù)。
-
在分布式系統(tǒng)中,可以將數(shù)據(jù)分散到多個節(jié)點上進(jìn)行處理,這樣每個節(jié)點只需要處理數(shù)據(jù)的一部分,從而減少單個節(jié)點上的內(nèi)存壓力。
來源|blog.csdn.net/weixin_56270372/
article/details/135936319
程序汪資料鏈接
歡迎添加程序汪微信 itwang007 進(jìn)粉絲群
