asyncTool多線程編排一站式解決方案
解決任意的多線程并行、串行、阻塞、依賴、回調(diào)的并行框架,可以任意組合各線程的執(zhí)行順序,帶全鏈路執(zhí)行結(jié)果回調(diào)。多線程編排一站式解決方案。來自于京東主 App 后臺。
該框架目前正在京東 App 后臺接受苛刻、高并發(fā)、海量用戶等復雜場景業(yè)務的檢驗測試,隨時會根據(jù)實際情況發(fā)布更新和 bugFix。
并行常見的場景
1. 客戶端請求服務端接口,該接口需要調(diào)用其他N個微服務的接口
譬如 請求我的訂單,那么就需要去調(diào)用用戶的rpc、商品詳情的rpc、庫存rpc、優(yōu)惠券等等好多個服務。同時,這些服務還有相互依賴關(guān)系,譬如必須先拿到用戶的某個字段后,再去某rpc服務請求數(shù)據(jù)。 最終全部獲取完畢后,或超時了,就匯總結(jié)果,返回給客戶端。
2. 并行執(zhí)行N個任務,后續(xù)根據(jù)這1-N個任務的執(zhí)行結(jié)果來決定是否繼續(xù)執(zhí)行下一個任務
如用戶可以通過郵箱、手機號、用戶名登錄,登錄接口只有一個,那么當用戶發(fā)起登錄請求后,我們需要并行根據(jù)郵箱、手機號、用戶名來同時查數(shù)據(jù)庫,只要有一個成功了,都算成功,就可以繼續(xù)執(zhí)行下一步。而不是先試郵箱能否成功、再試手機號……
再如某接口限制了每個批次的傳參數(shù)量,每次最多查詢10個商品的信息,我有45個商品需要查詢,就可以分5堆并行去查詢,后續(xù)就是統(tǒng)計這5堆的查詢結(jié)果。就看你是否強制要求全部查成功,還是不管有幾堆查成功都給客戶做返回
再如某個接口,有5個前置任務需要處理。其中有3個是必須要執(zhí)行完畢才能執(zhí)行后續(xù)的,另外2個是非強制的,只要這3個執(zhí)行完就可以進行下一步,到時另外2個如果成功了就有值,如果還沒執(zhí)行完,就是默認值。
3. 需要進行線程隔離的多批次任務
如多組任務, 各組任務之間彼此不相關(guān),每組都需要一個獨立的線程池,每組都是獨立的一套執(zhí)行單元的組合。有點類似于hystrix的線程池隔離策略。
4. 單機工作流任務編排
5. 其他有順序編排的需求
并行場景之核心——任意編排
1 多個執(zhí)行單元的串行請求
2 多個執(zhí)行單元的并行請求
3 阻塞等待,串行的后面跟多個并行
4 阻塞等待,多個并行的執(zhí)行完畢后才執(zhí)行某個
5 串并行相互依賴
6 復雜場景
并行場景可能存在的需求之——每個執(zhí)行結(jié)果的回調(diào)
傳統(tǒng)的Future、CompleteableFuture一定程度上可以完成任務編排,并可以把結(jié)果傳遞到下一個任務。如CompletableFuture有then方法,但是卻無法做到對每一個執(zhí)行單元的回調(diào)。譬如A執(zhí)行完畢成功了,后面是B,我希望A在執(zhí)行完后就有個回調(diào)結(jié)果,方便我監(jiān)控當前的執(zhí)行狀況,或者打個日志什么的。失敗了,我也可以記錄個異常信息什么的。
此時,CompleteableFuture就無能為力了。
我的框架提供了這樣的回調(diào)功能。并且,如果執(zhí)行異常、超時,可以在定義這個執(zhí)行單元時就設(shè)定默認值。
并行場景可能存在的需求之——執(zhí)行順序的強依賴和弱依賴
如上圖的3,A和B并發(fā)執(zhí)行,最后是C。
有些場景下,我們希望A和B都執(zhí)行完畢后,才能執(zhí)行C,CompletableFuture里有個allOf(futures...).then()方法可以做到。
有些場景下,我們希望A或者B任何一個執(zhí)行完畢,就執(zhí)行C,CompletableFuture里有個anyOf(futures...).then()方法可以做到。
我的框架同樣提供了類似的功能,通過設(shè)定wrapper里的addDepend依賴時,可以指定依賴的任務是否must執(zhí)行完畢。如果依賴的是must要執(zhí)行的,那么就一定會等待所有的must依賴項全執(zhí)行完畢,才執(zhí)行自己。
如果依賴的都不是must,那么就可以任意一個依賴項執(zhí)行完畢,就可以執(zhí)行自己了。
注意:這個依賴關(guān)系是有必須和非必須之分的,還有一個重要的東西是執(zhí)行單元不能重復執(zhí)行。譬如圖4,如果B執(zhí)行完畢,然后執(zhí)行了A,此時C終于執(zhí)行完了,然后也到了A,此時就會發(fā)現(xiàn)A已經(jīng)在執(zhí)行,或者已經(jīng)完畢(失?。?,那么就不應該再重復執(zhí)行A。
還有一種場景,如下圖,A和D并行開始,D先執(zhí)行完了,開始執(zhí)行Result任務,此時B和C都還沒開始,然后Result執(zhí)行完了,雖然B和C都還沒執(zhí)行,但是已經(jīng)沒必要執(zhí)行了。B和C這些任務是可以被跳過的,跳過的原則是他們的NextWrapper已經(jīng)有結(jié)果了或者已經(jīng)在執(zhí)行了。我提供了checkNextWrapperResult方法來控制,當后面的任務已經(jīng)執(zhí)行了,自己還要不要執(zhí)行的邏輯控制。當然,這個控制僅限于nextWrapper只有一個時才有成立。
并發(fā)場景可能存在的需求之——依賴上游的執(zhí)行結(jié)果作為入?yún)?/h2>
譬如A-B-C三個執(zhí)行單元,A的入?yún)⑹荢tring,出參是int,B呢它需要用A的結(jié)果作為自己的入?yún)?。也就是說A、B并不是獨立的,而是有結(jié)果依賴關(guān)系的。
在A執(zhí)行完畢之前,B是取不到結(jié)果的,只是知道A的結(jié)果類型。
那么,我的框架也支持這樣的場景。可以在編排時,就取A的結(jié)果包裝類,作為B的入?yún)?。雖然此時尚未執(zhí)行,必然是空,但可以保證A執(zhí)行完畢后,B的入?yún)毁x值。
并發(fā)場景可能存在的需求之——全組任務的超時
一組任務,雖然內(nèi)部的各個執(zhí)行單元的時間不可控,但是我可以控制全組的執(zhí)行時間不超過某個值。通過設(shè)置timeOut,來控制全組的執(zhí)行閾值。
并發(fā)場景可能存在的需求之——高性能、低線程數(shù)
該框架全程無鎖,不依靠線程鎖來保證順序。
創(chuàng)建線程量少。 如這樣的,A會運行在B、C執(zhí)行更慢的那個單元的線程上,而不會額外創(chuàng)建線程。
asyncTool特點
解決任意的多線程并行、串行、阻塞、依賴、回調(diào)的并發(fā)框架,可以任意組合各線程的執(zhí)行順序,帶全鏈路回調(diào)和超時控制。
其中的A、B、C分別是一個最小執(zhí)行單元(worker),可以是一段耗時代碼、一次Rpc調(diào)用等,不局限于你做什么。
該框架可以將這些worker,按照你想要的各種執(zhí)行順序,加以組合編排。最終得到結(jié)果。
并且,該框架 為每一個worker都提供了執(zhí)行結(jié)果的回調(diào)和執(zhí)行失敗后自定義默認值 。譬如A執(zhí)行完畢后,A的監(jiān)聽器會收到回調(diào),帶著A的執(zhí)行結(jié)果(成功、超時、異常)。
根據(jù)你的需求,將各個執(zhí)行單元組合完畢后,開始在主線程執(zhí)行并阻塞,直到最后一個執(zhí)行完畢。并且 可以設(shè)置全組的超時時間 。
該框架支持后面的執(zhí)行單元以前面的執(zhí)行單元的結(jié)果為自己的入?yún)?/strong> 。譬如你的執(zhí)行單元B的入?yún)⑹荝esultA,ResultA就是A的執(zhí)行結(jié)果,那也可以支持。在編排時,就可以預先設(shè)定B或C的入?yún)锳的result,即便此時A尚未開始執(zhí)行。當A執(zhí)行完畢后,自然會把結(jié)果傳遞到B的入?yún)⑷ァ?/p>
快速開始
