【Python】Python并行——速度++++++++
常規(guī)代碼一直對python的多線程、多進(jìn)程、分布式多進(jìn)程比較好奇。今天淺淺地學(xué)習(xí)了一下,里面涉及的內(nèi)容其實(shí)比較多,包括進(jìn)程鎖、進(jìn)程間的通信、進(jìn)程池、共享內(nèi)存等等。這里給一個(gè)簡單的、大家可能會常用到的例子——從多個(gè)wrfout文件中提取變量T2并單獨(dú)保存輸出為nc文件,一起感受下多進(jìn)程的魅力。如果不妥之處,還望大家不吝賜教!
這份代碼是大家實(shí)際中經(jīng)常使用的,通過循環(huán)來實(shí)現(xiàn)從多個(gè)wrfout文件中提取變量T2并單獨(dú)保存輸出為nc文件。
import xarray as xr
import numpy as np
import glob
import sys
import os
import argparse
import time
def nc2pkl(args):
st =time.time()
if not os.path.exists(args.outdir):
os.mkdir(args.outdir)
files = glob.glob(args.files)
for file in files:
filename = file.split('/')[-1]
print('Reading ',file)
sys.stdout.flush()
ds = xr.open_dataset(file)
T2 = ds['T2']
sys.stdout.flush()
# Write out everything
ofile = args.outdir+'/'+filename+'.nc'
T2.to_netcdf(ofile)
print('Written to '+ofile)
et = time.time()
print(et -st)
def parse_args():
'''
Parser for nc2pkl
'''
parser = argparse.ArgumentParser()
parser.add_argument('-t','--template',type=str,default="../data/wrfout_d01_2018-08-01_00:00:00")
parser.add_argument('-f','--files',type=str,default="../data/wrfout_d01_2018-08*")
parser.add_argument('-o','--outdir',type=str,default="../output/T2")
return parser.parse_args()
if __name__ == '__main__':
nc2pkl(parse_args())
多進(jìn)程并行代碼
這份代碼里面使用了多進(jìn)程并行,從num_processes = 4可以知道開了4個(gè)進(jìn)程同時(shí)處理,可以簡單理解為同一時(shí)間同時(shí)處理4個(gè)wrfout文件。其實(shí)能開多少進(jìn)程取決于我們的計(jì)算機(jī)有多少核數(shù),在linux上可以通過nproc命令查看核數(shù)。
如果大家想使用下面的并行代碼滿足自己的需求,只需要更改被我用-----框起來的函數(shù)定義中的操作即可,比如更改變量,或者增加計(jì)算等。
import xarray as xr
import numpy as np
import multiprocessing as mp
from functools import partial
import glob
import sys
import os
import time
#--------------------------------------
def nc2pkl(file_path, output_dir):
if not os.path.exists(output_dir):
os.mkdir(output_dir)
filename = file_path.split('/')[-1]
print('Reading ',file_path)
sys.stdout.flush()
ds = xr.open_dataset(file_path)
T2 = ds['T2']
sys.stdout.flush()
# Write out everything
ofile = output_dir+'/'+filename+'.nc'
T2.to_netcdf(ofile)
print('Written to '+ofile)
#--------------------------------------
def parallel_nc2pkl(input_dir, output_dir, num_processes):
st = time.time()
# 獲取所有需要處理的文件路徑
file_paths = glob.glob(os.path.join(input_dir, "wrfout_d01_2018-08*"))
# 創(chuàng)建進(jìn)程池
with mp.Pool(processes=num_processes) as pool:
# 使用partial函數(shù)創(chuàng)建一個(gè)只有一個(gè)參數(shù)的nc2pkl函數(shù)
worker_func = partial(nc2pkl, output_dir=output_dir)
# 將需要處理的文件路徑傳遞給進(jìn)程池
pool.map(worker_func, file_paths)
et = time.time()
print(et -st)
if __name__ == "__main__":
# 設(shè)置輸入和輸出目錄
input_dir = "../data/"
output_dir = "../output/T2_multi"
# 設(shè)置進(jìn)程數(shù)量
num_processes = 4
# 并行處理文件
parallel_nc2pkl(input_dir, output_dir, num_processes)
計(jì)算效率
常規(guī)代碼耗時(shí)及CPU使用情況


并行代碼耗時(shí)及CPU使用情況


從中可以看到,并行代碼極大地提升了速度。
注意無論是多進(jìn)程還是多線程,一旦任務(wù)數(shù)量多到一個(gè)限度,就會消耗掉系統(tǒng)所有的資源,結(jié)果就是效率急劇下降,所有任務(wù)都做不好。
參考:
【1】https://mofanpy.com/tutorials/python-basic/multiprocessing/why
【2】https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376
歡迎加入讀友交流群因?yàn)槿毫囊褲M200人, 需掃描下方二維碼 添加小編微信拉你入群
