<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python 多線程、多進(jìn)程最全整理

          共 25480字,需瀏覽 51分鐘

           ·

          2022-03-10 15:57

          在下方公眾號后臺回復(fù):JGNB,可獲取杰哥原創(chuàng)的 PDF 手冊。

          作者:錢魏Way

          來源:https://www.biaodianfu.com/python-multi-thread-and-multi-process.html

          大家好,我是杰哥。

          在學(xué)習(xí)Python的過程中,有接觸到多線程編程相關(guān)的知識點,先前一直都沒有徹底的搞明白。今天準(zhǔn)備花一些時間,把里面的細(xì)節(jié)盡可能的梳理清楚。

          線程與進(jìn)程的區(qū)別

          進(jìn)程(process)和線程(thread)是操作系統(tǒng)的基本概念,但是它們比較抽象,不容易掌握。關(guān)于多進(jìn)程和多線程,教科書上最經(jīng)典的一句話是“進(jìn)程是資源分配的最小單位,線程是CPU調(diào)度的最小單位”。線程是程序中一個單一的順序控制流程。進(jìn)程內(nèi)一個相對獨(dú)立的、可調(diào)度的執(zhí)行單元,是系統(tǒng)獨(dú)立調(diào)度和分派CPU的基本單位指運(yùn)行中的程序的調(diào)度單位。在單個程序中同時運(yùn)行多個線程完成不同的工作,稱為多線程。

          進(jìn)程和線程區(qū)別

          進(jìn)程是資源分配的基本單位。所有與該進(jìn)程有關(guān)的資源,都被記錄在進(jìn)程控制塊PCB中。以表示該進(jìn)程擁有這些資源或正在使用它們。另外,進(jìn)程也是搶占處理機(jī)的調(diào)度單位,它擁有一個完整的虛擬地址空間。當(dāng)進(jìn)程發(fā)生調(diào)度時,不同的進(jìn)程擁有不同的虛擬地址空間,而同一進(jìn)程內(nèi)的不同線程共享同一地址空間。

          與進(jìn)程相對應(yīng),線程與資源分配無關(guān),它屬于某一個進(jìn)程,并與進(jìn)程內(nèi)的其他線程一起共享進(jìn)程的資源。線程只由相關(guān)堆棧(系統(tǒng)棧或用戶棧)寄存器和線程控制表TCB組成。寄存器可被用來存儲線程內(nèi)的局部變量,但不能存儲其他線程的相關(guān)變量。

          通常在一個進(jìn)程中可以包含若干個線程,它們可以利用進(jìn)程所擁有的資源。在引入線程的操作系統(tǒng)中,通常都是把進(jìn)程作為分配資源的基本單位,而把線程作為獨(dú)立運(yùn)行和獨(dú)立調(diào)度的基本單位。

          由于線程比進(jìn)程更小,基本上不擁有系統(tǒng)資源,故對它的調(diào)度所付出的開銷就會小得多,能更高效的提高系統(tǒng)內(nèi)多個程序間并發(fā)執(zhí)行的程度,從而顯著提高系統(tǒng)資源的利用率和吞吐量。

          因而近年來推出的通用操作系統(tǒng)都引入了線程,以便進(jìn)一步提高系統(tǒng)的并發(fā)性,并把它視為現(xiàn)代操作系統(tǒng)的一個重要指標(biāo)。

          線程與進(jìn)程的區(qū)別可以歸納為以下4點:

          • 地址空間和其它資源(如打開文件):進(jìn)程間相互獨(dú)立,同一進(jìn)程的各線程間共享。某進(jìn)程內(nèi)的線程在其它進(jìn)程不可見。

          • 通信:進(jìn)程間通信IPC,線程間可以直接讀寫進(jìn)程數(shù)據(jù)段(如全局變量)來進(jìn)行通信——需要進(jìn)程同步和互斥手段的輔助,以保證數(shù)據(jù)的一致性。

          • 調(diào)度和切換:線程上下文切換比進(jìn)程上下文切換要快得多。

          • 在多線程OS中,進(jìn)程不是一個可執(zhí)行的實體。

          多進(jìn)程和多線程的比較

          對比維度多進(jìn)程多線程總結(jié)
          數(shù)據(jù)共享、同步數(shù)據(jù)共享復(fù)雜,同步簡單數(shù)據(jù)共享簡單,同步復(fù)雜各有優(yōu)劣
          內(nèi)存、CPU占用內(nèi)存多,切換復(fù)雜,CPU利用率低占用內(nèi)存少,切換簡單,CPU利用率高線程占優(yōu)
          創(chuàng)建、銷毀、切換復(fù)雜,速度慢簡單,速度快線程占優(yōu)
          編程、調(diào)試編程簡單,調(diào)試簡單編程復(fù)雜,調(diào)試復(fù)雜進(jìn)程占優(yōu)
          可靠性進(jìn)程間不會互相影響一個線程掛掉將導(dǎo)致整個進(jìn)程掛掉進(jìn)程占優(yōu)
          分布式適用于多核、多機(jī),擴(kuò)展到多臺機(jī)器簡單適合于多核進(jìn)程占優(yōu)

          總結(jié),進(jìn)程和線程還可以類比為火車和車廂:

          • 線程在進(jìn)程下行進(jìn)(單純的車廂無法運(yùn)行)

          • 一個進(jìn)程可以包含多個線程(一輛火車可以有多個車廂)

          • 不同進(jìn)程間數(shù)據(jù)很難共享(一輛火車上的乘客很難換到另外一輛火車,比如站點換乘)

          • 同一進(jìn)程下不同線程間數(shù)據(jù)很易共享(A車廂換到B車廂很容易)

          • 進(jìn)程要比線程消耗更多的計算機(jī)資源(采用多列火車相比多個車廂更耗資源)

          • 進(jìn)程間不會相互影響,一個線程掛掉將導(dǎo)致整個進(jìn)程掛掉(一列火車不會影響到另外一列火車,但是如果一列火車上中間的一節(jié)車廂著火了,將影響到該趟火車的所有車廂)

          • 進(jìn)程可以拓展到多機(jī),進(jìn)程最多適合多核(不同火車可以開在多個軌道上,同一火車的車廂不能在行進(jìn)的不同的軌道上)

          • 進(jìn)程使用的內(nèi)存地址可以上鎖,即一個線程使用某些共享內(nèi)存時,其他線程必須等它結(jié)束,才能使用這一塊內(nèi)存。(比如火車上的洗手間)-”互斥鎖(mutex)”

          • 進(jìn)程使用的內(nèi)存地址可以限定使用量(比如火車上的餐廳,最多只允許多少人進(jìn)入,如果滿了需要在門口等,等有人出來了才能進(jìn)去)-“信號量(semaphore)”

          Python全局解釋器鎖GIL

          全局解釋器鎖(英語:Global Interpreter Lock,縮寫GIL),并不是Python的特性,它是在實現(xiàn)Python解析器(CPython)時所引入的一個概念。由于CPython是大部分環(huán)境下默認(rèn)的Python執(zhí)行環(huán)境。所以在很多人的概念里CPython就是Python,也就想當(dāng)然的把GIL歸結(jié)為Python語言的缺陷。那么CPython實現(xiàn)中的GIL又是什么呢?來看看官方的解釋:

          The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.

          Python代碼的執(zhí)行由Python 虛擬機(jī)(也叫解釋器主循環(huán),CPython版本)來控制,Python 在設(shè)計之初就考慮到要在解釋器的主循環(huán)中,同時只有一個線程在執(zhí)行,即在任意時刻,只有一個線程在解釋器中運(yùn)行。對Python 虛擬機(jī)的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運(yùn)行。

          GIL 有什么好處?簡單來說,它在單線程的情況更快,并且在和 C 庫結(jié)合時更方便,而且不用考慮線程安全問題,這也是早期 Python 最常見的應(yīng)用場景和優(yōu)勢。另外,GIL的設(shè)計簡化了CPython的實現(xiàn),使得對象模型,包括關(guān)鍵的內(nèi)建類型如字典,都是隱含可以并發(fā)訪問的。鎖住全局解釋器使得比較容易的實現(xiàn)對多線程的支持,但也損失了多處理器主機(jī)的并行計算能力。

          在多線程環(huán)境中,Python 虛擬機(jī)按以下方式執(zhí)行:

          1.設(shè)置GIL
          2.切換到一個線程去運(yùn)行
          3.運(yùn)行直至指定數(shù)量的字節(jié)碼指令,或者線程主動讓出控制(可以調(diào)用sleep(0))
          4.把線程設(shè)置為睡眠狀態(tài)
          5.解鎖GIL
          6.再次重復(fù)以上所有步驟

          Python3.2前,GIL的釋放邏輯是當(dāng)前線程遇見IO操作或者ticks計數(shù)達(dá)到100(ticks可以看作是python自身的一個計數(shù)器,專門做用于GIL,每次釋放后歸零,這個計數(shù)可以通過 sys.setcheckinterval 來調(diào)整),進(jìn)行釋放。因為計算密集型線程在釋放GIL之后又會立即去申請GIL,并且通常在其它線程還沒有調(diào)度完之前它就已經(jīng)重新獲取到了GIL,就會導(dǎo)致一旦計算密集型線程獲得了GIL,那么它在很長一段時間內(nèi)都將占據(jù)GIL,甚至一直到該線程執(zhí)行結(jié)束。

          Python 3.2開始使用新的GIL。新的GIL實現(xiàn)中用一個固定的超時時間來指示當(dāng)前的線程放棄全局鎖。在當(dāng)前線程保持這個鎖,且其他線程請求這個鎖時,當(dāng)前線程就會在5毫秒后被強(qiáng)制釋放該鎖。該改進(jìn)在單核的情況下,對于單個線程長期占用GIL的情況有所好轉(zhuǎn)。

          在單核CPU上,數(shù)百次的間隔檢查才會導(dǎo)致一次線程切換。在多核CPU上,存在嚴(yán)重的線程顛簸(thrashing)。而每次釋放GIL鎖,線程進(jìn)行鎖競爭、切換線程,會消耗資源。單核下多線程,每次釋放GIL,喚醒的那個線程都能獲取到GIL鎖,所以能夠無縫執(zhí)行,但多核下,CPU0釋放GIL后,其他CPU上的線程都會進(jìn)行競爭,但GIL可能會馬上又被CPU0拿到,導(dǎo)致其他幾個CPU上被喚醒后的線程會醒著等待到切換時間后又進(jìn)入待調(diào)度狀態(tài),這樣會造成線程顛簸(thrashing),導(dǎo)致效率更低。

          另外,從上面的實現(xiàn)機(jī)制可以推導(dǎo)出,Python的多線程對IO密集型代碼要比CPU密集型代碼更加友好。

          針對GIL的應(yīng)對措施:

          • 使用更高版本Python(對GIL機(jī)制進(jìn)行了優(yōu)化)

          • 使用多進(jìn)程替換多線程(多進(jìn)程之間沒有GIL,但是進(jìn)程本身的資源消耗較多)

          • 指定cpu運(yùn)行線程(使用affinity模塊)

          • 使用Jython、IronPython等無GIL解釋器

          • 全I(xiàn)O密集型任務(wù)時才使用多線程

          • 使用協(xié)程(高效的單線程模式,也稱微線程;通常與多進(jìn)程配合使用)

          • 將關(guān)鍵組件用C/C++編寫為Python擴(kuò)展,通過ctypes使Python程序直接調(diào)用C語言編譯的動態(tài)鏈接庫的導(dǎo)出函數(shù)。(with nogil調(diào)出GIL限制)

          Python的多進(jìn)程包multiprocessing

          Python的threading包主要運(yùn)用多線程的開發(fā),但由于GIL的存在,Python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,大部分情況需要使用多進(jìn)程。在Python 2.6版本的時候引入了multiprocessing包,它完整的復(fù)制了一套threading所提供的接口方便遷移。唯一的不同就是它使用了多進(jìn)程而不是多線程。每個進(jìn)程有自己的獨(dú)立的GIL,因此也不會出現(xiàn)進(jìn)程之間的GIL爭搶。

          借助這個multiprocessing,你可以輕松完成從單進(jìn)程到并發(fā)執(zhí)行的轉(zhuǎn)換。multiprocessing支持子進(jìn)程、通信和共享數(shù)據(jù)、執(zhí)行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。

          Multiprocessing產(chǎn)生的背景

          除了應(yīng)對Python的GIL以外,產(chǎn)生multiprocessing的另外一個原因時Windows操作系統(tǒng)與Linux/Unix系統(tǒng)的不一致。

          Unix/Linux操作系統(tǒng)提供了一個fork()系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù),調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因為操作系統(tǒng)自動把當(dāng)前進(jìn)程(父進(jìn)程)復(fù)制了一份(子進(jìn)程),然后,分別在父進(jìn)程和子進(jìn)程內(nèi)返回。子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的ID。這樣做的理由是,一個父進(jìn)程可以fork出很多子進(jìn)程,所以,父進(jìn)程要記下每個子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getpid()就可以拿到父進(jìn)程的ID。

          Python的os模塊封裝了常見的系統(tǒng)調(diào)用,其中就包括fork,可以在Python程序中輕松創(chuàng)建子進(jìn)程:

          import?os

          print('Process?(%s)?start...'?%?os.getpid())

          \#?Only?works?on?Unix/Linux/Mac:

          pid?=?os.fork()

          if?pid?==?0:

          ????print('I?am?child?process?(%s)?and?my?parent?is?%s.'?%?(os.getpid(),?os.getppid()))

          else:

          ????print('I?(%s)?just?created?a?child?process?(%s).'?%?(os.getpid(),?pid))

          上述代碼在Linux、Unix和Mac上的執(zhí)行結(jié)果為:

          Process?(876)?start...

          I?(876)?just?created?a?child?process?(877).

          I?am?child?process?(877)?and?my?parent?is?876.

          有了fork調(diào)用,一個進(jìn)程在接到新任務(wù)時就可以復(fù)制出一個子進(jìn)程來處理新任務(wù),常見的Apache服務(wù)器就是由父進(jìn)程監(jiān)聽端口,每當(dāng)有新的http請求時,就fork出子進(jìn)程來處理新的http請求。

          由于Windows沒有fork調(diào)用,上面的代碼在Windows上無法運(yùn)行。由于Python是跨平臺的,自然也應(yīng)該提供一個跨平臺的多進(jìn)程支持。multiprocessing模塊就是跨平臺版本的多進(jìn)程模塊。multiprocessing模塊封裝了fork()調(diào)用,使我們不需要關(guān)注fork()的細(xì)節(jié)。由于Windows沒有fork調(diào)用,因此,multiprocessing需要“模擬”出fork的效果。

          multiprocessing常用組件及功能

          創(chuàng)建管理進(jìn)程模塊:

          • Process(用于創(chuàng)建進(jìn)程)

          • Pool(用于創(chuàng)建管理進(jìn)程池)

          • Queue(用于進(jìn)程通信,資源共享)

          • Value,Array(用于進(jìn)程通信,資源共享)

          • Pipe(用于管道通信)

          • Manager(用于資源共享)

          同步子進(jìn)程模塊:

          • Condition(條件變量)

          • Event(事件)

          • Lock(互斥鎖)

          • RLock(可重入的互斥鎖(同一個進(jìn)程可以多次獲得它,同時不會造成阻塞)

          • Semaphore(信號量)

          接下來就一起來學(xué)習(xí)下每個組件及功能的具體使用方法。

          Process(用于創(chuàng)建進(jìn)程)

          multiprocessing模塊提供了一個Process類來代表一個進(jìn)程對象。

          在multiprocessing中,每一個進(jìn)程都用一個Process類來表示。

          構(gòu)造方法:Process([group [, target [, name [, args [, kwargs]]]]])

          • group:分組,實際上不使用,值始終為None

          • target:表示調(diào)用對象,即子進(jìn)程要執(zhí)行的任務(wù),你可以傳入方法名

          • name:為子進(jìn)程設(shè)定名稱

          • args:要傳給target函數(shù)的位置參數(shù),以元組方式進(jìn)行傳入。

          • kwargs:要傳給target函數(shù)的字典參數(shù),以字典方式進(jìn)行傳入。

          實例方法:

          • start():啟動進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()

          • run():進(jìn)程啟動時運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實現(xiàn)該方法

          • terminate():強(qiáng)制終止進(jìn)程p,不會進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進(jìn)而導(dǎo)致死鎖

          • is_alive():返回進(jìn)程是否在運(yùn)行。如果p仍然運(yùn)行,返回True

          • join([timeout]):進(jìn)程同步,主進(jìn)程等待子進(jìn)程完成后再執(zhí)行后面的代碼。線程等待p終止(強(qiáng)調(diào):是主線程處于等的狀態(tài),而p是處于運(yùn)行的狀態(tài))。timeout是可選的超時時間(超過這個時間,父線程不再等待子線程,繼續(xù)往下執(zhí)行),需要強(qiáng)調(diào)的是,p.join只能join住start開啟的進(jìn)程,而不能join住run開啟的進(jìn)程

          屬性介紹:

          • daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺運(yùn)行的守護(hù)進(jìn)程;當(dāng)p的父進(jìn)程終止時,p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程;必須在p.start()之前設(shè)置

          • name:進(jìn)程的名稱

          • pid:進(jìn)程的pid

          • exitcode:進(jìn)程在運(yùn)行時為None、如果為–N,表示被信號N結(jié)束(了解即可)

          • authkey:進(jìn)程的身份驗證鍵,默認(rèn)是由os.urandom()隨機(jī)生成的32字符的字符串。這個鍵的用途是為涉及網(wǎng)絡(luò)連接的底層進(jìn)程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

          使用示例:(注意:在windows中Process()必須放到if name == ‘main’:下)

          from?multiprocessing?import?Process

          import?os

          def?run_proc(name):

          ????print('Run?child?process?%s?(%s)...'?%?(name,?os.getpid()))

          if?__name__=='__main__':

          ????print('Parent?process?%s.'?%?os.getpid())

          ????p?=?Process(target=run_proc,?args=('test',))

          ????print('Child?process?will?start.')

          ????p.start()

          ????p.join()

          print('Child?process?end.')

          Pool(用于創(chuàng)建管理進(jìn)程池)

          Pool類用于需要執(zhí)行的目標(biāo)很多,而手動限制進(jìn)程數(shù)量又太繁瑣時,如果目標(biāo)少且不用控制進(jìn)程數(shù)量則可以用Process類。Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請求提交到Pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進(jìn)程用來執(zhí)行該請求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請求就會等待,直到池中有進(jìn)程結(jié)束,就重用進(jìn)程池中的進(jìn)程。

          構(gòu)造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

          • processes :要創(chuàng)建的進(jìn)程數(shù),如果省略,將默認(rèn)使用cpu_count()返回的數(shù)量。

          • initializer:每個工作進(jìn)程啟動時要執(zhí)行的可調(diào)用對象,默認(rèn)為None。如果initializer是None,那么每一個工作進(jìn)程在開始的時候會調(diào)用initializer(*initargs)。

          • initargs:是要傳給initializer的參數(shù)組。

          • maxtasksperchild:工作進(jìn)程退出之前可以完成的任務(wù)數(shù),完成后用一個新的工作進(jìn)程來替代原進(jìn)程,來讓閑置的資源被釋放。maxtasksperchild默認(rèn)是None,意味著只要Pool存在工作進(jìn)程就會一直存活。

          • context: 用在制定工作進(jìn)程啟動時的上下文,一般使用Pool() 或者一個context對象的Pool()方法來創(chuàng)建一個池,兩種方法都適當(dāng)?shù)脑O(shè)置了context。

          實例方法:

          • apply(func[, args[, kwargs]]):在一個池工作進(jìn)程中執(zhí)行func(args,*kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()。它是阻塞的。apply很少使用

          • apply_async(func[, arg[, kwds={}[, callback=None]]]):在一個池工作進(jìn)程中執(zhí)行func(args,*kwargs),然后返回結(jié)果。此方法的結(jié)果是AsyncResult類的實例,callback是可調(diào)用對象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r,將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。它是非阻塞。

          • map(func, iterable[, chunksize=None]):Pool類中的map方法,與內(nèi)置的map函數(shù)用法行為基本一致,它會使進(jìn)程阻塞直到返回結(jié)果。注意,雖然第二個參數(shù)是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運(yùn)行子進(jìn)程。

          • map_async(func, iterable[, chunksize=None]):map_async與map的關(guān)系同apply與apply_async

          • imap():imap 與 map的區(qū)別是,map是當(dāng)所有的進(jìn)程都已經(jīng)執(zhí)行完了,并將結(jié)果返回了,imap()則是立即返回一個iterable可迭代對象。

          • imap_unordered():不保證返回的結(jié)果順序與進(jìn)程添加的順序一致。

          • close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成。

          • join():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用,讓其不再接受新的Process。

          • terminate():結(jié)束工作進(jìn)程,不再處理未處理的任務(wù)。

          方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法:

          • get():返回結(jié)果,如果有必要則等待結(jié)果到達(dá)。timeout是可選的。如果在指定時間內(nèi)還沒有到達(dá),將引發(fā)異常。如果遠(yuǎn)程操作中引發(fā)了異常,它將在調(diào)用此方法時再次被引發(fā)。

          • ready():如果調(diào)用完成,返回True

          • successful():如果調(diào)用完成且沒有引發(fā)異常,返回True,如果在結(jié)果就緒之前調(diào)用此方法,引發(fā)異常

          • wait([timeout]):等待結(jié)果變?yōu)榭捎谩?/span>

          • terminate():立即終止所有工作進(jìn)程,同時不執(zhí)行任何清理或結(jié)束任何掛起工作。如果p被垃圾回收,將自動調(diào)用此函數(shù)

          使用示例:

          \#?-*-?coding:utf-8?-*-

          \#?Pool+map

          from?multiprocessing?import?Pool

          def?test(i):

          ????print(i)

          if?__name__?==?"__main__":

          ????lists?=?range(100)

          ????pool?=?Pool(8)

          ????pool.map(test,?lists)

          ????pool.close()

          ????pool.join()

          \#?-*-?coding:utf-8?-*-

          \#?異步進(jìn)程池(非阻塞)

          from?multiprocessing?import?Pool

          def?test(i):

          ????print(i)

          if?__name__?==?"__main__":

          ????pool?=?Pool(8)

          ????for?i?in?range(100):

          ????????'''

          ??????? For循環(huán)中執(zhí)行步驟:

          ????????(1)循環(huán)遍歷,將100個子進(jìn)程添加到進(jìn)程池(相對父進(jìn)程會阻塞)

          ????????(2)每次執(zhí)行8個子進(jìn)程,等一個子進(jìn)程執(zhí)行完后,立馬啟動新的子進(jìn)程。(相對父進(jìn)程不阻塞)

          ??????? apply_async為異步進(jìn)程池寫法。異步指的是啟動子進(jìn)程的過程,與父進(jìn)程本身的執(zhí)行(print)是異步的,而For循環(huán)中往進(jìn)程池添加子進(jìn)程的過程,與父進(jìn)程本身的執(zhí)行卻是同步的。

          ????????'''


          ????????pool.apply_async(test,?args=(i,))??#?維持執(zhí)行的進(jìn)程總數(shù)為8,當(dāng)一個進(jìn)程執(zhí)行完后啟動一個新進(jìn)程.

          ????print("test")

          ????pool.close()

          ????pool.join()

          \#?-*-?coding:utf-8?-*-

          \#?異步進(jìn)程池(非阻塞)

          from?multiprocessing?import?Pool

          def?test(i):

          ????print(i)

          if?__name__?==?"__main__":

          ????pool?=?Pool(8)

          ????for?i?in?range(100):

          ????????'''

          ????????????實際測試發(fā)現(xiàn),for循環(huán)內(nèi)部執(zhí)行步驟:

          ????????????(1)遍歷100個可迭代對象,往進(jìn)程池放一個子進(jìn)程

          ????????????(2)執(zhí)行這個子進(jìn)程,等子進(jìn)程執(zhí)行完畢,再往進(jìn)程池放一個子進(jìn)程,再執(zhí)行。(同時只執(zhí)行一個子進(jìn)程)

          ??????????? for循環(huán)執(zhí)行完畢,再執(zhí)行print函數(shù)。

          ????????'''


          ????????pool.apply(test,?args=(i,))??#?維持執(zhí)行的進(jìn)程總數(shù)為8,當(dāng)一個進(jìn)程執(zhí)行完后啟動一個新進(jìn)程.

          ????print("test")

          ????pool.close()

          ????pool.join()

          Queue(用于進(jìn)程通信,資源共享)

          在使用多進(jìn)程的過程中,最好不要使用共享資源。普通的全局變量是不能被子進(jìn)程所共享的,只有通過Multiprocessing組件構(gòu)造的數(shù)據(jù)結(jié)構(gòu)可以被共享。

          Queue是用來創(chuàng)建進(jìn)程間資源共享的隊列的類,使用Queue可以達(dá)到多進(jìn)程間數(shù)據(jù)傳遞的功能(缺點:只適用Process類,不能在Pool進(jìn)程池中使用)。

          構(gòu)造方法:Queue([maxsize])

          • maxsize是隊列中允許最大項數(shù),省略則無大小限制。

          實例方法:

          • put():用以插入數(shù)據(jù)到隊列。put方法還有兩個可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。

          • get():可以從隊列讀取并且刪除一個元素。get方法有兩個可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時間內(nèi)沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常。若不希望在empty的時候拋出異常,令blocked為True或者參數(shù)全部置空即可。

          • get_nowait():同q.get(False)

          • put_nowait():同q.put(False)

          • empty():調(diào)用此方法時q為空則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。

          • full():調(diào)用此方法時q已滿則返回True,該結(jié)果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。

          • qsize():返回隊列中目前項目的正確數(shù)量,結(jié)果也不可靠,理由同q.empty()和q.full()一樣

          使用示例:

          from?multiprocessing?import?Process,?Queue

          import?os,?time,?random

          def?write(q):

          ????print('Process?to?write:?%s'?%?os.getpid())

          ????for?value?in?['A',?'B',?'C']:

          ????????print('Put?%s?to?queue...'?%?value)

          ????????q.put(value)

          ????????time.sleep(random.random())

          def?read(q):

          ????print('Process?to?read:?%s'?%?os.getpid())

          ????while?True:

          ????????value?=?q.get(True)

          ????????print('Get?%s?from?queue.'?%?value)

          if?__name__?==?"__main__":

          ????q?=?Queue()

          ????pw?=?Process(target=write,?args=(q,))

          ????pr?=?Process(target=read,?args=(q,))

          ????pw.start()

          ????pr.start()

          ????pw.join()??#?等待pw結(jié)束

          ????pr.terminate()??#?pr進(jìn)程里是死循環(huán),無法等待其結(jié)束,只能強(qiáng)行終止

          JoinableQueue就像是一個Queue對象,但隊列允許項目的使用者通知生成者項目已經(jīng)被成功處理。通知進(jìn)程是使用共享的信號和條件變量來實現(xiàn)的。

          構(gòu)造方法:JoinableQueue([maxsize])

          • maxsize:隊列中允許最大項數(shù),省略則無大小限制。

          實例方法

          JoinableQueue的實例p除了與Queue對象相同的方法之外還具有:

          • task_done():使用者使用此方法發(fā)出信號,表示q.get()的返回項目已經(jīng)被處理。如果調(diào)用此方法的次數(shù)大于從隊列中刪除項目的數(shù)量,將引發(fā)ValueError異常

          • join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞,直到隊列中所有的項目均被處理。阻塞將持續(xù)到隊列中的每個項目均調(diào)用q.task_done()方法為止

          使用示例:

          \#?-*-?coding:utf-8?-*-

          from?multiprocessing?import?Process,?JoinableQueue

          import?time,?random

          def?consumer(q):

          ????while?True:

          ????????res?=?q.get()

          ????????print('消費(fèi)者拿到了?%s'?%?res)

          ????????q.task_done()

          def?producer(seq,?q):

          ????for?item?in?seq:

          ????????time.sleep(random.randrange(1,2))

          ????????q.put(item)

          ????????print('生產(chǎn)者做好了?%s'?%?item)

          ????q.join()

          if?__name__?==?"__main__":

          ????q?=?JoinableQueue()

          ????seq?=?('產(chǎn)品%s'?%?i?for?i?in?range(5))

          ????p?=?Process(target=consumer,?args=(q,))

          ????p.daemon?=?True??#?設(shè)置為守護(hù)進(jìn)程,在主線程停止時p也停止,但是不用擔(dān)心,producer內(nèi)調(diào)用q.join保證了consumer已經(jīng)處理完隊列中的所有元素

          ????p.start()

          ????producer(seq,?q)

          ????print('主線程')

          Value,Array(用于進(jìn)程通信,資源共享)

          multiprocessing 中Value和Array的實現(xiàn)原理都是在共享內(nèi)存中創(chuàng)建ctypes()對象來達(dá)到共享數(shù)據(jù)的目的,兩者實現(xiàn)方法大同小異,只是選用不同的ctypes數(shù)據(jù)類型而已。

          Value

          構(gòu)造方法:Value((typecode_or_type, args[, lock])

          • typecode_or_type:定義ctypes()對象的類型,可以傳Type code或 C Type,具體對照表見下文。

          • args:傳遞給typecode_or_type構(gòu)造函數(shù)的參數(shù)

          • lock:默認(rèn)為True,創(chuàng)建一個互斥鎖來限制對Value對象的訪問,如果傳入一個鎖,如Lock或RLock的實例,將用于同步。如果傳入False,Value的實例就不會被鎖保護(hù),它將不是進(jìn)程安全的。

          typecode_or_type支持的類型:

          |?Type?code?|?C?Type?????????????|?Python?Type???????|?Minimum?size?in?bytes?|

          |?---------?|?------------------?|?-----------------?|?---------------------?|

          |?`'b'`?????|?signed?char????????|?int???????????????|?1?????????????????????|

          |?`'B'`?????|?unsigned?char??????|?int???????????????|?1?????????????????????|

          |?`'u'`?????|?Py_UNICODE?????????|?Unicode?character?|?2?????????????????????|

          |?`'h'`?????|?signed?short???????|?int???????????????|?2?????????????????????|

          |?`'H'`?????|?unsigned?short?????|?int???????????????|?2?????????????????????|

          |?`'i'`?????|?signed?int?????????|?int???????????????|?2?????????????????????|

          |?`'I'`?????|?unsigned?int???????|?int???????????????|?2?????????????????????|

          |?`'l'`?????|?signed?long????????|?int???????????????|?4?????????????????????|

          |?`'L'`?????|?unsigned?long??????|?int???????????????|?4?????????????????????|

          |?`'q'`?????|?signed?long?long???|?int???????????????|?8?????????????????????|

          |?`'Q'`?????|?unsigned?long?long?|?int???????????????|?8?????????????????????|

          |?`'f'`?????|?float??????????????|?float?????????????|?4?????????????????????|

          |?`'d'`?????|?double?????????????|?float?????????????|?8?????????????????????|

          參考地址:https://docs.python.org/3/library/array.html

          Array

          構(gòu)造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])

          • typecode_or_type:同上

          • size_or_initializer:如果它是一個整數(shù),那么它確定數(shù)組的長度,并且數(shù)組將被初始化為零。否則,size_or_initializer是用于初始化數(shù)組的序列,其長度決定數(shù)組的長度。

          • kwds:傳遞給typecode_or_type構(gòu)造函數(shù)的參數(shù)

          • lock:同上

          使用示例:

          import?multiprocessing

          def?f(n,?a):

          ????n.value?=?3.14

          ????a[0]?=?5

          if?__name__?==?'__main__':

          ????num?=?multiprocessing.Value('d',?0.0)

          ????arr?=?multiprocessing.Array('i',?range(10))

          ????p?=?multiprocessing.Process(target=f,?args=(num,?arr))

          ????p.start()

          ????p.join()

          ????print(num.value)

          ????print(arr[:])

          注意:Value和Array只適用于Process類。

          Pipe(用于管道通信)

          多進(jìn)程還有一種數(shù)據(jù)傳遞方式叫管道原理和 Queue相同。Pipe可以在進(jìn)程之間創(chuàng)建一條管道,并返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接對象,強(qiáng)調(diào)一點:必須在產(chǎn)生Process對象之前產(chǎn)生管道。

          構(gòu)造方法:Pipe([duplex])

          • dumplex:默認(rèn)管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發(fā)送。

          實例方法:

          • send(obj):通過連接發(fā)送對象。obj是與序列化兼容的任意對象

          • recv():接收conn2.send(obj)發(fā)送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經(jīng)關(guān)閉,那么recv方法會拋出EOFError。

          • close():關(guān)閉連接。如果conn1被垃圾回收,將自動調(diào)用此方法

          • fileno():返回連接使用的整數(shù)文件描述符

          • poll([timeout]):如果連接上的數(shù)據(jù)可用,返回True。timeout指定等待的最長時限。如果省略此參數(shù),方法將立即返回結(jié)果。如果將timeout射成None,操作將無限期地等待數(shù)據(jù)到達(dá)。

          • recv_bytes([maxlength]):接收c.send_bytes()方法發(fā)送的一條完整的字節(jié)消息。maxlength指定要接收的最大字節(jié)數(shù)。如果進(jìn)入的消息,超過了這個最大值,將引發(fā)IOError異常,并且在連接上無法進(jìn)行進(jìn)一步讀取。如果連接的另外一端已經(jīng)關(guān)閉,再也不存在任何數(shù)據(jù),將引發(fā)EOFError異常。

          • send_bytes(buffer [, offset [, size]]):通過連接發(fā)送字節(jié)數(shù)據(jù)緩沖區(qū),buffer是支持緩沖區(qū)接口的任意對象,offset是緩沖區(qū)中的字節(jié)偏移量,而size是要發(fā)送字節(jié)數(shù)。結(jié)果數(shù)據(jù)以單條消息的形式發(fā)出,然后調(diào)用c.recv_bytes()函數(shù)進(jìn)行接收

          • recv_bytes_into(buffer [, offset]):接收一條完整的字節(jié)消息,并把它保存在buffer對象中,該對象支持可寫入的緩沖區(qū)接口(即bytearray對象或類似的對象)。offset指定緩沖區(qū)中放置消息處的字節(jié)位移。返回值是收到的字節(jié)數(shù)。如果消息長度大于可用的緩沖區(qū)空間,將引發(fā)BufferTooShort異常。

          使用示例:

          from?multiprocessing?import?Process,?Pipe

          import?time

          \#?子進(jìn)程執(zhí)行方法

          def?f(Subconn):

          ????time.sleep(1)

          ????Subconn.send("吃了嗎")

          ????print("來自父親的問候:",?Subconn.recv())

          ????Subconn.close()

          if?__name__?==?"__main__":

          ????parent_conn,?child_conn?=?Pipe()??#?創(chuàng)建管道兩端

          ????p?=?Process(target=f,?args=(child_conn,))??#?創(chuàng)建子進(jìn)程

          ????p.start()

          ????print("來自兒子的問候:",?parent_conn.recv())

          ????parent_conn.send("嗯")

          Manager(用于資源共享)

          Manager()返回的manager對象控制了一個server進(jìn)程,此進(jìn)程包含的python對象可以被其他的進(jìn)程通過proxies來訪問。從而達(dá)到多進(jìn)程間數(shù)據(jù)通信且安全。Manager模塊常與Pool模塊一起使用。

          Manager支持的類型有l(wèi)ist,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

          管理器是獨(dú)立運(yùn)行的子進(jìn)程,其中存在真實的對象,并以服務(wù)器的形式運(yùn)行,其他進(jìn)程通過使用代理訪問共享對象,這些代理作為客戶端運(yùn)行。Manager()是BaseManager的子類,返回一個啟動的SyncManager()實例,可用于創(chuàng)建共享對象并返回訪問這些共享對象的代理。

          BaseManager,創(chuàng)建管理器服務(wù)器的基類

          構(gòu)造方法:BaseManager([address[, authkey]])

          • address:(hostname,port),指定服務(wù)器的網(wǎng)址地址,默認(rèn)為簡單分配一個空閑的端口

          • authkey:連接到服務(wù)器的客戶端的身份驗證,默認(rèn)為current_process().authkey的值

          實例方法:

          • start([initializer[, initargs]]):啟動一個單獨(dú)的子進(jìn)程,并在該子進(jìn)程中啟動管理器服務(wù)器

          • get_server():獲取服務(wù)器對象

          • connect():連接管理器對象

          • shutdown():關(guān)閉管理器對象,只能在調(diào)用了start()方法之后調(diào)用

          實例屬性:

          • address:只讀屬性,管理器服務(wù)器正在使用的地址

          • SyncManager,以下類型均不是進(jìn)程安全的,需要加鎖..

          實例方法:
          • Array(self,*args,**kwds)
          • BoundedSemaphore(self,*args,**kwds)
          • Condition(self,*args,**kwds)
          • Event(self,*args,**kwds)
          • JoinableQueue(self,*args,**kwds)
          • Lock(self,*args,**kwds)
          • Namespace(self,*args,**kwds)
          • Pool(self,*args,**kwds)
          • Queue(self,*args,**kwds)
          • RLock(self,*args,**kwds)
          • Semaphore(self,*args,**kwds)
          • Value(self,*args,**kwds)
          • dict(self,*args,**kwds)
          • list(self,*args,**kwds)

          使用示例:

          import?multiprocessing

          def?f(x,?arr,?l,?d,?n):

          ????x.value?=?3.14

          ????arr[0]?=?5

          ????l.append('Hello')

          ????d[1]?=?2

          ????n.a?=?10

          if?__name__?==?'__main__':

          ????server?=?multiprocessing.Manager()

          ????x?=?server.Value('d',?0.0)

          ????arr?=?server.Array('i',?range(10))

          ????l?=?server.list()

          ????d?=?server.dict()

          ????n?=?server.Namespace()

          ????proc?=?multiprocessing.Process(target=f,?args=(x,?arr,?l,?d,?n))

          ????proc.start()

          ????proc.join()

          ????print(x.value)

          ????print(arr)

          ????print(l)

          ????print(d)

          ????print(n)

          同步子進(jìn)程模塊

          Lock(互斥鎖)

          Lock鎖的作用是當(dāng)多個進(jìn)程需要訪問共享資源的時候,避免訪問的沖突。加鎖保證了多個進(jìn)程修改同一塊數(shù)據(jù)時,同一時間只能有一個修改,即串行的修改,犧牲了速度但保證了數(shù)據(jù)安全。Lock包含兩種狀態(tài)——鎖定和非鎖定,以及兩個基本的方法。

          構(gòu)造方法:Lock()

          實例方法:

          • acquire([timeout]): 使線程進(jìn)入同步阻塞狀態(tài),嘗試獲得鎖定。

          • release(): 釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。

          使用示例:

          from?multiprocessing?import?Process,?Lock

          def?l(lock,?num):

          ????lock.acquire()

          ????print("Hello?Num:?%s"?%?(num))

          ????lock.release()

          if?__name__?==?'__main__':

          ????lock?=?Lock()??#?這個一定要定義為全局

          ????for?num?in?range(20):

          ????????Process(target=l,?args=(lock,?num)).start()

          RLock(可重入的互斥鎖(同一個進(jìn)程可以多次獲得它,同時不會造成阻塞)

          RLock(可重入鎖)是一個可以被同一個線程請求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級”的概念,處于鎖定狀態(tài)時,RLock被某個線程擁有。擁有RLock的線程可以再次調(diào)用acquire(),釋放鎖時需要調(diào)用release()相同次數(shù)。可以認(rèn)為RLock包含一個鎖定池和一個初始值為0的計數(shù)器,每次成功調(diào)用 acquire()/release(),計數(shù)器將+1/-1,為0時鎖處于未鎖定狀態(tài)。

          構(gòu)造方法:RLock()

          實例方法:

          • acquire([timeout]):同Lock

          • release(): 同Lock

          Semaphore(信號量)

          信號量是一個更高級的鎖機(jī)制。信號量內(nèi)部有一個計數(shù)器而不像鎖對象內(nèi)部有鎖標(biāo)識,而且只有當(dāng)占用信號量的線程數(shù)超過信號量時線程才阻塞。這允許了多個線程可以同時訪問相同的代碼區(qū)。比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進(jìn)去,如果指定信號量為3,那么來一個人獲得一把鎖,計數(shù)加1,當(dāng)計數(shù)等于3時,后面的人均需要等待。一旦釋放,就有人可以獲得一把鎖。

          構(gòu)造方法:Semaphore([value])

          • value:設(shè)定信號量,默認(rèn)值為1

          實例方法:

          • acquire([timeout]):同Lock

          • release(): 同Lock

          使用示例:

          from?multiprocessing?import?Process,?Semaphore

          import?time,?random

          def?go_wc(sem,?user):

          ????sem.acquire()

          ????print('%s?占到一個茅坑'?%?user)

          ????time.sleep(random.randint(0,?3))

          ????sem.release()

          ????print(user,?'OK')

          if?__name__?==?'__main__':

          ????sem?=?Semaphore(2)

          ????p_l?=?[]

          ????for?i?in?range(5):

          ????????p?=?Process(target=go_wc,?args=(sem,?'user%s'?%?i,))

          ????????p.start()

          ????????p_l.append(p)

          ????for?i?in?p_l:

          ????????i.join()

          Condition(條件變量)

          可以把Condition理解為一把高級的鎖,它提供了比Lock, RLock更高級的功能,允許我們能夠控制復(fù)雜的線程同步問題。Condition在內(nèi)部維護(hù)一個鎖對象(默認(rèn)是RLock),可以在創(chuàng)建Condigtion對象的時候把瑣對象作為參數(shù)傳入。Condition也提供了acquire, release方法,其含義與鎖的acquire, release方法一致,其實它只是簡單的調(diào)用內(nèi)部鎖對象的對應(yīng)的方法而已。Condition還提供了其他的一些方法。

          構(gòu)造方法:Condition([lock/rlock])

          • 可以傳遞一個Lock/RLock實例給構(gòu)造方法,否則它將自己生成一個RLock實例。

          實例方法:

          • acquire([timeout]):首先進(jìn)行acquire,然后判斷一些條件。如果條件不滿足則wait

          • release():釋放 Lock

          • wait([timeout]): 調(diào)用這個方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。處于wait狀態(tài)的線程接到通知后會重新判斷條件。

          • notify(): 調(diào)用這個方法將從等待池挑選一個線程并通知,收到通知的線程將自動調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

          • notifyAll(): 調(diào)用這個方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

          使用示例:

          import?multiprocessing

          import?time

          def?stage_1(cond):

          ????"""perform?first?stage?of?work,

          ????then?notify?stage_2?to?continue

          ????"""


          ????name?=?multiprocessing.current_process().name

          ????print('Starting',?name)

          ????with?cond:

          ????????print('{}?done?and?ready?for?stage?2'.format(name))

          ????????cond.notify_all()

          def?stage_2(cond):

          ????"""wait?for?the?condition?telling?us?stage_1?is?done"""

          ????name?=?multiprocessing.current_process().name

          ????print('Starting',?name)

          ????with?cond:

          ????????cond.wait()

          ????????print('{}?running'.format(name))

          if?__name__?==?'__main__':

          ????condition?=?multiprocessing.Condition()

          ????s1?=?multiprocessing.Process(name='s1',

          ?????????????????????????????????target=stage_1,

          ?????????????????????????????????args=(condition,))

          ????s2_clients?=?[

          ????????multiprocessing.Process(

          ????????????name='stage_2[{}]'.format(i),

          ????????????target=stage_2,

          ????????????args=(condition,),

          ????????)

          ????????for?i?in?range(1,?3)

          ????]

          ????for?c?in?s2_clients:

          ????????c.start()

          ????????time.sleep(1)

          ????s1.start()

          ????s1.join()

          ????for?c?in?s2_clients:

          ????????c.join()

          Event(事件)

          Event內(nèi)部包含了一個標(biāo)志位,初始的時候為false。可以使用set()來將其設(shè)置為true;或者使用clear()將其從新設(shè)置為false;可以使用is_set()來檢查標(biāo)志位的狀態(tài);另一個最重要的函數(shù)就是wait(timeout=None),用來阻塞當(dāng)前線程,直到event的內(nèi)部標(biāo)志位被設(shè)置為true或者timeout超時。如果內(nèi)部標(biāo)志位為true則wait()函數(shù)理解返回。

          使用示例:

          import?multiprocessing

          import?time

          def?wait_for_event(e):

          ????"""Wait?for?the?event?to?be?set?before?doing?anything"""

          ????print('wait_for_event:?starting')

          ????e.wait()

          ????print('wait_for_event:?e.is_set()->',?e.is_set())

          def?wait_for_event_timeout(e,?t):

          ????"""Wait?t?seconds?and?then?timeout"""

          ????print('wait_for_event_timeout:?starting')

          ????e.wait(t)

          ????print('wait_for_event_timeout:?e.is_set()->',?e.is_set())

          if?__name__?==?'__main__':

          ????e?=?multiprocessing.Event()

          ????w1?=?multiprocessing.Process(

          ????????name='block',

          ????????target=wait_for_event,

          ????????args=(e,),

          ????)

          ????w1.start()

          ????w2?=?multiprocessing.Process(

          ????????name='nonblock',

          ????????target=wait_for_event_timeout,

          ????????args=(e,?2),

          ????)

          ????w2.start()

          ????print('main:?waiting?before?calling?Event.set()')

          ????time.sleep(3)

          ????e.set()

          ????print('main:?event?is?set')

          其他內(nèi)容

          multiprocessing.dummy 模塊與 multiprocessing 模塊的區(qū)別:dummy 模塊是多線程,而 multiprocessing 是多進(jìn)程, api 都是通用的。所有可以很方便將代碼在多線程和多進(jìn)程之間切換。multiprocessing.dummy通常在IO場景可以嘗試使用,比如使用如下方式引入線程池。

          from?multiprocessing.dummy?import?Pool?as?ThreadPool

          multiprocessing.dummy與早期的threading,不同的點好像是在多多核CPU下,只綁定了一個核心(具體未考證)。

          參考文檔:

          https://docs.python.org/3/library/multiprocessing.html
          https://www.rddoc.com/doc/Python/3.6.0/zh/library/multiprocessing/

          Python并發(fā)之concurrent.futures

          Python標(biāo)準(zhǔn)庫為我們提供了threading和multiprocessing模塊編寫相應(yīng)的多線程/多進(jìn)程代碼。從Python3.2開始,標(biāo)準(zhǔn)庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現(xiàn)了對threading和multiprocessing的更高級的抽象,對編寫線程池/進(jìn)程池提供了直接的支持。concurrent.futures基礎(chǔ)模塊是executor和future。

          Executor

          Executor是一個抽象類,它不能被直接使用。它為具體的異步執(zhí)行定義了一些基本的方法。ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來創(chuàng)建線程池和進(jìn)程池的代碼。

          ThreadPoolExecutor對象

          ThreadPoolExecutor類是Executor子類,使用線程池執(zhí)行異步調(diào)用。

          class?concurrent.futures.ThreadPoolExecutor(max_workers)

          使用max_workers數(shù)目的線程池執(zhí)行異步調(diào)用。

          ProcessPoolExecutor對象

          ThreadPoolExecutor類是Executor子類,使用進(jìn)程池執(zhí)行異步調(diào)用。

          class?concurrent.futures.ProcessPoolExecutor(max_workers=None)

          使用max_workers數(shù)目的進(jìn)程池執(zhí)行異步調(diào)用,如果max_workers為None則使用機(jī)器的處理器數(shù)目(如4核機(jī)器max_worker配置為None時,則使用4個進(jìn)程進(jìn)行異步并發(fā))。

          submit()方法

          Executor中定義了submit()方法,這個方法的作用是提交一個可執(zhí)行的回調(diào)task,并返回一個future實例。future對象代表的就是給定的調(diào)用。

          Executor.submit(fn, *args, **kwargs)

          • fn:需要異步執(zhí)行的函數(shù)

          • *args, **kwargs:fn參數(shù)

          使用示例:

          from?concurrent?import?futures

          def?test(num):

          ????import?time

          ????return?time.ctime(),?num

          with?futures.ThreadPoolExecutor(max_workers=1)?as?executor:

          ????future?=?executor.submit(test,?1)

          ????print(future.result())

          map()方法

          除了submit,Exectuor還為我們提供了map方法,這個方法返回一個map(func, *iterables)迭代器,迭代器中的回調(diào)執(zhí)行返回的結(jié)果有序的。

          Executor.map(func, *iterables, timeout=None)

          • func:需要異步執(zhí)行的函數(shù)

          • *iterables:可迭代對象,如列表等。每一次func執(zhí)行,都會從iterables中取參數(shù)。

          • timeout:設(shè)置每次異步操作的超時時間,timeout的值可以是int或float,如果操作超時,會返回raisesTimeoutError;如果不指定timeout參數(shù),則不設(shè)置超時間。

          使用示例:

          from?concurrent?import?futures

          def?test(num):

          ????import?time

          ????return?time.ctime(),?num

          data?=?[1,?2,?3]

          with?futures.ThreadPoolExecutor(max_workers=1)?as?executor:

          ????for?future?in?executor.map(test,?data):

          ????????print(future)

          shutdown()方法

          釋放系統(tǒng)資源,在Executor.submit()或 Executor.map()等異步操作后調(diào)用。使用with語句可以避免顯式調(diào)用此方法。

          Executor.shutdown(wait=True)

          Future

          Future可以理解為一個在未來完成的操作,這是異步編程的基礎(chǔ)。通常情況下,我們執(zhí)行io操作,訪問url時(如下)在等待結(jié)果返回之前會產(chǎn)生阻塞,cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作。

          Future類封裝了可調(diào)用的異步執(zhí)行。Future 實例通過 Executor.submit()方法創(chuàng)建。

          cancel():試圖取消調(diào)用。如果調(diào)用當(dāng)前正在執(zhí)行,并且不能被取消,那么該方法將返回False,否則調(diào)用將被取消,方法將返回True。

          cancelled():如果成功取消調(diào)用,返回True。

          running():如果調(diào)用當(dāng)前正在執(zhí)行并且不能被取消,返回True。

          done():如果調(diào)用成功地取消或結(jié)束了,返回True。

          result(timeout=None):返回調(diào)用返回的值。如果調(diào)用還沒有完成,那么這個方法將等待超時秒。如果調(diào)用在超時秒內(nèi)沒有完成,那么就會有一個Futures.TimeoutError將報出。timeout可以是一個整形或者浮點型數(shù)值,如果timeout不指定或者為None,等待時間無限。如果futures在完成之前被取消了,那么 CancelledError 將會報出。

          exception(timeout=None):返回調(diào)用拋出的異常,如果調(diào)用還未完成,該方法會等待timeout指定的時長,如果該時長后調(diào)用還未完成,就會報出超時錯誤futures.TimeoutError。timeout可以是一個整形或者浮點型數(shù)值,如果timeout不指定或者為None,等待時間無限。如果futures在完成之前被取消了,那么 CancelledError 將會報出。如果調(diào)用完成并且無異常報出,返回None.

          add_done_callback(fn):將可調(diào)用fn捆綁到future上,當(dāng)Future被取消或者結(jié)束運(yùn)行,fn作為future的唯一參數(shù)將會被調(diào)用。如果future已經(jīng)運(yùn)行完成或者取消,fn將會被立即調(diào)用。

          wait(fs, timeout=None, return_when=ALL_COMPLETED)

          • 等待fs提供的 Future 實例(possibly created by different Executor instances) 運(yùn)行結(jié)束。返回一個命名的2元集合,分表代表已完成的和未完成的

          return_when 表明什么時候函數(shù)應(yīng)該返回。它的值必須是一下值之一:

          • FIRST_COMPLETED :函數(shù)在任何future結(jié)束或者取消的時候返回。

          • FIRST_EXCEPTION :函數(shù)在任何future因為異常結(jié)束的時候返回,如果沒有future報錯,效果等于

          • ALL_COMPLETED :函數(shù)在所有future結(jié)束后才會返回。

          as_completed(fs, timeout=None):參數(shù)是一個 Future 實例列表,返回值是一個迭代器,在運(yùn)行結(jié)束后產(chǎn)出 Future實例 。

          使用示例:

          from?concurrent.futures?import?ThreadPoolExecutor,?wait,?as_completed

          from?time?import?sleep

          from?random?import?randint

          def?return_after_5_secs(num):

          ????sleep(randint(1,?5))

          ????return?"Return?of?{}".format(num)

          pool?=?ThreadPoolExecutor(5)

          futures?=?[]

          for?x?in?range(5):

          ????futures.append(pool.submit(return_after_5_secs,?x))

          print(1)

          for?x?in?as_completed(futures):

          ????print(x.result())

          print(2)

          參考鏈接:

          https://pythonhosted.org/futures

          推薦閱讀

          利用 Python 實現(xiàn)多任務(wù)進(jìn)程


          Python爬蟲實戰(zhàn) | 利用多線程爬取 LOL 高清壁紙

          瀏覽 64
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩视频手机在线观看 | 午夜无码鲁丝片午夜精品一区二区 | 99爱精品视频在线观看 | 大香蕉久久爱 | 无码人妻精品一区二区蜜桃网站文 |