【深度學(xué)習(xí)】聊一聊深度學(xué)習(xí)分布式訓(xùn)練
作者?|?楊陽?
整理?|?NewBeeNLP
https://zhuanlan.zhihu.com/p/365662727
在深度學(xué)習(xí)時(shí)代,訓(xùn)練數(shù)據(jù)特別大的時(shí)候想要單卡完成訓(xùn)練基本是不可能的。所以就需要進(jìn)行分布式深度學(xué)習(xí)。在此總結(jié)下個(gè)人近期的研究成果,歡迎大佬指正。
主要從以下幾個(gè)方面進(jìn)行總結(jié):
分布式訓(xùn)練的基本原理 TensorFlow的分布式訓(xùn)練 PyTorch的分布式訓(xùn)練框架 Horovod分布式訓(xùn)練
1、分布式訓(xùn)練的基本原理
無論哪種機(jī)器學(xué)習(xí)框架,分布式訓(xùn)練的基本原理都是相同的。本文主要從 并行模式、架構(gòu)模式、同步范式、物理架構(gòu)、通信技術(shù) 等五個(gè)不同的角度來分類。
1.1 并行模式
分布式訓(xùn)練的目的在于將原本巨大的訓(xùn)練任務(wù)拆解開撐多個(gè)子任務(wù),每個(gè)子任務(wù)在獨(dú)立的機(jī)器上單獨(dú)執(zhí)行。大規(guī)模深度學(xué)習(xí)任務(wù)的難點(diǎn)在于:
訓(xùn)練數(shù)據(jù)巨大:這種情況我們需要將數(shù)據(jù)拆解成多個(gè)小模型分布到不同的node上 訓(xùn)練模型的參數(shù)巨大(NLP的預(yù)訓(xùn)練模型實(shí)在太大了):這種情況我們需要將數(shù)據(jù)集拆解分布到不同的node上。
前者我們稱之為數(shù)據(jù)并行,后者我們稱之為模型并行。
1.1.1 數(shù)據(jù)并行
數(shù)據(jù)并行相對(duì)簡單, N個(gè)node(也稱為worker)構(gòu)成一個(gè)分布式集群,每個(gè)worker處理1/N的數(shù)據(jù)。理論情況下能達(dá)到線性的加速效果。TF、torch、Horovod都可以在原生支持或者微小的改動(dòng)實(shí)現(xiàn)數(shù)據(jù)并行模式。
數(shù)據(jù)并行是在每個(gè)worker上存儲(chǔ)一個(gè)模型的備份,在各個(gè)worker 上處理不同的數(shù)據(jù)子集。然后需要規(guī)約(reduce)每個(gè)worker的結(jié)果,在各節(jié)點(diǎn)之間同步模型參數(shù)。這一步會(huì)成為數(shù)據(jù)并行的瓶頸,因?yàn)槿绻鹷orker很多的情況下,worker之間的數(shù)據(jù)傳輸會(huì)有很大的時(shí)間成本。參數(shù)同步后,需要采用不同的方法進(jìn)行參數(shù)更新:
參數(shù)平均法 更新式方法
參數(shù)平均法是最簡單的一種數(shù)據(jù)平均化。若采用參數(shù)平均法,訓(xùn)練的過程如下所示:基于模型的配置隨機(jī)初始化網(wǎng)絡(luò)模型參數(shù)
將當(dāng)前這組參數(shù)分發(fā)到各個(gè)工作節(jié)點(diǎn) 在每個(gè)工作節(jié)點(diǎn),用數(shù)據(jù)集的一部分?jǐn)?shù)據(jù)進(jìn)行訓(xùn)練 將各個(gè)工作節(jié)點(diǎn)的參數(shù)的均值作為全局參數(shù)值 若還有訓(xùn)練數(shù)據(jù)沒有參與訓(xùn)練,則繼續(xù)從第二步開始
更新式方法 與參數(shù)平均化類似,主要區(qū)別在于,在參數(shù)服務(wù)器和工作服務(wù)器之間傳遞參數(shù)時(shí),更新式方法只傳遞更新信息(梯度和張量)。
1.1.2 模型并行
模型并行 相對(duì)復(fù)雜,原理是分布式系統(tǒng)中的不同worker負(fù)責(zé)網(wǎng)絡(luò)模型的不同部分。
例如說,神經(jīng)網(wǎng)絡(luò)的不同層被分布到不同worker或者同一層的不同參數(shù)被分配到不同worker上。對(duì)于TF這種框架,可以拆分計(jì)算圖成多個(gè)最小依賴子圖到不同的worker上。同時(shí)在多個(gè)子圖之間通過通信算子來實(shí)現(xiàn)模型并行。但是這種實(shí)驗(yàn) 起來比較復(fù)雜。工業(yè)界還是以數(shù)據(jù)并行為主。
Model Parallel主要分兩種:intra-layer拆分 和inter-layer拆分
intranet-layer拆分 :深度學(xué)習(xí)的網(wǎng)絡(luò)結(jié)構(gòu)基本都是一層一層的。常規(guī)的卷積、池化、BN等等。如果對(duì)某一層進(jìn)行了拆分,那么就是intra-layer拆分。對(duì)單層的拆分其實(shí)就是拆分這一層的matrix運(yùn)算。參考論文: Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism
如上圖,這兩層的運(yùn)算是 , ,matrix運(yùn)算有一個(gè)重要的性質(zhì)是矩陣運(yùn)算可以分塊運(yùn)算。因此如上可以拆分成:

因此拆分為一個(gè)worker計(jì)算 ,一個(gè)worker計(jì)算 ,最后再累加兩個(gè)worker的結(jié)果。這在一定程度上減少了模型對(duì)計(jì)算資源的需求。
inter-layer拆分 :這中更好理解,對(duì)模型做網(wǎng)絡(luò)上的拆分。將每一層或者某幾層放在一個(gè)worker上單獨(dú)訓(xùn)練。這種拆分的問題在于,模型訓(xùn)練是串行的,整個(gè)模型的效率取決于最慢的那一層,存在資源浪費(fèi)。參考論文: PipeDream: Fast and Efficient Pipeline Parallel DNN Training

但是隨著訓(xùn)練設(shè)備的增加,多個(gè)worker之間的通信成本增加,模型Reduce的成本也越來越大,數(shù)據(jù)并行的瓶頸也隨之出現(xiàn)。故有學(xué)者提出混合并行(數(shù)據(jù)并行+模型并行)。本人對(duì)此暫無研究,感興趣可自行摸索,參考此鏈接[1]
強(qiáng)推這篇paper,DP(Data Parallel)、MP(MOdel Parallel)、PP(Pipeline Parallel)各個(gè)方面講的很透徹: ZeRO: Memory Optimizations Toward Training Trillion Parameter Models
1.2 架構(gòu)模式
分布式訓(xùn)練上會(huì)頻繁的應(yīng)用到規(guī)約(AllReduce)操作。主流的分布式架構(gòu)主要分為 參數(shù)服務(wù)器(ParameterServer) 和 基于規(guī)約(Reduce) 兩種模式。早期還有基于MPI的方式,不過現(xiàn)在已經(jīng)很少用了。
ParameterServer模式是一種基于reduce和broadcat算法的經(jīng)典架構(gòu)。其中一個(gè)/一組機(jī)器作為PS架構(gòu)的中心節(jié)點(diǎn),用來存儲(chǔ)參數(shù)和梯度。在更新梯度的時(shí)候,先全局reduce接受其他worker節(jié)點(diǎn)的數(shù)據(jù),經(jīng)過本地計(jì)算后(比如參數(shù)平均法),再broadcast回所有其他worker。PS架構(gòu)的問題在于多個(gè)worker與ps通信,PS本身可能存在瓶頸。隨著worker數(shù)量的增加,整體通信量也線性增加,加速比也可能停滯在某個(gè)點(diǎn)位上。

基于規(guī)約的模式解決了上述的問題,最典型的是百度提出的Ring-AllRuduce。多個(gè)Worker節(jié)點(diǎn)連接成一個(gè)環(huán),每個(gè)Worker依次把自己的梯度同步給下一個(gè)Worker,經(jīng)過至多2*(N-1)輪同步,就可以完成所有Worker的梯度更新。這種方式下所有節(jié)點(diǎn)的地位是平等的,因此不存在某個(gè)節(jié)點(diǎn)的負(fù)載瓶頸,隨著Worker的增加,整體的通信量并不隨著增加。加速比幾乎可以跟機(jī)器數(shù)量成線性關(guān)系且不存在明顯瓶頸。目前,越來越多的分布式訓(xùn)練采用Reduce這種模式。Horovod中主要就是用的這種分布式架構(gòu)。
更多關(guān)于reduce的算法[2]可參照進(jìn)一步學(xué)習(xí)
1.3 同步范式
在實(shí)際的訓(xùn)練過程中可能各種問題,比如:部分節(jié)點(diǎn)資源受限、卡頓、網(wǎng)絡(luò)延時(shí)等等,因此再梯度同步時(shí)就存在“木桶”效應(yīng),即集群中的某些worker比其他worker更慢,導(dǎo)致整個(gè)訓(xùn)練pipeline需要等待慢的worker,整個(gè)集群的訓(xùn)練速度受限于最慢機(jī)器的速度。
因此梯度的同步有 同步(sync) 、 異步(Async) 和 混合 三種范式。
同步范式就是上述提到的,只有所有worker完成當(dāng)前的計(jì)算任務(wù),整個(gè)集群才會(huì)開始下一次迭代。(TF中同步范式使用SyncReplicasOptimizer優(yōu)化器)
異步模式剛好相反,每個(gè)worker只關(guān)心知己的進(jìn)程,完成計(jì)算后就嘗試更新,能與其他多個(gè)worker同步梯度完成取決于各worker當(dāng)前時(shí)刻的狀態(tài)。其過程不可控,有可能出現(xiàn)模型正確性問題。(可在訓(xùn)練時(shí)logging對(duì)比)
混合范式結(jié)合以上兩種情況,各個(gè)worker都會(huì)等待其他worker的完成,但不是永久等待,有timeout的機(jī)制。如果超時(shí)了,則此情況下相當(dāng)于異步機(jī)制。并且沒來得及完成計(jì)算的worker,其梯度則被標(biāo)記為“stale”而拋棄或另做處理。
1.4 物理架構(gòu)
物理架構(gòu)主要是“GPU”架構(gòu),就是常說的(單機(jī)單卡、單機(jī)多卡、多機(jī)單卡、多機(jī)多卡)
單機(jī)單卡:常規(guī)操作
單機(jī)多卡:利用一臺(tái)GPU上的多塊GPU進(jìn)行分布式訓(xùn)練。數(shù)據(jù)并行和模型并行皆可。整個(gè)訓(xùn)練過程一般只有一個(gè)進(jìn)程,多GPU之間的通信通過多線程的方式,模型參數(shù)和梯度在進(jìn)程內(nèi)是共享的(基于NCCL的可能不大一樣)。這種情況下基于Reduce的架構(gòu)比PS架構(gòu)更合適一些,因?yàn)椴恍枰粋€(gè)顯式的PS,通過進(jìn)程內(nèi)的Reduce即可完成梯度同步。
多機(jī)單卡:操作上與多機(jī)多卡基本一致
多機(jī)多卡:多機(jī)多卡是最典型的分布式架構(gòu),所以它需要較好的進(jìn)程間的通訊機(jī)制(多worker之間的通信)。
1.5 通信技術(shù)
分布式條件下的多進(jìn)程、多worker之間的通信技術(shù),常見的主要有:MPI、NCCL,GRPC等。
MPI主要是被應(yīng)用在超算等大規(guī)模計(jì)算領(lǐng)域,機(jī)器學(xué)習(xí)場(chǎng)景下使用較少。主要是openMPI原語等。
NCCL是NVIDIA針對(duì)GPU設(shè)計(jì)的一種規(guī)約庫,可以實(shí)現(xiàn)多GPU間的直接數(shù)據(jù)同步,避免內(nèi)存和顯存的,CPU和GPU間的數(shù)據(jù)拷貝成本。當(dāng)在TensorFlow中選擇單機(jī)多卡訓(xùn)練時(shí),其默認(rèn)采用的就是NCCL方式來通信。
GRPC是比較成熟的通信技術(shù)了,spark等框架內(nèi)也都有用到。
這一部分暫無研究,有興趣的大佬自行學(xué)習(xí)。
OK,講完了理論部分,那就開始實(shí)踐吧。
2、TensorFlow的分布式訓(xùn)練
TensorFlow主要的分布式訓(xùn)練的方法有三種:
Customer Train Loop Estimator + Strategy Keras + Strategy
在實(shí)際的開發(fā)工作中,分布式的工作最好是交給框架,而工程師本身只需要關(guān)注任務(wù)模型的pipeline就行了。最經(jīng)典的是Spark框架,工程師只需要關(guān)注數(shù)據(jù)處理的workflow,分布式的大部分工作都交給框架。深度學(xué)習(xí)的開發(fā)同樣如此。
第一種方式太過原生,整個(gè)分布式的訓(xùn)練過程完全交給工程師來處理,代碼模塊比較復(fù)雜,這里不做贅述。
第二種方式,Estimator是TF的一個(gè)高級(jí)API,在分布式場(chǎng)景下,其最大的特點(diǎn)是單機(jī)和分布式代碼一致,且不需要考慮底層的硬件設(shè)施。在這里不多做介紹。Strategy是tensorflow根據(jù)分布式訓(xùn)練的復(fù)雜性,抽象出的多種分布式訓(xùn)練策略。TF1.x和TF2.x接口變化較大,不同版本名字可能不一樣,以實(shí)際使用版本為準(zhǔn)。用的比較多的是:
MirroredStrategy:適用于單機(jī)多卡、數(shù)據(jù)并行、同步更新的分布式訓(xùn)練,采用Reduce的更新范式,worker之間采用NCCL進(jìn)行通信。 MultiWorkerMirroredStrategy:與上面的類似,不同的是這種策略支持多機(jī)多卡、數(shù)據(jù)并行、同步更新的分布式策略、Reduce范式。在TF 1.15版本里,這個(gè)策略叫CollectiveAllReduceStrategy。 ParameterServerStrategy:經(jīng)典的PS架構(gòu),多機(jī)多卡、數(shù)據(jù)并行、同步/異步更新
使用Estimator+Strategy 實(shí)現(xiàn)分布式訓(xùn)練[3],參考代碼
第三種方式 Keras + Strategy[4] 是Tensorflow最新官方推薦的方案。主要是利用keras的高級(jí)API,配合Strategy實(shí)現(xiàn)多模式的分布式訓(xùn)練。
后兩種方法都需要傳入TF_CONFIG參數(shù),沒有就是單機(jī)的訓(xùn)練方式。Strategy會(huì)自動(dòng)讀取環(huán)境變量并應(yīng)用相關(guān)信息。TF_CONFIG的配置如下:

執(zhí)行腳本示例:
#?分別在各個(gè)worker上執(zhí)行對(duì)應(yīng)的腳本
TF_CONFIG='{"cluster":{"worker":["172.26.0.124:9920","172.26.0.126:9920","172.26.0.127:9920"]},"task":{"index":0,"type":"worker"}}'?python?multi_worker_with_estimator.py
TF_CONFIG='{"cluster":{"worker":["172.26.0.124:9920","172.26.0.126:9920","172.26.0.127:9920"]},"task":{"index":1,"type":"worker"}}'?python?multi_worker_with_estimator.py
TF_CONFIG='{"cluster":{"worker":["172.26.0.124:9920","172.26.0.126:9920","172.26.0.127:9920"]},"task":{"index":2,"type":"worker"}}'?python?multi_worker_with_estimator.py
3、Pytorch的分布式訓(xùn)練
相對(duì)Tensorflow,Pytorch簡單的多。分布式訓(xùn)練主要有兩個(gè)API:
DataParallel(DP):PS模式,會(huì)有一張卡為reduce(parame server),實(shí)現(xiàn)簡單,就一行代碼 DistributedDataParallel(DDP):All-Reduce模式,單機(jī)多卡/多級(jí)多卡皆可。官方建議API
1、DP:會(huì)將數(shù)據(jù)分割到多個(gè)GPU上。這是數(shù)據(jù)并行的典型,需要將模型復(fù)制到每個(gè)GPU上,并且一但GPU0計(jì)算出梯度,則需要同步梯度,這需要大量的GPU數(shù)據(jù)傳輸(類似PS模式);2、DDP:在每個(gè)GPU的進(jìn)程中創(chuàng)建模型副本,并只讓數(shù)據(jù)的一部分對(duì)改GPU可用。因?yàn)槊總€(gè)GPU中的模型是獨(dú)立運(yùn)行的,所以在所有的模型都計(jì)算出梯度后,才會(huì)在模型之間同步梯度(類似All-reduce)。DDP每個(gè)batch只需要一次數(shù)據(jù)傳輸;而DP可能存在多次數(shù)據(jù)同步(不用worker之間可能快慢不一樣)。
3.1、DataParallel
import?torch
import?torch.nn?as?nn
from?torch.autograd?import?Variable
from?torch.utils.data?import?Dataset,?DataLoader
import?os
input_size?=?5
output_size?=?2
batch_size?=?30
data_size?=?30
class?RandomDataset(Dataset):
????def?__init__(self,?size,?length):
????????self.len?=?length
????????self.data?=?torch.randn(length,?size)
????????def?__getitem__(self,?index):
????????return?self.data[index]
????def?__len__(self):
????????return?self.len
rand_loader?=?DataLoader(dataset=RandomDataset(input_size,?data_size),
?????????????????????????batch_size=batch_size,?shuffle=True)
class?Model(nn.Module):
????#?Our?model
????def?__init__(self,?input_size,?output_size):
????????super(Model,?self).__init__()
????????self.fc?=?nn.Linear(input_size,?output_size)
????def?forward(self,?input):
????????output?=?self.fc(input)
????????print("??In?Model:?input?size",?input.size(),
????????"output?size",?output.size())
????????return?output
model?=?Model(input_size,?output_size)
if?torch.cuda.is_available():
????model.cuda()
if?torch.cuda.device_count()?>?1:
????print("Let's?use",?torch.cuda.device_count(),?"GPUs!")
????#?就這一行!!!!
????model?=?nn.DataParallel(model)
for?data?in?rand_loader:
????if?torch.cuda.is_available():
????????input_var?=?Variable(data.cuda())
????else:
????????input_var?=?Variable(data)
????output?=?model(input_var)
????print("Outside:?input?size",?input_var.size(),?"output_size",?output.size())
3.2、DDP
官方建議使用DDP,采用All-Reduce架構(gòu),單機(jī)多卡、多機(jī)多卡都能用。
需要注意的是:DDP并不會(huì)自動(dòng)shard數(shù)據(jù)
如果自己寫數(shù)據(jù)流,得根據(jù)torch.distributed.get_rank()去shard數(shù)據(jù),獲取自己應(yīng)用的一份
如果用Dataset API,則需要在定義Dataloader的時(shí)候用 DistributedSampler去shard
sampler?=?DistributedSampler(dataset)?#?這個(gè)sampler會(huì)自動(dòng)分配數(shù)據(jù)到各個(gè)gpu上
DataLoader(dataset,?batch_size=batch_size,?sampler=sampler)
完整代碼如下:
import?torch
import?torch.nn?as?nn
from?torch.autograd?import?Variable
from?torch.utils.data?import?Dataset,?DataLoader
import?os
from?torch.utils.data.distributed?import?DistributedSampler
#?1)?初始化
torch.distributed.init_process_group(backend="nccl")
input_size?=?5
output_size?=?2
batch_size?=?30
data_size?=?90
#?2)?配置每個(gè)進(jìn)程的gpu
local_rank?=?torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device?=?torch.device("cuda",?local_rank)
class?RandomDataset(Dataset):
????def?__init__(self,?size,?length):
????????self.len?=?length
????????self.data?=?torch.randn(length,?size).to('cuda')
????def?__getitem__(self,?index):
????????return?self.data[index]
????def?__len__(self):
????????return?self.len
dataset?=?RandomDataset(input_size,?data_size)
#?3)使用DistributedSampler
rand_loader?=?DataLoader(dataset=dataset,
?????????????????????????batch_size=batch_size,
?????????????????????????sampler=DistributedSampler(dataset))
class?Model(nn.Module):
????def?__init__(self,?input_size,?output_size):
????????super(Model,?self).__init__()
????????self.fc?=?nn.Linear(input_size,?output_size)
????def?forward(self,?input):
????????output?=?self.fc(input)
????????print("??In?Model:?input?size",?input.size(),
??????????????"output?size",?output.size())
????????return?output
model?=?Model(input_size,?output_size)
#?4)?封裝之前要把模型移到對(duì)應(yīng)的gpu
model.to(device)
if?torch.cuda.device_count()?>?1:
????print("Let's?use",?torch.cuda.device_count(),?"GPUs!")
????#?5)?封裝
????model?=?torch.nn.parallel.DistributedDataParallel(model,
??????????????????????????????????????????????????????device_ids=[local_rank],
??????????????????????????????????????????????????????output_device=local_rank)
for?data?in?rand_loader:
????if?torch.cuda.is_available():
????????input_var?=?data
????else:
????????input_var?=?data
????output?=?model(input_var)
????print("Outside:?input?size",?input_var.size(),?"output_size",?output.size())
執(zhí)行腳本:
CUDA_VISIBLE_DEVICES=0,1?python?-m?torch.distributed.launch?--nproc_per_node=2?torch_ddp.py
apex加速(混合精度訓(xùn)練、并行訓(xùn)練、同步BN)[5] 可參考:https://zhuanlan.zhihu.com/p/158375055
4、Horovod分布式訓(xùn)練
Horovod是Uber開源的跨平臺(tái)的分布式訓(xùn)練工具,名字來自于俄國傳統(tǒng)民間舞蹈,舞者手牽手圍成一個(gè)圈跳舞,與Horovod設(shè)備之間的通信模式很像,有以下幾個(gè)特點(diǎn):
兼容TensorFlow、Keras和PyTorch機(jī)器學(xué)習(xí)框架。 使用Ring-AllReduce算法,對(duì)比Parameter Server算法,有著無需等待,負(fù)載均衡的優(yōu)點(diǎn)。 實(shí)現(xiàn)簡單,五分鐘包教包會(huì)。
Horovod環(huán)境準(zhǔn)備以及示例代碼[6],可參考作者另一篇文章
本文參考資料
此鏈接: https://help.aliyun.com/document_detail/194800.html
[2]reduce的算法: https://zhuanlan.zhihu.com/p/79030485
[3]使用Estimator+Strategy 實(shí)現(xiàn)分布式訓(xùn)練: https://github.com/kubeflow/tf-operator/blob/master/examples/v1/distribution_strategy/estimator-API/keras_model_to_estimator.py
[4]Keras + Strategy: https://github.com/kubeflow/tf-operator/blob/master/examples/v1/distribution_strategy/keras-API/multi_worker_strategy-with-keras.py
[5]apex加速(混合精度訓(xùn)練、并行訓(xùn)練、同步BN): https://zhuanlan.zhihu.com/p/158375055
[6]Horovod環(huán)境準(zhǔn)備以及示例代碼: https://zhuanlan.zhihu.com/p/351693076
[7]分布式機(jī)器學(xué)習(xí)系統(tǒng)筆記: https://www.cnblogs.com/yihaha/p/7265280.html
[8]煉丹師的工程修養(yǎng)之四:TensorFlow的分布式訓(xùn)練和K8S: https://zhuanlan.zhihu.com/p/56699786
[9]分布式訓(xùn)練】單機(jī)多卡的正確打開方式(三):PyTorch: https://zhuanlan.zhihu.com/p/74792767
往期精彩回顧
適合初學(xué)者入門人工智能的路線及資料下載 (圖文+視頻)機(jī)器學(xué)習(xí)入門系列下載 中國大學(xué)慕課《機(jī)器學(xué)習(xí)》(黃海廣主講) 機(jī)器學(xué)習(xí)及深度學(xué)習(xí)筆記等資料打印 《統(tǒng)計(jì)學(xué)習(xí)方法》的代碼復(fù)現(xiàn)專輯 機(jī)器學(xué)習(xí)交流qq群955171419,加入微信群請(qǐng)掃碼
