<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          人人都能學會的 Python 多線程指南!

          共 12361字,需瀏覽 25分鐘

           ·

          2022-01-23 12:43

          大家好,我是早起。

          Python 中,多線程最常見的一個場景就是爬蟲,例如這樣一個需求,有多個結構一樣的頁面需要爬取,例如下方的URL(豆瓣阿凡達影評,以10個為例)

            url_list = [
                'https://movie.douban.com/subject/1652587/reviews?sort=time&start=0',
                'https://movie.douban.com/subject/1652587/reviews?sort=time&start=20',
                'https://movie.douban.com/subject/1652587/reviews?sort=time&start=40',
                'https://movie.douban.com/subject/1652587/reviews?sort=time&start=60',
                'https://movie.douban.com/subject/1652587/reviews?sort=time&start=80',
                'https://movie.douban.com/subject/1652587/reviews?sort=time&start=100',
                'https://movie.douban.com/subject/1652587/reviews?sort=time&start=120',
                'https://movie.douban.com/subject/1652587/reviews?sort=time&start=140',
                'https://movie.douban.com/subject/1652587/reviews?sort=time&start=160',
                'https://movie.douban.com/subject/1652587/reviews?sort=time&start=180']

          如果依次爬取,請求第一個頁面——得到返回數(shù)據(jù)——解析數(shù)據(jù)——提取、存儲數(shù)據(jù)——請求第二個頁面,按照這樣的思路,那么大量時間都會浪費在請求、返回數(shù)據(jù)上,如果在等待第一個頁面返回數(shù)據(jù)時去請求第二個頁面,就能有效的提高效率,多線程就可以實現(xiàn)這樣的功能。

          在Python中實現(xiàn)多線程的方法也很多,我將基于 threading 模塊一點一點介紹,注意本文不會太注重于多線程背后的技術概念(面試常問),僅希望用最少的話教會大家如何實現(xiàn)。當然會在最后介紹如何使用threading模塊來解決上面的爬蟲問題。

          threading基本使用

          讓我們先從一個簡單的例子開始,定義do_something函數(shù),執(zhí)行該函數(shù)需要消耗1秒

          import time

          start = time.perf_counter()

          def do_something():
              print("-> 線程啟動")
              time.sleep(1)
              print("-> 線程結束")

          do_something()

          finish = time.perf_counter()

          print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")

          上面的代碼不難理解,執(zhí)行do_something并計算耗時,結果很明顯應該是1s

          -> 線程啟動
          -> 線程結束
          全部任務執(zhí)行完成,耗時 1.01 秒

          現(xiàn)在如果需要執(zhí)行兩次do_something,按照最基本的思路

          import time

          start = time.perf_counter()


          def do_something():
              print("-> 線程啟動")
              time.sleep(1)
              print("-> 線程結束")


          do_something()
          do_something()

          finish = time.perf_counter()

          print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")

          執(zhí)行上面代碼結果也很容易猜到是2秒

          -> 線程啟動
          -> 線程結束
          -> 線程啟動
          -> 線程結束
          全部任務執(zhí)行完成,耗時 2.01 秒

          這就是最常規(guī)的 同步 思路,在CPU執(zhí)行第一個函數(shù),也就是等待1s的時間內(nèi),什么也不干,等第一個函數(shù)執(zhí)行完畢后再執(zhí)行第二個函數(shù)

          很明顯,這樣讓CPU干等著啥也不干并不是一個很好的選擇,而多線程就是解決這一問題的方法之一,讓CPU在等待某個任務完成時去執(zhí)行更多的操作,將整個過程簡化為下圖流程,這樣就能充分節(jié)省時間

          現(xiàn)在使用threading來通過多線程的方式實現(xiàn)上面的過程,非常簡單,定義兩個線程并依次啟動即可??

          import time
          import threading

          start = time.perf_counter()


          def do_something():
              print("-> 線程啟動")
              time.sleep(1)
              print("-> 線程結束")


          thread1 = threading.Thread(target=do_something)
          thread2 = threading.Thread(target=do_something)

          thread1.start()
          thread2.start()

          finish = time.perf_counter()

          print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")

          執(zhí)行上面的代碼,結果如下

          -> 線程啟動
          -> 線程啟動
          全部任務執(zhí)行完成,耗時 0.0 秒
          -> 線程結束
          -> 線程結束

          可以看到,兩個子線程確實同時啟動,但是主線程并未等待兩個子線程執(zhí)行完畢就直接結束。

          為了解決這個問題,我們可以使用threading.join()方法,意思是在子線程完成運行之前,這個子線程的父線程將一直被阻塞

          換成人話就是讓主線程掛起,等待所有子線程結束再執(zhí)行,體現(xiàn)到代碼上也很簡單,只需要添加兩行即可

          import time
          import threading

          start = time.perf_counter()


          def do_something():
              print("-> 線程啟動")
              time.sleep(1)
              print("-> 線程結束")


          thread1 = threading.Thread(target=do_something)
          thread2 = threading.Thread(target=do_something)

          thread1.start() 
          thread2.start()

          thread1.join()
          thread2.join()

          finish = time.perf_counter()

          print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")

          運行結果如下,全部代碼在1秒內(nèi)運行完畢

          -> 線程啟動
          -> 線程啟動
          -> 線程結束
          -> 線程結束
          全部任務執(zhí)行完成,耗時 1.01 秒

          至此,我們就得到了第一個有效的多線程代碼,相信你也能大致明白threading的基本使用流程。

          傳遞參數(shù)

          現(xiàn)在來看看如何在多線程之間傳遞參數(shù),讓我們升級代碼:do_something函數(shù)來接受一個參數(shù),控制他睡眠等待的時間

          def do_something(num):
              print(f"-> 線程{num} 啟動,睡眠 {num} 秒")
              time.sleep(num)
              print(f"-> 線程{num} 結束")

          threading 中,創(chuàng)建線程時可以使用 args 來傳遞參數(shù),例如現(xiàn)在接收一個參數(shù),則上一小節(jié)的代碼可以如下修改

          import time
          import threading

          start = time.perf_counter()

          def do_something(num):
              print(f"-> 線程{num} 啟動,睡眠 {num} 秒")
              time.sleep(num)
              print(f"-> 線程{num} 結束")

          thread1 = threading.Thread(target=do_something,args = [1])
          thread2 = threading.Thread(target=do_something,args = [2])

          thread1.start()
          thread2.start()

          thread1.join()
          thread2.join()

          finish = time.perf_counter()

          print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")

          這段代碼中,我分別讓兩個線程等待1、2秒,運行結果顯然應該是2秒

          -> 線程1 啟動,睡眠 1 秒
          -> 線程2 啟動,睡眠 2 秒
          -> 線程1 結束
          -> 線程2 結束
          全部任務執(zhí)行完成,耗時 2.01 秒

          如果你的線程函數(shù)需要更多的參數(shù),只需要依次向args中追加即可。

          簡化代碼

          上面的案例中,我們僅開啟了兩個線程,如果是更多個線程的話,再依次重復定義、啟動就會顯得十分繁瑣,此時我們可以使用循環(huán)來處理。

          例如開啟10個線程,依次睡眠1-10秒,可以先創(chuàng)建一個 list 用于存儲每個線程,接著利用循環(huán)依次創(chuàng)建線程,啟動后追加到剛剛創(chuàng)建的 list 中,之后再依次等待每個線程執(zhí)行完畢,代碼如下

          import time
          import threading

          start = time.perf_counter()

          def do_something(num):
              print(f"-> 線程{num} 啟動,睡眠 {num} 秒")
              time.sleep(num)
              print(f"-> 線程{num} 結束")

          thread_list = []

          for i in range(1,11):

              thread = threading.Thread(target=do_something, args=[i])
              thread.start()
              thread_list.append(thread)

          for t in thread_list:

              t.join()

          finish = time.perf_counter()

          print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")

          結果是顯然的,雖然我們執(zhí)行了十次do_something,每次用時1-10秒,但總耗時應該為10秒

          -> 線程1 啟動,睡眠 1 秒
          -> 線程2 啟動,睡眠 2 秒
          -> 線程3 啟動,睡眠 3 秒
          -> 線程4 啟動,睡眠 4 秒
          -> 線程5 啟動,睡眠 5 秒
          -> 線程6 啟動,睡眠 6 秒
          -> 線程7 啟動,睡眠 7 秒
          -> 線程8 啟動,睡眠 8 秒
          -> 線程9 啟動,睡眠 9 秒
          -> 線程10 啟動,睡眠 10 秒
          -> 線程1 結束
          -> 線程2 結束
          -> 線程3 結束
          -> 線程4 結束
          -> 線程5 結束
          -> 線程6 結束
          -> 線程7 結束
          -> 線程8 結束
          -> 線程9 結束
          -> 線程10 結束
          全部任務執(zhí)行完成,耗時 10.01 秒

          共享變量鎖的問題

          現(xiàn)在,你應該已經(jīng)了解 threading 最基本的用法,只需要將 do_somthing 函數(shù)進行修改即可,但是如果你深入使用,還會有其他的問題出現(xiàn),例如共享變量的問題,讓我們繼續(xù)探討。

          多線程很常見的一個應用就是爬蟲,回到開頭的爬蟲問題,如果我們希望爬取10個網(wǎng)頁的評論,可能會先定一個空dataframe,然后使用多線程都往這個dataframe中寫入數(shù)據(jù),但由于多個線程同時操作這一個變量,可能會導致評論并不是按照順序?qū)懭氲摹?/p>

          例如第一個頁面有10條評論,第一個線程寫入了2條后,第二個線程將第二個頁面的前兩條寫入,最終導致十個頁面的評論是亂序存儲!

          讓我們把這個問題抽象出來,還是之前的代碼,稍微修改一下

          我們先定義了一個空list,線程函數(shù)會將傳入的數(shù)字添加到該list中,在未加鎖的情況下,由于線程競爭,雖然我們線程是按照順序開啟,但是最終數(shù)字并不是按照順序?qū)懭搿?/p>

          有沒有辦法解決呢?當然有,很自然的想法就是當?shù)谝粋€線程操作該變量時,其他線程等著,寫完了再釋放,這就是鎖!

          先看代碼

          在上面的代碼中,我們使用 threding.Lock 創(chuàng)建了一個線程鎖,之后在線程函數(shù)操作 result 前,首先使用 lock.acquire() 加上鎖,之后操作 results ,在修改完后使用 lock.relese() 釋放,此時其他線程若想操作 results 則會阻塞,等該線程釋放后才能拿走操作中,這樣我們就保證了線程是“安全的”!

          最基本的線程鎖用法就如上面代碼所示,定義鎖 --> 上鎖 --> 解鎖,但是一定要注意,lock.acquire()lock.relese()如果加了鎖但是沒有釋放,后面的線程將會全部阻塞!

          限制線程數(shù)量

          最后還有一個常見的問題,上面我們需要執(zhí)行幾次線程函數(shù)就開了幾個線程,但是如果需要爬成千上萬個網(wǎng)頁,開這么多線程cpu一定不同意,代碼也會在開啟的線程達到一定數(shù)量后報錯。

          所以如何讓程序只啟動我們指定的線程數(shù)量,例如一次開五個線程,結束一個再添加一個,直到全部任務完成?

          還是鎖!在 threading 模塊中有一個 BoundedSemaphore(信號量)類,我們可以給他一個初始的信號量(最大線程數(shù)),之后每次有線程獲得信號量的時候(即 acquire() )計數(shù)器-1,釋放信號量時候(release())計數(shù)器+1,計數(shù)器為0的時候其它線程就被阻塞無法獲得信號量。當計數(shù)器為設定好的上限的時候 BoundedSemaphore 就無法進行 release() 操作了。

          體現(xiàn)到代碼上則比較簡單,還是基于上面的例子修改

          總共需要運行十次,我們定義最大線程數(shù)為3,并在線程啟動前調(diào)用acquire方法增加一個計數(shù),在線程最后釋放。

          此時程序一次只能啟動三個線程,如圖中所示,首先啟動123,之后完成123,啟動456,當?shù)谒膫€線程結束啟動第七個線程······直到全部線程結束。

          這里我們同時使用了上一節(jié)說的線程鎖來保護變量,用 BoundedSemaphore 鎖來控制最大線程數(shù),在實際寫代碼時就需要小心檢查鎖是否正確釋放,否則就會報錯!

          一個真實的多線程爬蟲案例

          至此,threading 模塊最常見的用法就介紹完畢,現(xiàn)在讓我們回到本文一開始的問題,有多個(以十個為例)URL需要爬取,既然每個頁面需要執(zhí)行的操作一樣,如果等待一個頁面爬取完畢再爬第二頁面就太浪費時間了。這時就可以仿照上面的思路去使用多線程加速。

          我們只需要將上面的do_something函數(shù)修改為對也面的爬取操作,之后的創(chuàng)建啟動線程操作不變即可,代碼如下

          import time
          import threading
          import requests
          import pandas as pd
          from faker import Faker
          from bs4 import BeautifulSoup


          def craw_url(url):
              global df
              fake = Faker()
              headers = {'User-Agent': fake.user_agent()}
              r = requests.get(url, headers=headers)
              soup = BeautifulSoup(r.content, 'html.parser')
              review_list = soup.find_all(class_="main review-item")

              for i in range(len(review_list)):

                  rank = review_list[i].select('span')[0].get('title')
                  time1 = review_list[i].select('span')[1].get('content')
                  title = review_list[i].select('h2>a')[0].text
                  df = df.append({'時間': time1,
                                  '評分': rank,
                                  '標題': title, }, ignore_index=True)

              print("-> 爬取完成")


          if __name__ == '__main__':

              start = time.perf_counter()
              df = pd.DataFrame(columns=['時間''評分''標題'])

              url_list = [
                  'https://movie.douban.com/subject/1652587/reviews?sort=time&start=0',
                  'https://movie.douban.com/subject/1652587/reviews?sort=time&start=20',
                  'https://movie.douban.com/subject/1652587/reviews?sort=time&start=40',
                  'https://movie.douban.com/subject/1652587/reviews?sort=time&start=60',
                  'https://movie.douban.com/subject/1652587/reviews?sort=time&start=80',
                  'https://movie.douban.com/subject/1652587/reviews?sort=time&start=100',
                  'https://movie.douban.com/subject/1652587/reviews?sort=time&start=120',
                  'https://movie.douban.com/subject/1652587/reviews?sort=time&start=140',
                  'https://movie.douban.com/subject/1652587/reviews?sort=time&start=160',
                  'https://movie.douban.com/subject/1652587/reviews?sort=time&start=180']
              thread_list = []
              for i in url_list:

                  thread = threading.Thread(target=craw_url, args=[i])
                  thread.start()
                  thread_list.append(thread)

              for t in thread_list:

                  t.join()

              finish = time.perf_counter()
              
              print(f"全部任務執(zhí)行完成,耗時 {round(finish - start,2)} 秒")

          執(zhí)行這段代碼,差不多僅用了1秒就將全部內(nèi)容爬取并存儲到 dataframe 中,比同步的代碼塊了近十倍!如果感興趣的話可以自己嘗試一下。

          至此,有關 Python 多線程模塊 threading 的基本用法與需要注意的幾點就介紹完畢,如果全部認真看完的話,我相信你一定能照貓畫虎寫出第一個多線程爬蟲腳本。

          當然有關 Python 多線程還有很多飽受詬病的爭議(GIL),多線程的實現(xiàn)方法也遠不止 threading 模塊,例如更常見的寫法是通過 concurrent.futures 模塊以及多進程、協(xié)程,這些都留在本系列后續(xù)文章中再進一步討論!

          推薦閱讀


          牛逼!Python常用數(shù)據(jù)類型的基本操作(長文系列第①篇)

          牛逼!Python的判斷、循環(huán)和各種表達式(長文系列第②篇)

          牛逼!Python函數(shù)和文件操作(長文系列第③篇)

          牛逼!Python錯誤、異常和模塊(長文系列第④篇)

          瀏覽 26
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩爱爱爱爱 | 亚洲免费福利视频 | 亚洲三级在线播放 | 久久无码精品一区二区三区 | 91干在线视频 |