還在用Future輪詢獲取結(jié)果?CompletionService快了解下
背景
領(lǐng)導(dǎo)一直不給他安排其他開發(fā)任務(wù),就一直讓他看看代碼熟悉業(yè)務(wù)。二胖每天上班除了偶爾跟坐在隔壁的前端小姐姐聊聊天,就是看看這些枯燥無味的業(yè)務(wù)代碼,無聊的一匹。雖然二胖已是久經(jīng)職場(chǎng)的老油條了,但是看到同事們的周報(bào)都寫的滿滿的,而自己的周報(bào),就一兩行,熟悉了什么功能。心里還是慌得一匹,畢竟公司不養(yǎng)閑人啊。于是乎二胖終于鼓起勇氣為了向領(lǐng)導(dǎo)表明自己的上進(jìn)心,主動(dòng)向領(lǐng)導(dǎo)要開發(fā)任務(wù)。領(lǐng)導(dǎo)一看這小伙子這么有上進(jìn)心,于是就到任務(wù)看板里面挑了一個(gè)業(yè)務(wù)邏輯比較簡單的任務(wù)分配給了二胖。二胖拿到這個(gè)任務(wù)屁顛屁顛的回到座位。任務(wù)比較簡單,就是通過爬蟲去爬取某些賣機(jī)票(某豬、某攜、某團(tuán)等)的網(wǎng)站的一些機(jī)票,然后保存到數(shù)據(jù)庫。
同步入庫
二胖拿到任務(wù),三下五除二就把任務(wù)完成了。
?public?static?void?main(String[]?args)?throws?InterruptedException?{
????????String?mouZhuFlightPrice?=?getMouZhuFlightPrice();
????????String?mouXieFlightPrice?=?getMouXieFlightPrice();
????????String?mouTuanFlightPrice?=?getMouTuanFlightPrice();
????????saveDb(mouZhuFlightPrice);
????????saveDb(mouXieFlightPrice);
????????saveDb(mouTuanFlightPrice);
????}
????/**
?????*?模擬請(qǐng)求某豬網(wǎng)站?爬取機(jī)票信息
?????*
?????*
?????*?@return
?????*?@throws?InterruptedException
?????*/
????public?static?String?getMouZhuFlightPrice()?throws?InterruptedException?{
????????//?模擬請(qǐng)求某豬網(wǎng)站?爬取機(jī)票信息
????????Thread.sleep(10000);
????????return?"獲取到某豬網(wǎng)站的機(jī)票信息了";
????}
????/**
?????*?模擬請(qǐng)求某攜網(wǎng)站?爬取機(jī)票信息
?????*
?????*?@return
?????*?@throws?InterruptedException
?????*/
????public?static?String?getMouXieFlightPrice()?throws?InterruptedException?{
????????//?模擬請(qǐng)求某攜網(wǎng)站?爬取機(jī)票信息
????????Thread.sleep(5000);
????????return?"獲取到某攜網(wǎng)站的機(jī)票信息了";
????}
????/**
?????*?模擬請(qǐng)求團(tuán)網(wǎng)站?爬取機(jī)票信息
?????*
?????*?@return
?????*?@throws?InterruptedException
?????*/
????public?static?String?getMouTuanFlightPrice()?throws?InterruptedException?{
????????//?模擬請(qǐng)求某團(tuán)網(wǎng)站?爬取機(jī)票信息
????????Thread.sleep(3000);
????????return?"獲取到某團(tuán)網(wǎng)站的機(jī)票信息了";
????}
????/**
?????*?保存DB
?????*
?????*?@param?flightPriceList
?????*/
????public?static?void?saveDb(String?flightPriceList)?{
????????????//?解析字符串?進(jìn)行異步入庫
????}
這次二胖學(xué)乖了,任務(wù)完成了先去找下坐他對(duì)面的技術(shù)大拿(看他那發(fā)際線就知道了)同事“二狗”讓二狗大拿幫忙指點(diǎn)一二,看看代碼是否還能有優(yōu)化的地方。畢竟領(lǐng)導(dǎo)對(duì)代碼的性能、以及代碼的優(yōu)雅是有要求的。領(lǐng)導(dǎo)多次在部門的周會(huì)上提到讓我們多看看“二狗”寫的代碼,學(xué)習(xí)下人家寫代碼的優(yōu)雅、抽象、封裝等等。二狗大概的瞄了下二胖寫的代碼,提出了個(gè)小小的建議“這個(gè)代碼可以采用多線程來優(yōu)化下哦,你看某豬這個(gè)網(wǎng)站耗時(shí)是拿到結(jié)果需要10s,其他的耗時(shí)都比它短,先有結(jié)果的我們可以先處理的,不需要等到大家都返回了再來處理的”。
輪循futureList獲取結(jié)果
幸好二胖對(duì)多線程了解一點(diǎn)點(diǎn),于是乎采用future的方式來實(shí)現(xiàn)。二胖使用一個(gè)List來保存每個(gè)任務(wù)返回的Future,然后去輪詢這些Future,直到每個(gè)Future都已完成。由于需要先完成的任務(wù)需要先執(zhí)行,且不希望出現(xiàn)因?yàn)榕旁谇懊娴娜蝿?wù)阻塞導(dǎo)致后面先完成的任務(wù)的結(jié)果沒有及時(shí)獲取的情況,所以在調(diào)用get方式時(shí),需要將超時(shí)時(shí)間設(shè)置為0。
??public?static?void?main(String[]?args)?{
????????int?taskSize?=?3;
????????Future?mouZhuFlightPriceFuture?=?executor.submit(()?->?getMouZhuFlightPrice());
????????Future?mouXieFlightPriceFuture?=?executor.submit(()?->?getMouXieFlightPrice());
????????Future?mouTuanFlightPriceFuture?=?executor.submit(()?->?getMouTuanFlightPrice());
????????List>?futureList?=?new?ArrayList<>();
????????futureList.add(mouZhuFlightPriceFuture);
????????futureList.add(mouXieFlightPriceFuture);
????????futureList.add(mouTuanFlightPriceFuture);
????????//?輪詢,獲取完成任務(wù)的返回結(jié)果
????????while?(taskSize?>?0)?{
????????????for?(Future?future?:?futureList)?{
????????????????String?result?=?null;
????????????????try?{
????????????????????result?=?future.get(0,?TimeUnit.SECONDS);
????????????????}?catch?(InterruptedException?e)?{
????????????????????taskSize--;
????????????????????e.printStackTrace();
????????????????}?catch?(ExecutionException?e)?{
????????????????????taskSize--;
????????????????????e.printStackTrace();
????????????????}?catch?(TimeoutException?e)?{
????????????????????//?超時(shí)異常需要忽略,因?yàn)槲覀冊(cè)O(shè)置了等待時(shí)間為0,只要任務(wù)沒有完成,就會(huì)報(bào)該異常
????????????????}
????????????????//?任務(wù)已經(jīng)完成
????????????????if?(result?!=?null)?{
????????????????????System.out.println("result="?+?result);
????????????????????//?從future列表中刪除已經(jīng)完成的任務(wù)
????????????????????futureList.remove(future);
????????????????????taskSize--;
????????????????????//?此處必須break,否則會(huì)拋出并發(fā)修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決)
????????????????????break;?//?進(jìn)行下一次while循環(huán)
????????????????}
????????????}
????????}
????}
上述代碼有兩個(gè)小細(xì)節(jié)需要注意下:
如采用
ArrayList的話futureList刪除之后需要break進(jìn)行下一次while循環(huán),否則會(huì)產(chǎn)生我們意想不到的ConcurrentModificationException異常。具體原因可看下《ArrayList的刪除姿勢(shì)你都掌握了嗎》這個(gè)文章,里面有詳細(xì)的介紹。在捕獲了
InterruptedException和ExecutionException異常后記得?taskSize--否則就會(huì)發(fā)生死循環(huán)。如果生產(chǎn)發(fā)生了死循環(huán)你懂的,cpu被你打滿,程序假死等。你離被開除也不遠(yuǎn)了。上面輪詢
future列表非常的復(fù)雜,而且還有很多異常需要處理,還有很多細(xì)節(jié)需要考慮,還有被開除的風(fēng)險(xiǎn)。所以這種方案也被pass了。
自定義BlockingQueue實(shí)現(xiàn)
上述方案被 pass之后,二胖就在思考可以借用哪種數(shù)據(jù)來實(shí)現(xiàn)下先進(jìn)先出的功能,貌似隊(duì)列可以實(shí)現(xiàn)下這個(gè)功能。所以二胖又寫了一版采用隊(duì)列來實(shí)現(xiàn)的功能。
??final?static?ExecutorService?executor?=?new?ThreadPoolExecutor(6,?6,
????????????0L,?TimeUnit.MILLISECONDS,?new?LinkedBlockingQueue<>());
????public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException?{
????????Future?mouZhuFlightPriceFuture?=?executor.submit(()?->?getMouZhuFlightPrice());
????????Future?mouXieFlightPriceFuture?=?executor.submit(()?->?getMouXieFlightPrice());
????????Future?mouTuanFlightPriceFuture?=?executor.submit(()?->?getMouTuanFlightPrice());
????????//?創(chuàng)建阻塞隊(duì)列
????????BlockingQueue?blockingQueue?=?new?LinkedBlockingQueue<>(3);
????????executor.execute(()?->?run(mouZhuFlightPriceFuture,?blockingQueue));
????????executor.execute(()?->?run(mouXieFlightPriceFuture,?blockingQueue));
????????executor.execute(()?->?run(mouTuanFlightPriceFuture,?blockingQueue));
????????//?異步保存所有機(jī)票價(jià)格
????????for?(int?i?=?0;?i?3;?i++)?{
????????????String?result?=?blockingQueue.take();
????????????System.out.println(result);
????????????saveDb(result);
????????}
????}
????private?static?void?run(Future?flightPriceFuture,?BlockingQueue?blockingQueue) ?{
????????try?{
????????????blockingQueue.put(flightPriceFuture.get());
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}?catch?(ExecutionException?e)?{
????????????e.printStackTrace();
????????}
????}
這次比上個(gè)版本好多了,代碼也簡潔多了。不過按理說這種需求應(yīng)該是大家經(jīng)常遇到的,應(yīng)該不需要自己來實(shí)現(xiàn)把, JAVA這么貼心的語言應(yīng)該會(huì)有api可以直接拿來用吧。
CompletionService實(shí)現(xiàn)
二胖現(xiàn)在畢竟也是對(duì)代碼的簡潔性有追求的人了。于是乎二胖去翻翻自己躺在書柜里吃灰的并發(fā)相關(guān)的書籍,看看是否有解決方案。 終于皇天不負(fù)有心人在二胖快要放棄的時(shí)候突然發(fā)現(xiàn)了新大陸。?《Java并發(fā)編程實(shí)戰(zhàn)》一書6.3.5節(jié)
CompletionService:Executor和BlockingQueue,有這樣一段話:
如果向Executor提交了一組計(jì)算任務(wù),并且希望在計(jì)算完成后獲得結(jié)果,那么可以保留與每個(gè)任務(wù)關(guān)聯(lián)的Future,然后反復(fù)使用get方法,同時(shí)將參數(shù)timeout指定為0,從而通過輪詢來判斷任務(wù)是否完成。這種方法雖然可行,但卻有些繁瑣。幸運(yùn)的是,還有一種更好的方法:完成服務(wù)CompletionService。
??final?static?ExecutorService?executor?=?new?ThreadPoolExecutor(6,?6,
????????????0L,?TimeUnit.MILLISECONDS,?new?LinkedBlockingQueue<>());
????public?static?void?main(String[]?args)?throws?ExecutionException,?InterruptedException?{
????????CompletionService?completionService?=?new?ExecutorCompletionService(executor);
????????completionService.submit(()?->?getMouZhuFlightPrice());
????????completionService.submit(()?->?getMouXieFlightPrice());
????????completionService.submit(()?->?getMouTuanFlightPrice());
????????for?(int?i?=?0;?i?3;?i++)?{
????????????String?result?=?(String)completionService.take().get();
????????????System.out.println(result);
????????????saveDb(result);
????????}
????}
當(dāng)我們使用了CompletionService不用遍歷future列表,也不需要去自定義隊(duì)列了,代碼變得簡潔了。下面我們就來分析下CompletionService實(shí)現(xiàn)的原理吧。
CompletionService 介紹
我們可以先看下 JDK源碼中CompletionService的javadoc說明吧
/**
?*?A?service?that?decouples?the?production?of?new?asynchronous?tasks
?*?from?the?consumption?of?the?results?of?completed?tasks.??Producers
?*?{@code?submit}?tasks?for?execution.?Consumers?{@code?take}
?*?completed?tasks?and?process?their?results?in?the?order?they
?*?complete.
大概意思是CompletionService實(shí)現(xiàn)了生產(chǎn)者提交任務(wù)和消費(fèi)者獲取結(jié)果的解耦,生產(chǎn)者和消費(fèi)者都不用關(guān)心任務(wù)的完成順序,由CompletionService來保證,消費(fèi)者一定是按照任務(wù)完成的先后順序來獲取執(zhí)行結(jié)果。
成員變量
既然需要按照任務(wù)的完成順序獲取結(jié)果,那內(nèi)部應(yīng)該也是通過隊(duì)列來實(shí)現(xiàn)的吧。打開源碼我們可以看到,里面有三個(gè)成員變量
public?class?ExecutorCompletionService<V>?implements?CompletionService<V>?{
?//?執(zhí)行task的線程池,創(chuàng)建CompletionService必須指定;
????private?final?Executor?executor;
????//主要用于創(chuàng)建待執(zhí)行task;
????private?final?AbstractExecutorService?aes;
????//存儲(chǔ)已完成狀態(tài)的task,默認(rèn)是基于鏈表結(jié)構(gòu)的阻塞隊(duì)列LinkedBlockingQueue。?????
????private?final?BlockingQueue>?completionQueue;
任務(wù)提交
ExecutorCompletionService任務(wù)的提交和執(zhí)行都是委托給Executor來完成。當(dāng)提交某個(gè)任務(wù)時(shí),該任務(wù)首先將被包裝為一個(gè)QueueingFuture
public?Future?submit(Callable?task) ? {
????????if?(task?==?null)?throw?new?NullPointerException();
????????RunnableFuture?f?=?newTaskFor(task);
????????executor.execute(new?QueueingFuture(f));
????????return?f;
????}
任務(wù)完成后何時(shí)進(jìn)入隊(duì)列
從源碼可以看出,QueueingFuture是FutureTask的子類,實(shí)現(xiàn)了done方法,在task執(zhí)行完成之后將當(dāng)前task添加到completionQueue,將返回結(jié)果加入到阻塞隊(duì)列中,加入的順序就是任務(wù)完成的先后順序。done方法的具體調(diào)用在FutureTask的finishCompletion方法。
獲取已完成任務(wù)
?public?Future?take()?throws?InterruptedException? {
????????return?completionQueue.take();
????}
????public?Future?poll()? {
????????return?completionQueue.poll();
????}
????public?Future?poll(long?timeout,?TimeUnit?unit)
????????????throws?InterruptedException? {
????????return?completionQueue.poll(timeout,?unit);
????}
take和poll都是調(diào)用BlockingQueue提供的方法。
take()?獲取任務(wù)阻塞,直到可以拿到任務(wù)為止。poll()?獲取任務(wù)不阻塞,如果沒有獲取到任務(wù)直接返回null。poll(long timeout, TimeUnit unit)?帶超時(shí)時(shí)間等待的獲取任務(wù)方法(一般推薦使用這種)
總結(jié)
CompletionService?把線程池?Executor?和阻塞隊(duì)列?BlockingQueue融合在一起,能夠讓批異步任務(wù)的管理更簡單,將生產(chǎn)者提交任務(wù)和消費(fèi)者獲取結(jié)果的解耦。CompletionService?能夠讓異步任務(wù)的執(zhí)行結(jié)果有序化,先執(zhí)行完的先進(jìn)入阻塞隊(duì)列,利用這個(gè)特性,我們可以輕松實(shí)現(xiàn)后續(xù)處理的有序性,避免無謂的等待。


