詳解:深度學(xué)習(xí)框架數(shù)據(jù)Pipeline設(shè)計

極市導(dǎo)讀
隨著GPU算力越來越強,對于數(shù)據(jù)處理Pipeline的效率也提出了越來越高的要求。本文整理分析了Pytorch的數(shù)據(jù)Pipeline、MindSpore && Tensorflow的local數(shù)據(jù)處理pipeline、Tensorflow中的分布式數(shù)據(jù)處理Pipeline >>加入極市CV技術(shù)交流群,走在計算機視覺的最前沿
前言
過去的雙月我一直在做訓(xùn)練框架中數(shù)據(jù)模塊的工作。核心目的在于優(yōu)化IO的效率和預(yù)處理的加速,致力于支持更多更豐富的數(shù)據(jù)處理方式,以及提升整個數(shù)據(jù)處理的pipeline的效率。隨著日后GPU算力越來越強,對于數(shù)據(jù)處理Pipeline的效率也提出了越來越高的要求。于是在雙月的結(jié)束整理分析各個框架數(shù)據(jù)處理pipeline的設(shè)計。
以下文章第一節(jié),第二節(jié)主要參考MindSpore架構(gòu)師金雪峰的文章,這部分內(nèi)容文章里面寫得很全了,基本把我所想表達整理的內(nèi)容全部寫了。
金雪鋒:AI框架中數(shù)據(jù)處理的挑戰(zhàn)與解決思路
https://zhuanlan.zhihu.com/p/352487023
一、AI框架中的數(shù)據(jù)處理
深度學(xué)習(xí)框架核心三大件事:數(shù)據(jù),計算和通信。數(shù)據(jù)模塊主要包括數(shù)據(jù)讀?。↖O密集型)和數(shù)據(jù)變換(CPU密集型)。典型的訓(xùn)練數(shù)據(jù)處理流程如下圖所示:

圖片來自于 https://zhuanlan.zhihu.com/p/352487023
加載:指從各種異構(gòu)存儲中將訓(xùn)練數(shù)據(jù)加載到內(nèi)存中,加載時涉及數(shù)據(jù)的IO、解碼等處理;目前一般都會從HDFS中或者OBS中讀取序列化存儲的數(shù)據(jù),并在內(nèi)存中進行解析校驗。
Shuffle:訓(xùn)練一般是多個epoch,通過shuffle打亂數(shù)據(jù)集不同epoch的數(shù)據(jù)排序,防止訓(xùn)練過擬合。如果數(shù)據(jù)集支持隨機訪問,則只需按不同順序隨機選擇數(shù)據(jù)就可以非常有效地進行混洗shuffle。如果數(shù)據(jù)集不支持隨機訪問(或僅部分隨機訪問像多個文件對象),那么一個子集的數(shù)據(jù)可以加載到一個特殊的混洗緩沖區(qū)shuffle buffer中。
map:完成訓(xùn)練數(shù)據(jù)的預(yù)處理工作。map為數(shù)據(jù)集算子,表示對整個數(shù)據(jù)集的變換操作。這是tensorflow和mindspore這類框架的算子抽象模式。在pytorch中,更多是以單個數(shù)據(jù)為粒度的處理算子。
batch:數(shù)據(jù)的batch邏輯處理。
repeat:可以通過repeat的方式增加訓(xùn)練的總數(shù)據(jù)量;一次repeat就是加載一遍整個訓(xùn)練集。
模型在進行推理時,同樣會涉及到數(shù)據(jù)處理,不同的是推理時一般加載單樣本進行處理,而非數(shù)據(jù)集。典型的過程如下圖所示:

圖片來自于https://zhuanlan.zhihu.com/p/352487023
二、難點與挑戰(zhàn)
2.1 數(shù)據(jù)處理的高效性
當(dāng)前各AI框架的數(shù)據(jù)處理主要利用CPU運算,訓(xùn)練則利用GPU/AI芯片,兩者是并行的。理想情況下,應(yīng)該在每輪迭代開始前,就準(zhǔn)備好完成增強之后的數(shù)據(jù),保持訓(xùn)練過程持續(xù)地進行。然而在實際的訓(xùn)練中,很多時候數(shù)據(jù)處理成為了阻礙性能提升的瓶頸:或是因為從存儲中讀取數(shù)據(jù)的速度不足(I/O bound),或是因為數(shù)據(jù)增強操作效率過低(CPU bound)。
根據(jù)黃氏定律,GPU/AI芯片的算力每一年會提升一倍,相比于即將失效的摩爾定律,AI芯片的算力提升速度會遠(yuǎn)大于CPU。模型迭代計算效率的不斷提升,對數(shù)據(jù)處理也提出了更高的要求:數(shù)據(jù)處理過程必須足夠高效,才能夠避免GPU/AI芯片因為等待訓(xùn)練數(shù)據(jù)而空閑。
2.2 數(shù)據(jù)處理的靈活性
數(shù)據(jù)處理的靈活性挑戰(zhàn)主要體現(xiàn)在以下兩個方面:
多源數(shù)據(jù)集讀取
多源數(shù)據(jù)集讀取主要有兩種情況
不同數(shù)據(jù)源有著不同的格式。 在一個訓(xùn)練數(shù)據(jù)處理pipeline中涉及到不同數(shù)據(jù)源數(shù)據(jù)讀取處理如下圖所示

針對問題1目前常規(guī)的解決方案主要有針對不同的數(shù)據(jù)源定制相應(yīng)的Reader和數(shù)據(jù)打包成統(tǒng)一格式,兩種解決方案。問題2在tensorflow這種場景下不會成為大的問題,但是在pytorch中遇到這樣的問題就需要特殊的處理,避免不同數(shù)據(jù)源的數(shù)據(jù)讀取變成串行讀取。
數(shù)據(jù)增強算法非常靈活,需要框架提供足夠易用的接口來支持用戶定制數(shù)據(jù)處理過程
為了方便算法工程師快速的實驗各種預(yù)處理方法,算法框架要允許算法工程師輕易的增加新的預(yù)處理算子。但是數(shù)據(jù)pipeline的靈活性常常和效率是一對矛盾的點,很難兼顧二者。想要高效基本都需要底層C++實現(xiàn)算子,但是這對靈活性帶來了麻煩。pytorch很靈活,但是做到高效需要做很多二次開發(fā)。
三、現(xiàn)有的框架中的數(shù)據(jù)pipeline流程
接下來我們介紹一下現(xiàn)存框架的數(shù)據(jù)pipeline,我們核心關(guān)心的是pipeline的靈活性和高效性設(shè)計。靈活性體現(xiàn)在要允許算法工程師靈活的自定義算子,降低開發(fā)的成本,高效性體現(xiàn)在要能夠可拓展,能夠高性能。沒錯,這是一個既要,又要,還要的問題。
Pytorch的數(shù)據(jù)Pipeline設(shè)計與實現(xiàn)
Pytorch的pipeline設(shè)計整體比較清晰明了,所以我們首先拿他開刀。接下來的內(nèi)容中我主要依據(jù)我的這篇博客為主進行介紹,限于篇幅,這篇文章中主要以圖片為主。
大龍:Pytorch數(shù)據(jù)Pipeline設(shè)計總結(jié)
https://zhuanlan.zhihu.com/p/351666693
關(guān)于Pytorch,我們首先介紹其數(shù)據(jù)Pipeline的抽象: Sampler, Dataset, Dataloader, DataloaderItor四個層次,其關(guān)系如下圖所示。Sampler負(fù)責(zé)生成讀取處理的數(shù)據(jù)Index序列,Dataset模塊負(fù)責(zé)定義是數(shù)據(jù)的加載和預(yù)處理,DataloaderItor負(fù)責(zé)進行單進程/多進程數(shù)據(jù)處理的管理,Dataloader則負(fù)責(zé)最高層的用戶交互。

從pipeline的靈活性上講,pytorch無疑是最靈活的,因為本身就是純python的實現(xiàn),自然對算法工程師來講靈活定制成本最低,我們接下來介紹pipeline的高效性設(shè)計。pytorch中支持多進程數(shù)據(jù)加載,其核心流程圖如下所示。

由主進程生成采樣序列,放到各個讀取進程的index隊列中,每個進程讀取處理完數(shù)據(jù)之后,把數(shù)據(jù)通過進程間隊列result_queue來傳給主進程,主進程中的子線程對數(shù)據(jù)做簡單的處理(在pytorch中主要是pin memory的操作,加快CPU到GPU的數(shù)據(jù)拷貝)。主線程從數(shù)據(jù)隊列中讀取數(shù)據(jù)返回給模型進行。
MindSpore && Tensorflow的local數(shù)據(jù)處理pipeline
MindSpore和TF中的pipeline設(shè)計比較像,故在此一并介紹。這兩個框架中的pipeline的設(shè)計思想我是非常喜歡的。其核心是把數(shù)據(jù)處理算子處理抽象為了兩類:數(shù)據(jù)集算子和Tensor算子。數(shù)據(jù)集算子主要負(fù)責(zé)計算資源的調(diào)度和輸入輸出的控制,Tensor算子負(fù)責(zé)具體的數(shù)據(jù)增強的計算。一段典型的數(shù)據(jù)tensorflow預(yù)處理代碼如下所示:
import tensorflow as tfimport tensorflow_addons as tfafrom tensorflow.keras.applications.resnet50 import ResNet50from tensorflow.keras.layers.experimental import preprocessingdef get_dataset(batch_size):# parse TFRecorddef parse_image_function(example_proto):image_feature_description =: tf.io.FixedLenFeature([], tf.string),: tf.io.FixedLenFeature([], tf.int64)}features = tf.io.parse_single_example(image_feature_description)image = tf.io.decode_raw(features['image'], tf.uint8)* 32 * 32])image = tf.reshape(image, [32, 32, 3])label = tf.cast(features['label'], tf.int32)return image, label# dilation filterdef dilate(image, label):dilateFilter = tf.zeros([3, 3, 3], tf.uint8)image = tf.expand_dims(image, 0)image = tf.nn.dilation2d(dilateFilter, strides=[1, 1, 1, 1],dilations=[1, 1, 1, 1],padding='SAME',data_format='NHWC')image = tf.squeeze(image)return image, label# blur filterdef blur(image, label):image = tfa.image.gaussian_filter2d(image=image,filter_shape=(11, 11), sigma=0.8)return image, label# rescale filterdef rescale(image, label):image = preprocessing.Rescaling(1.0 / 255)(image)return image, label# augmentation filtersdef augment(image, label):data_augmentation = tf.keras.Sequential(image = data_augmentation(image)return image, labelautotune = tf.data.experimental.AUTOTUNEoptions = tf.data.Options()= Falserecords = tf.data.Dataset.list_files('data/*',shuffle=True).with_options(options)# load from TFRecord filesds = tf.data.TFRecordDataset(records,num_parallel_reads=autotune).repeat()ds = ds.map(parse_image_function, num_parallel_calls=autotune)ds = ds.map(dilate, num_parallel_calls=autotune)ds = ds.map(blur, num_parallel_calls=autotune)ds = ds.batch(batch_size)ds = ds.map(rescale,num_parallel_calls=autotune)ds = ds.map(augment, num_parallel_calls=autotune)ds = ds.prefetch(autotune)return dsif __name__ == "__main__":model = ResNet50(weights=None,input_shape=(32, 32, 3),classes=10)=tf.losses.SparseCategoricalCrossentropy(),optimizer=tf.optimizers.Adam())dataset = get_dataset(batch_size = 1024)steps_per_epoch=100, epochs=10))
整體實際上是采用了類似于Spark中RDD這樣的處理思想:對數(shù)據(jù)集進行變換。
通過map函數(shù)中傳入各種處理算子(實際上對于Tensorflow底層的節(jié)點來說,就是一個C++函數(shù)調(diào)用鏈),我們構(gòu)建了一個計算圖。計算圖上的每個節(jié)點定義了對數(shù)據(jù)集的處理操作,每個節(jié)點輸入是一個Dataset,變換后輸出仍然是一個Dataset。所以Dataset是tensorflow中處理的基礎(chǔ)核心概念。我們查看MapDatasetOp的源碼會發(fā)現(xiàn),Map的核心輸入是一個Dataset,輸出是一個Dataset,操作是一個Opcontext。
class MapDatasetOp : public UnaryDatasetOpKernel {public:static constexpr const char* const kDatasetType = "Map";static constexpr const char* const kInputDataset = "input_dataset";static constexpr const char* const kOtherArguments = "other_arguments";static constexpr const char* const kFunc = "f";static constexpr const char* const kTarguments = "Targuments";static constexpr const char* const kOutputTypes = "output_types";static constexpr const char* const kOutputShapes = "output_shapes";static constexpr const char* const kUseInterOpParallelism ="use_inter_op_parallelism";static constexpr const char* const kPreserveCardinality ="preserve_cardinality";explicit MapDatasetOp(OpKernelConstruction* ctx);protected:void MakeDataset(OpKernelContext* ctx, DatasetBase* input,DatasetBase** output) override;private:class Dataset;std::shared_ptr<FunctionMetadata> func_metadata_ = nullptr;DataTypeVector output_types_;std::vector<PartialTensorShape> output_shapes_;bool preserve_cardinality_;};
以MindSpore為例子,其數(shù)據(jù)處理pipeline如下圖所示(圖片來源于此:https://zhuanlan.zhihu.com/p/352487023)。這樣抽象的好處在于各個Map算子處理的計算資源解耦。對于處理速度慢的算子我們可以分配更多的計算資源,對于處理速度快的算子我們可以分配更少的計算資源。因為在我們的Dolphin中很明顯的發(fā)現(xiàn),不同的算子耗時差異非常大。更合理的講我們就需要對不同的算子給不同的計算資源,來加速整個預(yù)處理。
從訓(xùn)練的可復(fù)現(xiàn)性上講,Tensorflow和MindSpore采用了各自的保序處理。整個數(shù)據(jù)訓(xùn)練的數(shù)據(jù)保序是通過遞歸算子保序來保證的,即對于一個Map算子來說,保序性意味著輸出的數(shù)據(jù)和輸入的數(shù)據(jù)的順序要一致,如果所有的Map算子都能保證這一點,那么整體來講模型獲得的數(shù)據(jù)順序就是保序的。
MindSpore中使用統(tǒng)一抽象的Connector來處理,Tensorflow中則在各個Map算子中用鎖和異步等待來完成保序處理。

MindSpore中Connector的核心目的就是保證輸出的順序和輸入的順序保持一致。其源碼中的Push和Pop的實現(xiàn)代碼如下所示:
virtual Status Pop(int32_t worker_id, // The worker-id of the caller. See the requirement at the top of this file.T *result) noexcept {{MS_ASSERT(worker_id < num_consumers_);std::unique_lock<std::mutex> lk(m_);RETURN_IF_NOT_OK(cv_.Wait(&lk, [this, worker_id]() { return expect_consumer_ == worker_id; }));RETURN_IF_NOT_OK(queues_[pop_from_]->PopFront(result));pop_from_ = (pop_from_ + 1) % num_producers_;out_buffers_count_++;expect_consumer_ = (expect_consumer_ + 1) % num_consumers_;}cv_.NotifyAll();return Status::OK();}// Add an element into the DbConnector without the overhead of synchronization.// It may block when the internal queue is full.// The element passed to this function will be copied into the internal queue.// @param worker_id The id of a worker thread calling this method.// @param el A const lvalue element to be passed/added/pushed.Status Push(int32_t worker_id, const T &el) noexcept {MS_ASSERT(worker_id < static_cast<int32_t>(queues_.size()));MS_ASSERT(queues_[worker_id] != nullptr);return (queues_[worker_id]->Add(el));}
天下沒有免費的午餐,上述的pipeline我認(rèn)為基本上從效率上來講,是local能夠?qū)崿F(xiàn)的最優(yōu)pipeline,但是缺點在于不易做靈活的定制。這點MindSpore做的更好一些,Tensorflow底層pipeline實現(xiàn)由C++函數(shù)調(diào)用鏈完成,那么就意味著對于自定義的Python處理不能很好的兼容,上層的Python算子為了能夠被底層C++調(diào)用,需要滿足相應(yīng)的限制才可以。
Tensorflow中的分布式數(shù)據(jù)處理Pipeline
Tensorflow好不好用單說,從工程的角度而言,Tensorflow的每個單獨功能的設(shè)計還是做得相當(dāng)不錯。這篇文章重點介紹其中一個就是Tensorflow在2.3版本中推出的分布式數(shù)據(jù)加載Dataservice的功能, 這個功能牛就牛在對用戶來說可以完全無感的切換處理。這個功能的具體內(nèi)容可在我的這篇文章中進行查看。限于篇幅,本文主要直接對圖進行說明。
大龍:分布式數(shù)據(jù)預(yù)處理——TensorFlow DataService架構(gòu)設(shè)計總結(jié)
https://zhuanlan.zhihu.com/p/351810621
如下是一段使用Dataservice的簡短示例代碼:
dispatcher = tf.data.experimental.service.DispatchServer()dispatcher_address = dispatcher.target.split("://")[1]worker = tf.data.experimental.service.WorkerServer(tf.data.experimental.service.WorkerConfig(dispatcher_address=dispatcher_address))dataset = tf.data.Dataset.range(10)dataset = dataset.apply(tf.data.experimental.service.distribute(processing_mode="parallel_epochs", service=dispatcher.target))print(list(dataset.as_numpy_iterator()))
TFDataservice的設(shè)計實際上是一個簡易版的MapReduce。但其任務(wù)分配、執(zhí)行機制和與客戶端的交互邏輯上則簡單了很多。
我們用幾張圖介紹TFDataservice中的機制和概念。首先是任務(wù)的概念。

TFDataservice中有Dataset, Task, Job三層核心概念。對于數(shù)據(jù)集處理算子來說,Dataset是數(shù)據(jù)處理的基本單位,所以數(shù)據(jù)集的概念是最好理解的,就是多個Tensor的集合。針對該數(shù)據(jù)集的一個處理變換pipeline的定義我們稱之為一個Job。對于Job的每一個執(zhí)行實體我們稱之為一個Task。
而在任務(wù)分配與執(zhí)行中,TFDataservice也有三層概念Dispatcher,Worker, TaskRunner,其與上述的任務(wù)概念的交互關(guān)系如下圖所示。具體我們接下來一一解釋。

任務(wù)執(zhí)行這部分和MapReduce的執(zhí)行很像,所以我們回顧一下MR中的任務(wù)執(zhí)行。

在MR中有JobTracker負(fù)責(zé)Job和Task的新建、執(zhí)行以及狀態(tài)維護??蛻舳送^JobTracker新建一個Job,新建Job的同時JobTracker會根據(jù)用戶的定義進行Task的拆分以及新建,在MR中其實就是MapTask以及ReduceTask的新建。每個TaskTracker會通過Heartbeat和JobTracker進行任務(wù)的領(lǐng)取以及當(dāng)前正在執(zhí)行的Task的狀態(tài)的更新。TaskTracker領(lǐng)取到一個任務(wù)之后,會啟用或者復(fù)用一個JVM來執(zhí)行,每個Task的執(zhí)行最終由TaskRunner來完成。
Dataservice中采用幾乎一一對應(yīng)的概念,其中Dispatcher對應(yīng)JobTracker,Worker對應(yīng)TaskTracker,TaskRunner對應(yīng)TaskRunner。只不過在TF中,worker接收到一個Task之后不需要重新起一個JVM來執(zhí)行代碼,而是直接進行函數(shù)調(diào)用即可。整體而言TFDataservice中的分布式數(shù)據(jù)處理的架構(gòu)如下圖所示。這個圖看起來有點亂,但是基本上就描述清了TFDataservice中的設(shè)計。Dispatcher和Worker都是兩個RPC Server。Dataset和Dispatcher,Dispatcher和Worker以及Dataset和Worker之間都是采用RPC調(diào)用進行通信。

四、一些未來需要解決的數(shù)據(jù)pipleline的問題:
1. 資源的自適應(yīng)分配
預(yù)處理pipeline執(zhí)行的進程數(shù)和當(dāng)前各數(shù)據(jù)增強算子使用的處理線程數(shù)目由用戶手工配置,對用戶的調(diào)優(yōu)經(jīng)驗要求極高。通過自適應(yīng)判斷Pipeline瓶頸,由框架給各個數(shù)據(jù)增強算子合理分配CPU資源,可以在訓(xùn)練過程中動態(tài)優(yōu)化數(shù)據(jù)處理性能,免去用戶繁瑣的調(diào)優(yōu)過程。
2. 異構(gòu)硬件加速
當(dāng)前的數(shù)據(jù)處理Pipeline操作在CPU執(zhí)行,一旦出現(xiàn)瓶頸,帶來AI芯片/GPU等待空閑,用戶無法充分利用所有硬件的計算能力。期望構(gòu)建用戶無感知的異構(gòu)硬件資源調(diào)度能力:通過監(jiān)測硬件資源使用,完善TPU/GPU上的數(shù)據(jù)處理算子,采用代價模型自適應(yīng)地將數(shù)據(jù)處理任務(wù)調(diào)度至合適的資源,實現(xiàn)異構(gòu)硬件的充分利用。我們在音頻混響操作(核心為FFT和IFFT操作)中發(fā)現(xiàn)GPU的使用能極大的加速數(shù)據(jù)預(yù)處理,然而過多的進程申請GPU緩存池對訓(xùn)練本身會造成一定的影響,這個問題我們下個雙月會著重研究GPU顯存的精細(xì)化管理。
3. 用戶無感知的分布式數(shù)據(jù)加載
當(dāng)前大部分框架使用本地多進程、多線程進行數(shù)據(jù)預(yù)處理。但是隨著GPU的性能的逐年提升、AIOT端測模型的進一步減小,CPU Bound和IO Bound會越發(fā)的明顯,本地的數(shù)據(jù)處理已經(jīng)難以滿足模型的需求。使用分布式的預(yù)處理是解決數(shù)據(jù)處理和讀取瓶頸的出路。如何做到本地、分布式預(yù)處理用戶無感知切換是未來非常有前景的方向。Tensorflow中的TFDataservice給眾多框架開了一個頭,估計后續(xù)MindSpore等框架會持續(xù)跟上。
推薦閱讀
2020-10-17
2020-02-13
2020-11-14

# CV技術(shù)社群邀請函 #
備注:姓名-學(xué)校/公司-研究方向-城市(如:小極-北大-目標(biāo)檢測-深圳)
即可申請加入極市目標(biāo)檢測/圖像分割/工業(yè)檢測/人臉/醫(yī)學(xué)影像/3D/SLAM/自動駕駛/超分辨率/姿態(tài)估計/ReID/GAN/圖像增強/OCR/視頻理解等技術(shù)交流群
每月大咖直播分享、真實項目需求對接、求職內(nèi)推、算法競賽、干貨資訊匯總、與 10000+來自港科大、北大、清華、中科院、CMU、騰訊、百度等名校名企視覺開發(fā)者互動交流~

