AQS解析與實(shí)戰(zhàn)
前言
前段時(shí)間在面試,發(fā)現(xiàn)面試官都有問(wèn)到同步器AQS的相關(guān)問(wèn)題。AQS為Java中幾乎所有的鎖和同步器提供一個(gè)基礎(chǔ)框架,派生出如ReentrantLock、Semaphore、CountDownLatch等AQS全家桶。本文基于AQS原理的幾個(gè)核心點(diǎn),談?wù)剬?duì)AbstractQueuedSynchronizer的理解,并實(shí)現(xiàn)一個(gè)自定義同步器。
AQS原理面試題的核心回答要點(diǎn)
state 狀態(tài)的維護(hù)。
CLH隊(duì)列
ConditionObject通知
模板方法設(shè)計(jì)模式
獨(dú)占與共享模式。
自定義同步器。
AQS全家桶的一些延伸,如:ReentrantLock等。
AQS的類圖結(jié)構(gòu)
AQS全稱是AbstractQueuedSynchronizer,即抽象同步隊(duì)列。下面看一下AQS的類圖結(jié)構(gòu):

為了方便下面幾個(gè)關(guān)鍵點(diǎn)的理解,大家先熟悉一下AQS的類圖結(jié)構(gòu)。
state 狀態(tài)的維護(hù)
在AQS中維持了一個(gè)單一的共享狀態(tài)state,來(lái)實(shí)現(xiàn)同步器同步??匆幌聅tate的相關(guān)代碼如下:state源碼
/*** The synchronization state.*/private volatile int state;/*** Returns the current value of synchronization state.* This operation has memory semantics of a {@code volatile} read.* @return current state value*/protected final int getState() {return state;}/*** Sets the value of synchronization state.* This operation has memory semantics of a {@code volatile} write.* @param newState the new state value*/protected final void setState(int newState) {state = newState;}/*** Atomically sets synchronization state to the given updated* value if the current state value equals the expected value.* This operation has memory semantics of a {@code volatile} read* and write.** @param expect the expected value* @param update the new value* @return {@code true} if successful. False return indicates that the actual* value was not equal to the expected value.*/protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
state 源碼設(shè)計(jì)幾個(gè)回答要點(diǎn):
state用volatile修飾,保證多線程中的可見(jiàn)性。
getState()和setState()方法采用final修飾,限制AQS的子類重寫(xiě)它們兩。
compareAndSetState()方法采用樂(lè)觀鎖思想的CAS算法,也是采用final修飾的,不允許子類重寫(xiě)。
CLH隊(duì)列
談到CLH隊(duì)列,我們結(jié)合以上state狀態(tài),先來(lái)看一下AQS原理圖:

CLH(Craig, Landin, and Hagersten locks) 同步隊(duì)列 是一個(gè)FIFO雙向隊(duì)列,其內(nèi)部通過(guò)節(jié)點(diǎn)head和tail記錄隊(duì)首和隊(duì)尾元素,隊(duì)列元素的類型為Node。AQS依賴它來(lái)完成同步狀態(tài)state的管理,當(dāng)前線程如果獲取同步狀態(tài)失敗時(shí),AQS則會(huì)將當(dāng)前線程已經(jīng)等待狀態(tài)等信息構(gòu)造成一個(gè)節(jié)點(diǎn)(Node)并將其加入到CLH同步隊(duì)列,同時(shí)會(huì)阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時(shí),會(huì)把首節(jié)點(diǎn)喚醒(公平鎖),使其再次嘗試獲取同步狀態(tài)。
Node節(jié)點(diǎn)
CLH同步隊(duì)列中,一個(gè)節(jié)點(diǎn)表示一個(gè)線程,它保存著線程的引用(thread)、狀態(tài)(waitStatus)、前驅(qū)節(jié)點(diǎn)(prev)、后繼節(jié)點(diǎn)(next),condition隊(duì)列的后續(xù)節(jié)點(diǎn)(nextWaiter)如下圖:

waitStatus幾種狀態(tài)狀態(tài):

我們?cè)倏匆幌翪LH隊(duì)列入列以及出列的代碼:
入列
CLH隊(duì)列入列就是tail指向新節(jié)點(diǎn)、新節(jié)點(diǎn)的prev指向當(dāng)前最后的節(jié)點(diǎn),當(dāng)前最后一個(gè)節(jié)點(diǎn)的next指向當(dāng)前節(jié)點(diǎn)。addWaiter方法如下:
//構(gòu)造Nodeprivate Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure(快速嘗試添加尾節(jié)點(diǎn))Node pred = tail;if (pred != null) {node.prev = pred;//CAS設(shè)置尾節(jié)點(diǎn)if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//多次嘗試enq(node);return node;}
由以上代碼可得,addWaiter設(shè)置尾節(jié)點(diǎn)失敗的話,調(diào)用enq(Node node)方法設(shè)置尾節(jié)點(diǎn),enq方法如下:
private Node enq(final Node node) {//死循環(huán)嘗試,知道成功為止for (;;) {Node t = tail;//tail 不存在,設(shè)置為首節(jié)點(diǎn)if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
出列
首節(jié)點(diǎn)的線程釋放同步狀態(tài)后,將會(huì)喚醒它的后繼節(jié)點(diǎn)(next),而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn)??梢钥匆幌乱韵聝啥卧创a:
Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
CLH核心幾個(gè)回答要點(diǎn)
雙向鏈表入列出列
CAS算法設(shè)置尾節(jié)點(diǎn)+死循環(huán)自旋。
ConditionObject
ConditionObject簡(jiǎn)介
我們都知道,synchronized控制同步的時(shí)候,可以配合Object的wait()、notify(),notifyAll() 系列方法可以實(shí)現(xiàn)等待/通知模式。而Lock呢?它提供了條件Condition接口,配合await(),signal(),signalAll() 等方法也可以實(shí)現(xiàn)等待/通知機(jī)制。ConditionObject實(shí)現(xiàn)了Condition接口,給AQS提供條件變量的支持 。
Condition隊(duì)列與CLH隊(duì)列的那些事
我們先來(lái)看一下圖:

ConditionObject隊(duì)列與CLH隊(duì)列的愛(ài)恨情仇:
調(diào)用了await()方法的線程,會(huì)被加入到conditionObject等待隊(duì)列中,并且喚醒CLH隊(duì)列中head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)。
線程在某個(gè)ConditionObject對(duì)象上調(diào)用了singnal()方法后,等待隊(duì)列中的firstWaiter會(huì)被加入到AQS的CLH隊(duì)列中,等待被喚醒。
當(dāng)線程調(diào)用unLock()方法釋放鎖時(shí),CLH隊(duì)列中的head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)(在本例中是firtWaiter),會(huì)被喚醒。
區(qū)別:
ConditionObject對(duì)象都維護(hù)了一個(gè)單獨(dú)的等待隊(duì)列 ,AQS所維護(hù)的CLH隊(duì)列是同步隊(duì)列,它們節(jié)點(diǎn)類型相同,都是Node。
獨(dú)占與共享模式。
AQS支持兩種同步模式:獨(dú)占式和共享式。
獨(dú)占式
同一時(shí)刻僅有一個(gè)線程持有同步狀態(tài),如ReentrantLock。又可分為公平鎖和非公平鎖。
公平鎖: 按照線程在隊(duì)列中的排隊(duì)順序,有禮貌的,先到者先拿到鎖。
非公平鎖: 當(dāng)線程要獲取鎖時(shí),無(wú)視隊(duì)列順序直接去搶鎖,不講道理的,誰(shuí)搶到就是誰(shuí)的。
acquire(int arg)是獨(dú)占式獲取同步狀態(tài)的方法,我們來(lái)看一下源碼:
acquire(long arg)方法
public final void acquire(long arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
addWaiter方法
//構(gòu)造Nodeprivate Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure(快速嘗試添加尾節(jié)點(diǎn))Node pred = tail;if (pred != null) {node.prev = pred;//CAS設(shè)置尾節(jié)點(diǎn)if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//多次嘗試enq(node);return node;}
acquireQueued(final Node node, long arg)方法
final boolean acquireQueued(final Node node, long arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
selfInterrupt()方法
static void selfInterrupt() {Thread.currentThread().interrupt();}
結(jié)合源代碼,可得acquire(int arg)方法流程圖,如下:

共享式
多個(gè)線程可同時(shí)執(zhí)行,如Semaphore/CountDownLatch等都是共享式的產(chǎn)物。
acquireShared(long arg)是共享式獲取同步狀態(tài)的方法,可以看一下源碼:
public final void acquireShared(long arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}
由上可得,先調(diào)用tryAcquireShared(int arg)方法嘗試獲取同步狀態(tài),如果獲取失敗,調(diào)用doAcquireShared(int arg)自旋方式獲取同步狀態(tài),方法源碼如下:
private void doAcquireShared(long arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {long r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
AQS的模板方法設(shè)計(jì)模式
模板方法模式
模板方法模式:在一個(gè)方法中定義一個(gè)算法的骨架,而將一些步驟延遲到子類中。模板方法使得子類可以在不改變算法結(jié)構(gòu)的情況下,重新定義算法中的某些步驟。
模板方法模式生活中的例子:假設(shè)我們要去北京旅游,那么我們可以坐高鐵或者飛機(jī),或者火車(chē),那么定義交通方式的抽象類,可以有以下模板:買(mǎi)票->安檢->乘坐xx交通工具->到達(dá)北京。讓子類繼承該抽象類,實(shí)現(xiàn)對(duì)應(yīng)的模板方法。

AQS定義的一些模板方法如下:
isHeldExclusively()//該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
tryAcquire(int)//獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int)//獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。tryAcquireShared(int)//共享方式。嘗試獲取資源。負(fù)數(shù)表示失?。?表示成功,但沒(méi)有剩余可用資源;正數(shù)表示成功,且有剩余資源。
tryReleaseShared(int)//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。
簡(jiǎn)言之,就是AQS提供tryAcquire,tryAcquireShared等模板方法,給子類實(shí)現(xiàn)自定義的同步器。
自定義同步器。
基于以上分析,我們都知道state,CLH隊(duì)列,ConditionObject隊(duì)列 等這些關(guān)鍵點(diǎn),你要實(shí)現(xiàn)自定義鎖的話,首先需要確定你要實(shí)現(xiàn)的是獨(dú)占鎖還是共享鎖,定義原子變量state的含義,再定義一個(gè)內(nèi)部類去繼承AQS,重寫(xiě)對(duì)應(yīng)的模板方法。
我們來(lái)看一下基于 AQS 實(shí)現(xiàn)的不可重入的獨(dú)占鎖的demo,來(lái)自《Java并發(fā)編程之美》:
public class NonReentrantLock implements Lock,Serializable{//內(nèi)部類,自定義同步器static class Sync extends AbstractQueuedSynchronizer {//是否鎖已經(jīng)被持有public boolean isHeldExclusively() {return getState() == 1;}//如果state為0 則嘗試獲取鎖public boolean tryAcquire(int arg) {assert arg== 1 ;//CAS設(shè)置狀態(tài),能保證操作的原子性,當(dāng)前為狀態(tài)為0,操作成功狀態(tài)改為1if(compareAndSetState(0, 1)){//設(shè)置當(dāng)前獨(dú)占的線程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//嘗試釋放鎖,設(shè)置state為0public boolean tryRelease(int arg) {assert arg ==1;//如果同步器同步器狀態(tài)等于0,則拋出監(jiān)視器非法狀態(tài)異常if(getState() == 0)throw new IllegalMonitorStateException();//設(shè)置獨(dú)占鎖的線程為nullsetExclusiveOwnerThread(null);//設(shè)置同步狀態(tài)為0setState(0);return true;}//返回Condition,每個(gè)Condition都包含了一個(gè)Condition隊(duì)列Condition newCondition(){return new ConditionObject();}}//創(chuàng)建一個(gè)Sync來(lái)做具體的工作private final Sync sync= new Sync ();@Overridepublic void lock() {sync.acquire(1);}public boolean isLocked() {return sync.isHeldExclusively();}@Overridepublic void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Overridepublic boolean tryLock() {return sync.tryAcquire(1);}@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Overridepublic void unlock() {sync.release(1);}@Overridepublic Condition newCondition() {return sync.newCondition();}}
NonReentrantLockDemoTest:
public class NonReentrantLockDemoTest {private static NonReentrantLock nonReentrantLock = new NonReentrantLock();public static void main(String[] args) {for (int i = 0; i < 10; i++) {Thread thread = new Thread(() -> {nonReentrantLock.lock();try {System.out.println(Thread.currentThread().getName());Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();} finally {nonReentrantLock.unlock();}});thread.start();}}}
運(yùn)行結(jié)果:

AQS全家桶實(shí)戰(zhàn)
AQS派生出如ReentrantLock、Semaphore等AQS全家桶,接下來(lái)可以看一下它們的使用案例。
ReentrantLock
ReentrantLock介紹
ReentrantLock為重入鎖,能夠?qū)蚕碣Y源能夠重復(fù)加鎖,是實(shí)現(xiàn)Lock接口的一個(gè)類。
ReentrantLock支持公平鎖和非公平鎖兩種方式
ReentrantLock案例
使用ReentrantLock來(lái)實(shí)現(xiàn)個(gè)簡(jiǎn)單線程安全的list,如下:
public class ReentrantLockList {// 線程不安全的listprivate ArrayList<String> array = new ArrayList<>();//獨(dú)占鎖private volatile ReentrantLock lock = new ReentrantLock();//添加元素public void add(String e){lock.lock();try {array.add(e);}finally {lock.unlock();}}//刪除元素public void remove(String e){lock.lock();try {array.remove(e);}finally {lock.unlock();}}//獲取元素public String get(int index){lock.lock();try {return array.get(index);}finally {lock.unlock();}}}
Semaphore
Semaphore介紹
Semaphore也叫信號(hào)量,可以用來(lái)控制資源并發(fā)訪問(wèn)的線程數(shù)量,通過(guò)協(xié)調(diào)各個(gè)線程,以保證合理的使用資源。
Semaphore案例
Java多線程有一到比較經(jīng)典的面試題:ABC三個(gè)線程順序輸出,循環(huán)10遍。
public class ABCSemaphore {private static Semaphore A = new Semaphore(1);private static Semaphore B = new Semaphore(1);private static Semaphore C = new Semaphore(1);static class ThreadA extends Thread {@Overridepublic void run() {try {for (int i = 0; i < 10; i++) {A.acquire();System.out.print("A");B.release();}} catch (InterruptedException e) {e.printStackTrace();}}}static class ThreadB extends Thread {@Overridepublic void run() {try {for (int i = 0; i < 10; i++) {B.acquire();System.out.print("B");C.release();}} catch (InterruptedException e) {e.printStackTrace();}}}static class ThreadC extends Thread {@Overridepublic void run() {try {for (int i = 0; i < 10; i++) {C.acquire();System.out.print("C");A.release();}} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) throws InterruptedException {// 開(kāi)始只有A可以獲取, BC都不可以獲取, 保證了A最先執(zhí)行B.acquire();C.acquire();new ThreadA().start();new ThreadB().start();new ThreadC().start();}

騰訊、阿里、滴滴后臺(tái)面試題匯總總結(jié) — (含答案)
面試:史上最全多線程面試題 !
最新阿里內(nèi)推Java后端面試題
JVM難學(xué)?那是因?yàn)槟銢](méi)認(rèn)真看完這篇文章

關(guān)注作者微信公眾號(hào) —《JAVA爛豬皮》
了解更多java后端架構(gòu)知識(shí)以及最新面試寶典


看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者源源不斷出文的動(dòng)力
