<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          還在用Future輪詢獲取結(jié)果?CompletionService快了解下

          共 5101字,需瀏覽 11分鐘

           ·

          2021-02-22 12:20


          背景

          領(lǐng)導(dǎo)一直不給他安排其他開發(fā)任務(wù),就一直讓他看看代碼熟悉業(yè)務(wù)。二胖每天上班除了偶爾跟坐在隔壁的前端小姐姐聊聊天,就是看看這些枯燥無味的業(yè)務(wù)代碼,無聊的一匹。雖然二胖已是久經(jīng)職場(chǎng)的老油條了,但是看到同事們的周報(bào)都寫的滿滿的,而自己的周報(bào),就一兩行,熟悉了什么功能。心里還是慌得一匹,畢竟公司不養(yǎng)閑人啊。于是乎二胖終于鼓起勇氣為了向領(lǐng)導(dǎo)表明自己的上進(jìn)心,主動(dòng)向領(lǐng)導(dǎo)要開發(fā)任務(wù)。領(lǐng)導(dǎo)一看這小伙子這么有上進(jìn)心,于是就到任務(wù)看板里面挑了一個(gè)業(yè)務(wù)邏輯比較簡單的任務(wù)分配給了二胖。二胖拿到這個(gè)任務(wù)屁顛屁顛的回到座位。任務(wù)比較簡單,就是通過爬蟲去爬取某些賣機(jī)票(某豬、某攜、某團(tuán)等)的網(wǎng)站的一些機(jī)票,然后保存到數(shù)據(jù)庫。


          同步入庫

          二胖拿到任務(wù),三下五除二就把任務(wù)完成了。

          ?public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????????String?mouZhuFlightPrice?=?getMouZhuFlightPrice();
          ????????String?mouXieFlightPrice?=?getMouXieFlightPrice();
          ????????String?mouTuanFlightPrice?=?getMouTuanFlightPrice();
          ????????saveDb(mouZhuFlightPrice);
          ????????saveDb(mouXieFlightPrice);
          ????????saveDb(mouTuanFlightPrice);
          ????}


          ????/**
          ?????*?模擬請(qǐng)求某豬網(wǎng)站?爬取機(jī)票信息
          ?????*
          ?????*
          ?????*?@return
          ?????*?@throws?InterruptedException
          ?????*/

          ????public?static?String?getMouZhuFlightPrice()?throws?InterruptedException?{
          ????????//?模擬請(qǐng)求某豬網(wǎng)站?爬取機(jī)票信息
          ????????Thread.sleep(10000);
          ????????return?"獲取到某豬網(wǎng)站的機(jī)票信息了";
          ????}

          ????/**
          ?????*?模擬請(qǐng)求某攜網(wǎng)站?爬取機(jī)票信息
          ?????*
          ?????*?@return
          ?????*?@throws?InterruptedException
          ?????*/

          ????public?static?String?getMouXieFlightPrice()?throws?InterruptedException?{
          ????????//?模擬請(qǐng)求某攜網(wǎng)站?爬取機(jī)票信息
          ????????Thread.sleep(5000);
          ????????return?"獲取到某攜網(wǎng)站的機(jī)票信息了";
          ????}


          ????/**
          ?????*?模擬請(qǐng)求團(tuán)網(wǎng)站?爬取機(jī)票信息
          ?????*
          ?????*?@return
          ?????*?@throws?InterruptedException
          ?????*/

          ????public?static?String?getMouTuanFlightPrice()?throws?InterruptedException?{
          ????????//?模擬請(qǐng)求某團(tuán)網(wǎng)站?爬取機(jī)票信息
          ????????Thread.sleep(3000);
          ????????return?"獲取到某團(tuán)網(wǎng)站的機(jī)票信息了";
          ????}

          ????/**
          ?????*?保存DB
          ?????*
          ?????*?@param?flightPriceList
          ?????*/

          ????public?static?void?saveDb(String?flightPriceList)?{
          ????????????//?解析字符串?進(jìn)行異步入庫
          ????}

          這次二胖學(xué)乖了,任務(wù)完成了先去找下坐他對(duì)面的技術(shù)大拿(看他那發(fā)際線就知道了)同事“二狗”讓二狗大拿幫忙指點(diǎn)一二,看看代碼是否還能有優(yōu)化的地方。畢竟領(lǐng)導(dǎo)對(duì)代碼的性能、以及代碼的優(yōu)雅是有要求的。領(lǐng)導(dǎo)多次在部門的周會(huì)上提到讓我們多看看“二狗”寫的代碼,學(xué)習(xí)下人家寫代碼的優(yōu)雅、抽象、封裝等等。二狗大概的瞄了下二胖寫的代碼,提出了個(gè)小小的建議“這個(gè)代碼可以采用多線程來優(yōu)化下哦,你看某豬這個(gè)網(wǎng)站耗時(shí)是拿到結(jié)果需要10s,其他的耗時(shí)都比它短,先有結(jié)果的我們可以先處理的,不需要等到大家都返回了再來處理的”。

          輪循futureList獲取結(jié)果

          幸好二胖對(duì)多線程了解一點(diǎn)點(diǎn),于是乎采用future的方式來實(shí)現(xiàn)。二胖使用一個(gè)List來保存每個(gè)任務(wù)返回的Future,然后去輪詢這些Future,直到每個(gè)Future都已完成。由于需要先完成的任務(wù)需要先執(zhí)行,且不希望出現(xiàn)因?yàn)榕旁谇懊娴娜蝿?wù)阻塞導(dǎo)致后面先完成的任務(wù)的結(jié)果沒有及時(shí)獲取的情況,所以在調(diào)用get方式時(shí),需要將超時(shí)時(shí)間設(shè)置為0。

          ??public?static?void?main(String[]?args)?{
          ????????int?taskSize?=?3;
          ????????Future?mouZhuFlightPriceFuture?=?executor.submit(()?->?getMouZhuFlightPrice());
          ????????Future?mouXieFlightPriceFuture?=?executor.submit(()?->?getMouXieFlightPrice());
          ????????Future?mouTuanFlightPriceFuture?=?executor.submit(()?->?getMouTuanFlightPrice());
          ????????List>?futureList?=?new?ArrayList<>();
          ????????futureList.add(mouZhuFlightPriceFuture);
          ????????futureList.add(mouXieFlightPriceFuture);
          ????????futureList.add(mouTuanFlightPriceFuture);
          ????????//?輪詢,獲取完成任務(wù)的返回結(jié)果
          ????????while?(taskSize?>?0)?{
          ????????????for?(Future?future?:?futureList)?{
          ????????????????String?result?=?null;
          ????????????????try?{
          ????????????????????result?=?future.get(0,?TimeUnit.SECONDS);
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????taskSize--;
          ????????????????????e.printStackTrace();
          ????????????????}?catch?(ExecutionException?e)?{
          ????????????????????taskSize--;
          ????????????????????e.printStackTrace();
          ????????????????}?catch?(TimeoutException?e)?{
          ????????????????????//?超時(shí)異常需要忽略,因?yàn)槲覀冊(cè)O(shè)置了等待時(shí)間為0,只要任務(wù)沒有完成,就會(huì)報(bào)該異常
          ????????????????}
          ????????????????//?任務(wù)已經(jīng)完成
          ????????????????if?(result?!=?null)?{
          ????????????????????System.out.println("result="?+?result);
          ????????????????????//?從future列表中刪除已經(jīng)完成的任務(wù)
          ????????????????????futureList.remove(future);
          ????????????????????taskSize--;
          ????????????????????//?此處必須break,否則會(huì)拋出并發(fā)修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決)
          ????????????????????break;?//?進(jìn)行下一次while循環(huán)
          ????????????????}
          ????????????}
          ????????}
          ????}

          上述代碼有兩個(gè)小細(xì)節(jié)需要注意下:

          • 如采用ArrayList的話futureList刪除之后需要break進(jìn)行下一次while循環(huán),否則會(huì)產(chǎn)生我們意想不到的ConcurrentModificationException異常。具體原因可看下《ArrayList的刪除姿勢(shì)你都掌握了嗎》這個(gè)文章,里面有詳細(xì)的介紹。

          • 在捕獲了InterruptedExceptionExecutionException異常后記得?taskSize--否則就會(huì)發(fā)生死循環(huán)。如果生產(chǎn)發(fā)生了死循環(huán)你懂的,cpu被你打滿,程序假死等。你離被開除也不遠(yuǎn)了。

          • 上面輪詢future列表非常的復(fù)雜,而且還有很多異常需要處理,還有很多細(xì)節(jié)需要考慮,還有被開除的風(fēng)險(xiǎn)。所以這種方案也被pass了。

          自定義BlockingQueue實(shí)現(xiàn)

          • 上述方案被pass之后,二胖就在思考可以借用哪種數(shù)據(jù)來實(shí)現(xiàn)下先進(jìn)先出的功能,貌似隊(duì)列可以實(shí)現(xiàn)下這個(gè)功能。所以二胖又寫了一版采用隊(duì)列來實(shí)現(xiàn)的功能。
          ??final?static?ExecutorService?executor?=?new?ThreadPoolExecutor(6,?6,
          ????????????0L,?TimeUnit.MILLISECONDS,?new?LinkedBlockingQueue<>());

          ????public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException?{
          ????????Future?mouZhuFlightPriceFuture?=?executor.submit(()?->?getMouZhuFlightPrice());
          ????????Future?mouXieFlightPriceFuture?=?executor.submit(()?->?getMouXieFlightPrice());
          ????????Future?mouTuanFlightPriceFuture?=?executor.submit(()?->?getMouTuanFlightPrice());

          ????????//?創(chuàng)建阻塞隊(duì)列
          ????????BlockingQueue?blockingQueue?=?new?LinkedBlockingQueue<>(3);
          ????????executor.execute(()?->?run(mouZhuFlightPriceFuture,?blockingQueue));
          ????????executor.execute(()?->?run(mouXieFlightPriceFuture,?blockingQueue));
          ????????executor.execute(()?->?run(mouTuanFlightPriceFuture,?blockingQueue));
          ????????//?異步保存所有機(jī)票價(jià)格
          ????????for?(int?i?=?0;?i?3;?i++)?{
          ????????????String?result?=?blockingQueue.take();
          ????????????System.out.println(result);
          ????????????saveDb(result);
          ????????}
          ????}

          ????private?static?void?run(Future?flightPriceFuture,?BlockingQueue?blockingQueue)?{
          ????????try?{
          ????????????blockingQueue.put(flightPriceFuture.get());
          ????????}?catch?(InterruptedException?e)?{
          ????????????e.printStackTrace();
          ????????}?catch?(ExecutionException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}
          • 這次比上個(gè)版本好多了,代碼也簡潔多了。不過按理說這種需求應(yīng)該是大家經(jīng)常遇到的,應(yīng)該不需要自己來實(shí)現(xiàn)把,JAVA這么貼心的語言應(yīng)該會(huì)有api可以直接拿來用吧。

          CompletionService實(shí)現(xiàn)

          • 二胖現(xiàn)在畢竟也是對(duì)代碼的簡潔性有追求的人了。于是乎二胖去翻翻自己躺在書柜里吃灰的并發(fā)相關(guān)的書籍,看看是否有解決方案。終于皇天不負(fù)有心人在二胖快要放棄的時(shí)候突然發(fā)現(xiàn)了新大陸。?《Java并發(fā)編程實(shí)戰(zhàn)》一書6.3.5節(jié)CompletionService:ExecutorBlockingQueue,有這樣一段話:

          如果向Executor提交了一組計(jì)算任務(wù),并且希望在計(jì)算完成后獲得結(jié)果,那么可以保留與每個(gè)任務(wù)關(guān)聯(lián)的Future,然后反復(fù)使用get方法,同時(shí)將參數(shù)timeout指定為0,從而通過輪詢來判斷任務(wù)是否完成。這種方法雖然可行,但卻有些繁瑣。幸運(yùn)的是,還有一種更好的方法:完成服務(wù)CompletionService。

          ??final?static?ExecutorService?executor?=?new?ThreadPoolExecutor(6,?6,
          ????????????0L,?TimeUnit.MILLISECONDS,?new?LinkedBlockingQueue<>());

          ????public?static?void?main(String[]?args)?throws?ExecutionException,?InterruptedException?{
          ????????CompletionService?completionService?=?new?ExecutorCompletionService(executor);
          ????????completionService.submit(()?->?getMouZhuFlightPrice());
          ????????completionService.submit(()?->?getMouXieFlightPrice());
          ????????completionService.submit(()?->?getMouTuanFlightPrice());
          ????????for?(int?i?=?0;?i?3;?i++)?{
          ????????????String?result?=?(String)completionService.take().get();
          ????????????System.out.println(result);
          ????????????saveDb(result);
          ????????}
          ????}

          當(dāng)我們使用了CompletionService不用遍歷future列表,也不需要去自定義隊(duì)列了,代碼變得簡潔了。下面我們就來分析下CompletionService實(shí)現(xiàn)的原理吧。

          CompletionService 介紹

          • 我們可以先看下JDK源碼中CompletionServicejavadoc說明吧
          /**
          ?*?A?service?that?decouples?the?production?of?new?asynchronous?tasks
          ?*?from?the?consumption?of?the?results?of?completed?tasks.??Producers
          ?*?{@code?submit}?tasks?for?execution.?Consumers?{@code?take}
          ?*?completed?tasks?and?process?their?results?in?the?order?they
          ?*?complete.

          大概意思是CompletionService實(shí)現(xiàn)了生產(chǎn)者提交任務(wù)和消費(fèi)者獲取結(jié)果的解耦,生產(chǎn)者和消費(fèi)者都不用關(guān)心任務(wù)的完成順序,由CompletionService來保證,消費(fèi)者一定是按照任務(wù)完成的先后順序來獲取執(zhí)行結(jié)果。

          成員變量

          既然需要按照任務(wù)的完成順序獲取結(jié)果,那內(nèi)部應(yīng)該也是通過隊(duì)列來實(shí)現(xiàn)的吧。打開源碼我們可以看到,里面有三個(gè)成員變量

          public?class?ExecutorCompletionService<V>?implements?CompletionService<V>?{
          ?//?執(zhí)行task的線程池,創(chuàng)建CompletionService必須指定;
          ????private?final?Executor?executor;
          ????//主要用于創(chuàng)建待執(zhí)行task;
          ????private?final?AbstractExecutorService?aes;
          ????//存儲(chǔ)已完成狀態(tài)的task,默認(rèn)是基于鏈表結(jié)構(gòu)的阻塞隊(duì)列LinkedBlockingQueue。?????
          ????private?final?BlockingQueue>?completionQueue;

          任務(wù)提交

          ExecutorCompletionService任務(wù)的提交和執(zhí)行都是委托給Executor來完成。當(dāng)提交某個(gè)任務(wù)時(shí),該任務(wù)首先將被包裝為一個(gè)QueueingFuture

          public?Future?submit(Callable?task)?{
          ????????if?(task?==?null)?throw?new?NullPointerException();
          ????????RunnableFuture?f?=?newTaskFor(task);
          ????????executor.execute(new?QueueingFuture(f));
          ????????return?f;
          ????}

          任務(wù)完成后何時(shí)進(jìn)入隊(duì)列

          從源碼可以看出,QueueingFutureFutureTask的子類,實(shí)現(xiàn)了done方法,在task執(zhí)行完成之后將當(dāng)前task添加到completionQueue,將返回結(jié)果加入到阻塞隊(duì)列中,加入的順序就是任務(wù)完成的先后順序。done方法的具體調(diào)用在FutureTaskfinishCompletion方法。

          獲取已完成任務(wù)

          ?public?Future?take()?throws?InterruptedException?{
          ????????return?completionQueue.take();
          ????}

          ????public?Future?poll()?{
          ????????return?completionQueue.poll();
          ????}

          ????public?Future?poll(long?timeout,?TimeUnit?unit)
          ????????????throws?InterruptedException?
          {
          ????????return?completionQueue.poll(timeout,?unit);
          ????}

          takepoll都是調(diào)用BlockingQueue提供的方法。

          • take()?獲取任務(wù)阻塞,直到可以拿到任務(wù)為止。
          • poll()?獲取任務(wù)不阻塞,如果沒有獲取到任務(wù)直接返回null。
          • poll(long timeout, TimeUnit unit)?帶超時(shí)時(shí)間等待的獲取任務(wù)方法(一般推薦使用這種

          總結(jié)

          • CompletionService?把線程池?Executor?和阻塞隊(duì)列?BlockingQueue融合在一起,能夠讓批異步任務(wù)的管理更簡單,將生產(chǎn)者提交任務(wù)和消費(fèi)者獲取結(jié)果的解耦。
            CompletionService?能夠讓異步任務(wù)的執(zhí)行結(jié)果有序化,先執(zhí)行完的先進(jìn)入阻塞隊(duì)列,利用這個(gè)特性,我們可以輕松實(shí)現(xiàn)后續(xù)處理的有序性,避免無謂的等待。



          點(diǎn)個(gè)在看,贊??支持我吧
          瀏覽 32
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  啪啪网址 | 老阿姨的丝丝波涛胸涌诱惑 | 久草综合在线 | 插逼国产视频 | 九一国产视频 |