手把手教你設(shè)計一個任務(wù)調(diào)度器
我們從業(yè)務(wù)現(xiàn)狀出發(fā),考慮需要覆蓋的場景,設(shè)計調(diào)度器需要提供的角色。然后確定了調(diào)度器的關(guān)鍵接口,同時給出了簡單實現(xiàn)。同時,也提到一些個人設(shè)計和實現(xiàn)時候的一些思考,比如多叉樹和有向無環(huán)圖。另外,考慮到 Rxjs 比較適合 task 的組織,借鑒 Rxjs 的 API 并將其應(yīng)用到實踐中。希望能給出一些啟發(fā),歡迎一起討論~
現(xiàn)狀
以騰訊文檔業(yè)務(wù)中的 預(yù)加載流程舉例,在進行預(yù)加載的過程中,有三個主要的流程,分別為
數(shù)據(jù)預(yù)加載
離線數(shù)據(jù)同步
預(yù)渲染
在代碼實現(xiàn)過程中,這三大邏輯竟然寫在一個方法里面,代碼已經(jīng)有上百行。三者的依賴圖大致如下:

但是隨著代碼的膨脹,業(yè)務(wù)邏輯的復(fù)雜度指數(shù)增加。且維護的同學(xué)也不可能只有一個,慢慢地,相互之間就形成這樣的情況:

我們項目中的任務(wù)調(diào)度
我們發(fā)現(xiàn), 預(yù)加載整個流程中,很多業(yè)務(wù)邏輯是可以復(fù)用的。
假設(shè)有兩個業(yè)務(wù)場景,一個是斷網(wǎng)重連,一個是列表頁滑動。在斷網(wǎng)重連場景下,需要執(zhí)行的流程為:

在列表頁滑動場景下,需要執(zhí)行的流程為:

不同業(yè)務(wù)場景下,需要執(zhí)行的操作有很多相同的地方,比如以上兩個場景,都有“檢測文檔狀態(tài)”、“檢測文檔狀態(tài)”和“預(yù)渲染”三個步驟。所以我們將每個場景需要執(zhí)行的操作拆分為可以復(fù)用的細(xì)粒度的任務(wù),這樣可以減少重復(fù)代碼。
為了更好地維護管理,減少耦合,我們打算引入一個任務(wù)調(diào)度器,以便能夠做到:
將業(yè)務(wù)流程中的各個邏輯合理拆分,使其粒度更細(xì),便于復(fù)用。
拆分后的子任務(wù),之間的依賴順序可以自由配置,靈活組配,適應(yīng)更多的業(yè)務(wù)場景。
引入 task
我們首先將業(yè)務(wù)邏輯進行拆解,將業(yè)務(wù)邏輯的最小拆分單元,描述為 task,task 是調(diào)度器能處理的最小單元。
如何描述 task 之間的執(zhí)行順序
將業(yè)務(wù)邏輯拆分為一個個細(xì)粒度的 task 之后,如何控制 task 之間的執(zhí)行順序,如何使多個 task 靈活地并行和串行,是設(shè)計時遇到的一個難題。
我們想到的可能的場景有如下幾個。
首先,最簡單的串行任務(wù)。如圖,Task1、Task2 以及 Task3 串行執(zhí)行,后面的 task 需要前面的返回值:

其次,就是并行任務(wù)。如圖,Task1 執(zhí)行完之后,需要執(zhí)行后續(xù)三個任務(wù),這三個任務(wù)是并行執(zhí)行的:

然后,還需要覆蓋到有條件的分支流程。如圖,Task1 執(zhí)行完成在滿足一定條件后,才能依次執(zhí)行 Task2 和 Task3 。若不滿足,則什么都不執(zhí)行:

也有其他的條件分支流程,比如 Task1 執(zhí)行完成在滿足一定條件后,才能執(zhí)行 Task2 。若不滿足,則需要并行執(zhí)行 Task3 和 Task4。如圖:

最后,還有最復(fù)雜的情況,就是串行和并行交織在一起,再加上條件控制比較復(fù)雜的情況。舉例如下:

設(shè)計思路
如何根據(jù)以上需要滿足的場景,去設(shè)計我們調(diào)度器的架構(gòu)呢?
一個任務(wù)完成后,可能接下來需要執(zhí)行一個或者三個后續(xù)的流程。所以第一時間想到了多叉樹。且是有向的多叉樹。
有向多叉樹
最初將這種結(jié)構(gòu)抽象為 有向多叉樹,可以滿足很多場景。比如:

但是 有向多叉樹不能表示這種:

有向無環(huán)圖設(shè)計
我們發(fā)現(xiàn) 樹 這種數(shù)據(jù)結(jié)構(gòu)可能并不能滿足需求,我們就想到圖。又因為這種圖,好像有一種流向,貌似可以進行拓?fù)渑判?。所以我們嘗試用 有向無環(huán)圖這種數(shù)據(jù)結(jié)構(gòu)。
這樣一來,我們的數(shù)據(jù)結(jié)構(gòu)抽象為 DAG (有向無環(huán)圖),問題也就變成了,如何構(gòu)建 DAG ,并操作 DAG 。
其他角色
為了更好地實現(xiàn)任務(wù)調(diào)度器,除了引入 task 之外, 我們還引入 Job 和 Scheduler 的概念。
task
我們將業(yè)務(wù)邏輯的最小拆分單元,描述為 task。task 是調(diào)度器能處理的最小單元,Scheduler 管理 Job,不直接管理 task ,task 由其所在的 Job 管理。
Job
Job 用來管理一組互相關(guān)聯(lián)的 task,成為一個有意義的作業(yè),即一個 Job 由一個或多個 task 組成。
BasicJob 是 Job 基類,業(yè)務(wù)的 Job 需要繼承并實現(xiàn)其抽象方法。
Scheduler
Scheduler 用來管理調(diào)度向其添加的 Job, 可以恢復(fù)和暫停自身的執(zhí)行。
整體結(jié)構(gòu)
上面介紹了如何描述 task 之間的執(zhí)行順序,具體的業(yè)務(wù)邏輯由 task 來執(zhí)行,那如何管理 task 呢?介紹角色的時候提到,Job 用來管理一組互相關(guān)聯(lián)的 task,成為一個有意義的 作業(yè)。且用 Scheduler 來管理調(diào)度向其添加的 Job 。
整體的架構(gòu)如下:
實現(xiàn)分析
接口定義
task 的接口
我們由底層向高層逐漸來分析。上文提到,task 是最基本的運行單位,是最細(xì)粒度的拆分單元。那我們該如何規(guī)范 task 呢?
我們希望業(yè)務(wù)方可以將自己的業(yè)務(wù)邏輯抽象封裝為一個 task ,而封裝好的 task 能夠被我們調(diào)度器所識別,并能被調(diào)度器正確處理。所以 task 需要實現(xiàn)我們自定義的接口,接口 ITask 規(guī)范 task 應(yīng)該具備怎么樣的能力,應(yīng)該怎么樣被實現(xiàn)。
下面看下:
interface ITask {/*** 執(zhí)行 task 的具體邏輯** @return {*} {Promise<ITaskResult<Result>>} task 的返回結(jié)果*/run(previousTaskResult: ITaskResult): Promise<ITaskResult>;/*** 當(dāng) task 內(nèi)部執(zhí)行失敗時,應(yīng)做的后續(xù)處理。* 比如回滾 task 的執(zhí)行* 比如僅僅上報日志* onError 是可選的,可以不實現(xiàn)*/onError?(error: Error): void;}
ITask 提供一個 run 方法,用來執(zhí)行具體的業(yè)務(wù)邏輯。它接受上一個任務(wù)的執(zhí)行結(jié)果,并按照規(guī)范返回自己的處理結(jié)果。
當(dāng) task 內(nèi)部執(zhí)行失敗時,需要做一些注入回滾或者上報之類的異常處理邏輯。所以提供 onError 方法。
job 的接口
Job 用來管理 task,需要具備兩大能力——
Job 需要能添加要處理的 task
Job 需要要能指定 task 的執(zhí)行順序
JobScheduler 的接口
JobScheduler 用來管理 Job, 由于我們賦予 task 靈活的組織方式,可以并行,可以串行,同時可以指定條件分支。用來適應(yīng)各種業(yè)務(wù)場景。
我們發(fā)現(xiàn) Job 之間不需要太復(fù)雜的組織方式,簡單地串行即可。所以 JobScheduler 相對簡單,能管理 Job 進行串行執(zhí)行即可。另外, JobScheduler 需要具備暫停當(dāng)前調(diào)度器執(zhí)行的能力,相應(yīng)地,也需要具備恢復(fù)執(zhí)行的能力。
于是,我們簡單定義 IJobScheduler 為:
interface IJobScheduler {/*** 添加 job 以便調(diào)度* 如果當(dāng)前沒有 job 正在被執(zhí)行,且該 JobSchedule 沒有被 pause,則立即執(zhí)行* 否則,只是放在隊列,以便后續(xù)執(zhí)行*/add(job: BasicJob): void;/*** 暫停 JobScheduler 的調(diào)度行為,當(dāng)前正在執(zhí)行的 Job 不受影響* 增加 key 是為了如果有多個原因要 pause ,那么需要所有的原因都* 可以 resume 的時候,才能真正 resume** @param {string} [key] 標(biāo)識當(dāng)前暫停原因的 key*/pause(key?: string): string;/*** 恢復(fù) JobSchedule 的執(zhí)行** @param {string} key pause 返回的 key* @return {*} {boolean} resume 是否成功*/resume(key: string): boolean;}
實現(xiàn)
接口 ITask 由業(yè)務(wù)方去執(zhí)行即可,相對簡單。調(diào)度器管理的 ITask 接口,不關(guān)心具體的實現(xiàn)。
BasicJob
Job 管理 task,我們需要實現(xiàn)其兩大能力——
Job 需要能添加要處理的 task
Job 需要要能指定 task 的執(zhí)行順序
這里可以用構(gòu)建和操作 有向無環(huán)圖的方式實現(xiàn)。最初設(shè)想也是將問題轉(zhuǎn)化為
將 task 的執(zhí)行依賴抽象為
有向無環(huán)圖的構(gòu)建Job 的執(zhí)行則是對
有向無環(huán)圖進行遍歷。
后面參考了 Rxjs 的 API 和 使用方式,我們決定使用 Rxjs 來實現(xiàn) Job 。
變換式編程思考
為什么使用 Rxjs ?
所有程序其實都是對數(shù)據(jù)的一種變換——將輸入轉(zhuǎn)換為輸出。然而,當(dāng)我們在構(gòu)思設(shè)計時,很少考慮創(chuàng)建變換過程。相反,我們關(guān)心的是類和模塊、數(shù)據(jù)結(jié)構(gòu)和算法、語言和框架。
我們認(rèn)為,從這個角度關(guān)注代碼往往忽略了要點——我們需要重新將程序視為輸入到輸出的一個變換。當(dāng)這樣做的時候,許多以前操心的細(xì)節(jié)就消失了。結(jié)構(gòu)變得更清晰,錯誤處理更加一致,耦合下降了很多。
如果你不能將正在做的事情描述為一個流程,那表示你不知道自己在做什么。
————《程序員修煉之道》
而 Rxjs 做這件事就特別合適。其中主要用到了 Rxjs 中的 API 有:
pipe
concatMap
forkJoin
iif
我們可以將 task 的執(zhí)行流程抽象為一個數(shù)據(jù)管道,首先需要創(chuàng)建一個數(shù)據(jù)流的起始點:
private source: Observable<TaskResultType>;// defaultData 可以是初始化的數(shù)據(jù),也可以是外界傳入的配置信息private initSource(defaultData?: unknown): void {/** @type {ITaskResult} task 添加前的起始值 */const startPoint: ITaskResult = {status: TASK_STATUS.success,data: defaultData,};// 創(chuàng)建 Observablethis.source = of(startPoint);}
這里的 source 是 Rxjs 中提供的 Observable 類型對象,即可以被觀察的,相當(dāng)于一個生產(chǎn)者。
另外這里也用到了 Rxjs 中提供的 of 操作符,簡單說就是將數(shù)據(jù)轉(zhuǎn)化為 Observable 類型對象。那如何添加并組織 task 呢?這里提供了三個方法:
/*** 添加多個 task ,串行執(zhí)行*/public serialNext(tasks: ITask[]): void {}/*** 添加多個 task ,并行執(zhí)行*/public parallellNext(tasks: ITask[]): void {}/*** 條件操作。* 第一個參數(shù)是一個 條件函數(shù),返回 true false,以此來決定走哪個子 Job*/public iif(condition: (previousResult: unknown) => boolean, trueSource: BasicJob, falseSource?: BasicJob):void {}
JobScheduler
先分析下我們的構(gòu)造函數(shù):
public constructor(queue: IQueue = new FIFOQueue()) {}這個 FIFOQueue為啥不直接使用數(shù)組在 JobScheduler實現(xiàn)呢?
其實, FIFOQueue的內(nèi)部實現(xiàn)就是數(shù)組,代碼不過十行左右。這樣做,是為了將 queue和 job-schedule解耦開,而 queue是可以依賴注入的,可以自定義實現(xiàn)的。簡單說,就是不希望 queue的實現(xiàn)細(xì)節(jié)在 job-schedule中, job-schedule只存在 queue的接口操作。
再看 JobScheduler的 pause()和 resume()方法,設(shè)計的時候考慮到,這里不能是簡單的,調(diào)用 pause()的時候暫停執(zhí)行,調(diào)用 resume()就恢復(fù)執(zhí)行這么簡單。
設(shè)想一種業(yè)務(wù)場景,如果有多個原因要暫停調(diào)度器的執(zhí)行,那么可能有一個業(yè)務(wù)模塊內(nèi)的多處都執(zhí)行了 pause()。那么當(dāng)其中一一處需要恢復(fù)執(zhí)行的時候,即某一個暫停執(zhí)行的原因不再成立的時候,這時調(diào)用了 resume()的時候,調(diào)度器是否應(yīng)該立即響應(yīng),去恢復(fù)執(zhí)行呢?
要不要恢復(fù)執(zhí)行,就看此時是否真的可以恢復(fù)調(diào)度器的執(zhí)行。如果業(yè)務(wù)方只在調(diào)度器可以恢復(fù)執(zhí)行的時候,才真正恢復(fù)。也就是所以暫停執(zhí)行的原因都不再成立的時候,才執(zhí)行 resume()。那么這樣的話,調(diào)度器就把恢復(fù)和暫停的工作交給業(yè)務(wù)方來確保。
這樣的弊端是:
增加了業(yè)務(wù)方的處理負(fù)擔(dān),需要關(guān)注調(diào)度器內(nèi)部的執(zhí)行狀態(tài)。
當(dāng)需要調(diào)用
resume()的時候,需要考慮到其他業(yè)務(wù)是否受當(dāng)前執(zhí)行狀態(tài)的影響。debug 的時候也難以追蹤,很難調(diào)試。
維護成本很高,需要知道當(dāng)前模塊下所有使用調(diào)度器的業(yè)務(wù)。
于是,我們將這一工作收斂到調(diào)度器內(nèi)部。業(yè)務(wù)只關(guān)心自己應(yīng)該暫停還是恢復(fù),不需要關(guān)心其他業(yè)務(wù)會不會受影響。那么當(dāng)調(diào)用 resume()的時候,可能并不會真正地開始恢復(fù)執(zhí)行。
具體實現(xiàn)如下:
/*** 暫停 JobScheduler 的調(diào)度行為,當(dāng)前正在執(zhí)行的 Job 不受影響* 增加 key 是為了如果有多個原因要 pause ,那么需要所有的原因都* 可以 resume 的時候,才能真正 resume** @param {string} [key] 標(biāo)識當(dāng)前暫停原因的 key*/public pause(key?: string): string {// 如果不傳 key ,則生成一個隨機字符串作為 keyconst keyNotNull = key ?? this.generatePauseKey();this.pausedKeys.add(keyNotNull);return keyNotNull;}/*** 恢復(fù) JobSchedule 的執(zhí)行** @param {string} key pause 返回的 key* @return {*} {boolean} resume 是否成功*/public resume(key: string): boolean {// 如果 key 存在于 set 中,返回true,并成功移除。否則,返回 falseconst isContainKey = this.pausedKeys.delete(key);// 如果移除后 pausedKeys 為空,則可以 resumeif (this.pausedKeys.size === 0) {this.executeNextJob();}return isContainKey;}
其他特性
除了上面介紹的,我們的調(diào)度器還支持
指定任務(wù)的優(yōu)先級
任務(wù)被處理的各個時機回調(diào)
超時處理
異常處理
...
總結(jié)
我們從業(yè)務(wù)現(xiàn)狀出發(fā),考慮需要覆蓋的場景,設(shè)計調(diào)度器需要提供的角色。然后確定了調(diào)度器的關(guān)鍵接口,同時給出了簡單實現(xiàn)。同時,也提到一些個人設(shè)計和實現(xiàn)時候的一些思考,比如多叉樹和有向無環(huán)圖。
另外,考慮到 Rxjs 比較適合 task 的組織,借鑒 Rxjs 的 API 并將其應(yīng)用到實踐中。希望能給出一些啟發(fā),歡迎一起討論~
參考文章
實現(xiàn)一個異步并發(fā)調(diào)度器
Rxjs 官網(wǎng)
learnrxjs
bullmq
內(nèi)推社群
我組建了一個氛圍特別好的騰訊內(nèi)推社群,如果你對加入騰訊感興趣的話(后續(xù)有計劃也可以),我們可以一起進行面試相關(guān)的答疑、聊聊面試的故事、并且在你準(zhǔn)備好的時候隨時幫你內(nèi)推。下方加 winty 好友回復(fù)「面試」即可。
