信息量巨大!把 Python 協(xié)程的本質(zhì)扒得干干凈凈
本文章信息量較大,從 IO 多路復(fù)用,到生成器的使用,再到 async、await 背后的實(shí)現(xiàn)原理,深入淺出,剖析得非常透徹,非常硬核!
tornado 特有的 feature,現(xiàn)在已經(jīng)有 async、await 關(guān)鍵字支持了。思考了一下其實(shí)現(xiàn),回顧了下這些年的演變,覺(jué)得還有點(diǎn)意思。都是單線程,為什么原來(lái)低效率的代碼用了 async、await加一些異步庫(kù)就變得效率高了?
0x00 開(kāi)始之前
generator 的概念。0x01 IO 多路復(fù)用
0x02。def handler(request):
# 處理請(qǐng)求
pass
# 你的 handler 運(yùn)行在 while 循環(huán)中
while True:
# 獲取一個(gè)新請(qǐng)求
request = accept()
# 根據(jù)路由映射獲取到用戶寫(xiě)的業(yè)務(wù)邏輯函數(shù)
handler = get_handler(request)
# 運(yùn)行用戶的handler,處理請(qǐng)求
handler(request)
handler,在接收到請(qǐng)求后需要一個(gè) API 調(diào)用才能響應(yīng)結(jié)果。handler 的運(yùn)行放到其他線程上,每個(gè)線程處理一個(gè)請(qǐng)求,本線程阻塞不影響新請(qǐng)求進(jìn)入。這能一定程度上解決問(wèn)題,但對(duì)于并發(fā)比較大的系統(tǒng),過(guò)多線程調(diào)度會(huì)帶來(lái)很大的性能開(kāi)銷。# 操作系統(tǒng)的IO復(fù)用示例偽代碼
# 向操作系統(tǒng)IO注冊(cè)自己關(guān)注的IO操作的id和類型
io_register(io_id, io_type)
io_register(io_id, io_type)
# 獲取完成的IO操作
events = io_get_finished()
for (io_id, io_type) in events:
if io_type == READ:
data = read_data(io_id)
elif io_type == WRITE:
write_data(io_id,data)
call_backs = {}
def handler(req):
# do jobs here
io_register(io_id, io_type)
def call_back(result):
# 使用返回的result完成剩余工作...
call_backs[io_id] = call_back
# 新的循環(huán)
while True:
# 獲取已經(jīng)完成的io事件
events = io_get_finished()
for (io_id, io_type) in events:
if io_type == READ: # 讀取
data = read(io_id)
call_back = call_backs[io_id]
call_back(data)
else:
# 其他類型io事件的處理
pass
# 獲取一個(gè)新請(qǐng)求
request = accept()
# 根據(jù)路由映射獲取到用戶寫(xiě)的業(yè)務(wù)邏輯函數(shù)
handler = get_handler(request)
# 運(yùn)行用戶的handler,處理請(qǐng)求
handler(request)
handler 對(duì)于 IO 操作,注冊(cè)了回調(diào)就立刻返回,同時(shí)每次迭代都會(huì)對(duì)已完成的 IO 執(zhí)行回調(diào),網(wǎng)絡(luò)請(qǐng)求不再阻塞整個(gè)服務(wù)器。call_backs 字典拆分到單獨(dú)模塊,就能得到一個(gè) EventLoop,也就是 Python 標(biāo)準(zhǔn)庫(kù) asyncio 包中提供的 ioloop。0x02 用生成器消除 callback
handler 函數(shù),在有獨(dú)立的 ioloop 后,它現(xiàn)在變成類似這樣:def handler(request):
# 業(yè)務(wù)邏輯代碼...
# 需要執(zhí)行一次 API 請(qǐng)求
def call_back(result):
# 使用 API 返回的result完成剩余工作
print(result)
# 沒(méi)有io_call這個(gè)方法,這里只是示意,表示注冊(cè)一個(gè)IO操作
asyncio.get_event_loop().io_call(api, call_back)
def example():
value = yield 2
print("get", value)
return value
g = example()
# 啟動(dòng)生成器,我們會(huì)得到 2
got = g.send(None)
print(got) # 2
try:
# 再次啟動(dòng) 會(huì)顯示 "get 4", 就是我們傳入的值
got = g.send(got*2)
except StopIteration as e:
# 生成器運(yùn)行完成,將會(huì)print(4),e.value 是生成器return的值
print(e.value)
yield 關(guān)鍵字,調(diào)用函數(shù)將會(huì)得到一個(gè)生成器,生成器一個(gè)關(guān)鍵的方法 send() 可以跟生成器交互。g.send(None) 會(huì)運(yùn)行生成器內(nèi)代碼直到遇到 yield,并返回其后的對(duì)象,也就是 2,生成器代碼就停在這里了,直到我們?cè)俅螆?zhí)行 g.send(got*2),會(huì)把 2*2 也就是 4 賦值給yield 前面的變量 value,然后繼續(xù)運(yùn)行生成器代碼。yield 在這里就像一扇門(mén),可以把一件東西從這里送出去,也可以把另一件東西拿進(jìn)來(lái)。send 讓生成器運(yùn)行到下一個(gè) yield 前就結(jié)束了,send 調(diào)用會(huì)引發(fā)一個(gè)特殊的異常StopIteration,這個(gè)異常自帶一個(gè)屬性 value,為生成器 return 的值。handler 用 yield 關(guān)鍵字轉(zhuǎn)換成一個(gè)生成器,運(yùn)行它來(lái)把 IO 操作的具體內(nèi)容返回,IO 完成后的回調(diào)函數(shù)中把 IO 結(jié)果放回并恢復(fù)生成器運(yùn)行,那就解決了業(yè)務(wù)代碼不流暢的問(wèn)題了:def handler(request):
# 業(yè)務(wù)邏輯代碼...
# 需要執(zhí)行一次 API 請(qǐng)求,直接把 IO 請(qǐng)求信息yield出去
result = yield io_info
# 使用 API 返回的result完成剩余工作
print(result)
# 這個(gè)函數(shù)注冊(cè)到ioloop中,用來(lái)當(dāng)有新請(qǐng)求的時(shí)候回調(diào)
def on_request(request):
# 根據(jù)路由映射獲取到用戶寫(xiě)的業(yè)務(wù)邏輯函數(shù)
handler = get_handler(request)
g = handler(request)
# 首次啟動(dòng)獲得io_info
io_info = g.send(None)
# io完成回調(diào)函數(shù)
def call_back(result):
# 重新啟動(dòng)生成器
g.send(result)
asyncio.get_event_loop().io_call(io_info, call_back)
handler 代碼已經(jīng)不會(huì)被打散到 callback 中,on_request 函數(shù)使用 callback 和 ioloop 交互,但它會(huì)被實(shí)現(xiàn)在 Web 框架中,對(duì)用戶不可見(jiàn)。callback 的啟發(fā),但局限性有兩點(diǎn):業(yè)務(wù)邏輯中僅發(fā)起一次網(wǎng)絡(luò) IO,但實(shí)際中往往更多 業(yè)務(wù)邏輯沒(méi)有調(diào)用其他異步函數(shù)(協(xié)程),但實(shí)際中我們往往會(huì)調(diào)用其他協(xié)程
0x03 解決完整調(diào)用鏈
request 執(zhí)行真正的 IO,func1、func2 僅調(diào)用。顯然我們的代碼只能寫(xiě)成這樣:def func1():
ret = yield request("http://test.com/foo")
ret = yield func2(ret)
return ret
def func2(data):
result = yield request("http://test.com/"+data)
return result
def request(url):
# 這里模擬返回一個(gè)io操作,包含io操作的所有信息,這里用字符串簡(jiǎn)化代替
result = yield "iojob of %s" % url
return result
request,我們把 IO 操作通過(guò) yield 暴露給框架。func1 和 func2,調(diào)用 request 顯然也要加 yield 關(guān)鍵字,否則 request 調(diào)用返回一個(gè)生成器后不會(huì)暫停,繼續(xù)執(zhí)行后續(xù)邏輯顯然會(huì)出錯(cuò)。yield from、aysnc、await 時(shí)代,在 tornado 框架中寫(xiě)異步代碼的樣子。調(diào)用 func1()得到生成器調(diào)用 send(None)啟動(dòng)它得到會(huì)得到request("http://test.com/foo")的結(jié)果,還是生成器對(duì)象send(None)啟動(dòng)由request()產(chǎn)生的生成器,會(huì)得到 IO 操作,由框架注冊(cè)到ioloop并指定回調(diào)IO 完成后的回調(diào)函數(shù)內(nèi)喚醒 request生成器,生成器會(huì)走到return語(yǔ)句結(jié)束捕獲異常得到 request生成器的返回值,將上一層func1喚醒,同時(shí)又得到func2()生成器繼續(xù)執(zhí)行...
send 就能不斷得到所有 IO 操作信息并推動(dòng)調(diào)用鏈前進(jìn),實(shí)現(xiàn)方法如下:第一個(gè)生成器入棧 調(diào)用 send,如果得到生成器就入棧并進(jìn)入下一輪迭代遇到到 IO 請(qǐng)求 yield出來(lái),讓框架注冊(cè)到ioloopIO 操作完成后被喚醒,緩存結(jié)果并出棧,進(jìn)入下一輪迭代,目的讓上層函數(shù)使用 IO 結(jié)果恢復(fù)運(yùn)行 如果一個(gè)生成器運(yùn)行完畢,也需要和4一樣讓上層函數(shù)恢復(fù)運(yùn)行
send,就能整個(gè)調(diào)用鏈中的 IO,完成這些 IO,繼續(xù)推動(dòng)調(diào)用鏈內(nèi)的邏輯執(zhí)行,直到整體邏輯結(jié)束:def wrapper(gen):
# 第一層調(diào)用 入棧
stack = Stack()
stack.push(gen)
# 開(kāi)始逐層調(diào)用
while True:
# 獲取棧頂元素
item = stack.peak()
result = None
# 生成器
if isgenerator(item):
try:
# 嘗試獲取下層調(diào)用并入棧
child = item.send(result)
stack.push(child)
# result 使用過(guò)后就還原為None
result = None
# 入棧后直接進(jìn)入下次循環(huán),繼續(xù)向下探索
continue
except StopIteration as e:
# 如果自己運(yùn)行結(jié)束了,就暫存result,下一步讓自己出棧
result = e.value
else: # IO 操作
# 遇到了 IO 操作,yield 出去,IO 完成后會(huì)被用 IO 結(jié)果喚醒并暫存到 result
result = yield item
# 走到這里則本層已經(jīng)執(zhí)行完畢,出棧,下次迭代將是調(diào)用鏈上一層
stack.pop()
# 沒(méi)有上一層的話,那整個(gè)調(diào)用鏈都執(zhí)行完成了,return
if stack.empty():
print("finished")
return result
w = wrapper(func1())
# 將會(huì)得到 "iojob of http://test.com/foo"
w.send(None)
# 上個(gè)iojob foo 完成后的結(jié)果"bar"傳入,繼續(xù)運(yùn)行,得到 "iojob of http://test.com/bar"
w.send("bar")
# 上個(gè)iojob bar 完成后的結(jié)構(gòu)"barz"傳入,繼續(xù)運(yùn)行,結(jié)束。
w.send("barz")
# 維護(hù)一個(gè)就緒列表,存放所有完成的IO事件,格式為(wrapper,result)
ready = []
def on_request(request):
handler = get_handler(request)
# 使用 wrapper 包裝后,可以只通過(guò) send 處理 IO 了
g = wrapper(func1())
# 把開(kāi)始狀態(tài)直接視為結(jié)果為None的就緒狀態(tài)
ready.append((g, None))
# 讓ioloop每輪循環(huán)都執(zhí)行此函數(shù),用來(lái)處理的就緒的IO
def process_ready(self):
def call_back(g, result):
ready.append((g, result))
# 遍歷所有已經(jīng)就緒生成器,將其向下推進(jìn)
for g, result in self.ready:
# 用result喚醒生成器,并得到下一個(gè)io操作
io_job = g.send(result)
# 注冊(cè)io操作 完成后把生成器加入就緒列表,等待下一輪處理
asyncio.get_event_loop().io_call(
io_job, lambda result: ready.append((g, result)
ioloop 每輪迭代都來(lái)掃一遍,推動(dòng)就緒的狀態(tài)的生成器向下運(yùn)行,并把新的 IO 操作注冊(cè),IO 完成后再次加入就緒,經(jīng)過(guò)幾輪 ioloop 的迭代一個(gè) handler 最終會(huì)被執(zhí)行完成。0x04 提高擴(kuò)展性
sleep 幾秒,用 time.sleep() 又會(huì)讓整個(gè)線程阻塞住,就需要特殊實(shí)現(xiàn)。再比如,可以把一些 CPU 密集的操作通過(guò)多線程異步化,讓另一個(gè)線程通知事件已經(jīng)完成后再執(zhí)行后續(xù)。ioloop 來(lái)注冊(cè) IO 事件還是開(kāi)一個(gè)線程完全由你自己,并提供了一個(gè)標(biāo)準(zhǔn)「占位符」Future,表示他的結(jié)果等到未來(lái)才會(huì)有,其部分原型如下:class Future:
# 設(shè)置結(jié)果
def set_result(result): pass
# 獲取結(jié)果
def result(): pass
# 表示這個(gè)future對(duì)象是不是已經(jīng)被設(shè)置過(guò)結(jié)果了
def done(): pass
# 設(shè)置在他被設(shè)置結(jié)果時(shí)應(yīng)該執(zhí)行的回調(diào)函數(shù),可以設(shè)置多個(gè)
def add_done_callback(callback): pass
Future,讓擴(kuò)展性變得更強(qiáng)。對(duì)于用戶代碼的中的網(wǎng)絡(luò)請(qǐng)求函數(shù) request:# 現(xiàn)在 request 函數(shù),不是生成器,它返回future
def request(url):
# future 理解為占位符
fut = Future()
def callback(result):
# 當(dāng)網(wǎng)絡(luò)IO完成回調(diào)的時(shí)候給占位符賦值
fut.set_result(result)
asyncio.get_event_loop().io_call(url, callback)
# 返回占位符
return future
request 不再是一個(gè)生成器,而是直接返回 future。def process_ready(self):
def callback(fut):
# future被設(shè)置結(jié)果會(huì)被放入就緒列表
ready.append((g, fut.result()))
# 遍歷所有已經(jīng)就緒生成器,將其向下推進(jìn)
for g, result in self.ready:
# 用result喚醒生成器,得到的不再是io操作,而是future
fut = g.send(result)
# future被設(shè)置結(jié)果的時(shí)候會(huì)調(diào)用callback
fut.add_done_callback(callback)
0x05 發(fā)展和變革
tornado 的時(shí)候,大概只有一個(gè) yield 關(guān)鍵字可用,協(xié)程要想實(shí)現(xiàn),就是這么個(gè)思路,甚至 yield 關(guān)鍵字和 return 關(guān)鍵字不能一個(gè)函數(shù)里面出現(xiàn),你要想在生成器運(yùn)行完后返回一個(gè)值,需要手動(dòng) raise 一個(gè)異常,雖然效果跟現(xiàn)在 return 一樣,但寫(xiě)起來(lái)還是很別扭,不優(yōu)雅。yield from 表達(dá)式。它可以做什么?wrapper 所做的事:通過(guò)棧實(shí)現(xiàn)調(diào)用鏈遍歷的 ,它是 wrapper 邏輯的語(yǔ)法糖。def func1():
# 注意 yield from
ret = yield from request("http://test.com/foo")
# 注意 yield from
ret = yield from func2(ret)
return ret
def func2(data):
# 注意 yield from
result = yield from request("http://test.com/"+data)
return result
# 現(xiàn)在 request 函數(shù),不是生成器,它返回future
def request(url):
# 同上基于future實(shí)現(xiàn)的request
wrapper 函數(shù)了:g = func1()
# 返回第一個(gè)請(qǐng)求的 future
g.send(None)
# 繼續(xù)運(yùn)行,自動(dòng)進(jìn)入func2 并得到第它里面的那個(gè)future
g.send("bar")
# 繼續(xù)運(yùn)行,完成調(diào)用鏈?zhǔn)S噙壿嫞瑨伋鯯topIteration異常
g.send("barz")
yield from 直接打通了整個(gè)調(diào)用鏈,已經(jīng)是很大的進(jìn)步了,但是用來(lái)異步編程看著還是別扭,其他語(yǔ)言都有專門(mén)的協(xié)程 async、await 關(guān)鍵字了,直到再后來(lái)的版本把這些內(nèi)容用專用的 async、await 關(guān)鍵字包裝,才成為今天比較優(yōu)雅的樣子。0x06 總結(jié)和比較
基于 IO 多路復(fù)用技術(shù),讓整個(gè)應(yīng)用在 IO 上非阻塞,實(shí)現(xiàn)高效率 通過(guò)生成器讓分散的 callback代碼變成同步代碼,減少業(yè)務(wù)編寫(xiě)困難
Future 類比 Promise 本質(zhì)相同。gevent 算作一類,都是自己實(shí)現(xiàn) runtime,并 patch 掉系統(tǒng)調(diào)用接入自己的 runtime,自己來(lái)調(diào)度協(xié)程,gevent 專注于網(wǎng)絡(luò)相關(guān),基于網(wǎng)絡(luò) IO 調(diào)度,比較簡(jiǎn)單,而 Go 實(shí)現(xiàn)了完善的多核支持,調(diào)度更加復(fù)雜和完善,而且創(chuàng)造了基于 channel 新編程范式。來(lái)源:https://zhuanlan.zhihu.com/p/330549526 作者:毛豆花生
評(píng)論
圖片
表情
