詳解Python多線程、多進(jìn)程
線程與進(jìn)程的區(qū)別
進(jìn)程和線程區(qū)別
線程與進(jìn)程的區(qū)別可以歸納為以下4點(diǎn):
-
地址空間和其它資源(如打開文件):進(jìn)程間相互獨(dú)立,同一進(jìn)程的各線程間共享。某進(jìn)程內(nèi)的線程在其它進(jìn)程不可見(jiàn)。
-
通信:進(jìn)程間通信IPC,線程間可以直接讀寫進(jìn)程數(shù)據(jù)段(如全局變量)來(lái)進(jìn)行通信——需要進(jìn)程同步和互斥手段的輔助,以保證數(shù)據(jù)的一致性。
-
調(diào)度和切換:線程上下文切換比進(jìn)程上下文切換要快得多。
-
在多線程OS中,進(jìn)程不是一個(gè)可執(zhí)行的實(shí)體。
多進(jìn)程和多線程的比較
| 對(duì)比維度 | 多進(jìn)程 | 多線程 | 總結(jié) |
|---|---|---|---|
| 數(shù)據(jù)共享、同步 | 數(shù)據(jù)共享復(fù)雜,同步簡(jiǎn)單 | 數(shù)據(jù)共享簡(jiǎn)單,同步復(fù)雜 | 各有優(yōu)劣 |
| 內(nèi)存、CPU | 占用內(nèi)存多,切換復(fù)雜,CPU利用率低 | 占用內(nèi)存少,切換簡(jiǎn)單,CPU利用率高 | 線程占優(yōu) |
| 創(chuàng)建、銷毀、切換 | 復(fù)雜,速度慢 | 簡(jiǎn)單,速度快 | 線程占優(yōu) |
| 編程、調(diào)試 | 編程簡(jiǎn)單,調(diào)試簡(jiǎn)單 | 編程復(fù)雜,調(diào)試復(fù)雜 | 進(jìn)程占優(yōu) |
| 可靠性 | 進(jìn)程間不會(huì)互相影響 | 一個(gè)線程掛掉將導(dǎo)致整個(gè)進(jìn)程掛掉 | 進(jìn)程占優(yōu) |
| 分布式 | 適用于多核、多機(jī),擴(kuò)展到多臺(tái)機(jī)器簡(jiǎn)單 | 適合于多核 | 進(jìn)程占優(yōu) |
-
線程在進(jìn)程下行進(jìn)(單純的車廂無(wú)法運(yùn)行) -
一個(gè)進(jìn)程可以包含多個(gè)線程(一輛火車可以有多個(gè)車廂) -
不同進(jìn)程間數(shù)據(jù)很難共享(一輛火車上的乘客很難換到另外一輛火車,比如站點(diǎn)換乘) -
同一進(jìn)程下不同線程間數(shù)據(jù)很易共享(A車廂換到B車廂很容易) -
進(jìn)程要比線程消耗更多的計(jì)算機(jī)資源(采用多列火車相比多個(gè)車廂更耗資源) -
進(jìn)程間不會(huì)相互影響,一個(gè)線程掛掉將導(dǎo)致整個(gè)進(jìn)程掛掉(一列火車不會(huì)影響到另外一列火車,但是如果一列火車上中間的一節(jié)車廂著火了,將影響到該趟火車的所有車廂) -
進(jìn)程可以拓展到多機(jī),進(jìn)程最多適合多核(不同火車可以開在多個(gè)軌道上,同一火車的車廂不能在行進(jìn)的不同的軌道上) -
進(jìn)程使用的內(nèi)存地址可以上鎖,即一個(gè)線程使用某些共享內(nèi)存時(shí),其他線程必須等它結(jié)束,才能使用這一塊內(nèi)存。(比如火車上的洗手間)-”互斥鎖(mutex)” -
進(jìn)程使用的內(nèi)存地址可以限定使用量(比如火車上的餐廳,最多只允許多少人進(jìn)入,如果滿了需要在門口等,等有人出來(lái)了才能進(jìn)去)-“信號(hào)量(semaphore)”
Python全局解釋器鎖GIL
The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.
在多線程環(huán)境中,Python 虛擬機(jī)按以下方式執(zhí)行:
-
設(shè)置GIL -
切換到一個(gè)線程去運(yùn)行 -
運(yùn)行直至指定數(shù)量的字節(jié)碼指令,或者線程主動(dòng)讓出控制(可以調(diào)用sleep(0)) -
把線程設(shè)置為睡眠狀態(tài) -
解鎖GIL 再次重復(fù)以上所有步驟
-
使用更高版本Python(對(duì)GIL機(jī)制進(jìn)行了優(yōu)化) -
使用多進(jìn)程替換多線程(多進(jìn)程之間沒(méi)有GIL,但是進(jìn)程本身的資源消耗較多) -
指定cpu運(yùn)行線程(使用affinity模塊) -
使用Jython、IronPython等無(wú)GIL解釋器 -
全I(xiàn)O密集型任務(wù)時(shí)才使用多線程 -
使用協(xié)程(高效的單線程模式,也稱微線程;通常與多進(jìn)程配合使用) -
將關(guān)鍵組件用C/C++編寫為Python擴(kuò)展,通過(guò)ctypes使Python程序直接調(diào)用C語(yǔ)言編譯的動(dòng)態(tài)鏈接庫(kù)的導(dǎo)出函數(shù)。(with nogil調(diào)出GIL限制)
Python的多進(jìn)程包multiprocessing
Multiprocessing產(chǎn)生的背景
import os
print('Process (%s) start...' % os.getpid())
\# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
上述代碼在Linux、Unix和Mac上的執(zhí)行結(jié)果為:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
multiprocessing常用組件及功能
創(chuàng)建管理進(jìn)程模塊:
-
Process(用于創(chuàng)建進(jìn)程) -
Pool(用于創(chuàng)建管理進(jìn)程池) -
Queue(用于進(jìn)程通信,資源共享) -
Value,Array(用于進(jìn)程通信,資源共享) -
Pipe(用于管道通信) -
Manager(用于資源共享)
同步子進(jìn)程模塊:
-
Condition(條件變量) -
Event(事件) -
Lock(互斥鎖) -
RLock(可重入的互斥鎖(同一個(gè)進(jìn)程可以多次獲得它,同時(shí)不會(huì)造成阻塞) -
Semaphore(信號(hào)量)
接下來(lái)就一起來(lái)學(xué)習(xí)下每個(gè)組件及功能的具體使用方法。
Process(用于創(chuàng)建進(jìn)程)
multiprocessing模塊提供了一個(gè)Process類來(lái)代表一個(gè)進(jìn)程對(duì)象。
在multiprocessing中,每一個(gè)進(jìn)程都用一個(gè)Process類來(lái)表示。
構(gòu)造方法:Process([group [, target [, name [, args [, kwargs]]]]])
-
group:分組,實(shí)際上不使用,值始終為None -
target:表示調(diào)用對(duì)象,即子進(jìn)程要執(zhí)行的任務(wù),你可以傳入方法名 -
name:為子進(jìn)程設(shè)定名稱 -
args:要傳給target函數(shù)的位置參數(shù),以元組方式進(jìn)行傳入。 -
kwargs:要傳給target函數(shù)的字典參數(shù),以字典方式進(jìn)行傳入。
實(shí)例方法:
-
start():?jiǎn)?dòng)進(jìn)程,并調(diào)用該子進(jìn)程中的p.run() -
run():進(jìn)程啟動(dòng)時(shí)運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實(shí)現(xiàn)該方法 -
terminate():強(qiáng)制終止進(jìn)程p,不會(huì)進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個(gè)鎖那么也將不會(huì)被釋放,進(jìn)而導(dǎo)致死鎖 -
is_alive():返回進(jìn)程是否在運(yùn)行。如果p仍然運(yùn)行,返回True -
join([timeout]):進(jìn)程同步,主進(jìn)程等待子進(jìn)程完成后再執(zhí)行后面的代碼。線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運(yùn)行的狀態(tài))。timeout是可選的超時(shí)時(shí)間(超過(guò)這個(gè)時(shí)間,父線程不再等待子線程,繼續(xù)往下執(zhí)行),需要強(qiáng)調(diào)的是,p.join只能join住start開啟的進(jìn)程,而不能join住run開啟的進(jìn)程
屬性介紹:
-
daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺(tái)運(yùn)行的守護(hù)進(jìn)程;當(dāng)p的父進(jìn)程終止時(shí),p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程;必須在p.start()之前設(shè)置 -
name:進(jìn)程的名稱 -
pid:進(jìn)程的pid -
exitcode:進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號(hào)N結(jié)束(了解即可) -
authkey:進(jìn)程的身份驗(yàn)證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個(gè)鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類連接只有在具有相同的身份驗(yàn)證鍵時(shí)才能成功(了解即可)
使用示例:(注意:在windows中Process()必須放到if name == ‘main’:下)
from multiprocessing import Process
import os
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
Pool(用于創(chuàng)建管理進(jìn)程池)
-
processes :要?jiǎng)?chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()返回的數(shù)量。 -
initializer:每個(gè)工作進(jìn)程啟動(dòng)時(shí)要執(zhí)行的可調(diào)用對(duì)象,默認(rèn)為None。如果initializer是None,那么每一個(gè)工作進(jìn)程在開始的時(shí)候會(huì)調(diào)用initializer(*initargs)。 -
initargs:是要傳給initializer的參數(shù)組。 -
maxtasksperchild:工作進(jìn)程退出之前可以完成的任務(wù)數(shù),完成后用一個(gè)新的工作進(jìn)程來(lái)替代原進(jìn)程,來(lái)讓閑置的資源被釋放。maxtasksperchild默認(rèn)是None,意味著只要Pool存在工作進(jìn)程就會(huì)一直存活。 -
context: 用在制定工作進(jìn)程啟動(dòng)時(shí)的上下文,一般使用Pool() 或者一個(gè)context對(duì)象的Pool()方法來(lái)創(chuàng)建一個(gè)池,兩種方法都適當(dāng)?shù)脑O(shè)置了context。
-
apply(func[, args[, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,*kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過(guò)不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()。它是阻塞的。apply很少使用 -
apply_async(func[, arg[, kwds={}[, callback=None]]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,*kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實(shí)例,callback是可調(diào)用對(duì)象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。它是非阻塞。 -
map(func, iterable[, chunksize=None]):Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會(huì)使進(jìn)程阻塞直到返回結(jié)果。注意,雖然第二個(gè)參數(shù)是一個(gè)迭代器,但在實(shí)際使用中,必須在整個(gè)隊(duì)列都就緒后,程序才會(huì)運(yùn)行子進(jìn)程。 -
map_async(func, iterable[, chunksize=None]):map_async與map的關(guān)系同apply與apply_async -
imap():imap 與 map的區(qū)別是,map是當(dāng)所有的進(jìn)程都已經(jīng)執(zhí)行完了,并將結(jié)果返回了,imap()則是立即返回一個(gè)iterable可迭代對(duì)象。 -
imap_unordered():不保證返回的結(jié)果順序與進(jìn)程添加的順序一致。 -
close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成。 -
join():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用,讓其不再接受新的Process。 -
terminate():結(jié)束工作進(jìn)程,不再處理未處理的任務(wù)。
-
get():返回結(jié)果,如果有必要?jiǎng)t等待結(jié)果到達(dá)。timeout是可選的。如果在指定時(shí)間內(nèi)還沒(méi)有到達(dá),將引發(fā)異常。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時(shí)再次被引發(fā)。 -
ready():如果調(diào)用完成,返回True -
successful():如果調(diào)用完成且沒(méi)有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常 -
wait([timeout]):等待結(jié)果變?yōu)榭捎谩?/span> -
terminate():立即終止所有工作進(jìn)程,同時(shí)不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動(dòng)調(diào)用此函數(shù)
使用示例:
\# -*- coding:utf-8 -*-
\# Pool+map
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
lists = range(100)
pool = Pool(8)
pool.map(test, lists)
pool.close()
pool.join()
\# -*- coding:utf-8 -*-
\# 異步進(jìn)程池(非阻塞)
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
pool = Pool(8)
for i in range(100):
'''
For循環(huán)中執(zhí)行步驟:
(1)循環(huán)遍歷,將100個(gè)子進(jìn)程添加到進(jìn)程池(相對(duì)父進(jìn)程會(huì)阻塞)
(2)每次執(zhí)行8個(gè)子進(jìn)程,等一個(gè)子進(jìn)程執(zhí)行完后,立馬啟動(dòng)新的子進(jìn)程。(相對(duì)父進(jìn)程不阻塞)
apply_async為異步進(jìn)程池寫法。異步指的是啟動(dòng)子進(jìn)程的過(guò)程,與父進(jìn)程本身的執(zhí)行(print)是異步的,而For循環(huán)中往進(jìn)程池添加子進(jìn)程的過(guò)程,與父進(jìn)程本身的執(zhí)行卻是同步的。
'''
pool.apply_async(test, args=(i,)) # 維持執(zhí)行的進(jìn)程總數(shù)為8,當(dāng)一個(gè)進(jìn)程執(zhí)行完后啟動(dòng)一個(gè)新進(jìn)程.
print("test")
pool.close()
pool.join()
\# -*- coding:utf-8 -*-
\# 異步進(jìn)程池(非阻塞)
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
pool = Pool(8)
for i in range(100):
'''
實(shí)際測(cè)試發(fā)現(xiàn),for循環(huán)內(nèi)部執(zhí)行步驟:
(1)遍歷100個(gè)可迭代對(duì)象,往進(jìn)程池放一個(gè)子進(jìn)程
(2)執(zhí)行這個(gè)子進(jìn)程,等子進(jìn)程執(zhí)行完畢,再往進(jìn)程池放一個(gè)子進(jìn)程,再執(zhí)行。(同時(shí)只執(zhí)行一個(gè)子進(jìn)程)
for循環(huán)執(zhí)行完畢,再執(zhí)行print函數(shù)。
'''
pool.apply(test, args=(i,)) # 維持執(zhí)行的進(jìn)程總數(shù)為8,當(dāng)一個(gè)進(jìn)程執(zhí)行完后啟動(dòng)一個(gè)新進(jìn)程.
print("test")
pool.close()
pool.join()
Queue(用于進(jìn)程通信,資源共享)
-
maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無(wú)大小限制。
-
put():用以插入數(shù)據(jù)到隊(duì)列。put方法還有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會(huì)阻塞timeout指定的時(shí)間,直到該隊(duì)列有剩余的空間。如果超時(shí),會(huì)拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會(huì)立即拋出Queue.Full異常。 -
get():可以從隊(duì)列讀取并且刪除一個(gè)元素。get方法有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時(shí)間內(nèi)沒(méi)有取到任何元素,會(huì)拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個(gè)值可用,則立即返回該值,否則,如果隊(duì)列為空,則立即拋出Queue.Empty異常。若不希望在empty的時(shí)候拋出異常,令blocked為True或者參數(shù)全部置空即可。 -
get_nowait():同q.get(False) -
put_nowait():同q.put(False) -
empty():調(diào)用此方法時(shí)q為空則返回True,該結(jié)果不可靠,比如在返回True的過(guò)程中,如果隊(duì)列中又加入了項(xiàng)目。 -
full():調(diào)用此方法時(shí)q已滿則返回True,該結(jié)果不可靠,比如在返回True的過(guò)程中,如果隊(duì)列中的項(xiàng)目被取走。 -
qsize():返回隊(duì)列中目前項(xiàng)目的正確數(shù)量,結(jié)果也不可靠,理由同q.empty()和q.full()一樣
使用示例:
from multiprocessing import Process, Queue
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join() # 等待pw結(jié)束
pr.terminate() # pr進(jìn)程里是死循環(huán),無(wú)法等待其結(jié)束,只能強(qiáng)行終止
-
maxsize:隊(duì)列中允許最大項(xiàng)數(shù),省略則無(wú)大小限制。
-
task_done():使用者使用此方法發(fā)出信號(hào),表示q.get()的返回項(xiàng)目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除項(xiàng)目的數(shù)量,將引發(fā)ValueError異常 -
join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊(duì)列中所有的項(xiàng)目均被處理。阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止
使用示例:
\# -*- coding:utf-8 -*-
from multiprocessing import Process, JoinableQueue
import time, random
def consumer(q):
while True:
res = q.get()
print('消費(fèi)者拿到了 %s' % res)
q.task_done()
def producer(seq, q):
for item in seq:
time.sleep(random.randrange(1,2))
q.put(item)
print('生產(chǎn)者做好了 %s' % item)
q.join()
if __name__ == "__main__":
q = JoinableQueue()
seq = ('產(chǎn)品%s' % i for i in range(5))
p = Process(target=consumer, args=(q,))
p.daemon = True # 設(shè)置為守護(hù)進(jìn)程,在主線程停止時(shí)p也停止,但是不用擔(dān)心,producer內(nèi)調(diào)用q.join保證了consumer已經(jīng)處理完隊(duì)列中的所有元素
p.start()
producer(seq, q)
print('主線程')
Value,Array(用于進(jìn)程通信,資源共享)
-
typecode_or_type:定義ctypes()對(duì)象的類型,可以傳Type code或 C Type,具體對(duì)照表見(jiàn)下文。 -
args:傳遞給typecode_or_type構(gòu)造函數(shù)的參數(shù) -
lock:默認(rèn)為True,創(chuàng)建一個(gè)互斥鎖來(lái)限制對(duì)Value對(duì)象的訪問(wèn),如果傳入一個(gè)鎖,如Lock或RLock的實(shí)例,將用于同步。如果傳入False,Value的實(shí)例就不會(huì)被鎖保護(hù),它將不是進(jìn)程安全的。
| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |
參考地址:https://docs.python.org/3/library/array.html
Array
構(gòu)造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])
-
typecode_or_type:同上 -
size_or_initializer:如果它是一個(gè)整數(shù),那么它確定數(shù)組的長(zhǎng)度,并且數(shù)組將被初始化為零。否則,size_or_initializer是用于初始化數(shù)組的序列,其長(zhǎng)度決定數(shù)組的長(zhǎng)度。 -
kwds:傳遞給typecode_or_type構(gòu)造函數(shù)的參數(shù) -
lock:同上
使用示例:
import multiprocessing
def f(n, a):
n.value = 3.14
a[0] = 5
if __name__ == '__main__':
num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))
p = multiprocessing.Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
注意:Value和Array只適用于Process類。
Pipe(用于管道通信)
-
dumplex:默認(rèn)管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發(fā)送。
-
send(obj):通過(guò)連接發(fā)送對(duì)象。obj是與序列化兼容的任意對(duì)象 -
recv():接收conn2.send(obj)發(fā)送的對(duì)象。如果沒(méi)有消息可接收,recv方法會(huì)一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會(huì)拋出EOFError。 -
close():關(guān)閉連接。如果conn1被垃圾回收,將自動(dòng)調(diào)用此方法 -
fileno():返回連接使用的整數(shù)文件描述符 -
poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長(zhǎng)時(shí)限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout射成None,操作將無(wú)限期地等待數(shù)據(jù)到達(dá)。 -
recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過(guò)了這個(gè)最大值,將引發(fā)IOError異常,并且在連接上無(wú)法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。 -
send_bytes(buffer [, offset [, size]]):通過(guò)連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對(duì)象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收 -
recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對(duì)象中,該對(duì)象支持可寫入的緩沖區(qū)接口(即bytearray對(duì)象或類似的對(duì)象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長(zhǎng)度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。
使用示例:
from multiprocessing import Process, Pipe
import time
\# 子進(jìn)程執(zhí)行方法
def f(Subconn):
time.sleep(1)
Subconn.send("吃了嗎")
print("來(lái)自父親的問(wèn)候:", Subconn.recv())
Subconn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe() # 創(chuàng)建管道兩端
p = Process(target=f, args=(child_conn,)) # 創(chuàng)建子進(jìn)程
p.start()
print("來(lái)自兒子的問(wèn)候:", parent_conn.recv())
parent_conn.send("嗯")
Manager(用于資源共享)
-
address:(hostname,port),指定服務(wù)器的網(wǎng)址地址,默認(rèn)為簡(jiǎn)單分配一個(gè)空閑的端口 -
authkey:連接到服務(wù)器的客戶端的身份驗(yàn)證,默認(rèn)為current_process().authkey的值
-
start([initializer[, initargs]]):?jiǎn)?dòng)一個(gè)單獨(dú)的子進(jìn)程,并在該子進(jìn)程中啟動(dòng)管理器服務(wù)器 -
get_server():獲取服務(wù)器對(duì)象 -
connect():連接管理器對(duì)象 -
shutdown():關(guān)閉管理器對(duì)象,只能在調(diào)用了start()方法之后調(diào)用
-
address:只讀屬性,管理器服務(wù)器正在使用的地址
-
Array(self,*args,**kwds) -
BoundedSemaphore(self,*args,**kwds) -
Condition(self,*args,**kwds) -
Event(self,*args,**kwds) -
JoinableQueue(self,*args,**kwds) -
Lock(self,*args,**kwds) -
Namespace(self,*args,**kwds) -
Pool(self,*args,**kwds) -
Queue(self,*args,**kwds) -
RLock(self,*args,**kwds) -
Semaphore(self,*args,**kwds) -
Value(self,*args,**kwds) -
dict(self,*args,**kwds) -
list(self,*args,**kwds)
使用示例:
import multiprocessing
def f(x, arr, l, d, n):
x.value = 3.14
arr[0] = 5
l.append('Hello')
d[1] = 2
n.a = 10
if __name__ == '__main__':
server = multiprocessing.Manager()
x = server.Value('d', 0.0)
arr = server.Array('i', range(10))
l = server.list()
d = server.dict()
n = server.Namespace()
proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))
proc.start()
proc.join()
print(x.value)
print(arr)
print(l)
print(d)
print(n)
同步子進(jìn)程模塊
Lock(互斥鎖)
構(gòu)造方法:Lock()
實(shí)例方法:
-
acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。 -
release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
使用示例:
from multiprocessing import Process, Lock
def l(lock, num):
lock.acquire()
print("Hello Num: %s" % (num))
lock.release()
if __name__ == '__main__':
lock = Lock() # 這個(gè)一定要定義為全局
for num in range(20):
Process(target=l, args=(lock, num)).start()
RLock(可重入的互斥鎖(同一個(gè)進(jìn)程可以多次獲得它,同時(shí)不會(huì)造成阻塞)
RLock(可重入鎖)是一個(gè)可以被同一個(gè)線程請(qǐng)求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級(jí)”的概念,處于鎖定狀態(tài)時(shí),RLock被某個(gè)線程擁有。擁有RLock的線程可以再次調(diào)用acquire(),釋放鎖時(shí)需要調(diào)用release()相同次數(shù)。可以認(rèn)為RLock包含一個(gè)鎖定池和一個(gè)初始值為0的計(jì)數(shù)器,每次成功調(diào)用 acquire()/release(),計(jì)數(shù)器將+1/-1,為0時(shí)鎖處于未鎖定狀態(tài)。
構(gòu)造方法:RLock()
實(shí)例方法:
-
acquire([timeout]):同Lock -
release(): 同Lock
Semaphore(信號(hào)量)
構(gòu)造方法:Semaphore([value])
-
value:設(shè)定信號(hào)量,默認(rèn)值為1
實(shí)例方法:
-
acquire([timeout]):同Lock -
release(): 同Lock
使用示例:
from multiprocessing import Process, Semaphore
import time, random
def go_wc(sem, user):
sem.acquire()
print('%s 占到一個(gè)茅坑' % user)
time.sleep(random.randint(0, 3))
sem.release()
print(user, 'OK')
if __name__ == '__main__':
sem = Semaphore(2)
p_l = []
for i in range(5):
p = Process(target=go_wc, args=(sem, 'user%s' % i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()
Condition(條件變量)
構(gòu)造方法:Condition([lock/rlock])
-
可以傳遞一個(gè)Lock/RLock實(shí)例給構(gòu)造方法,否則它將自己生成一個(gè)RLock實(shí)例。
-
acquire([timeout]):首先進(jìn)行acquire,然后判斷一些條件。如果條件不滿足則wait -
release():釋放 Lock -
wait([timeout]): 調(diào)用這個(gè)方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。處于wait狀態(tài)的線程接到通知后會(huì)重新判斷條件。 -
notify(): 調(diào)用這個(gè)方法將從等待池挑選一個(gè)線程并通知,收到通知的線程將自動(dòng)調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。 -
notifyAll(): 調(diào)用這個(gè)方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
使用示例:
import multiprocessing
import time
def stage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all()
def stage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
cond.wait()
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',
target=stage_1,
args=(condition,))
s2_clients = [
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
Event(事件)
使用示例:
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')
其他內(nèi)容
from multiprocessing.dummy import Pool as ThreadPool
multiprocessing.dummy與早期的threading,不同的點(diǎn)好像是在多多核CPU下,只綁定了一個(gè)核心(具體未考證)。
參考文檔:
-
https://docs.python.org/3/library/multiprocessing.html -
https://www.rddoc.com/doc/Python/3.6.0/zh/library/multiprocessing/
Python并發(fā)之concurrent.futures
Executor
ThreadPoolExecutor對(duì)象
ThreadPoolExecutor類是Executor子類,使用線程池執(zhí)行異步調(diào)用。
class concurrent.futures.ThreadPoolExecutor(max_workers)
使用max_workers數(shù)目的線程池執(zhí)行異步調(diào)用。
ProcessPoolExecutor對(duì)象
ThreadPoolExecutor類是Executor子類,使用進(jìn)程池執(zhí)行異步調(diào)用。
class concurrent.futures.ProcessPoolExecutor(max_workers=None)
submit()方法
Executor中定義了submit()方法,這個(gè)方法的作用是提交一個(gè)可執(zhí)行的回調(diào)task,并返回一個(gè)future實(shí)例。future對(duì)象代表的就是給定的調(diào)用。
Executor.submit(fn, *args, **kwargs)
-
fn:需要異步執(zhí)行的函數(shù) -
*args, **kwargs:fn參數(shù)
使用示例:
from concurrent import futures
def test(num):
import time
return time.ctime(), num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 1)
print(future.result())
map()方法
Executor.map(func, *iterables, timeout=None)
-
func:需要異步執(zhí)行的函數(shù) -
*iterables:可迭代對(duì)象,如列表等。每一次func執(zhí)行,都會(huì)從iterables中取參數(shù)。 -
timeout:設(shè)置每次異步操作的超時(shí)時(shí)間,timeout的值可以是int或float,如果操作超時(shí),會(huì)返回raisesTimeoutError;如果不指定timeout參數(shù),則不設(shè)置超時(shí)間。
使用示例:
from concurrent import futures
def test(num):
import time
return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test, data):
print(future)
shutdown()方法
釋放系統(tǒng)資源,在Executor.submit()或 Executor.map()等異步操作后調(diào)用。使用with語(yǔ)句可以避免顯式調(diào)用此方法。
Executor.shutdown(wait=True)
Future
-
cancel():試圖取消調(diào)用。如果調(diào)用當(dāng)前正在執(zhí)行,并且不能被取消,那么該方法將返回False,否則調(diào)用將被取消,方法將返回True。 -
cancelled():如果成功取消調(diào)用,返回True。 -
running():如果調(diào)用當(dāng)前正在執(zhí)行并且不能被取消,返回True。 -
done():如果調(diào)用成功地取消或結(jié)束了,返回True。 -
result(timeout=None):返回調(diào)用返回的值。如果調(diào)用還沒(méi)有完成,那么這個(gè)方法將等待超時(shí)秒。如果調(diào)用在超時(shí)秒內(nèi)沒(méi)有完成,那么就會(huì)有一個(gè)Futures.TimeoutError將報(bào)出。timeout可以是一個(gè)整形或者浮點(diǎn)型數(shù)值,如果timeout不指定或者為None,等待時(shí)間無(wú)限。如果futures在完成之前被取消了,那么 CancelledError 將會(huì)報(bào)出。 -
exception(timeout=None):返回調(diào)用拋出的異常,如果調(diào)用還未完成,該方法會(huì)等待timeout指定的時(shí)長(zhǎng),如果該時(shí)長(zhǎng)后調(diào)用還未完成,就會(huì)報(bào)出超時(shí)錯(cuò)誤futures.TimeoutError。timeout可以是一個(gè)整形或者浮點(diǎn)型數(shù)值,如果timeout不指定或者為None,等待時(shí)間無(wú)限。如果futures在完成之前被取消了,那么 CancelledError 將會(huì)報(bào)出。如果調(diào)用完成并且無(wú)異常報(bào)出,返回None. -
add_done_callback(fn):將可調(diào)用fn捆綁到future上,當(dāng)Future被取消或者結(jié)束運(yùn)行,fn作為future的唯一參數(shù)將會(huì)被調(diào)用。如果future已經(jīng)運(yùn)行完成或者取消,fn將會(huì)被立即調(diào)用。 -
wait(fs, timeout=None, return_when=ALL_COMPLETED) -
等待fs提供的 Future 實(shí)例(possibly created by different Executor instances) 運(yùn)行結(jié)束。返回一個(gè)命名的2元集合,分表代表已完成的和未完成的 -
return_when 表明什么時(shí)候函數(shù)應(yīng)該返回。它的值必須是一下值之一: -
FIRST_COMPLETED :函數(shù)在任何future結(jié)束或者取消的時(shí)候返回。 -
FIRST_EXCEPTION :函數(shù)在任何future因?yàn)楫惓=Y(jié)束的時(shí)候返回,如果沒(méi)有future報(bào)錯(cuò),效果等于 -
ALL_COMPLETED :函數(shù)在所有future結(jié)束后才會(huì)返回。 -
as_completed(fs, timeout=None):參數(shù)是一個(gè) Future 實(shí)例列表,返回值是一個(gè)迭代器,在運(yùn)行結(jié)束后產(chǎn)出 Future實(shí)例 。
使用示例:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
print(x.result())
print(2)
參考鏈接:https://pythonhosted.org/futures 作者:錢魏Way 來(lái)源:https://www.biaodianfu.com/python-multi-thread-and-multi-process.html
本文僅做學(xué)術(shù)分享,如有侵權(quán),請(qǐng)聯(lián)系刪文。
—THE END—
