<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          面試官:談?wù)凧ava中的阻塞延遲隊列DelayQueue原理和用法

          共 4121字,需瀏覽 9分鐘

           ·

          2021-10-20 02:24

          你知道的越多,不知道的就越多,業(yè)余的像一棵小草!

          你來,我們一起精進(jìn)!你不來,我和你的競爭對手一起精進(jìn)!

          編輯:業(yè)余草

          blog.csdn.net/fuzhongmin05

          推薦:https://www.xttblog.com/?p=5281

          一到周六周日,我都非常忙,每天來回兩次開車 4 個小時到徐匯交大聽研究生課,剩余時間陪孩子上課。這回再麥吉學(xué)英語,我在外邊監(jiān)控大屏下編輯文章,雖然忙,但很充實!

          下面回到正文,我們今天聊一聊 Java 中的阻塞延遲隊列 DelayQueue。

          阻塞隊列

          阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,從隊列中獲取元素的消費者線程會一直等待直到隊列變?yōu)榉强铡.?dāng)隊列滿時,向隊列中放置元素的生產(chǎn)者線程會等待直到隊列可用。阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。

          在阻塞隊列不可用時,這兩個附加操作提供了 4 種處理方式:

          阻塞隊列不可用
          • 拋出異常:當(dāng)隊列滿時,插入元素會拋出IllegalStateException;
          • 返回特殊值:offer()是入隊方法,當(dāng)插入成功時返回true,插入失敗返回false;poll()是出隊方法,當(dāng)出隊成功時返回元素的值,隊列為空時返回null
          • 一直阻塞:當(dāng)隊列滿時,阻塞執(zhí)行插入方法的線程;當(dāng)隊列空時,阻塞執(zhí)行出隊方法的線程
          • 超時退出:顧名思義

          下面是Java常見的阻塞隊列。

          • ArrayBlockingQueue :一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列
          • LinkedBlockingQueue :一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列
          • PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列
          • DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列
          • SynchronousQueue:一個不存儲元素的阻塞隊列
          • LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列
          • LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列

          DelayQueue解析

          DelayQueue隊列中每個元素都有個過期時間,并且隊列是個優(yōu)先級隊列,當(dāng)從隊列獲取元素時候,只有過期元素才會出隊,DelayQueue的類結(jié)構(gòu)如下圖所示:

          DelayQueue隊列

          如圖DelayQueue中內(nèi)部使用的是PriorityQueue存放數(shù)據(jù),使用ReentrantLock實現(xiàn)線程同步,可知是阻塞隊列。另外隊列里面的元素要實現(xiàn)Delayed接口,一個是獲取當(dāng)前剩余時間的接口,一個是元素比較的接口,因為這個是有優(yōu)先級的隊列。

          DelayQueue 的主要成員

          public?class?DelayQueue<E?extends?Delayed>?extends?AbstractQueue<E>
          ????implements?BlockingQueue<E>?
          {
          ????//?持有內(nèi)部重入鎖。
          ????private?final?transient?ReentrantLock?lock?=?new?ReentrantLock();
          ????//?優(yōu)先級隊列,存放工作任務(wù)。
          ????private?final?PriorityQueue?q?=?new?PriorityQueue();
          ????private?Thread?leader?=?null;
          ????//?依賴于重入鎖的condition。
          ????private?final?Condition?available?=?lock.newCondition();
          }

          元素入隊列

          插入元素到隊列中主要三個方法,但實際上底層調(diào)用的都是offer(e)方法

          /**
          ?*?Inserts?the?specified?element?into?this?delay?queue.
          ?*
          ?*?@param?e?the?element?to?add
          ?*?@return?{@code?true}?(as?specified?by?{@link?Collection#add})
          ?*?@throws?NullPointerException?if?the?specified?element?is?null
          ?*/

          public?boolean?add(E?e)?{
          ????return?offer(e);
          }
          /**
          ?*?Inserts?the?specified?element?into?this?delay?queue.?As?the?queue?is
          ?*?unbounded?this?method?will?never?block.
          ?*
          ?*?@param?e?the?element?to?add
          ?*?@throws?NullPointerException?{@inheritDoc}
          ?*/

          public?void?put(E?e)?{
          ????offer(e);
          }

          /**
          ?*?Inserts?the?specified?element?into?this?delay?queue.
          ?*
          ?*?@param?e?the?element?to?add
          ?*?@return?{@code?true}
          ?*?@throws?NullPointerException?if?the?specified?element?is?null
          ?*/

          public?boolean?offer(E?e)?{
          ????final?ReentrantLock?lock?=?this.lock;
          ????//獲取到重入鎖
          ????lock.lock();
          ????try?{
          ????????q.offer(e);
          ????????//添加成功元素
          ????????if?(q.peek()?==?e)?{
          ????????????leader?=?null;
          ????????????//將等待隊列中的頭節(jié)點移動到同步隊列。
          ????????????available.signal();
          ????????}
          ????????return?true;
          ????}?finally?{
          ????????lock.unlock();
          ????}
          }

          首先獲取獨占鎖,然后添加元素到優(yōu)先級隊列,由于q是優(yōu)先級隊列,所以添加完元素后,peek()方法返回的并不一定是剛才添加的元素,如果判斷為true,說明當(dāng)前元素e的優(yōu)先級最小也就是即將過期的,這時候激活avaliable變量條件隊列里面的線程,通知它們隊列里面有元素了。

          從隊列中取元素

          有兩個方法可以取元素(都是取隊頭),poll()方法取隊頭當(dāng)隊頭元素沒過期時返回null,take()方法取隊頭當(dāng)隊頭元素沒過期時會一直等待。

          /**
          ?*?Retrieves?and?removes?the?head?of?this?queue,?or?returns?{@code?null}
          ?*?if?this?queue?has?no?elements?with?an?expired?delay.
          ?*
          ?*?@return?the?head?of?this?queue,?or?{@code?null}?if?this
          ?*?????????queue?has?no?elements?with?an?expired?delay
          ?*/

          public?E?poll()?{
          ????final?ReentrantLock?lock?=?this.lock;
          ????lock.lock();
          ????try?{
          ????????E?first?=?q.peek();
          ????????//如果隊列為空,或者不為空但是隊頭元素沒有過期則返回null
          ????????if?(first?==?null?||?first.getDelay(NANOSECONDS)?>?0)
          ????????????return?null;
          ????????else
          ????????????return?q.poll();
          ????}?finally?{
          ????????lock.unlock();
          ????}
          }

          /**
          ?*?Retrieves?and?removes?the?head?of?this?queue,?waiting?if?necessary
          ?*?until?an?element?with?an?expired?delay?is?available?on?this?queue.
          ?*
          ?*?@return?the?head?of?this?queue
          ?*?@throws?InterruptedException?{@inheritDoc}
          ?*/

          public?E?take()?throws?InterruptedException?{
          ????//?獲取鎖。每個延遲隊列內(nèi)聚了一個重入鎖。
          ????final?ReentrantLock?lock?=?this.lock;
          ????//?獲取可中斷的鎖。
          ????lock.lockInterruptibly();
          ????try?{
          ????????for?(;;)?{
          ????????????//?嘗試從優(yōu)先級隊列中獲取隊列頭部元素,獲取但不移除
          ????????????E?first?=?q.peek();
          ????????????if?(first?==?null)
          ????????????????//無元素,當(dāng)前線程節(jié)點加入等待隊列,并阻塞當(dāng)前線程
          ????????????????available.await();
          ????????????else?{
          ????????????????//?通過延遲任務(wù)的getDelay()方法獲取延遲時間
          ????????????????long?delay?=?first.getDelay(NANOSECONDS);
          ????????????????if?(delay?<=?0)
          ????????????????????//延遲時間到期,獲取并刪除頭部元素。
          ????????????????????return?q.poll();
          ????????????????first?=?null;?//?don't?retain?ref?while?waiting
          ????????????????if?(leader?!=?null)
          ????????????????????available.await();
          ????????????????else?{
          ????????????????????Thread?thisThread?=?Thread.currentThread();
          ????????????????????leader?=?thisThread;
          ????????????????????try?{
          ????????????????????????//?線程節(jié)點進(jìn)入等待隊列 x 納秒。
          ????????????????????????available.awaitNanos(delay);
          ????????????????????}?finally?{
          ????????????????????????if?(leader?==?thisThread)
          ????????????????????????????leader?=?null;
          ????????????????????}
          ????????????????}
          ????????????}
          ????????}
          ????}?finally?{
          ????????//?若還存在元素的話,則將等待隊列頭節(jié)點中的線程節(jié)點移動到同步隊列中。
          ????????if?(leader?==?null?&&?q.peek()?!=?null)
          ????????????available.signal();
          ????????lock.unlock();
          ????}
          }

          重點說一下take()方法,第一次調(diào)用take時候由于隊列空,所以把當(dāng)前線程放入available的條件隊列等待,當(dāng)執(zhí)行offer()成功并且添加的新元素恰好就是優(yōu)先級隊列的隊首時就會通知最先等待的線程激活,循環(huán)重新獲取隊首元素,這時候first假如不空,則調(diào)用getDelay()方法看該元素還剩下多少時間就過期了,如果delay<=0則說明已經(jīng)過期,則直接出隊返回。否則看leader是否為null,不為null則說明是其他線程也在執(zhí)行take()則把當(dāng)前線程放入條件隊列,否則就是只有當(dāng)前線程執(zhí)行的take()方法,則當(dāng)前線程await直到剩余過期時間到,這期間該線程會釋放鎖,所以其他線程可以offer()添加元素,也可以take()阻塞自己,剩余過期時間到后,當(dāng)前線程會重新競爭鎖,重新進(jìn)入循環(huán)。

          如果已經(jīng)具備了JUC包中的Lock接口以及AQS的相關(guān)知識,上述代碼大部分應(yīng)該都比較容易理解。DelayQueue將實現(xiàn)了Delayed接口的對象添加到優(yōu)先級隊列中,通過在重入鎖的Condition上調(diào)用 await() 方法,實現(xiàn)了延遲獲取阻塞隊列中元素的功能。

          「總結(jié)」

          1. DelayQueue是一個內(nèi)部依靠AQS隊列同步器所實現(xiàn)的無界延遲阻塞隊列。
          2. 延遲對象需要覆蓋 getDelay()與compareTo()方法,并且要注意 getDelay()的時間單位的統(tǒng)一,compareTo()根據(jù)業(yè)務(wù)邏輯進(jìn)行合理的比較邏輯重寫。
          3. DelayQueue中內(nèi)聚的重入鎖是非公平的。
          4. DelayQueue是實現(xiàn)定時任務(wù)的關(guān)鍵,ScheduledThreadPoolExecutor中就用到了DelayQueue。

          DelayQueue使用例子

          寫一個簡單的例子:


          public?class?DelayQueueTest?{

          ????public?static?final?int?SIZE?=?10;

          ????public?static?void?main(String[]?args)?{
          ?????DelayQueueTest?test?=?new?DelayQueueTest();
          ????????//初始化線程池
          ????????BlockingQueue?arrayBlockingQueue?=?new?ArrayBlockingQueue<>(10);
          ????????ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor
          ????????????(5,?10,?10,?TimeUnit.MILLISECONDS,
          ????????????????arrayBlockingQueue,?Executors.defaultThreadFactory(),
          ????????????????new?ThreadPoolExecutor.AbortPolicy());

          ????????DelayQueue?delayTaskQueue?=?new?DelayQueue<>();
          ????????//模擬SIZE個延遲任務(wù)
          ????????for?(byte?i?=?0;?i?????????????Long?runAt?=?System.currentTimeMillis()?+?1000?*?i;
          ????????????String?name?=?"Zhang_"?+?i;
          ????????????byte?age?=?(byte)(10?+?i);
          ????????????String?gender?=?(i?%?2?==?0???"male"?:?"female");
          ????????????Student?student?=?new?StudentBuilder(name,?age,?gender).height(150?+?i).province("ZheJiang").build();
          ????????????delayTaskQueue.put(new?DelayedTask(student,?1,?function?->?test.print(student),?runAt));
          ????????}

          ????????while?(true)?{
          ????????????if?(delayTaskQueue.size()?==?0)?{
          ????????????????break;
          ????????????}
          ????????????try?{
          ????????????????//從延遲隊列中取值,如果沒有對象過期則取到null
          ????????????????DelayedTask?delayedTask?=?delayTaskQueue.poll();
          ????????????????if?(delayedTask?!=?null)?{
          ????????????????????threadPool.execute(delayedTask);
          ????????????????}
          ????????????}?catch?(Exception?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}
          ????????threadPool.shutdown();
          ????}


          ????public?String?print(Object?object)?{
          ??????System.out.println(Thread.currentThread().getName());
          ????????String?str?=?">>>junit?log>>>"?+?object.getClass().getSimpleName()?+?":"?+?object.toString();
          ????????System.out.println(str);
          ????????return?str;
          ????}

          ????private?static?class?DelayedTask<T>?implements?Delayed,?Runnable?{

          ????????/**
          ?????????*?任務(wù)參數(shù)
          ?????????*/

          ????????private?T?taskParam;

          ????????/**
          ?????????*?任務(wù)類型
          ?????????*/

          ????????private?Integer?type;

          ????????/**
          ?????????*?任務(wù)函數(shù)
          ?????????*/

          ????????private?Function?function;

          ????????/**
          ?????????*?任務(wù)執(zhí)行時刻
          ?????????*/

          ????????private?Long?runAt;

          ????????public?T?getTaskParam()?{
          ????????????return?taskParam;
          ????????}
          ????????public?Integer?getType()?{
          ????????????return?type;
          ????????}
          ????????public?Function?getFunction()?{
          ????????????return?function;
          ????????}
          ????????public?Long?getRunAt()?{
          ????????????return?runAt;
          ????????}
          ????????DelayedTask(T?taskParam,?Integer?type,?Function?function,?Long?runAt)?{
          ????????????this.taskParam?=?taskParam;
          ????????????this.type?=?type;
          ????????????this.function?=?function;
          ????????????this.runAt?=?runAt;
          ????????}
          ????????@Override
          ????????public?void?run()?{
          ????????????if?(taskParam?!=?null)?{
          ????????????????function.apply(taskParam);
          ????????????}
          ????????}
          ????????@Override
          ????????public?long?getDelay(TimeUnit?unit)?{
          ????????????return?unit.convert(this.runAt?-?System.currentTimeMillis(),?TimeUnit.MILLISECONDS);
          ????????}
          ????????@Override
          ????????public?int?compareTo(Delayed?o)?{
          ????????????DelayedTask?object?=?(DelayedTask)o;
          ????????????return?this.runAt.compareTo(object.getRunAt());
          ????????}
          ????}
          }

          運行結(jié)果如下,由于10個元素的延遲時間均相差1秒,可以看到逐步打印的效果。

          DelayQueue教程

          DelayQueue典型場景是重試機制實現(xiàn),比如當(dāng)調(diào)用接口失敗后,把當(dāng)前調(diào)用信息放入delay=10s的元素,然后把元素放入隊列,那么這個隊列就是一個重試隊列,一個線程通過take()方法獲取需要重試的接口,take()返回則接口進(jìn)行重試,失敗則再次放入隊列,同時也可以在元素加上重試次數(shù)。

          瀏覽 39
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  PP特极毛片 | 久热99| 中文字幕精品无码网站人口 | 久久久久久天堂 | 五月婷婷中文 |