<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python Queue 進(jìn)階:多生產(chǎn)者&單消費(fèi)者問(wèn)題

          共 4093字,需瀏覽 9分鐘

           ·

          2021-07-05 15:54

          今天我們學(xué)習(xí)下 Queue 的進(jìn)階用法。

          生產(chǎn)者消費(fèi)者模型

          在并發(fā)編程中,比如爬蟲(chóng),有的線程負(fù)責(zé)爬取數(shù)據(jù),有的線程負(fù)責(zé)對(duì)爬取到的數(shù)據(jù)做處理(清洗、分類(lèi)和入庫(kù))。假如他們是直接交互的,那么當(dāng)二者的速度不匹配時(shí)勢(shì)必出現(xiàn)等待現(xiàn)象,這也就產(chǎn)生了資源的浪費(fèi)。

          抽象是一種很重要的通用能力,而生產(chǎn)者消費(fèi)者模型是前人將一系列同類(lèi)型的具體的問(wèn)題抽象出來(lái)的一個(gè)一致的最佳解決方案。

          該模型有三個(gè)重要角色,容器,生產(chǎn)者和消費(fèi)者,顧名思義,生產(chǎn)者就是負(fù)責(zé)生產(chǎn)數(shù)據(jù)或任務(wù)的,消費(fèi)者就是負(fù)責(zé)消費(fèi)數(shù)據(jù)或者任務(wù)的(下文統(tǒng)稱為任務(wù)),容器是二者進(jìn)行通訊的媒介。

          在該模型中,生產(chǎn)者和消費(fèi)者不在直接進(jìn)行通訊,而是通過(guò)引入一個(gè)第三者容器(通常都是用阻塞隊(duì)列)來(lái)達(dá)到解耦的目的。

          這樣生產(chǎn)者不必在因?yàn)橄M(fèi)者速度過(guò)慢而等待,直接將任務(wù)放入容器即可,消費(fèi)者也不必因生產(chǎn)者生產(chǎn)速度過(guò)慢而等待,直接從容器中獲取任務(wù),以此達(dá)到了資源的最大利用。

          使用該模型可以解決并發(fā)編程中的絕大部分并發(fā)問(wèn)題。

          簡(jiǎn)易版

          我們先寫(xiě)一個(gè)單生產(chǎn)者和單消費(fèi)者的簡(jiǎn)易版生產(chǎn)者消費(fèi)者模型。

          import threadingimport timeimport queue
          def consume(thread_name, q): while True: time.sleep(2) product = q.get() print("%s consume %s" % (thread_name, product))
          def produce(thread_name, q): for i in range(3): product = 'product-' + str(i) q.put(product) print("%s produce %s" % (thread_name, product)) time.sleep(1) q = queue.Queue()p = threading.Thread(target=produce, args=("producer",q))c = threading.Thread(target=consume, args=("consumer",q))
          p.start()c.start()
          p.join()
          # 輸出如下producer produce product-0producer produce product-1consumer consume product-0producer produce product-2consumer consume product-1consumer consume product-2...

          以上就是最簡(jiǎn)單的生產(chǎn)者消費(fèi)者模型了,生產(chǎn)者生產(chǎn)三個(gè)任務(wù)供消費(fèi)者消費(fèi)。但是上面的寫(xiě)法有個(gè)問(wèn)題,就是生產(chǎn)者將任務(wù)生產(chǎn)完畢之后就和主線程一起退出了,但是消費(fèi)者將所有的任務(wù)消費(fèi)完之后還沒(méi)停止,一直處于阻塞狀態(tài)。

          那可不可以將 while True 的判斷改為 while not q.empty()呢,肯定是不行的。因?yàn)?empty() 返回 False ,不保證后續(xù)調(diào)用的 get()不被阻塞。

          同時(shí),如果用 empty() 函數(shù)來(lái)做判斷的話,那么就要保證消費(fèi)者線程開(kāi)啟之時(shí)生產(chǎn)者一定至少生產(chǎn)了一個(gè)任務(wù),否則消費(fèi)者線程就會(huì)因條件不滿足直接退出程序;同時(shí)如果生產(chǎn)者生產(chǎn)速度比較慢,一旦消費(fèi)者將任務(wù)消費(fèi)完且下次判斷時(shí)還沒(méi)有新的任務(wù)入隊(duì),那么消費(fèi)者線程也會(huì)因條件不滿足直接退出程序。自此以后,生產(chǎn)者生產(chǎn)的任務(wù)就永遠(yuǎn)不會(huì)被消費(fèi)了。

          那我們可以做一個(gè)約定,當(dāng)生產(chǎn)者生產(chǎn)完任務(wù)之后,放入一個(gè)標(biāo)志,類(lèi)似于 q.put(None),一旦消費(fèi)者接收到為 None 的任務(wù)時(shí)就意味著結(jié)束,直接退出程序即可。

          這種做法在上面的程序中是沒(méi)有問(wèn)題的,唯一的缺點(diǎn)就是有 N 個(gè)消費(fèi)者線程就需要放入 N 個(gè) None 標(biāo)志,這對(duì)于多消費(fèi)者類(lèi)型的程序顯然是很不友好的。

          最佳實(shí)踐

          我們可以結(jié)合隊(duì)列的內(nèi)置函數(shù) task_done() 和 join() 來(lái)達(dá)到我們的目的。

          join() 函數(shù)是阻塞的。當(dāng)消費(fèi)者通過(guò) get() 從隊(duì)列獲取一項(xiàng)任務(wù)并處理完成之后,需要調(diào)用且只可以調(diào)用一次 task_done(),該方法會(huì)給隊(duì)列發(fā)送一個(gè)信號(hào),join()函數(shù)則在監(jiān)聽(tīng)這個(gè)信號(hào)。

          可以簡(jiǎn)單理解為隊(duì)列內(nèi)部維護(hù)了一個(gè)計(jì)數(shù)器,該計(jì)數(shù)器標(biāo)識(shí)未完成的任務(wù)數(shù),每當(dāng)添加任務(wù)時(shí),計(jì)數(shù)器會(huì)增加,調(diào)用 task_done()時(shí)計(jì)數(shù)器則會(huì)減少,直到隊(duì)列為空。而 join() 就是在監(jiān)聽(tīng)隊(duì)列是否為空,一旦條件滿足則結(jié)束阻塞狀態(tài)。

          import threadingimport timeimport queue
          def consume(thread_name, q): while True: time.sleep(2) product = q.get() print("%s consume %s" % (thread_name, product)) q.task_done()
          def produce(thread_name, q): for i in range(3): product = 'product-' + str(i) q.put(product) print("%s produce %s" % (thread_name, product)) time.sleep(1) q.join() q = queue.Queue()p = threading.Thread(target=produce, args=("producer",q))c = threading.Thread(target=consume, args=("consumer",q))c1 = threading.Thread(target=consume, args=("consumer-1",q))
          c.setDaemon(True)c1.setDaemon(True)p.start()c.start()c1.start()
          p.join()
          # 輸出如下producer produce product-0producer produce product-1consumer-1 consume product-0consumer consume product-1producer produce product-2consumer consume product-2

          上述示例中,我們將消費(fèi)者線程設(shè)置為守護(hù)線程,這樣當(dāng)主線程結(jié)束時(shí)消費(fèi)者線程也會(huì)一并結(jié)束。然后主線程最后一句 p.join() 又表示主線程必須等待生產(chǎn)者線程結(jié)束后才可以結(jié)束。

          再細(xì)看生產(chǎn)者線程的主函數(shù) produce(),該函數(shù)中出現(xiàn)了我們上面說(shuō)過(guò)的 q.join() 函數(shù)。而 task_done 則是在消費(fèi)者線程的主函數(shù)中調(diào)用的。

          故當(dāng)生產(chǎn)者線程生產(chǎn)完所有任務(wù)后就會(huì)被阻塞,只有當(dāng)消費(fèi)者線程處理完所有任務(wù)后生產(chǎn)者才會(huì)阻塞結(jié)束。隨著生產(chǎn)者線程的結(jié)束,主線程也一并結(jié)束,守護(hù)線程消費(fèi)者線程也一并結(jié)束,自此所有線程均安全退出。

          Queue 總結(jié)

          本章節(jié)介紹了隊(duì)列的高級(jí)應(yīng)用,從簡(jiǎn)易版的示例到最佳實(shí)踐,介紹了生產(chǎn)者消費(fèi)者模型的基本用法,在該模型中,隊(duì)列扮演了非常重要的角色,起到了解耦的目的。

          本模型有固定的步驟,其中最重要的就是通過(guò) task_done() 和 join() 來(lái)互相通信。task_done() 僅僅用來(lái)通知隊(duì)列消費(fèi)者已完成一個(gè)任務(wù),至于任務(wù)是什么它毫不關(guān)心,它只關(guān)心隊(duì)列中未完成的任務(wù)數(shù)量。

          注意:task_done() 不可以在 put() 之前調(diào)用,否則會(huì)引發(fā) ValueError: task_done() called too many times。同時(shí)在處理完任務(wù)后只可以調(diào)用一次該函數(shù),否則隊(duì)列將不能準(zhǔn)確計(jì)算未完成任務(wù)數(shù)量。

          參考資料

          https://stackoverflow.com/questions/1593299/python-queue-get-task-done-issue

          https://www.ibm.com/developerworks/cn/aix/library/au-threadingpython/index.html

          Python貓技術(shù)交流群開(kāi)放啦!群里既有國(guó)內(nèi)一二線大廠在職員工,也有國(guó)內(nèi)外高校在讀學(xué)生,既有十多年碼齡的編程老鳥(niǎo),也有中小學(xué)剛剛?cè)腴T(mén)的新人,學(xué)習(xí)氛圍良好!想入群的同學(xué),請(qǐng)?jiān)诠?hào)內(nèi)回復(fù)『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠(chéng)勿擾!)~


          還不過(guò)癮?試試它們




          Python 的縮進(jìn)是不是反人類(lèi)的設(shè)計(jì)?

          Python 經(jīng)典面試題:并發(fā)場(chǎng)景的生產(chǎn)消費(fèi)者模式

          冗長(zhǎng)的 Python 代碼,如何重構(gòu)?

          把Redis當(dāng)作隊(duì)列來(lái)用,真的合適嗎?

          Python 之父爆料:明年至少令 Python 提速 1 倍!

          為什么 Python 沒(méi)有函數(shù)重載?如何用裝飾器實(shí)現(xiàn)函數(shù)重載?


          如果你覺(jué)得本文有幫助
          請(qǐng)慷慨分享點(diǎn)贊,感謝啦
          瀏覽 103
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美 日韩 另类 激情 精品 | 无码在线免费看 | 青青草亚洲最新 | 免费一级做a爰片性视频 | 色老板最新凹凸视频 |