適合小白的 Asyncio 教程!

所謂「異步 IO」,就是你發(fā)起一個 IO 操作,卻不用等它結(jié)束,你可以繼續(xù)做其他事情,當(dāng)它結(jié)束時,你會得到通知。
Asyncio 是并發(fā)(concurrency)的一種方式。對 Python 來說,并發(fā)還可以通過線程(threading)和多進(jìn)程(multiprocessing)來實(shí)現(xiàn)。
Asyncio 并不能帶來真正的并行(parallelism)。當(dāng)然,因?yàn)?GIL(全局解釋器鎖)的存在,Python 的多線程也不能帶來真正的并行。
可交給 asyncio 執(zhí)行的任務(wù),稱為協(xié)程(coroutine)。一個協(xié)程可以放棄執(zhí)行,把機(jī)會讓給其它協(xié)程(即 yield from 或 await)。
1. 定義協(xié)程
協(xié)程的定義,需要使用 async def 語句。
async?def?do_some_work(x):?pass
do_some_work 便是一個協(xié)程。
準(zhǔn)確來說,do_some_work 是一個協(xié)程函數(shù),可以通過 asyncio.iscoroutinefunction 來驗(yàn)證:
print(asyncio.iscoroutinefunction(do_some_work))?#?True
這個協(xié)程什么都沒做,我們讓它睡眠幾秒,以模擬實(shí)際的工作量 :
async?def?do_some_work(x):
????print("Waiting?"?+?str(x))
????await?asyncio.sleep(x)
在解釋 await 之前,有必要說明一下協(xié)程可以做哪些事。協(xié)程可以:
等待一個 future 結(jié)束
等待另一個協(xié)程(產(chǎn)生一個結(jié)果,或引發(fā)一個異常)
產(chǎn)生一個結(jié)果給正在等它的協(xié)程
引發(fā)一個異常給正在等它的協(xié)程
asyncio.sleep 也是一個協(xié)程,所以 await asyncio.sleep(x) 就是等待另一個協(xié)程。可參見 asyncio.sleep 的文檔:
sleep(delay,?result=None,?*,?loop=None)
Coroutine?that?completes?after?a?given?time?(in?seconds).
2. 運(yùn)行協(xié)程
調(diào)用協(xié)程函數(shù),協(xié)程并不會開始運(yùn)行,只是返回一個協(xié)程對象,可以通過 asyncio.iscoroutine 來驗(yàn)證:
print(asyncio.iscoroutine(do_some_work(3)))?#?True
此處還會引發(fā)一條警告:
async1.py:16:?RuntimeWarning:?coroutine?'do_some_work'?was?never?awaited
??print(asyncio.iscoroutine(do_some_work(3)))
要讓這個協(xié)程對象運(yùn)行的話,有兩種方式:
在另一個已經(jīng)運(yùn)行的協(xié)程中用
await等待它通過
ensure_future函數(shù)計(jì)劃它的執(zhí)行
簡單來說,只有 loop 運(yùn)行了,協(xié)程才可能運(yùn)行。
下面先拿到當(dāng)前線程缺省的 loop ,然后把協(xié)程對象交給 loop.run_until_complete,協(xié)程對象隨后會在 loop 里得到運(yùn)行。
loop?=?asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))
run_until_complete 是一個阻塞(blocking)調(diào)用,直到協(xié)程運(yùn)行結(jié)束,它才返回。這一點(diǎn)從函數(shù)名不難看出。run_until_complete 的參數(shù)是一個 future,但是我們這里傳給它的卻是協(xié)程對象,之所以能這樣,是因?yàn)樗趦?nèi)部做了檢查,通過 ensure_future 函數(shù)把協(xié)程對象包裝(wrap)成了 future。所以,我們可以寫得更明顯一些:
loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))
完整代碼:
import?asyncio
async?def?do_some_work(x):
????print("Waiting?"?+?str(x))
????await?asyncio.sleep(x)
loop?=?asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))
運(yùn)行結(jié)果:
Waiting?3
<三秒鐘后程序結(jié)束>
3. 回調(diào)函數(shù)
假如協(xié)程是一個 IO 的讀操作,等它讀完數(shù)據(jù)后,我們希望得到通知,以便下一步數(shù)據(jù)的處理。這一需求可以通過往 future 添加回調(diào)來實(shí)現(xiàn)。
def?done_callback(futu):
????print('Done')
futu?=?asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)
loop.run_until_complete(futu)
4. 多個協(xié)程
實(shí)際項(xiàng)目中,往往有多個協(xié)程,同時在一個 loop 里運(yùn)行。為了把多個協(xié)程交給 loop,需要借助 asyncio.gather 函數(shù)。
loop.run_until_complete(asyncio.gather(do_some_work(1),?do_some_work(3)))
或者先把協(xié)程存在列表里:
coros?=?[do_some_work(1),?do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))
運(yùn)行結(jié)果:
Waiting?3
Waiting?1
<等待三秒鐘>
Done
這兩個協(xié)程是并發(fā)運(yùn)行的,所以等待的時間不是 1 + 3 = 4 秒,而是以耗時較長的那個協(xié)程為準(zhǔn)。
參考函數(shù) gather 的文檔:
gather(*coros_or_futures, loop=None, return_exceptions=False)
Return a future aggregating results from the given coroutines or futures.
發(fā)現(xiàn)也可以傳 futures 給它:
futus?=?[asyncio.ensure_future(do_some_work(1)),
?????????????asyncio.ensure_future(do_some_work(3))]
loop.run_until_complete(asyncio.gather(*futus))
gather 起聚合的作用,把多個 futures 包裝成單個 future,因?yàn)?loop.run_until_complete 只接受單個 future。
5. run_until_complete和run_forever
我們一直通過 run_until_complete 來運(yùn)行 loop ,等到 future 完成,run_until_complete 也就返回了。
async?def?do_some_work(x):
????print('Waiting?'?+?str(x))
????await?asyncio.sleep(x)
????print('Done')
loop?=?asyncio.get_event_loop()
coro?=?do_some_work(3)
loop.run_until_complete(coro)
輸出:
Waiting?3
<等待三秒鐘>
Done
<程序退出>
現(xiàn)在改用 run_forever:
async?def?do_some_work(x):
????print('Waiting?'?+?str(x))
????await?asyncio.sleep(x)
????print('Done')
loop?=?asyncio.get_event_loop()
coro?=?do_some_work(3)
asyncio.ensure_future(coro)
loop.run_forever()
輸出:
Waiting?3
<等待三秒鐘>
Done
<程序沒有退出>
三秒鐘過后,future 結(jié)束,但是程序并不會退出。run_forever 會一直運(yùn)行,直到 stop 被調(diào)用,但是你不能像下面這樣調(diào) stop:
loop.run_forever()
loop.stop()
run_forever 不返回,stop 永遠(yuǎn)也不會被調(diào)用。所以,只能在協(xié)程中調(diào) stop:
async?def?do_some_work(loop,?x):
????print('Waiting?'?+?str(x))
????await?asyncio.sleep(x)
????print('Done')
????loop.stop()
這樣并非沒有問題,假如有多個協(xié)程在 loop 里運(yùn)行:
asyncio.ensure_future(do_some_work(loop,?1))
asyncio.ensure_future(do_some_work(loop,?3))
loop.run_forever()
第二個協(xié)程沒結(jié)束,loop 就停止了——被先結(jié)束的那個協(xié)程給停掉的。
要解決這個問題,可以用 gather 把多個協(xié)程合并成一個 future,并添加回調(diào),然后在回調(diào)里再去停止 loop。
async?def?do_some_work(loop,?x):
????print('Waiting?'?+?str(x))
????await?asyncio.sleep(x)
????print('Done')
def?done_callback(loop,?futu):
????loop.stop()
loop?=?asyncio.get_event_loop()
futus?=?asyncio.gather(do_some_work(loop,?1),?do_some_work(loop,?3))
futus.add_done_callback(functools.partial(done_callback,?loop))
loop.run_forever()
其實(shí)這基本上就是 run_until_complete 的實(shí)現(xiàn)了,run_until_complete 在內(nèi)部也是調(diào)用 run_forever。
6. Close Loop?
以上示例都沒有調(diào)用 loop.close,好像也沒有什么問題。所以到底要不要調(diào) loop.close 呢?
簡單來說,loop 只要不關(guān)閉,就還可以再運(yùn)行。:
loop.run_until_complete(do_some_work(loop,?1))
loop.run_until_complete(do_some_work(loop,?3))
loop.close()
但是如果關(guān)閉了,就不能再運(yùn)行了:
loop.run_until_complete(do_some_work(loop,?1))
loop.close()
loop.run_until_complete(do_some_work(loop,?3))??#?此處異常
建議調(diào)用 loop.close,以徹底清理 loop 對象防止誤用。
7. gather 和 wait
asyncio.gather 和 asyncio.wait 功能相似。
coros?=?[do_some_work(loop,?1),?do_some_work(loop,?3)]
loop.run_until_complete(asyncio.wait(coros))
具體差別可請參見 StackOverflow 的討論:Asyncio.gather vs asyncio.wait。
8. Timer
C++ Boost.Asio 提供了 IO 對象 timer,但是 Python 并沒有原生支持 timer,不過可以用 asyncio.sleep 模擬。
async?def?timer(x,?cb):
????futu?=?asyncio.ensure_future(asyncio.sleep(x))
????futu.add_done_callback(cb)
????await?futu
t?=?timer(3,?lambda?futu:?print('Done'))
loop.run_until_complete(t)- EOF -
為了大家更快速的學(xué)習(xí)知識,掌握技術(shù),隨時溝通交流問題,特組建了技術(shù)交流群,大家在群里可以分享自己的技術(shù)棧,拋出日常問題,群里會有很多大佬及時解答的,這樣我們就會結(jié)識很多志同道合的人,長按下圖可加我微信,備注:Python即可進(jìn)群。
??????????????
?掃碼加群??
回復(fù):[學(xué)習(xí)資料]獲取最新Python學(xué)習(xí)資料
