Java多線程之ReentrantLock可重入鎖
這里就JUC包中的ReentrantLock可重入鎖做相關(guān)介紹

概述
Java在語言層面提供了synchronized鎖,其在經(jīng)歷了一系列的鎖優(yōu)化過程后。目前來看性能已經(jīng)是很優(yōu)秀的了。那ReentrantLock作為synchronized鎖的替代實(shí)現(xiàn),是否就完全沒有必要了呢?顯然不是,因?yàn)槠涮峁┝吮萻ynchronized鎖更靈活的控制方式及手段。這里首先說明ReentrantLock是一個可重入的互斥鎖,其常用方法如下所示。可以看到,一方面,相比于synchronized鎖的非公平性而言,ReentrantLock支持公平、非公平兩種實(shí)現(xiàn),默認(rèn)為非公平鎖;另一方面,ReentrantLock的加鎖、解鎖需要顯式調(diào)用方法操作,進(jìn)一步提高了控制的靈活性。實(shí)踐過程中,推薦將unlock釋放鎖操作放在finally塊中,以避免鎖未被正確釋放。值得一提的是,對于tryLock()方法而言,其是非阻塞的。當(dāng)此時鎖未被其他線程持有,則會直接分配給它。不論是否存在其他正在等待該鎖的線程。即使當(dāng)前這個可重入鎖實(shí)例是公平的。換言之tryLock()方法會破壞公平的可重入鎖的公平性。如果既期望使用非阻塞方式,又期望不破壞公平鎖的公平性,可以使用它的超時機(jī)制版本,同時將超時時間設(shè)為0。即 tryLock(0, TimeUnit.SECONDS)
//?創(chuàng)建一個非公平的可重入鎖
public?ReentrantLock();
//?創(chuàng)建一個非公平/公平的可重入鎖
public?ReentrantLock(boolean?fair);
//?阻塞式獲取鎖
public?void?lock();
//?阻塞式獲取鎖
public?void?lockInterruptibly()?throws?InterruptedException;
//?非阻塞式獲取鎖,?true:?獲取鎖成功;?false:?獲取鎖失敗
public?boolean?tryLock();
//?支持超時機(jī)制的非阻塞式獲取鎖,?true:?獲取鎖成功;?false:?獲取鎖失敗
public?boolean?tryLock(long?timeout,?TimeUnit?unit)?throws?InterruptedException;
//?釋放鎖
public?void?unlock();
//?獲取一個Condition實(shí)例
public?Condition?newCondition();
不僅如此, ReentrantLock還支持基于條件變量Condition的控制方式。具體地,可通過其newCondition方法獲取一個Condition實(shí)例。對于Condition而言,其常見的方法如下所示。可以看到,其與synchronized中的wait/notify/notifyAll機(jī)制是類似的。只不過ReentrantLock支持同時操作多個條件變量Condition,實(shí)現(xiàn)對線程間協(xié)作進(jìn)行更精細(xì)化的控制。需要注意的是,一方面,某線程通過條件變量A而進(jìn)入Wait狀態(tài),則喚醒它也必須是通過條件變量A,而不能通過其他條件變量進(jìn)行喚醒;另一方面,調(diào)用signal/signalAll方法只會喚醒在調(diào)用該方法前已經(jīng)進(jìn)入Wait狀態(tài)的線程,而在這之后進(jìn)入Wait狀態(tài)的線程則不會被喚醒
//?釋放鎖并進(jìn)入Wait狀態(tài)?
void?await()?throws?InterruptedException;
//?隨機(jī)喚醒一個通過該條件變量而進(jìn)入Wait狀態(tài)的線程
void?signal();
//?喚醒全部通過該條件變量而進(jìn)入Wait狀態(tài)的線程
void?signalAll();
實(shí)踐
可重入性
顧名思義,ReentrantLock鎖是可重入的。現(xiàn)在我們驗(yàn)證下,并通過這個例子來對其基本用法進(jìn)行實(shí)踐
public?class?ReentrantLockTest1?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
????@Test
????public?void?test1()?{
????????ReentrantLock?reentrantLock?=?new?ReentrantLock();
????????ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
????????Task?task?=?new?Task(reentrantLock);
????????for(int?i=1;?i<=3;?i++)?{
????????????threadPool.execute(?task?);
????????}
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?120*1000?);?}?catch?(Exception?e)?{}
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}
????/**
?????*?打印信息
?????*?@param?msg
?????*/
????private?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?thread?=?Thread.currentThread().getName();
????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
????????System.out.println(log);
????}
????@AllArgsConstructor
????private?static?class?Task?implements?Runnable{
????????private?final?ReentrantLock?reentrantLock;
????????@Override
????????public?void?run()?{
????????????reentrantLock.lock();
????????????info("成功獲取鎖?#1");
????????????try{
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????????methodA();
????????????}?catch?(Exception?e)?{
????????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
????????????}?finally?{
????????????????info("釋放鎖?#1\n");
????????????????reentrantLock.unlock();
????????????}
????????}
????????private?void?methodA()?{
????????????reentrantLock.lock();
????????????info("成功獲取鎖?#2");
????????????//?獲取鎖的重入次數(shù)
????????????int?count?=?reentrantLock.getHoldCount();
????????????info("count:?"?+?count);
????????????try{
????????????????//?模擬業(yè)務(wù)耗時
????????????????Thread.sleep(RandomUtils.nextLong(100,?500));
????????????}?catch?(Exception?e)?{
????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
????????????}?finally?{
????????????????info("釋放鎖?#2");
????????????????reentrantLock.unlock();
????????????}
????????}
????}
}
測試結(jié)果如下所示,符合預(yù)期。可以看到其是一方面具有可重入性,另一方面也具有互斥性

Condition條件變量
這里通過生產(chǎn)者-消費(fèi)者模型來展示如何通過Condition條件變量進(jìn)行更好的控制。在這個例子中,我們有兩個生產(chǎn)者、兩個消費(fèi)者。前者用于添加數(shù)據(jù),后者則進(jìn)行數(shù)據(jù)消費(fèi),具體地,分別是奇數(shù)、偶數(shù)的消費(fèi)者。生產(chǎn)者每次生產(chǎn)完成后,根據(jù)隊(duì)列頭部元素的奇偶性通過相應(yīng)的條件變量通知喚醒對應(yīng)的消費(fèi)者進(jìn)行消費(fèi)。而消費(fèi)者每次消費(fèi)后會通知所有生產(chǎn)者。實(shí)現(xiàn)如下所示
public?class?ReentrantLockTest2?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
????@Test
????public?void?test1()?{
????????MyQueue?myQueue?=?new?MyQueue();
????????Thread?producer1?=?new?Thread(?()->{
????????????for(int?i=0;?i<2;?i++)?{
????????????????int?random?=?RandomUtils.nextInt(1,100);
????????????????//info("準(zhǔn)備添加數(shù)據(jù):?"+random);
????????????????myQueue.add(?random?);
????????????????//info("數(shù)據(jù)添加結(jié)束");
????????????}
????????},?"生產(chǎn)者1"?);
????????Thread?producer2?=?new?Thread(?()->{
????????????for(int?i=0;?i<2;?i++)?{
????????????????int?random?=?RandomUtils.nextInt(1,100);
????????????????myQueue.add(?random?);
????????????}
????????},?"生產(chǎn)者2"?);
????????Thread?consumer1?=?new?Thread(?()->{
????????????while?(true)?{
????????????????int?result?=?myQueue.getEven();
????????????????info("result:?"?+?result);
????????????}
????????},?"奇數(shù)消費(fèi)者"?);
????????Thread?consumer2?=?new?Thread(?()->{
????????????while?(true)?{
????????????????int?result?=?myQueue.getOdd();
????????????????info("result:?"?+?result);
????????????}
????????},?"偶數(shù)消費(fèi)者"?);
????????producer1.start();
????????producer2.start();
????????consumer1.start();
????????consumer2.start();
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?60*1000?);?}?catch?(Exception?e)?{}
????????System.out.println("\n----------------------?系統(tǒng)下線?----------------------");
????}
????/**
?????*?打印信息
?????*?@param?msg
?????*/
????public?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?thread?=?Thread.currentThread().getName();
????????String?log?=?"["+time+"]?"+?"?<"+?thread?+">?"?+?msg;
????????System.out.println(log);
????}
}
class?MyQueue?{
????private?ReentrantLock?reentrantLock?=?new?ReentrantLock();
????/**
?????*?條件:?隊(duì)列不為滿,?用于生產(chǎn)者可以向隊(duì)尾添加元素
?????*/
????private?Condition?nonfullCondition?=?reentrantLock.newCondition();
????/**
?????*?條件:?隊(duì)頭元素為奇數(shù),?用于消費(fèi)者可以從隊(duì)頭獲取奇數(shù)
?????*/
????private?Condition?evenNumCondition?=?reentrantLock.newCondition();
????/**
?????*?條件:?隊(duì)頭元素為偶數(shù),?用于消費(fèi)者可以從隊(duì)頭獲取偶數(shù)
?????*/
????private?Condition?oddNumCondition?=?reentrantLock.newCondition();
????private?Queue?queue?=?new?LinkedList<>();
????/**
?????*?隊(duì)列最大容量
?????*/
????private?Integer?maxSize?=?2;
????/**
?????*?隊(duì)列最小容量
?????*/
????private?Integer?minSize?=?0;
????/**
?????*?向隊(duì)列(尾部)添加數(shù)據(jù)
?????*?@param?element
?????*/
????public?void?add(Integer?element)?{
????????Integer?head?=?null;
????????reentrantLock.lock();
????????ReentrantLockTest2.info("獲取到鎖");
????????try{
????????????while?(?queue.size()?>=?maxSize?)?{
????????????????ReentrantLockTest2.info("隊(duì)列已滿無法添加,?被阻塞,?釋放鎖");
????????????????//?隊(duì)列為滿,?進(jìn)入Wait狀態(tài)并釋放鎖
????????????????nonfullCondition.await();
????????????}
????????????//?生產(chǎn),添加元素到隊(duì)尾
????????????queue.offer(element);
????????????//?查看隊(duì)頭元素
????????????head?=?queue.peek();
????????????Boolean?isEvenNumber?=?head%2==1;
????????????if(?isEvenNumber?)?{
????????????????//?喚醒奇數(shù)消費(fèi)者
????????????????evenNumCondition.signal();
????????????}?else?{
????????????????//?喚醒偶數(shù)消費(fèi)者
????????????????oddNumCondition.signal();
????????????}
????????}?catch?(Exception?e)?{
????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
????????}?finally?{
????????????ReentrantLockTest2.info("添加操作結(jié)束,?釋放鎖,?Head:?"?+head?+",?Add:?"+element?+?"?,?Queue:?"?+?queue);
????????????reentrantLock.unlock();
????????}
????}
????/**
?????*?獲取奇數(shù)
?????*?@return
?????*/
????public?Integer?getEven()?{
????????Integer?element?=?null;
????????reentrantLock.lock();
????????ReentrantLockTest2.info("獲取到鎖");
????????try{
????????????while(?!isEven()?)?{
????????????????ReentrantLockTest2.info("隊(duì)頭元素非奇數(shù),?被阻塞,?釋放鎖");
????????????????//?隊(duì)頭非奇數(shù),?進(jìn)入Wait狀態(tài)并釋放鎖
????????????????evenNumCondition.await();
????????????}
????????????element?=?queue.poll();
????????????//?喚醒所有生產(chǎn)者
????????????nonfullCondition.signalAll();
????????}?catch?(Exception?e)?{
????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
????????}?finally?{
????????????ReentrantLockTest2.info("獲取奇數(shù)操作結(jié)束,?釋放鎖,?Element:?"+element+",?Queue:?"+queue);
????????????reentrantLock.unlock();
????????}
????????return?element;
????}
????/**
?????*?獲取偶數(shù)
?????*?@return
?????*/
????public?Integer?getOdd()?{
????????Integer?element?=?null;
????????reentrantLock.lock();
????????ReentrantLockTest2.info("獲取到鎖");
????????try{
????????????while(?!isOdd()?)?{
????????????????ReentrantLockTest2.info("隊(duì)頭元素非偶數(shù),?被阻塞,?釋放鎖");
????????????????//?隊(duì)頭非偶數(shù),?進(jìn)入Wait狀態(tài)并釋放鎖
????????????????oddNumCondition.await();
????????????}
????????????element?=?queue.poll();
????????????//?喚醒所有生產(chǎn)者
????????????nonfullCondition.signalAll();
????????}?catch?(Exception?e)?{
????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
????????}?finally?{
????????????ReentrantLockTest2.info("獲取偶數(shù)操作結(jié)束,?釋放鎖,?Element:?"+element+",?Queue:?"+queue);
????????????reentrantLock.unlock();
????????}
????????return?element;
????}
????/**
?????*?判斷隊(duì)頭元素是否為奇數(shù)
?????*?@return
?????*/
????private?boolean?isEven()?{
????????if(?queue.size()?<=?minSize?)?{
????????????return?false;
????????}
????????Integer?num?=?queue.peek();
????????if(num%2?==?1)?{
????????????return?true;
????????}?else?{
????????????return?false;
????????}
????}
????/**
?????*?判斷隊(duì)頭元素是否為偶數(shù)
?????*?@return
?????*/
????private?boolean?isOdd()?{
????????if(?queue.size()?<=?minSize?)?{
????????????return?false;
????????}
????????Integer?num?=?queue.peek();
????????if(num%2?==?0)?{
????????????return?true;
????????}?else?{
????????????return?false;
????????}
????}
}
測試結(jié)果如下所示,符合預(yù)期

實(shí)現(xiàn)原理
構(gòu)造器
ReentrantLock可重入鎖的實(shí)現(xiàn)過程同樣依賴于AQS,具體地,其是對AQS中互斥鎖的使用。在構(gòu)建ReentrantLock實(shí)例過程中,其通過sync變量持有AQS的實(shí)現(xiàn)類Sync。進(jìn)一步地,按公平性與否可細(xì)分為NonfairSync、FairSync兩種實(shí)現(xiàn)方式。后面我們還會看到,其通過AQS的state字段來記錄當(dāng)前線程獲取鎖的次數(shù)。例如當(dāng)一個線程連續(xù)調(diào)用兩次lock方法,則state字段即為2
public?class?ReentrantLock?implements?Lock,?java.io.Serializable?{
????private?final?Sync?sync;
????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
????????...
????}
????static?final?class?NonfairSync?extends?Sync?{
????????...
????}
????static?final?class?FairSync?extends?Sync?{
????????...
????}
????public?ReentrantLock()?{
????????sync?=?new?NonfairSync();
????}
????public?ReentrantLock(boolean?fair)?{
????????sync?=?fair???new?FairSync()?:?new?NonfairSync();
????}
}
lock方法
對于lock()方法,我們先來看下其在非公平版本下的實(shí)現(xiàn)。其首先會調(diào)用NonfairSync類的lock()方法,在該方法中,由于是非公平性的實(shí)現(xiàn),故其會直接使用CAS嘗試獲取鎖。如果失敗,則進(jìn)一步調(diào)用AQS的acquire方法。tryAcquire方法的返回值決定了當(dāng)前線程是否需要進(jìn)入AQS阻塞隊(duì)列,如果返回true則說明當(dāng)前線程獲取鎖成功,直接結(jié)束;反之則說明該線程需要被放入AQS阻塞隊(duì)列當(dāng)中。可以看到NonfairSync類實(shí)現(xiàn)了tryAcquire方法,具體則是通過調(diào)用Sync的nonfairTryAcquire方法完成。可以看到nonfairTryAcquire方法中,當(dāng)前線程根據(jù)state是否為0、是否為鎖重入等場景進(jìn)行了加鎖嘗試,如果成功則直接返回true
public?class?ReentrantLock?implements?Lock,?java.io.Serializable?{
????public?void?lock()?{
????????sync.lock();
????}
????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
????????
????????abstract?void?lock();
????????final?boolean?nonfairTryAcquire(int?acquires)?{
????????????final?Thread?current?=?Thread.currentThread();
????????????int?c?=?getState();
????????????if?(c?==?0)?{
????????????????//?AQS的state為0,?說明沒有線程獲取鎖,?故這里直接通過CAS方式嘗試獲取鎖
????????????????if?(compareAndSetState(0,?acquires))?{
????????????????????//?獲取成功,?則設(shè)置鎖持有線程為當(dāng)前線程
????????????????????setExclusiveOwnerThread(current);
????????????????????return?true;
????????????????}
????????????}
????????????else?if?(current?==?getExclusiveOwnerThread())?{
????????????????//?當(dāng)前線程即為鎖持有線程,?即鎖重入
????????????????int?nextc?=?c?+?acquires;
????????????????if?(nextc?0)?//?overflow
????????????????????throw?new?Error("Maximum?lock?count?exceeded");
????????????????//?則更新其獲取鎖的次數(shù)
????????????????setState(nextc);
????????????????return?true;
????????????}
????????????return?false;
????????}
????}
????static?final?class?NonfairSync?extends?Sync?{
????????final?void?lock()?{
????????????//?直接使用CAS嘗試獲取鎖
????????????if?(compareAndSetState(0,?1))
????????????????//?獲取成功,?則設(shè)置鎖持有線程為當(dāng)前線程
????????????????setExclusiveOwnerThread(Thread.currentThread());
????????????else
????????????????//?獲取失敗,?則調(diào)用AQS的acquire方法
????????????????acquire(1);
????????}
????????//?非公平的嘗試獲取鎖
????????protected?final?boolean?tryAcquire(int?acquires)?{
????????????return?nonfairTryAcquire(acquires);
????????}
????}
}
...
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????public?final?void?acquire(int?arg)?{
????????if?(!tryAcquire(arg)?&&
????????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
????????????selfInterrupt();
????}
????//?需要子類去實(shí)現(xiàn)
????protected?boolean?tryAcquire(int?arg)?{
????????throw?new?UnsupportedOperationException();
????}
}
而在公平版本的lock()實(shí)現(xiàn)就比較簡單了。其首先調(diào)用FairSync類的lock方法,然后進(jìn)一步調(diào)用AQS的acquire方法。類似地,F(xiàn)airSync類實(shí)現(xiàn)了AQS的tryAcquire方法。值得一提的是為了保障公平性,其在通過CAS方式嘗試獲取鎖前,需要先調(diào)用hasQueuedPredecessors方法,該方法用于判斷AQS隊(duì)列中有無其他線程在排隊(duì)
public?class?ReentrantLock?implements?Lock,?java.io.Serializable?{
????public?void?lock()?{
????????sync.lock();
????}
????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
????????abstract?void?lock();
????}
????static?final?class?FairSync?extends?Sync?{
????????final?void?lock()?{
????????????acquire(1);
????????}
????????//?公平的嘗試獲取鎖
????????protected?final?boolean?tryAcquire(int?acquires)?{
????????????final?Thread?current?=?Thread.currentThread();
????????????int?c?=?getState();
????????????//?AQS的state為0,?說明沒有線程獲取鎖
????????????if?(c?==?0)?{
????????????????//?AQS的阻塞隊(duì)列中如果沒有其他線程排隊(duì),?才會通過CAS方式嘗試獲取鎖
????????????????if?(!hasQueuedPredecessors()?&&
????????????????????compareAndSetState(0,?acquires))?{
????????????????????//?獲取成功,?則設(shè)置鎖持有線程為當(dāng)前線程
????????????????????setExclusiveOwnerThread(current);
????????????????????return?true;
????????????????}
????????????}
????????????else?if?(current?==?getExclusiveOwnerThread())?{
????????????????//?當(dāng)前線程即為鎖持有線程,?即鎖重入
????????????????int?nextc?=?c?+?acquires;
????????????????if?(nextc?0)
????????????????????throw?new?Error("Maximum?lock?count?exceeded");
????????????????//?則更新其獲取鎖的次數(shù)
????????????????setState(nextc);
????????????????return?true;
????????????}
????????????return?false;
????????}
????}
}
...
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????public?final?void?acquire(int?arg)?{
????????if?(!tryAcquire(arg)?&&
????????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
????????????selfInterrupt();
????}
????//?需要子類去實(shí)現(xiàn)
????protected?boolean?tryAcquire(int?arg)?{
????????throw?new?UnsupportedOperationException();
????}
}
unlock方法
對于unlock()方法來說,基本原理類似。其首先調(diào)用AQS的release方法,并進(jìn)一步調(diào)用tryRelease()方法。該方法子類進(jìn)行實(shí)現(xiàn),其返回值如果為true,則表示鎖已經(jīng)完全被釋放,需要將AQS阻塞隊(duì)列的線程喚醒。具體地,Sync類實(shí)現(xiàn)了tryRelease方法,其內(nèi)部邏輯很簡單,如果state字段減為0則返回true;反之,則返回false。因?yàn)镽eentrantLock是可重入鎖,線程可能需要調(diào)用多次unlock()方法才會將鎖完全釋放掉
public?class?ReentrantLock?implements?Lock,?java.io.Serializable?{
????public?void?unlock()?{
????????sync.release(1);
????}
????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
????????
????????protected?final?boolean?tryRelease(int?releases)?{
????????????int?c?=?getState()?-?releases;
????????????if?(Thread.currentThread()?!=?getExclusiveOwnerThread())
????????????????throw?new?IllegalMonitorStateException();
????????????boolean?free?=?false;
????????????//?AQS的state為0,?說明該線程持有的鎖完全釋放
????????????if?(c?==?0)?{
????????????????free?=?true;
????????????????setExclusiveOwnerThread(null);
????????????}
????????????setState(c);
????????????return?free;
????????}
????}
}
...
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????
????public?final?boolean?release(int?arg)?{
????????if?(tryRelease(arg))?{
????????????Node?h?=?head;
????????????if?(h?!=?null?&&?h.waitStatus?!=?0)
????????????????unparkSuccessor(h);
????????????return?true;
????????}
????????return?false;
????}
????//?需要子類去實(shí)現(xiàn)
????protected?boolean?tryRelease(int?arg)?{
????????throw?new?UnsupportedOperationException();
????}
}
Condition 條件變量
ReentrantLock可重入鎖特別地還提供了對Condition條件變量的支持。具體地,則是通過AQS的內(nèi)部類ConditionObject來實(shí)現(xiàn)的。每一個Condition實(shí)例都會關(guān)聯(lián)一個條件隊(duì)列,其是一個單向鏈表。ConditionObject中包含兩個Node類型的指針,分別用于指向條件隊(duì)列的隊(duì)頭、隊(duì)尾。而內(nèi)部類Node用于對線程進(jìn)行包裝,其nextWaiter字段在這里的用途是作為條件隊(duì)列中當(dāng)前Node節(jié)點(diǎn)指向后繼Node節(jié)點(diǎn)的指針
public?class?ReentrantLock?implements?Lock,?java.io.Serializable?{
????public?Condition?newCondition()?{
????????return?sync.newCondition();
????}
????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
????????final?ConditionObject?newCondition()?{
????????????return?new?ConditionObject();
????????}
????}
}
...
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????
????public?class?ConditionObject?implements?Condition,?java.io.Serializable?{
????????//?條件隊(duì)列的隊(duì)頭指針
????????private?transient?Node?firstWaiter;
????
????????//?條件隊(duì)列的隊(duì)尾指針
????????private?transient?Node?lastWaiter;
????????public?ConditionObject()?{}
????}????
????static?final?class?Node?{
????????Node?nextWaiter;
????}
}
這里以await()來介紹如何實(shí)現(xiàn)線程的掛起阻塞,前面提到Condition條件變量實(shí)例關(guān)聯(lián)了一個條件隊(duì)列。故通過addConditionWaiter方法將當(dāng)前線程包裝為Node實(shí)例添加到條件隊(duì)列的尾部。我們清楚線程調(diào)用await()方法必然是持有鎖的,故該線程在被阻塞掛起前,需要完全釋放掉其持有的鎖。故調(diào)用AQS的fullyRelease方法將state置為0。當(dāng)然還需要通過savedState來保存、記錄下線程此前持有鎖的次數(shù),以便線程被喚醒后可以正確地進(jìn)行加鎖。此時由于isOnSyncQueue方法返回false,故其進(jìn)行while循環(huán)。并進(jìn)一步地利用LockSupport.park()方法實(shí)現(xiàn)將當(dāng)前線程掛起阻塞。后續(xù)當(dāng)其他線程將該Node從條件隊(duì)列轉(zhuǎn)移到AQS阻塞隊(duì)列并喚醒后,由于isOnSyncQueue()將返回true,即會退出while循環(huán)
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????public?class?ConditionObject?implements?Condition,?java.io.Serializable?{
????????public?final?void?await()?throws?InterruptedException?{
????????????if?(Thread.interrupted())
????????????????throw?new?InterruptedException();
????????????//?將當(dāng)前線程包裝為Node后加入條件隊(duì)列的尾部
????????????Node?node?=?addConditionWaiter();
????????????//?獲取該線程加鎖次數(shù),?然后釋放當(dāng)前線程持有鎖
????????????int?savedState?=?fullyRelease(node);
????????????int?interruptMode?=?0;
????????????//?當(dāng)線程被喚醒后,?由于其已經(jīng)從條件隊(duì)列轉(zhuǎn)移到AQS阻塞隊(duì)列
????????????//?故isOnSyncQueue(node)將返回true,?即退出while循環(huán)
????????????while?(!isOnSyncQueue(node))?{
????????????????//?如果其不在AQS阻塞隊(duì)列中,?則利用park方法將其進(jìn)行阻塞
????????????????LockSupport.park(this);
????????????????if?((interruptMode?=?checkInterruptWhileWaiting(node))?!=?0)
????????????????????break;
????????????}
????????????//?被喚醒后將進(jìn)入AQS的阻塞隊(duì)列,等待獲取鎖?
????????????if?(acquireQueued(node,?savedState)?&&?interruptMode?!=?THROW_IE)
????????????????interruptMode?=?REINTERRUPT;
????????????if?(node.nextWaiter?!=?null)?//?clean?up?if?cancelled
????????????????unlinkCancelledWaiters();
????????????if?(interruptMode?!=?0)
????????????????reportInterruptAfterWait(interruptMode);
????????}
????}
????
????final?int?fullyRelease(Node?node)?{
????????boolean?failed?=?true;
????????try?{
????????????//?獲取state值,?即持有鎖線程的加鎖次數(shù)
????????????int?savedState?=?getState();
????????????//??完全釋放鎖,?使得state為0
????????????if?(release(savedState))?{
????????????????failed?=?false;
????????????????return?savedState;
????????????}?else?{
????????????????throw?new?IllegalMonitorStateException();
????????????}
????????}?finally?{
????????????if?(failed)
????????????????node.waitStatus?=?Node.CANCELLED;
????????}
????}
}
這里以signal()方法說明如何實(shí)現(xiàn)喚醒線程。其內(nèi)部是通過調(diào)用doSignal()來實(shí)現(xiàn)的。從條件隊(duì)列的頭部移出一個Node,并通過transferForSignal()方法將該Node從條件隊(duì)列轉(zhuǎn)移到AQS阻塞隊(duì)列并喚醒該Node。如果transferForSignal()方法成功則本次喚醒結(jié)束。如果失敗了則繼續(xù)從條件隊(duì)列中移出下一個Node并重復(fù)上述操作,直到條件隊(duì)列為空為止
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????
????public?class?ConditionObject?implements?Condition,?java.io.Serializable?{
????????
????????public?final?void?signal()?{
????????????//?保證調(diào)用signal()方法的線程必須是當(dāng)前鎖的持有者
????????????if?(!isHeldExclusively())
????????????????throw?new?IllegalMonitorStateException();
????????????Node?first?=?firstWaiter;
????????????//?條件隊(duì)列不為空
????????????if?(first?!=?null)
????????????????doSignal(first);
????????}
????????private?void?doSignal(Node?first)?{
????????????do?{
????????????????if?(?(firstWaiter?=?first.nextWaiter)?==?null)
????????????????????lastWaiter?=?null;
????????????????first.nextWaiter?=?null;
????????????}?while?(!transferForSignal(first)?&&
?????????????????????(first?=?firstWaiter)?!=?null);
????????}
????}
}
參考文獻(xiàn)
Java并發(fā)編程之美 翟陸續(xù)、薛賓田著
