Python 經(jīng)典面試題:并發(fā)場(chǎng)景的生產(chǎn)消費(fèi)者模式
??“Python貓” ,一個(gè)值得加星標(biāo)的公眾號(hào)

原標(biāo)題:Python | 面試的常客,經(jīng)典的生產(chǎn)消費(fèi)者模式
今天的文章,我們來(lái)聊聊關(guān)于多線程的一個(gè)經(jīng)典設(shè)計(jì)模式。
在之前的文章當(dāng)中我們?cè)?jīng)說(shuō)道,在多線程并發(fā)的場(chǎng)景當(dāng)中,如果我們需要感知線程之間的狀態(tài),交換線程之間的信息是一件非常復(fù)雜和困難的事情。因?yàn)槲覀儧](méi)有更高級(jí)的系統(tǒng)權(quán)限,也沒(méi)有上帝視角,很難知道目前運(yùn)行的狀態(tài)的全貌,所以想要設(shè)計(jì)出一個(gè)穩(wěn)健運(yùn)行沒(méi)有bug的功能,不僅非常困難,而且調(diào)試起來(lái)非常麻煩。
生產(chǎn)消費(fèi)者模式
在日常開(kāi)發(fā)當(dāng)中,從一個(gè)線程向另外的線程傳輸數(shù)據(jù)又是一件家常便飯的事情。舉個(gè)最簡(jiǎn)單的例子,我們?cè)谔幚砭W(wǎng)頁(yè)請(qǐng)求的時(shí)候,需要打印下來(lái)這一次請(qǐng)求的相關(guān)日志。打印日志是一次IO行為,這是非常消耗時(shí)間的,所以我們不能放在請(qǐng)求當(dāng)中同步進(jìn)行,否則會(huì)影響系統(tǒng)的性能。最好的辦法就是啟動(dòng)一系列線程專門負(fù)責(zé)打印,后端的線程只負(fù)責(zé)響應(yīng)請(qǐng)求,相關(guān)的日志以消息的形式傳送給打印線程打印。
這個(gè)簡(jiǎn)單的不能再簡(jiǎn)單的功能當(dāng)中涉及了諸多細(xì)節(jié),我們來(lái)盤點(diǎn)幾個(gè)。首先IO線程的數(shù)據(jù)都是從后臺(tái)線程來(lái)的,假如一段時(shí)間內(nèi)沒(méi)有請(qǐng)求,那么這些線程都應(yīng)該休眠,應(yīng)該在有請(qǐng)求的時(shí)候才會(huì)啟動(dòng)。其次,如果某一段時(shí)間內(nèi)請(qǐng)求非常多,導(dǎo)致IO線程一時(shí)間來(lái)不及打印所有的數(shù)據(jù),那么當(dāng)下的請(qǐng)求應(yīng)該先暫存起來(lái),等IO線程”忙過(guò)來(lái)“之后再進(jìn)行處理。
把這些細(xì)節(jié)都考慮到,自己來(lái)設(shè)計(jì)功能還是挺麻煩的。好在這個(gè)問(wèn)題前人已經(jīng)替我們想過(guò)了,并且得出了一個(gè)非常經(jīng)典的設(shè)計(jì)模式,使用它可以很好的解決這個(gè)問(wèn)題。這個(gè)模式就是生產(chǎn)消費(fèi)者模式。
這個(gè)設(shè)計(jì)模式的原理其實(shí)非常簡(jiǎn)單,我們來(lái)看張圖就明白了。

線程根據(jù)和數(shù)據(jù)的關(guān)系分為生產(chǎn)者線程和消費(fèi)者線程,其中生產(chǎn)者線程負(fù)責(zé)生產(chǎn)數(shù)據(jù),產(chǎn)生了數(shù)據(jù)之后會(huì)存儲(chǔ)到任務(wù)隊(duì)列當(dāng)中。消費(fèi)者線程從這個(gè)隊(duì)列獲取需要消費(fèi)的數(shù)據(jù),它和生產(chǎn)者線程之間不會(huì)直接交互,避免了線程之間互相依賴的問(wèn)題。
另外一個(gè)細(xì)節(jié)是這里的任務(wù)隊(duì)列并不是普通的隊(duì)列,一般情況下是一個(gè)阻塞隊(duì)列。也就是說(shuō)當(dāng)消費(fèi)者線程嘗試從其中獲取數(shù)據(jù)的時(shí)候,如果隊(duì)列是空的,那么這些消費(fèi)者線程會(huì)自動(dòng)掛起等待,直到它獲得了數(shù)據(jù)為止。有阻塞隊(duì)列當(dāng)然也有非阻塞隊(duì)列,如果是非阻塞隊(duì)列的話,當(dāng)我們嘗試從其中獲取數(shù)據(jù)的時(shí)候,如果它當(dāng)中沒(méi)有數(shù)據(jù)的話,并不會(huì)掛起等待,而是會(huì)返回一個(gè)空值。
當(dāng)然阻塞隊(duì)列的掛起等待時(shí)間也是可以設(shè)置的,我們可以讓它一直等待下去,也可以設(shè)置一個(gè)最長(zhǎng)等待時(shí)間。如果超過(guò)這個(gè)時(shí)間也會(huì)返回空,不同的隊(duì)列應(yīng)用在不同的場(chǎng)景當(dāng)中,我們需要根據(jù)場(chǎng)景性質(zhì)做出調(diào)整。
代碼實(shí)現(xiàn)
看完了設(shè)計(jì)模式的原理,我們下面來(lái)試著用代碼來(lái)實(shí)現(xiàn)一下。
在一般的高級(jí)語(yǔ)言當(dāng)中都有現(xiàn)成的隊(duì)列的庫(kù),由于在生產(chǎn)消費(fèi)者模式當(dāng)中用到的是阻塞型queue,有阻塞性的隊(duì)列當(dāng)然也就有非阻塞型的隊(duì)列。我們?cè)谟弥靶枰攘私馇宄绻缅e(cuò)了隊(duì)列會(huì)導(dǎo)致整個(gè)程序出現(xiàn)問(wèn)題。在Python當(dāng)中,我們最常用的queue就是一個(gè)支持多線程場(chǎng)景的阻塞隊(duì)列,所以我們直接拿來(lái)用就好了。
由于這個(gè)設(shè)計(jì)模式非常簡(jiǎn)單,這個(gè)代碼并不長(zhǎng)只有幾行:
from?queue?import?Queue
from?threading?import?Thread
def?producer(que):
????data?=?0
????while?True:
????????data?+=?1
????????que.put(data)
????????
def?consumer(que):
????while?True:
????????data?=?que.get()
????????print(data)
????????
????????
que?=?Queue()
t1?=?Thread(target=consumer,?args=(que,?))
t2?=?Thread(target=producer,?args=(que,?))
t1.start()
t2.start()
我們運(yùn)行一下就會(huì)發(fā)現(xiàn)它是可行的,并且由于隊(duì)列先進(jìn)先出的限制,可以保證了consumer線程讀取到的內(nèi)容的順序和producer生產(chǎn)的順序是一致的。
如果我們運(yùn)行一下這個(gè)代碼會(huì)發(fā)現(xiàn)它是不會(huì)結(jié)束的,因?yàn)閏onsumer和producer當(dāng)中都用到了while True構(gòu)建的死循環(huán),假設(shè)我們希望可以控制程序的結(jié)束,應(yīng)該怎么辦?
其實(shí)也很簡(jiǎn)單,我們也可以利用隊(duì)列。我們創(chuàng)建一個(gè)特殊的信號(hào)量,約定好當(dāng)consumer接受到這個(gè)特殊值的時(shí)候就停止程序。這樣當(dāng)我們要結(jié)束程序的時(shí)候,我們只需要把這個(gè)信號(hào)量加入隊(duì)列即可。
singal?=?object()
def?producer(que):
????data?=?0
????while?data?20:
????????data?+=?1
????????que.put(data)
????que.put(singal)
????????
def?consumer(que):
????while?True:
????????data?=?que.get()
????????if?data?is?singal:
????????????#?繼續(xù)插入singal
????????????que.put(singal)
????????????break
????????print(data)
這里有一個(gè)細(xì)節(jié)是我們?cè)赾onsumer當(dāng)中,當(dāng)讀取到singal的時(shí)候,在跳出循環(huán)之前我們又把singal放回了隊(duì)列。原因也很簡(jiǎn)單,因?yàn)橛袝r(shí)候consumer線程不止一個(gè),這個(gè)singal上游只放置了一個(gè),只會(huì)被一個(gè)線程讀取進(jìn)來(lái),其他線程并不會(huì)知道已經(jīng)獲得了singal的消息,所以還是會(huì)繼續(xù)執(zhí)行。
而當(dāng)consumer關(guān)閉之前放入singal就可以保證每一個(gè)consumer在關(guān)閉的之前都會(huì)再傳遞一個(gè)結(jié)束的信號(hào)給其他未關(guān)閉的consumer讀取。這樣一個(gè)一個(gè)的傳遞,就可以保證所有consumer都關(guān)閉。
這里還有一個(gè)小細(xì)節(jié),雖然利用隊(duì)列可以解決生產(chǎn)者和消費(fèi)者通信的問(wèn)題,但是上游的生產(chǎn)者并不知道下游的消費(fèi)者是否已經(jīng)執(zhí)行完成了。假如我們想要知道,應(yīng)該怎么辦?
Python的設(shè)計(jì)者們也考慮到了這個(gè)問(wèn)題,所以他們?cè)赒ueue這個(gè)類當(dāng)中加入了task_done和join方法。利用task_done,消費(fèi)者可以通知queue這一個(gè)任務(wù)已經(jīng)執(zhí)行完成了。而通過(guò)調(diào)用join,可以等待所有的consumer完成。
from?queue?import?Queue
from?threading?import?Thread
def?producer(que):
????data?=?0
????while?data?20:
????????data?+=?1
????????que.put(data)
????????
def?consumer(que):
????while?True:
????????data?=?que.get()
????????print(data)
????????que.task_done()
????????
????????
que?=?Queue()
t1?=?Thread(target=consumer,?args=(que,?))
t2?=?Thread(target=producer,?args=(que,?))
t1.start()
t2.start()
que.join()
除了使用task_done之外,我們還可以在que傳遞的消息當(dāng)中加入一個(gè)Event,這樣我們還可以繼續(xù)感知到每一個(gè)Event執(zhí)行的情況。
優(yōu)先隊(duì)列與其他設(shè)置
我們之前在介紹一些分布式調(diào)度系統(tǒng)的時(shí)候曾經(jīng)說(shuō)到過(guò),在調(diào)度系統(tǒng)當(dāng)中,調(diào)度者會(huì)用一個(gè)優(yōu)先隊(duì)列來(lái)管理所有的任務(wù)。當(dāng)有機(jī)器空閑的時(shí)候,會(huì)有限調(diào)度那些優(yōu)先級(jí)高的任務(wù)。
其實(shí)這個(gè)調(diào)度系統(tǒng)也是基于我們剛才介紹的生產(chǎn)消費(fèi)者模型開(kāi)發(fā)的,只不過(guò)將調(diào)度隊(duì)列從普通隊(duì)列換成了優(yōu)先隊(duì)列而已。所以如果我們也希望我們的consumer能夠根據(jù)任務(wù)的優(yōu)先級(jí)來(lái)改變執(zhí)行順序的話,也可以使用優(yōu)先隊(duì)列來(lái)進(jìn)行管理任務(wù)。
關(guān)于優(yōu)先隊(duì)列的實(shí)現(xiàn)我們已經(jīng)很熟悉了,但是有一個(gè)問(wèn)題是我們需要實(shí)現(xiàn)掛起等待的阻塞功能。這個(gè)我們自己實(shí)現(xiàn)是比較麻煩的,但好在我們可以通過(guò)調(diào)用相關(guān)的庫(kù)來(lái)實(shí)現(xiàn)。比如threading中的Condition,Condition是一個(gè)條件變量可以通知其他線程,也可以實(shí)現(xiàn)掛起等待。
from?threading?import?Thread,?Condition
class?PriorityQueue:
????def?__init__(self):
????????self._queue?=?[]
????????self._cv?=?Condition()
????????
????def?put(self,?item,?priority):
????????with?self._cv:
????????????heapq.heappush(self._queue,?(-priority,?self._count,?item))
????????????#?通知下游,喚醒wait狀態(tài)的線程
????????????self._cv.notify()
????def?get(self):
????????with?self._cv:
????????????#?如果對(duì)列為空則掛起
????????????while?len(self._queue)?==?0:
????????????????self._cv.wait()
????????????#?否則返回優(yōu)先級(jí)最大的
????????????return?heapq.heappop(self._queue)[-1]
最后介紹一下Queue的其他設(shè)置,比如我們可以通過(guò)size參數(shù)設(shè)置隊(duì)列的大小,由于這是一個(gè)阻塞式隊(duì)列,所以如果我們?cè)O(shè)置了隊(duì)列的大小,那么當(dāng)隊(duì)列被裝滿的時(shí)候,往其中插入數(shù)據(jù)的操作也會(huì)被阻塞。此時(shí)producer線程會(huì)被掛起,一直到隊(duì)列不再滿為止。
當(dāng)然我們也可以通過(guò)block參數(shù)將隊(duì)列的操作設(shè)置成非阻塞。比如que.get(block=False),那么當(dāng)隊(duì)列為空的時(shí)候,將會(huì)拋出一個(gè)隊(duì)列為空的異常。同樣,que.put(data, block=False)時(shí)也一樣會(huì)得到一個(gè)隊(duì)列已滿的異常。
總結(jié)
今天這篇文章當(dāng)中我們主要介紹了多線程場(chǎng)景中經(jīng)典的生產(chǎn)消費(fèi)者模式,這個(gè)模式在許多場(chǎng)景當(dāng)中都有使用。比如kafka等消息系統(tǒng),以及yarn等調(diào)度系統(tǒng)等等,幾乎只要是涉及到多線程上下游通信的,往往都會(huì)用到。也正因此它的使用場(chǎng)景太廣了,所以它經(jīng)常在各種面試當(dāng)中出現(xiàn),也可以認(rèn)為是工程師必須知道的幾種基礎(chǔ)設(shè)計(jì)模式之一。
另外,隊(duì)列也是一個(gè)在設(shè)計(jì)模式以及使用場(chǎng)景當(dāng)中經(jīng)常出現(xiàn)的數(shù)據(jù)結(jié)構(gòu)。從側(cè)面也說(shuō)明了,為什么算法和數(shù)據(jù)結(jié)構(gòu)非常重要,許多大公司喜歡問(wèn)一些算法題,也是因?yàn)?strong style="color: rgb(71, 193, 168);">有實(shí)際的使用場(chǎng)景,并且的的確確能鍛煉工程師的思維能力。經(jīng)常有同學(xué)問(wèn)我算法和數(shù)據(jù)結(jié)構(gòu)的使用案例,這就是一個(gè)很好的例子。
今天的文章到這里就結(jié)束了,如果喜歡本文的話,請(qǐng)來(lái)一波素質(zhì)三連,給我一點(diǎn)支持吧(關(guān)注、在看、點(diǎn)贊)。

優(yōu)質(zhì)文章,推薦閱讀:
Python——gRPC詳解及實(shí)戰(zhàn)避坑方案(上)

