利用 Python 實現(xiàn)多任務(wù)進(jìn)程
來源:公眾號【杰哥的IT之旅】
作者:阿拉斯加
ID:Jake_Internet
一、進(jìn)程介紹
進(jìn)程:正在執(zhí)行的程序,由程序、數(shù)據(jù)和進(jìn)程控制塊組成,是正在執(zhí)行的程序,程序的一次執(zhí)行過程,是資源調(diào)度的基本單位。
程序:沒有執(zhí)行的代碼,是一個靜態(tài)的。
二、線程和進(jìn)程之間的對比

由圖可知:此時電腦有 9 個應(yīng)用進(jìn)程,但是一個進(jìn)程又會對應(yīng)于多個線程,可以得出結(jié)論:
進(jìn)程:能夠完成多任務(wù),一臺電腦上可以同時運行多個 QQ
線程:能夠完成多任務(wù),一個 QQ 中的多個聊天窗口
根本區(qū)別:進(jìn)程是操作系統(tǒng)資源分配的基本單位,而線程是任務(wù)調(diào)度和執(zhí)行的基本單位.
使用多進(jìn)程的優(yōu)勢:
1、擁有獨立GIL:
首先由于進(jìn)程中 GIL 的存在,Python 中的多線程并不能很好地發(fā)揮多核優(yōu)勢,一個進(jìn)程中的多個線程,在同 一時刻只能有一個線程運行。而對于多進(jìn)程來說,每個進(jìn)程都有屬于自己的 GIL,所以,在多核處理器下,多進(jìn)程的運行是不會受 GIL的影響的。因此,多進(jìn) 程能更好地發(fā)揮多核的優(yōu)勢。
2、效率高
當(dāng)然,對于爬蟲這種 IO 密集型任務(wù)來說,多線程和多進(jìn)程影響差別并不大。對于計算密集型任務(wù)來說,Python 的多進(jìn)程相比多線 程,其多核運行效率會有成倍的提升。

三、Python 實現(xiàn)多進(jìn)程
我們先用一個實例來感受一下:
1、使用 process 類
import?multiprocessing?
def?process(index):?
????print(f'Process:?{index}')?
if?__name__?==?'__main__':?
????for?i?in?range(5):?
????????p?=?multiprocessing.Process(target=process,?args=(i,))?
????????p.start()?
這是一個實現(xiàn)多進(jìn)程最基礎(chǔ)的方式:通過創(chuàng)建 Process 來新建一個子進(jìn)程,其中 target 參數(shù)傳入方法名,args 是方法的參數(shù),是以 元組的形式傳入,其和被調(diào)用的方法 process 的參數(shù)是一一對應(yīng)的。
注意:這里 args 必須要是一個元組,如果只有一個參數(shù),那也要在元組第一個元素后面加一個逗號,如果沒有逗號則 和單個元素本身沒有區(qū)別,無法構(gòu)成元組,導(dǎo)致參數(shù)傳遞出現(xiàn)問題。創(chuàng)建完進(jìn)程之后,我們通過調(diào)用 start 方法即可啟動進(jìn)程了。
運行結(jié)果如下:
Process:?0?
Process:?1?
Process:?2?
Process:?3?
Process:?4?
可以看到,我們運行了 5 個子進(jìn)程,每個進(jìn)程都調(diào)用了 process 方法。process 方法的 index 參數(shù)通過 Process 的 args 傳入,分別是 0~4 這 5 個序號,最后打印出來,5 個子進(jìn)程運行結(jié)束。
2、繼承 process 類
from?multiprocessing?import?Process
import?time
class?MyProcess(Process):
????def?__init__(self,loop):
????????Process.__init__(self)
????????self.loop?=?loop
????def?run(self):
????????for?count?in?range(self.loop):
????????????time.sleep(1)
????????????print(f'Pid:{self.pid}?LoopCount:?{count}')
if?__name__?==?'__main__':
????for?i?in?range(2,5):
????????p?=?MyProcess(i)
????????p.start()
我們首先聲明了一個構(gòu)造方法,這個方法接收一個 loop 參數(shù),代表循環(huán)次數(shù),并將其設(shè)置為全局變量。在 run方法中,又使用這 個 loop 變量循環(huán)了 loop 次并打印了當(dāng)前的進(jìn)程號和循環(huán)次數(shù)。
在調(diào)用時,我們用 range 方法得到了 2、3、4 三個數(shù)字,并把它們分別初始化了 MyProcess 進(jìn)程,然后調(diào)用 start 方法將進(jìn)程啟動起 來。
注意:這里進(jìn)程的執(zhí)行邏輯需要在 run 方法中實現(xiàn),啟動進(jìn)程需要調(diào)用 start 方法,調(diào)用之后 run 方法便會執(zhí)行。
運行結(jié)果如下:
Pid:12976?LoopCount:?0
Pid:15012?LoopCount:?0
Pid:11976?LoopCount:?0
Pid:12976?LoopCount:?1
Pid:15012?LoopCount:?1
Pid:11976?LoopCount:?1
Pid:15012?LoopCount:?2
Pid:11976?LoopCount:?2
Pid:11976?LoopCount:?3
注意,這里的進(jìn)程 pid 代表進(jìn)程號,不同機(jī)器、不同時刻運行結(jié)果可能不同。
四、進(jìn)程之間的通信
1、Queue-隊列 先進(jìn)先出
from?multiprocessing?import?Queue
import?multiprocessing
def?download(p):?#?下載數(shù)據(jù)
????lst?=?[11,22,33,44]
????for?item?in?lst:
????????p.put(item)
????print('數(shù)據(jù)已經(jīng)下載成功....')
def?savedata(p):
????lst?=?[]
????while?True:
????????data?=?p.get()
????????lst.append(data)
????????if?p.empty():
????????????break
????print(lst)
def?main():
????p1?=?Queue()
????t1?=?multiprocessing.Process(target=download,args=(p1,))
????t2?=?multiprocessing.Process(target=savedata,args=(p1,))
????t1.start()
????t2.start()
if?__name__?==?'__main__':
????main()
數(shù)據(jù)已經(jīng)下載成功....
[11,?22,?33,?44]
2、共享全局變量不適用于多進(jìn)程編程
import?multiprocessing
a?=?1
def?demo1():
????global?a
????a?+=?1
def?demo2():
????print(a)
def?main():
????t1?=?multiprocessing.Process(target=demo1)
????t2?=?multiprocessing.Process(target=demo2)
????t1.start()
????t2.start()
if?__name__?==?'__main__':
????main()
運行結(jié)果:
1
有結(jié)果可知:全局變量不共享;
五、進(jìn)程池之間的通信
1、進(jìn)程池引入
當(dāng)需要創(chuàng)建的子進(jìn)程數(shù)量不多時,可以直接利用 multiprocessing 中的 Process 動態(tài)生成多個進(jìn)程,但是如果是上百甚至上千個目標(biāo),手動的去創(chuàng)建的進(jìn)程的工作量巨大,此時就可以用到 multiprocessing 模塊提供的 Pool 方法。
from?multiprocessing?import?Pool
import?os,time,random
def?worker(a):
????t_start?=?time.time()
????print('%s開始執(zhí)行,進(jìn)程號為%d'%(a,os.getpid()))
????time.sleep(random.random()*2)
????t_stop?=?time.time()
????print(a,"執(zhí)行完成,耗時%0.2f"%(t_stop-t_start))
if?__name__?==?'__main__':
????po?=?Pool(3)????????#?定義一個進(jìn)程池
????for?i?in?range(0,10):
????????po.apply_async(worker,(i,))????#?向進(jìn)程池中添加worker的任務(wù)
????print("--start--")
????po.close()??????
????po.join()???????
????print("--end--")
運行結(jié)果:
--start--
0開始執(zhí)行,進(jìn)程號為6664
1開始執(zhí)行,進(jìn)程號為4772
2開始執(zhí)行,進(jìn)程號為13256
0?執(zhí)行完成,耗時0.18
3開始執(zhí)行,進(jìn)程號為6664
2?執(zhí)行完成,耗時0.16
4開始執(zhí)行,進(jìn)程號為13256
1?執(zhí)行完成,耗時0.67
5開始執(zhí)行,進(jìn)程號為4772
4?執(zhí)行完成,耗時0.87
6開始執(zhí)行,進(jìn)程號為13256
3?執(zhí)行完成,耗時1.59
7開始執(zhí)行,進(jìn)程號為6664
5?執(zhí)行完成,耗時1.15
8開始執(zhí)行,進(jìn)程號為4772
7?執(zhí)行完成,耗時0.40
9開始執(zhí)行,進(jìn)程號為6664
6?執(zhí)行完成,耗時1.80
8?執(zhí)行完成,耗時1.49
9?執(zhí)行完成,耗時1.36
--end--
一個進(jìn)程池只能容納 3 個進(jìn)程,執(zhí)行完成才能添加新的任務(wù),在不斷的打開與釋放的過程中循環(huán)往復(fù)。

六、案例:文件批量復(fù)制
操作思路:
獲取要復(fù)制文件夾的名字
創(chuàng)建一個新的文件夾
獲取文件夾里面所有待復(fù)制的文件名
創(chuàng)建進(jìn)程池
向進(jìn)程池添加任務(wù)
代碼如下:
導(dǎo)包
import?multiprocessing
import?os
import?time
定制文件復(fù)制函數(shù)
def?copy_file(Q,oldfolderName,newfolderName,file_name):
????#?文件復(fù)制,不需要返回
????time.sleep(0.5)
????#?print('\r從%s文件夾復(fù)制到%s文件夾的%s文件'%(oldfolderName,newfolderName,file_name),end='')
????old_file?=?open(oldfolderName?+?'/'?+?file_name,'rb')?#?待復(fù)制文件
????content?=?old_file.read()
????old_file.close()
????new_file?=?open(newfolderName?+?'/'?+?file_name,'wb')?#?復(fù)制出的新文件
????new_file.write(content)
????new_file.close()
????Q.put(file_name)?#?向Q隊列中添加文件
定義主函數(shù)
def?main():
????oldfolderName?=?input('請輸入要復(fù)制的文件夾名字:')?#?步驟1獲取要復(fù)制文件夾的名字(可以手動創(chuàng)建,也可以通過代碼創(chuàng)建,這里我們手動創(chuàng)建)
????newfolderName?=?oldfolderName?+?'復(fù)件'
????#?步驟二?創(chuàng)建一個新的文件夾
????if?not?os.path.exists(newfolderName):
????????os.mkdir(newfolderName)
????filenames?=?os.listdir(oldfolderName)?#?3.獲取文件夾里面所有待復(fù)制的文件名
????#?print(filenames)
????pool?=?multiprocessing.Pool(5)?#?4.創(chuàng)建進(jìn)程池
????Q?=?multiprocessing.Manager().Queue()?#?創(chuàng)建隊列,進(jìn)行通信
????for?file_name?in?filenames:
????????pool.apply_async(copy_file,args=(Q,oldfolderName,newfolderName,file_name))?#?5.向進(jìn)程池添加任務(wù)
??????po.close()
????copy_file_num?=?0
????file_count?=?len(filenames)
????#?不知道什么時候完成,所以定義一個死循環(huán)
????while?True:
????????file_name?=?Q.get()
????????copy_file_num?+=?1
????????time.sleep(0.2)
????????print('\r拷貝進(jìn)度%.2f?%%'%(copy_file_num??*?100/file_count),end='')?#?做一個拷貝進(jìn)度條
????????if?copy_file_num?>=?file_count:
????????????break
程序運行
if?__name__?==?'__main__':
????main()
運行結(jié)果如下圖所示:

運行前后文件目錄結(jié)構(gòu)對比
運行前

運行后

推薦閱讀

