Gobrs-Async高性能異步編排框架
Gobrs-Async 是一款功能強大、配置靈活、帶有全鏈路異?;卣{、內存優(yōu)化、異常狀態(tài)管理于一身的高性能異步編排框架。為企業(yè)提供在復雜應用場景下動態(tài)任務編排的能力。 針對于復雜場景下,異步線程復雜性、任務依賴性、異常狀態(tài)難控制性; Gobrs-Async 為此而生。
資源索引
能解決什么問題
能解決 CompletableFuture 所不能解決的問題。 怎么理解呢?
傳統的Future、CompleteableFuture一定程度上可以完成任務編排,并可以把結果傳遞到下一個任務。如CompletableFuture有then方法,但是卻無法做到對每一個執(zhí)行單元的回調。譬如A執(zhí)行完畢成功了,后面是B,我希望A在執(zhí)行完后就有個回調結果,方便我監(jiān)控當前的執(zhí)行狀況,或者打個日志什么的。失敗了,我也可以記錄個異常信息什么的。
此時,CompleteableFuture就無能為力了。
Gobrs-Async框架提供了這樣的回調功能。并且,如果執(zhí)行成功、失敗、異常、超時等場景下都提供了管理線程任務的能力!
場景概述
場景一
場景一
說明 任務A 執(zhí)行完了之后,繼續(xù)執(zhí)行 B、C、D
場景二
場景二
說明 任務A 執(zhí)行完了之后執(zhí)行B 然后再執(zhí)行 C、D
場景三
場景二
說明 任務A 執(zhí)行完了之后執(zhí)行B、E 然后按照順序 B的流程走C、D、G。 E的流程走F、G
還有更多場景,如果你想詳細理解任務編排的概念, 請仔細閱讀文檔,或者通過資源索引導航到官網了解全貌!
為什么寫這個項目
在開發(fā)復雜中臺業(yè)務過程中,難免會遇到調用各種中臺業(yè)務數據, 而且會出現復雜的中臺數據依賴關系,在這種情況下。代碼的復雜程度就會增加。 如下圖所示:
在電商平臺業(yè)務中, 各中臺數據可能依賴 商品Product 數據,而且需要依賴特殊屬性中 Item的數據。(有朋友會問,為什么Product 數據不和 Item數據出自同一個中臺呢?中臺業(yè)務發(fā)展是多樣性的,不同業(yè)務中臺設計方式不同 , 難道我們就不對接了嗎?所以我們要針對于這種復雜多變的中臺業(yè)務數據提供技術支撐才是一個合格的開發(fā)者應該做的)而且Item數據是HTTP的服務,但Product 是RPC服務。 如果按照Future的 開發(fā)方式。我們可能會這樣開發(fā)
// 并行處理任務 Product 、 Item 的任務
@Resource
List<ParaExector> paraExectors;
// 依賴于Product 和 Item的 任務
@Resource
List<SerExector> serExectors;
public void testFuture(HttpServletRequest httpServletRequest) {
DataContext dataContext = new DataContext();
dataContext.setHttpServletRequest(httpServletRequest);
List<Future> list = new ArrayList<>();
for (AsyncTask asyncTask : paraExectors) {
Future<?> submit = gobrsThreadPoolExecutor.submit(() -> {
asyncTask.task(dataContext, null);
});
list.add(submit);
}
for (Future future : list) {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
List<Future> ser = new ArrayList<>();
for (AsyncTask asyncTask : serExectors) {
Future<?> submit = gobrsThreadPoolExecutor.submit(() -> {
asyncTask.task(dataContext, null);
});
ser.add(submit);
}
for (Future future : ser) {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
存在的問題
以上示例中,Product數據是通過RPC 方式獲取, Item是通過HTTP服務獲取,大家都知道, RPC性能要高于HTTP性能。 但是通過Future 的方式, get會阻塞等待 Item數據返回后才會往下執(zhí)行。 這樣的話, 圖書音像、裝修數據、限購數據等都要等待Item數據返回,但是這些中臺并不依賴Item返回的數據, 所以會產生等待時間影響系統整體QPS。
起源
起源
-
作者通過對開源中間件的源碼詳細閱讀和二次開發(fā)的經驗和使用心得總結而來。
-
用戶的一些使用體驗 包括業(yè)務的需求
Gobrs-Async 核心能力
核心能力
業(yè)界對比
在開源平臺找了挺多任務異步編排框架,發(fā)現都不是很理想,唯一一款現階段比較好用的異步編排框架就數asyncTool比較好用。但是在使用時發(fā)現,API不是很好用。而且需要頻繁的創(chuàng)建 WorkerWrapper 對象 用起來有點不爽。對于業(yè)務比較復雜的場景,在開發(fā)時需要寫較多的 WorkerWrapper 代碼,而且框架不能對全局異常進行攔截。只能通過任務的 result 方法捕捉單任務的異常。 不能在任意任務出現異常后 停止全局的異步任務。同時無法在發(fā)生全局異常的時候進行異常攔截。如果需要實現在發(fā)生需停止全局任務流的時候,發(fā)送報警郵件的功能。 asyncTool就顯得力不從心了。
asyncTool 本身已經功能很強大了,本人與asyncTool 作者 都是在京東擔任研發(fā)工作。 涉及到的場景大同小異。 會有很多業(yè)務場景,很復雜的中臺接口調用關系。所以針對當前業(yè)務場景。需要探索更多的技術領域。本身技術是應該服務于業(yè)務,落地業(yè)務場景。
想給項目起一個簡單易記的名字,類似于 Eureka、Nacos、Redis;經過再三考慮后,決定命名:Gobrs-Async
| 功能 | asyncTool | Gobrs-Async | sirector |
|---|---|---|---|
| 多任務處理 | 是 | 是 | 是 |
| 單任務異?;卣{ | 是 | 是 | 否 |
| 全局異常中斷 | 否 | 是 | 否 |
| 可配置任務流 | 否 | 是 | 否 |
| 自定義異常攔截器 | 否 | 是 | 否 |
| 內存優(yōu)化 | 否 | 是 | 否 |
| 可選的任務執(zhí)行 | 否 | 是 | 否 |
它解決了什么問題
在請求調用各大中臺數據時,難免會出現多個中臺數據互相依賴的情況,現實開發(fā)中會遇到如下場景。
并行常見的場景 1 客戶端請求服務端接口,該接口需要調用其他N個微服務的接口
譬如 請求我的購物車,那么就需要去調用用戶的rpc、商品詳情的rpc、庫存rpc、優(yōu)惠券等等好多個服務。同時,這些服務還有相互依賴關系,譬如必須先拿到商品id后,才能去庫存rpc服務請求庫存信息。 最終全部獲取完畢后,或超時了,就匯總結果,返回給客戶端。
2 并行執(zhí)行N個任務,后續(xù)根據這1-N個任務的執(zhí)行結果來決定是否繼續(xù)執(zhí)行下一個任務
如用戶可以通過郵箱、手機號、用戶名登錄,登錄接口只有一個,那么當用戶發(fā)起登錄請求后,我們需要并行根據郵箱、手機號、用戶名來同時查數據庫,只要有一個成功了,都算成功,就可以繼續(xù)執(zhí)行下一步。而不是先試郵箱能否成功、再試手機號……
再如某接口限制了每個批次的傳參數量,每次最多查詢10個商品的信息,我有45個商品需要查詢,就可以分5堆并行去查詢,后續(xù)就是統計這5堆的查詢結果。就看你是否強制要求全部查成功,還是不管有幾堆查成功都給客戶做返回
再如某個接口,有5個前置任務需要處理。其中有3個是必須要執(zhí)行完畢才能執(zhí)行后續(xù)的,另外2個是非強制的,只要這3個執(zhí)行完就可以進行下一步,到時另外2個如果成功了就有值,如果還沒執(zhí)行完,就是默認值。
3 需要進行線程隔離的多批次任務。
如多組任務, 各組任務之間彼此不相關,每組都需要一個獨立的線程池,每組都是獨立的一套執(zhí)行單元的組合。有點類似于hystrix的線程池隔離策略。
4 單機工作流任務編排。
5 其他有順序編排的需求。
它有什么特性
Gobrs-Async 在開發(fā)時考慮了眾多使用者的開發(fā)喜歡,對異常處理的使用場景。并被運用到電商生產環(huán)境中,在京東經歷這嚴酷的高并發(fā)考驗。同時框架中 極簡靈活的配置、全局自定義可中斷全流程異常、內存優(yōu)化、靈活的接入方式、提供SpringBoot Start 接入方式。更加考慮使用者的開發(fā)習慣。僅需要注入GobrsTask的Spring Bean 即可實現全流程接入。
Gobrs-Async 項目目錄及其精簡
-
gobrs-async-example:Gobrs-Async 接入實例,提供測試用例。 -
gobrs-async-starter:Gobrs-Async 框架核心組件
Gobrs-Async 在設計時,就充分考慮了開發(fā)者的使用習慣, 沒有依賴任何中間件。 對并發(fā)框架做了良好的封裝。主要使用 CountDownLatch 、ReentrantLock 、volatile 等一系列并發(fā)技術開發(fā)設計。
整體架構
1.0
任務觸發(fā)器
任務流的啟動者, 負責啟動任務執(zhí)行流
規(guī)則解析引擎
負責解析使用者配置的規(guī)則,同時于Spring結合,將配置的 Spring Bean 解析成 TaskBean,進而通過解析引擎加載成 任務裝飾器。進而組裝成任務樹
任務啟動器
負責通過使用解析引擎解析的任務樹。結合 JUC 并發(fā)框架調度實現對任務的統一管理,核心方法有
-
trigger 觸發(fā)任務加載器,為加載任務準備環(huán)境
任務加載器
負責加載任務流程,開始調用任務執(zhí)行器執(zhí)行核心流程
-
load 核心任務流程方法,在這里阻塞等待整個任務流程
-
getBeginProcess 獲取子任務開始流程
-
completed 任務完成
-
errorInterrupted 任務失敗 中斷任務流程
-
error 任務失敗
任務執(zhí)行器
最終的任務執(zhí)行,每一個任務對應一個TaskActuator 任務的 攔截、異常、執(zhí)行、線程復用 等必要條件判斷都在這里處理
-
prepare 任務前置處理
-
preInterceptor 統一任務前置處理
-
task 核心任務方法,業(yè)務執(zhí)行內容
-
postInterceptor 統一后置處理
-
onSuccess 任務執(zhí)行成功回調
-
onFail 任務執(zhí)行失敗回調
任務總線
任務流程傳遞總線,包括 請求參數、任務加載器、 響應結果, 該對象暴露給使用者,拿到匹配業(yè)務的數據信息,例如: 返回結果、主動中斷任務流程等功能 需要任務總線(TaskSupport)支持
核心類圖
