關(guān)于聚合和多線程的處理套路

作者:Honwhy
來(lái)源:SegmentFault 思否
概述
無(wú)差別地請(qǐng)求多個(gè)外部接口并聚合所有請(qǐng)求結(jié)果,應(yīng)該有屬于它自己的套路,應(yīng)該將所有多線程的操作屏蔽之,我們只關(guān)心參數(shù)和結(jié)果。因此,應(yīng)該拋棄Callable/FutureTask/Future等這些手工模式,這些代碼應(yīng)該交給框架來(lái)實(shí)現(xiàn)。
手工模式
何為手工模式,我們以Callable為例設(shè)計(jì)請(qǐng)求外部的接口,可能像下面這樣子,參數(shù)是NumberParam,兩個(gè)外部接口分別是IntToStringCallable和DoubleToStringCallable,
? ? ? ? ? ? ? ?
class IntToStringCallable implements Callable{ private final NumberParam param;IntToStringCallable(NumberParam numberParam) {this.param = numberParam;}@Overridepublic String call() {return Integer.toHexString(param.getAge());}}class DoubleToStringCallable implements Callable{ private final NumberParam param;DoubleToStringCallable(NumberParam numberParam) {this.param = numberParam;}@Overridepublic String call() {return Double.toHexString(param.getMoney());}}
如果采用FutureTask的方式多線程執(zhí)行這兩個(gè)接口,可能是這樣子的,
? ? ? ? ? ? ? ?
FutureTaskr1 = new FutureTask<>(new IntToStringCallable(numberParam)); new Thread(r1).start();FutureTaskr2 = new FutureTask<>(new DoubleToStringCallable(numberParam)); new Thread(r2).start();try {Listret = new ArrayList<>(); ret.add(r1.get());ret.add(r2.get());log.info("ret=" + ret);} catch (Exception ignore) {}
需要首先構(gòu)造FutureTask,然后使用Thread比較原始的api去執(zhí)行,當(dāng)然還可以再簡(jiǎn)化一下,比如使用Future方式,
? ? ? ? ? ? ? ?
ExecutorService threadPool = Executors.newFixedThreadPool(2);Futurer1 = threadPool.submit(new IntToStringCallable(numberParam)); Futurer2 = threadPool.submit(new DoubleToStringCallable(numberParam)); try {Listret = new ArrayList<>(); ret.add(r1.get());ret.add(r2.get());log.info("ret=" + ret);} catch (Exception ignore) {}
我相信這是一種普遍常見(jiàn)的做法了。這里沒(méi)有必要繼續(xù)評(píng)論這些做法的問(wèn)題了。
Java 8之后
Java 8之后有了更加方便的異步編程方式了,不用再辛苦地去寫(xiě)Callable的,一句話就可以表達(dá)Callable+FutureTask/...,
? ? ? ? ? ? ? ?
CompletableFuture pf = CompletableFuture.supplyAsync(() -> new IntToStringCallable(numberParam).call()); 改造之前的做法結(jié)果可能就是這個(gè)樣子了,
? ? ? ? ? ? ? ?
CompletableFuturer1 = CompletableFuture.supplyAsync(() -> new IntToStringCallable(numberParam).call()); CompletableFuturer2 = CompletableFuture.supplyAsync(() -> new DoubleToStringCallable(numberParam).call()); try {Listret = new ArrayList<>(); ret.add(r1.get());ret.add(r2.get());log.info("ret=" + ret);} catch (Exception ignore) {}
其實(shí)可以看出來(lái),這個(gè)時(shí)候我們不一定需要一個(gè)Callable了,提供異步的能力是supplyAsync來(lái)完成的,我們只需要正常的入?yún)⒊鰠⒌钠胀ǚ椒ň涂梢粤恕?/span>
Java 8之后再之后
Java 8之后的異步編程方式確實(shí)簡(jiǎn)單了很多,但是在我們的業(yè)務(wù)代碼中還是出現(xiàn)了和異步編程相關(guān)的無(wú)關(guān)業(yè)務(wù)邏輯的事情,可否繼續(xù)簡(jiǎn)化呢。本案的設(shè)計(jì)靈感來(lái)自同樣Java 8的優(yōu)秀設(shè)計(jì)——ParallelStream,舉個(gè)簡(jiǎn)單的例子,
? ? ? ? ? ? ? ?
Arrays.asList("a", "b", "c").parallelStream().map(String::toUpperCase).collect(Collectors.toList());
異步及多線程是ParallelStream來(lái)完成的,用戶只需要完成String::toUpperCase部分。
本案的設(shè)計(jì)主要有三個(gè)interface來(lái)實(shí)現(xiàn),分別是,
? ? ? ? ? ? ? ?
public interface MyProvider{ T provide(V v);}public interface MyCollector{ void collectList(T t);ListretList(); }public interface MyStream{ ListtoList(List > providers, V v); }
其實(shí)MyProvider表達(dá)是請(qǐng)求外部接口,MyStream表示一種類(lèi)似ParallelStream的思想,一種內(nèi)化異步多線程的操作模式,MyCollector屬于內(nèi)部設(shè)計(jì)api可以不暴露給用戶;
一個(gè)改寫(xiě)上面的例子的例子,
? ? ? ? ? ? ? ?
@Testpublic void testStream() {MyProviderp1 = new IntToStringProvider(); MyProviderp2 = new DoubleToStringProvider(); List> providers = Arrays.asList(p1, p2); MyStreammyStream = new CollectStringStream(); Liststrings = myStream.toList(providers, numberParam); log.info("ret=" + strings);}
在這個(gè)方法內(nèi)一點(diǎn)異步編程的內(nèi)容都沒(méi)有的,用戶只需要編程自己關(guān)心的邏輯即可,當(dāng)然是要按照Provider的思路去寫(xiě),這或許有一點(diǎn)心智負(fù)擔(dān)。
這個(gè)CollectStringStream幫我們完成來(lái)一些臟活累活,
? ? ? ? ? ? ? ?
public ListtoList(List > myProviders, NumberParam param) { MyCollectormyCollector = new NoMeaningCollector(); List> pfs = new ArrayList<>(myProviders.size()); for (MyProviderprovider : myProviders) { CompletableFuturepf = CompletableFuture.runAsync(() -> myCollector.collectList(provider.provide(param)), executor); pfs.add(pf);}try {CompletableFuture.allOf(pfs.toArray(new CompletableFuture[0])).get(3, TimeUnit.SECONDS);} catch (Exception e) {if (e instanceof TimeoutException) {pfs.forEach(p -> {if (!p.isDone()){p.cancel(true);}});}}return myCollector.retList();}
這樣看起這個(gè)設(shè)計(jì)又不美了,但是如果有更多的外部接口需要調(diào)用,CollectStringStream就顯得很有價(jià)值了,新加入再多的請(qǐng)求外部接口要改動(dòng)的代碼很少很少,所以這種思想我覺(jué)得是值得推廣的。
總結(jié)
照例附上參考代碼,不過(guò)值得思考的是我們?nèi)绾蜗駜?yōu)秀的代碼學(xué)習(xí)并運(yùn)用到自己的項(xiàng)目中。?
參考代碼
java-toy:https://github.com/honwhy/java-toy

