<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          基于 DAG 的任務(wù)編排框架/平臺

          共 5928字,需瀏覽 12分鐘

           ·

          2021-09-06 08:30


          -     前言    -


          最近在做的工作比較需要一個(gè)支持任務(wù)編排工作流的框架或者平臺,這里記錄下實(shí)現(xiàn)上的一些思路。


          -     任務(wù)編排工作流    -


          任務(wù)編排是什么意思呢,顧名思義就是可以把"任務(wù)"這個(gè)原子單位按照自己的方式進(jìn)行編排,任務(wù)之間可能互相依賴。復(fù)雜一點(diǎn)的編排之后就能形成一個(gè) workflow 工作流了。

          我們希望這個(gè)工作流按照我們編排的方式去執(zhí)行每個(gè)原子 task 任務(wù)。如下圖所示,我們希望先并發(fā)運(yùn)行 Task A 和 Task C,Task A 執(zhí)行完后串行運(yùn)行 Task B,在并發(fā)等待 Task B 和 C 都結(jié)束后運(yùn)行 Task D,這樣就完成了一個(gè)典型的任務(wù)編排工作流。



          -     DAG 有向無環(huán)圖    -


          首先我們了解圖這個(gè)數(shù)據(jù)結(jié)構(gòu),每個(gè)元素稱為頂點(diǎn) vertex,頂點(diǎn)之間的連線稱為邊 edge。

          像我們畫的這種帶箭頭關(guān)系的稱為有向圖,箭頭關(guān)系之間能形成一個(gè)環(huán)的成為有環(huán)圖,反之稱為無環(huán)圖。顯然運(yùn)用在我們?nèi)蝿?wù)編排工作流上,最合適的是 DAG 有向無環(huán)圖。

          我們在代碼里怎么存儲(chǔ)圖呢,有兩種數(shù)據(jù)結(jié)構(gòu):鄰接矩陣和鄰接表。

          下圖表示一個(gè)有向圖的鄰接矩陣,例如 x->y 的邊,只需將 Array[x][y]標(biāo)識為 1 即可。


          此外我們也可以使用鄰接表來存儲(chǔ),這種存儲(chǔ)方式較好地彌補(bǔ)了鄰接矩陣?yán)速M(fèi)空間的缺點(diǎn),但相對來說鄰接矩陣能更快地判斷連通性。


          一般在代碼實(shí)現(xiàn)上,我們會(huì)選擇鄰接矩陣,這樣我們在判斷兩點(diǎn)之間是否有邊更方便點(diǎn)。


          -     一個(gè)任務(wù)編排框架    -


          了解了 DAG 的基本知識后我們可以來簡單實(shí)現(xiàn)一下。

          了解JUC包的可能快速想到CompletableFuture,這個(gè)類對于多個(gè)并發(fā)線程有復(fù)雜關(guān)系耦合的場景是很適用的,如果是一次性任務(wù),那么使用CompletableFuture完全沒有問題。但是作為框架或者平臺來說,我們還需要考慮存儲(chǔ)節(jié)點(diǎn)狀態(tài)、重試執(zhí)行等邏輯,對于這些CompletableFuture是不能滿足的。

          我們需要更完整地考慮與設(shè)計(jì)這個(gè)框架。首先是存儲(chǔ)結(jié)構(gòu),我們的 Dag 表示一整個(gè)圖,Node 表示各個(gè)頂點(diǎn),每個(gè)頂點(diǎn)有其 parents 和 children:

          //Dagpublic final class DefaultDag<T, R> implements Dag<T, R> {
          private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>(); ...}
          //Nodepublic final class Node<T, R> { /** * incoming dependencies for this node */ private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>(); /** * outgoing dependencies for this node */ private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>(); ...}

          畫兩個(gè)頂點(diǎn),以及為這兩個(gè)頂點(diǎn)連邊操作如下:

          public void addDependency(final T evalFirstNode, final T evalLaterNode) {  Node<T, R> firstNode = createNode(evalFirstNode);  Node<T, R> afterNode = createNode(evalLaterNode);
          addEdges(firstNode, afterNode);}
          private Node<T, R> createNode(final T value) { Node<T, R> node = new Node<T, R>(value); return node;}private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) { if (!firstNode.equals(afterNode)) { firstNode.getChildren().add(afterNode); afterNode.getParents().add(firstNode); }}

          到現(xiàn)在我們其實(shí)已經(jīng)把基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)寫好了,但我們作為一個(gè)任務(wù)編排框架最終是需要線程去執(zhí)行的,我們把它和線程池一起給包裝一下。

          //任務(wù)編排線程池public class DefaultDexecutor <T, R> {
          //執(zhí)行線程,和2種重試線程 private final ExecutorService<T, R> executionEngine; private final ExecutorService immediatelyRetryExecutor; private final ScheduledExecutorService scheduledRetryExecutor; //執(zhí)行狀態(tài) private final ExecutorState<T, R> state; ...}//執(zhí)行狀態(tài)public class DefaultExecutorState<T, R> { //底層圖數(shù)據(jù)結(jié)構(gòu) private final Dag<T, R> graph; //已完成 private final Collection<Node<T, R>> processedNodes; //未完成 private final Collection<Node<T, R>> unProcessedNodes; //錯(cuò)誤task private final Collection<ExecutionResult<T, R>> erroredTasks; //執(zhí)行結(jié)果 private final Collection<ExecutionResult<T, R>> executionResults;}

          可以看到我們的線程包括執(zhí)行線程池,2 種重試線程池。我們使用 ExecutorState 來保存一些整個(gè)任務(wù)工作流執(zhí)行過程中的一些狀態(tài)記錄,包括已完成和未完成的 task,每個(gè) task 執(zhí)行的結(jié)果等。同時(shí)它也依賴我們底層的圖數(shù)據(jù)結(jié)構(gòu) DAG。
          接下來我們要做的事其實(shí)很簡單,就是 BFS 這整個(gè) DAG 數(shù)據(jù)結(jié)構(gòu),然后提交到線程池中去執(zhí)行就可以了,過程中注意一些節(jié)點(diǎn)狀態(tài)的保持,結(jié)果的保存即可。


          還是以上圖為例,值得說的一點(diǎn)是在 Task D 這個(gè)點(diǎn)需要有一個(gè)并發(fā)等待的操作,即 Task D 需要依賴 Task B 和 Task C 執(zhí)行結(jié)束后再往下執(zhí)行。這里有很多辦法,我選擇了共享變量的方式來完成并發(fā)等待。遍歷工作流中被遞歸的方法的偽代碼如下:

          private void doProcessNodes(final Set<Node<T, R>> nodes) {    for (Node<T, R> node : nodes) {        //共享變量 并發(fā)等待        if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {            Task<T, R> task = newTask(node);            this.executionEngine.submit(task);            ...            ExecutionResult<T, R> executionResult = this.executionEngine.processResult();            if (executionResult.isSuccess()) {        state.markProcessingDone(processedNode);      }            //繼續(xù)執(zhí)行孩子節(jié)點(diǎn)      doExecute(processedNode.getChildren());            ...        }    }}
          這樣我們基本完成了這個(gè)任務(wù)編排框架的工作,現(xiàn)在我們可以如下來進(jìn)行示例圖中的任務(wù)編排以及執(zhí)行:

          DefaultExecutor<String, String> executor = newTaskExecutor();executor.addDependency("A", "B");executor.addDependency("B", "D");executor.addDependency("C", "D");executor.execute();


          -     任務(wù)編排平臺化    -


          好了現(xiàn)在我們已經(jīng)有一款任務(wù)編排框架了,但很多時(shí)候我們想要可視化、平臺化,讓使用者更加無腦。

          框架與平臺最大的區(qū)別在哪里?是可拖拽的可視化輸入么?我覺得這個(gè)的復(fù)雜度更多在前端。而對于后端平臺來講,與框架最大的區(qū)別是數(shù)據(jù)的持久化。

          對于 DAG 的頂點(diǎn)來說,我們需要將每個(gè)節(jié)點(diǎn) Task 的信息給持久化到關(guān)系數(shù)據(jù)庫中,包括 Task 的狀態(tài)、輸出結(jié)果等。

          而對于 DAG 的邊來說,我們也得用數(shù)據(jù)庫來存儲(chǔ)各 Task 之間的方向關(guān)系。此外,在遍歷執(zhí)行 DAG 的整個(gè)過程中的中間狀態(tài)數(shù)據(jù),我們也得搬運(yùn)到數(shù)據(jù)庫中。

          首先我們可以設(shè)計(jì)一個(gè) workflow 表,來表示一個(gè)工作流。接著我們設(shè)計(jì)一個(gè) task 表,來表示一個(gè)執(zhí)行單元。

          task 表主要字段如下,這里主要是 task_parents 的設(shè)計(jì),它是一個(gè) string,存儲(chǔ) parents 的 taskId,多個(gè)由分隔符分隔。

          task_idworkflow_idtask_nametask_statusresulttask_parents

          依賴是上圖這個(gè)例子,對比框架來說,我們首先得將其存儲(chǔ)到數(shù)據(jù)庫中去,最終可能得到如下數(shù)據(jù):

          task_id workflow_id task_name task_status result task_parents
          1 1 A 0 -1
          2 1 B 0 1
          3 1 C 0 -1
          4 1 D 0 2,3

          可以看到,這樣也能很好地存儲(chǔ) DAG 數(shù)據(jù),和框架中代碼的輸入方式差別并不是很大。

          接下來我們要做的是遍歷執(zhí)行整個(gè) workflow,這邊和框架的差別也不大。首先我們可以利用select * from task where workflow_id = 1 and task_parents = -1來獲取初始化節(jié)點(diǎn) Task A 和 Task C,將其提交到我們的線程池中。

          接著對應(yīng)框架代碼中的doExecute(processedNode.getChildren());,我們使用select * from task where task_parents like %3%,就可以得到 Task C 的孩子節(jié)點(diǎn) Task D,這里使用了模糊查詢是因?yàn)槲覀兊?task_parents 可能是由多個(gè)父親的 taskId 與分隔號組合而成的字符串。查詢到孩子節(jié)點(diǎn)后,繼續(xù)提交到線程池即可。

          別忘了我們在 Task D 這邊還有一個(gè)并發(fā)等待的操作,對應(yīng)框架代碼中的if (!processedNodes.contains(node)&&processedNodes.containsAll(node.getParents()))。這邊我們只要判斷select count(1) from task where task_id in (2,3) and status != 1的個(gè)數(shù)為 0 即可,即保證 parents task 全部成功。

          另外值得注意的是 task 的重試。在框架中,失敗 task 的重試可以是立即使用當(dāng)前線程重試或者放到一個(gè)定時(shí)線程池中去重試。而在平臺上,我們的重試基本上來自于用戶在界面上的點(diǎn)擊,即主線程。

          至此,我們已經(jīng)將任務(wù)編排框架的功能基本平臺化了。作為一個(gè)任務(wù)編排平臺,可拖拽編排的可視化輸入、整個(gè)工作流狀態(tài)的可視化展示、任務(wù)的可人工重試都是其優(yōu)點(diǎn)。


          作者:fredalxin

          來源:fredal.xin/task-scheduling-based-on-dag

          瀏覽 30
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  1234无码在线观看 | 蜜乳一区二区三区精品 | 天天拍天天日 | 免费在线亚洲视频 | 超碰人人看轻轻草 |