GNN框架之大規(guī)模分布式訓(xùn)練!
引言
本文為GNN教程的DGL框架之大規(guī)模分布式訓(xùn)練,前面的文章中我們介紹了圖神經(jīng)網(wǎng)絡(luò)框架DGL如何利用采樣的技術(shù)縮小計算圖的規(guī)模來通過mini-batch的方式訓(xùn)練模型,當(dāng)圖特別大的時候,非常多的batches需要被計算,因此運算時間又成了問題,一個容易想到解決方案是采用并行計算的技術(shù),很多worker同時采樣,計算并且更新梯度。這篇博文重點介紹DGL的并行計算框架。
多進程方案
概括而言,目前DGL(version 0.3)采用的是多進程的并行方案,分布式的方案正在開發(fā)中。見下圖,DGL的并行計算框架分為兩個主要部分:Graph Store和Sampler
Sampler被用來從大圖中構(gòu)建許多計算子圖(NodeFlow),DGL能夠自動得在多個設(shè)備上并行運行多個Sampler的實例。Graph Store存儲了大圖的embedding信息和結(jié)構(gòu)信息,到目前為止,DGL提供了內(nèi)存共享式的Graph Store,以用來支持多進程,多GPU的并行訓(xùn)練。DGL未來還將提供分布式的Graph Store,以支持超大規(guī)模的圖訓(xùn)練。
下面來分別介紹它們。

Graph Store
graph store 包含兩個部分,server和client,其中server需要作為守護進程(daemon)在訓(xùn)練之前運行起來。比如如下腳本啟動了一個graph store server 和 4個worker,并且載入了reddit數(shù)據(jù)集:
python3 run_store_server.py --dataset reddit --num-workers 4
在訓(xùn)練過程中,這4個worker將會和client交互以取得訓(xùn)練樣本。用戶需要做的僅僅是編寫訓(xùn)練部分的代碼。首先需要創(chuàng)建一個client對象連接到對應(yīng)的server。下面的腳本中用shared_memory初始化store_type表明client連接的是一個內(nèi)存共享式的server。
g = dgl.contrib.graph_store.create_graph_from_store("reddit", store_type="shared_mem")
在采樣的博文中,我們已經(jīng)詳細(xì)介紹了如何通過采樣的技術(shù)來減小計算子圖的規(guī)模。回憶一下,圖模型的每一層進行了如下的計算:
control-variate sampling用如下的方法近似了:
除了進行這樣的近似,作者還采用了預(yù)處理的技巧了把采樣的層數(shù)減少了1。具體來說,GCN的輸入是的原始embedding,預(yù)處理之后GCN的輸入是,這種方式使得最早的一層無需進行鄰居embedding的融合計算(也就是無需采樣),因為左乘以鄰接矩陣已經(jīng)做了這樣的計算,因為,需要采樣的層數(shù)就減少了1。
對于一個大圖來說,和都可能很大。兩個矩陣的乘法就要通過分布式計算的方式完成,即每一個trainer(worker)負(fù)責(zé)計算一部分,然后聚合起來。DGL提供了update_all來進行這種計算:
g.update_all(fn.copy_src(src='features', out='m'),
fn.sum(msg='m', out='preprocess'),
lambda node : {'preprocess': node.data['preprocess'] * node.data['norm']})
初看這段代碼和矩陣計算沒有任何關(guān)系啊,其實這段代碼要從語義上理解,在語義上表示鄰接矩陣和特征矩陣的乘法,即對于每個節(jié)點的特征跟新為鄰居特征的和。那么再看上面這段代碼就容易了,copy_src將節(jié)點特征取出來,并發(fā)送出去, sum接受到來自鄰居的特征并求和,求和結(jié)果再發(fā)給節(jié)點,最后節(jié)點自身進行一下renormalize。
update_all在graph store中是分布式進行的,每個trainer都會分派到一部分節(jié)點進行更新。
節(jié)點和邊的數(shù)據(jù)現(xiàn)在全部存儲在graph store中,因此訪問他們不再像以前那樣用 g.ndata/g.edata那樣簡單,因為這兩個方法會讀取整個節(jié)點和邊的數(shù)據(jù),而這些數(shù)據(jù)在graph store中并不存在(他們可能是分開存儲的),因此用戶只能通過g.nodes[node_ids].data[embed_name]來訪問特定節(jié)點的Embedding數(shù)據(jù)。(注意:這種讀數(shù)據(jù)的方式是通用的,并不是graph store特有的,g.ndata即是g.nodes[:].data的縮寫)。
為了高效地初始化節(jié)點和邊tensor,DGL提供了init_ndata和init_edata這兩種方法。這兩種方法都會講初始化的命令發(fā)送到graph store server上,由server來代理初始化工作,下面展示了一個例子:
for i in range(n_layers):
g.init_ndata('h_{}'.format(i), (features.shape[0], args.n_hidden), 'float32')
g.init_ndata('agg_h_{}'.format(i), (features.shape[0], args.n_hidden), 'float32')
其中h_i存儲i層節(jié)點Embedding,agg_h_i存儲i節(jié)點鄰居Embedding的聚集后的結(jié)果。
初始化節(jié)點數(shù)據(jù)之后,我們可以通過control-variate sampling的方法來訓(xùn)練GCN),這個方法在之前的博文中介紹過
for nf in NeighborSampler(g, batch_size, num_neighbors,
neighbor_type='in', num_hops=L-1,
seed_nodes=labeled_nodes):
for i in range(nf.num_blocks):
# aggregate history on the original graph
g.pull(nf.layer_parent_nid(i+1),
fn.copy_src(src='h_{}'.format(i), out='m'),
lambda node: {'agg_h_{}'.format(i): node.data['m'].mean(axis=1)})
# We need to copy data in the NodeFlow to the right context.
nf.copy_from_parent(ctx=right_context)
nf.apply_layer(0, lambda node : {'h' : layer(node.data['preprocess'])})
h = nf.layers[0].data['h']
for i in range(nf.num_blocks):
prev_h = nf.layers[i].data['h_{}'.format(i)]
# compute delta_h, the difference of the current activation and the history
nf.layers[i].data['delta_h'] = h - prev_h
# refresh the old history
nf.layers[i].data['h_{}'.format(i)] = h.detach()
# aggregate the delta_h
nf.block_compute(i,
fn.copy_src(src='delta_h', out='m'),
lambda node: {'delta_h': node.data['m'].mean(axis=1)})
delta_h = nf.layers[i + 1].data['delta_h']
agg_h = nf.layers[i + 1].data['agg_h_{}'.format(i)]
# control variate estimator
nf.layers[i + 1].data['h'] = delta_h + agg_h
nf.apply_layer(i + 1, lambda node : {'h' : layer(node.data['h'])})
h = nf.layers[i + 1].data['h']
# update history
nf.copy_to_parent()
和原來代碼稍有不同的是,這里right_context表示數(shù)據(jù)在哪個設(shè)備上,通過將數(shù)據(jù)調(diào)度到正確的設(shè)備上,我們就可以完成多設(shè)備的分布式訓(xùn)練。
Distributed Sampler
因為我們有多個設(shè)備可以進行并行計算(比如說多GPU,多CPU),那么需要不斷地給每個設(shè)備提供nodeflow(計算子圖實例)。DGL采用的做法是分出一部分設(shè)備專門負(fù)責(zé)采樣,將采樣作為服務(wù)提供給計算設(shè)備,計算設(shè)備只負(fù)責(zé)在采樣后的子圖上進行計算。DGL支持同時在多個設(shè)備上運行多個采樣程序,每個采樣程序都可以將采樣結(jié)果發(fā)到計算設(shè)備上。
一個分布式采樣的示例可以這樣寫,首先,在訓(xùn)練之前用戶需要創(chuàng)建一個分布式SamplerReceiver對象:
sampler = dgl.contrib.sampling.SamplerReceiver(graph, ip_addr, num_sampler)
SamplerReceiver`類用來從其他設(shè)備上接收采樣出來的子圖,這個API的三個參數(shù)分別為`parent_graph`, `ip_address`, 和`number_of_samplers
然后,用戶只需要在單機版的訓(xùn)練代碼中改變一行:
for nf in sampler:
for i in range(nf.num_blocks):
# aggregate history on the original graph
g.pull(nf.layer_parent_nid(i+1),
fn.copy_src(src='h_{}'.format(i), out='m'),
lambda node: {'agg_h_{}'.format(i): node.data['m'].mean(axis=1)})
...
其中,代碼for nf in sampler用來代替原單機采樣代碼:
for nf in NeighborSampler(g, batch_size, num_neighbors,
neighbor_type='in', num_hops=L-1,
seed_nodes=labeled_nodes):
其他所有的部分都可以保持不變。
因此,額外的開發(fā)工作主要是要編寫運行在采樣設(shè)備上的采樣邏輯。對于鄰居采樣來說,開發(fā)者只需要拷貝單機采樣的代碼就可以了:
sender = dgl.contrib.sampling.SamplerSender(trainer_address)
...
for n in num_epoch:
for nf in dgl.contrib.sampling.NeighborSampler(graph, batch_size, num_neighbors,
neighbor_type='in',
shuffle=shuffle,
num_workers=num_workers,
num_hops=num_hops,
add_self_loop=add_self_loop,
seed_nodes=seed_nodes):
sender.send(nf, trainer_id)
# tell trainer I have finished current epoch
sender.signal(trainer_id)
后話
本篇博文重點介紹了DGL的并行計算框架,其主要由采樣層-計算層-存儲層三層構(gòu)建而來,采樣和計算分布在不同的機器上,可以并行執(zhí)行。通過這種方式,在存儲充足的情況下,DGL可以處理數(shù)以億計節(jié)點和邊的大圖。
