多線程編程-分析阻塞隊列的源碼實現(xiàn)
??看過我上一篇文章的應(yīng)該知道(家里條件允許的可以先看看上一篇文章),如果想實現(xiàn)一個生產(chǎn)者消費者模型,我們可以基于JVM自帶的synchronized+wait+notify實現(xiàn),也可以用JDK里面的ReentrantLock+Condition實現(xiàn)!不過從上篇文章的demo看,實現(xiàn)起來也不是那么容易!因為我們既要關(guān)心什么時候需要阻塞線程,又要需要關(guān)心何時喚醒線程。控制的細(xì)節(jié)太多,一個疏忽可能就導(dǎo)致了一個不易發(fā)現(xiàn)的bug,比如上篇文章中的虛假喚醒的例子!那有沒有一種我們不用關(guān)心那么多復(fù)雜細(xì)節(jié)就能實現(xiàn)生產(chǎn)者消費者模式的方法呢?本文要講的阻塞隊列就是一種很好的實現(xiàn)!
??在我們剛開始學(xué)數(shù)據(jù)結(jié)構(gòu)的時候,都接觸過一種先進(jìn)先出(first in first out,簡稱“FIFO”)的數(shù)據(jù)結(jié)構(gòu),叫隊列。阻塞隊列從名字看也是隊列的一種,因此滿足隊列的特性,然后這個隊列是可阻塞的!這個阻塞怎么理解呢?就是當(dāng)我們一個線程往阻塞隊列里面添加元素的時候,如果隊列滿了,那這個線程不會直接返回,而是會被阻塞,直到元素添加成功!當(dāng)我們一個線程從阻塞隊列里面獲取元素的時候,如果隊列是空的,那這個線程不會直接返回,而是會被阻塞直到元素獲取成功。而阻塞以及喚醒的操作都由阻塞隊列來管理!
常用阻塞隊列類圖
??我們先看在java中阻塞隊列基本的繼承關(guān)系圖:
1583070100191??完整的繼承關(guān)系要比這張圖復(fù)雜一些,但為了清晰起見圖中我只畫了主要的類和關(guān)系。隊列的基接口Queue與我們開發(fā)中經(jīng)常用到的List、Set是兄弟關(guān)系,因此我這里也列出來了方便對比記憶!阻塞隊列的基接口是繼承自Queue接口的BlockingQueue接口,其他阻塞隊列具體實現(xiàn)都繼承BlockingQueue接口!
BlockingQueue常用方法
??我們先看隊列基接口Queue中的方法
1583071182495??這個接口一共6個方法,我們可以分為兩組
??1、“異常”組
1、add(e):將元素放到隊列末尾,成功返回true,失敗則拋異常。
2、remove():獲取并移除隊首元素,獲取失敗則拋異常。
3、element():獲取隊首元素,不移除,獲取失敗則拋異常。
??2、“特殊值”組
1、offer(e):將元素放到隊列末尾,成功返回true,失敗返回false。
2、poll():獲取并返回隊首元素,獲取失敗則返回null。
3、peek():獲取隊首元素,不移除,獲取失敗則返回null。
??“異常”組的3個方法在操作失敗的時候會拋異常,因此叫“異常”組!
??“特殊值”組3個方法與“異常”組的3個方法是一一對應(yīng)的,功能都一樣,只是在操作失敗的時候不會拋異常而是返回一個特殊值,因此叫“特殊值組”。
??這兩組方法都是在Queue接口中定義的,因此跟阻塞就沒有什么關(guān)系了。那我們再看看BlockingQueue接口中的方法
1583071557087??這個接口我們重點關(guān)注標(biāo)記出來的4個方法,這幾個方法我們也可以分為兩組
??3、“阻塞”組
1、put(e):將元素放到隊列末尾,如果隊列滿了,則等待。
2、take():獲取并移除隊首元素,如果隊列為空,則等待。
??4、“超時”組
1、offer(e,time,unit):將元素放到隊列末尾,如果隊列滿了,則等待,當(dāng)?shù)却^指定時間后仍添加元素失敗,則返回false,否則返回true。
2、poll(time,unit):獲取并返回隊首元素,如果隊列為空,則等待,當(dāng)?shù)却^指定時間后仍獲取失敗則返回null,否則返回獲取到的元素。
??這兩組方法都是在BlockingQueue接口中定義的,因此都是跟阻塞相關(guān)的!
??“阻塞”組2個方法在操作不成功的時候會一直阻塞線程,直到能夠操作成功,因此叫“阻塞”組!用一個成語形容就是“不見不散”!
??“超時”組2個方法與“超時”組的2個方法是一一對應(yīng)的,功能都一樣,只是這2個方法不會一直阻塞,超過了指定的時間還沒成功就停止阻塞并返回,因此叫“超時”組!用一個成語形容就是“過時不候”!
??這四組方法合在一起就有了下面的一張表格:
| 方法功能 | 異常組 | 特殊值組 | 阻塞組 | 超時組 |
|---|---|---|---|---|
| 元素入隊 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 元素出隊 | remove() | pool() | take() | poll(time,unit) |
| 檢查元素 | element() | peek() | 無 | 無 |
源碼分析常用阻塞隊列
??BlockingQueue的實現(xiàn)類有多個,但是如果每一個源碼都進(jìn)行分析那不僅很影響篇幅且沒必要,因此我這里拿三個常用的阻塞隊列源碼進(jìn)行分析!在源碼中jdk的版本為1.8!
ArrayBlockingQueue
??我們先看下ArrayBlockingQueue中的幾個屬性
????/**?The?queued?items?使用數(shù)組存儲元素?*/
????final?Object[]?items;
????/** items index for next take, poll, peek or remove 下一個出隊元素索引?*/
????int?takeIndex;
????/**?items?index?for?next?put,?offer,?or?add?下一個入隊元素索引?*/
????int?putIndex;
????/**?Number?of?elements?in?the?queue?隊列元素個數(shù)?*/
????int?count;
????/*
?????*?ReentrantLock+Condition控制并發(fā)
?????*?Concurrency?control?uses?the?classic?two-condition?algorithm
?????*?found?in?any?textbook.
?????*/
????/**?Main?lock?guarding?all?access?*/
????final?ReentrantLock?lock;
????/**?Condition?for?waiting?takes?*/
????private?final?Condition?notEmpty;
????/**?Condition?for?waiting?puts?*/
????private?final?Condition?notFull;
1.object類型數(shù)組,也意味著ArrayBlockingQueue底層數(shù)據(jù)結(jié)構(gòu)是數(shù)組。
2.ReentrantLock+Condition,如果看過我上一篇文章的應(yīng)該很熟悉,這是用作來線程同步和線程通信的。
??我們再看下ArrayBlockingQueue的構(gòu)造函數(shù)。
????public?ArrayBlockingQueue(int?capacity)?{
????????this(capacity,?false);
????}
????public?ArrayBlockingQueue(int?capacity,?boolean?fair)?{
????????if?(capacity?<=?0)
????????????throw?new?IllegalArgumentException();
????????this.items?=?new?Object[capacity];
????????lock?=?new?ReentrantLock(fair);
????????notEmpty?=?lock.newCondition();
????????notFull?=??lock.newCondition();
????}
????public?ArrayBlockingQueue(int?capacity,?boolean?fair,
??????????????????????????????Collection?extends?E>?c){
????????this(capacity,?fair);
????????//初始化一個集合到隊列
????????....
????}
??這三個構(gòu)造函數(shù)都必須傳入一個int類型的capacity參數(shù),這個參數(shù)也意味著ArrayBlockingQueue是一個有界的阻塞隊列!
??我們前面說過隊列有常用的四組方法,而跟阻塞相關(guān)的是“阻塞”組和“超時”組的四個方法!我們以“阻塞”組的put()和take()方法為例,來窺探一下源碼里面的奧秘:
????/**
?????*?Inserts?the?specified?element?at?the?tail?of?this?queue,?waiting
?????*?for?space?to?become?available?if?the?queue?is?full.
?????*/
????public?void?put(E?e)?throws?InterruptedException?{
????????checkNotNull(e);
????????//加鎖操作
????????final?ReentrantLock?lock?=?this.lock;
????????lock.lockInterruptibly();
????????try?{
????????????//判斷隊列是否滿足入隊條件,如果隊列已滿,則阻塞等待一個“不滿”的信號
????????????while?(count?==?items.length)
????????????????notFull.await();
????????????//滿足條件,則進(jìn)行入隊操作
????????????enqueue(e);
????????}?finally?{
????????????lock.unlock();
????????}
????}
????private?void?enqueue(E?x)?{
????????final?Object[]?items?=?this.items;
????????items[putIndex]?=?x;
????????//?下一個入隊元素索引超過了數(shù)組的長度,則又從0開始。
????????if?(++putIndex?==?items.length)
????????????putIndex?=?0;
????????count++;
????????//放入元素后,釋放一個“不空”的信號。喚醒等待中的出隊線程。
????????notEmpty.signal();
????}
????public?E?take()?throws?InterruptedException?{
????????//加鎖操作
????????final?ReentrantLock?lock?=?this.lock;
????????lock.lockInterruptibly();
????????try?{
????????????//判斷隊列是否滿足出隊條件,如果隊列為空,則阻塞等待一個“不空”的信號
????????????while?(count?==?0)
????????????????notEmpty.await();
????????????//滿足條件,則進(jìn)行出隊操作
????????????return?dequeue();
????????}?finally?{
????????????lock.unlock();
????????}
????}
????private?E?dequeue()?{
????????final?Object[]?items?=?this.items;
????????E?x?=?(E)?items[takeIndex];
????????items[takeIndex]?=?null;//help?GC
????????//?下一個出隊元素索引超過了數(shù)組的長度,則又從0開始。
????????if?(++takeIndex?==?items.length)
????????????takeIndex?=?0;
????????count--;
????????if?(itrs?!=?null)
????????????itrs.elementDequeued();//更新迭代器元素數(shù)據(jù)
????????//取出元素后,釋放一個“不滿”的信號。喚醒等待中的入隊線程。
????????notFull.signal();
????????return?x;
????}
??ArrayBlockingQueue的入隊出隊代碼還是很簡單的,當(dāng)我們往一個阻塞隊列里面添加數(shù)據(jù)的時候,阻塞隊列用一個固定長度的數(shù)據(jù)存儲數(shù)據(jù),如果數(shù)組的長度達(dá)到了最大容量,則添加數(shù)據(jù)的線程會被阻塞。當(dāng)我們從阻塞隊列獲取數(shù)據(jù)的時候,如果隊列為空,則獲取數(shù)據(jù)的線程會被阻塞!相信代碼上的注釋已經(jīng)足夠理解這塊的代碼邏輯了!
LinkedBlockingQueue
??我們先看下LinkedBlockingQueue中的幾個屬性
/**?The?capacity?bound,?or?Integer.MAX_VALUE?if?none?隊列容量?*/
private?final?int?capacity;
/**?Current?number?of?elements?隊列元素個數(shù)?*/
private?final?AtomicInteger?count?=?new?AtomicInteger();
/**
?*?隊列頭
?*?Head?of?linked?list.
?*?Invariant:?head.item?==?null
?*/
transient?Node?head;
/**
?*?隊列尾
?*?Tail?of?linked?list.
?*?Invariant:?last.next?==?null
?*/
private?transient?Node?last;
/**?Lock?held?by?take,?poll,?etc?出隊操作用到的鎖?*/
private?final?ReentrantLock?takeLock?=?new?ReentrantLock();
/**?Wait?queue?for?waiting?takes?*/
private?final?Condition?notEmpty?=?takeLock.newCondition();
/**?Lock?held?by?put,?offer,?etc?入隊操作用到的鎖?*/
private?final?ReentrantLock?putLock?=?new?ReentrantLock();
/**?Wait?queue?for?waiting?puts?*/
private?final?Condition?notFull?=?putLock.newCondition();
1.Node類型的變量head和last,這是鏈表常見操作,也意味著LinkedBlockingQueue底層數(shù)據(jù)結(jié)構(gòu)是鏈表。
2.與ArrayBlockingQueue不同的是,這里有兩個ReentrantLock對象,put操作個take操作的鎖對象是分開的,這樣做也是為了提高容器的并發(fā)能力。
??再看下Node這個內(nèi)部類
????/**
?????*?Linked?list?node?class
?????*/
????static?class?Node<E>?{
????????E?item;
????????//指向下一個節(jié)點
????????Node?next;
????????Node(E?x)?{?item?=?x;?}
????}
??只有next屬性意味著這是一個單向鏈表!
??再看下LinkedBlockingQueue的構(gòu)造函數(shù)
????public?LinkedBlockingQueue()?{
????????this(Integer.MAX_VALUE);
????}
????public?LinkedBlockingQueue(int?capacity)?{
????????if?(capacity?<=?0)?throw?new?IllegalArgumentException();
????????this.capacity?=?capacity;
????????last?=?head?=?new?Node(null);
????}
????public?LinkedBlockingQueue(Collection?extends?E>?c)?{
????????this(Integer.MAX_VALUE);
????????...
????????}
1.當(dāng)構(gòu)造函數(shù)不傳capacity參數(shù)的時候,LinkedBlockingQueue就是一個無界阻塞隊列(其實也并非無界,不傳默認(rèn)值就是Integer.MAX_VALUE)。
2.當(dāng)構(gòu)造函數(shù)傳入capacity參數(shù)的時候,LinkedBlockingQueue就是一個有界阻塞隊列。
??我們依然看看在LinkedBlockingQueue中“阻塞”組的兩個方法put()和take()分別怎么實現(xiàn)的
/**
?*?Inserts?the?specified?element?at?the?tail?of?this?queue,?waiting?if
?*?necessary?for?space?to?become?available.
?*/
public?void?put(E?e)?throws?InterruptedException?{
????if?(e?==?null)?throw?new?NullPointerException();
????//存儲隊列元素數(shù)量
????int?c?=?-1;
????//創(chuàng)建新節(jié)點
????Node?node?=?new?Node(e);
????//獲取putLock
????final?ReentrantLock?putLock?=?this.putLock;
????//隊列元素數(shù)量
????final?AtomicInteger?count?=?this.count;
????putLock.lockInterruptibly();
????try?{
????????//判斷隊列是否滿足入隊條件,如果隊列已滿,則阻塞等待一個“不滿”的信號
????????while?(count.get()?==?capacity)?{
????????????notFull.await();
????????}
????????//入隊操作
????????enqueue(node);
????????//隊列元素數(shù)量+1,執(zhí)行完下面這句后,count是入隊后的元素數(shù)量,而c的值還是入隊前的元素數(shù)量。
????????c?=?count.getAndIncrement();
????????//當(dāng)前入隊操作成功后,如果元素數(shù)量還小于隊列容量,則釋放一個“不滿”的信號
????????if?(c?+?1?????????????notFull.signal();
????}?finally?{
????????putLock.unlock();
????}
????//這里的c前面說了是元素入隊前的數(shù)量,如果入隊前元素數(shù)量為0(隊列是空的),那可能會有出隊線程在等待一個“不空”的信號,所以這里釋放一個“不空”的信號。
????if?(c?==?0)
????????signalNotEmpty();
}
private?void?signalNotEmpty()?{
????final?ReentrantLock?takeLock?=?this.takeLock;
????takeLock.lock();
????try?{
????????notEmpty.signal();
????}?finally?{
????????takeLock.unlock();
????}
}
public?E?take()?throws?InterruptedException?{
????//出隊元素
????E?x;
????//存儲隊列元素數(shù)量
????int?c?=?-1;
????//隊列元素數(shù)量
????final?AtomicInteger?count?=?this.count;
????//獲取takeLock
????final?ReentrantLock?takeLock?=?this.takeLock;
????takeLock.lockInterruptibly();
????try?{
????????//判斷隊列是否滿足出隊條件,如果隊列為空,則阻塞等待一個“不空”的信號
????????while?(count.get()?==?0)?{
????????????notEmpty.await();
????????}
????????//出隊操作
????????x?=?dequeue();
????????//隊列元素數(shù)量-1,執(zhí)行完下面這句后,count是出隊后的元素數(shù)量,而c的值還是出隊前的元素數(shù)量。
????????c?=?count.getAndDecrement();
????????//當(dāng)前出隊操作成功前隊列元素大于1,那當(dāng)前出隊操作成功后隊列元素也就大于0,則釋放一個“不空”的信號
????????if?(c?>?1)
????????????notEmpty.signal();
????}?finally?{
????????takeLock.unlock();
????}
????//這里的c前面說了是元素出隊前的數(shù)量,如果出隊前元素數(shù)量為總?cè)萘浚犃惺菨M的),那可能會有入隊線程在等待一個“不滿”的信號,所以這里釋放一個“不滿”的信號。
????if?(c?==?capacity)
????????signalNotFull();
????return?x;
}
private?void?signalNotFull()?{
????final?ReentrantLock?putLock?=?this.putLock;
????putLock.lock();
????try?{
????????notFull.signal();
????}?finally?{
????????putLock.unlock();
????}
}
??這里源碼的同步邏輯比ArrayBlockingQueue中要稍微復(fù)雜一點,在ArrayBlockingQueue中每次入隊都釋放一個“不空”的信號,每次出隊都釋放一個“不滿”的信號,而LinkedBlockingQueue則不同。
??元素入隊的時候
1.入隊后還有空位,則釋放一個“不滿”的信號。
2.入隊時隊列為空,則釋放一個“不空”的信號。
??元素出隊的時候
1.出隊后隊列還有元素,則釋放一個“不空”的信號。
2.出隊前隊列是滿的,則釋放一個“不滿”的信號。
SynchronousQueue
??SynchronousQueue從名字看叫“同步隊列”,怎么理解呢?雖然他也叫隊列,但是他不提供空間存儲元素!當(dāng)一個線程往隊列添加元素,需要匹配到有另外一個線程從隊列取元素,否則線程阻塞!當(dāng)一個線程從隊列獲取元素,需要匹配到有另外一個線程往隊列添加元素,否則線程阻塞!所以這里的同步指的就是入隊線程和出隊線程需要同步!這里有點類似你媽媽對你說:“今年你再找不到女朋友,過年你就別回來了!”,于是你第二年就真的沒回去過年!因為你是一個獲取數(shù)據(jù)(找女朋友)的線程,數(shù)據(jù)沒獲取到則一直阻塞!
??了解了大致概念,我們再來看看源碼!
????/**
?????*?Creates?a?{@code?SynchronousQueue}?with?nonfair?access?policy.
?????*/
????public?SynchronousQueue()?{
????????this(false);
????}
????/**
?????*?Creates?a?{@code?SynchronousQueue}?with?the?specified?fairness?policy.
?????*
?????*?@param?fair?if?true,?waiting?threads?contend?in?FIFO?order?for
?????*????????access;?otherwise?the?order?is?unspecified.
?????*/
????public?SynchronousQueue(boolean?fair)?{
????????transferer?=?fair???new?TransferQueue()?:?new?TransferStack();
????}
??兩個構(gòu)造函數(shù),fair參數(shù)指定公平策略,默認(rèn)為false,因此是非公平模式!先看看put和take方法的實現(xiàn):
????public?void?put(E?e)?throws?InterruptedException?{
????????if?(e?==?null)?throw?new?NullPointerException();
????????if?(transferer.transfer(e,?false,?0)?==?null)?{
????????????Thread.interrupted();
????????????throw?new?InterruptedException();
????????}
????}
????public?E?take()?throws?InterruptedException?{
????????E?e?=?transferer.transfer(null,?false,?0);
????????if?(e?!=?null)
????????????return?e;
????????Thread.interrupted();
????????throw?new?InterruptedException();
????}
??put和take方法很類似,都是調(diào)用transferer.transfer(…)方法,區(qū)別在于第一個參數(shù)!put方法在調(diào)用時候會傳入入隊的值,而take方法傳入null。
??上面說過有公平和非公平策略,今天將重點分析公平模式TransferQueue的源碼!從名字能看出來這也是一個隊列,我們先看TransferQueue的重點屬性和構(gòu)造方法:
????//?指向隊列頭部
????transient?volatile?QNode?head;
????//?指向隊列尾部
????transient?volatile?QNode?tail;
????TransferQueue()?{
????????//初始化一個空
????????QNode?h?=?new?QNode(null,?false);?//?initialize?to?dummy?node.
????????head?=?h;
????????tail?=?h;
????}
??一頭一尾,鏈表的一貫操作!構(gòu)造方法中,創(chuàng)建了一個QNode結(jié)點,并且將head和tail都指向這個結(jié)點!我們再看看QNode類的重要屬性和構(gòu)造方法:
volatile?QNode?next;??????????//?指向隊列的下一個節(jié)點
volatile?Object?item;?????????//?節(jié)點存儲的元素
volatile?Thread?waiter;???????//?被阻塞的線程
final?boolean?isData;??????????//?是否是“數(shù)據(jù)”結(jié)點(入隊線程為true,出隊線程為false)
QNode(Object?item,?boolean?isData)?{
???this.item?=?item;
???this.isData?=?isData;
???}
??執(zhí)行完構(gòu)造函數(shù),節(jié)點圖示如下,一頭一尾都指向構(gòu)造函數(shù)中創(chuàng)建出來的新節(jié)點!
1583417029976??我們再回到上面提到的transferer.transfer(…)方法,也就是TransferQueue中的transfer(…)方法,核心邏輯都在這個方法中體現(xiàn):
/**
?*?“存”或者“取”一個元素
?*/
@SuppressWarnings("unchecked")
E?transfer(E?e,?boolean?timed,?long?nanos)?{
????QNode?s?=?null;?//?constructed/reused?as?needed
????//當(dāng)前操作類型,傳非null的值則為生產(chǎn)線程,傳null則為消費線程。
????boolean?isData?=?(e?!=?null);
????for?(;;)?{
????????QNode?t?=?tail;
????????QNode?h?=?head;
????????//上面我們說過在構(gòu)造方法中就創(chuàng)建了一個QNode結(jié)點,并且將head和tail都指向這個結(jié)點
????????//因此這里t、h一般情況下不會為null
????????if?(t?==?null?||?h?==?null)?????????//?saw?uninitialized?value
????????????continue;???????????????????????//?spin
????????//根據(jù)SynchronousQueue的特性,不同類型的操作會配對成功。
????????//因此在阻塞隊列中只會存在一種類型的阻塞節(jié)點,要么全是消費線程要么全是生產(chǎn)線程!
????????//所以分三種情況:
????????//1.h == t,這種情況下隊列為空,需要將當(dāng)前節(jié)點入隊。
????????//2.t.isData?==?isData尾部節(jié)點的操作類型與當(dāng)前操作類型
????????//???????一致(尾部節(jié)點的操作類型代表著隊列中所有節(jié)點的操作類型),需要將當(dāng)前節(jié)點入隊。
????????//3.隊列不為空且尾部節(jié)點的操作類型與當(dāng)前操作類型不一致,
????????//???????需要從隊列頭部匹配一個節(jié)點并返回。
????????//因此再看下面的代碼,會根據(jù)上面3種情況走不同的分支。
????????if?(h?==?t?||?t.isData?==?isData)?{?//?empty?or?same-mode
????????????//進(jìn)入這個分支就是上面1、2的情況
????????????//獲取尾部節(jié)點的next指向,正常情況下tn等于null
????????????QNode?tn?=?t.next;
????????????//下面是判斷是否出現(xiàn)并發(fā)導(dǎo)致尾節(jié)點被更改
????????????if?(t?!=?tail)??????????????????//?inconsistent?read
????????????????continue;
????????????if?(tn?!=?null)?{???????????????//?lagging?tail
????????????????advanceTail(t,?tn);
????????????????continue;
????????????}
????????????//超時判斷
????????????if?(timed?&&?nanos?<=?0)????????//?can't?wait
????????????????return?null;
????????????//將當(dāng)前操作創(chuàng)建為新節(jié)點,傳入數(shù)據(jù)值和操作類型。
????????????//這里可以看下面的“圖二”
????????????if?(s?==?null)
????????????????s?=?new?QNode(e,?isData);
????????????//1、將阻塞隊列中尾部節(jié)點的next指向新節(jié)點
????????????//2、將tail屬性的指向設(shè)置為新節(jié)點
????????????//下面兩行執(zhí)行后隊列的變化可以看下面的“圖三”,注意紅色箭頭
????????????if?(!t.casNext(null,?s))????????//?failed?to?link?in
????????????????continue;
????????????advanceTail(t,?s);??????????????//?swing?tail?and?wait
????????????//在這個方法內(nèi)部會進(jìn)行自旋或者阻塞,直到配對成功。
????????????//建議這里先跳到下面這個方法內(nèi)部看完邏輯再回來。
????????????Object?x?=?awaitFulfill(s,?e,?timed,?nanos);
????????????//只有在線程被中斷的情況下會進(jìn)入這個分支
????????????if?(x?==?s)?{???????????????????//?wait?was?cancelled
????????????????clean(t,?s);
????????????????return?null;
????????????}
????????????if?(!s.isOffList())?{???????????//?not?already?unlinked
????????????????advanceHead(t,?s);??????????//?unlink?if?head
????????????????if?(x?!=?null)??????????????//?and?forget?fields
????????????????????s.item?=?s;
????????????????s.waiter?=?null;
????????????}
????????????//如果為生產(chǎn)線程,則返回入隊的值;如果為消費線程,則返回匹配到的生產(chǎn)線程的值。
????????????return?(x?!=?null)???(E)x?:?e;
????????}?else?{????????????????????????????//?complementary-mode
????????????//進(jìn)入這個分支就是上面3的情況
????????????//找到頭部節(jié)點的next指向
????????????//可以看下面的“圖四”,注意紅色箭頭指向的就是匹配到的節(jié)點
????????????QNode?m?=?h.next;???????????????//?node?to?fulfill
????????????if?(t?!=?tail?||?m?==?null?||?h?!=?head)
????????????????continue;???????????????????//?inconsistent?read
????????????Object?x?=?m.item;
????????????//m.casItem(x, e)方法很重要,會將匹配到的節(jié)點的item修改為當(dāng)前操作的值。
????????????//這樣awaitFulfill方法的x != e條件才能成立,被匹配的阻塞線程才能返回。
????????????//可以看下面的“圖五”,注意node1節(jié)點中item屬性對應(yīng)的值的變化。
????????????if?(isData?==?(x?!=?null)?||????//?m?already?fulfilled
????????????????x?==?m?||???????????????????//?m?cancelled
????????????????!m.casItem(x,?e))?{?????????//?lost?CAS
????????????????advanceHead(h,?m);??????????//?dequeue?and?retry
????????????????continue;
????????????}
????????????//調(diào)整head屬性的指向,這里建議這里先跳到下面這個方法內(nèi)部看完邏輯再回來。
????????????advanceHead(h,?m);??????????????//?successfully?fulfilled
????????????//喚醒匹配到的阻塞線程
????????????LockSupport.unpark(m.waiter);
????????????//如果為生產(chǎn)線程,則返回入隊的值;如果為消費線程,則返回匹配到的生產(chǎn)線程的值。
????????????return?(x?!=?null)???(E)x?:?e;
????????}
????}
}
Object?awaitFulfill(QNode?s,?E?e,?boolean?timed,?long?nanos)?{
????/*?Same?idea?as?TransferStack.awaitFulfill?*/
????final?long?deadline?=?timed???System.nanoTime()?+?nanos?:?0L;
????Thread?w?=?Thread.currentThread();
????//如果頭節(jié)點的next指向當(dāng)前的數(shù)據(jù)節(jié)點,也就是當(dāng)前數(shù)據(jù)節(jié)點是下一個待匹配的節(jié)點,那就自旋等待一會兒。
????//如果設(shè)置了超時時間就少自旋一會兒,沒有設(shè)置超時時間就多自旋一會兒。
????//可以看看maxTimedSpins和maxUntimedSpins兩個屬性的值設(shè)置,是與cpu數(shù)量相關(guān)的。
????int?spins?=?((head.next?==?s)??
?????????????????(timed???maxTimedSpins?:?maxUntimedSpins)?:?0);
????for?(;;)?{
????????if?(w.isInterrupted())
????????????s.tryCancel(e);
????????Object?x?=?s.item;
????????//?第一次進(jìn)來這里肯定是相等的,所以不會進(jìn)入這個分支。
????????//?當(dāng)有其他的線程匹配到當(dāng)前節(jié)點,這里的s.item的值會被更改(前面說到過的m.casItem(x, e)方法),所以方法返回。
????????if?(x?!=?e)
????????????return?x;
????????if?(timed)?{
????????????nanos?=?deadline?-?System.nanoTime();
????????????if?(nanos?<=?0L)?{
????????????????s.tryCancel(e);
????????????????continue;
????????????}
????????}
????????if?(spins?>?0)
????????????--spins;
????????else?if?(s.waiter?==?null)
????????????s.waiter?=?w;
????????else?if?(!timed)
????????????//這里線程會阻塞,如果有線程與當(dāng)前線程匹配,則被喚醒進(jìn)行下一次循環(huán)。
????????????LockSupport.park(this);
????????else?if?(nanos?>?spinForTimeoutThreshold)
????????????LockSupport.parkNanos(this,?nanos);
????}
}
void?advanceHead(QNode?h,?QNode?nh)?{
????//這個方法做了兩個操作
????//1、將head屬性的指向調(diào)整為頭節(jié)點的下一個結(jié)點
????//2、將原頭節(jié)點的next指向原頭節(jié)點本身
????//可以看下面的“圖六”,注意看紅色箭頭。
????if?(h?==?head?&&
????????UNSAFE.compareAndSwapObject(this,?headOffset,?h,?nh))
????????h.next?=?h;?//?forget?old?next
}
1583418034152
1583417597105
1583417621278??上面就是基于公平模式TransferQueue分析SynchronousQueue的實現(xiàn),有興趣的可以自己去看看非公平模式TransferStack的實現(xiàn)。代碼類的文章可能在手機(jī)上看起來會比較累,可移步個人博客www.17coding.info獲得更好的閱讀體驗。
