<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          多線程編程-分析阻塞隊列的源碼實現(xiàn)

          共 2921字,需瀏覽 6分鐘

           ·

          2020-06-29 23:50

          ??看過我上一篇文章的應(yīng)該知道(家里條件允許的可以先看看上一篇文章),如果想實現(xiàn)一個生產(chǎn)者消費者模型,我們可以基于JVM自帶的synchronized+wait+notify實現(xiàn),也可以用JDK里面的ReentrantLock+Condition實現(xiàn)!不過從上篇文章的demo看,實現(xiàn)起來也不是那么容易!因為我們既要關(guān)心什么時候需要阻塞線程,又要需要關(guān)心何時喚醒線程。控制的細(xì)節(jié)太多,一個疏忽可能就導(dǎo)致了一個不易發(fā)現(xiàn)的bug,比如上篇文章中的虛假喚醒的例子!那有沒有一種我們不用關(guān)心那么多復(fù)雜細(xì)節(jié)就能實現(xiàn)生產(chǎn)者消費者模式的方法呢?本文要講的阻塞隊列就是一種很好的實現(xiàn)!

          ??在我們剛開始學(xué)數(shù)據(jù)結(jié)構(gòu)的時候,都接觸過一種先進(jìn)先出(first in first out,簡稱“FIFO”)的數(shù)據(jù)結(jié)構(gòu),叫隊列。阻塞隊列從名字看也是隊列的一種,因此滿足隊列的特性,然后這個隊列是可阻塞的!這個阻塞怎么理解呢?就是當(dāng)我們一個線程往阻塞隊列里面添加元素的時候,如果隊列滿了,那這個線程不會直接返回,而是會被阻塞,直到元素添加成功!當(dāng)我們一個線程從阻塞隊列里面獲取元素的時候,如果隊列是空的,那這個線程不會直接返回,而是會被阻塞直到元素獲取成功。而阻塞以及喚醒的操作都由阻塞隊列來管理!

          常用阻塞隊列類圖

          ??我們先看在java中阻塞隊列基本的繼承關(guān)系圖:

          9b6e3f84506bf38b701cd24e10d109cc.webp1583070100191

          ??完整的繼承關(guān)系要比這張圖復(fù)雜一些,但為了清晰起見圖中我只畫了主要的類和關(guān)系。隊列的基接口Queue與我們開發(fā)中經(jīng)常用到的List、Set是兄弟關(guān)系,因此我這里也列出來了方便對比記憶!阻塞隊列的基接口是繼承自Queue接口的BlockingQueue接口,其他阻塞隊列具體實現(xiàn)都繼承BlockingQueue接口!

          BlockingQueue常用方法

          ??我們先看隊列基接口Queue中的方法

          0f001ba5275bd0217c74a3848c0cec2f.webp1583071182495

          ??這個接口一共6個方法,我們可以分為兩組
          ??1、“異常”組

          1、add(e):將元素放到隊列末尾,成功返回true,失敗則拋異常。
          2、remove():獲取并移除隊首元素,獲取失敗則拋異常。
          3、element():獲取隊首元素,不移除,獲取失敗則拋異常。

          ??2、“特殊值”組

          1、offer(e):將元素放到隊列末尾,成功返回true,失敗返回false。
          2、poll():獲取并返回隊首元素,獲取失敗則返回null。
          3、peek():獲取隊首元素,不移除,獲取失敗則返回null。

          ??“異常”組的3個方法在操作失敗的時候會拋異常,因此叫“異常”組!

          ??“特殊值”組3個方法與“異常”組的3個方法是一一對應(yīng)的,功能都一樣,只是在操作失敗的時候不會拋異常而是返回一個特殊值,因此叫“特殊值組”。

          ??這兩組方法都是在Queue接口中定義的,因此跟阻塞就沒有什么關(guān)系了。那我們再看看BlockingQueue接口中的方法

          e4b7bdc9acf41c91277f817c32c66f5c.webp1583071557087

          ??這個接口我們重點關(guān)注標(biāo)記出來的4個方法,這幾個方法我們也可以分為兩組
          ??3、“阻塞”組

          1、put(e):將元素放到隊列末尾,如果隊列滿了,則等待。
          2、take():獲取并移除隊首元素,如果隊列為空,則等待。

          ??4、“超時”組

          1、offer(e,time,unit):將元素放到隊列末尾,如果隊列滿了,則等待,當(dāng)?shù)却^指定時間后仍添加元素失敗,則返回false,否則返回true。
          2、poll(time,unit):獲取并返回隊首元素,如果隊列為空,則等待,當(dāng)?shù)却^指定時間后仍獲取失敗則返回null,否則返回獲取到的元素。

          ??這兩組方法都是在BlockingQueue接口中定義的,因此都是跟阻塞相關(guān)的!

          ??“阻塞”組2個方法在操作不成功的時候會一直阻塞線程,直到能夠操作成功,因此叫“阻塞”組!用一個成語形容就是“不見不散”!

          ??“超時”組2個方法與“超時”組的2個方法是一一對應(yīng)的,功能都一樣,只是這2個方法不會一直阻塞,超過了指定的時間還沒成功就停止阻塞并返回,因此叫“超時”組!用一個成語形容就是“過時不候”!

          ??這四組方法合在一起就有了下面的一張表格:

          方法功能異常組特殊值組阻塞組超時組
          元素入隊add(e)offer(e)put(e)offer(e,time,unit)
          元素出隊remove()pool()take()poll(time,unit)
          檢查元素element()peek()

          源碼分析常用阻塞隊列

          ??BlockingQueue的實現(xiàn)類有多個,但是如果每一個源碼都進(jìn)行分析那不僅很影響篇幅且沒必要,因此我這里拿三個常用的阻塞隊列源碼進(jìn)行分析!在源碼中jdk的版本為1.8!

          ArrayBlockingQueue

          ??我們先看下ArrayBlockingQueue中的幾個屬性

          ????/**?The?queued?items?使用數(shù)組存儲元素?*/
          ????final?Object[]?items;

          ????/** items index for next take, poll, peek or remove 下一個出隊元素索引?*/
          ????int?takeIndex;

          ????/**?items?index?for?next?put,?offer,?or?add?下一個入隊元素索引?*/
          ????int?putIndex;

          ????/**?Number?of?elements?in?the?queue?隊列元素個數(shù)?*/
          ????int?count;

          ????/*
          ?????*?ReentrantLock+Condition控制并發(fā)
          ?????*?Concurrency?control?uses?the?classic?two-condition?algorithm
          ?????*?found?in?any?textbook.
          ?????*/


          ????/**?Main?lock?guarding?all?access?*/
          ????final?ReentrantLock?lock;

          ????/**?Condition?for?waiting?takes?*/
          ????private?final?Condition?notEmpty;

          ????/**?Condition?for?waiting?puts?*/
          ????private?final?Condition?notFull;

          1.object類型數(shù)組,也意味著ArrayBlockingQueue底層數(shù)據(jù)結(jié)構(gòu)是數(shù)組。
          2.ReentrantLock+Condition,如果看過我上一篇文章的應(yīng)該很熟悉,這是用作來線程同步和線程通信的。

          ??我們再看下ArrayBlockingQueue的構(gòu)造函數(shù)。

          ????public?ArrayBlockingQueue(int?capacity)?{
          ????????this(capacity,?false);
          ????}
          ????public?ArrayBlockingQueue(int?capacity,?boolean?fair)?{
          ????????if?(capacity?<=?0)
          ????????????throw?new?IllegalArgumentException();
          ????????this.items?=?new?Object[capacity];
          ????????lock?=?new?ReentrantLock(fair);
          ????????notEmpty?=?lock.newCondition();
          ????????notFull?=??lock.newCondition();
          ????}
          ????public?ArrayBlockingQueue(int?capacity,?boolean?fair,
          ??????????????????????????????Collection?c)
          {
          ????????this(capacity,?fair);
          ????????//初始化一個集合到隊列
          ????????....
          ????}

          ??這三個構(gòu)造函數(shù)都必須傳入一個int類型的capacity參數(shù),這個參數(shù)也意味著ArrayBlockingQueue是一個有界的阻塞隊列!

          ??我們前面說過隊列有常用的四組方法,而跟阻塞相關(guān)的是“阻塞”組和“超時”組的四個方法!我們以“阻塞”組的put()和take()方法為例,來窺探一下源碼里面的奧秘:

          ????/**
          ?????*?Inserts?the?specified?element?at?the?tail?of?this?queue,?waiting
          ?????*?for?space?to?become?available?if?the?queue?is?full.
          ?????*/

          ????public?void?put(E?e)?throws?InterruptedException?{
          ????????checkNotNull(e);
          ????????//加鎖操作
          ????????final?ReentrantLock?lock?=?this.lock;
          ????????lock.lockInterruptibly();
          ????????try?{
          ????????????//判斷隊列是否滿足入隊條件,如果隊列已滿,則阻塞等待一個“不滿”的信號
          ????????????while?(count?==?items.length)
          ????????????????notFull.await();

          ????????????//滿足條件,則進(jìn)行入隊操作
          ????????????enqueue(e);
          ????????}?finally?{
          ????????????lock.unlock();
          ????????}
          ????}

          ????private?void?enqueue(E?x)?{

          ????????final?Object[]?items?=?this.items;
          ????????items[putIndex]?=?x;

          ????????//?下一個入隊元素索引超過了數(shù)組的長度,則又從0開始。
          ????????if?(++putIndex?==?items.length)
          ????????????putIndex?=?0;

          ????????count++;

          ????????//放入元素后,釋放一個“不空”的信號。喚醒等待中的出隊線程。
          ????????notEmpty.signal();
          ????}
          ????public?E?take()?throws?InterruptedException?{
          ????????//加鎖操作
          ????????final?ReentrantLock?lock?=?this.lock;
          ????????lock.lockInterruptibly();
          ????????try?{
          ????????????//判斷隊列是否滿足出隊條件,如果隊列為空,則阻塞等待一個“不空”的信號
          ????????????while?(count?==?0)
          ????????????????notEmpty.await();

          ????????????//滿足條件,則進(jìn)行出隊操作
          ????????????return?dequeue();
          ????????}?finally?{
          ????????????lock.unlock();
          ????????}
          ????}

          ????private?E?dequeue()?{

          ????????final?Object[]?items?=?this.items;
          ????????E?x?=?(E)?items[takeIndex];
          ????????items[takeIndex]?=?null;//help?GC

          ????????//?下一個出隊元素索引超過了數(shù)組的長度,則又從0開始。
          ????????if?(++takeIndex?==?items.length)
          ????????????takeIndex?=?0;

          ????????count--;
          ????????if?(itrs?!=?null)
          ????????????itrs.elementDequeued();//更新迭代器元素數(shù)據(jù)

          ????????//取出元素后,釋放一個“不滿”的信號。喚醒等待中的入隊線程。
          ????????notFull.signal();
          ????????return?x;
          ????}

          ??ArrayBlockingQueue的入隊出隊代碼還是很簡單的,當(dāng)我們往一個阻塞隊列里面添加數(shù)據(jù)的時候,阻塞隊列用一個固定長度的數(shù)據(jù)存儲數(shù)據(jù),如果數(shù)組的長度達(dá)到了最大容量,則添加數(shù)據(jù)的線程會被阻塞。當(dāng)我們從阻塞隊列獲取數(shù)據(jù)的時候,如果隊列為空,則獲取數(shù)據(jù)的線程會被阻塞!相信代碼上的注釋已經(jīng)足夠理解這塊的代碼邏輯了!

          LinkedBlockingQueue

          ??我們先看下LinkedBlockingQueue中的幾個屬性

          /**?The?capacity?bound,?or?Integer.MAX_VALUE?if?none?隊列容量?*/
          private?final?int?capacity;

          /**?Current?number?of?elements?隊列元素個數(shù)?*/
          private?final?AtomicInteger?count?=?new?AtomicInteger();

          /**
          ?*?隊列頭
          ?*?Head?of?linked?list.
          ?*?Invariant:?head.item?==?null
          ?*/

          transient?Node?head;

          /**
          ?*?隊列尾
          ?*?Tail?of?linked?list.
          ?*?Invariant:?last.next?==?null
          ?*/

          private?transient?Node?last;

          /**?Lock?held?by?take,?poll,?etc?出隊操作用到的鎖?*/
          private?final?ReentrantLock?takeLock?=?new?ReentrantLock();

          /**?Wait?queue?for?waiting?takes?*/
          private?final?Condition?notEmpty?=?takeLock.newCondition();

          /**?Lock?held?by?put,?offer,?etc?入隊操作用到的鎖?*/
          private?final?ReentrantLock?putLock?=?new?ReentrantLock();

          /**?Wait?queue?for?waiting?puts?*/
          private?final?Condition?notFull?=?putLock.newCondition();

          1.Node類型的變量head和last,這是鏈表常見操作,也意味著LinkedBlockingQueue底層數(shù)據(jù)結(jié)構(gòu)是鏈表。
          2.與ArrayBlockingQueue不同的是,這里有兩個ReentrantLock對象,put操作個take操作的鎖對象是分開的,這樣做也是為了提高容器的并發(fā)能力。

          ??再看下Node這個內(nèi)部類

          ????/**
          ?????*?Linked?list?node?class
          ?????*/

          ????static?class?Node<E>?{
          ????????E?item;

          ????????//指向下一個節(jié)點
          ????????Node?next;

          ????????Node(E?x)?{?item?=?x;?}
          ????}

          ??只有next屬性意味著這是一個單向鏈表!

          ??再看下LinkedBlockingQueue的構(gòu)造函數(shù)

          ????public?LinkedBlockingQueue()?{
          ????????this(Integer.MAX_VALUE);
          ????}
          ????public?LinkedBlockingQueue(int?capacity)?{
          ????????if?(capacity?<=?0)?throw?new?IllegalArgumentException();
          ????????this.capacity?=?capacity;
          ????????last?=?head?=?new?Node(null);
          ????}
          ????public?LinkedBlockingQueue(Collection?c)?{
          ????????this(Integer.MAX_VALUE);
          ????????...
          ????????}

          1.當(dāng)構(gòu)造函數(shù)不傳capacity參數(shù)的時候,LinkedBlockingQueue就是一個無界阻塞隊列(其實也并非無界,不傳默認(rèn)值就是Integer.MAX_VALUE)。
          2.當(dāng)構(gòu)造函數(shù)傳入capacity參數(shù)的時候,LinkedBlockingQueue就是一個有界阻塞隊列。

          ??我們依然看看在LinkedBlockingQueue中“阻塞”組的兩個方法put()和take()分別怎么實現(xiàn)的

          /**
          ?*?Inserts?the?specified?element?at?the?tail?of?this?queue,?waiting?if
          ?*?necessary?for?space?to?become?available.
          ?*/

          public?void?put(E?e)?throws?InterruptedException?{
          ????if?(e?==?null)?throw?new?NullPointerException();

          ????//存儲隊列元素數(shù)量
          ????int?c?=?-1;

          ????//創(chuàng)建新節(jié)點
          ????Node?node?=?new?Node(e);

          ????//獲取putLock
          ????final?ReentrantLock?putLock?=?this.putLock;

          ????//隊列元素數(shù)量
          ????final?AtomicInteger?count?=?this.count;
          ????putLock.lockInterruptibly();
          ????try?{
          ????????//判斷隊列是否滿足入隊條件,如果隊列已滿,則阻塞等待一個“不滿”的信號
          ????????while?(count.get()?==?capacity)?{
          ????????????notFull.await();
          ????????}

          ????????//入隊操作
          ????????enqueue(node);

          ????????//隊列元素數(shù)量+1,執(zhí)行完下面這句后,count是入隊后的元素數(shù)量,而c的值還是入隊前的元素數(shù)量。
          ????????c?=?count.getAndIncrement();

          ????????//當(dāng)前入隊操作成功后,如果元素數(shù)量還小于隊列容量,則釋放一個“不滿”的信號
          ????????if?(c?+?1?????????????notFull.signal();

          ????}?finally?{
          ????????putLock.unlock();
          ????}

          ????//這里的c前面說了是元素入隊前的數(shù)量,如果入隊前元素數(shù)量為0(隊列是空的),那可能會有出隊線程在等待一個“不空”的信號,所以這里釋放一個“不空”的信號。
          ????if?(c?==?0)
          ????????signalNotEmpty();
          }

          private?void?signalNotEmpty()?{
          ????final?ReentrantLock?takeLock?=?this.takeLock;
          ????takeLock.lock();
          ????try?{
          ????????notEmpty.signal();
          ????}?finally?{
          ????????takeLock.unlock();
          ????}
          }

          public?E?take()?throws?InterruptedException?{

          ????//出隊元素
          ????E?x;

          ????//存儲隊列元素數(shù)量
          ????int?c?=?-1;

          ????//隊列元素數(shù)量
          ????final?AtomicInteger?count?=?this.count;

          ????//獲取takeLock
          ????final?ReentrantLock?takeLock?=?this.takeLock;
          ????takeLock.lockInterruptibly();

          ????try?{
          ????????//判斷隊列是否滿足出隊條件,如果隊列為空,則阻塞等待一個“不空”的信號
          ????????while?(count.get()?==?0)?{
          ????????????notEmpty.await();
          ????????}

          ????????//出隊操作
          ????????x?=?dequeue();

          ????????//隊列元素數(shù)量-1,執(zhí)行完下面這句后,count是出隊后的元素數(shù)量,而c的值還是出隊前的元素數(shù)量。
          ????????c?=?count.getAndDecrement();

          ????????//當(dāng)前出隊操作成功前隊列元素大于1,那當(dāng)前出隊操作成功后隊列元素也就大于0,則釋放一個“不空”的信號
          ????????if?(c?>?1)
          ????????????notEmpty.signal();

          ????}?finally?{
          ????????takeLock.unlock();
          ????}

          ????//這里的c前面說了是元素出隊前的數(shù)量,如果出隊前元素數(shù)量為總?cè)萘浚犃惺菨M的),那可能會有入隊線程在等待一個“不滿”的信號,所以這里釋放一個“不滿”的信號。
          ????if?(c?==?capacity)
          ????????signalNotFull();
          ????return?x;
          }
          private?void?signalNotFull()?{
          ????final?ReentrantLock?putLock?=?this.putLock;
          ????putLock.lock();
          ????try?{
          ????????notFull.signal();
          ????}?finally?{
          ????????putLock.unlock();
          ????}
          }

          ??這里源碼的同步邏輯比ArrayBlockingQueue中要稍微復(fù)雜一點,在ArrayBlockingQueue中每次入隊都釋放一個“不空”的信號,每次出隊都釋放一個“不滿”的信號,而LinkedBlockingQueue則不同。

          ??元素入隊的時候

          1.入隊后還有空位,則釋放一個“不滿”的信號。
          2.入隊時隊列為空,則釋放一個“不空”的信號。

          ??元素出隊的時候

          1.出隊后隊列還有元素,則釋放一個“不空”的信號。
          2.出隊前隊列是滿的,則釋放一個“不滿”的信號。

          SynchronousQueue

          ??SynchronousQueue從名字看叫“同步隊列”,怎么理解呢?雖然他也叫隊列,但是他不提供空間存儲元素!當(dāng)一個線程往隊列添加元素,需要匹配到有另外一個線程從隊列取元素,否則線程阻塞!當(dāng)一個線程從隊列獲取元素,需要匹配到有另外一個線程往隊列添加元素,否則線程阻塞!所以這里的同步指的就是入隊線程和出隊線程需要同步!這里有點類似你媽媽對你說:“今年你再找不到女朋友,過年你就別回來了!”,于是你第二年就真的沒回去過年!因為你是一個獲取數(shù)據(jù)(找女朋友)的線程,數(shù)據(jù)沒獲取到則一直阻塞!

          ??了解了大致概念,我們再來看看源碼!

          ????/**
          ?????*?Creates?a?{@code?SynchronousQueue}?with?nonfair?access?policy.
          ?????*/

          ????public?SynchronousQueue()?{
          ????????this(false);
          ????}

          ????/**
          ?????*?Creates?a?{@code?SynchronousQueue}?with?the?specified?fairness?policy.
          ?????*
          ?????*?@param?fair?if?true,?waiting?threads?contend?in?FIFO?order?for
          ?????*????????access;?otherwise?the?order?is?unspecified.
          ?????*/

          ????public?SynchronousQueue(boolean?fair)?{
          ????????transferer?=?fair???new?TransferQueue()?:?new?TransferStack();
          ????}

          ??兩個構(gòu)造函數(shù),fair參數(shù)指定公平策略,默認(rèn)為false,因此是非公平模式!先看看put和take方法的實現(xiàn):

          ????public?void?put(E?e)?throws?InterruptedException?{
          ????????if?(e?==?null)?throw?new?NullPointerException();
          ????????if?(transferer.transfer(e,?false,?0)?==?null)?{
          ????????????Thread.interrupted();
          ????????????throw?new?InterruptedException();
          ????????}
          ????}

          ????public?E?take()?throws?InterruptedException?{
          ????????E?e?=?transferer.transfer(null,?false,?0);
          ????????if?(e?!=?null)
          ????????????return?e;
          ????????Thread.interrupted();
          ????????throw?new?InterruptedException();
          ????}

          ??put和take方法很類似,都是調(diào)用transferer.transfer(…)方法,區(qū)別在于第一個參數(shù)!put方法在調(diào)用時候會傳入入隊的值,而take方法傳入null。

          ??上面說過有公平和非公平策略,今天將重點分析公平模式TransferQueue的源碼!從名字能看出來這也是一個隊列,我們先看TransferQueue的重點屬性和構(gòu)造方法:

          ????//?指向隊列頭部
          ????transient?volatile?QNode?head;
          ????//?指向隊列尾部
          ????transient?volatile?QNode?tail;

          ????TransferQueue()?{
          ????????//初始化一個空
          ????????QNode?h?=?new?QNode(null,?false);?//?initialize?to?dummy?node.
          ????????head?=?h;
          ????????tail?=?h;
          ????}

          ??一頭一尾,鏈表的一貫操作!構(gòu)造方法中,創(chuàng)建了一個QNode結(jié)點,并且將head和tail都指向這個結(jié)點!我們再看看QNode類的重要屬性和構(gòu)造方法:

          volatile?QNode?next;??????????//?指向隊列的下一個節(jié)點
          volatile?Object?item;?????????//?節(jié)點存儲的元素
          volatile?Thread?waiter;???????//?被阻塞的線程
          final?boolean?isData;??????????//?是否是“數(shù)據(jù)”結(jié)點(入隊線程為true,出隊線程為false)

          QNode(Object?item,?boolean?isData)?{
          ???this.item?=?item;
          ???this.isData?=?isData;
          ???}

          ??執(zhí)行完構(gòu)造函數(shù),節(jié)點圖示如下,一頭一尾都指向構(gòu)造函數(shù)中創(chuàng)建出來的新節(jié)點!

          fcbf0b4478a64810bfd2719b7b91e64d.webp1583417029976

          ??我們再回到上面提到的transferer.transfer(…)方法,也就是TransferQueue中的transfer(…)方法,核心邏輯都在這個方法中體現(xiàn):

          /**
          ?*?“存”或者“取”一個元素
          ?*/

          @SuppressWarnings("unchecked")
          E?transfer(E?e,?boolean?timed,?long?nanos)?{
          ????QNode?s?=?null;?//?constructed/reused?as?needed

          ????//當(dāng)前操作類型,傳非null的值則為生產(chǎn)線程,傳null則為消費線程。
          ????boolean?isData?=?(e?!=?null);

          ????for?(;;)?{
          ????????QNode?t?=?tail;
          ????????QNode?h?=?head;
          ????????//上面我們說過在構(gòu)造方法中就創(chuàng)建了一個QNode結(jié)點,并且將head和tail都指向這個結(jié)點
          ????????//因此這里t、h一般情況下不會為null
          ????????if?(t?==?null?||?h?==?null)?????????//?saw?uninitialized?value
          ????????????continue;???????????????????????//?spin

          ????????//根據(jù)SynchronousQueue的特性,不同類型的操作會配對成功。
          ????????//因此在阻塞隊列中只會存在一種類型的阻塞節(jié)點,要么全是消費線程要么全是生產(chǎn)線程!
          ????????//所以分三種情況:
          ????????//1.h == t,這種情況下隊列為空,需要將當(dāng)前節(jié)點入隊。
          ????????//2.t.isData?==?isData尾部節(jié)點的操作類型與當(dāng)前操作類型
          ????????//???????一致(尾部節(jié)點的操作類型代表著隊列中所有節(jié)點的操作類型),需要將當(dāng)前節(jié)點入隊。
          ????????//3.隊列不為空且尾部節(jié)點的操作類型與當(dāng)前操作類型不一致,
          ????????//???????需要從隊列頭部匹配一個節(jié)點并返回。
          ????????//因此再看下面的代碼,會根據(jù)上面3種情況走不同的分支。
          ????????if?(h?==?t?||?t.isData?==?isData)?{?//?empty?or?same-mode

          ????????????//進(jìn)入這個分支就是上面1、2的情況

          ????????????//獲取尾部節(jié)點的next指向,正常情況下tn等于null
          ????????????QNode?tn?=?t.next;

          ????????????//下面是判斷是否出現(xiàn)并發(fā)導(dǎo)致尾節(jié)點被更改
          ????????????if?(t?!=?tail)??????????????????//?inconsistent?read
          ????????????????continue;
          ????????????if?(tn?!=?null)?{???????????????//?lagging?tail
          ????????????????advanceTail(t,?tn);
          ????????????????continue;
          ????????????}

          ????????????//超時判斷
          ????????????if?(timed?&&?nanos?<=?0)????????//?can't?wait
          ????????????????return?null;

          ????????????//將當(dāng)前操作創(chuàng)建為新節(jié)點,傳入數(shù)據(jù)值和操作類型。
          ????????????//這里可以看下面的“圖二”
          ????????????if?(s?==?null)
          ????????????????s?=?new?QNode(e,?isData);

          ????????????//1、將阻塞隊列中尾部節(jié)點的next指向新節(jié)點
          ????????????//2、將tail屬性的指向設(shè)置為新節(jié)點
          ????????????//下面兩行執(zhí)行后隊列的變化可以看下面的“圖三”,注意紅色箭頭
          ????????????if?(!t.casNext(null,?s))????????//?failed?to?link?in
          ????????????????continue;
          ????????????advanceTail(t,?s);??????????????//?swing?tail?and?wait

          ????????????//在這個方法內(nèi)部會進(jìn)行自旋或者阻塞,直到配對成功。
          ????????????//建議這里先跳到下面這個方法內(nèi)部看完邏輯再回來。
          ????????????Object?x?=?awaitFulfill(s,?e,?timed,?nanos);

          ????????????//只有在線程被中斷的情況下會進(jìn)入這個分支
          ????????????if?(x?==?s)?{???????????????????//?wait?was?cancelled
          ????????????????clean(t,?s);
          ????????????????return?null;
          ????????????}

          ????????????if?(!s.isOffList())?{???????????//?not?already?unlinked
          ????????????????advanceHead(t,?s);??????????//?unlink?if?head
          ????????????????if?(x?!=?null)??????????????//?and?forget?fields
          ????????????????????s.item?=?s;
          ????????????????s.waiter?=?null;
          ????????????}

          ????????????//如果為生產(chǎn)線程,則返回入隊的值;如果為消費線程,則返回匹配到的生產(chǎn)線程的值。
          ????????????return?(x?!=?null)???(E)x?:?e;

          ????????}?else?{????????????????????????????//?complementary-mode

          ????????????//進(jìn)入這個分支就是上面3的情況

          ????????????//找到頭部節(jié)點的next指向
          ????????????//可以看下面的“圖四”,注意紅色箭頭指向的就是匹配到的節(jié)點
          ????????????QNode?m?=?h.next;???????????????//?node?to?fulfill
          ????????????if?(t?!=?tail?||?m?==?null?||?h?!=?head)
          ????????????????continue;???????????????????//?inconsistent?read
          ????????????Object?x?=?m.item;

          ????????????//m.casItem(x, e)方法很重要,會將匹配到的節(jié)點的item修改為當(dāng)前操作的值。
          ????????????//這樣awaitFulfill方法的x != e條件才能成立,被匹配的阻塞線程才能返回。
          ????????????//可以看下面的“圖五”,注意node1節(jié)點中item屬性對應(yīng)的值的變化。
          ????????????if?(isData?==?(x?!=?null)?||????//?m?already?fulfilled
          ????????????????x?==?m?||???????????????????//?m?cancelled
          ????????????????!m.casItem(x,?e))?{?????????//?lost?CAS
          ????????????????advanceHead(h,?m);??????????//?dequeue?and?retry
          ????????????????continue;
          ????????????}

          ????????????//調(diào)整head屬性的指向,這里建議這里先跳到下面這個方法內(nèi)部看完邏輯再回來。
          ????????????advanceHead(h,?m);??????????????//?successfully?fulfilled

          ????????????//喚醒匹配到的阻塞線程
          ????????????LockSupport.unpark(m.waiter);

          ????????????//如果為生產(chǎn)線程,則返回入隊的值;如果為消費線程,則返回匹配到的生產(chǎn)線程的值。
          ????????????return?(x?!=?null)???(E)x?:?e;
          ????????}
          ????}
          }

          Object?awaitFulfill(QNode?s,?E?e,?boolean?timed,?long?nanos)?{
          ????/*?Same?idea?as?TransferStack.awaitFulfill?*/
          ????final?long?deadline?=?timed???System.nanoTime()?+?nanos?:?0L;
          ????Thread?w?=?Thread.currentThread();

          ????//如果頭節(jié)點的next指向當(dāng)前的數(shù)據(jù)節(jié)點,也就是當(dāng)前數(shù)據(jù)節(jié)點是下一個待匹配的節(jié)點,那就自旋等待一會兒。
          ????//如果設(shè)置了超時時間就少自旋一會兒,沒有設(shè)置超時時間就多自旋一會兒。
          ????//可以看看maxTimedSpins和maxUntimedSpins兩個屬性的值設(shè)置,是與cpu數(shù)量相關(guān)的。
          ????int?spins?=?((head.next?==?s)??
          ?????????????????(timed???maxTimedSpins?:?maxUntimedSpins)?:?0);

          ????for?(;;)?{
          ????????if?(w.isInterrupted())
          ????????????s.tryCancel(e);
          ????????Object?x?=?s.item;
          ????????//?第一次進(jìn)來這里肯定是相等的,所以不會進(jìn)入這個分支。
          ????????//?當(dāng)有其他的線程匹配到當(dāng)前節(jié)點,這里的s.item的值會被更改(前面說到過的m.casItem(x, e)方法),所以方法返回。
          ????????if?(x?!=?e)
          ????????????return?x;
          ????????if?(timed)?{
          ????????????nanos?=?deadline?-?System.nanoTime();
          ????????????if?(nanos?<=?0L)?{
          ????????????????s.tryCancel(e);
          ????????????????continue;
          ????????????}
          ????????}

          ????????if?(spins?>?0)
          ????????????--spins;
          ????????else?if?(s.waiter?==?null)
          ????????????s.waiter?=?w;
          ????????else?if?(!timed)
          ????????????//這里線程會阻塞,如果有線程與當(dāng)前線程匹配,則被喚醒進(jìn)行下一次循環(huán)。
          ????????????LockSupport.park(this);
          ????????else?if?(nanos?>?spinForTimeoutThreshold)
          ????????????LockSupport.parkNanos(this,?nanos);
          ????}
          }

          void?advanceHead(QNode?h,?QNode?nh)?{
          ????//這個方法做了兩個操作
          ????//1、將head屬性的指向調(diào)整為頭節(jié)點的下一個結(jié)點
          ????//2、將原頭節(jié)點的next指向原頭節(jié)點本身
          ????//可以看下面的“圖六”,注意看紅色箭頭。
          ????if?(h?==?head?&&
          ????????UNSAFE.compareAndSwapObject(this,?headOffset,?h,?nh))
          ????????h.next?=?h;?//?forget?old?next
          }
          373aea4f7ddf5c4573ace49f8c95519a.webp1583418034152c572b794a064f89cc53dc62067301b7a.webp1583417597105d8eb948620ee288ad9608a5041bcb769.webp1583417621278

          ??上面就是基于公平模式TransferQueue分析SynchronousQueue的實現(xiàn),有興趣的可以自己去看看非公平模式TransferStack的實現(xiàn)。代碼類的文章可能在手機(jī)上看起來會比較累,可移步個人博客www.17coding.info獲得更好的閱讀體驗。


          瀏覽 25
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产美女啪啪 | 亚洲香蕉第一页 | 欧美一级特黄A片免费 | 亚洲大几吧色色91视频 | 女人 精96XXXxx在线播放 |