#multiprocessing# Python多進(jìn)程與多線程
“ 文章所涉及內(nèi)容更多來自網(wǎng)絡(luò),在此聲明,并感謝知識的貢獻(xiàn)者!”
Python的多線程效率問題
—
Python的多線程效率問題
盡管Python完全支持多線程編程, 但是解釋器的C語言實(shí)現(xiàn)部分在完全并行執(zhí)行時(shí)并不是線程安全的。實(shí)際上,解釋器被一個(gè)全局解釋器鎖保護(hù)著,它確保任何時(shí)候都只有一個(gè)Python線程執(zhí)行。GIL最大的問題就是Python的多線程程序并不能利用多核CPU的優(yōu)勢 (比如一個(gè)使用了多個(gè)線程的計(jì)算密集型程序只會(huì)在一個(gè)單CPU上面運(yùn)行)。
在討論普通的GIL之前,有一點(diǎn)要強(qiáng)調(diào)的是GIL只會(huì)影響到那些嚴(yán)重依賴CPU的程序(比如計(jì)算型的)。如果你的程序大部分只會(huì)涉及到I/O,比如網(wǎng)絡(luò)交互,那么使用多線程就很合適, 因?yàn)樗鼈兇蟛糠謺r(shí)間都在等待。實(shí)際上,你完全可以放心的創(chuàng)建幾千個(gè)Python線程, 現(xiàn)代操作系統(tǒng)運(yùn)行這么多線程沒有任何壓力,沒啥可擔(dān)心的。
Python的多線程與多進(jìn)程
—
Python的多線程與多進(jìn)程分析
-多進(jìn)程是多核運(yùn)算
-效率:運(yùn)行耗時(shí)最少是:多進(jìn)程 < 普通 < 多線程
Python的多線程與多進(jìn)程示例
—
Python的多線程與多進(jìn)程示例
import multiprocessing as mp
import threading as td
def joba(a, d):
print('job-a')
for i in range(20):
pass
print('job-a-finish')
def jobb(a, d):
print('job-b')
while 1:
pass
print('job-b-finish')
if __name__ == "__main__":
t1 = td.Thread(target=jobb, args=(1, 2))
p1 = mp.Process(target=joba, args=(1, 2))
t1.start()
p1.start()
t1.join()
p1.join()
Python多線程或多進(jìn)程的輸出存儲(chǔ)
Queue的功能是將每個(gè)核或線程的運(yùn)算結(jié)果放在隊(duì)里中, 等到每個(gè)線程或核運(yùn)行完畢后再從隊(duì)列中取出結(jié)果, 繼續(xù)加載運(yùn)算。原因很簡單, 多線程調(diào)用的函數(shù)不能有返回值, 所以使用Queue存儲(chǔ)多個(gè)線程運(yùn)算的結(jié)果
Python的多進(jìn)程池示例
—
Python的多進(jìn)程池示例
import multiprocessing as mp
def job(x):
return x * x
def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)
if __name__ == '__main__':
multicore()
import multiprocessing as mp
def job(x):
return x * x
def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
# 用get獲得結(jié)果
print(res.get())
# 迭代器,i=0時(shí)apply一次,i=1時(shí)apply一次等等
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
# 從迭代器中取出
print([res.get() for res in multi_res])
if __name__ == '__main__':
multicore()
總結(jié)
Pool默認(rèn)調(diào)用是CPU的核數(shù),傳入processes參數(shù)可自定義CPU核數(shù)
map() 放入迭代參數(shù),返回多個(gè)結(jié)果
apply_async()只能放入一組參數(shù),并返回一個(gè)結(jié)果,如果想得到map()的效果需要通過迭代
Python的共享內(nèi)存
—
共享內(nèi)存
只有用共享內(nèi)存才能讓CPU之間有交流
import multiprocessing as mp
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)
array = mp.Array('i', [1, 2, 3, 4])
其中d和i參數(shù)用來設(shè)置數(shù)據(jù)類型的,d表示一個(gè)雙精浮點(diǎn)類型,i表示一個(gè)帶符號的整型。
在Python的mutiprocessing中,有還有一個(gè)Array類,可以和共享內(nèi)存交互,來實(shí)現(xiàn)在進(jìn)程之間共享數(shù)據(jù)。
| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |
Python共享內(nèi)存變量的鎖
import multiprocessing as mp
import time
def job(v, num, l):
l.acquire() # 鎖住
for _ in range(5):
time.sleep(0.1)
v.value += num # 獲取共享內(nèi)存
print(v.value)
l.release() # 釋放
def multicore():
l = mp.Lock() # 定義一個(gè)進(jìn)程鎖
v = mp.Value('i', 0) # 定義共享內(nèi)存
p1 = mp.Process(target=job, args=(v, 1, l)) # 需要將lock傳入
p2 = mp.Process(target=job, args=(v, 3, l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicore()
進(jìn)程鎖保證了進(jìn)程p1的完整運(yùn)行,然后才進(jìn)行了進(jìn)程p2的運(yùn)行
Python的多進(jìn)程的multiprocessing模塊
—
Python多進(jìn)程multiprocessing模塊
1、multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
target 是函數(shù)名字,需要調(diào)用的函數(shù)
args 函數(shù)需要的參數(shù),以 tuple 的形式傳入
將daemon設(shè)置為True時(shí),則主線程不必等待子進(jìn)程,主線程結(jié)束則所有結(jié)束
2、相關(guān)方法
star() 方法啟動(dòng)進(jìn)程,
join() 方法實(shí)現(xiàn)進(jìn)程間的同步,等待所有進(jìn)程退出。
close() 用來阻止多余的進(jìn)程涌入進(jìn)程池 Pool 造成進(jìn)程阻塞。
如果要啟動(dòng)大量的子進(jìn)程,可以用進(jìn)程池的方式批量創(chuàng)建子進(jìn)程
Pool提供了一種快捷的方法,賦予函數(shù)并行化處理一系列輸入值的能力,可以將輸入數(shù)據(jù)分配給不同進(jìn)程處理(數(shù)據(jù)并行)。
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
(1)p.apply(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()
(2)p.apply_async(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,**kwargs),然后返回結(jié)果。此方法的結(jié)果是 AsyncResult類的實(shí)例,callback是可調(diào)用對象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。多進(jìn)程并發(fā)!
(3)p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成
(4)p.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用
import multiprocessing
def worker(num):
"""該函數(shù)將在子進(jìn)程中執(zhí)行"""
print('Worker %d' % num)
if __name__ == '__main__':
# 創(chuàng)建進(jìn)程池
pool = multiprocessing.Pool(4)
# 啟動(dòng)進(jìn)程池中的進(jìn)程
pool.map(worker, range(10))
# 關(guān)閉進(jìn)程池
pool.close()
# 等待進(jìn)程池中的進(jìn)程結(jié)束
pool.join()
Python的多進(jìn)程間通訊
—
Python進(jìn)程間的通信
多進(jìn)程編程中,不同的進(jìn)程之間需要進(jìn)行通信。multiprocessing模塊提供了多種進(jìn)程間通信的方式,例如使用隊(duì)列、管道、共享內(nèi)存等。
(1)隊(duì)列
隊(duì)列是一種常用的進(jìn)程間通信方式。multiprocessing模塊中提供了Queue類,可以用來創(chuàng)建隊(duì)列。下面是一個(gè)簡單的示例:
import multiprocessing
def producer(q):
"""該函數(shù)將在生產(chǎn)者進(jìn)程中執(zhí)行"""
for i in range(10):
q.put(i)
def consumer(q):
"""該函數(shù)將在消費(fèi)者進(jìn)程中執(zhí)行"""
while True:
item = q.get()
if item is None:
break
print(item)
if __name__ == '__main__':
# 創(chuàng)建隊(duì)列
q = multiprocessing.Queue()
# 創(chuàng)建生產(chǎn)者進(jìn)程
p1 = multiprocessing.Process(target=producer, args=(q,))
# 創(chuàng)建消費(fèi)者進(jìn)程
p2 = multiprocessing.Process(target=consumer, args=(q,))
# 啟動(dòng)進(jìn)程
p1.start()
p2.start()
# 等待進(jìn)程結(jié)束
p1.join()
# 發(fā)送結(jié)束信號
q.put(None)
p2.join()
在上面的代碼中,首先創(chuàng)建了一個(gè)Queue對象,然后創(chuàng)建了一個(gè)生產(chǎn)者進(jìn)程和一個(gè)消費(fèi)者進(jìn)程。生產(chǎn)者進(jìn)程通過調(diào)用put方法將0~9的數(shù)字放入隊(duì)列中,消費(fèi)者進(jìn)程通過調(diào)用get方法從隊(duì)列中獲取數(shù)據(jù),并將其打印出來。最后,調(diào)用put方法發(fā)送結(jié)束信號,然后等待兩個(gè)進(jìn)程結(jié)束。
(2)管道
管道是另一種常用的進(jìn)程間通信方式。multiprocessing模塊中提供了Pipe類,可以用來創(chuàng)建管道。下面是一個(gè)簡單的示例:
import multiprocessing
def producer(conn):
"""該函數(shù)將在生產(chǎn)者進(jìn)程中執(zhí)行"""
for i in range(10):
conn.send(i)
conn.close()
def consumer(conn):
"""該函數(shù)將在消費(fèi)者進(jìn)程中執(zhí)行"""
while True:
item = conn.recv()
if item is None:
break
print(item)
if __name__ == '__main__':
# 創(chuàng)建管道
conn1, conn2 = multiprocessing.Pipe()
# 創(chuàng)建生產(chǎn)者進(jìn)程
p1 = multiprocessing.Process(target=producer, args=(conn1,))
# 創(chuàng)建消費(fèi)者進(jìn)程
p2 = multiprocessing.Process(target=consumer, args=(conn2,))
# 啟動(dòng)進(jìn)程
p1.start()
p2.start()
# 等待進(jìn)程結(jié)束
p1.join()
# 發(fā)送結(jié)束信號
conn1.send(None)
p2.join()
在上面的代碼中,首先創(chuàng)建了一個(gè)管道,然后創(chuàng)建了一個(gè)生產(chǎn)者進(jìn)程和一個(gè)消費(fèi)者進(jìn)程。生產(chǎn)者進(jìn)程通過調(diào)用send方法將0~9的數(shù)字發(fā)送到管道中,消費(fèi)者進(jìn)程通過調(diào)用recv方法從管道中獲取數(shù)據(jù),并將其打印出來。最后,調(diào)用send方法發(fā)送結(jié)束信號,然后等待兩個(gè)進(jìn)程結(jié)束。
(3)共享內(nèi)存
共享內(nèi)存是一種高效的進(jìn)程間通信方式,它允許多個(gè)進(jìn)程共享同一塊內(nèi)存區(qū)域。multiprocessing模塊中提供了Value和Array類,可以用來創(chuàng)建共享內(nèi)存。下面是一個(gè)簡單的示例:
import multiprocessing
def worker1(n):
"""該函數(shù)將在進(jìn)程1中執(zhí)行"""
n.value += 1
print('worker1:', n.value)
def worker2(n):
"""該函數(shù)將在進(jìn)程2中執(zhí)行"""
n.value += 1
print('worker2:', n.value)
if __name__ == '__main__':
# 創(chuàng)建共享內(nèi)存
n = multiprocessing.Value('i', 0)
# 創(chuàng)建進(jìn)程1
p1 = multiprocessing.Process(target=worker1, args=(n,))
# 創(chuàng)建進(jìn)程2
p2 = multiprocessing.Process(target=worker2, args=(n,))
# 啟動(dòng)進(jìn)程
p1.start()
p2.start()
# 等待進(jìn)程結(jié)束
p1.join()
p2.join()
在上面的代碼中,首先創(chuàng)建了一個(gè)Value對象,用于存儲(chǔ)一個(gè)整數(shù)值。然后創(chuàng)建了兩個(gè)進(jìn)程,每個(gè)進(jìn)程都會(huì)將共享內(nèi)存中的值加1,并將其打印出來。最后,等待兩個(gè)進(jìn)程結(jié)束。
除了Value類之外,multiprocessing模塊還提供了Array類,用于創(chuàng)建共享內(nèi)存數(shù)組。
參考資料
—
GIL 不一定有效率
https://yulizi123.github.io/tutorials/python-basic/threading/5-GIL/
The "freeze_support()" line can be omitted if the program is not going to be frozen to produce an executable. raise RuntimeError
https://blog.csdn.net/qq_34905587/article/details/112020651
Python Multiprocessing(多進(jìn)程)
https://blog.csdn.net/sikh_0529/article/details/126728914
Python多進(jìn)程multiprocessing模塊介紹
https://www.jianshu.com/p/3ff7d04a39bf
Python編程之多進(jìn)程(multiprocessing)詳解
http://stack.itcast.cn/news/20230404/10534657760.shtml
Python multiprocessing進(jìn)程池使用詳解
https://blog.csdn.net/weixin_39253570/article/details/130817783
multiprocessing --- 基于進(jìn)程的并行
https://docs.python.org/zh-cn/3/library/multiprocessing.html?highlight=multi
Python 異步編程 多進(jìn)程
https://www.cjavapy.com/article/2427/
python 中的進(jìn)程池 -- multiprocessing.pool.Pool
http://www.manongjc.com/detail/63-pndlbcnipsucczm.html
Python 異步 IO(asyncio)、多進(jìn)程(multiprocessing)、多線程(multithreading)性能對比
https://www.jianshu.com/p/cac56b3d9a18
