Python 進(jìn)程、線程和協(xié)程實戰(zhàn)指北
↑?關(guān)注 + 星標(biāo)?,每天學(xué)Python新技能
后臺回復(fù)【大禮包】送你Python自學(xué)大禮包
1. 前言
使用線程是為了并行還是加速?
為什么我使用多線程之后,處理速度并沒有預(yù)期的快,甚至更慢了?
我應(yīng)該選擇多進(jìn)程處理還是多線程處理?
協(xié)程和線程有什么不同?
什么情況下使用協(xié)程?
計算密集型任務(wù):任務(wù)包含大量計算,CPU占用率高
IO密集型任務(wù):任務(wù)包含頻繁的、持續(xù)的網(wǎng)絡(luò)IO和磁盤IO
混合型任務(wù):既有計算也有IO
2. 線程
2.1 線程的最大意義在于并行
請寫一段代碼,提示用戶從鍵盤輸入任意字符,然后等待用戶輸入。如果用戶在10秒鐘完成輸入(按回車鍵),則顯示輸入內(nèi)容并結(jié)束程序;否則,不再等待用戶輸入,而是直接提示超時并結(jié)束程序。
import?os,?time
import?threading
def?monitor(pid):
????time.sleep(10)
????print('\n超時退出!')
????os._exit(0)
m?=?threading.Thread(target=monitor,?args=(os.getpid(),?))
m.setDaemon(True)
m.start()
s?=?input('請輸入>>>')
print('接收到鍵盤輸入:%s'%s)
print('程序正常結(jié)束。')
2.2 使用線程處理IO密集型任務(wù)
import?time
import?requests
import?threading
urls?=?['https://www.baidu.com',?'https://cn.bing.com']
def?get_html(n):
????for?i?in?range(n):
????????url?=?urls[i%2]
????????resp?=?requests.get(url)
????????#print(resp.ok,?url)
t0?=?time.time()
get_html(100)?#?請求100次
t1?=?time.time()
print('1個線程請求100次,耗時%0.3f秒鐘'%(t1-t0))
for?n_thread?in?(2,5,10,20,50):
????t0?=?time.time()
????ths?=?list()
????for?i?in?range(n_thread):
????????ths.append(threading.Thread(target=get_html,?args=(100//n_thread,)))
????????ths[-1].setDaemon(True)
????????ths[-1].start()
????for?i?in?range(n_thread):
????????ths[i].join()
????t1?=?time.time()
????print('%d個線程請求100次,耗時%0.3f秒鐘'%(n_thread,?t1-t0))
1個線程請求100次,耗時30.089秒鐘
2個線程請求100次,耗時15.087秒鐘
5個線程請求100次,耗時7.803秒鐘
10個線程請求100次,耗時4.112秒鐘
20個線程請求100次,耗時3.160秒鐘
50個線程請求100次,耗時3.564秒鐘
2.3 使用線程處理計算密集型任務(wù)

低端增強算法(也有人叫做伽馬矯正)其實很簡單:對于[0,255]區(qū)間內(nèi)的灰度值v0,指定矯正系數(shù)y,使用下面的公式,即可達(dá)到矯正后的灰度值v1,其中y一般選擇2或者3,上圖右就是y為3的效果。

import?time
import?cv2
import?numpy?as?np
import?threading
def?gamma_adjust_np(im,?gamma,?out_file):
????"""伽馬增強函數(shù):使用廣播和矢量化計算"""
????out?=?(np.power(im.astype(np.float32)/255,?1/gamma)*255).astype(np.uint8)
????cv2.imwrite(out_file,?out)
def?gamma_adjust_py(im,?gamma,?out_file):
????"""伽馬增強函數(shù):使用循環(huán)逐像素計算"""
????rows,?cols?=?im.shape
????out?=?im.astype(np.float32)
????for?i?in?range(rows):
????????for?j?in?range(cols):
????????????out[i,j]?=?pow(out[i,j]/255,?1/3)*255
????cv2.imwrite(out_file,?out.astype(np.uint8))
im?=?cv2.imread('river.jpg',?cv2.IMREAD_GRAYSCALE)
rows,?cols?=?im.shape
print('照片分辨率為%dx%d'%(cols,?rows))
t0?=?time.time()
gamma_adjust_np(im,?3,?'river_3.jpg')
t1?=?time.time()
print('借助NumPy廣播特性,耗時%0.3f秒鐘'%(t1-t0))
t0?=?time.time()
im_3?=?gamma_adjust_py(im,?3,?'river_3_cycle.jpg')
t1?=?time.time()
print('單線程逐像素處理,耗時%0.3f秒鐘'%(t1-t0))
t0?=?time.time()
th_1?=?threading.Thread(target=gamma_adjust_py,?args=(im[:rows//2],?3,?'river_3_1.jpg'))
th_1.setDaemon(True)
th_1.start()
th_2?=?threading.Thread(target=gamma_adjust_py,?args=(im[rows//2:],?3,?'river_3_2.jpg'))
th_2.setDaemon(True)
th_2.start()
th_1.join()
th_2.join()
t1?=?time.time()
print('啟用兩個線程逐像素處理,耗時%0.3f秒鐘'%(t1-t0))
運行結(jié)果如下:
照片分辨率為4088x2752
借助NumPy廣播特性,耗時0.381秒鐘
單線程逐像素處理,耗時34.228秒鐘
啟用兩個線程逐像素處理,耗時36.087秒鐘
2.4 線程池
>>>?from?concurrent.futures?import?ThreadPoolExecutor
>>>?def?pow2(x):
????????return?x*x
>>>?with?ThreadPoolExecutor(max_workers=4)?as?pool:?#?4個線程的線程池
????????result?=?pool.map(pow2,?range(10))?#?使用4個線程分別計算0~9的平方
>>>?list(result)?#?result是一個生成器,轉(zhuǎn)成列表才可以直觀地看到計算結(jié)果
[0,?1,?4,?9,?16,?25,?36,?49,?64,?81]
>>>?from?concurrent.futures?import?ThreadPoolExecutor
>>>?def?pow2(x):
????????return?x*x
>>>?def?pow3(x):
????????return?x*x*x
>>>?def?save_result(task):?#?保存線程計算結(jié)果
????????global?result
????????result.append(task.result())
>>>?result?=?list()
>>>?with?ThreadPoolExecutor(max_workers=3)?as?pool:
????????for?i?in?range(10):
????????????if?i%2:?#?奇數(shù)做平方運算
????????????????task?=?pool.submit(pow2,?i)
????????????else:?#?偶數(shù)做立方運算
????????????????task?=?pool.submit(pow3,?i)
????????????task.add_done_callback(save_result)?#?為每個線程指定結(jié)束后的回調(diào)函數(shù)
>>>?result
[0,?1,?8,?9,?64,?25,?216,?49,?512,?81]
3. 進(jìn)程
3.1 使用進(jìn)程處理計算密集型任務(wù)
import?time
import?cv2
import?numpy?as?np
import?multiprocessing?as?mp
def?gamma_adjust_py(im,?gamma,?out_file):
????"""伽馬增強函數(shù):使用循環(huán)逐像素計算"""
????rows,?cols?=?im.shape
????out?=?im.astype(np.float32)
????for?i?in?range(rows):
????????for?j?in?range(cols):
????????????out[i,j]?=?pow(out[i,j]/255,?1/3)*255
????cv2.imwrite(out_file,?out.astype(np.uint8))
if?__name__?==?'__main__':
????mp.freeze_support()
????im_fn?=?'river.jpg'
????im?=?cv2.imread(im_fn,?cv2.IMREAD_GRAYSCALE)
????rows,?cols?=?im.shape
????print('照片分辨率為%dx%d'%(cols,?rows))
????t0?=?time.time()
????pro_1?=?mp.Process(target=gamma_adjust_py,?args=(im[:rows//2],?3,?'river_3_1.jpg'))
????pro_1.daemon?=?True
????pro_1.start()
????pro_2?=?mp.Process(target=gamma_adjust_py,?args=(im[rows//2:],?3,?'river_3_2.jpg'))
????pro_2.daemon?=?True
????pro_2.start()
????pro_1.join()
????pro_2.join()
????t1?=?time.time()
????print('啟用兩個進(jìn)程逐像素處理,耗時%0.3f秒鐘'%(t1-t0))
照片分辨率為4088x2752
啟用兩個進(jìn)程逐像素處理,耗時17.786秒鐘
3.2 進(jìn)程間通信示例
import?os,?time,?random
import?multiprocessing?as?mp
def?sub_process_A(q):
????"""A進(jìn)程函數(shù):生成數(shù)據(jù)"""
????while?True:
????????time.sleep(5*random.random())?#?在0-5秒之間隨機(jī)延時
????????q.put(random.randint(10,100))?#?隨機(jī)生成[10,100]之間的整數(shù)
def?sub_process_B(q):
????"""B進(jìn)程函數(shù):使用數(shù)據(jù)"""
????words?=?['哈哈,',?'天哪!',?'My God!',?'咦,天上掉餡餅了?']
????while?True:
????????print('%s撿到了%d塊錢!'%(words[random.randint(0,3)],?q.get()))
if?__name__?==?'__main__':
????print('主進(jìn)程(%s)開始,按回車鍵結(jié)束本程序'%os.getpid())
????q?=?mp.Queue(10)
????p_a?=?mp.Process(target=sub_process_A,?args=(q,))
????p_a.daemon?=?True
????p_a.start()
????p_b?=?mp.Process(target=sub_process_B,?args=(q,))
????p_b.daemon?=?True
????p_b.start()
????input()
3.3 進(jìn)程池
apply_async(func[, args[, kwds[, callback]]]) :非阻塞式提交。即使進(jìn)程池已滿,也會接
受新的任務(wù),不會阻塞主進(jìn)程。新任務(wù)將處于等待狀態(tài)。apply(func[, args[, kwds]]) :阻塞式提交。若進(jìn)程池已滿,則主進(jìn)程阻塞,直至有空閑
進(jìn)程可以使用。
import?time
import?multiprocessing?as?mp
def?power(x,?a=2):
????"""進(jìn)程函數(shù):冪函數(shù)"""
????time.sleep(1)
????print('%d的%d次方等于%d'%(x,?a,?pow(x,?a)))
def?demo():
????mpp?=?mp.Pool(processes=4)
????for?item?in?[2,3,4,5,6,7,8,9]:
????????mpp.apply_async(power,?(item,?))?#?非阻塞提交新任務(wù)
????????#mpp.apply(power,?(item,?))?#?阻塞提交新任務(wù)
????mpp.close()?#?關(guān)閉進(jìn)程池,意味著不再接受新的任務(wù)
????print('主進(jìn)程走到這里,正在等待子進(jìn)程結(jié)束')
????mpp.join()?#?等待所有子進(jìn)程結(jié)束
????print('程序結(jié)束')
if?__name__?==?'__main__':
????demo()
4. 協(xié)程
4.1 協(xié)程和線程的區(qū)別
4.2 協(xié)程演進(jìn)史
4.3 協(xié)程應(yīng)用示例
import?asyncio,?random
async?def?rich(q,?total):
????"""任性的富豪,隨機(jī)撒錢"""
????while?total?>?0:
????????money?=?random.randint(10,100)
????????total?-=?money
????????await?q.put(money)?#?隨機(jī)生成[10,100]之間的整數(shù)
????????print('富豪瀟灑地拋了%d塊錢'%money)
????????await?asyncio.sleep(3*random.random())?#?在0-3秒之間隨機(jī)延時
async?def?lucky(q,?name):
????"""隨時可以撿到錢的幸運兒"""
????while?True:
????????money?=?await?q.get()
????????q.task_done()
????????print('%s撿到了%d塊錢!'%(name,?money))
async?def?run():
????q?=?asyncio.Queue(1)
????producers?=?[asyncio.create_task(rich(q,?300))]
????consumers?=?[asyncio.create_task(lucky(q,?name))?for?name?in?'ABC']
????await?asyncio.gather(*producers,)
????await?q.join()
????for?c?in?consumers:
????????c.cancel()
if?__name__?==?'__main__':
????asyncio.run(run())
富豪瀟灑地拋了42塊錢
A撿到了42塊錢!
富豪瀟灑地拋了97塊錢
A撿到了97塊錢!
富豪瀟灑地拋了100塊錢
B撿到了100塊錢!
富豪瀟灑地拋了35塊錢
C撿到了35塊錢!
富豪瀟灑地拋了17塊錢
A撿到了17塊錢!
富豪拋完了手中的錢,轉(zhuǎn)身離去
見面禮
掃碼加我微信備注「三劍客」送你上圖三本Python入門電子書?
推薦閱讀
點分享 點收藏 點點贊 點在看





