讓API并行調(diào)用變得如絲般順滑的絕招
當數(shù)據(jù)量較大的時候,都會通過分庫分表來拆分,分擔(dān)讀寫的壓力。分庫分表后比較麻煩的就是查詢的問題,如果不是直接根據(jù)分片鍵去查詢的話,需要對多個表進行查詢。
在一些復(fù)雜的業(yè)務(wù)場景下,比如訂單搜索,除了訂單號,用戶,商家 這些常用的搜索條件,可能還有時間,商品等等。
目前常見的做法將數(shù)據(jù)同步到 ES 這類搜索框架中進行查詢,然后通過搜出來的結(jié)果,一般是主鍵 ID, 再去具體的數(shù)據(jù)表中查詢完整的數(shù)據(jù),組裝返回給調(diào)用方。
比如下面這段代碼,首先查詢出文章信息,然后根據(jù)文章中的用戶 ID 去查詢用戶的昵稱。
List articleBos = articleDoPage.getRecords().stream().map(r -> {
String nickname = userManager.getNickname(r.getUserId());
return articleBoConvert.convertPlus(r, nickname);
}).collect(Collectors.toList());
如果文章有 10 條數(shù)據(jù),那么就需要調(diào)用 10 次用戶服務(wù)提供的接口,而且是同步調(diào)用操作。
當然我們也可以用并行流來實現(xiàn)并發(fā)調(diào)用,代碼如下:
List articleBos = articleDoPage.getRecords().parallelStream().map(r -> {
String nickname = userManager.getNickname(r.getUserId());
return articleBoConvert.convertPlus(r, nickname);
}).collect(Collectors.toList());
并行流的優(yōu)點很明顯,代碼不用做特別大的改動。需要注意如果用并行流,最好單獨定義一個 ForkJoinPool。
除了用并行流,還可以使用批量查詢的方式來提高性能,降低 RPC 的調(diào)用次數(shù),代碼如下:
List userIds = articleDoPage.getRecords().stream().map(article -> article.getUserId()).collect(Collectors.toList());
Map nickNameMap = userManager.queryByIds(userIds).stream().collect(Collectors.toMap(UserResponse::getId, UserResponse::getNickname));
List articleBos = articleDoPage.getRecords().stream().map(r -> {
String nickname = nickNameMap.containsKey(r.getUserId()) ? nickNameMap.get(r.getUserId()) : CommonConstant.DEFAULT_EMPTY_STR;
return articleBoConvert.convertPlus(r, nickname);
}).collect(Collectors.toList());
但批量查詢還是同步模式,下面介紹如果使用 CompletableFuture 來實現(xiàn)異步并發(fā)調(diào)用,直接用原生的 CompletableFuture 也可以,但是編排能力沒有那么強,這里我們選擇一款基于 CompletableFuture 封裝的并行編排框來實現(xiàn),詳細介紹查看我之前的這篇文章:https://mp.weixin.qq.com/s/3EE8ccydK16gC1oY4AWnoA
稍微做了下封裝,提供了更方便使用的工具類來實現(xiàn)并發(fā)調(diào)用多個接口的邏輯。
第一種方式,適用于比如從 ES 查出了一批 ID, 然后根據(jù) ID 去數(shù)據(jù)庫中或者調(diào)用 RPC 查詢真實數(shù)據(jù),最后得到一個 Map,可以根據(jù) Key 獲取對應(yīng)的數(shù)據(jù)。
內(nèi)部是多線程并發(fā)調(diào)用,會等到結(jié)果全部返回。
public Object aggregationApi() {
long s = System.currentTimeMillis();
List ids = new ArrayList<>();
ids.add("1");
ids.add("2");
ids.add("3");
Map callResult = AsyncTemplate.call(ids, id -> {
return userService.getUser(id);
}, u -> u.getId(), COMMON_POOL);
long e = System.currentTimeMillis();
System.out.println("耗時:" + (e-s) + "ms");
return "";
}
另一個場景就是 API 聚合的場景,需要并行調(diào)用多個接口,將結(jié)果進行組裝。
Listparams = new ArrayList<>();
AsyncCallgoodsQuery = new AsyncCall("goodsQuery", 1);
params.add(goodsQuery);AsyncCallorderQuery = new AsyncCall("orderQuery", "100");
params.add(orderQuery);UserQuery q = new UserQuery();
q.setAge(18);
q.setName("yinjihuan");
AsyncCalluserQuery = new AsyncCall("userQuery", q);
params.add(userQuery);AsyncTemplate.call(params, p -> {
if (p.getTaskId().equals("goodsQuery")) {
AsyncCallquery = p;
return goodsService.getGoodsName(query.getParam());
}
if (p.getTaskId().equals("orderQuery")) {
AsyncCallquery = p;
return orderService.getOrder(query.getParam());
}
if (p.getTaskId().equals("userQuery")) {
AsyncCallquery = p;
return userService.getUser(query.getParam());
}
return null;
});
AsyncCall 中定義參數(shù)和響應(yīng)的類型,響應(yīng)結(jié)果會在執(zhí)行完后會自動設(shè)置到 AsyncCall 中。在 call 方法中需要根據(jù) taskId 去做對應(yīng)的處理邏輯,不同的 taskId 調(diào)用的接口不一樣。
源碼參考:https://github.com/yinjihuan/kitty
關(guān)于作者:尹吉歡,簡單的技術(shù)愛好者,《Spring Cloud 微服務(wù)-全棧技術(shù)與案例解析》, 《Spring Cloud 微服務(wù) 入門 實戰(zhàn)與進階》作者, 公眾號 猿天地 發(fā)起人。
有收獲,不要吝嗇你的轉(zhuǎn)發(fā)和在看。
- END -
后臺回復(fù)?學(xué)習(xí)資料?領(lǐng)取學(xué)習(xí)視頻
如有收獲,點個在看,誠摯感謝
