人人都能學會的 Python 多線程指南!
大家好,我是早起。
在 Python 中,多線程最常見的一個場景就是爬蟲,例如這樣一個需求,有多個結構一樣的頁面需要爬取,例如下方的URL(豆瓣阿凡達影評,以10個為例)
url_list = [
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=0',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=20',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=40',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=60',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=80',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=100',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=120',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=140',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=160',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=180']
如果依次爬取,請求第一個頁面——得到返回數(shù)據(jù)——解析數(shù)據(jù)——提取、存儲數(shù)據(jù)——請求第二個頁面,按照這樣的思路,那么大量時間都會浪費在請求、返回數(shù)據(jù)上,如果在等待第一個頁面返回數(shù)據(jù)時去請求第二個頁面,就能有效的提高效率,多線程就可以實現(xiàn)這樣的功能。
在Python中實現(xiàn)多線程的方法也很多,我將基于 threading 模塊一點一點介紹,注意本文不會太注重于多線程背后的技術概念(面試常問),僅希望用最少的話教會大家如何實現(xiàn)。當然會在最后介紹如何使用threading模塊來解決上面的爬蟲問題。
threading基本使用
讓我們先從一個簡單的例子開始,定義do_something函數(shù),執(zhí)行該函數(shù)需要消耗1秒
import time
start = time.perf_counter()
def do_something():
print("-> 線程啟動")
time.sleep(1)
print("-> 線程結束")
do_something()
finish = time.perf_counter()
print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
上面的代碼不難理解,執(zhí)行do_something并計算耗時,結果很明顯應該是1s
-> 線程啟動
-> 線程結束
全部任務執(zhí)行完成,耗時 1.01 秒
現(xiàn)在如果需要執(zhí)行兩次do_something,按照最基本的思路
import time
start = time.perf_counter()
def do_something():
print("-> 線程啟動")
time.sleep(1)
print("-> 線程結束")
do_something()
do_something()
finish = time.perf_counter()
print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
執(zhí)行上面代碼結果也很容易猜到是2秒
-> 線程啟動
-> 線程結束
-> 線程啟動
-> 線程結束
全部任務執(zhí)行完成,耗時 2.01 秒
這就是最常規(guī)的 同步 思路,在CPU執(zhí)行第一個函數(shù),也就是等待1s的時間內(nèi),什么也不干,等第一個函數(shù)執(zhí)行完畢后再執(zhí)行第二個函數(shù)
很明顯,這樣讓CPU干等著啥也不干并不是一個很好的選擇,而多線程就是解決這一問題的方法之一,讓CPU在等待某個任務完成時去執(zhí)行更多的操作,將整個過程簡化為下圖流程,這樣就能充分節(jié)省時間
現(xiàn)在使用threading來通過多線程的方式實現(xiàn)上面的過程,非常簡單,定義兩個線程并依次啟動即可??
import time
import threading
start = time.perf_counter()
def do_something():
print("-> 線程啟動")
time.sleep(1)
print("-> 線程結束")
thread1 = threading.Thread(target=do_something)
thread2 = threading.Thread(target=do_something)
thread1.start()
thread2.start()
finish = time.perf_counter()
print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
執(zhí)行上面的代碼,結果如下
-> 線程啟動
-> 線程啟動
全部任務執(zhí)行完成,耗時 0.0 秒
-> 線程結束
-> 線程結束
可以看到,兩個子線程確實同時啟動,但是主線程并未等待兩個子線程執(zhí)行完畢就直接結束。
為了解決這個問題,我們可以使用threading.join()方法,意思是在子線程完成運行之前,這個子線程的父線程將一直被阻塞
換成人話就是讓主線程掛起,等待所有子線程結束再執(zhí)行,體現(xiàn)到代碼上也很簡單,只需要添加兩行即可
import time
import threading
start = time.perf_counter()
def do_something():
print("-> 線程啟動")
time.sleep(1)
print("-> 線程結束")
thread1 = threading.Thread(target=do_something)
thread2 = threading.Thread(target=do_something)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
finish = time.perf_counter()
print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
運行結果如下,全部代碼在1秒內(nèi)運行完畢
-> 線程啟動
-> 線程啟動
-> 線程結束
-> 線程結束
全部任務執(zhí)行完成,耗時 1.01 秒
至此,我們就得到了第一個有效的多線程代碼,相信你也能大致明白threading的基本使用流程。
傳遞參數(shù)
現(xiàn)在來看看如何在多線程之間傳遞參數(shù),讓我們升級代碼:do_something函數(shù)來接受一個參數(shù),控制他睡眠等待的時間
def do_something(num):
print(f"-> 線程{num} 啟動,睡眠 {num} 秒")
time.sleep(num)
print(f"-> 線程{num} 結束")
在 threading 中,創(chuàng)建線程時可以使用 args 來傳遞參數(shù),例如現(xiàn)在接收一個參數(shù),則上一小節(jié)的代碼可以如下修改
import time
import threading
start = time.perf_counter()
def do_something(num):
print(f"-> 線程{num} 啟動,睡眠 {num} 秒")
time.sleep(num)
print(f"-> 線程{num} 結束")
thread1 = threading.Thread(target=do_something,args = [1])
thread2 = threading.Thread(target=do_something,args = [2])
thread1.start()
thread2.start()
thread1.join()
thread2.join()
finish = time.perf_counter()
print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
這段代碼中,我分別讓兩個線程等待1、2秒,運行結果顯然應該是2秒
-> 線程1 啟動,睡眠 1 秒
-> 線程2 啟動,睡眠 2 秒
-> 線程1 結束
-> 線程2 結束
全部任務執(zhí)行完成,耗時 2.01 秒
如果你的線程函數(shù)需要更多的參數(shù),只需要依次向args中追加即可。
簡化代碼
上面的案例中,我們僅開啟了兩個線程,如果是更多個線程的話,再依次重復定義、啟動就會顯得十分繁瑣,此時我們可以使用循環(huán)來處理。
例如開啟10個線程,依次睡眠1-10秒,可以先創(chuàng)建一個 list 用于存儲每個線程,接著利用循環(huán)依次創(chuàng)建線程,啟動后追加到剛剛創(chuàng)建的 list 中,之后再依次等待每個線程執(zhí)行完畢,代碼如下
import time
import threading
start = time.perf_counter()
def do_something(num):
print(f"-> 線程{num} 啟動,睡眠 {num} 秒")
time.sleep(num)
print(f"-> 線程{num} 結束")
thread_list = []
for i in range(1,11):
thread = threading.Thread(target=do_something, args=[i])
thread.start()
thread_list.append(thread)
for t in thread_list:
t.join()
finish = time.perf_counter()
print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
結果是顯然的,雖然我們執(zhí)行了十次do_something,每次用時1-10秒,但總耗時應該為10秒
-> 線程1 啟動,睡眠 1 秒
-> 線程2 啟動,睡眠 2 秒
-> 線程3 啟動,睡眠 3 秒
-> 線程4 啟動,睡眠 4 秒
-> 線程5 啟動,睡眠 5 秒
-> 線程6 啟動,睡眠 6 秒
-> 線程7 啟動,睡眠 7 秒
-> 線程8 啟動,睡眠 8 秒
-> 線程9 啟動,睡眠 9 秒
-> 線程10 啟動,睡眠 10 秒
-> 線程1 結束
-> 線程2 結束
-> 線程3 結束
-> 線程4 結束
-> 線程5 結束
-> 線程6 結束
-> 線程7 結束
-> 線程8 結束
-> 線程9 結束
-> 線程10 結束
全部任務執(zhí)行完成,耗時 10.01 秒
共享變量鎖的問題
現(xiàn)在,你應該已經(jīng)了解 threading 最基本的用法,只需要將 do_somthing 函數(shù)進行修改即可,但是如果你深入使用,還會有其他的問題出現(xiàn),例如共享變量的問題,讓我們繼續(xù)探討。
多線程很常見的一個應用就是爬蟲,回到開頭的爬蟲問題,如果我們希望爬取10個網(wǎng)頁的評論,可能會先定一個空dataframe,然后使用多線程都往這個dataframe中寫入數(shù)據(jù),但由于多個線程同時操作這一個變量,可能會導致評論并不是按照順序?qū)懭氲摹?/p>
例如第一個頁面有10條評論,第一個線程寫入了2條后,第二個線程將第二個頁面的前兩條寫入,最終導致十個頁面的評論是亂序存儲!
讓我們把這個問題抽象出來,還是之前的代碼,稍微修改一下
我們先定義了一個空list,線程函數(shù)會將傳入的數(shù)字添加到該list中,在未加鎖的情況下,由于線程競爭,雖然我們線程是按照順序開啟,但是最終數(shù)字并不是按照順序?qū)懭搿?/p>
有沒有辦法解決呢?當然有,很自然的想法就是當?shù)谝粋€線程操作該變量時,其他線程等著,寫完了再釋放,這就是鎖!
先看代碼
在上面的代碼中,我們使用 threding.Lock 創(chuàng)建了一個線程鎖,之后在線程函數(shù)操作 result 前,首先使用 lock.acquire() 加上鎖,之后操作 results ,在修改完后使用 lock.relese() 釋放,此時其他線程若想操作 results 則會阻塞,等該線程釋放后才能拿走操作中,這樣我們就保證了線程是“安全的”!
最基本的線程鎖用法就如上面代碼所示,定義鎖 --> 上鎖 --> 解鎖,但是一定要注意,lock.acquire() 和 lock.relese(),如果加了鎖但是沒有釋放,后面的線程將會全部阻塞!
限制線程數(shù)量
最后還有一個常見的問題,上面我們需要執(zhí)行幾次線程函數(shù)就開了幾個線程,但是如果需要爬成千上萬個網(wǎng)頁,開這么多線程cpu一定不同意,代碼也會在開啟的線程達到一定數(shù)量后報錯。
所以如何讓程序只啟動我們指定的線程數(shù)量,例如一次開五個線程,結束一個再添加一個,直到全部任務完成?
還是鎖!在 threading 模塊中有一個 BoundedSemaphore(信號量)類,我們可以給他一個初始的信號量(最大線程數(shù)),之后每次有線程獲得信號量的時候(即 acquire() )計數(shù)器-1,釋放信號量時候(release())計數(shù)器+1,計數(shù)器為0的時候其它線程就被阻塞無法獲得信號量。當計數(shù)器為設定好的上限的時候 BoundedSemaphore 就無法進行 release() 操作了。
體現(xiàn)到代碼上則比較簡單,還是基于上面的例子修改
總共需要運行十次,我們定義最大線程數(shù)為3,并在線程啟動前調(diào)用acquire方法增加一個計數(shù),在線程最后釋放。
此時程序一次只能啟動三個線程,如圖中所示,首先啟動123,之后完成123,啟動456,當?shù)谒膫€線程結束啟動第七個線程······直到全部線程結束。
這里我們同時使用了上一節(jié)說的線程鎖來保護變量,用 BoundedSemaphore 鎖來控制最大線程數(shù),在實際寫代碼時就需要小心檢查鎖是否正確釋放,否則就會報錯!
一個真實的多線程爬蟲案例
至此,threading 模塊最常見的用法就介紹完畢,現(xiàn)在讓我們回到本文一開始的問題,有多個(以十個為例)URL需要爬取,既然每個頁面需要執(zhí)行的操作一樣,如果等待一個頁面爬取完畢再爬第二頁面就太浪費時間了。這時就可以仿照上面的思路去使用多線程加速。
我們只需要將上面的do_something函數(shù)修改為對也面的爬取操作,之后的創(chuàng)建啟動線程操作不變即可,代碼如下
import time
import threading
import requests
import pandas as pd
from faker import Faker
from bs4 import BeautifulSoup
def craw_url(url):
global df
fake = Faker()
headers = {'User-Agent': fake.user_agent()}
r = requests.get(url, headers=headers)
soup = BeautifulSoup(r.content, 'html.parser')
review_list = soup.find_all(class_="main review-item")
for i in range(len(review_list)):
rank = review_list[i].select('span')[0].get('title')
time1 = review_list[i].select('span')[1].get('content')
title = review_list[i].select('h2>a')[0].text
df = df.append({'時間': time1,
'評分': rank,
'標題': title, }, ignore_index=True)
print("-> 爬取完成")
if __name__ == '__main__':
start = time.perf_counter()
df = pd.DataFrame(columns=['時間', '評分', '標題'])
url_list = [
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=0',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=20',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=40',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=60',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=80',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=100',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=120',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=140',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=160',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=180']
thread_list = []
for i in url_list:
thread = threading.Thread(target=craw_url, args=[i])
thread.start()
thread_list.append(thread)
for t in thread_list:
t.join()
finish = time.perf_counter()
print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
執(zhí)行這段代碼,差不多僅用了1秒就將全部內(nèi)容爬取并存儲到 dataframe 中,比同步的代碼塊了近十倍!如果感興趣的話可以自己嘗試一下。
至此,有關 Python 多線程模塊 threading 的基本用法與需要注意的幾點就介紹完畢,如果全部認真看完的話,我相信你一定能照貓畫虎寫出第一個多線程爬蟲腳本。
當然有關 Python 多線程還有很多飽受詬病的爭議(GIL),多線程的實現(xiàn)方法也遠不止 threading 模塊,例如更常見的寫法是通過 concurrent.futures 模塊以及多進程、協(xié)程,這些都留在本系列后續(xù)文章中再進一步討論!
推薦閱讀
牛逼!Python常用數(shù)據(jù)類型的基本操作(長文系列第①篇)
牛逼!Python的判斷、循環(huán)和各種表達式(長文系列第②篇)
