人人都能學(xué)會的 Python 多線程指南!
大家好,我是老表。
在 Python 中,多線程最常見的一個場景就是爬蟲,例如這樣一個需求,有多個結(jié)構(gòu)一樣的頁面需要爬取,例如下方的URL(豆瓣阿凡達(dá)影評,以10個為例)
url_list = [
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=0',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=20',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=40',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=60',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=80',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=100',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=120',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=140',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=160',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=180']
如果依次爬取,請求第一個頁面——得到返回?cái)?shù)據(jù)——解析數(shù)據(jù)——提取、存儲數(shù)據(jù)——請求第二個頁面,按照這樣的思路,那么大量時間都會浪費(fèi)在請求、返回?cái)?shù)據(jù)上,如果在等待第一個頁面返回?cái)?shù)據(jù)時去請求第二個頁面,就能有效的提高效率,多線程就可以實(shí)現(xiàn)這樣的功能。
在Python中實(shí)現(xiàn)多線程的方法也很多,我將基于 threading 模塊一點(diǎn)一點(diǎn)介紹,注意本文不會太注重于多線程背后的技術(shù)概念(面試常問),僅希望用最少的話教會大家如何實(shí)現(xiàn)。當(dāng)然會在最后介紹如何使用threading模塊來解決上面的爬蟲問題。
首先需要你的電腦安裝好了Python環(huán)境,并且安裝好了Python開發(fā)工具。
如果你還沒有安裝,可以參考以下文章:
如果僅用Python來處理數(shù)據(jù)、爬蟲、數(shù)據(jù)分析或者自動化腳本、機(jī)器學(xué)習(xí)等,建議使用Python基礎(chǔ)環(huán)境+jupyter即可,安裝使用參考Windows/Mac 安裝、使用Python環(huán)境+jupyter notebook
如果想利用Python進(jìn)行web項(xiàng)目開發(fā)等,建議使用Python基礎(chǔ)環(huán)境+Pycharm,安裝使用參考:Windows下安裝、使用Pycharm教程,這下全了 和 Mac下玩轉(zhuǎn)Python-安裝&使用Python/PyCharm 。
threading基本使用
讓我們先從一個簡單的例子開始,定義do_something函數(shù),執(zhí)行該函數(shù)需要消耗1秒
import time
start = time.perf_counter()
def do_something():
print("-> 線程啟動")
time.sleep(1)
print("-> 線程結(jié)束")
do_something()
finish = time.perf_counter()
print(f"全部任務(wù)執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
上面的代碼不難理解,執(zhí)行do_something并計(jì)算耗時,結(jié)果很明顯應(yīng)該是1s
-> 線程啟動
-> 線程結(jié)束
全部任務(wù)執(zhí)行完成,耗時 1.01 秒
現(xiàn)在如果需要執(zhí)行兩次do_something,按照最基本的思路
import time
start = time.perf_counter()
def do_something():
print("-> 線程啟動")
time.sleep(1)
print("-> 線程結(jié)束")
do_something()
do_something()
finish = time.perf_counter()
print(f"全部任務(wù)執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
執(zhí)行上面代碼結(jié)果也很容易猜到是2秒
-> 線程啟動
-> 線程結(jié)束
-> 線程啟動
-> 線程結(jié)束
全部任務(wù)執(zhí)行完成,耗時 2.01 秒
這就是最常規(guī)的 同步 思路,在CPU執(zhí)行第一個函數(shù),也就是等待1s的時間內(nèi),什么也不干,等第一個函數(shù)執(zhí)行完畢后再執(zhí)行第二個函數(shù)
很明顯,這樣讓CPU干等著啥也不干并不是一個很好的選擇,而多線程就是解決這一問題的方法之一,讓CPU在等待某個任務(wù)完成時去執(zhí)行更多的操作,將整個過程簡化為下圖流程,這樣就能充分節(jié)省時間
現(xiàn)在使用threading來通過多線程的方式實(shí)現(xiàn)上面的過程,非常簡單,定義兩個線程并依次啟動即可??
import time
import threading
start = time.perf_counter()
def do_something():
print("-> 線程啟動")
time.sleep(1)
print("-> 線程結(jié)束")
thread1 = threading.Thread(target=do_something)
thread2 = threading.Thread(target=do_something)
thread1.start()
thread2.start()
finish = time.perf_counter()
print(f"全部任務(wù)執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
執(zhí)行上面的代碼,結(jié)果如下
-> 線程啟動
-> 線程啟動
全部任務(wù)執(zhí)行完成,耗時 0.0 秒
-> 線程結(jié)束
-> 線程結(jié)束
可以看到,兩個子線程確實(shí)同時啟動,但是主線程并未等待兩個子線程執(zhí)行完畢就直接結(jié)束。
為了解決這個問題,我們可以使用threading.join()方法,意思是在子線程完成運(yùn)行之前,這個子線程的父線程將一直被阻塞
換成人話就是讓主線程掛起,等待所有子線程結(jié)束再執(zhí)行,體現(xiàn)到代碼上也很簡單,只需要添加兩行即可
import time
import threading
start = time.perf_counter()
def do_something():
print("-> 線程啟動")
time.sleep(1)
print("-> 線程結(jié)束")
thread1 = threading.Thread(target=do_something)
thread2 = threading.Thread(target=do_something)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
finish = time.perf_counter()
print(f"全部任務(wù)執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
運(yùn)行結(jié)果如下,全部代碼在1秒內(nèi)運(yùn)行完畢
-> 線程啟動
-> 線程啟動
-> 線程結(jié)束
-> 線程結(jié)束
全部任務(wù)執(zhí)行完成,耗時 1.01 秒
至此,我們就得到了第一個有效的多線程代碼,相信你也能大致明白threading的基本使用流程。
傳遞參數(shù)
現(xiàn)在來看看如何在多線程之間傳遞參數(shù),讓我們升級代碼:do_something函數(shù)來接受一個參數(shù),控制他睡眠等待的時間
def do_something(num):
print(f"-> 線程{num} 啟動,睡眠 {num} 秒")
time.sleep(num)
print(f"-> 線程{num} 結(jié)束")
在 threading 中,創(chuàng)建線程時可以使用 args 來傳遞參數(shù),例如現(xiàn)在接收一個參數(shù),則上一小節(jié)的代碼可以如下修改
import time
import threading
start = time.perf_counter()
def do_something(num):
print(f"-> 線程{num} 啟動,睡眠 {num} 秒")
time.sleep(num)
print(f"-> 線程{num} 結(jié)束")
thread1 = threading.Thread(target=do_something,args = [1])
thread2 = threading.Thread(target=do_something,args = [2])
thread1.start()
thread2.start()
thread1.join()
thread2.join()
finish = time.perf_counter()
print(f"全部任務(wù)執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
這段代碼中,我分別讓兩個線程等待1、2秒,運(yùn)行結(jié)果顯然應(yīng)該是2秒
-> 線程1 啟動,睡眠 1 秒
-> 線程2 啟動,睡眠 2 秒
-> 線程1 結(jié)束
-> 線程2 結(jié)束
全部任務(wù)執(zhí)行完成,耗時 2.01 秒
如果你的線程函數(shù)需要更多的參數(shù),只需要依次向args中追加即可。
簡化代碼
上面的案例中,我們僅開啟了兩個線程,如果是更多個線程的話,再依次重復(fù)定義、啟動就會顯得十分繁瑣,此時我們可以使用循環(huán)來處理。
例如開啟10個線程,依次睡眠1-10秒,可以先創(chuàng)建一個 list 用于存儲每個線程,接著利用循環(huán)依次創(chuàng)建線程,啟動后追加到剛剛創(chuàng)建的 list 中,之后再依次等待每個線程執(zhí)行完畢,代碼如下
import time
import threading
start = time.perf_counter()
def do_something(num):
print(f"-> 線程{num} 啟動,睡眠 {num} 秒")
time.sleep(num)
print(f"-> 線程{num} 結(jié)束")
thread_list = []
for i in range(1,11):
thread = threading.Thread(target=do_something, args=[i])
thread.start()
thread_list.append(thread)
for t in thread_list:
t.join()
finish = time.perf_counter()
print(f"全部任務(wù)執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
結(jié)果是顯然的,雖然我們執(zhí)行了十次do_something,每次用時1-10秒,但總耗時應(yīng)該為10秒
-> 線程1 啟動,睡眠 1 秒
-> 線程2 啟動,睡眠 2 秒
-> 線程3 啟動,睡眠 3 秒
-> 線程4 啟動,睡眠 4 秒
-> 線程5 啟動,睡眠 5 秒
-> 線程6 啟動,睡眠 6 秒
-> 線程7 啟動,睡眠 7 秒
-> 線程8 啟動,睡眠 8 秒
-> 線程9 啟動,睡眠 9 秒
-> 線程10 啟動,睡眠 10 秒
-> 線程1 結(jié)束
-> 線程2 結(jié)束
-> 線程3 結(jié)束
-> 線程4 結(jié)束
-> 線程5 結(jié)束
-> 線程6 結(jié)束
-> 線程7 結(jié)束
-> 線程8 結(jié)束
-> 線程9 結(jié)束
-> 線程10 結(jié)束
全部任務(wù)執(zhí)行完成,耗時 10.01 秒
共享變量鎖的問題
現(xiàn)在,你應(yīng)該已經(jīng)了解 threading 最基本的用法,只需要將 do_somthing 函數(shù)進(jìn)行修改即可,但是如果你深入使用,還會有其他的問題出現(xiàn),例如共享變量的問題,讓我們繼續(xù)探討。
多線程很常見的一個應(yīng)用就是爬蟲,回到開頭的爬蟲問題,如果我們希望爬取10個網(wǎng)頁的評論,可能會先定一個空dataframe,然后使用多線程都往這個dataframe中寫入數(shù)據(jù),但由于多個線程同時操作這一個變量,可能會導(dǎo)致評論并不是按照順序?qū)懭氲摹?/p>
例如第一個頁面有10條評論,第一個線程寫入了2條后,第二個線程將第二個頁面的前兩條寫入,最終導(dǎo)致十個頁面的評論是亂序存儲!
讓我們把這個問題抽象出來,還是之前的代碼,稍微修改一下
我們先定義了一個空list,線程函數(shù)會將傳入的數(shù)字添加到該list中,在未加鎖的情況下,由于線程競爭,雖然我們線程是按照順序開啟,但是最終數(shù)字并不是按照順序?qū)懭搿?/p>
有沒有辦法解決呢?當(dāng)然有,很自然的想法就是當(dāng)?shù)谝粋€線程操作該變量時,其他線程等著,寫完了再釋放,這就是鎖!
先看代碼
在上面的代碼中,我們使用 threding.Lock 創(chuàng)建了一個線程鎖,之后在線程函數(shù)操作 result 前,首先使用 lock.acquire() 加上鎖,之后操作 results ,在修改完后使用 lock.relese() 釋放,此時其他線程若想操作 results 則會阻塞,等該線程釋放后才能拿走操作中,這樣我們就保證了線程是“安全的”!
最基本的線程鎖用法就如上面代碼所示,定義鎖 --> 上鎖 --> 解鎖,但是一定要注意,lock.acquire() 和 lock.relese(),如果加了鎖但是沒有釋放,后面的線程將會全部阻塞!
限制線程數(shù)量
最后還有一個常見的問題,上面我們需要執(zhí)行幾次線程函數(shù)就開了幾個線程,但是如果需要爬成千上萬個網(wǎng)頁,開這么多線程cpu一定不同意,代碼也會在開啟的線程達(dá)到一定數(shù)量后報錯。
所以如何讓程序只啟動我們指定的線程數(shù)量,例如一次開五個線程,結(jié)束一個再添加一個,直到全部任務(wù)完成?
還是鎖!在 threading 模塊中有一個 BoundedSemaphore(信號量)類,我們可以給他一個初始的信號量(最大線程數(shù)),之后每次有線程獲得信號量的時候(即 acquire() )計(jì)數(shù)器-1,釋放信號量時候(release())計(jì)數(shù)器+1,計(jì)數(shù)器為0的時候其它線程就被阻塞無法獲得信號量。當(dāng)計(jì)數(shù)器為設(shè)定好的上限的時候 BoundedSemaphore 就無法進(jìn)行 release() 操作了。
體現(xiàn)到代碼上則比較簡單,還是基于上面的例子修改
總共需要運(yùn)行十次,我們定義最大線程數(shù)為3,并在線程啟動前調(diào)用acquire方法增加一個計(jì)數(shù),在線程最后釋放。
此時程序一次只能啟動三個線程,如圖中所示,首先啟動123,之后完成123,啟動456,當(dāng)?shù)谒膫€線程結(jié)束啟動第七個線程······直到全部線程結(jié)束。
這里我們同時使用了上一節(jié)說的線程鎖來保護(hù)變量,用 BoundedSemaphore 鎖來控制最大線程數(shù),在實(shí)際寫代碼時就需要小心檢查鎖是否正確釋放,否則就會報錯!
一個真實(shí)的多線程爬蟲案例
至此,threading 模塊最常見的用法就介紹完畢,現(xiàn)在讓我們回到本文一開始的問題,有多個(以十個為例)URL需要爬取,既然每個頁面需要執(zhí)行的操作一樣,如果等待一個頁面爬取完畢再爬第二頁面就太浪費(fèi)時間了。這時就可以仿照上面的思路去使用多線程加速。
我們只需要將上面的do_something函數(shù)修改為對也面的爬取操作,之后的創(chuàng)建啟動線程操作不變即可,代碼如下
import time
import threading
import requests
import pandas as pd
from faker import Faker
from bs4 import BeautifulSoup
def craw_url(url):
global df
fake = Faker()
headers = {'User-Agent': fake.user_agent()}
r = requests.get(url, headers=headers)
soup = BeautifulSoup(r.content, 'html.parser')
review_list = soup.find_all(class_="main review-item")
for i in range(len(review_list)):
rank = review_list[i].select('span')[0].get('title')
time1 = review_list[i].select('span')[1].get('content')
title = review_list[i].select('h2>a')[0].text
df = df.append({'時間': time1,
'評分': rank,
'標(biāo)題': title, }, ignore_index=True)
print("-> 爬取完成")
if __name__ == '__main__':
start = time.perf_counter()
df = pd.DataFrame(columns=['時間', '評分', '標(biāo)題'])
url_list = [
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=0',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=20',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=40',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=60',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=80',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=100',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=120',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=140',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=160',
'https://movie.douban.com/subject/1652587/reviews?sort=time&start=180']
thread_list = []
for i in url_list:
thread = threading.Thread(target=craw_url, args=[i])
thread.start()
thread_list.append(thread)
for t in thread_list:
t.join()
finish = time.perf_counter()
print(f"全部任務(wù)執(zhí)行完成,耗時 {round(finish - start,2)} 秒")
執(zhí)行這段代碼,差不多僅用了1秒就將全部內(nèi)容爬取并存儲到 dataframe 中,比同步的代碼塊了近十倍!如果感興趣的話可以自己嘗試一下。
至此,有關(guān) Python 多線程模塊 threading 的基本用法與需要注意的幾點(diǎn)就介紹完畢,如果全部認(rèn)真看完的話,我相信你一定能照貓畫虎寫出第一個多線程爬蟲腳本。
當(dāng)然有關(guān) Python 多線程還有很多飽受詬病的爭議(GIL),多線程的實(shí)現(xiàn)方法也遠(yuǎn)不止 threading 模塊,例如更常見的寫法是通過 concurrent.futures 模塊以及多進(jìn)程、協(xié)程,這些都留在本系列后續(xù)文章中再進(jìn)一步討論!
點(diǎn)贊+留言+轉(zhuǎn)發(fā),就是對我最大的支持啦~
--End--
文章點(diǎn)贊超過100+
我將在個人視頻號直播(老表Max)
帶大家一起進(jìn)行項(xiàng)目實(shí)戰(zhàn)復(fù)現(xiàn)
掃碼即可加我微信
老表朋友圈經(jīng)常有贈書/紅包福利活動
點(diǎn)擊上方卡片關(guān)注公眾號,回復(fù):1024 領(lǐng)取最新Python學(xué)習(xí)資源 學(xué)習(xí)更多: 整理了我開始分享學(xué)習(xí)筆記到現(xiàn)在超過250篇優(yōu)質(zhì)文章,涵蓋數(shù)據(jù)分析、爬蟲、機(jī)器學(xué)習(xí)等方面,別再說不知道該從哪開始,實(shí)戰(zhàn)哪里找了 “點(diǎn)贊”就是對博主最大的支持

