并發(fā)編程之Condition解析
實(shí)現(xiàn)線程之間的通信,在Object類中提供等待/通知機(jī)制(并發(fā)編程之wait-notify解析)。而wait/notify有一個(gè)缺點(diǎn)如果有多個(gè)線程處于等待阻塞狀態(tài),單純的使用notify是無(wú)法喚醒具體的線程的,只能任意喚醒一個(gè)或者使用notifyAll喚醒全部。再者,調(diào)用wait方法處于阻塞中的線程無(wú)法支持響應(yīng)中斷。
在Java并發(fā)包下,提供了一套新的基于等待/喚醒的API,即Condition。接下來(lái)我們對(duì)Condition相關(guān)源碼做一個(gè)分析。

Condition是一個(gè)接口,其實(shí)現(xiàn)類有ConditionObject,在ConditionObject是一個(gè)AQS的內(nèi)部類。其對(duì)應(yīng)的方法有:
//阻塞等待void await()boolean await(long time, TimeUnit unit)long awaitNanos(long nanosTimeout)//通知喚醒void signal()void signalAll()
從其API中我們也可以看出在Object中的wait/notify類似。話不多說(shuō),開(kāi)始進(jìn)入源碼環(huán)節(jié)。

入口的方法為await:
public final void await() throws InterruptedException {// 線程阻塞的話 直接拋出異常if (Thread.interrupted())????throw?new?InterruptedException();????// 當(dāng)前需要阻塞的線程封裝成Node插入到隊(duì)列(等待隊(duì)列)的尾部Node node = addConditionWaiter();// 釋放所持有的鎖資源// 因?yàn)檎{(diào)用await之前,必須先獲得鎖,這里會(huì)釋放掉int savedState = fullyRelease(node);int interruptMode = 0;//判斷節(jié)點(diǎn)是否在同步隊(duì)列里面//如果在同步隊(duì)列里面說(shuō)明等待節(jié)點(diǎn)被喚醒了while (!isOnSyncQueue(node)) {LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 嘗試競(jìng)爭(zhēng)資源獲取鎖if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
我們知道,調(diào)用await方法之前必須先獲取到鎖,在await方法中,會(huì)把當(dāng)前線程封裝為一個(gè)Node插入到隊(duì)列的尾部,這里需要注意到的幾點(diǎn)是:
此隊(duì)列用于線程的等待,還不具體直接去競(jìng)爭(zhēng)臨界資源,這個(gè)隊(duì)列我們以下稱為等待隊(duì)列
等待隊(duì)列是單向的,跟AQS中的同步隊(duì)列是雙向的
等待隊(duì)列無(wú)頭結(jié)點(diǎn),而同步隊(duì)列最開(kāi)始會(huì)初始化一個(gè)空的頭結(jié)點(diǎn)
當(dāng)前線程節(jié)點(diǎn)加入到隊(duì)尾之后,需要釋放鎖資源fullyRelease,而這也是為什么在調(diào)用await的之前需要先獲取鎖。
然后判斷當(dāng)前節(jié)點(diǎn)是否在同步隊(duì)列中isOnSyncQueue(被喚醒的節(jié)點(diǎn)會(huì)被加到同步隊(duì)列中),如果當(dāng)前節(jié)點(diǎn)在同步隊(duì)列中發(fā)現(xiàn)了,說(shuō)明已經(jīng)被喚醒了。否則當(dāng)前線程阻塞,等待被喚醒(被通知喚醒的邏輯接下來(lái)分析)。
當(dāng)線程會(huì)喚醒之后,就會(huì)退出while語(yǔ)句,執(zhí)行acquireQueued獲取鎖從而繼續(xù)執(zhí)行。
到這里,我們知道,一個(gè)等待的線程會(huì)被封裝為Node,插入到等待隊(duì)列的尾部(隊(duì)列為單向鏈表),等待的被喚醒

那么接下來(lái),我們來(lái)看看通知喚醒的邏輯,入口方法為signal,
public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();// 獲取等待隊(duì)列的第一個(gè)節(jié)點(diǎn)喚醒Node first = firstWaiter;if (first != null)// 喚醒操作doSignal(first);}private void doSignal(Node first) {do {// 首節(jié)點(diǎn)是需要喚醒的,所以需要從隊(duì)列中移除掉if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;// 節(jié)點(diǎn)由等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中} while (!transferForSignal(first) &&(first = firstWaiter) != null);}final boolean transferForSignal(Node node) {// 插入到同步隊(duì)列中Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
調(diào)用signal方法,首先會(huì)喚醒等待時(shí)間最長(zhǎng)的,因?yàn)榈却?duì)列也是FIFO隊(duì)列,所以首節(jié)點(diǎn)就是等待時(shí)間最長(zhǎng)的節(jié)點(diǎn),所以這里需要喚醒首節(jié)點(diǎn)。喚醒的節(jié)點(diǎn)會(huì)從等待隊(duì)列中移除,然后把對(duì)應(yīng)的節(jié)點(diǎn)從等待隊(duì)列中加入到同步隊(duì)列,可以看出,其實(shí)喚醒操作就是把等待隊(duì)列中的節(jié)點(diǎn)加入到同步隊(duì)列,因?yàn)橹挥型疥?duì)列中的節(jié)點(diǎn)才具有競(jìng)爭(zhēng)臨界資源的資格。

為了有助于理解,對(duì)于ConditionObject等待喚醒的邏輯,做出了以下的圖:

線程嘗試回去資源,獲取到同步鎖之后進(jìn)入臨界資源,然后調(diào)用await方法,以尾插法的形式進(jìn)入等待隊(duì)列,這里需要注意的是,等待隊(duì)列是可以有多個(gè)的。當(dāng)調(diào)用signal方法的時(shí)候,會(huì)把等待隊(duì)列的首節(jié)點(diǎn)插入到同步隊(duì)列的尾節(jié)點(diǎn)中,而等待節(jié)點(diǎn)中相關(guān)的節(jié)點(diǎn)會(huì)被刪除。當(dāng)調(diào)用signalAll方法的時(shí)候,會(huì)把單個(gè)的等待隊(duì)列全部插入到同步隊(duì)列中,在同步節(jié)點(diǎn)中的線程會(huì)嘗試去競(jìng)爭(zhēng)臨界資源。

對(duì)Condition有了一個(gè)詳細(xì)的認(rèn)識(shí)后,我們來(lái)基于此實(shí)現(xiàn)一個(gè)等待隊(duì)列
public class TBlockingQueue {private String[] arr ;int size ;public int startIndex;public int endIndex;ReentrantLock rx = new ReentrantLock();Condition fullCd = rx.newCondition();Condition emptyCd = rx.newCondition();public TBlockingQueue(int n){if(n<=0) throw new RuntimeException("n must > 0");arr = new String[n];}public void put(String data){if(arr == null || data == null) throw new RuntimeException("must be init or check your data");try {rx.lock();while (arr.length <= size) {fullCd.await();}arr[endIndex] = data;if(arr.length -1 == endIndex){endIndex = 0 ;} else {endIndex++ ;}size ++ ;emptyCd.signalAll();} catch (InterruptedException e) {e.printStackTrace();} finally {rx.unlock();}}public String take(){try {rx.lock();while (size == 0){emptyCd.await();}String data = arr[startIndex];arr[startIndex] = null;if(arr.length -1 == startIndex){endIndex = 0 ;} else {startIndex++ ;}size -- ;fullCd.signalAll();return data;} catch (InterruptedException e) {e.printStackTrace();throw new RuntimeException(e);} finally {rx.unlock();}}}
?
感謝你的閱讀,有任何問(wèn)題你可以在此公眾號(hào)上與我交流,如果你覺(jué)得此文章對(duì)你有所收獲的話,可以關(guān)注一波【南瓜小燈】嘛。學(xué)習(xí)的路上,期待著你我共同前行!?。?/strong>
