<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          PyTorch Dataloader讀取時,如何在進(jìn)程之間傳輸數(shù)據(jù)?

          共 5943字,需瀏覽 12分鐘

           ·

          2021-10-12 18:58

          點擊上方機器學(xué)習(xí)與生成對抗網(wǎng)絡(luò)”,關(guān)注星標(biāo)

          獲取有趣、好玩的前沿干貨!

          來源|知乎? 作者|Envy
          鏈接|https://zhuanlan.zhihu.com/p/409629586
          編輯|人工智能前沿講習(xí)
          最近我在做PyTorch的Dataloader相關(guān)的開發(fā),有一個問題讓我比較在意:PyTorch的Dataloader在啟動多個進(jìn)程讀取樣本的時候,這些數(shù)據(jù)是怎么在進(jìn)程之間進(jìn)行傳輸?shù)模繒粫攵嘤嗟膬?nèi)存拷貝?

          01

          Dataloader使用multiprocess.Queue來傳輸數(shù)據(jù)
          先簡單介紹一下Dataloader的多進(jìn)程模式,Dataloader在構(gòu)造的時候,若num_workers不為0,就會啟動num_workers個worker進(jìn)程,然后主進(jìn)程會向worker進(jìn)程分發(fā)讀取任務(wù),worker進(jìn)程讀到數(shù)據(jù)之后,再把數(shù)據(jù)放到隊列中供主進(jìn)程取用。Worker進(jìn)程所執(zhí)行的代碼片段如下:
          while watchdog.is_alive():    try:        r = index_queue.get(timeout=MP_STATUS_CHECK_INTERVAL)    except queue.Empty:        continue    if isinstance(r, _ResumeIteration):        # Acknowledge the main process        data_queue.put((r, None))        iteration_end = False        # Recreate the fetcher for worker-reuse policy        fetcher = _DatasetKind.create_fetcher(            dataset_kind, dataset, auto_collation, collate_fn, drop_last)        continue    elif r is None:        # Received the final signal        assert done_event.is_set() or iteration_end        break    elif done_event.is_set() or iteration_end:        # `done_event` is set. But I haven't received the final signal        # (None) yet. I will keep continuing until get it, and skip the        # processing steps.        continue    idx, index = r    data: Union[_IterableDatasetStopIteration, ExceptionWrapper]    if init_exception is not None:        data = init_exception        init_exception = None    else:        try:            data = fetcher.fetch(index)        except Exception as e:            if isinstance(e, StopIteration) and dataset_kind == _DatasetKind.Iterable:                data = _IterableDatasetStopIteration(worker_id)                # Set `iteration_end`                #   (1) to save future `next(...)` calls, and                #   (2) to avoid sending multiple `_IterableDatasetStopIteration`s.                iteration_end = True            else:                # It is important that we don't store exc_info in a variable.                # `ExceptionWrapper` does the correct thing.                # See NOTE [ Python Traceback Reference Cycle Problem ]                data = ExceptionWrapper(                    where="in DataLoader worker process {}".format(worker_id))    data_queue.put((idx, data))????del?data,?idx,?index,?r??#?save?memory
          其中,data_queue通常是一個torch.multiprocessing.Queue的實例。
          看到這里,似乎Dataloader只是平平無奇地使用了torch.multiprocessing.Queue的接口,難道torch.multiprocessing.Queue有一些高級的技巧?
          繼續(xù)看torch.multiprocessing.Queue的代碼,發(fā)現(xiàn)它只是簡單地把multiprocessing.Queue包了一下。眾所周知, multiprocessing.Queue在Linux使用socket來實現(xiàn),那難道讀上來的數(shù)據(jù)需要在socket之間傳來傳去嗎,效率也太低了吧?!不對,PyTorch一定還有其他騷操作。

          02

          Tensor(CPU Tensor)在multiprocessing.Queue中的序列化和反序列化
          在通常的用法里,Dataloader從Dataset里讀出來的數(shù)據(jù)都會被collate_fn轉(zhuǎn)成CPU Tensor,那我們就繼續(xù)看看,Tensor是怎么在隊列中序列化和反序列化的。
          可以看到,torch.Tensor重載了__reduce_ex__()函數(shù),序列化的時候只會用到 Tensor.storage, Tensor.storage_offset, size, stride, requires_grad和backward_hooks;而反序列化的torch._utils._rebuild_tensor_v2()也只會用到以上信息。
          multiprocessing.Queue是使用pickle來做序列化和反序列化的,而重載__reduce_ex__()正是自定義序列化反序列化方式的方法之一。
          那看起來,CPU Tensor在進(jìn)程中傳輸時,是在接收進(jìn)程中把Tensor重新構(gòu)建了一遍,而構(gòu)建Tensor時候用到的信息, Tensor.storage_offset, size, stride, requires_grad和backward_hooks,都只是用于描述Tensor的meta信息,實際和數(shù)據(jù)相關(guān)的,就只有Tensor.storage了。

          03

          Tensor.Storage的序列化與反序列化
          Tensor.Storage同樣重載了pickle的序列化與反序列化過程,在torch/multiprocessing/reduction.py中,給Tensor.Storage 注冊了reduce函數(shù)reduce_storage.
          這里為什么使用copyreg庫而不是重載__reduce__(), 在copyreg的注釋里說copyreg是專用于C extension的.按照這個說法,在reductions.py里為Tensor注冊的reduce function應(yīng)該是沒有起效的。
          def reduce_storage(storage):    from . import get_sharing_strategy    if storage.is_cuda:        raise RuntimeError("Cannot pickle CUDA storage; try pickling a CUDA tensor instead")    elif get_sharing_strategy() == 'file_system':        metadata = storage._share_filename_()        cache_key = metadata[1]        rebuild = rebuild_storage_filename        storage._shared_incref()    elif storage.size() == 0:        # This is special cased because Empty tensors        # (with size 0) cannot be mmapped.        return (rebuild_storage_empty, (type(storage),))    else:        fd, size = storage._share_fd_()        df = multiprocessing.reduction.DupFd(fd)        cache_key = fd_id(fd)        metadata = (df, size)        rebuild = rebuild_storage_fd  # type: ignore[assignment]
          shared_cache[cache_key] = StorageWeakRef(storage) return (rebuild, (type(storage),) + metadata)
          這個函數(shù)首先根據(jù)環(huán)境中的sharing strategy來決定共享內(nèi)存的使用方式,然后若storage原本不在共享內(nèi)存中的話,就把它拷到共享內(nèi)存中去,比如_share_fd_()的實現(xiàn)。


          04

          小結(jié)
          看到這里,我們可以大概得出一個結(jié)論了。
          Worker進(jìn)程從Dataset中讀出來的Tensor本身是普通的CPU Tensor,但當(dāng)把它放到multiprocessing.Queue中去的時候,這個Tensor的數(shù)據(jù)會被拷到共享內(nèi)存中,Queue只會發(fā)送這個Tensor所具有的meta信息,主進(jìn)程接到這些meta信息之后,就可以從共享內(nèi)存中的數(shù)據(jù)重新構(gòu)建Tensor。
          這里有一點值得注意,如果你想要驗證這個過程,在發(fā)送進(jìn)程調(diào)用multiprocessing.Queue.put()之后,立即調(diào)用Tensor.is_shared()并不會返回True,因為put()是非阻塞的,只有當(dāng)Tensor被QueueFeedThread序列化完成之后再調(diào)用is_shared(),才會得到預(yù)期中的結(jié)果。

          05

          Dataloader的小心機
          在default_collate中,有這樣一段小代碼:
          if isinstance(elem, torch.Tensor):    out = None    if torch.utils.data.get_worker_info() is not None:        # If we're in a background process, concatenate directly into a        # shared memory tensor to avoid an extra copy        numel = sum([x.numel() for x in batch])        storage = elem.storage()._new_shared(numel)        out = elem.new(storage)????return?torch.stack(batch,?0,?out=out)
          含義是,如果batch中的數(shù)據(jù)已經(jīng)是Tensor了,那么,如果這是一個Worker進(jìn)程,就開一段共享內(nèi)存把這個batch放進(jìn)去。因為collate的時候無論如何都會有一次內(nèi)存拷貝(除非底層的Dataset有其他保證),那么這個操作就省掉了之后放進(jìn)隊列中的那一次內(nèi)存拷貝。
          不過我隨便找了幾個自定義了collate_fn的模型看了一下他們寫的collate過程,是沒有把這一點考慮進(jìn)去的。這也算是Dataloader的一個小心機吧,有緣人就用得上。


          猜您喜歡:

          等你著陸!【GAN生成對抗網(wǎng)絡(luò)】知識星球!

          CVPR 2021專題1:GAN的改進(jìn)

          CVPR 2021 | GAN的說話人驅(qū)動、3D人臉論文匯總

          CVPR 2021 | 圖像轉(zhuǎn)換 今如何?幾篇GAN論文

          【CVPR 2021】通過GAN提升人臉識別的遺留難題

          CVPR 2021生成對抗網(wǎng)絡(luò)GAN部分論文匯總

          經(jīng)典GAN不得不讀:StyleGAN

          最新最全20篇!基于 StyleGAN 改進(jìn)或應(yīng)用相關(guān)論文

          超100篇!CVPR 2020最全GAN論文梳理匯總!

          附下載 | 《Python進(jìn)階》中文版

          附下載 | 經(jīng)典《Think Python》中文版

          附下載 | 《Pytorch模型訓(xùn)練實用教程》

          附下載 | 最新2020李沐《動手學(xué)深度學(xué)習(xí)》

          附下載 |?《可解釋的機器學(xué)習(xí)》中文版

          附下載 |《TensorFlow 2.0 深度學(xué)習(xí)算法實戰(zhàn)》

          附下載 | 超100篇!CVPR 2020最全GAN論文梳理匯總!

          附下載 |《計算機視覺中的數(shù)學(xué)方法》分享

          瀏覽 46
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩一级片高清 | 成人偷拍自拍在线观看 | 亚洲无码app | 91在线无码精品秘 少萝 | 操逼手机视频 |