再談協(xié)程之Callback寫出協(xié)程范兒

點(diǎn)擊上方藍(lán)字關(guān)注我,知識會給你力量

協(xié)程的出現(xiàn),顛覆了Java多年的編程風(fēng)格,如果你是一個第三方庫的作者,你可能想用Coroutines和Flow使你的基于Java回調(diào)的庫變得更加Kotlin化、協(xié)程化。從另一方面來說,如果你是一個API消費(fèi)者,你可能更愿意接入Coroutines風(fēng)格的API,使其對Kotlin更友好,也讓開發(fā)邏輯變得更加線性化。
今天來看下如何使用Coroutine和Flow簡化API,以及如何使用suspendCancellableCoroutine和callbackFlow API構(gòu)建你自己的協(xié)程風(fēng)格適配器。
Callbacks
Callbacks是異步通信的一個非常常見的解決方案。事實(shí)上,大部分Java場景下,我們都使用了它們作為Java編程語言的解決方案。然而,Callbacks也有一些缺點(diǎn):這種設(shè)計(jì)導(dǎo)致了嵌套的回調(diào),最終導(dǎo)致了難以理解的代碼,另外,異常處理也比較復(fù)雜。
在Kotlin中,你可以使用Coroutines簡化調(diào)用Callbacks,但為此你需要建立自己的適配器,將舊的Callback轉(zhuǎn)化為Kotlin風(fēng)格的協(xié)程。
構(gòu)建Adapter
在協(xié)程中,Kotlin提供了suspendCancellableCoroutine來適配One-shot回調(diào),同時(shí)提供了callbackFlow來適配數(shù)據(jù)流場景下的回調(diào)。
下面的場景中,將用一個簡單的Callbacks例子來演示下這種轉(zhuǎn)換。
One-shot async calls
假設(shè)我們有一個「NetAPI.getData」的函數(shù),返回一個Data Callback,在協(xié)程場景下,我們想讓它返回一個suspend函數(shù)。
所以,我們給NetAPI設(shè)計(jì)一個拓展函數(shù),用來返回Location的suspend函數(shù),如下所示。
suspend?fun?NetAPI.awaitGetData():?Data
由于這是一個One-shot的異步操作,我們使用可以suspendCancellableCoroutine函數(shù),suspendCancellableCoroutine執(zhí)行作為參數(shù)傳遞給它的代碼塊,然后暫停當(dāng)前Coroutine的執(zhí)行,同時(shí)等待繼續(xù)執(zhí)行的信號。當(dāng)Coroutine的Continuation對象中的resume或resumeWithException方法被調(diào)用時(shí),Coroutine將恢復(fù)執(zhí)行。
//?NetAPI的拓展函數(shù),用于返回Data
suspend?fun?NetAPI.awaitGetData():?Data?=
????//?創(chuàng)建一個可以cancelled??suspendCancellableCoroutine
????suspendCancellableCoroutine<Data>?{?continuation?->
????????val?callback?=?object?:?NetCallback?{
????????????override?fun?success(data:?Data)?{
????????????????//?Resume?coroutine?同時(shí)返回Data
????????????????continuation.resume(data)
????????????}
????????????override?fun?error(e:?String)?{
????????????????//?Resume?the?coroutine?
????????????????continuation.resumeWithException(e)
????????????}
????????}
????????addListener(callback)
????????//?結(jié)束suspendCancellableCoroutine塊的執(zhí)行,直到在任一回調(diào)中調(diào)用continuation參數(shù)
????}
?要注意的是:Coroutines庫中也能找到suspendCancellableCoroutine的不可取消版本(即suspendCoroutine),但最好總是選擇suspendCancellableCoroutine來處理Coroutine Scope的取消。
?
suspendCancellableCoroutine背后的原理
從內(nèi)部實(shí)現(xiàn)來說,suspendCancellableCoroutine使用suspendCoroutineUninterceptedOrReturn來獲取suspend函數(shù)中Coroutine的Continuation。這個Continuation對象被一個CancellableContinuation攔截,它可以用來控制當(dāng)前Coroutine的生命周期。
在這之后,傳遞給suspendCancellableCoroutine的lambda將被執(zhí)行,如果lambda返回一個結(jié)果,Coroutine將立即恢復(fù),或者將被暫停,直到CancellableContinuation從lambda中手動進(jìn)行恢復(fù)。
源碼如下所示。
public?suspend?inline?fun?<T>?suspendCancellableCoroutine(
??crossinline?block:?(CancellableContinuation<T>)?->?Unit
):?T?=
??//?Get?the?Continuation?object?of?the?coroutine?that?it's?running?this?suspend?function
??suspendCoroutineUninterceptedOrReturn?{?uCont?->
????//?Take?over?the?control?of?the?coroutine.?The?Continuation's?been
????//?intercepted?and?it?follows?the?CancellableContinuationImpl?lifecycle?now
????val?cancellable?=?CancellableContinuationImpl(uCont.intercepted(),?...)
????/*?...?*/
?
????//?Call?block?of?code?with?the?cancellable?continuation
????block(cancellable)
????????
????//?Either?suspend?the?coroutine?and?wait?for?the?Continuation?to?be?resumed
????//?manually?in?`block`?or?return?a?result?if?`block`?has?finished?executing
????cancellable.getResult()
??}
Streaming data
如果我們想獲取多個數(shù)據(jù)流(使用NetAPI.getDataList函數(shù)),我們就需要使用Flow創(chuàng)建一個數(shù)據(jù)流。理想的API應(yīng)該是這樣的。
fun?NetAPI.getDataListFlow():?Flow<Data>
要將基于回調(diào)的流媒體API轉(zhuǎn)換為Flow,我們需要使用創(chuàng)建Flow的callbackFlow構(gòu)建器。在callbackFlow lambda中,我們處于Coroutine的上下文中,因此,可以調(diào)用suspend函數(shù)。與flow構(gòu)建器不同,callbackFlow允許通過send函數(shù)從不同CoroutineContext發(fā)出值,或者通過offer函數(shù)在協(xié)程外發(fā)出值。
通常情況下,使用callbackFlow的流適配器遵循這三個通用步驟。
- 創(chuàng)建回調(diào),使用offer將元素添加到流中。
- 注冊該回調(diào)。
- 等待消費(fèi)者取消循環(huán)程序并取消對回調(diào)的注冊。
示例代碼如下所示。
//?向consumer發(fā)送Data?updates
fun?NetAPI.getDataListFlow()?=?callbackFlow<Data>?{
??//?當(dāng)前會在一個協(xié)程作用域中創(chuàng)建一個新的Flow
??//?1.?創(chuàng)建回調(diào),使用offer將元素添加到流中
??val?callback?=?object?:?NetCallback()?{
????override?fun?success(result:?Result?)?{
??????result??:?return?//?Ignore?null?responses
??????for?(data?in?result.datas)?{
????????try?{
??????????offer(data)?//?將元素添加至flow
????????}?catch?(t:?Throwable)?{
??????????//?異常處理?
????????}
??????}
????}
??}
??//?2.?注冊該回調(diào),從而獲取數(shù)據(jù)流
??requestDataUpdates(callback).addOnFailureListener?{?e?->
????close(e)?//?異常時(shí)close
??}
??//?3.?等待消費(fèi)者取消循環(huán)程序并取消對回調(diào)的注冊,這樣會suspend當(dāng)前協(xié)程,直到這個flow被關(guān)閉
??awaitClose?{
????//?移除監(jiān)聽
????removeLocationUpdates(callback)
??}
}
callbackFlow背后的原理
在協(xié)程內(nèi)部,callbackFlow會使用channel,它在概念上與阻塞隊(duì)列非常相似。channel都有容量配置,限定了可緩沖元素?cái)?shù)的上限。
在callbackFlow中所創(chuàng)建channel的默認(rèn)容量為64個元素,當(dāng)你嘗試向已經(jīng)滿的channel添加新元素時(shí),send函數(shù)會將數(shù)據(jù)提供方掛起,直到新元素有空間能加入channel為止,而offer不會將相關(guān)元素添加到channel中,并會立即返回false。
awaitClose背后的原理
awaitClose的實(shí)現(xiàn)原理其實(shí)和suspendCancellableCoroutine是一樣的,參考下下面的代碼中的注釋。
public?suspend?fun?ProducerScope<*>.awaitClose(block:?()?->?Unit?=?{})?{
??...
??try?{
????//?Suspend?the?coroutine?with?a?cancellable?continuation
????suspendCancellableCoroutine<Unit>?{?cont?->
??????//?Suspend?forever?and?resume?the?coroutine?successfully?only?
??????//?when?the?Flow/Channel?is?closed
??????invokeOnClose?{?cont.resume(Unit)?}
????}
??}?finally?{
????//?Always?execute?caller's?clean?up?code
????block()
??}
}
有啥用?
將基于回調(diào)的API轉(zhuǎn)換為數(shù)據(jù)流,這玩意兒到底有什么用呢?我們拿最常用的View.setOnClickListener來看下,它既可以看作是一個One-shot的場景,也可以看作是數(shù)據(jù)流的場景。
我們先把它改寫成suspendCancellableCoroutine形式,代碼如下所示。
suspend?fun?View.awaitClick(block:?()?->?Unit):?View?=?suspendCancellableCoroutine?{?continuation?->
????setOnClickListener?{?view?->
????????if?(view?==?null)?{
????????????continuation.resumeWithException(Exception("error"))
????????}?else?{
????????????block()
????????????continuation.resume(view)
????????}
????}
}
使用:
lifecycleScope.launch?{
????binding.test.awaitClick?{
????????Toast.makeText(this@MainActivity,?"loading",?Toast.LENGTH_LONG).show()
????}
}
嗯,有點(diǎn)一言難盡的感覺,就差脫褲子放屁了。我們再把它改成數(shù)據(jù)流的場景。
fun?View.clickFlow():?Flow<View>?{
????return?callbackFlow?{
????????setOnClickListener?{
????????????trySend(it)?//?offer函數(shù)被Deprecated了,使用trySend替代
????????}
????????awaitClose?{?setOnClickListener(null)?}
????}
}
使用:
lifecycleScope.launch?{
????binding.test.clickFlow().collect?{
????????Toast.makeText(this@MainActivity,?"loading",?Toast.LENGTH_LONG).show()
????}
}
好了,屁是完全放出來了。
可以發(fā)現(xiàn),這種場景下,強(qiáng)行硬套這種模式,其實(shí)并沒有什么卵用,反而會讓別人覺得你是個智障。
那么到底什么場景需要使用呢?我們可以想想,為什么需要Callbback。
大部分Callback hell的場景,都是異步請求,也就是帶阻塞的那種,或者就是數(shù)據(jù)流式的數(shù)據(jù)產(chǎn)出,所以這種僅僅是調(diào)用個閉包的回調(diào),其實(shí)不能叫回調(diào),它只是一個lambda,所以,我們再來看一個例子。
現(xiàn)在有一個TextView,顯示來自一個Edittext的輸入內(nèi)容。這樣一個場景就是一個明確的數(shù)據(jù)流場景,主要是利用Edittext的TextWatcher中的afterTextChanged回調(diào),我們將它改寫成Flow形式,代碼如下所示。
fun?EditText.afterTextChangedFlow():?Flow<Editable?>?{
????return?callbackFlow?{
????????val?watcher?=?object?:?TextWatcher?{
????????????override?fun?afterTextChanged(s:?Editable?)?{
????????????????trySend(s)
????????????}
????????????override?fun?beforeTextChanged(s:?CharSequence?,?start:?Int,?count:?Int,?after:?Int)?{}
????????????override?fun?onTextChanged(s:?CharSequence?,?start:?Int,?before:?Int,?count:?Int)?{}
????????}
????????addTextChangedListener(watcher)
????????awaitClose?{?removeTextChangedListener(watcher)?}
????}
}
使用:
lifecycleScope.launch?{
????with(binding)?{
????????test.afterTextChangedFlow().collect?{?show.text?=?it?}
????}
}
有點(diǎn)意思了,我沒寫回調(diào),但是也拿到了數(shù)據(jù)流,嗯,其實(shí)有點(diǎn)「強(qiáng)行可以」的感覺。
但是,一旦這里變成了Flow,這就變得很有味道了,這可是Flow啊,我們可以利用Flow那么多的操作符,做很多有意思的事情了。
舉個例子,我們可以對輸入框做限流,這個場景很常見,例如搜索,用戶輸入的內(nèi)容會自動搜索,但是又不能一輸入內(nèi)容就搜索,這樣會產(chǎn)生大量的無效搜索內(nèi)容,所以,這個場景也有個專有名詞——輸入框防抖。
之前在處理類似的需求時(shí),大部分都是采用RxJava的方式,但現(xiàn)在,我們有了Flow,可以在滿足協(xié)程范API的場景下,依然完成這個功能。
我們增加一下debounce即可。
lifecycleScope.launch?{
????with(binding)?{
????????test.afterTextChangedFlow()
????????????.buffer(Channel.CONFLATED)
????????????.debounce(300)
????????????.collect?{
????????????????show.text?=?it
????????????????//?來點(diǎn)業(yè)務(wù)處理
????????????????viewModel.getSearchResult(it)
????????????}
????}
}
甚至你還可以增加一個背壓策略,再來個debounce,在流停止后,完成數(shù)據(jù)收集。
?當(dāng)然你還可以把buffer和debounce直接寫到afterTextChangedFlow返回的Flow中,作為當(dāng)前場景的默認(rèn)處理。
?
參考資料:
https://medium.com/androiddevelopers/simplifying-apis-with-coroutines-and-flow-a6fb65338765
向大家推薦下我的網(wǎng)站?https://xuyisheng.top/??點(diǎn)擊原文一鍵直達(dá)
專注 Android-Kotlin-Flutter 歡迎大家訪問
往期推薦
本文原創(chuàng)公眾號:群英傳,授權(quán)轉(zhuǎn)載請聯(lián)系微信(Tomcat_xu),授權(quán)后,請?jiān)谠瓌?chuàng)發(fā)表24小時(shí)后轉(zhuǎn)載。< END >作者:徐宜生
更文不易,點(diǎn)個“三連”支持一下??
