快過HugeCTR:用OneFlow輕松實現(xiàn)大型推薦系統(tǒng)引擎
一、簡介
Wide & Deep Learning (以下簡稱 WDL)是解決點擊率預(yù)估(CTR Prediction)問題比較重要的模型。WDL 在訓(xùn)練時,也面臨著點擊率預(yù)估領(lǐng)域存在的兩個挑戰(zhàn):巨大的詞表(Embedding Table),以及大量的數(shù)據(jù)吞吐。
業(yè)界比較有影響力的包含了 WDL 解決方案及評測結(jié)果的項目有 HugeCTR[1],該框架通過模型并行、三級流水線等技巧,解決了以上問題。在2020年 MLPerf [2]評測中,英偉達用 HugeCTR 實現(xiàn)了當(dāng)時最快的 WDL 模型。
英偉達 Blog[3] 給出的數(shù)據(jù)顯示,在特定的、對齊后的硬件條件下,HugeCTR 的速度是 TensorFlow-CPU 的114倍,是 TensorFlow-GPU 的7.4倍,下圖的縱坐標代表每輪迭代的延遲,數(shù)值越小,意味著性能越好:

而 OneFlow-WDL 比 HugeCTR 更快,每輪迭代延遲比 HugeCTR 更少:
上圖節(jié)取自 DLPerf[4] 中的 WDL(OneFlow-WDL vs. HugeCTR)評測報告,展示了 vocabulary size 倍增實驗的結(jié)果,橫坐標為實驗時所取的 vocabulary size 參數(shù)大小,不斷翻倍,直到超出框架的最大負荷(OOM 為 Out of Memory 的縮寫)。
可以看到,相同條件下的各組實驗中,OneFlow 的訓(xùn)練速度比 HugeCTR 快。并且,隨著 vocabulary size 的增大,OneFlow 的每次迭代的 latency 幾乎無變化,性能無損失。
使用 OneFlow 在 32 張 V100-SXM2-16GB 組成的集群上訓(xùn)練 WDL,可以支持8億(819200000)大小的詞表。
原始的日志數(shù)據(jù)、更詳細的圖表說明,可以參考最近公布的 DLPerf [5]關(guān)于 OneFlow 與 HugeCTR 實現(xiàn)相同結(jié)構(gòu) WDL 的性能測試報告。
OneFlow 作為一款通用的深度學(xué)習(xí)框架,所實現(xiàn)的 OneFlow-WDL 模型性能卻超越了專為 CTR Prediction 問題設(shè)計的 HugeCTR 框架,內(nèi)部的技術(shù)原理有哪些呢?本文將詳細揭秘 OneFlow-WDL 實現(xiàn)的技術(shù)細節(jié),并將不同的場景中 OneFlow-WDL 的多種分布式實現(xiàn)方案做詳細的橫向?qū)Ρ扰c分析,以方便讀者根據(jù)自身需求與應(yīng)用場景,選擇最適合的方案。
熟悉 WDL 模型的讀者,可以直接跳到第三節(jié)“如何在 OneFlow 中實現(xiàn)分布式 WDL” 開始閱讀。
二、WDL、大詞表與 OneFlow
OneFlow-WDL 為什么這么快,OneFlow 作為分布式最易用的深度學(xué)習(xí)框架,在實現(xiàn) OneFlow-WDL 的過程中有哪些過人之處?這個問題難以簡單直接地回答,為此我們準備了此節(jié)內(nèi)容,將依次介紹:
CTR Prediction 是什么?WDL 是什么?它們解決了什么問題? WDL 模型在實際工程中為什么需要分布式?為什么困難? OneFlow 為什么適合解決 WDL 這樣的大模型問題?
讀者可以根據(jù)自身情況,略過已經(jīng)了解的部分,挑選自己還不了解的部分閱讀即可。
2.1 CTR Prediction 與 WDL
2.1.1 CTR Prediction 問題
CTR Prediction 問題的目標是預(yù)測用戶點擊率,點擊率作為一種指標,可以一定程度地反映用戶對所提供的內(nèi)容的感興趣程度。因此,CTR Prediction 技術(shù)廣泛應(yīng)用在推薦、排序搜索、在線廣告等領(lǐng)域?;ヂ?lián)網(wǎng)公司大部分的服務(wù)都或多或少與 CTR Prediction 有關(guān)系,無論是 BAT 各家的廣告業(yè)務(wù),還是美團的首頁 rank、頭條的 feed 流,背后都有 CTR Prediction 的身影。
下面用一個簡化的廣告推薦例子,來說明 CTR Prediction 所要解決的問題及其解決思路。
以上的表格中,Item 是將要作為廣告展示給用戶的內(nèi)容,我們希望能夠根據(jù)用戶的信息預(yù)測用戶是否會點擊這個廣告,從而達到更精確推薦的目的。
我們將用戶的相關(guān)信息抽象為特征(X),點擊行為作為函數(shù)的目標(y),那么問題的核心在于,如何盡可能準確地從數(shù)據(jù)中學(xué)習(xí)到 X 與 y 的關(guān)系 。這可以簡化地理解為一個分類問題。其中:y 為是否點擊,X 為用戶的相關(guān)信息。
用來解決 CTR Prediction 問題的模型有很多種,WDL 只是其中(很重要)的一種。
那么,WDL 為什么在 CTR Prediction 模型中如此重要呢?這主要是因為 WDL 模型的特殊歷史地位決定的,可以從下圖 (圖來源:https://zhuanlan.zhihu.com/p/243243145)CTR Prediction 模型演化的歷程中看到,WDL 起到了承上啟下的作用,可以說,當(dāng)今落地的 CTR Prediction 模型,都或多或少有 WDL 的影子,它們要么是通過改進 WDL 的 Wide 部分得到,要么是改進 WDL 的 Deep 部分得到,要么是 WDL 的前身。

2.1.2 WDL(Wide & Deep Learning)

上圖展示了 Google 團隊 ?WDL論文[6]中提出的模型的結(jié)構(gòu),WDL 模型分為 Wide 和 Deep 兩部分:
單看 Wide 部分,與?Logistic Regression?模型并沒有什么區(qū)別,就是一個線性模型。 Deep 部分則是先對類型特征(Categorical Features)做 Embedding,在 Embedding 后接一個由多個隱藏層組成的神經(jīng)網(wǎng)絡(luò),用于學(xué)習(xí)特征之間的高階交叉組合關(guān)系。
由于 Embedding 機制的引入,WDL 相比于單純的 Wide 模型有更強的泛化能力。Google 論文中展示了一個具體的例子:

這是一個關(guān)于 APP 推薦的例子,WDL 模型的具體網(wǎng)絡(luò)結(jié)構(gòu)以及輸入如下:
Wide 部分:線性模型部分通常輸入稀疏的類別特征進行訓(xùn)練。另外,通過利用交叉特征高效的實現(xiàn)記憶能力,達到準確推薦的目的。比如在這個例子里,選取了兩個類別特征(User Installed App 與 Impression App)做叉積變換的結(jié)果作為線性部分的輸入。 Deep部分:稀疏、高維的類別特征首先通過 Embeddings 轉(zhuǎn)換為低維稠密向量,然后與連續(xù)值特征拼接在一起,作為MLP的輸入。 Wide & Deep聯(lián)合訓(xùn)練:Wide 部分和 Deep 部分的輸出通過加權(quán)方式合并到一起,并通過 Logistic Loss 得到最終輸出。
注意其中的 Embedding 過程,通常情況下,因 Embedding 而引入的巨大詞表(Embedding Table),是 WDL 必須使用分布式的最主要原因。
2.2 詞表為什么這么大
本節(jié)將介紹:
WDL 中為什么會有巨大的 Embedding Table? 為什么實現(xiàn)模型并行的分布式 WDL 是必需的也是困難的?
2.2.1 從 One-Hot 到 Embedding
WDL 需要 Embeding,雖然 Embedding 已經(jīng)是 CTR 系統(tǒng)的基本操作,但是名氣最大的可能還是詞嵌入(word embedding),它也更容易解釋。我們先簡單介紹 One-Hot 與詞嵌入,在下一節(jié)將看到,它與 WDL 中采用的 Embedding 沒有本質(zhì)區(qū)別。眾所周知,One-Hot 編碼是最原始的用來表示字、詞的方式。假如能使用的字只有五個:“牛、年、運、氣、旺”,那么它們的 One-Hot 編碼可以是:
牛:?[1,?0,?0,?0,?0]
年:?[0,?1,?0,?0,?0]
運:?[0,?0,?1,?0,?0]
氣:?[0,?0,?0,?1,?0]
旺:?[0,?0,?0,?0,?1]
“運氣”這個詞,采用以上 One-Hot 編碼,就是:
這太稀疏了,在工程實現(xiàn)中有諸多弊端,于是我們可以準備一個矩陣,利用矩陣乘法:
將2個1×5的稀疏向量,“壓縮到”到 2個1×3 的稠密向量中(詞向量)。
以上包含有 的矩陣,就稱為詞表(Embedding Table),且由于 One-Hot 編碼的特殊性,One-Hot 向量與詞表的矩陣乘法,其實相當(dāng)于是一次“查表”的過程,如上例中,其實就是根據(jù)“1”在 One-Hot 向量中的位置(第2列、第3列),從詞表中取出對應(yīng)的向量(第2行、第3行)。
在實際工程中,并不會真正進行 One-Hot 編碼(浪費內(nèi)存),而是將 One-Hot 編碼中 “1”的位置作為編號(通常稱為 sparse ID),利用 gather[7] 操作,從詞表中取出詞向量,等價于矩陣乘法。
如,上例的矩陣乘法,在 OneFlow 中用代碼表示,則為:
embedding?=?flow.gather(embedding_table,?index,?axis=0)?#?index:?[2,?3]
2.2.2 WDL 中的 Embedding
實際場景中,圖像識別、語音識別等問題的輸入常常具有連續(xù)、稠密且空間/時間有良好局部相關(guān)性的特點,CTR Prediction 問題則不同,大多數(shù)輸入都是稀疏、離散、高維的類別特征(Categorical Features),因此通常需要通過 Embedding Table 將這些稀疏的類別特征轉(zhuǎn)換為低維稠密向量。
我們已經(jīng)知道,對于 Word Embedding,有多少個字(詞),Embedding Table 就有多少行;那么,Wide & Deep 中的 Embedding Table 的行數(shù),又是由什么決定的呢?它其實是所有 Categorical Features 做 One-Hot 編碼后的維度總和。
我們以找到以下表格對應(yīng)的詞表的大小為例:
| 手機 | 樣式 | 時間段 | 文章新嗎 |
|---|---|---|---|
| 小米 | 大圖 | 早上 | 新 |
| iPhone | 三圖 | 下午 | 舊 |
| 三星 | 中圖 | 早上 | 新 |
先對各個類別特征分別做 One-Hot 編碼:
| 手機 | 樣式 | 時間段 | 文章新嗎 |
|---|---|---|---|
| (1, 0, 0) | (1, 0, 0) | (1, 0) | (1, 0) |
| (0, 1, 0) | (0, 1, 0) | (0, 1) | (0, 1) |
| (0, 0, 1) | (0, 0, 1) | (1, 0) | (1, 0) |
理論上,每個類別特征的 One-Hot 編碼都應(yīng)該有對應(yīng)的 Embeding Table:
| 手機 | 樣式 | 時間段 | 文章新嗎 | |
|---|---|---|---|---|
| (1, 0, 0) | (1, 0, 0) | (1, 0) | (1, 0) | |
| (0, 1, 0) | (0, 1, 0) | (0, 1) | (0, 1) | |
| (0, 0, 1) | (0, 0, 1) | (1, 0) | (1, 0) | |
| 詞表行數(shù) | 3 | 3 | 2 | 2 |
工程實踐中,往往將各個特征類別的詞表拼起來成為一個大詞表,并且,像前文介紹的那樣,用 gather 方法可以代替 One-Hot 向量與矩陣的乘法,因此實際操作中,并不會真正對類別特征的值進行 One-Hot 編碼,僅僅賦予類別特征 sparse ID 作為查表索引即可:

對于我們以上有3個樣本、4個類別特征的數(shù)據(jù),詞表共需要9行。
在工業(yè)實際場景中,總的類別種類往往達到百萬、千萬甚至上億級別,因此 Embedding Table 非常巨大(GBs~TBs),將 Embedding Table 存放在一個單獨的 GPU 上通常是做不到的。
因此需要實現(xiàn)分布式 WDL。更具體地說,是要實現(xiàn)模型并行的分布式 WDL。
數(shù)據(jù)并行:將訓(xùn)練樣本分發(fā)到各個計算設(shè)備上,而每個設(shè)備上保存完整的模型。
模型并行:將模型參數(shù)分配到不同計算設(shè)備上,各設(shè)備只有部分模型。
目前已有的深度學(xué)習(xí)框架,大多數(shù)提供了對數(shù)據(jù)并行的原生支持,但是對模型并行支持的還不完善。如果用戶想要將模型參數(shù)分配到不同設(shè)備上,往往會遇到需要人工指定模型切分方式、手工編寫數(shù)據(jù)通信邏輯代碼等問題。
以 WDL 的詞表切分為例,既可以按照第0維度切分,也可以按第1維度切分,分發(fā)到各個 GPU 設(shè)備上,但是它們的效率是等價的嗎?編寫模型算法時如何表達切分的不同方式呢?大多數(shù)深度學(xué)習(xí)框架都沒有給出優(yōu)雅成熟的解決方案,而是對算法工程師提出了較高的要求,需要他們?yōu)楦鱾€部分手動進行運算設(shè)備定制。
此外,WDL 的模型訓(xùn)練中,與計算量相比,數(shù)據(jù) I/O 吞吐量非常大,如果不協(xié)調(diào)好 I/O 與 GPU 計算的關(guān)系,很容易使得 I/O 成為整個系統(tǒng)的瓶頸。
綜合以上兩點原因,在很長一段時間內(nèi),各深度學(xué)習(xí)框架都未能提出一個很好的利用 GPU 算力進行 WDL 模型訓(xùn)練的解決方案,最終表現(xiàn)為:
僅用數(shù)據(jù)并行,容易 out of memory,無法支持更大的模型。 低效實現(xiàn)的模型并行容易出現(xiàn) I/O 瓶頸問題,不能很好利用 GPU 算力,導(dǎo)致 GPU 版本解決方案與 CPU 方案比較提速不明顯。
2.3 OneFlow 為什么這么快
OneFlow 在分布式上的優(yōu)勢,主要體現(xiàn)在:
頂層設(shè)計中將數(shù)據(jù)加載、數(shù)據(jù)搬運等一起納入了優(yōu)化范疇,一勞永逸地解決了數(shù)據(jù)加載瓶頸、流水并行效率等問題。 對算法工程師友好,提供的 Placement 機制與 SBP 的概念,能輕松地解決 WDL 這類大模型的復(fù)雜并行問題。比如,后文中將要介紹三種并行方案,并均給出了實現(xiàn),我們會發(fā)現(xiàn) OneFlow 中切換三種方案只涉及改動極少量的代碼,完全不需要改模型結(jié)構(gòu)。目前這在其它框架中還難以做到。
總結(jié)而言,OneFlow 的“快”是:讓模型訓(xùn)練更快;讓算法工程師編程更快。
2.3.1 讓模型訓(xùn)練更快
對于頂層設(shè)計上的優(yōu)勢及框架內(nèi)部通用優(yōu)化技術(shù),可以參考已經(jīng)發(fā)布的文章《深度解讀:讓你掌握OneFlow系統(tǒng)設(shè)計》上篇、中篇、下篇與《深度學(xué)習(xí)框架如何進行性能優(yōu)化?》,在此不再重復(fù)。
2.3.2 讓算法工程師編程更快
OneFlow 獨創(chuàng)的 SBP 概念,使得算法工程師可以專注于邏輯,將并行時算子的復(fù)雜 placement 問題交給 OneFlow 框架。理解 OneFlow 如何搭建分布式 WDL,需要先了解 SBP 與 SBP Signature 的概念,我們在此做簡單介紹,已經(jīng)了解的讀者,可以直接跳過本節(jié)。
(1)SBP:SBP 是 OneFlow 獨有的概念,它來自 Split, Broadcast, Partial-value 三個首字母的組合。SBP 代表著1個邏輯上的Tensor 與多個物理上的 Tensor 的映射關(guān)系。
Split:表示物理上的多個 Tensor 是由邏輯上的 Tensor 進行切分后得到的。Split 會包含一個參數(shù) Axis,表示被切分的維度。如果把所有物理上的 Tensor 按照 Split 的維度進行拼接,就能還原出邏輯上的 Tensor。 Broadcast:表示物理上的多個 Tensor 是邏輯上 Tensor 的復(fù)制,完全相同。 Partial-value:表示物理上的多個 Tensor 跟邏輯上的 Tensor 的形狀相同,但每個對應(yīng)位置上元素的值是邏輯 Tensor 對應(yīng)位置元素的值的一部分。如果把所有物理上的 Tensor 對應(yīng)位置進行 element-wise 操作,即可還出原邏輯上的Tensor。常見的有 PartialSum、PartialMax、PartialMin 等。
以下是關(guān)于 SBP 的簡單示例,以邏輯上的 2×2 的張量為例,分別展示了 Split(0)、Split(1)、Broadcast、Partialsum 幾種情況下,邏輯張量(下圖左側(cè))與物理張量(下圖右側(cè))的對應(yīng)關(guān)系。
(2)SBP Signature:對于一個孤立的數(shù)據(jù),其 SBP 屬性可以隨意設(shè)置,但是對于Op,它的輸入、輸出的 SBP,應(yīng)該符合邏輯上 Op 的運算法則。讓我們以矩陣乘法為例,看看在有2個設(shè)備的分布式系統(tǒng)中,矩陣乘法的輸入、輸出的 SBP 要如何組合才合法。
邏輯上,一個性質(zhì)為 (m, k) 的矩陣 A 與形狀為 (k, n) 的矩陣 W 相乘得到 Y,Y的形狀必然為 (m, n),表示如下:
A?????*?????W?????=?????Y
(m,?k)??????(k,?n)??????(m,?n)
依據(jù)矩陣乘法的規(guī)律,我們可以將矩陣 A 按第0維進行切分,切分為形狀分別為 (m0, k)、(m1, k) 的兩個矩陣:A0 和 A1,然后分布到2個設(shè)備上分別計算:
device?0:
??A0?????*?????W?????=?????Y0
(m0,?k)?????(k,?n)??????(m0,?n)
device?1:
??A1?????*?????W?????=?????Y1
(m1,?k)?????(k,?n)??????(m1,?n)
我們?nèi)菀椎玫轿锢碓O(shè)備上的?A0、A1?與邏輯上的?A?的關(guān)系,以及?Y0、Y1?與邏輯上的?Y?的關(guān)系:
A?????==?????A0?????+?????A1
(m,?k)?????(m0,?k)????????(m1,?k)
?Y?????==?????Y0?????+?????y1
(m,?n)?????(m0,?n)????????(m1,?n)
注意,以上的“+”表示拼接(concat),而不是 element-wise 加法??梢姡凑找陨系姆绞?,將邏輯上的數(shù)據(jù)分發(fā)到各個物理設(shè)備上,是能夠完成運算,并且最終得到邏輯上的正確結(jié)果的。以上的過程,若使用 SBP 來描述,會變得異常簡單:
A 矩陣 為 S(0),W 矩陣為 B,結(jié)果 Y 為 S(0)。
對于某個 Op,其輸入輸出的 一個特定的、合法的 SBP 屬性組合,稱為這個 Op 的一個 SBP Signature。SBP Signature 描繪了 Op 如何看待邏輯視角的輸入輸出與物理視角的映射關(guān)系。
一個 Op 的 SBP Signature 往往可以有多個,對于上例的矩陣乘法,有以下 SBP Signature:
| A | W | Y |
|---|---|---|
| S(0) | B | S(0) |
| B | S(1) | S(1) |
| S(1) | S(0) | PartialSum |
| PartialSum | B | PartialSum |
| B | PartialSum | PartialSum |
OneFlow 的算子作者,在開發(fā)算子的過程中,會注冊好相關(guān)算子的 SBP Signature,使得 OneFlow 用戶使用分布式時,把精力專注于算子邏輯上,邏輯上像使用一臺“超級計算機”那樣操作整個集群。
三、如何在 OneFlow 中實現(xiàn)分布式 WDL
上一小節(jié)介紹了 WDL 模型結(jié)構(gòu)以及 OneFlow 的 SBP 與 SBP Signature 的概念,下面我們就來詳細揭秘,如何用 OneFlow 的“SBP”語言,輕松實現(xiàn)分布式 WDL。
3.1 OneFLow-WDL 網(wǎng)絡(luò)結(jié)構(gòu)

在 OneFlow-Benchmark[8] 倉庫中,給出了對標 ?HugeCTR[9] 實現(xiàn)的 OneFlow-WDL[10],模型的網(wǎng)絡(luò)結(jié)構(gòu)如上圖所示:
Wide 部分的輸入是
wide_sparse_fields,Deep 部分的輸入是deep_sparse_fields和dense_fields。其中wide_sparse_fields和deep_sparse_fields是類別特征的 sparse ID,dense_fields是稠密的連續(xù)值特征。具體的數(shù)據(jù)處理過程見:OneFlow-WDL 的 Criteo 數(shù)據(jù)集預(yù)處理[11]。Wide 部分是 Logistic Regression 線性模型,圖中的
wide_embedding_table是一個形狀為(wide_vocab_size, 1) 的向量, gather + reduce_sum 的操作組合起來實際上等價于 Logistic Regression 中的全連接層。Deep 部分的 gather 操作,就如我們之前介紹,實現(xiàn)了 Embedding 操作。
wide_sparse_fields就是 sparse ID,經(jīng)過對deep_embedding_table查表,轉(zhuǎn)換為低維稠密的deep_embedding,然后通過 concat 操作與dense_fields組成一個稠密的實值特征deep_features作為后續(xù)深度神經(jīng)網(wǎng)絡(luò)的輸入。
3.2 三種并行方案
前面已經(jīng)介紹實際場景中 WDL 需要巨大的 Embedding Table,通常單一計算設(shè)備無法放下完整的 Embedding Table,需要將 Embedding Table 切分到不同的設(shè)備上,即通過模型并行實現(xiàn) WDL 的訓(xùn)練。
借助 SBP 概念,可以很容易描述和實現(xiàn)分布式的并行方案,我們先對照 WDL 的網(wǎng)絡(luò)結(jié)構(gòu)圖總結(jié)并行時的約束:
wide_embedding_table是形狀為 (wide_vocab_size, 1) 的向量,所以只能 Split(0) 切分;deep_embedding_table可以 Split(0) 切分,也可以 Split(1) 切分;sparse ID 可以選擇通過 Broadcast 的方式聚合到各個計算設(shè)備,也可以保持 Split(0) 切分(OneFlow 在默認情況下會為數(shù)據(jù)做 Split(0) 切分,即數(shù)據(jù)并行)。
在遵循以上約束的前提下,我們共設(shè)計了三種方案,分別是:
| 并行方案 | sparse ID (wide_sparse_fields deep_sparse_fields) | wide_embedding_table | deep_embedding_table |
|---|---|---|---|
| 方案一 | Broadcast | Split(0) | Split(1) |
| 方案二 | Broadcast | Split(0) | Split(0) |
| 方案三 | Split(0) | Split(0) | Split(0) |
通過下文的具體分析可以發(fā)現(xiàn),對 deep_embedding_table 進行 Split(1) 切分, 通信量比 Split(0) 切分少,方案一是通常情況下的最優(yōu)選擇,也是 OneFlow 與 HugeCTR 對比測試時所選擇的方案。
但是,如果實際場景中,deep_embedding_table 的第1維無法被計算設(shè)備的個數(shù)整除時,方案一就失效了,若不愿意用其它手段繞過(如修改第1維的大小或使用非均衡的切分),則只能使用 Split(0) 切分,所以我們還設(shè)計了方案二。
?方案三是對方案二的一個補充,即改變了 sparse ID 的分發(fā)方式,使用 Split(0) 代替 Broadcast,雖然 sparse ID 進行 Split(0) 切分會引入額外的操作,但是通過下文的具體的分析可以發(fā)現(xiàn),在某些情況下,方案三節(jié)省的通信操作開銷會多于 Split(0) 切分引入的額外操作開銷,那樣,方案三會比方案二更有優(yōu)勢。
下面分別對這三種方案進行詳細解釋和分析。
3.2.1 方案一
| 并行方案 | sparse ID (wide_sparse_fields deep_sparse_fields) | wide_embedding_table | deep_embedding_table |
|---|---|---|---|
| 方案一 | Broadcast | Split(0) | Split(1) |
為方便讀者理解,我們在 WDL 模型的網(wǎng)絡(luò)結(jié)構(gòu)圖中,標出了 OneFlow 在邏輯視角下插入 SBP 屬性轉(zhuǎn)換操作(parallel_cast op)的位置,以及所有操作前后數(shù)據(jù)的 SBP 屬性,如下圖所示:
下面逐一介紹各個 parallel_cast op 的作用。
(1) sparse ID 廣播
方案一中 wide_sparse_fields 和 deep_sparse_fields 的 SBP 屬性是 Broadcast,需要先將完整的數(shù)據(jù)從CPU 拷貝到各個 GPU 設(shè)備上,存在以下兩種選擇:

1)由 CPU 逐個將完整數(shù)據(jù)拷貝到各個 GPU 設(shè)備上,如以上左圖所示。2)先將一部分數(shù)據(jù)塊從 CPU 拷貝到 GPU 上,然后在GPU上執(zhí)行 AllGather 操作,如上圖右側(cè)所示。
假設(shè) GPU 數(shù)為 P,完整數(shù)據(jù)塊大小為 D,上述兩種方法通信量均為 P*D,但由于通常情況下,GPU 之間的帶寬(尤其對于有 NVLink 的設(shè)備)大于 CPU 到 GPU 的帶寬,GPU 之間執(zhí)行 AllGather 操作很快,因此在實際實現(xiàn)中,我們建議采用第二種方法。
sparse ID 廣播的實現(xiàn)代碼:
#?將decoder產(chǎn)生的數(shù)據(jù)分塊從CPU拷貝到各個GPU上
labels,?dense_fields,?wide_sparse_fields,?deep_sparse_field?=?flow.identity_n(
????[labels,?dense_fields,?wide_sparse_fields,?deep_sparse_fields]
)?
#?將wide_sparse_fields的SBP屬性轉(zhuǎn)換為Broadcast
wide_sparse_fields?=?flow.parallel_cast(
????wide_sparse_fields,?distribute=flow.distribute.broadcast()
)?
#?將deep_sparse_fields的SBP屬性轉(zhuǎn)換為Broadcast
deep_sparse_fields?=?flow.parallel_cast(
????deep_sparse_fields,?distribute=flow.distribute.broadcast()
)
代碼說明:首先,因為OneFlow 默認的并行方式是數(shù)據(jù)并行, oneflow.identity_n 的作用是將 CPU 產(chǎn)生的數(shù)據(jù)分塊拷貝到各個 GPU上,每個 GPU 設(shè)備上的數(shù)據(jù)的 SBP 屬性為 Split(0)。
其次,通過調(diào)用 flow.parallel_cast ,將 wide_sparse_fields 和 deep_sparse_fields 并行方式轉(zhuǎn)換為 Broadcast,從而實現(xiàn)每個 GPU 設(shè)備上都拿到完整的數(shù)據(jù)。在這一步,OneFlow 內(nèi)部會自動執(zhí)行 AllGather 操作,完成各個 GPU 設(shè)備之間數(shù)據(jù)傳輸。
(2) Wide 部分
Wide 部分的 wide_embedding_talbe 的形狀為(wide_vocab_size, 1),我們將其進行 Split(0) 切分,即將其按第0維均分到 P 個 GPU 設(shè)備上,每個GPU設(shè)備上有(wide_vocab_size / P, 1) 大小的模型。

wide_sparse_fields 與部分wide_embedding_table 通過 gather 操作得到部分 wide_embedding 的示意圖。注意到,各個設(shè)備只持有部分模型,計算只能得到部分結(jié)果,即該操作后得到的 wide_embedding 的 SBP 屬性為 PartialSum。reduce_sum 操作不會改變數(shù)據(jù)的 SBP 屬性,因此 wide_scores 的 SBP 屬性仍然為 PartialSum。但是由于后續(xù)的計算是數(shù)據(jù)并行,因此需要將后續(xù)的 wide_scores 的 SBP 屬性轉(zhuǎn)換為 Split(0)。我們通過插入 parallel_cast op,實現(xiàn) wide_scores 的 SBP 屬性轉(zhuǎn)換,在這一步,OneFlow 內(nèi)部會自動執(zhí)行 ReduceScatter 操作完成各個 GPU 設(shè)備之間數(shù)據(jù)傳輸。
Wide 部分的實現(xiàn)代碼:
#?配置wide_embedding_table?的SBP屬性為Split(0)
wide_embedding_table?=?flow.get_variable(
????????????name='wide_embedding',
????????????shape=(FLAGS.wide_vocab_size,?1),
????????distribute=flow.distribute.split(0),
)
#?gather?+?reduce_sum的操作實現(xiàn)Logistic?Regression線性模型
#?gather:?Split(0)?+?Broadcast?->?PartialSum
wide_embedding?=?flow.gather(
????????params=wide_embedding_table,?indices=wide_sparse_fields
)
#reshepe:?PartialSum?->?PartialSum
wide_embedding?=?flow.reshape(
????????wide_embedding,?shape=(-1,?wide_embedding.shape[-1]?*?wide_embedding.shape[-2])
)
#reduce_sum:?PartialSum?->?PartialSum
wide_scores?=?flow.math.reduce_sum(
????????wide_embedding,?axis=[1],?keepdims=True
)
#?將wide_scores的SBP屬性轉(zhuǎn)換為Split(0)
wide_scores?=?flow.parallel_cast(
????????wide_scores,?distribute=flow.distribute.split(0),
?????????????????????????gradient_distribute=flow.distribute.broadcast()
)
代碼說明:首先,通過指定 flow.get_variable 的 distribute 參數(shù),將 wide_embedding_table 的 SBP 屬性設(shè)置為 Split(0),這樣每個 GPU 設(shè)備上的模型是邏輯視角上的第0維切分。
接下來, flow.gather 操作根據(jù)本設(shè)備上的 wide_embedding_table 及 wide_sparse_fields 獲得 wide_embedding,OneFlow 可以自動推導(dǎo)出 flow.gather 的返回結(jié)果(wide_embedding)的 SBP 屬性為 PartialSum。此外, flow.reshape 和 flow.math.reduce_sum 均支持 PartialSum -> PartialSum 的數(shù)據(jù)處理,OneFlow 可以自動推導(dǎo)出wide_scores的SBP屬性保持為PartialSum。
最后,為了支持后續(xù)的數(shù)據(jù)并行,因此我們通過插入parallel_cast,將 wide_scores 轉(zhuǎn)變?yōu)?Split(0)。
Wide 部分的通信量分析:假設(shè)卡數(shù)為 P,傳輸?shù)臄?shù)據(jù)塊大小 D 為 batch_size * 1,那么每次迭代訓(xùn)練需要的通信量由以下兩部分組成:
前向計算時,前向數(shù)據(jù)由 PartialSum 切分變成 Split(0) 切分,執(zhí)行 ReduceScatter 操作,通信量為: D * (P - 1)。反向計算時,梯度由 Split(0) 切分轉(zhuǎn)換為 Broadcast,執(zhí)行 AllGather 操作,通信量為: D * (P - 1)。
綜合以上兩部分,總通信量為 2 * D * (P - 1) = 2 * batch_size * (P - 1)。
(3) Deep 部分
Deep 部分 deep_embedding_table 的形狀為 (deep_vocab_size, deep_embedding_vec_size),本方案中,將 deep_embedding_table 按照第1維切分,每個GPU設(shè)備上的模型大小為 (deep_vocab_size, deep_embedding_vec_size / P) ,其SBP 屬性為 Split(1)。

上圖顯示了在 4 個計算設(shè)備上,各個設(shè)備上的 deep_sparse_fields 與部分 deep_embedding_table 通過 gather 操作得到部分deep_embedding的示意圖。注意到,由于deep_embedding_table是 Split(1),該操作后得到的deep_embedding的 SBP 屬性為 Split(2)。
Deep 部分的實現(xiàn)代碼:
#?配置deep_embedding_table的SBP屬性為Split(1)
deep_embedding_table?=?flow.get_variable(
????????name='deep_embedding',
????????shape=(FLAGS.deep_vocab_size,?deep_embedding_vec_size),
????????distribute=flow.distribute.split(1),
)
#?deep部分的embedding操作
#?gather:?Split(1)?+?Broadcast?->?Split(2)
deep_embedding?=?flow.gather(
????????params=deep_embedding_table,?indices=deep_sparse_fields
)
#?將deep_embedding的SBP屬性轉(zhuǎn)換為Split(0)
deep_embedding?=?flow.parallel_cast(
????????deep_embedding,?distribute=flow.distribute.split(0),?gradient_distribute=flow.distribute.split(2)
)
代碼說明:首先,通過指定 flow.get_variable 的 distribute 參數(shù)為 flow.distribute.split(1) 將 deep_embedding_table 的屬性設(shè)置為 Split(1),這樣,邏輯視角上的模型按第1維切分至每個 GPU 設(shè)備。
接下來, deep_embedding_table 與 deep_sparse_fields通過 flow.gather操作,生成deep_embedding。deep_embedding_table 與 deep_sparse_fields 的 SBP 屬性分別為 Split(1) 和 Broadcast,OneFlow 會自動推導(dǎo)出其結(jié)果 deep_embedding 的 SBP 為 Split(2)。
最后,為了支持后續(xù)的數(shù)據(jù)并行,通過調(diào)用 flow.parallel_cast 將將 deep_embedding 的 SBP 屬性由 Split(2) 轉(zhuǎn)為 Split(0)。在這一步,OneFlow 會自動將各個 GPU 設(shè)備上原本按照 Split(2) 切分的數(shù)據(jù),轉(zhuǎn)為按照 Split(0) 切分并分發(fā),實現(xiàn)模型并行到數(shù)據(jù)并行的轉(zhuǎn)換。
Deep 部分的通信量分析:假設(shè) GPU 設(shè)備數(shù)為 P,此時傳輸?shù)臄?shù)據(jù)塊大小 D = batch_size * num_deep_sparse_fields * deep_embedding_vec_size,每次迭代訓(xùn)練的通信量由以下兩部分組成:
前向計算時,前向數(shù)據(jù)由 Split(2)切分變成 Split(0) 切分,執(zhí)行 All2All 操作,其通信量為: D * (P - 1) / P。反向計算時,梯度由 Split(0) 切分變成 Split(2) 切分,執(zhí)行 All2All 操作,其通信量為: D * (P - 1) / P。
綜合以上兩部分,總通信量為2 * D * (P - 1) / P = 2 * deep_embedding_vec_size * batch_size * num_deep_sparse_fields * (P - 1) / P。
3.2.2 方案二
| 并行方案 | sparse ID (wide_sparse_fields deep_sparse_fields) | wide_embedding_table | deep_embedding_table |
|---|---|---|---|
| 方案二 | Broadcast | Split(0) | Split(0) |
當(dāng) deep_embedding_table 的第1維無法被計算設(shè)備的個數(shù)整除時,我們可以選擇本方案,將deep_embedding_table 進行 Split(0) 切分。
在本方案中,sparse ID 廣播問題與 Wide 部分的實現(xiàn)均與方案一相同,所以在此我們僅給出 Deep 部分的實現(xiàn)代碼并進行通信量分析。
Deep 部分的實現(xiàn)代碼:
#?配置deep_embedding_table的SBP屬性為Split(0)
deep_embedding_table?=?flow.get_variable(
????????name='deep_embedding',
????????shape=(FLAGS.deep_vocab_size,?deep_embedding_vec_size),
????????distribute=flow.distribute.split(0),
)
#?deep部分的embedding操作
#?gather:?Split(0)?+?Broadcast?->?PartialSum
deep_embedding?=?flow.gather(
????????params=deep_embedding_table,?indices=deep_sparse_fields
)
#?將deep_embedding的SBP屬性轉(zhuǎn)換為Split(0)
deep_embedding?=?flow.parallel_cast(
????????deep_embedding,?distribute=flow.distribute.split(0),?gradient_distribute=flow.distribute.broadcast()
)
代碼說明:首先,通過指定 flow.get_variable 的 distribute 參數(shù),將 wide_embedding_table 的 SBP 屬性設(shè)置為 Split(0),這樣每個 GPU 設(shè)備上的模型是邏輯視角上的第0維切分。
接下來, flow.gather 操作根據(jù)本設(shè)備上的 deep_embedding_table 及 deep_sparse_fields 獲得 deep_embedding,OneFlow 可以自動推導(dǎo)出 flow.gather 的返回結(jié)果(deep_embedding)的 SBP 屬性為 PartialSum。
最后,為了支持后續(xù)的數(shù)據(jù)并行,我們通過插入parallel_cast,將 deep_embedding 轉(zhuǎn)變?yōu)?Split(0)。
Deep 部分的通信量分析:在方案二中,Deep 部分也按照 Split(0) 切分,通信量分析的方法與 Wide 部分相同,通信量為:2 * D * (P-1)。
回顧前面的方案一, Deep 部分是按 Split(1) 進行切分,通信量為 2 * D * (P-1) / P ,顯然,按照 Split(1) 切分通信量更少。
3.2.3 方案三
| 并行方案 | sparse ID (wide_sparse_fields deep_sparse_fields) | wide_embedding_table | deep_embedding_table |
|---|---|---|---|
| 方案三 | Split(0) | Split(0) | Split(0) |
本方案是方案二的備選方案,在 sparse ID 大量重復(fù)的極端前提下,方案三會更具有優(yōu)勢。
在方案二的基礎(chǔ)上,若保持每個設(shè)備上的 sparse ID 按 Split(0) 的方式均分到各個設(shè)備上,就得到了本方案,那么每個設(shè)備上有(batch_size / P, num_sparse_fields) 大小的數(shù)據(jù)。

注意到,因為我們對 embedding_table 進行了 Split(0) 切分,所以在某個特定計算設(shè)備上進行查表操作(即 gather 操作)時,可能會出現(xiàn)本設(shè)備上的某個 sparse ID 對應(yīng)的 embedding_table 行不在本設(shè)備上的情況,這就需要額外增加一些操作,具體描述如下。
上圖展示了在計算設(shè)備 GPU0 上進行一次訓(xùn)練迭代的操作流程:
1.先將本設(shè)備的部分 sparse_ids 進行去重(Unique)。2.如果去重后的 sparse_ids 對應(yīng)的模型在本設(shè)備上已經(jīng)存在,則直接完成查表操作;否則,將 sparse_ids 發(fā)送到其對應(yīng)模型所在的設(shè)備上,完成查表操作,該操作通過 OneFlow 中的 distribute_gather 完成。3.從對應(yīng)設(shè)備上拉取對應(yīng)的 embedding_table 行,更新到本設(shè)備(Pull Model)。4.在反向計算時,將梯度回傳到 sparse_ids 對應(yīng)模型所在的卡上(Push Grad)。
代碼:
# wide部分:
wide_embedding_table?=?flow.get_variable(
????????name='wide_embedding',
????????shape=(FLAGS.wide_vocab_size,?1),
????????initializer=flow.random_uniform_initializer(minval=-0.05,?maxval=0.05),
????????distribute=flow.distribute.split(0),
?)
wide_embedding?=?flow.distribute_gather(params=wide_embedding_table,?indices=wide_sparse_fields)
wide_embedding?=?flow.reshape(wide_embedding,?shape=(-1,?wide_embedding.shape[-1]?*?wide_embedding.shape[-2]))
wide_scores?=?flow.math.reduce_sum(wide_embedding,?axis=[1],?keepdims=True)
?????????????????????????????????
# deep部分:
deep_embedding_table?=?flow.get_variable(
????????name='deep_embedding',
????????shape=(FLAGS.deep_vocab_size,?FLAGS.deep_embedding_vec_size),
????????initializer=flow.random_uniform_initializer(minval=-0.05,?maxval=0.05),
????????distribute=flow.distribute.split(1),
)
deep_embedding?=?flow.distribute_gather(params=deep_embedding_table,?indices=deep_sparse_fields)
deep_embedding?=?flow.reshape(deep_embedding,?shape=(-1,?deep_embedding.shape[-1]?*?deep_embedding.shape[-2]))
deep_features?=?flow.concat([deep_embedding,?dense_fields],?axis=1)?
注:flow.distribute_gather? 為 OneFlow 的新特性,目前(2021.2.5)還未合并到 OneFlow 倉庫[12] 的主分支中,想快速嘗試使用該新特性的讀者,可以查看這個 PR[13]。
通信量分析:
在每次訓(xùn)練過程中,此方案的通信量由以下幾個部分組成:
從CPU將(batch_size / P, num_sparse_fields) 大小的數(shù)據(jù)分別拷貝到每個 GPU 上,那么總通信量為: batch_size * num_sparse_fields。各個 GPU 將本卡上的 sparse id 去重后傳給對應(yīng)模型所在的GPU,若對應(yīng)的模型屬于本卡,則無需通信。最壞的情況下,需要把所有sparse id 傳到其他卡,此時總通信量為: batch_size * num_sparse_fields。從對應(yīng)的GPU Pull需要的模型,最壞情況下,sparse id 對應(yīng)的所有模型都在其他GPU上,則每個 GPU 需要 Pull 的模型大小為 (batch_size / P , num_sparse_fields , embedding_vec_size),總通信量為: batch_size * num_sparse_fields * embedding_vec_size。將計算得到的梯度 Push 到模型所在的GPU,最壞情況下,sparse id 對應(yīng)的所有模型都在其他GPU上,每個 GPU 需要 Push 的梯度大小為( batch_size / P , num_sparse_fields , embedding_vec_size),總通信量為: batch_size * num_sparse_fields * embedding_vec_size。
將以上幾個部分相加,得到每次訓(xùn)練的總通信量為:2 * batch_size * num_sparse_fields + 2 * batch_size * num_sparse_fields ?* embedding_vec_size。
3.2.4 三種并行方案的比較
(1) 方案一 vs. 方案二方案一與方案二的區(qū)別在于deep部分的 deep_embedding_table?切分方式不同,導(dǎo)致deep部分的通信量不同:
deep_embedding_table | 通信量 | |
|---|---|---|
| 方案一 | Split(1)切分 | 2*D*(P-1)/P |
| 方案二 | Split(0)切分 | 2*D*(P-1) |
其中,
P代表設(shè)備數(shù)。 D代表deep部分傳輸?shù)臄?shù)據(jù)塊大小, D = batch_size * num_deep_sparse_fields * deep_embedding_vec_size。
顯然,deep部分按照 Split(1) 切分時通信量更少,因此建議使用 Split(1) 切分。只有當(dāng)deep_embedding_table的第1維無法被計算設(shè)備(比如GPU)的個數(shù)整除時,才建議使用Split(0)切分。
(2) 方案二 vs. 方案三相比于方案二,方案三省略了對 sparse ID 的廣播操作,從單純的模型并行變成混合并行(既有數(shù)據(jù)并行,也有模型并行)。
方案三增加了對 sparse ID 的 Unique 操作,增加了一定的計算開銷,但是也正是因為去除了一些重復(fù)的 sparse ID,相應(yīng)的減少了一些通信開銷。兩種方案的通信量總結(jié)如下:
| 方案二 | 方案三 最壞情況 | |
|---|---|---|
| sparse ID | batch_size * num_sparse_fields *P | batch_size * num_sparse_fields |
| Wide部分 | 2 *D1 * (P - 1) | 2 * D1 |
| Deep部分 | 2 * D2 * (P-1)/P | 2 * D2 |
其中,
P 代表設(shè)備數(shù)。 D1 代表 Wide 部分傳輸?shù)臄?shù)據(jù)塊大小, D1 = batch_size * num_wide_sparse_fields * wide_embedding_vec_size。D2 代表 Wide 部分傳輸?shù)臄?shù)據(jù)塊大小, D2 = batch_size * num_deep_sparse_fields * deep_embedding_vec_size。
方案二與方案三最壞情況的通信量相比,deep部分兩種方案的通信量相差不大,sparse ID 和 Wide 部分的通信量方案三較少。
那么,能得出方案三在所有情況下都比方案二有優(yōu)勢的結(jié)論嗎?答案是否定的,原因如下:
1)注意到,方案二利用了 OneFlow 的集合通信操作進行數(shù)據(jù)傳輸,可以充分利用帶寬,通信傳輸效率比方案三的 Push、Pull 操作高很多,在通信量差距不大時,方案二往往更有優(yōu)勢。
2)方案三有對 Sparse ID 的去重操作,因此,在某些極端情況下(當(dāng)有大量sparse ID 重復(fù)時,方案三的通信量會進一步減少),方案三才會更有優(yōu)勢。
四、結(jié)語
本文以 WDL 為例,展示了如何憑借 OneFlow 獨創(chuàng)的 SBP 概念,輕松設(shè)計并實現(xiàn)不同的并行方案。除了本文介紹的高效的 WDL 訓(xùn)練模型外,OneFlow 團隊也正在開發(fā)和籌劃為大型推薦系統(tǒng)適配更多的功能,包括可將模型高頻部分切分到 GPU 上,將低頻部分切分到 CPU 上;更豐富的模型庫、高效方便的 Serving 系統(tǒng)、動態(tài)縮放的詞表、在線學(xué)習(xí)等。
適配更多種類硬件設(shè)備、全新的1.0 interface 的 OneFlow 版本也即將來襲。敬請期待:https://github.com/Oneflow-Inc/oneflow 。
參考資料
CTR Prediction 問題的介紹,參考了這篇文章:《CTR 預(yù)估[一]: Problem Description and Main Solution》[14] 詞向量和 Embedding 的解釋,參考了這篇文章:《詞向量與 Embedding 究竟是怎么回事?》[15]
文中注釋對照
HugeCTR: https://github.com/NVIDIA/HugeCTR
[2]MLPerf : https://github.com/mlperf/training_results_v0.7
[3]英偉達 Blog: https://developer.nvidia.com/blog/introducing-merlin-hugectr-training-framework-dedicated-to-recommender-systems/
[4]DLPerf: https://github.com/Oneflow-Inc/DLPerf#wide-and-deep-learning
[5]DLPerf : https://github.com/Oneflow-Inc/DLPerf/tree/master/reports
[6]WDL論文: https://arxiv.org/pdf/1606.07792.pdf%29/
[7]gather: https://oneflow.readthedocs.io/en/master/oneflow.html?#oneflow.gather
[8]OneFlow-Benchmark: https://github.com/Oneflow-Inc/OneFlow-Benchmark/tree/master/ClickThroughRate/WideDeepLearning
[9]HugeCTR: https://github.com/NVIDIA/HugeCTR
[10]OneFlow-WDL: https://github.com/Oneflow-Inc/OneFlow-Benchmark/tree/74caa439bbe0fb2368878d4fb771525b70831d27/ClickThroughRate/WideDeepLearning
[11]OneFlow-WDL 的 Criteo 數(shù)據(jù)集預(yù)處理: https://github.com/Oneflow-Inc/OneFlow-Benchmark/blob/master/ClickThroughRate/WideDeepLearning/how_to_make_ofrecord_for_wdl.md
[12]OneFlow 倉庫: https://github.com/Oneflow-Inc/oneflow
[13]PR: https://github.com/Oneflow-Inc/oneflow/pull/3931
[14]《CTR 預(yù)估[一]: Problem Description and Main Solution》: https://zhuanlan.zhihu.com/p/31499375
[15]《詞向量與 Embedding 究竟是怎么回事?》: https://spaces.ac.cn/archives/4122
