<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          手把手教你設(shè)計一個任務(wù)調(diào)度器

          共 6588字,需瀏覽 14分鐘

           ·

          2021-07-11 11:39

          我們從業(yè)務(wù)現(xiàn)狀出發(fā),考慮需要覆蓋的場景,設(shè)計調(diào)度器需要提供的角色。然后確定了調(diào)度器的關(guān)鍵接口,同時給出了簡單實現(xiàn)。同時,也提到一些個人設(shè)計和實現(xiàn)時候的一些思考,比如多叉樹和有向無環(huán)圖。另外,考慮到 Rxjs 比較適合 task 的組織,借鑒 Rxjs 的 API 并將其應(yīng)用到實踐中。希望能給出一些啟發(fā),歡迎一起討論~

          現(xiàn)狀

          以騰訊文檔業(yè)務(wù)中的 預(yù)加載流程舉例,在進行預(yù)加載的過程中,有三個主要的流程,分別為

          1. 數(shù)據(jù)預(yù)加載

          2. 離線數(shù)據(jù)同步

          3. 預(yù)渲染

          在代碼實現(xiàn)過程中,這三大邏輯竟然寫在一個方法里面,代碼已經(jīng)有上百行。三者的依賴圖大致如下:

          但是隨著代碼的膨脹,業(yè)務(wù)邏輯的復(fù)雜度指數(shù)增加。且維護的同學(xué)也不可能只有一個,慢慢地,相互之間就形成這樣的情況:

          我們項目中的任務(wù)調(diào)度

          我們發(fā)現(xiàn), 預(yù)加載整個流程中,很多業(yè)務(wù)邏輯是可以復(fù)用的。

          假設(shè)有兩個業(yè)務(wù)場景,一個是斷網(wǎng)重連,一個是列表頁滑動。在斷網(wǎng)重連場景下,需要執(zhí)行的流程為:

          在列表頁滑動場景下,需要執(zhí)行的流程為:

          不同業(yè)務(wù)場景下,需要執(zhí)行的操作有很多相同的地方,比如以上兩個場景,都有“檢測文檔狀態(tài)”、“檢測文檔狀態(tài)”和“預(yù)渲染”三個步驟。所以我們將每個場景需要執(zhí)行的操作拆分為可以復(fù)用的細(xì)粒度的任務(wù),這樣可以減少重復(fù)代碼。

          為了更好地維護管理,減少耦合,我們打算引入一個任務(wù)調(diào)度器,以便能夠做到:

          1. 將業(yè)務(wù)流程中的各個邏輯合理拆分,使其粒度更細(xì),便于復(fù)用。

          2. 拆分后的子任務(wù),之間的依賴順序可以自由配置,靈活組配,適應(yīng)更多的業(yè)務(wù)場景。

          引入 task

          我們首先將業(yè)務(wù)邏輯進行拆解,將業(yè)務(wù)邏輯的最小拆分單元,描述為 task,task 是調(diào)度器能處理的最小單元。

          如何描述 task 之間的執(zhí)行順序

          將業(yè)務(wù)邏輯拆分為一個個細(xì)粒度的 task 之后,如何控制 task 之間的執(zhí)行順序,如何使多個 task 靈活地并行和串行,是設(shè)計時遇到的一個難題。

          我們想到的可能的場景有如下幾個。

          首先,最簡單的串行任務(wù)。如圖,Task1、Task2 以及 Task3 串行執(zhí)行,后面的 task 需要前面的返回值:

          其次,就是并行任務(wù)。如圖,Task1 執(zhí)行完之后,需要執(zhí)行后續(xù)三個任務(wù),這三個任務(wù)是并行執(zhí)行的:

          然后,還需要覆蓋到有條件的分支流程。如圖,Task1 執(zhí)行完成在滿足一定條件后,才能依次執(zhí)行 Task2 和 Task3 。若不滿足,則什么都不執(zhí)行:

          也有其他的條件分支流程,比如 Task1 執(zhí)行完成在滿足一定條件后,才能執(zhí)行 Task2 。若不滿足,則需要并行執(zhí)行 Task3 和 Task4。如圖:

          最后,還有最復(fù)雜的情況,就是串行和并行交織在一起,再加上條件控制比較復(fù)雜的情況。舉例如下:

          設(shè)計思路

          如何根據(jù)以上需要滿足的場景,去設(shè)計我們調(diào)度器的架構(gòu)呢?

          一個任務(wù)完成后,可能接下來需要執(zhí)行一個或者三個后續(xù)的流程。所以第一時間想到了多叉樹。且是有向的多叉樹。

          有向多叉樹

          最初將這種結(jié)構(gòu)抽象為 有向多叉樹,可以滿足很多場景。比如:

          但是 有向多叉樹不能表示這種:

          有向無環(huán)圖設(shè)計

          我們發(fā)現(xiàn) 這種數(shù)據(jù)結(jié)構(gòu)可能并不能滿足需求,我們就想到圖。又因為這種圖,好像有一種流向,貌似可以進行拓?fù)渑判?。所以我們嘗試用 有向無環(huán)圖這種數(shù)據(jù)結(jié)構(gòu)。

          這樣一來,我們的數(shù)據(jù)結(jié)構(gòu)抽象為 DAG (有向無環(huán)圖),問題也就變成了,如何構(gòu)建 DAG ,并操作 DAG 。

          其他角色

          為了更好地實現(xiàn)任務(wù)調(diào)度器,除了引入 task 之外, 我們還引入 Job 和 Scheduler 的概念。

          task

          我們將業(yè)務(wù)邏輯的最小拆分單元,描述為 task。task 是調(diào)度器能處理的最小單元,Scheduler 管理 Job,不直接管理 task ,task 由其所在的 Job 管理。

          Job

          Job 用來管理一組互相關(guān)聯(lián)的 task,成為一個有意義的作業(yè),即一個 Job 由一個或多個 task 組成。

          BasicJob 是 Job 基類,業(yè)務(wù)的 Job 需要繼承并實現(xiàn)其抽象方法。

          Scheduler

          Scheduler 用來管理調(diào)度向其添加的 Job, 可以恢復(fù)和暫停自身的執(zhí)行。

          整體結(jié)構(gòu)

          上面介紹了如何描述 task 之間的執(zhí)行順序,具體的業(yè)務(wù)邏輯由 task 來執(zhí)行,那如何管理 task 呢?介紹角色的時候提到,Job 用來管理一組互相關(guān)聯(lián)的 task,成為一個有意義的 作業(yè)。且用 Scheduler 來管理調(diào)度向其添加的 Job 。

          整體的架構(gòu)如下:

          實現(xiàn)分析

          接口定義

          task 的接口

          我們由底層向高層逐漸來分析。上文提到,task 是最基本的運行單位,是最細(xì)粒度的拆分單元。那我們該如何規(guī)范 task 呢?

          我們希望業(yè)務(wù)方可以將自己的業(yè)務(wù)邏輯抽象封裝為一個 task ,而封裝好的 task 能夠被我們調(diào)度器所識別,并能被調(diào)度器正確處理。所以 task 需要實現(xiàn)我們自定義的接口,接口 ITask 規(guī)范 task 應(yīng)該具備怎么樣的能力,應(yīng)該怎么樣被實現(xiàn)。

          下面看下:

          interface ITask {  /**   * 執(zhí)行 task 的具體邏輯   *   * @return {*}  {Promise<ITaskResult<Result>>} task 的返回結(jié)果   */  run(previousTaskResult: ITaskResult): Promise<ITaskResult>;
          /** * 當(dāng) task 內(nèi)部執(zhí)行失敗時,應(yīng)做的后續(xù)處理。 * 比如回滾 task 的執(zhí)行 * 比如僅僅上報日志 * onError 是可選的,可以不實現(xiàn) */ onError?(error: Error): void;}

          ITask 提供一個 run 方法,用來執(zhí)行具體的業(yè)務(wù)邏輯。它接受上一個任務(wù)的執(zhí)行結(jié)果,并按照規(guī)范返回自己的處理結(jié)果。

          當(dāng) task 內(nèi)部執(zhí)行失敗時,需要做一些注入回滾或者上報之類的異常處理邏輯。所以提供 onError 方法。

          job 的接口

          Job 用來管理 task,需要具備兩大能力——

          1. Job 需要能添加要處理的 task

          2. Job 需要要能指定 task 的執(zhí)行順序

          JobScheduler 的接口

          JobScheduler 用來管理 Job, 由于我們賦予 task 靈活的組織方式,可以并行,可以串行,同時可以指定條件分支。用來適應(yīng)各種業(yè)務(wù)場景。

          我們發(fā)現(xiàn) Job 之間不需要太復(fù)雜的組織方式,簡單地串行即可。所以 JobScheduler 相對簡單,能管理 Job 進行串行執(zhí)行即可。另外, JobScheduler 需要具備暫停當(dāng)前調(diào)度器執(zhí)行的能力,相應(yīng)地,也需要具備恢復(fù)執(zhí)行的能力。

          于是,我們簡單定義 IJobScheduler 為:

          interface IJobScheduler {  /**   * 添加 job 以便調(diào)度   * 如果當(dāng)前沒有 job 正在被執(zhí)行,且該 JobSchedule 沒有被 pause,則立即執(zhí)行   * 否則,只是放在隊列,以便后續(xù)執(zhí)行   */  add(job: BasicJob): void;
          /** * 暫停 JobScheduler 的調(diào)度行為,當(dāng)前正在執(zhí)行的 Job 不受影響 * 增加 key 是為了如果有多個原因要 pause ,那么需要所有的原因都 * 可以 resume 的時候,才能真正 resume * * @param {string} [key] 標(biāo)識當(dāng)前暫停原因的 key */ pause(key?: string): string;
          /** * 恢復(fù) JobSchedule 的執(zhí)行 * * @param {string} key pause 返回的 key * @return {*} {boolean} resume 是否成功 */ resume(key: string): boolean;}

          實現(xiàn)

          接口 ITask 由業(yè)務(wù)方去執(zhí)行即可,相對簡單。調(diào)度器管理的 ITask 接口,不關(guān)心具體的實現(xiàn)。

          BasicJob

          Job 管理 task,我們需要實現(xiàn)其兩大能力——

          1. Job 需要能添加要處理的 task

          2. Job 需要要能指定 task 的執(zhí)行順序

          這里可以用構(gòu)建和操作 有向無環(huán)圖的方式實現(xiàn)。最初設(shè)想也是將問題轉(zhuǎn)化為

          1. 將 task 的執(zhí)行依賴抽象為 有向無環(huán)圖的構(gòu)建

          2. Job 的執(zhí)行則是對 有向無環(huán)圖進行遍歷。

          后面參考了 Rxjs 的 API 和 使用方式,我們決定使用 Rxjs 來實現(xiàn) Job 。

          變換式編程思考

          為什么使用 Rxjs ?

          所有程序其實都是對數(shù)據(jù)的一種變換——將輸入轉(zhuǎn)換為輸出。然而,當(dāng)我們在構(gòu)思設(shè)計時,很少考慮創(chuàng)建變換過程。相反,我們關(guān)心的是類和模塊、數(shù)據(jù)結(jié)構(gòu)和算法、語言和框架。

          我們認(rèn)為,從這個角度關(guān)注代碼往往忽略了要點——我們需要重新將程序視為輸入到輸出的一個變換。當(dāng)這樣做的時候,許多以前操心的細(xì)節(jié)就消失了。結(jié)構(gòu)變得更清晰,錯誤處理更加一致,耦合下降了很多。

          如果你不能將正在做的事情描述為一個流程,那表示你不知道自己在做什么。

          ————《程序員修煉之道》

          而 Rxjs 做這件事就特別合適。其中主要用到了 Rxjs 中的 API 有:

          • pipe

          • concatMap

          • forkJoin

          • iif

          我們可以將 task 的執(zhí)行流程抽象為一個數(shù)據(jù)管道,首先需要創(chuàng)建一個數(shù)據(jù)流的起始點:

          private source: Observable<TaskResultType>;
          // defaultData 可以是初始化的數(shù)據(jù),也可以是外界傳入的配置信息private initSource(defaultData?: unknown): void { /** @type {ITaskResult} task 添加前的起始值 */ const startPoint: ITaskResult = { status: TASK_STATUS.success, data: defaultData, }; // 創(chuàng)建 Observable this.source = of(startPoint);}

          這里的 source 是 Rxjs 中提供的 Observable 類型對象,即可以被觀察的,相當(dāng)于一個生產(chǎn)者。

          另外這里也用到了 Rxjs 中提供的 of 操作符,簡單說就是將數(shù)據(jù)轉(zhuǎn)化為 Observable 類型對象。那如何添加并組織 task 呢?這里提供了三個方法:

          /** * 添加多個 task ,串行執(zhí)行 */public serialNext(tasks: ITask[]): void {}
          /** * 添加多個 task ,并行執(zhí)行 */public parallellNext(tasks: ITask[]): void {}
          /** * 條件操作。 * 第一個參數(shù)是一個 條件函數(shù),返回 true false,以此來決定走哪個子 Job */public iif(condition: (previousResult: unknown) => boolean, trueSource: BasicJob, falseSource?: BasicJob):void {}

          JobScheduler

          先分析下我們的構(gòu)造函數(shù):

          public constructor(queue: IQueue = new FIFOQueue()) {}

          這個 FIFOQueue為啥不直接使用數(shù)組在 JobScheduler實現(xiàn)呢?

          其實, FIFOQueue的內(nèi)部實現(xiàn)就是數(shù)組,代碼不過十行左右。這樣做,是為了將 queuejob-schedule解耦開,而 queue是可以依賴注入的,可以自定義實現(xiàn)的。簡單說,就是不希望 queue的實現(xiàn)細(xì)節(jié)在 job-schedule中, job-schedule只存在 queue的接口操作。

          再看 JobSchedulerpause()resume()方法,設(shè)計的時候考慮到,這里不能是簡單的,調(diào)用 pause()的時候暫停執(zhí)行,調(diào)用 resume()就恢復(fù)執(zhí)行這么簡單。

          設(shè)想一種業(yè)務(wù)場景,如果有多個原因要暫停調(diào)度器的執(zhí)行,那么可能有一個業(yè)務(wù)模塊內(nèi)的多處都執(zhí)行了 pause()。那么當(dāng)其中一一處需要恢復(fù)執(zhí)行的時候,即某一個暫停執(zhí)行的原因不再成立的時候,這時調(diào)用了 resume()的時候,調(diào)度器是否應(yīng)該立即響應(yīng),去恢復(fù)執(zhí)行呢?

          要不要恢復(fù)執(zhí)行,就看此時是否真的可以恢復(fù)調(diào)度器的執(zhí)行。如果業(yè)務(wù)方只在調(diào)度器可以恢復(fù)執(zhí)行的時候,才真正恢復(fù)。也就是所以暫停執(zhí)行的原因都不再成立的時候,才執(zhí)行 resume()。那么這樣的話,調(diào)度器就把恢復(fù)和暫停的工作交給業(yè)務(wù)方來確保。

          這樣的弊端是:

          1. 增加了業(yè)務(wù)方的處理負(fù)擔(dān),需要關(guān)注調(diào)度器內(nèi)部的執(zhí)行狀態(tài)。

          2. 當(dāng)需要調(diào)用 resume()的時候,需要考慮到其他業(yè)務(wù)是否受當(dāng)前執(zhí)行狀態(tài)的影響。

          3. debug 的時候也難以追蹤,很難調(diào)試。

          4. 維護成本很高,需要知道當(dāng)前模塊下所有使用調(diào)度器的業(yè)務(wù)。

          于是,我們將這一工作收斂到調(diào)度器內(nèi)部。業(yè)務(wù)只關(guān)心自己應(yīng)該暫停還是恢復(fù),不需要關(guān)心其他業(yè)務(wù)會不會受影響。那么當(dāng)調(diào)用 resume()的時候,可能并不會真正地開始恢復(fù)執(zhí)行。

          具體實現(xiàn)如下:

          /** * 暫停 JobScheduler 的調(diào)度行為,當(dāng)前正在執(zhí)行的 Job 不受影響 * 增加 key 是為了如果有多個原因要 pause ,那么需要所有的原因都 * 可以 resume 的時候,才能真正 resume * * @param {string} [key] 標(biāo)識當(dāng)前暫停原因的 key */public pause(key?: string): string {  // 如果不傳 key ,則生成一個隨機字符串作為 key  const keyNotNull = key ?? this.generatePauseKey();  this.pausedKeys.add(keyNotNull);
          return keyNotNull;}/** * 恢復(fù) JobSchedule 的執(zhí)行 * * @param {string} key pause 返回的 key * @return {*} {boolean} resume 是否成功 */public resume(key: string): boolean { // 如果 key 存在于 set 中,返回true,并成功移除。否則,返回 false const isContainKey = this.pausedKeys.delete(key);
          // 如果移除后 pausedKeys 為空,則可以 resume if (this.pausedKeys.size === 0) { this.executeNextJob(); } return isContainKey;}

          其他特性

          除了上面介紹的,我們的調(diào)度器還支持

          • 指定任務(wù)的優(yōu)先級

          • 任務(wù)被處理的各個時機回調(diào)

          • 超時處理

          • 異常處理

          • ...

          總結(jié)

          我們從業(yè)務(wù)現(xiàn)狀出發(fā),考慮需要覆蓋的場景,設(shè)計調(diào)度器需要提供的角色。然后確定了調(diào)度器的關(guān)鍵接口,同時給出了簡單實現(xiàn)。同時,也提到一些個人設(shè)計和實現(xiàn)時候的一些思考,比如多叉樹和有向無環(huán)圖。

          另外,考慮到 Rxjs 比較適合 task 的組織,借鑒 Rxjs 的 API 并將其應(yīng)用到實踐中。希望能給出一些啟發(fā),歡迎一起討論~

          參考文章

          • 實現(xiàn)一個異步并發(fā)調(diào)度器

          • Rxjs 官網(wǎng)

          • learnrxjs

          • bullmq



          內(nèi)推社群


          我組建了一個氛圍特別好的騰訊內(nèi)推社群,如果你對加入騰訊感興趣的話(后續(xù)有計劃也可以),我們可以一起進行面試相關(guān)的答疑、聊聊面試的故事、并且在你準(zhǔn)備好的時候隨時幫你內(nèi)推。下方加 winty 好友回復(fù)「面試」即可。


          瀏覽 56
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产av性爱| 成人水蜜桃视频 | 搔视频网址在线观看 | 想要xx| 婷婷人人爽 |