抽絲剝繭Kotlin - 協(xié)程
前言
文章接上篇,這一篇我們好好聊一聊協(xié)程的原理,通過上一篇的學(xué)習(xí),相信大家對于如何使用協(xié)程已經(jīng)非常熟悉了。
故事還得從上次的協(xié)程分享開始,由于大家對協(xié)程的實踐并不多,所以大家對下面的這段代碼如何執(zhí)行爭論不休:
GlobalScope.launch?{
????val?a?=?async?{
????????1+2
????}
????val?b?=?async?{
????????1+3
????}
????val?c?=?a?+?b
????Log.e(TAG,"result:$c")
}
有人說,a 和 b 會串行執(zhí)行,有人說,a 和 b 會并行執(zhí)行,那么執(zhí)行的結(jié)果到底是什么樣的?我們將在下面的文章給出。

本個系列文章分為三篇,本文是第二篇:
“《即學(xué)即用Kotlin - 協(xié)程》
《抽絲剝繭Kotlin - 協(xié)程基礎(chǔ)篇》
《抽絲剝繭Kotlin - 協(xié)程Flow篇》
一、結(jié)構(gòu)簡要介紹
首先,我們得明確協(xié)程中有哪些東西,如果你會使用協(xié)程,那你肯定知道協(xié)程中有 CoroutineScope、CoroutineContext 和 CoroutineDispatcher,這些都是使用過程中我們可以接觸到的 API。
我簡單的整理了協(xié)程中主要的基礎(chǔ)類:

協(xié)程的類結(jié)構(gòu)可分為三部分:CoroutineScope、CoroutineContext 和 Continuation。
1. Continuation
如果你會使用協(xié)程,那你肯定知道,協(xié)程遇到耗時 suspend 操作可以掛起,等到任務(wù)結(jié)束的時候,協(xié)程會自動切回來。
它的奧秘就是 Continuation,Continuation 可以理解程續(xù)體,你可以理解其每次在協(xié)程掛起點(diǎn)將剩余的代碼包括起來,等到結(jié)束以后執(zhí)行剩余的內(nèi)容。一個協(xié)程的代碼塊可能會被切割成若干個 Continuation,在每個需要掛起的地方都會分配一個 Continuation。
先拋出一些結(jié)論,協(xié)程在做耗時操作的時候,如果執(zhí)行了耗時 suspend 操作,會自動掛起,但是這個耗時操作終究是要做的,只不過切換到其他線程去做了,做完以后協(xié)程就需要切回來,但是切到哪兒呢?這便是 Continuation 需要解決的問題。
Continuation 的流程是這樣的:
無論是使用 launch 還是 async 啟動的協(xié)程,都會有一個結(jié)束的時候用來回調(diào)的 continuation。
2. CoroutineScope
關(guān)于 CoroutineScope 沒有特別多要說的,它持有了 CoroutineContext,主要對協(xié)程的生命周期進(jìn)行管理。
3. CoroutineContext
一開始看 CoroutineContext 覺得特別暈,不明白為啥要這么設(shè)計,看了 Bennyhuo 大佬的文章以后才稍微好轉(zhuǎn)。
從上面協(xié)程的類的機(jī)構(gòu)中可以看出,光看這個 CoroutineContext 這個接口(源碼內(nèi)容我們下面講),會發(fā)現(xiàn)它有點(diǎn)像 List 集合,而繼承自 CoroutineContext 接口的 Element 接口則定義了其中的元素。
隨后,這個 Element 接口被劃分成了兩種類,Job 和 ContinuationInterceptor:
Job:從字面上來講,它代表一個任務(wù),Thread也是執(zhí)行任務(wù),所以我們可以理解它定義了協(xié)程的一些東西,比如協(xié)程的狀態(tài),協(xié)程和子協(xié)程的管理方式等等。ContinuationInterceptor:也從字面上來看,它是Continuation的攔截器,通過攔截Continuation,完成我們想要完成的工作,比如說線程的切換。
二、結(jié)構(gòu)源碼分析
上面我們從概念上介紹了協(xié)程的三大件,在這部分,我們從源碼分析。
1. Continuation
suspend 修飾的方法會在在編譯期間被編譯器做特殊處理,這種處理被成為CPS(續(xù)體轉(zhuǎn)換風(fēng)格) 轉(zhuǎn)化,suspend 方法會被包裹成 Continuation。
說了這么久的 Continuation,我們還沒有見過接口代碼,由于接口內(nèi)容不多,我就把所有的內(nèi)容貼出來了:
/**
?*?Interface?representing?a?continuation?after?a?suspension?point?that?returns?a?value?of?type?`T`.
?*/
@SinceKotlin("1.3")
public?interface?Continuation<in?T>?{
????/**
?????*?The?context?of?the?coroutine?that?corresponds?to?this?continuation.
?????*/
????public?val?context:?CoroutineContext
????/**
?????*?Resumes?the?execution?of?the?corresponding?coroutine?passing?a?successful?or?failed?[result]?as?the
?????*?return?value?of?the?last?suspension?point.
?????*/
????public?fun?resumeWith(result:?Result)
}
我們重點(diǎn)關(guān)注Continuation#resumeWith()方法,從注釋來看,通過返回 suspend 掛起點(diǎn)的值來恢復(fù)協(xié)程的執(zhí)行,協(xié)程可以從參數(shù) Result 獲取成功的值或者失敗的結(jié)果,如果沒有結(jié)果,那么 Result 的泛型是 Unit。Resulut 這個類也特別簡單,感興趣的同學(xué)可以查看源碼。
BaseContinuationImpl 實現(xiàn)了 Continuation 接口,我們看一下 Continuation#resumeWith 方法的實現(xiàn):
internal?abstract?class?BaseContinuationImpl(
????//?完成后調(diào)用的?Continuation
????public?val?completion:?Continuation?
)?:?Continuation,?CoroutineStackFrame,?Serializable?{
????
????public?final?override?fun?resumeWith(result:?Result)?{
????????//?This?loop?unrolls?recursion?in?current.resumeWith(param)?to?make?saner?and?shorter?stack?traces?on?resume
????????var?current?=?this
????????var?param?=?result
????????while?(true)?{
????????????probeCoroutineResumed(current)
????????????with(current)?{
????????????????val?completion?=?completion!!?//?fail?fast?when?trying?to?resume?continuation?without?completion
????????????????val?outcome:?Result?=
????????????????????try?{
????????????????????????//?1.?執(zhí)行?suspend?中的代碼塊
????????????????????????val?outcome?=?invokeSuspend(param)
????????????????????????//?2.?如果代碼掛起就提前返回
????????????????????????if?(outcome?===?COROUTINE_SUSPENDED)?return
????????????????????????//?3.?返回結(jié)果
????????????????????????Result.success(outcome)
????????????????????}?catch?(exception:?Throwable)?{
????????????????????????//?3.?返回失敗結(jié)果
????????????????????????Result.failure(exception)
????????????????????}
????????????????releaseIntercepted()?//?this?state?machine?instance?is?terminating
????????????????if?(completion?is?BaseContinuationImpl)?{
????????????????????//?4.?如果?completion?中還有子?completion,遞歸
????????????????????current?=?completion
????????????????????param?=?outcome
????????????????}?else?{
????????????????????//?5.?結(jié)果通知
????????????????????completion.resumeWith(outcome)
????????????????????return
????????????????}
????????????}
????????}
????}
}
主要的過程我在注釋中已經(jīng)標(biāo)注出來了,我來解釋一下 Continuation 的機(jī)制。
每個 suspend 方法生成的 BaseContinuationImpl,其構(gòu)造方法有一個參數(shù)叫 completion,它也是一個 Continuation,它的調(diào)用時機(jī)是在 suspen 方法執(zhí)行完畢的時候。我們后面稱

這個流程展示給我們的內(nèi)容很直觀了,簡單起見,我們直接看3、4和5這一個 launch 啟動流程就好,通常一個 launch 生成一個外層 Continuation一個相應(yīng)的結(jié)果 Continuation,我們后面稱結(jié)果 continuation 為 complete,Continuation 調(diào)用順序是:
調(diào)用外層 Continuation中的Continuation#resumeWith()方法。該方法會去執(zhí)行 launch包裹的代碼塊,并返回一個結(jié)果。將上述代碼塊執(zhí)行的結(jié)果交給 completion,由它完成協(xié)程結(jié)束的通知。
上述的過程只存在于一個 launch 并且里面沒有執(zhí)行其他耗時的掛起操作,對于這些情況,我們將會在下面的文章討論。
拋出問題一:可以看到,在注釋2,遇到耗時的 suspend,返回的結(jié)果是一個 COROUTINE_SUSPENDED,后面會直接返回,耗時操作結(jié)束的時候,我們的 completion 怎么恢復(fù)呢?
2. CoroutineContext 和 Element
在概要分析的時候,我們說 CoroutineContext 的結(jié)構(gòu)像一個集合,是從它的接口得出結(jié)論的:
public?interface?CoroutineContext?{
????//?get?方法,通過?key?獲取
????public?operator?fun??get(key:?Key):?E?
????//?累加操作
????public?fun??fold(initial:?R,?operation:?(R,?Element)?->?R):?R
????//?操作符?+?,?實際的實現(xiàn)調(diào)用了?fold?方法
????public?operator?fun?plus(context:?CoroutineContext):?CoroutineContext
????//?移除操作
????public?fun?minusKey(key:?Key<*>):?CoroutineContext
????
????//?CoroutineContext?定義的?Key
????public?interface?Key
????//?CoroutineContext?中元素的定義
????public?interface?Element?:?CoroutineContext?{
????????//?key
????????public?val?key:?Key<*>
????????//...
????}
}
從中我們可以大致看出,CoroutineContext 中可以通過 Key 來獲取元素 Element,并且 Element 接口也是繼承自 CoroutineContext 接口。
除此以外,CoroutineContext 支持增加和移除操作,并且支持 + 操作符來完成增加。+ 操作符即 plus 方法是有具體實現(xiàn)的,感興趣的可以自己看一下,主要涉及到了攔截器 ContinuationInterceptor 的添加。
1.1 Job
Job 的注釋中闡述定義是這樣的:
“A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.
從中我們可以得出:
后臺任務(wù) 可取消 生命周期在完成它的時候結(jié)束
從后臺任務(wù)的角度來看,Job 聽著有點(diǎn)像 Thread,和 Thread 一樣,Job 也有各種狀態(tài),文檔中對 Job 各種狀態(tài)的注釋(感覺大佬們的注釋寫的真棒~):
Job 另一個值得關(guān)注的點(diǎn)是對子 Job 的管理,主要的規(guī)則如下:
子 Job都會結(jié)束的時候,父Job才會結(jié)束父 Job取消的時候,子Job也會取消
上述的一些內(nèi)容都可以從 Job 的接口文檔中得出。那么,Job哪里來的?如果你看一下CoroutineScope#launch方法,你就會得出結(jié)論,該方法的返回類型就是 Job,我們每次調(diào)用該方法,都會創(chuàng)建一個 Job。
1.2 ContinuationInterceptor
顧名思義,Continuation 攔截器,先看接口:
interface?ContinuationInterceptor?:?CoroutineContext.Element?{
????//?ContinuationInterceptor?在?CoroutineContext?中的?Key
????companion?object?Key?:?CoroutineContext.Key
????/**
?????*?攔截?continuation
?????*/
????fun? ?interceptContinuation(continuation:?Continuation<T>):?Continuation
????//...
}
這個接口可以提煉的就這兩個信息:
攔截器的 Key,也就是說,無論你后面一個CoroutineContext放了多少個攔截器,Key為ContinuationInterceptor的攔截器只能有一個。我們都知道, Continuation在調(diào)用其Continuation#resumeWith()方法,會執(zhí)行其suspend修飾的函數(shù)的代碼塊,如果我們提前攔截到,是不是可以做點(diǎn)其他事情,比如說切換線程,這也是ContinuationInterceptor的作用之一。
需要說明一下,我們通過 Dispatchers 來指定協(xié)程發(fā)生的線程,Dispatchers 實現(xiàn)了 ContinuationInterceptor接口。
3. CoroutineScope
CoroutineScope 的接口很簡單:
public?interface?CoroutineScope?{
????public?val?coroutineContext:?CoroutineContext
}
它要求后續(xù)的實現(xiàn)都要提供 CoroutineContext,不過我們都知道,CoroutineContext 是協(xié)程中很重要的東西,既包括 Job,也包括調(diào)度器。
在上面的代碼中,我多次使用了 Android Jetpack 中的 Lifecycle 中協(xié)程的擴(kuò)展庫,好處我們獲取 CoroutineScope 更加簡單,無需在組件 onDestroy 的時候手動 cancel,并且它的源碼超級簡單,前提是你會使用 Lifecycle:
internal?class?LifecycleCoroutineScopeImpl(
????override?val?lifecycle:?Lifecycle,
????override?val?coroutineContext:?CoroutineContext
)?:?LifecycleCoroutineScope(),?LifecycleEventObserver?{
????//?...
????override?fun?onStateChanged(source:?LifecycleOwner,?event:?Lifecycle.Event)?{
????????if?(lifecycle.currentState?<=?Lifecycle.State.DESTROYED)?{
????????????lifecycle.removeObserver(this)
????????????coroutineContext.cancel()
????????}
????}
}
并且它也支持你在指定的生命周期調(diào)用協(xié)程,大家看一下接口就明白了。
三、過程源碼分析
先上一段使用代碼:
lifecycleScope.launch(Dispatchers.Main)?{
????val?a?=?async?{?getResult(1,?2)?}
????val?b?=?async?{?getResult(3,?5)?}
????val?c?=?a.await()?+?b.await()
????Log.e(TAG,?"result:$c")
}?
suspend?fun?getResult(a:?Int,?b:?Int):?Int?{
????return?withContext(Dispatchers.IO)?{
????????delay(1000)
????????return@withContext?a?+?b
????}
}
雖然代碼很簡單,但是源碼還是比較復(fù)雜的,我們分步講。
第一步 獲取 CoroutineScope
我已經(jīng)在上面說明了,我們使用的 Lifecycle 的協(xié)程拓展庫,如果我們不使用拓展庫,就得使用 MainScope,它們的 CoroutineContext 都是一樣的:
public?fun?MainScope():?CoroutineScope?=?ContextScope(SupervisorJob()?+?Dispatchers.Main)
//?LifecycleCoroutineScope
val?Lifecycle.coroutineScope:?LifecycleCoroutineScope
????get()?{
????????while?(true)?{
????????????//?...
????????????val?newScope?=?LifecycleCoroutineScopeImpl(
????????????????this,
????????????????SupervisorJob()?+?Dispatchers.Main.immediate
????????????)
????????????//?...
????????????return?newScope
????????}
????}
顯而易見,MainScope 和 LifecycleCoroutineScope 都使用了 SupervisorJob() + Dispatchers.Main, 作為它們的 CoroutineContext。
說明一下,SupervisorJob 和Dispatchers.Main 很重要,它們分別代表了CoroutineContext 之前提及的 Job 和 ContinuationInterceptor,后面用到的時候再分析。
第二步 啟動協(xié)程
直接進(jìn)入 CoroutineScope#launch() 方法:
public?fun?CoroutineScope.launch(
????context:?CoroutineContext?=?EmptyCoroutineContext,
????start:?CoroutineStart?=?CoroutineStart.DEFAULT,
????block:?suspend?CoroutineScope.()?->?Unit
):?Job?{
????val?newContext?=?newCoroutineContext(context)
????val?coroutine?=?if?(start.isLazy)
????????LazyStandaloneCoroutine(newContext,?block)?else
????????StandaloneCoroutine(newContext,?active?=?true)
????coroutine.start(start,?coroutine,?block)
????return?coroutine
}
上面的方法一共有三個參數(shù),前兩個不作過多介紹,第三個參數(shù):
block:?suspend?CoroutineScope.()?->?Unit)
這是一個方法,是一個 lambda 參數(shù),同時也表明了它需要被 suspend 修飾。繼續(xù)看 launch 方法,發(fā)現(xiàn)它主要做了兩件事:
組合新的 CoroutineContext再創(chuàng)建一個 Continuation
組合新的CoroutineContext
在第一行代碼 val newContext = newCoroutineContext(context) 做了第一件事,這里的 newCoroutineContext(context) 是一個擴(kuò)展方法:
public?actual?fun?CoroutineScope.newCoroutineContext(context:?CoroutineContext):?CoroutineContext?{
????val?combined?=?coroutineContext?+?context
????val?debug?=?if?(DEBUG)?combined?+?CoroutineId(COROUTINE_ID.incrementAndGet())?else?combined
????return?if?(combined?!==?Dispatchers.Default?&&?combined[ContinuationInterceptor]?==?null)
????????debug?+?Dispatchers.Default?else?debug
}
CoroutineScope 使用本身的 coroutineContext 集合,利用 + 操作符將我們在 launch 方法中提供的 coroutineContext 添加進(jìn)來。
再創(chuàng)建一個Continuation
回到上一段代碼,通常我們不會指定 start 參數(shù),所以它會使用默認(rèn)的 CoroutineStart.DEFAULT,最終 coroutine 會得到一個 StandaloneCoroutine。
StandaloneCoroutine 實現(xiàn)自 AbstractCoroutine,翻開上面的類圖,你會發(fā)現(xiàn),它實現(xiàn)了 Continuation、Job 和 CoroutineScope 等一堆接口。需要說明一下,這個 StandaloneCoroutine 其實是我們當(dāng)前 Suspend Contination 的 complete。
接著會調(diào)用
coroutine.start(start,?coroutine,?block)
這就表明協(xié)程開始啟動了。
第三步 start
進(jìn)入到 AbstractCoroutine#start 方法:
public?fun??start(start:?CoroutineStart,?receiver:?R,?block:?suspend?R.()?->?T)?{
????initParentJob()
????start(block,?receiver,?this)
}
跳過層層嵌套,最后到達(dá)了:
internal?fun? ?(suspend?(R)?->?T).startCoroutineCancellable(receiver:?R,?completion:?Continuation)?=
????runSafely(completion)?{
????????//?外面再包一層?Coroutine
????????createCoroutineUnintercepted(receiver,?completion)
????????????//?如果需要,做攔截處理
????????????.intercepted()
????????????//?調(diào)用?resumeWith?方法??????
????????????.resumeCancellableWith(Result.success(Unit))
????}
雖然這僅僅是一個函數(shù),但是后面主要的邏輯都揭露了:
創(chuàng)建一個沒有攔截過的 Continuation。攔截 Continuation。執(zhí)行 Continuation#resumeWith方法。
第四步 又創(chuàng)建 Continuation
我這里用了 又,因為我們在 launch 中已經(jīng)創(chuàng)建了一個 AbstractContinuaion,不過它是一個 complete,從各個函數(shù)的行參就可以看出來。
不過我們 suspend 修飾的外層 Continuation 還沒有創(chuàng)建,它來了,是 SuspendLambda,它繼承自 ContinuationImpl,如果你問我為什么源碼中沒找到具體實現(xiàn),我覺得可能跟 suspend 修飾符有關(guān),由編譯器處理,但是調(diào)用棧確實是這樣的:
看一下 SuspendLambda 類的實現(xiàn):
internal?abstract?class?SuspendLambda(
????public?override?val?arity:?Int,
????completion:?Continuation?
)?:?ContinuationImpl(completion),?FunctionBase,?SuspendFunction?{
????constructor(arity:?Int)?:?this(arity,?null)
????//...
}
可以看到,它的構(gòu)造方法的形參就包括一個 complete。
第五步 攔截處理
回到:
internal?fun? ?(suspend?(R)?->?T).startCoroutineCancellable(receiver:?R,?completion:?Continuation)?=
????runSafely(completion)?{
????????//?外面再包一層?Coroutine
????????createCoroutineUnintercepted(receiver,?completion)
????????????//?如果需要,做攔截處理
????????????.intercepted()
????????????//?調(diào)用?resumeWith?方法??????
????????????.resumeCancellableWith(Result.success(Unit))
????}
里面的攔截方法 Continuation#intercepted() 方法是一個擴(kuò)展方法:
@SinceKotlin("1.3")
public?actual?fun??Continuation.intercepted():?Continuation?=
????(this?as??ContinuationImpl)?.intercepted()??:?this
createCoroutineUnintercepted(receiver, completion) 返回的是一個 SuspendLambda,所以它肯定是一個 ContinuationImpl,看一下它的攔截方法的實現(xiàn):
internal?abstract?class?ContinuationImpl(
????completion:?Continuation?,
????private?val?_context:?CoroutineContext?
)?:?BaseContinuationImpl(completion)?{
????constructor(completion:?Continuation?)?:?this(completion,?completion?.context)
????public?override?val?context:?CoroutineContext
????????get()?=?_context!!
????public?fun?intercepted():?Continuation?=
????????intercepted
?????????????:?(context[ContinuationInterceptor]?.interceptContinuation(this)??:?this)
????????????????.also?{?intercepted?=?it?}
????//?...
}
在 ContinuationImpl#intercepted()方法中,直接利用 context 這個數(shù)據(jù)結(jié)構(gòu)通過 context[ContinuationInterceptor] 獲取攔截器。
CoroutineDispatcher攔截實現(xiàn)
我們都知道 ContinuationInterceptor 具有攔截作用,它的直接實現(xiàn)是 CoroutineDispatcher 這個抽象類,所有其他調(diào)度器都直接或者間接繼承這個類,我們關(guān)注一下它的攔截方法:
public?abstract?class?CoroutineDispatcher?:
????AbstractCoroutineContextElement(ContinuationInterceptor),?ContinuationInterceptor?{
????//...
????public?abstract?fun?dispatch(context:?CoroutineContext,?block:?Runnable)
????//?1.攔截的?Continuation?被包了一層?DispatchedContinuation
????public?final?override?fun??interceptContinuation(continuation:?Continuation):?Continuation?=
????????DispatchedContinuation(this,?continuation)
????//...
}
internal?class?DispatchedContinuation<in?T>(
????@JvmField?val?dispatcher:?CoroutineDispatcher,
????@JvmField?val?continuation:?Continuation
)?:?DispatchedTask(MODE_ATOMIC_DEFAULT),?CoroutineStackFrame,?Continuation?by?continuation?{
????//?...
????override?fun?resumeWith(result:?Result)?{
????????//?...
????????if?(dispatcher.isDispatchNeeded(context))?{
????????????//?2.?后面一個參數(shù)需要提供?Runnable,父類已經(jīng)實現(xiàn)
????????????dispatcher.dispatch(context,?this)
????????}?
????????//...
????}
????//?...
}
//?SchedulerTask?是一個?Runnable
internal?abstract?class?DispatchedTask<in?T>(
????@JvmField?public?var?resumeMode:?Int
)?:?SchedulerTask()?{
????//?...
????public?final?override?fun?run()?{
????????//?...
????????try?{
????????????//...
????????????withCoroutineContext(context,?delegate.countOrElement)?{
????????????????//?3.?continuation?是?DispatchedContinuation?包裹的?continuation
????????????????continuation.resume(...)
????????????}
????????}
????????//...
????}
}
簡單來說,就是對原有的 Continuation 的 resumeWith 操作加了一層攔截,就像這樣:

加入 CoroutineDispatcher 以后,執(zhí)行真正的 Continue#resumeWith() 之前,會執(zhí)行 CoroutineDispatcher#dispatch() 方法,所以我們現(xiàn)在關(guān)注 CoroutineDispatcher#dispatch 具體實現(xiàn)即可。
講一個CoroutineDispatcher具體實現(xiàn)
首先我們得明確這個 CoroutineDispatcher 來自哪里?它從 context 獲取,context來自哪里?
注意 SuspendLambda 和 ContinuationImpl 的構(gòu)造方法,SuspendLambda 中的參數(shù)沒有 CoroutineContext,所以只能來自 completion 中的 CoroutineContext,而completion 的 CoroutineContext 來自 launch 方法中來自 CoroutineScope,默認(rèn)是 SupervisorJob() + Dispatchers.Main,不過只有 Dispatchers.Main 繼承了 CoroutineDispatcher。
Dispatchers.Main 是一個 MainCoroutineDispatcher,Android 中對應(yīng)的 MainCoroutineDispatcher 是 HandlerContext:
internal?class?HandlerContext?private?constructor(
????private?val?handler:?Handler,
????private?val?name:?String?,
????private?val?invokeImmediately:?Boolean
)?:?HandlerDispatcher(),?Delay?{
????public?constructor(
????????handler:?Handler,
????????name:?String??=?null
????)?:?this(handler,?name,?false)
????//...
????override?fun?dispatch(context:?CoroutineContext,?block:?Runnable)?{
????????//?利用主線程的?Handler?執(zhí)行任務(wù)
????????handler.post(block)
????}
????override?fun?scheduleResumeAfterDelay(timeMillis:?Long,?continuation:?CancellableContinuation)?{
????????//?利用主線程的?Handler?延遲執(zhí)行任務(wù),將完成的?continuation?放在任務(wù)中執(zhí)行
????????val?block?=?Runnable?{
????????????with(continuation)?{?resumeUndispatched(Unit)?}
????????}
????????handler.postDelayed(block,?timeMillis.coerceAtMost(MAX_DELAY))
????????continuation.invokeOnCancellation?{?handler.removeCallbacks(block)?}
????}
????//..
}
重點(diǎn)來了,調(diào)度任務(wù)最后竟然交給了主線程的 Handler,其實想想也對,主線程的任務(wù)最后一般都會交給主線程的 Handler。
好奇的同學(xué)可能問了,如果不是主線程呢?不是主線程就利用的線程池:
public?open?class?ExperimentalCoroutineDispatcher(
????private?val?corePoolSize:?Int,
????private?val?maxPoolSize:?Int,
????private?val?idleWorkerKeepAliveNs:?Long,
????private?val?schedulerName:?String?=?"CoroutineScheduler"
)?:?ExecutorCoroutineDispatcher()?{
????//?執(zhí)行期
????override?val?executor:?Executor
????????get()?=?coroutineScheduler
????private?var?coroutineScheduler?=?createScheduler()
????override?fun?dispatch(context:?CoroutineContext,?block:?Runnable):?Unit?=
????????try?{
????????????coroutineScheduler.dispatch(block)
????????}?catch?(e:?RejectedExecutionException)?{
????????????DefaultExecutor.dispatch(context,?block)
????????}
}
結(jié)果可以說是很清晰了,coroutineScheduler 是一個線程池,如果像了解具體的過程,同學(xué)們可以自行查看代碼。
讀到這里,你可能有一點(diǎn)明白 CoroutineContext 為什么要設(shè)計成一種數(shù)據(jù)結(jié)構(gòu):
coroutineContext[ContinuationInterceptor]就可以直接取到當(dāng)前協(xié)程的攔截器,并且一個協(xié)程只能對應(yīng)一個調(diào)度器。調(diào)度器都放在其他 coroutineContext的前面,所以在執(zhí)行協(xié)程的時候,可以做攔截處理。
同理,我們也可以使用 coroutineContext[Job] 獲取當(dāng)前協(xié)程。
第六步 resumeWith
再次回到:
internal?fun? ?(suspend?(R)?->?T).startCoroutineCancellable(receiver:?R,?completion:?Continuation)?=
????runSafely(completion)?{
????????//?外面再包一層?Coroutine
????????createCoroutineUnintercepted(receiver,?completion)
????????????//?如果需要,做攔截處理
????????????.intercepted()
????????????//?調(diào)用?resumeWith?方法??????
????????????.resumeCancellableWith(Result.success(Unit))
????}
現(xiàn)在我們看 Continue#resumeCancellableWith() 方法,它是一個擴(kuò)展方法,里面的調(diào)度邏輯是:
DispatchContinuation#resumeCancellableWithCoroutineDispatcher#dispatchContinuation#resumeWith
這里的 Continuation 就是 SuspendLambda,它繼承了 BaseContinuationImpl,我們看一下它的實現(xiàn)方法:
internal?abstract?class?BaseContinuationImpl(
????public?val?completion:?Continuation?
)?:?Continuation,?CoroutineStackFrame,?Serializable?{
????//?This?implementation?is?final.?This?fact?is?used?to?unroll?resumeWith?recursion.
????public?final?override?fun?resumeWith(result:?Result)?{
????????//?This?loop?unrolls?recursion?in?current.resumeWith(param)?to?make?saner?and?shorter?stack?traces?on?resume
????????var?current?=?this
????????var?param?=?result
????????while?(true)?{
????????????//?Invoke?"resume"?debug?probe?on?every?resumed?continuation,?so?that?a?debugging?library?infrastructure
????????????//?can?precisely?track?what?part?of?suspended?callstack?was?already?resumed
????????????probeCoroutineResumed(current)
????????????with(current)?{
????????????????val?completion?=?completion!!?//?fail?fast?when?trying?to?resume?continuation?without?completion
????????????????val?outcome:?Result?=
????????????????????try?{
????????????????????????//?1.?執(zhí)行?suspend?里面的代碼塊
????????????????????????val?outcome?=?invokeSuspend(param)
????????????????????????//?2.?如果代碼塊里面執(zhí)行了掛起方法,會提前返回
????????????????????????if?(outcome?===?COROUTINE_SUSPENDED)?return
????????????????????????Result.success(outcome)
????????????????????}?catch?(exception:?Throwable)?{
????????????????????????Result.failure(exception)
????????????????????}
????????????????releaseIntercepted()?//?this?state?machine?instance?is?terminating
????????????????if?(completion?is?BaseContinuationImpl)?{
????????????????????//?3.?如果完成的completion也是BaseContinuationImpl,就會進(jìn)入循環(huán)
????????????????????current?=?completion
????????????????????param?=?outcome
????????????????}?else?{
????????????????????//?4.?執(zhí)行?completion?resumeWith?方法?
????????????????????completion.resumeWith(outcome)
????????????????????return
????????????????}
????????????}
????????}
????}
}
這邊被我分為2個部分:
執(zhí)行 suspend方法,并獲取結(jié)果調(diào)用 complete(放在下一步講)
執(zhí)行suspend方法
在第一處會先執(zhí)行 suspend 修飾的方法內(nèi)容,在方法里面可能又會調(diào)度 suspend 方法,比如說我們的實例方法:
lifecycleScope.launch(Dispatchers.Main)?{
????val?a?=?async?{?getResult(1,?2)?}
????val?b?=?async?{?getResult(3,?5)?}
????val?c?=?a.await()?+?b.await()
????Log.e(TAG,?"result:$c")
}?
suspend?fun?getResult(a:?Int,?b:?Int):?Int?{
????return?withContext(Dispatchers.IO)?{
????????delay(1000)
????????return@withContext?a?+?b
????}
}
因為我們在 getResult 執(zhí)行了延時操作,所以我們 launch 方法肯定執(zhí)行了耗時掛起方法,所以 BaseContinuationImpl#invokeSuspend 方法會返回一個 COROUTINE_SUSPENDED,結(jié)果你也看到了,該方法會提前結(jié)束。(說明一下,我沒有找到BaseContinuationImpl#invokeSuspend 方法的具體實現(xiàn),我猜可能跟編譯器有關(guān))
我猜你肯定跟我一樣好奇,遇到耗時掛起會提前返回,那么耗時掛起如何對 complete 進(jìn)行恢復(fù)的?
我們看一下 delay(1000) 這個延時操作在主線程是如何處理的:
public?suspend?fun?delay(timeMillis:?Long)?{
????if?(timeMillis?<=?0)?return?//?don't?delay
????return?suspendCancellableCoroutine?sc@?{?cont:?CancellableContinuation?->
????????cont.context.delay.scheduleResumeAfterDelay(timeMillis,?cont)
????}
}
internal?class?HandlerContext?private?constructor(
????private?val?handler:?Handler,
????private?val?name:?String?,
????private?val?invokeImmediately:?Boolean
)?:?HandlerDispatcher(),?Delay?{
????//...
????override?fun?scheduleResumeAfterDelay(timeMillis:?Long,?continuation:?CancellableContinuation)?{
????????val?block?=?Runnable?{
????????????with(continuation)?{?resumeUndispatched(Unit)?}
????????}
????????handler.postDelayed(block,?timeMillis.coerceAtMost(MAX_DELAY))
????????continuation.invokeOnCancellation?{?handler.removeCallbacks(block)?}
????}
????//...
}
可以看到,將恢復(fù)任務(wù)包了一個 Runnable,交給 Handler 的 Handler#postDelayed() 方法了。
第七步 complete resumeWith
對于 complete 的處理一般會有兩種。
complete是BaseContinuationImpl
第一種情況是我們稱之為套娃,完成回調(diào)的 Continuation 它本身也有自己的完成回調(diào) Continuation,接下來循環(huán)就對了。
調(diào)用complete的resumeWith
第二種情況,就是通過 complete 去完成回調(diào),由于 complete 是 AbstractContinuation,我們看一下它的 resumeWith:
public?abstract?class?AbstractCoroutine<in?T>(
????/**
?????*?The?context?of?the?parent?coroutine.
?????*/
????@JvmField
????protected?val?parentContext:?CoroutineContext,
????active:?Boolean?=?true
)?:?JobSupport(active),?Job,?Continuation,?CoroutineScope?{
????//?...
????public?final?override?fun?resumeWith(result:?Result<T>)?{
????????// 1. 獲取當(dāng)前協(xié)程的技術(shù)狀態(tài)
????????val?state?=?makeCompletingOnce(result.toState())
????????// 2. 如果當(dāng)前還在等待完成,說明還有子協(xié)程沒有結(jié)束
????????if?(state?===?COMPLETING_WAITING_CHILDREN)?return
????????//?3.?執(zhí)行結(jié)束恢復(fù)的方法,默認(rèn)為空
????????afterResume(state)
????}
????//?這是父類?JobSupport?中的?makeCompletingOnce?方法
????//?為了方便查看,我復(fù)制過來
????internal?fun?makeCompletingOnce(proposedUpdate:?Any?):?Any??{
????????loopOnState?{?state?->
????????????//?tryMakeCompleting?的內(nèi)容主要根據(jù)是否有子Job做不同處理
????????????val?finalState?=?tryMakeCompleting(state,?proposedUpdate)
????????????when?{
????????????????finalState?===?COMPLETING_ALREADY?->
????????????????????throw?IllegalStateException(
????????????????????????"Job?$this?is?already?complete?or?completing,?"?+
????????????????????????????????"but?is?being?completed?with?$proposedUpdate",?proposedUpdate.exceptionOrNull
????????????????????)
????????????????finalState?===?COMPLETING_RETRY?->?return@loopOnState
????????????????else?->?return?finalState?//?COMPLETING_WAITING_CHILDREN?or?final?state
????????????}
????????}
????}
}
這段代碼的意思其實也很簡單,就是協(xié)程即將完成,得先評估一下協(xié)程的技術(shù)狀態(tài),別協(xié)程還有東西在運(yùn)行,就給結(jié)束了。對于一些有子協(xié)程的一些協(xié)程,會等待子協(xié)程結(jié)束的時候,才會結(jié)束當(dāng)前協(xié)程。
一個 launch 的過程大概就是這樣了。大致的流程圖是這樣的:

下面我們再談?wù)?async。
四、關(guān)于async
async 和 launch 的代碼相似度很高:
public?fun? ?CoroutineScope.async(
????context:?CoroutineContext?=?EmptyCoroutineContext,
????start:?CoroutineStart?=?CoroutineStart.DEFAULT,
????block:?suspend?CoroutineScope.()?->?T
):?Deferred?{
????val?newContext?=?newCoroutineContext(context)
????val?coroutine?=?if?(start.isLazy)
????????LazyDeferredCoroutine(newContext,?block)?else
????????DeferredCoroutine(newContext,?active?=?true)
????coroutine.start(start,?coroutine,?block)
????return?coroutine
}
最終也會進(jìn)行三步走:
internal?fun??(suspend?(R)?->?T).startCoroutineCancellable(receiver:?R,?completion:?Continuation)?=
????runSafely(completion)?{
????????//?外面再包一層?Coroutine
????????createCoroutineUnintercepted(receiver,?completion)
????????????//?如果需要,做攔截處理
????????????.intercepted()
????????????//?調(diào)用?resumeWith?方法??????
????????????.resumeCancellableWith(Result.success(Unit))
????}
不同的是,async 返回的是一個 Deferred,我們需要調(diào)用 Deferred#await() 去獲取返回結(jié)果,它的實現(xiàn)在 JobSupport:
private?open?class?DeferredCoroutine(
????parentContext:?CoroutineContext,
????active:?Boolean
)?:?AbstractCoroutine(parentContext,?active),?Deferred,?SelectClause1?{
????//?...?awaitInternal方法來自父類?JobSupport
????override?suspend?fun?await():?T?=?awaitInternal()?as?T
????//?...
????//?這是?JobSupport?中的實現(xiàn)
????internal?suspend?fun?awaitInternal():?Any??{
????????//?循環(huán)獲取結(jié)果
????????while?(true)?{?//?lock-free?loop?on?state
????????????val?state?=?this.state
????????????//?1.?如果處于完成狀態(tài)
????????????if?(state?!is?Incomplete)?{
????????????????if?(state?is?CompletedExceptionally)?{?//?Slow?path?to?recover?stacktrace
????????????????????recoverAndThrow(state.cause)
????????????????}
????????????????return?state.unboxState()
????????????}
????????????//?2.?除非需要重試,不然就?break
????????????if?(startInternal(state)?>=?0)?break?
????????}
????????//?等待掛起的方法
????????return?awaitSuspend()?//?slow-path
????}
}
它的具體過程可以從我的注釋看出,就不一一介紹了,感興趣的同學(xué)可以查看源碼。
1. 本文一開始的討論
本文一開始的代碼是錯的,連編譯器都過不了,尷尬~
正確的代碼應(yīng)該是:
GlobalScope.launch?{
????val?a?=?async?{
????????1+2
????}
????val?b?=?async?{
????????1+3
????}
????val?c?=?a.await()?+?bawait()
????Log.e(TAG,"result:$c")
}
如果是正確的代碼,這里可能分兩種情況:
如果你放在UI線程,那肯定是串行的,這時候有人說,我在 a 里使用 delay(1000),在 b 里使用 delay(2000),得到 c 的時候就花了 2000 毫秒啊,這不是并行嗎?事情并不是這樣的,delay 操作使用了 Handler#postDelay 方法,一個延遲了 1000 毫秒執(zhí)行,一個延遲了 2000 毫秒執(zhí)行,但是主線程只有一個,所以只能是串行。
如果是子線程,通常都是并行的,因為我們使用了線程池啊~
總結(jié)
寫這邊源碼分析的時候,一些細(xì)節(jié)總是找不到,比如說 suspendLambda 的子類找不到,自己對 Kotlin 的學(xué)習(xí)有待深入。

所以本文有些地方還值得商榷,如果你有更好的理解,歡迎下方交流。
關(guān)于我
我是九心,新晉互聯(lián)網(wǎng)碼農(nóng),如果想要進(jìn)階和了解更多的干貨,歡迎關(guān)注我的公眾號接收到的我的最新文章。
