由淺入深逐步講解Java并發(fā)的半壁江山AQS

1、JUC的由來
synchronized 關(guān)鍵字是JDK官方人員用C++代碼寫的,在JDK6以前是重量級鎖。Java大牛 Doug Lea對 synchronized 在并發(fā)編程條件下的性能表現(xiàn)不滿意就自己寫了個JUC,以此來提升并發(fā)性能,本文要講的就是JUC并發(fā)包下的AbstractQueuedSynchronizer。
在JUC中 CountDownLatch、ReentrantLock、ThreadPoolExecutor、ReentrantReadWriteLock 等底層用的都是AQS,AQS幾乎占據(jù)了JUC并發(fā)包里的半壁江山,如果想要獲取鎖可以被中斷、超時獲取鎖、嘗試獲取鎖那就用AQS吧。
DougLea杰作: HashMap、JUC、ConcurrentHashMap等。
溫馨提醒:
涉及到AQS重要方法、lock、unlock、CountDownLatch、await、signal幾個重要組件的底層講解所以內(nèi)容有點(diǎn)長,嫌啰嗦的可直接看幾個流程圖即可(原圖公眾號回復(fù)lock即可)。
2、AQS前置知識點(diǎn)
2.1、模板方法
AbstractQueuedSynchronizer是個抽象類,所有用到方法的類都要繼承此類的若干方法,對應(yīng)的設(shè)計模式就是模版模式。
模版模式定義:一個抽象類公開定義了執(zhí)行它的方法的方式/模板。它的子類可以按需要重寫方法實(shí)現(xiàn),但調(diào)用將以抽象類中定義的方式進(jìn)行。這種類型的設(shè)計模式屬于行為型模式。
抽象類:
public abstract class SendCustom {
public abstract void to();
public abstract void from();
public void date() {
System.out.println(new Date());
}
public abstract void send();
// 注意此處 框架方法-模板方法
public void sendMessage() {
to();
from();
date();
send();
}
}
模板方法派生類:
public class SendSms extends SendCustom {
@Override
public void to() {
System.out.println("sowhat");
}
@Override
public void from() {
System.out.println("xiaomai");
}
@Override
public void send() {
System.out.println("Send message");
}
public static void main(String[] args) {
SendCustom sendC = new SendSms();
sendC.sendMessage();
}
}
2.2、LookSupport
LockSupport 是一個線程阻塞工具類,所有的方法都是靜態(tài)方法,可以讓線程在任意位置阻塞,當(dāng)然阻塞之后肯定得有喚醒的方法。常用方法如下:
public static void park(Object blocker); // 暫停當(dāng)前線程
public static void parkNanos(Object blocker, long nanos); // 暫停當(dāng)前線程,不過有超時時間的限制
public static void parkUntil(Object blocker, long deadline); // 暫停當(dāng)前線程,直到某個時間
public static void park(); // 無期限暫停當(dāng)前線程
public static void parkNanos(long nanos); // 暫停當(dāng)前線程,不過有超時時間的限制
public static void parkUntil(long deadline); // 暫停當(dāng)前線程,直到某個時間
public static void unpark(Thread thread); // 恢復(fù)當(dāng)前線程
public static Object getBlocker(Thread t);
叫park是因?yàn)?strong style="line-height: 1.75em;">park英文意思為停車。我們?nèi)绻?strong style="line-height: 1.75em;">Thread看成一輛車的話,park就是讓車停下,unpark就是讓車啟動然后跑起來。
與Object類的wait/notify機(jī)制相比,park/unpark有兩個優(yōu)點(diǎn):
以thread為操作對象更符合阻塞線程的直觀定義 操作更精準(zhǔn),可以準(zhǔn)確地喚醒某一個線程(notify隨機(jī)喚醒一個線程,notifyAll 喚醒所有等待的線程),增加了靈活性。
park/unpark調(diào)用的是 Unsafe(提供CAS操作) 中的 native代碼。
park/unpark 功能在Linux系統(tǒng)下是用的Posix線程庫pthread中的mutex(互斥量),condition(條件變量)來實(shí)現(xiàn)的。mutex和condition保護(hù)了一個 _counter 的變量,當(dāng) park 時,這個變量被設(shè)置為0。當(dāng)unpark時,這個變量被設(shè)置為1。
2.3、CAS
CAS 是 CPU指令級別實(shí)現(xiàn)了原子性的比較和交換(Conmpare And Swap)操作,注意CAS不是鎖只是CPU提供的一個原子性操作指令。
CAS在語言層面不進(jìn)行任何處理,直接將原則操作實(shí)現(xiàn)在硬件級別實(shí)現(xiàn),之所以可以實(shí)現(xiàn)硬件級別的操作核心是因?yàn)镃AS操作類中有個核心類UnSafe類。
關(guān)于CAS引發(fā)的ABA問題、性能開銷問題、只能保證一個共享變量之間的原則性操作問題,以前CAS中寫過,在此不再重復(fù)講解。
注意:并不是說 CAS 一定比SYN好,如果高并發(fā)執(zhí)行時間久 ,用SYN好, 因?yàn)镾YN底層用了wait() 阻塞后是不消耗CPU資源的。如果鎖競爭不激烈說明自旋不嚴(yán)重,此時用CAS。
3、AQS重要方法
模版方法分為獨(dú)占式跟共享式,子類根據(jù)需要不同調(diào)用不同的模版方法(講解有點(diǎn)多,想看底層可直接下滑到第四章節(jié))。
3.1 模板方法
3.1.1 獨(dú)占式獲取
3.1.1.1 accquire
不可中斷獲取鎖accquire是獲取獨(dú)占鎖方法,acquire嘗試獲取資源,成功則直接返回,不成功則進(jìn)入等待隊列,這個過程不會被線程中斷,被外部中斷也不響應(yīng),獲取資源后才再進(jìn)行自我中斷selfInterrupt()。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
-
acquire(arg) tryAcquire(arg) 顧名思義,它就是嘗試獲取鎖,需要我們自己實(shí)現(xiàn)具體細(xì)節(jié),一般要求是:
如果該鎖沒有被另一個線程保持,則獲取該鎖并立即返回,將鎖的保持計數(shù)設(shè)置為 1。
如果當(dāng)前線程已經(jīng)保持該鎖,則將保持計數(shù)加 1,并且該方法立即返回。
如果該鎖被另一個線程保持,則出于線程調(diào)度的目的,禁用當(dāng)前線程,并且在獲得鎖之前,該線程將一直處于休眠狀態(tài),此時鎖保持計數(shù)被設(shè)置為 1。
-
addWaiter(Node.EXCLUSIVE)
主要功能是 一旦嘗試獲取鎖未成功,就要使用該方法將其加入同步隊列尾部,由于可能有多個線程并發(fā)加入隊尾產(chǎn)生競爭,因此采用compareAndSetTail鎖方法來保證同步
-
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
一旦加入同步隊列,就需要使用該方法,自旋阻塞 喚醒來不斷的嘗試獲取鎖,直到被中斷或獲取到鎖。
3.1.1.2 acquireInterruptibly
可中斷獲取鎖acquireInterruptibly相比于acquire支持響應(yīng)中斷。
1、如果當(dāng)前線程未被中斷,則嘗試獲取鎖。
2、如果鎖空閑則獲鎖并立即返回,state = 1。
3、如果當(dāng)前線程已持此鎖,state + 1,并且該方法立即返回。
4、如果鎖被另一個線程保持,出于線程調(diào)度目的,禁用當(dāng)前線程,線程休眠ing,除非鎖由當(dāng)前線程獲得或者當(dāng)前線程被中斷了,中斷后會拋出InterruptedException,并且清除當(dāng)前線程的已中斷狀態(tài)。
5、此方法是一個顯式中斷點(diǎn),所以要優(yōu)先考慮響應(yīng)中斷。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException(); // acquireInterruptibly 選擇
interrupted = true; // acquire 的選擇
3.1.1.3 tryAcquireNanos
該方法可以被中斷,增加了超時則失敗的功能??梢哉f該方法的實(shí)現(xiàn)與上述兩方法沒有任何區(qū)別。時間功能上就是用的標(biāo)準(zhǔn)超時功能,如果剩余時間小于0那么acquire失敗,如果該時間大于一次自旋鎖時間(spinForTimeoutThreshold = 1000),并且可以被阻塞,那么調(diào)用LockSupport.parkNanos方法阻塞線程。
doAcquireNanos內(nèi)部:
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
該方法一般會有以下幾種情況產(chǎn)生:
在指定時間內(nèi),線程獲取到鎖,返回true。 當(dāng)前線程在超時時間內(nèi)被中斷,拋中斷異常后,線程退出。 到截止時間后線程仍未獲取到鎖,此時線程獲得鎖失敗,不再等待直接返回false。
3.1.2 共享式獲取
3.1.2.1 acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
該模版方法的工作:
-
調(diào)用 tryAcquireShared(arg)嘗試獲得資源,返回值代表如下含義:
負(fù)數(shù)表示失敗。
0 表示成功,但沒有剩余可用資源。
正數(shù)表示成功,且有剩余資源。
doAcquireShared作用:
創(chuàng)建節(jié)點(diǎn)然后加入到隊列中去,這一塊和獨(dú)占模式下的 addWaiter 代碼差不多,不同的是結(jié)點(diǎn)的模式是SHARED,在獨(dú)占模式 EXCLUSIVE。
3.1.2.2 acquireSharedInterruptibly
無非就是可中斷性的共享方法
public final void acquireSharedInterruptibly(long arg) throws InterruptedException {
if (Thread.interrupted()) // 如果線程被中斷,則拋出異常
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
// 如果tryAcquireShared()方法獲取失敗,則調(diào)用如下的方法
doAcquireSharedInterruptibly(arg);
}
3.1.2.3. tryAcquireSharedNanos
嘗試以共享模式獲取,如果被中斷則中止,如果超過給定超時期則失敗。實(shí)現(xiàn)此方法首先要檢查中斷狀態(tài),然后至少調(diào)用一次 tryacquireshared(long),并在成功時返回。否則,在成功、線程中斷或超過超時期之前,線程將加入隊列,可能反復(fù)處于阻塞或未阻塞狀態(tài),并一直調(diào)用 tryacquireshared(long)。
public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
3.1.3 獨(dú)占式釋放
獨(dú)占鎖的釋放調(diào)用unlock方法,而該方法實(shí)際調(diào)用了AQS的release方法,這段代碼邏輯比較簡單,如果同步狀態(tài)釋放成功(tryRelease返回true)則會執(zhí)行if塊中的代碼,當(dāng)head指向的頭結(jié)點(diǎn)不為null,并且該節(jié)點(diǎn)的狀態(tài)值不為0的話才會執(zhí)行unparkSuccessor()方法。
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
3.1.4 共享式釋放
releaseShared首先去嘗試釋放資源tryReleaseShared(arg),如果釋放成功了,就代表有資源空閑出來,那么就用doReleaseShared()去喚醒后續(xù)結(jié)點(diǎn)。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
比如CountDownLatch的countDown()具體實(shí)現(xiàn):
public void countDown() {
sync.releaseShared(1);
}
3.2 子類需實(shí)現(xiàn)方法
子類要實(shí)現(xiàn)父類方法也分為獨(dú)占式跟共享式。
3.2.1 獨(dú)占式獲取
tryAcquire 顧名思義,就是嘗試獲取鎖,AQS在這里沒有對其進(jìn)行功能的實(shí)現(xiàn),只有一個拋出異常的語句,我們需要自己對其進(jìn)行實(shí)現(xiàn),可以對其重寫實(shí)現(xiàn)公平鎖、不公平鎖、可重入鎖、不可重入鎖
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
3.2.2 獨(dú)占式釋放
tryRelease 嘗試釋放 獨(dú)占鎖,需要子類實(shí)現(xiàn)。
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();
}
3.2.3 共享式獲取
tryAcquireShared 嘗試進(jìn)行共享鎖的獲得,需要子類實(shí)現(xiàn)。
protected long tryAcquireShared(long arg) {
throw new UnsupportedOperationException();
}
3.2.4 共享式釋放
tryReleaseShared嘗試進(jìn)行共享鎖的釋放,需要子類實(shí)現(xiàn)。
protected boolean tryReleaseShared(long arg) {
throw new UnsupportedOperationException();
}
3.3 狀態(tài)標(biāo)志位
state因?yàn)橛?volatile修飾 保證了我們操作的可見性,所以任何線程通過getState()獲得狀態(tài)都是可以得到最新值,但是setState()無法保證原子性,因此AQS給我們提供了compareAndSetState方法利用底層UnSafe的CAS功能來實(shí)現(xiàn)原子性。
private volatile long state;
protected final long getState() {
return state;
}
protected final void setState(long newState) {
state = newState;
}
protected final boolean compareAndSetState(long expect, long update) {
return unsafe.compareAndSwapLong(this, stateOffset, expect, update);
}
3.4 查詢是否獨(dú)占模式
isHeldExclusively 該函數(shù)的功能是查詢當(dāng)前的工作模式是否是獨(dú)占模式。需要子類實(shí)現(xiàn)。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
3.5 自定義實(shí)現(xiàn)鎖
這里需要重點(diǎn)說明一點(diǎn),JUC中一般是用一個子類繼承自Lock,然后在子類中定義一個內(nèi)部類來實(shí)現(xiàn)AQS的繼承跟使用。
public class SowhatLock implements Lock
{
private Sync sync = new Sync();
@Override
public void lock()
{
sync.acquire(1);
}
@Override
public boolean tryLock()
{
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
{
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock()
{
sync.release(1);
}
@Override
public Condition newCondition()
{
return sync.newCondition();
}
@Override
public void lockInterruptibly() throws InterruptedException
{
}
private class Sync extends AbstractQueuedSynchronizer
{
@Override
protected boolean tryAcquire(int arg)
{
assert arg == 1;
if (compareAndSetState(0, 1))
{
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg)
{
assert arg == 1;
if (!isHeldExclusively())
{
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively()
{
return getExclusiveOwnerThread() == Thread.currentThread();
}
Condition newCondition() {
return new ConditionObject();
}
}
}
自定義實(shí)現(xiàn)類:
public class SoWhatTest
{
public static int m = 0;
public static CountDownLatch latch = new CountDownLatch(50);
public static Lock lock = new SowhatLock();
public static void main(String[] args) throws Exception
{
Thread[] threads = new Thread[50];
for (int i = 0; i < threads.length ; i++)
{
threads[i] = new Thread(()->{
try{
lock.lock();
for (int j = 0; j <100 ; j++)
{
m++;
}
}finally
{
lock.unlock();
}
latch.countDown();
});
}
for(Thread t : threads) t.start();
latch.await();
System.out.println(m);
}
}
4、AQS底層
4.1 CLH
CLH(Craig、 Landin、 Hagersten locks三個人名字綜合而命名):
是一個自旋鎖,能確保無饑餓性,提供先來先服務(wù)的公平性。 CLH 鎖也是一種基于鏈表的可擴(kuò)展、高性能、公平的自旋鎖,申請線程只在本地變量上自旋,它不斷輪詢前驅(qū)的狀態(tài),如果發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋。
4.2 Node
CLH 隊列由Node對象組成,其中Node是AQS中的內(nèi)部類。
static final class Node {
// 標(biāo)識共享鎖
static final Node SHARED = new Node();
// 標(biāo)識獨(dú)占鎖
static final Node EXCLUSIVE = null;
// 前驅(qū)節(jié)點(diǎn)
volatile Node prev;
// 后繼節(jié)點(diǎn)
volatile Node next;
// 獲取鎖失敗的線程保存在Node節(jié)點(diǎn)中。
volatile Thread thread;
// 當(dāng)我們調(diào)用了Condition后他也有一個等待隊列
Node nextWaiter;
//在Node節(jié)點(diǎn)中一般通過waitStatus獲得下面節(jié)點(diǎn)不同的狀態(tài),狀態(tài)對應(yīng)下方。
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
waitStatus 有如下5中狀態(tài):
-
CANCELLED = 1
表示當(dāng)前結(jié)點(diǎn)已取消調(diào)度。當(dāng)超時或被中斷(響應(yīng)中斷的情況下),會觸發(fā)變更為此狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會再變化。
-
SIGNAL = -1
表示后繼結(jié)點(diǎn)在等待當(dāng)前結(jié)點(diǎn)喚醒。后繼結(jié)點(diǎn)入隊時,會將前繼結(jié)點(diǎn)的狀態(tài)更新為 SIGNAL。
-
CONDITION = -2
表示結(jié)點(diǎn)等待在 Condition 上,當(dāng)其他線程調(diào)用了 Condition 的 signal() 方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊列轉(zhuǎn)移到同步隊列中,等待獲取同步鎖。
-
PROPAGATE = -3
共享模式下,前繼結(jié)點(diǎn)不僅會喚醒其后繼結(jié)點(diǎn),同時也可能會喚醒后繼的后繼結(jié)點(diǎn)。
-
INITIAL = 0
新結(jié)點(diǎn)入隊時的默認(rèn)狀態(tài)。
4.3 AQS實(shí)現(xiàn)
4.3.1 公平鎖和非公平鎖
銀行售票窗口營業(yè)中:
公平排隊:每個客戶來了自動在最后面排隊,輪到自己辦理業(yè)務(wù)的時候拿出身份證等證件取票。
非公平排隊:有個旅客火車馬上開車了,他拿著自己的各種證件著急這想跟窗口工作人員說是否可以加急辦理下,可以的話則直接辦理,不可以的話則去隊尾排隊去。
在JUC中同樣存在公平鎖跟非公平鎖,一般非公平鎖效率好一些。因?yàn)榉枪芥i狀態(tài)下打算搶鎖的線程不用排隊掛起了。
4.3.2 AQS細(xì)節(jié)
AQS內(nèi)部維護(hù)著一個FIFO的隊列,即CLH隊列,提供先來先服務(wù)的公平性。AQS的同步機(jī)制就是依靠CLH隊列實(shí)現(xiàn)的。CLH隊列是FIFO的雙端雙向鏈表隊列(方便尾部節(jié)點(diǎn)插入)。線程通過AQS獲取鎖失敗,就會將線程封裝成一個Node節(jié)點(diǎn),通過CAS原子操作插入隊列尾。當(dāng)有線程釋放鎖時,會嘗試讓隊頭的next節(jié)點(diǎn)占用鎖,個人理解AQS具有如下幾個特點(diǎn):
在AQS 同步隊列中 -1 表示線程在睡眠狀態(tài) 當(dāng)前Node節(jié)點(diǎn)線程會把前一個Node.ws = -1。當(dāng)前節(jié)點(diǎn)把前面節(jié)點(diǎn)ws設(shè)置為-1,你可以理解為:你自己能知道自己睡著了嗎?只能是別人看到了發(fā)現(xiàn)你睡眠了! 持有鎖的線程永遠(yuǎn)不在隊列中。 在AQS隊列中第二個才是最先排隊的線程。 如果是交替型任務(wù)或者單線程任務(wù),即使用了Lock也不會涉及到AQS 隊列。 不到萬不得已不要輕易park線程,很耗時的!所以排隊的頭線程會自旋的嘗試幾個獲取鎖。
4.4 加鎖跟解鎖流程圖
以最經(jīng)典的 ReentrantLock 為例逐步分析下 lock 跟 unlock 底層流程圖(要原圖的話公眾號回復(fù):lock)。
private Lock lock = new ReentrantLock();
public void test(){
lock.lock();
try{
doSomeThing();
}catch (Exception e){
...
}finally {
lock.unlock();
}
}
4.4.1 獨(dú)占式加入同步隊列
同步器AQS中包含兩個節(jié)點(diǎn)類型的引用:一個指向頭結(jié)點(diǎn)的引用(head),一個指向尾節(jié)點(diǎn)的引用(tail),如果加入的節(jié)點(diǎn)是OK的則會直接運(yùn)行該節(jié)點(diǎn),當(dāng)若干個線程搶鎖失敗了那么就會搶著加入到同步隊列的尾部,因?yàn)槭菗屩尤脒@個時候用CAS來設(shè)置尾部節(jié)點(diǎn)。入口代碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
-
tryAcquire
該方法是需要自我實(shí)現(xiàn)的,在上面的demo中可見一斑,就是返回是否獲得了鎖。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 是否需要加入隊列,不需要的話則嘗試CAS獲得鎖,獲得成功后 設(shè)置當(dāng)前鎖的擁有者
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 這就是可重入鎖的實(shí)現(xiàn)
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
-
addWaiter(Node.EXCLUSIVE,arg)
/**
* 如果嘗試獲取同步狀態(tài)失敗的話,則構(gòu)造同步節(jié)點(diǎn)(獨(dú)占式的Node.EXCLUSIVE),通過addWaiter(Node node,int args)方法將該節(jié)點(diǎn)加入到同步隊列的隊尾。
*/
private Node addWaiter(Node mode) {
// 用當(dāng)前線程構(gòu)造一個Node對象,mode是一個表示Node類型的字段,或者說是這個節(jié)點(diǎn)是獨(dú)占的還是共享的
Node node = new Node(Thread.currentThread(), mode);
// 將目前隊列中尾部節(jié)點(diǎn)給pred
Node pred = tail;
// 隊列不為空的時候
if (pred != null) {
node.prev = pred;
// 先嘗試通過AQS方式修改尾節(jié)點(diǎn)為最新的節(jié)點(diǎn),如果修改失敗,意味著有并發(fā),
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//第一次嘗試添加尾部失敗說明有并發(fā),此時進(jìn)入自旋
enq(node);
return node;
}
-
自旋enq 方法將并發(fā)添加節(jié)點(diǎn)的請求通過CAS跟自旋將尾節(jié)點(diǎn)的添加變得 串行化起來。說白了就是讓節(jié)點(diǎn)放到正確的隊尾位置。
/**
* 這里進(jìn)行了循環(huán),如果此時存在了tail就執(zhí)行同上一步驟的添加隊尾操作,如果依然不存在,
* 就把當(dāng)前線程作為head結(jié)點(diǎn)。插入節(jié)點(diǎn)后,調(diào)用acquireQueued()進(jìn)行阻塞
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
-
acquireQueued 是當(dāng)前Node節(jié)點(diǎn)線程在死循環(huán)中獲取同步狀態(tài),而只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)才能嘗試獲取鎖,原因是:
頭結(jié)點(diǎn)是成功獲取同步狀態(tài)(鎖)的節(jié)點(diǎn),而頭節(jié)點(diǎn)的線程釋放了同步狀態(tài)以后,將會喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)的線程被喚醒后要檢查自己的前驅(qū)節(jié)點(diǎn)是否為頭結(jié)點(diǎn)。 維護(hù)同步隊列的FIFO原則,節(jié)點(diǎn)進(jìn)入同步隊列之后,會嘗試自旋幾次。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋檢查當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是否為頭結(jié)點(diǎn),才能獲取鎖
for (;;) {
// 獲取節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 節(jié)點(diǎn)中的線程循環(huán)的檢查,自己的前驅(qū)節(jié)點(diǎn)是否為頭節(jié)點(diǎn)
// 只有當(dāng)前節(jié)點(diǎn) 前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)才會 再次調(diào)用我們實(shí)現(xiàn)的方法tryAcquire
// 接下來無非就是將當(dāng)前節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),移除之前的頭節(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 否則檢查前一個節(jié)點(diǎn)的狀態(tài),看當(dāng)前獲取鎖失敗的線程是否要掛起
if (shouldParkAfterFailedAcquire(p, node) &&
//如果需要掛起,借助JUC包下面的LockSupport類的靜態(tài)方法park掛起當(dāng)前線程,直到被喚醒
parkAndCheckInterrupt())
interrupted = true; // 兩個判斷都是true說明 則置true
}
} finally {
//如果等待過程中沒有成功獲取資源(如timeout,或者可中斷的情況下被中斷了),那么取消結(jié)點(diǎn)在隊列中的等待。
if (failed)
//取消請求,將當(dāng)前節(jié)點(diǎn)從隊列中移除
cancelAcquire(node);
}
}
如果成功就返回,否則就執(zhí)行shouldParkAfterFailedAcquire、parkAndCheckInterrupt來達(dá)到阻塞效果。
-
shouldParkAfterFailedAcquire 第二步的 addWaiter()構(gòu)造的新節(jié)點(diǎn),waitStatus的默認(rèn)值是0。此時,會進(jìn)入最后一個if判斷,CAS設(shè)置pred.waitStatus SIGNAL,最后返回false。由于返回false,第四步的acquireQueued會繼續(xù)進(jìn)行循環(huán)。假設(shè)node的前繼節(jié)點(diǎn)pred仍然不是頭結(jié)點(diǎn)或鎖獲取失敗,則會再次進(jìn)入shouldParkAfterFailedAcquire()。上一輪循環(huán)中已經(jīng)將pred.waitStatu = -1了,則這次會進(jìn)入第一個判斷條件,直接返回true,表示應(yīng)該阻塞調(diào)用parkAndCheckInterrupt。
那么什么時候會遇到ws > 0呢?當(dāng)pred所維護(hù)的獲取請求被取消時(也就是node.waitStatus = CANCELLED,這時就會循環(huán)移除所有被取消的前繼節(jié)點(diǎn)pred,直到找到未被取消的pred。移除所有被取消的前繼節(jié)點(diǎn)后,直接返回false。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 獲得前驅(qū)節(jié)點(diǎn)的狀態(tài)
if (ws == Node.SIGNAL) //此處是第二次設(shè)置
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 此處是第一次設(shè)置 unsafe級別調(diào)用設(shè)置
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
-
parkAndCheckInterrupt 主要任務(wù)是暫停當(dāng)前線程然后查看是否已經(jīng)暫停了。
private final boolean parkAndCheckInterrupt() {
// 調(diào)用park()使線程進(jìn)入掛起狀態(tài),什么時候調(diào)用了unpark再繼續(xù)執(zhí)行下面
LockSupport.park(this);
// 如果被喚醒,查看自己是不是已經(jīng)被中斷了。
return Thread.interrupted();
}
-
cancelAcquire acquireQueued方法的finally會判斷failed值,正常運(yùn)行時候自旋出來的時候會是false,如果中斷或者timeout了 則會是true,執(zhí)行cancelAcquire,其中核心代碼是node.waitStatus = Node.CANCELLED。 -
selfInterrupt
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
4.4.2 獨(dú)占式釋放隊列頭節(jié)點(diǎn)
release()會調(diào)用tryRelease方法嘗試釋放當(dāng)前線程持有的鎖,成功的話喚醒后繼線程,并返回true,否則直接返回false。
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
-
tryRelease 這個是子類需要自我實(shí)現(xiàn)的,沒啥說的根據(jù)業(yè)務(wù)需要實(shí)現(xiàn)。 -
unparkSuccessor 喚醒頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; // 獲得頭節(jié)點(diǎn)狀態(tài)
if (ws < 0) //如果頭節(jié)點(diǎn)裝小于0 則將其置為0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; //這個是新的頭節(jié)點(diǎn)
if (s == null || s.waitStatus > 0) {
// 如果新頭節(jié)點(diǎn)不滿足要求
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
//從隊列尾部開始往前去找最前面的一個waitStatus小于0的節(jié)點(diǎn)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)//喚醒后繼節(jié)點(diǎn)對應(yīng)的線程
LockSupport.unpark(s.thread);
}
4.4.3 AQS 中增加跟刪除形象圖
5、CountDownLatch底層
5.1 共享鎖 CountDownLatch底層
CountDownLatch 雖然相對簡單,但也實(shí)現(xiàn)了共享鎖模型。但是如何正確的吹逼 CountDownLatch 呢?如果在理解了上述流程的基礎(chǔ)上,從CountDownLatch入手來看 AQS 中關(guān)于共享鎖的代碼還比較好看懂,在看的時候可以 以看懂大致內(nèi)容為主,學(xué)習(xí)其設(shè)計的思路,不要陷入所有條件處理細(xì)節(jié)中,多線程環(huán)境中,對與錯有時候不是那么容易看出來的。個人追源碼繪制了如下圖:
5.2 計數(shù)信號量Semaphore
Semaphore 這就是共享鎖的一個實(shí)現(xiàn)類,在初始化的時候就規(guī)定了共享鎖池的大小N,有一個線程獲得了鎖,可用數(shù)就減少1個。有一個線程釋放鎖可用數(shù)就增加1個。如果有 >=2 的線程同時釋放鎖,則此時有多個鎖可用。這個時候就可以 同時喚醒 兩個鎖 setHeadAndPropagate (流程圖懶的繪制了)。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//找先驅(qū)結(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head) {
// 嘗試獲取資源
int r = tryAcquireShared(arg);
if (r >= 0) {
// 設(shè)置當(dāng)前結(jié)點(diǎn)為頭結(jié)點(diǎn),然后去喚醒后續(xù)結(jié)點(diǎn)。注意傳播性 喚醒!
setHeadAndPropagate(node, r);
p.next = null; // help GC 釋放頭結(jié)點(diǎn),等待GC
if (interrupted)
selfInterrupt();
failed = false;//獲取到資源
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)//如果最后沒有獲取到資源,則cancel
cancelAcquire(node);
}
}
5.3 ReentrantReadWriteLock
在 ReentrantReadWriteLock 類中也是只有一個32位的int state來表示讀鎖跟寫鎖,如何實(shí)現(xiàn)的?
后16位用來保存獨(dú)享的寫鎖個數(shù),第一次獲得就是01,第二次重入就是10了,這樣的方式來保存。 但是多個線程都可以獲得讀鎖,并且每個線程可能讀多次,如何保存?我們用前16位來保存有多少個線程獲得了讀鎖。 每個讀鎖線程獲得的重入讀鎖個數(shù) 由內(nèi)部類 HoldCounter與讀鎖配套使用。
6、Condition
synchronized 可用 wait() 和 notify()/notifyAll() 方法相結(jié)合可以實(shí)現(xiàn)等待/通知模式。Lock 也提供了 Condition 來提供類似的功能。
Condition是JDK5后引入的Interface,它用來替代傳統(tǒng)的Object的wait()/notify()實(shí)現(xiàn)線程間的協(xié)作,相比使用Object的wait()/notify(),使用Condition的await()/signal()這種方式 實(shí)現(xiàn)線程間協(xié)作更加安全和高效。簡單說,他的作用是使得某些線程一起等待某個條件(Condition),只有當(dāng)該條件具備(signal 或者 signalAll方法被調(diào)用)時,這些等待線程才會被喚醒,從而重新爭奪鎖。wait()/notify()這些都更傾向于底層的實(shí)現(xiàn)開發(fā),而Condition接口更傾向于代碼實(shí)現(xiàn)的等待通知效果。兩者之間的區(qū)別與共通點(diǎn)如下:
6.1 條件等待隊列
條件等待隊列,指的是 Condition 內(nèi)部自己維護(hù)的一個隊列,不同于 AQS 的 同步等待隊列。它具有以下特點(diǎn):
要加入條件等待隊列的節(jié)點(diǎn),不能在 同步等待隊列。
從 條件等待隊列 移除的節(jié)點(diǎn),會進(jìn)入同步等待隊列。
一個鎖對象只能有一個同步等待隊列,但可以有多個條件等待隊列。
這里以 AbstractQueuedSynchronizer 的內(nèi)部類 ConditionObject 為例(Condition 的實(shí)現(xiàn)類)來分析下它的具體實(shí)現(xiàn)過程。首先來看該類內(nèi)部定義的幾個成員變量:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
它采用了 AQS 的 Node 節(jié)點(diǎn)構(gòu)造(前面說過Node類有nextWaiter屬性),并定義了兩個成員變量:firstWaiter、lastWaiter 。說明在 ConditionObject 內(nèi)部也維護(hù)著一個自己的單向等待隊列。目前可知它的結(jié)構(gòu)如下:
6.2 await、signal
比如有線程 1、2競爭鎖,下面來說下具體過程 線程1:
1、線程1 調(diào)用 reentrantLock.lock時,持有鎖。
2、線程1 調(diào)用 await 方法,進(jìn)入條件等待隊列 ,同時釋放鎖。
3、線程1 獲取到線程2 signal 信號,從條件等待隊列進(jìn)入同步等待隊列。
線程2:
1、線程2 調(diào)用 reentrantLock.lock時,由于鎖被線程1 持有,進(jìn)入同步等待隊列 。
2、由于線程1 釋放鎖,線程2 從同步等待隊列 移除,獲取到鎖。
3、線程2 調(diào)用 signal 方法,導(dǎo)致線程 1 被喚醒。線程2 調(diào)用unlock ,線程1 獲取鎖后繼續(xù)下走。
6.2.1 await
當(dāng)我們看await、signal 的源碼時候不要認(rèn)為等待隊列跟同步隊列是完全分開的,其實(shí)個人感覺底層源碼是有點(diǎn) HashMap 中的紅黑樹跟雙向鏈表的意思。
當(dāng)調(diào)用await方法時候,說明當(dāng)前任務(wù)隊列的頭節(jié)點(diǎn)拿著鎖呢,此時要把該Thread從任務(wù)隊列挪到等待隊列再喚醒任務(wù)隊列最前面排隊的運(yùn)行任務(wù),如圖:
-
thread 表示節(jié)點(diǎn)存放的線程。 -
waitStatus 表示節(jié)點(diǎn)等待狀態(tài)。條件等待隊列中的節(jié)點(diǎn)等待狀態(tài)都是 CONDITION,否則會被清除。 -
nextWaiter 表示后指針。
6.2.2 signal
當(dāng)我們調(diào)用signal方法的時候,我們要將等待隊列中的頭節(jié)點(diǎn)移出來,讓其去搶鎖,如果是公平模式就要去排隊了,流程如圖:

上面只是形象流程圖,如果從代碼級別看的話大致流程如下:

6.2.3 signalAll
signalAll 與 signal 方法的區(qū)別體現(xiàn)在 doSignalAll 方法上,前面我們已經(jīng)知道doSignal方法只會對等待隊列的頭節(jié)點(diǎn)進(jìn)行操作,doSignalAll方法只不過將等待隊列中的每一個節(jié)點(diǎn)都移入到同步隊列中,即通知當(dāng)前調(diào)用condition.await()方法的每一個線程:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null); // 循環(huán)
}
6.3 End
一個 Condition 對象就有一個單項的等待任務(wù)隊列。在一個多線程任務(wù)中我們可以new出多個等待任務(wù)隊列。比如我們new出來兩個等待隊列。
private Lock lock = new ReentrantLock();
private Condition FirstCond = lock.newCondition();
private Condition SecondCond = lock.newCondition();
所以真正的AQS任務(wù)中一般是一個任務(wù)隊列N個等待隊列的,因此我們盡量調(diào)用signal而少用signalAll,因?yàn)樵谥付ǖ膶?shí)例化等待隊列中只有一個可以拿到鎖的。
Synchronized 中的 wait 跟 notify 底層代碼的等待隊列只有一個,多個線程調(diào)用wait的時候我們是無法知道頭節(jié)點(diǎn)是那個具體線程的。因此只能notifyAll。
7、參考
1、詳解Condition的await和signal:https://www.jianshu.com/p/28387056eeb4
2、Condition的await和signal流程:https://www.cnblogs.com/insaneXs/p/12219097.html
歡迎關(guān)注微信公眾號:互聯(lián)網(wǎng)全棧架構(gòu),收取更多有價值的信息。
