<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          圖文并茂:AQS 是怎么運(yùn)行的?

          共 12779字,需瀏覽 26分鐘

           ·

          2020-11-27 00:01

          前言

          如果你想深入研究Java并發(fā)的話,那么AQS一定是繞不開的一塊知識點(diǎn),Java并發(fā)包很多的同步工具類底層都是基于AQS來實(shí)現(xiàn)的,比如我們工作中經(jīng)常用的Lock工具ReentrantLock、柵欄CountDownLatch、信號量Semaphore等,而且關(guān)于AQS的知識點(diǎn)也是面試中經(jīng)常考察的內(nèi)容,所以,無論是為了更好的使用還是為了應(yīng)付面試,深入學(xué)習(xí)AQS都很有必要。

          CAS

          學(xué)習(xí)AQS之前,我們有必要了解一個知識點(diǎn),就是AQS底層中大量使用的CAS,關(guān)于CAS,大家應(yīng)該都不陌生,這里不多復(fù)述。

          此時,好幾塊搬磚朝我飛了過來。。。。。

          好吧,開個玩笑,還是大概講解一下吧,了解的同學(xué)可以跳過這一段。

          CAS是樂觀鎖的一種思想,它假設(shè)線程對資源的訪問是沒有沖突的,同時所有的線程執(zhí)行都不需要等待,可以持續(xù)執(zhí)行。如果有沖突的話,就用比較+交換的方式來檢測沖突,有沖突就不斷重試。

          CAS的全稱是Compare-and-Swap,也就是比較并交換,它包含了三個參數(shù):V,A,B,V表示要讀寫的內(nèi)存位置,A表示舊的預(yù)期值,B表示新值,當(dāng)執(zhí)行CAS時,只有當(dāng)V的值等于預(yù)期值A(chǔ)時,才會把V的值改為B,這樣的方式可以讓多個線程同時去修改,但也會因?yàn)榫€程操作失敗而不斷重試,對CPU有一定程序上的開銷。

          AQS簡介

          本文主角正式登場。

          AQS,全名AbstractQueuedSynchronizer,是一個抽象類的隊(duì)列式同步器,它的內(nèi)部通過維護(hù)一個狀態(tài)volatile int state(共享資源),一個FIFO線程等待隊(duì)列來實(shí)現(xiàn)同步功能。

          state用關(guān)鍵字volatile修飾,代表著該共享資源的狀態(tài)一更改就能被所有線程可見,而AQS的加鎖方式本質(zhì)上就是多個線程在競爭state,當(dāng)state為0時代表線程可以競爭鎖,不為0時代表當(dāng)前對象鎖已經(jīng)被占有,其他線程來加鎖時則會失敗,加鎖失敗的線程會被放入一個FIFO的等待隊(duì)列中,這些線程會被UNSAFE.park()操作掛起,等待其他獲取鎖的線程釋放鎖才能夠被喚醒。

          而這個等待隊(duì)列其實(shí)就相當(dāng)于一個CLH隊(duì)列,用一張?jiān)韴D來表示大致如下:


          基礎(chǔ)定義

          AQS支持兩種資源分享的方式:Exclusive(獨(dú)占,只有一個線程能執(zhí)行,如ReentrantLock)和Share(共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)。

          自定義的同步器繼承AQS后,只需要實(shí)現(xiàn)共享資源state的獲取和釋放方式即可,其他如線程隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等)等操作,AQS在頂層已經(jīng)實(shí)現(xiàn)了,

          AQS代碼內(nèi)部提供了一系列操作鎖和線程隊(duì)列的方法,主要操作鎖的方法包含以下幾個:

          • compareAndSetState():利用CAS的操作來設(shè)置state的值
          • tryAcquire(int):獨(dú)占方式獲取鎖。成功則返回true,失敗則返回false。
          • tryRelease(int):獨(dú)占方式釋放鎖。成功則返回true,失敗則返回false。
          • tryAcquireShared(int):共享方式釋放鎖。負(fù)數(shù)表示失敗;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
          • tryReleaseShared(int):共享方式釋放鎖。如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。

          像ReentrantLock就是實(shí)現(xiàn)了自定義的tryAcquire-tryRelease,從而操作state的值來實(shí)現(xiàn)同步效果。

          除此之外,AQS內(nèi)部還定義了一個靜態(tài)類Node,表示CLH隊(duì)列的每一個結(jié)點(diǎn),該結(jié)點(diǎn)的作用是對每一個等待獲取資源做了封裝,包含了需要同步的線程本身、線程等待狀態(tài).....

          我們可以看下該類的一些重點(diǎn)變量:

          static?final?class?Node?{
          ????????/**?表示共享模式下等待的Node?*/
          ????????static?final?Node?SHARED?=?new?Node();
          ????????/**?表示獨(dú)占模式下等待的mode?*/
          ????????static?final?Node?EXCLUSIVE?=?null;

          ????????/**?下面幾個為waitStatus的具體值?*/
          ????????static?final?int?CANCELLED?=??1;
          ????????static?final?int?SIGNAL????=?-1;
          ????????static?final?int?CONDITION?=?-2;
          ????????static?final?int?PROPAGATE?=?-3;
          ????
          ????????volatile?int?waitStatus;
          ????????
          ?????????/**?表示前面的結(jié)點(diǎn)?*/
          ????????volatile?Node?prev;
          ?????????/**?表示后面的結(jié)點(diǎn)?*/
          ????????volatile?Node?next;
          ?????????/**當(dāng)前結(jié)點(diǎn)裝載的線程,初始化時被創(chuàng)建,使用后會置空*/
          ????????volatile?Thread?thread;
          ?????????/**鏈接到下一個節(jié)點(diǎn)的等待條件,用到Condition的時候會使用到*/
          ????????Node?nextWaiter;
          ????
          ????}

          代碼里面定義了一個表示當(dāng)前Node結(jié)點(diǎn)等待狀態(tài)的字段waitStatus,該字段的取值包含了CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,這五個值代表了不同的特定場景:

          • CANCELLED:表示當(dāng)前結(jié)點(diǎn)已取消調(diào)度。當(dāng)timeout或被中斷(響應(yīng)中斷的情況下),會觸發(fā)變更為此狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會再變化。
          • SIGNAL:表示后繼結(jié)點(diǎn)在等待當(dāng)前結(jié)點(diǎn)喚醒。后繼結(jié)點(diǎn)入隊(duì)時,會將前繼結(jié)點(diǎn)的狀態(tài)更新為SIGNAL(記住這個-1的值,因?yàn)楹竺嫖覀冎v的時候經(jīng)常會提到)
          • CONDITION:表示結(jié)點(diǎn)等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。(注:Condition是AQS的一個組件,后面會細(xì)說)
          • PROPAGATE:共享模式下,前繼結(jié)點(diǎn)不僅會喚醒其后繼結(jié)點(diǎn),同時也可能會喚醒后繼的后繼結(jié)點(diǎn)。
          • 0:新結(jié)點(diǎn)入隊(duì)時的默認(rèn)狀態(tài)。

          也就是說,當(dāng)waitStatus為負(fù)值表示結(jié)點(diǎn)處于有效等待狀態(tài),為正值的時候表示結(jié)點(diǎn)已被取消。

          在AQS內(nèi)部中還維護(hù)了兩個Node對象headtail,一開始默認(rèn)都為null

          private?transient?volatile?Node?head;
          private?transient?volatile?Node?tail;???

          講完了AQS的一些基礎(chǔ)定義,我們就可以開始學(xué)習(xí)同步的具體運(yùn)行機(jī)制了,為了更好的演示,我們用ReentrantLock作為使用入口,一步步跟進(jìn)源碼探究AQS底層是如何運(yùn)作的,這里說明一下,因?yàn)镽eentrantLock底層調(diào)用的AQS是獨(dú)占模式,所以下文講解的AQS源碼也是針對獨(dú)占模式的操作

          好了,熱身正式結(jié)束,來吧。

          獨(dú)占模式

          加鎖過程

          我們都知道,ReentrantLock的加鎖和解鎖方法分別為lock()和unLock(),我們先來看獲取鎖的方法,

          final?void?lock()?{
          ?if?(compareAndSetState(0,?1))
          ??setExclusiveOwnerThread(Thread.currentThread());
          ?else
          ??acquire(1);
          }

          邏輯很簡單,線程進(jìn)來后直接利用CAS嘗試搶占鎖,如果搶占成功state值回被改為1,且設(shè)置對象獨(dú)占鎖線程為當(dāng)前線程,否則就調(diào)用acquire(1)再次嘗試獲取鎖。

          我們假定有兩個線程A和B同時競爭鎖,A進(jìn)來先搶占到鎖,此時的AQS模型圖就類似這樣:

          繼續(xù)走下面的方法,

          public?final?void?acquire(int?arg)?{
          ?if?(!tryAcquire(arg)?&&
          ??acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
          ??selfInterrupt();
          }

          acquire包含了幾個函數(shù)的調(diào)用,

          tryAcquire:嘗試直接獲取鎖,如果成功就直接返回;

          addWaiter:將該線程加入等待隊(duì)列FIFO的尾部,并標(biāo)記為獨(dú)占模式;

          acquireQueued:線程阻塞在等待隊(duì)列中獲取鎖,一直獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。

          selfInterrupt:自我中斷,就是既拿不到鎖,又在等待時被中斷了,線程就會進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。

          我們一個個來看源碼,并結(jié)合上面的兩個線程來做場景分析。

          tryAcquire

          不用多說,就是為了再次嘗試獲取鎖

          protected?final?boolean?tryAcquire(int?acquires)?{
          ?return?nonfairTryAcquire(acquires);
          }

          final?boolean?nonfairTryAcquire(int?acquires)?{
          ?final?Thread?current?=?Thread.currentThread();
          ?int?c?=?getState();
          ?if?(c?==?0)?{
          ??if?(compareAndSetState(0,?acquires))?{
          ???setExclusiveOwnerThread(current);
          ???return?true;
          ??}
          ?}
          ?else?if?(current?==?getExclusiveOwnerThread())?{
          ??int?nextc?=?c?+?acquires;
          ??if?(nextc????throw?new?Error("Maximum?lock?count?exceeded");
          ??setState(nextc);
          ??return?true;
          ?}
          ?return?false;
          }

          當(dāng)線程B進(jìn)來后,nonfairTryAcquire方法首先會獲取state的值,如果為0,則正常獲取該鎖,不為0的話判斷是否是當(dāng)前線程占用了,是的話就累加state的值,這里的累加也是為了配合釋放鎖時候的次數(shù),從而實(shí)現(xiàn)可重入鎖的效果。

          當(dāng)然,因?yàn)橹版i已經(jīng)被線程A占領(lǐng)了,所以這時候tryAcquire會返回false,繼續(xù)下面的流程。

          addWaiter

          private?Node?addWaiter(Node?mode)?{
          ?Node?node?=?new?Node(Thread.currentThread(),?mode);
          ?//?Try?the?fast?path?of?enq;?backup?to?full?enq?on?failure
          ?Node?pred?=?tail;
          ?if?(pred?!=?null)?{
          ??node.prev?=?pred;
          ??if?(compareAndSetTail(pred,?node))?{
          ???pred.next?=?node;
          ???return?node;
          ??}
          ?}
          ?enq(node);
          ?return?node;
          }

          這段代碼首先會創(chuàng)建一個和當(dāng)前線程綁定的Node節(jié)點(diǎn),Node為雙向鏈表。此時等待隊(duì)列中的tail指針為空,直接調(diào)用enq(node)方法將當(dāng)前線程加入等待隊(duì)列尾部,然后返回當(dāng)前結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn),

          private?Node?enq(final?Node?node)?{
          ?//?CAS"自旋",直到成功加入隊(duì)尾
          ????for?(;;)?{
          ????????Node?t?=?tail;
          ????????if?(t?==?null)?{
          ?????????//?隊(duì)列為空,初始化一個Node結(jié)點(diǎn)作為Head結(jié)點(diǎn),并將tail結(jié)點(diǎn)也指向它
          ????????????if?(compareAndSetHead(new?Node()))
          ????????????????tail?=?head;
          ????????}?else?{
          ?????????//?把當(dāng)前結(jié)點(diǎn)插入隊(duì)列尾部
          ????????????node.prev?=?t;
          ????????????if?(compareAndSetTail(t,?node))?{
          ????????????????t.next?=?node;
          ????????????????return?t;
          ????????????}
          ????????}
          ????}
          }

          第一遍循環(huán)時,tail指針為空,初始化一個Node結(jié)點(diǎn),并把head和tail結(jié)點(diǎn)都指向它,然后第二次循環(huán)進(jìn)來之后,tail結(jié)點(diǎn)不為空了,就將當(dāng)前的結(jié)點(diǎn)加入到tail結(jié)點(diǎn)后面,也就是這樣:

          todo 如果此時有另一個線程C進(jìn)來的話,發(fā)現(xiàn)鎖已經(jīng)被A拿走了,然后隊(duì)列里已經(jīng)有了線程B,那么線程C就只能乖乖排到線程B的后面去,

          acquireQueued

          接著解讀方法,通過tryAcquire()和addWaiter(),我們的線程還是沒有拿到資源,并且還被排到了隊(duì)列的尾部,如果讓你來設(shè)計(jì)的話,這個時候你會怎么處理線程呢?其實(shí)答案也很簡單,能做的事無非兩個:

          1、循環(huán)讓線程再搶資源。但仔細(xì)一推敲就知道不合理,因?yàn)槿绻卸鄠€線程都參與的話,你搶我也搶只會降低系統(tǒng)性能

          2、進(jìn)入等待狀態(tài)休息,直到其他線程徹底釋放資源后喚醒自己,自己再拿到資源

          毫無疑問,選擇2更加靠譜,acquireQueued方法做的也是這樣的處理:

          final?boolean?acquireQueued(final?Node?node,?int?arg)?{
          ?boolean?failed?=?true;
          ?try?{
          ??//?標(biāo)記是否會被中斷
          ??boolean?interrupted?=?false;
          ??//?CAS自旋
          ??for?(;;)?{
          ???//?獲取當(dāng)前結(jié)點(diǎn)的前結(jié)點(diǎn)
          ???final?Node?p?=?node.predecessor();
          ???if?(p?==?head?&&?tryAcquire(arg))?{
          ????setHead(node);
          ????p.next?=?null;?//?help?GC
          ????failed?=?false;
          ????return?interrupted;
          ???}
          ???if?(shouldParkAfterFailedAcquire(p,?node)?&&
          ????parkAndCheckInterrupt())
          ????interrupted?=?true;
          ??}
          ?}?finally?{
          ??if?(failed)
          ???//?獲取鎖失敗,則將此線程對應(yīng)的node的waitStatus改為CANCEL
          ???cancelAcquire(node);
          ?}
          }
          private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
          ?int?ws?=?pred.waitStatus;
          ?if?(ws?==?Node.SIGNAL)
          ??//?前驅(qū)結(jié)點(diǎn)等待狀態(tài)為"SIGNAL",那么自己就可以安心等待被喚醒了
          ??return?true;
          ?if?(ws?>?0)?{
          ??/*
          ???*?前驅(qū)結(jié)點(diǎn)被取消了,通過循環(huán)一直往前找,直到找到等待狀態(tài)有效的結(jié)點(diǎn)(等待狀態(tài)值小于等于0)?,
          ???*?然后排在他們的后邊,至于那些被當(dāng)前Node強(qiáng)制"靠后"的結(jié)點(diǎn),因?yàn)橐呀?jīng)被取消了,也沒有引用鏈,
          ???*?就等著被GC了
          ???*/
          ??do?{
          ???node.prev?=?pred?=?pred.prev;
          ??}?while?(pred.waitStatus?>?0);
          ??pred.next?=?node;
          ?}?else?{
          ??//?如果前驅(qū)正常,那就把前驅(qū)的狀態(tài)設(shè)置成SIGNAL
          ??compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);
          ?}
          ?return?false;
          }
          private?final?boolean?parkAndCheckInterrupt()?{
          ?LockSupport.park(this);
          ?return?Thread.interrupted();
          }

          acquireQueued方法的流程是這樣的:

          1、CAS自旋,先判斷當(dāng)前傳入的Node的前結(jié)點(diǎn)是否為head結(jié)點(diǎn),是的話就嘗試獲取鎖,獲取鎖成功的話就把當(dāng)前結(jié)點(diǎn)置為head,之前的head置為null(方便GC),然后返回

          2、如果前驅(qū)結(jié)點(diǎn)不是head或者加鎖失敗的話,就調(diào)用shouldParkAfterFailedAcquire,將前驅(qū)節(jié)點(diǎn)的waitStatus變?yōu)榱?strong style="line-height: 1.75em;">SIGNAL=-1,最后執(zhí)行parkAndChecknIterrupt方法,調(diào)用LockSupport.park()掛起當(dāng)前線程,parkAndCheckInterrupt在掛起線程后會判斷線程是否被中斷,如果被中斷的話,就會重新跑acquireQueued方法的CAS自旋操作,直到獲取資源。

          ps:LockSupport.park方法會讓當(dāng)前線程進(jìn)入waitting狀態(tài),在這種狀態(tài)下,線程被喚醒的情況有兩種,一是被unpark(),二是被interrupt(),所以,如果是第二種情況的話,需要返回被中斷的標(biāo)志,然后在acquire頂層方法的窗口那里自我中斷補(bǔ)上

          此時,因?yàn)榫€程A還未釋放鎖,所以線程B狀態(tài)都是被掛起的,

          到這里,加鎖的流程就分析完了,其實(shí)整體來說也并不復(fù)雜,而且當(dāng)你理解了獨(dú)占模式加鎖的過程,后面釋放鎖和共享模式的運(yùn)行機(jī)制也沒什么難懂的了,所以整個加鎖的過程還是有必要多消化下的,也是AQS的重中之重。

          為了方便你們更加清晰理解,我加多一張流程圖吧(這個作者也太暖了吧,哈哈)

          釋放鎖

          說完了加鎖,我們來看看釋放鎖是怎么做的,AQS中釋放鎖的方法是release(),當(dāng)調(diào)用該方法時會釋放指定量的資源 (也就是鎖) ,如果徹底釋放了(即state=0),它會喚醒等待隊(duì)列里的其他線程來獲取資源。

          還是一步步看源碼吧,

          public?final?boolean?release(int?arg)?{
          ?if?(tryRelease(arg))?{
          ??Node?h?=?head;
          ??if?(h?!=?null?&&?h.waitStatus?!=?0)
          ???unparkSuccessor(h);
          ??return?true;
          ?}
          ?return?false;
          }

          tryRelease

          代碼上可以看出,核心的邏輯都在tryRelease方法中,該方法的作用是釋放資源,AQS里該方法沒有具體的實(shí)現(xiàn),需要由自定義的同步器去實(shí)現(xiàn),我們看下ReentrantLock代碼中對應(yīng)方法的源碼:

          protected?final?boolean?tryRelease(int?releases)?{
          ?int?c?=?getState()?-?releases;
          ?if?(Thread.currentThread()?!=?getExclusiveOwnerThread())
          ??throw?new?IllegalMonitorStateException();
          ?boolean?free?=?false;
          ?if?(c?==?0)?{
          ??free?=?true;
          ??setExclusiveOwnerThread(null);
          ?}
          ?setState(c);
          ?return?free;
          }

          tryRelease方法會減去state對應(yīng)的值,如果state為0,也就是已經(jīng)徹底釋放資源,就返回true,并且把獨(dú)占的線程置為null,否則返回false。

          此時AQS中的數(shù)據(jù)就會變成這樣:

          完全釋放資源后,當(dāng)前線程要做的就是喚醒CLH隊(duì)列中第一個在等待資源的線程,也就是head結(jié)點(diǎn)后面的線程,此時調(diào)用的方法是unparkSuccessor()

          private?void?unparkSuccessor(Node?node)?{
          ????int?ws?=?node.waitStatus;
          ????if?(ws??????//將head結(jié)點(diǎn)的狀態(tài)置為0
          ????????compareAndSetWaitStatus(node,?ws,?0);
          ?//找到下一個需要喚醒的結(jié)點(diǎn)s
          ????Node?s?=?node.next;
          ????//如果為空或已取消
          ????if?(s?==?null?||?s.waitStatus?>?0)?{
          ????????s?=?null;
          ????????//?從后向前,直到找到等待狀態(tài)小于0的結(jié)點(diǎn),前面說了,結(jié)點(diǎn)waitStatus小于0時才有效
          ????????for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)?
          ????????????if?(t.waitStatus?<=?0)
          ????????????????s?=?t;
          ????}
          ????//?找到有效的結(jié)點(diǎn),直接喚醒
          ????if?(s?!=?null)
          ????????LockSupport.unpark(s.thread);//喚醒
          }

          方法的邏輯很簡單,就是先將head的結(jié)點(diǎn)狀態(tài)置為0,避免下面找結(jié)點(diǎn)的時候再找到head,然后找到隊(duì)列中最前面的有效結(jié)點(diǎn),然后喚醒,我們假設(shè)這個時候線程A已經(jīng)釋放鎖,那么此時隊(duì)列中排最前邊競爭鎖的線程B就會被喚醒。然后被喚醒的線程B就會嘗試用CAS獲取鎖,回到acquireQueued方法的邏輯,

          for?(;;)?{
          ?//?獲取當(dāng)前結(jié)點(diǎn)的前結(jié)點(diǎn)
          ?final?Node?p?=?node.predecessor();
          ?if?(p?==?head?&&?tryAcquire(arg))?{
          ??setHead(node);
          ??p.next?=?null;?//?help?GC
          ??failed?=?false;
          ??return?interrupted;
          ?}
          ?if?(shouldParkAfterFailedAcquire(p,?node)?&&
          ??parkAndCheckInterrupt())
          ??interrupted?=?true;
          }

          當(dāng)線程B獲取鎖之后,會把當(dāng)前結(jié)點(diǎn)賦值給head,然后原先的前驅(qū)結(jié)點(diǎn) (也就是原來的head結(jié)點(diǎn)) 去掉引用鏈,方便回收,這樣一來,線程B獲取鎖的整個過程就完成了,此時AQS的數(shù)據(jù)就會變成這樣:

          到這里,我們已經(jīng)分析完了AQS獨(dú)占模式下加鎖和釋放鎖的過程,也就是tryAccquire->tryRelease這一鏈條的邏輯,除此之外,AQS中還支持共享模式的同步,這種模式下關(guān)于鎖的操作核心其實(shí)就是tryAcquireShared->tryReleaseShared這兩個方法,我們可以簡單看下

          共享模式

          獲取鎖

          AQS中,共享模式獲取鎖的頂層入口方法是acquireShared,該方法會獲取指定數(shù)量的資源,成功的話就直接返回,失敗的話就進(jìn)入等待隊(duì)列,直到獲取資源,

          public?final?void?acquireShared(int?arg)?{
          ?if?(tryAcquireShared(arg)???doAcquireShared(arg);
          }

          該方法里包含了兩個方法的調(diào)用,

          tryAcquireShared:嘗試獲取一定資源的鎖,返回的值代表獲取鎖的狀態(tài)。

          doAcquireShared:進(jìn)入等待隊(duì)列,并循環(huán)嘗試獲取鎖,直到成功。

          tryAcquireShared

          tryAcquireShared在AQS里沒有實(shí)現(xiàn),同樣由自定義的同步器去完成具體的邏輯,像一些較為常見的并發(fā)工具Semaphore、CountDownLatch里就有對該方法的自定義實(shí)現(xiàn),雖然實(shí)現(xiàn)的邏輯不同,但方法的作用是一樣的,就是獲取一定資源的資源,然后根據(jù)返回值判斷是否還有剩余資源,從而決定下一步的操作。

          返回值有三種定義:

          • 負(fù)值代表獲取失敗;
          • 0代表獲取成功,但沒有剩余的資源,也就是state已經(jīng)為0;
          • 正值代表獲取成功,而且state還有剩余,其他線程可以繼續(xù)領(lǐng)取

          當(dāng)返回值小于0時,證明此次獲取一定數(shù)量的鎖失敗了,然后就會走doAcquireShared方法

          doAcquireShared

          此方法的作用是將當(dāng)前線程加入等待隊(duì)列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應(yīng)量的資源后才返回,這是它的源碼:

          private?void?doAcquireShared(int?arg)?{
          ?//?加入隊(duì)列尾部
          ?final?Node?node?=?addWaiter(Node.SHARED);
          ?boolean?failed?=?true;
          ?try?{
          ??boolean?interrupted?=?false;
          ??//?CAS自旋
          ??for?(;;)?{
          ???final?Node?p?=?node.predecessor();
          ???//?判斷前驅(qū)結(jié)點(diǎn)是否是head
          ???if?(p?==?head)?{
          ????//?嘗試獲取一定數(shù)量的鎖
          ????int?r?=?tryAcquireShared(arg);
          ????if?(r?>=?0)?{
          ?????//?獲取鎖成功,而且還有剩余資源,就設(shè)置當(dāng)前結(jié)點(diǎn)為head,并繼續(xù)喚醒下一個線程
          ?????setHeadAndPropagate(node,?r);
          ?????//?讓前驅(qū)結(jié)點(diǎn)去掉引用鏈,方便被GC
          ?????p.next?=?null;?//?help?GC
          ?????if?(interrupted)
          ??????selfInterrupt();
          ?????failed?=?false;
          ?????return;
          ????}
          ???}
          ???//?跟獨(dú)占模式一樣,改前驅(qū)結(jié)點(diǎn)waitStatus為-1,并且當(dāng)前線程掛起,等待被喚醒
          ???if?(shouldParkAfterFailedAcquire(p,?node)?&&
          ????parkAndCheckInterrupt())
          ????interrupted?=?true;
          ??}
          ?}?finally?{
          ??if?(failed)
          ???cancelAcquire(node);
          ?}
          }

          private?void?setHeadAndPropagate(Node?node,?int?propagate)?{
          ????Node?h?=?head;
          ????//?head指向自己
          ????setHead(node);
          ?????//?如果還有剩余量,繼續(xù)喚醒下一個鄰居線程
          ????if?(propagate?>?0?||?h?==?null?||?h.waitStatus?????????Node?s?=?node.next;
          ????????if?(s?==?null?||?s.isShared())
          ????????????doReleaseShared();
          ????}
          }

          看到這里,你會不會一點(diǎn)熟悉的感覺,這個方法的邏輯怎么跟上面那個acquireQueued() 那么類似啊?對的,其實(shí)兩個流程并沒有太大的差別。只是doAcquireShared()比起獨(dú)占模式下的獲取鎖上多了一步喚醒后繼線程的操作,當(dāng)獲取完一定的資源后,發(fā)現(xiàn)還有剩余的資源,就繼續(xù)喚醒下一個鄰居線程,這才符合"共享"的思想嘛。

          這里我們可以提出一個疑問,共享模式下,當(dāng)前線程釋放了一定數(shù)量的資源,但這部分資源滿足不了下一個等待結(jié)點(diǎn)的需要的話,那么會怎么樣?

          按照正常的思維,共享模式是可以多個線程同時執(zhí)行的才對,所以,多個線程的情況下,如果老大釋放完資源,但這部分資源滿足不了老二,但能滿足老三,那么老三就可以拿到資源。可事實(shí)是,從源碼設(shè)計(jì)中可以看出,如果真的發(fā)生了這種情況,老三是拿不到資源的,因?yàn)榈却?duì)列是按順序排列的,老二的資源需求量大,會把后面量小的老三以及老四、老五等都給卡住。從這一個角度來看,雖然AQS嚴(yán)格保證了順序,但也降低了并發(fā)能力

          接著往下說吧,喚醒下一個鄰居線程的邏輯在doReleaseShared()中,我們放到下面的釋放鎖來解析。

          釋放鎖

          共享模式釋放鎖的頂層方法是releaseShared,它會釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊(duì)列里的其他線程來獲取資源。下面是releaseShared()的源碼:

          public?final?boolean?releaseShared(int?arg)?{
          ?if?(tryReleaseShared(arg))?{
          ??doReleaseShared();
          ??return?true;
          ?}
          ?return?false;
          }

          該方法同樣包含兩部分的邏輯:

          tryReleaseShared:釋放資源。

          doAcquireShared:喚醒后繼結(jié)點(diǎn)。

          tryAcquireShared方法一樣,tryReleaseShared在AQS中沒有具體的實(shí)現(xiàn),由子同步器自己去定義,但功能都一樣,就是釋放一定數(shù)量的資源。

          釋放完資源后,線程不會馬上就收工,而是喚醒等待隊(duì)列里最前排的等待結(jié)點(diǎn)。

          doAcquireShared

          喚醒后繼結(jié)點(diǎn)的工作在doReleaseShared()方法中完成,我們可以看下它的源碼:

          private?void?doReleaseShared()?{
          ?for?(;;)?{
          ??//?獲取等待隊(duì)列中的head結(jié)點(diǎn)
          ??Node?h?=?head;
          ??if?(h?!=?null?&&?h?!=?tail)?{
          ???int?ws?=?h.waitStatus;
          ???//?head結(jié)點(diǎn)waitStatus?=?-1,喚醒下一個結(jié)點(diǎn)對應(yīng)的線程
          ???if?(ws?==?Node.SIGNAL)?{
          ????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
          ?????continue;????????????//?loop?to?recheck?cases
          ????//?喚醒后繼結(jié)點(diǎn)
          ????unparkSuccessor(h);
          ???}
          ???else?if?(ws?==?0?&&
          ??????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
          ????continue;????????????????//?loop?on?failed?CAS
          ??}
          ??if?(h?==?head)???????????????????//?loop?if?head?changed
          ???break;
          ?}
          }

          代碼沒什么特別的,就是如果等待隊(duì)列head結(jié)點(diǎn)的waitStatus為-1的話,就直接喚醒后繼結(jié)點(diǎn),喚醒的方法unparkSuccessor()在上面已經(jīng)講過了,這里也沒必要再復(fù)述。

          總的來看,AQS共享模式的運(yùn)作流程和獨(dú)占模式很相似,只要掌握了獨(dú)占模式的流程運(yùn)轉(zhuǎn),共享模式什么的不就那樣嗎,沒難度。這也是我為什么共享模式講解中不畫流程圖的原因,沒必要嘛。

          Condition

          介紹完了AQS的核心功能,我們再擴(kuò)展一個知識點(diǎn),在AQS中,除了提供獨(dú)占/共享模式的加鎖/解鎖功能,它還對外提供了關(guān)于Condition的一些操作方法。

          Condition是個接口,在jdk1.5版本后設(shè)計(jì)的,基本的方法就是await()signal()方法,功能大概就對應(yīng)Objectwait()notify(),Condition必須要配合鎖一起使用,因?yàn)閷蚕頎顟B(tài)變量的訪問發(fā)生在多線程環(huán)境下。一個Condition的實(shí)例必須與一個Lock綁定,因此Condition一般都是作為Lock的內(nèi)部實(shí)現(xiàn) ,AQS中就定義了一個類ConditionObject來實(shí)現(xiàn)了這個接口,

          那么它應(yīng)該怎么用呢?我們可以簡單寫個demo來看下效果

          public?class?ConditionDemo?{

          ????public?static?void?main(String[]?args)?{
          ????????ReentrantLock?lock?=?new?ReentrantLock();
          ????????Condition?condition?=?lock.newCondition();
          ????????Thread?tA?=?new?Thread(()?->?{
          ????????????lock.lock();
          ????????????try?{
          ????????????????System.out.println("線程A加鎖成功");
          ????????????????System.out.println("線程A執(zhí)行await被掛起");
          ????????????????condition.await();
          ????????????????System.out.println("線程A被喚醒成功");
          ????????????}?catch?(InterruptedException?e)?{
          ????????????????e.printStackTrace();
          ????????????}?finally?{
          ????????????????lock.unlock();
          ????????????????System.out.println("線程A釋放鎖成功");
          ????????????}
          ????????});

          ????????Thread?tB?=?new?Thread(()?->?{
          ????????????lock.lock();
          ????????????try?{
          ????????????????System.out.println("線程B加鎖成功");
          ????????????????condition.signal();
          ????????????????System.out.println("線程B喚醒線程A");
          ????????????}?finally?{
          ????????????????lock.unlock();
          ????????????????System.out.println("線程B釋放鎖成功");
          ????????????}
          ????????});
          ????????tA.start();
          ????????tB.start();
          ????}
          }

          執(zhí)行main函數(shù)后結(jié)果輸出為:

          線程A加鎖成功
          線程A執(zhí)行await被掛起
          線程B加鎖成功
          線程B喚醒線程A
          線程B釋放鎖成功
          線程A被喚醒成功
          線程A釋放鎖成功

          代碼執(zhí)行的結(jié)果很容易理解,線程A先獲取鎖,然后調(diào)用await()方法掛起當(dāng)前線程并釋放鎖,線程B這時候拿到鎖,然后調(diào)用signal喚醒線程A。

          毫無疑問,這兩個方法讓線程的狀態(tài)發(fā)生了變化,我們仔細(xì)來研究一下,

          翻看AQS的源碼,我們會發(fā)現(xiàn)Condition中定義了兩個屬性firstWaiterlastWaiter,前面說了,AQS中包含了一個FIFO的CLH等待隊(duì)列,每個Conditon對象就包含這樣一個等待隊(duì)列,而這兩個屬性分別表示的是等待隊(duì)列中的首尾結(jié)點(diǎn),

          /**?First?node?of?condition?queue.?*/
          private?transient?Node?firstWaiter;
          /**?Last?node?of?condition?queue.?*/
          private?transient?Node?lastWaiter;

          注意:Condition當(dāng)中的等待隊(duì)列和AQS主體的同步等待隊(duì)列是分開的,兩個隊(duì)列雖然結(jié)構(gòu)體相同,但是作用域是分開的

          await

          先看await()的源碼:

          public?final?void?await()?throws?InterruptedException?{
          ????if?(Thread.interrupted())
          ????????throw?new?InterruptedException();
          ????//?將當(dāng)前線程加入到等待隊(duì)列中
          ????Node?node?=?addConditionWaiter();
          ????//?完全釋放占有的資源,并返回資源數(shù)
          ????int?savedState?=?fullyRelease(node);
          ????int?interruptMode?=?0;
          ????//?循環(huán)判斷當(dāng)前結(jié)點(diǎn)是不是在Condition的隊(duì)列中,是的話掛起
          ????while?(!isOnSyncQueue(node))?{
          ????????LockSupport.park(this);
          ????????if?((interruptMode?=?checkInterruptWhileWaiting(node))?!=?0)
          ????????????break;
          ????}
          ????if?(acquireQueued(node,?savedState)?&&?interruptMode?!=?THROW_IE)
          ????????interruptMode?=?REINTERRUPT;
          ????if?(node.nextWaiter?!=?null)?//?clean?up?if?cancelled
          ????????unlinkCancelledWaiters();
          ????if?(interruptMode?!=?0)
          ????????reportInterruptAfterWait(interruptMode);
          }

          當(dāng)一個線程調(diào)用Condition.await()方法,將會以當(dāng)前線程構(gòu)造結(jié)點(diǎn),這個結(jié)點(diǎn)的waitStatus賦值為Node.CONDITION,也就是-2,并將結(jié)點(diǎn)從尾部加入等待隊(duì)列,然后尾部結(jié)點(diǎn)就會指向這個新增的結(jié)點(diǎn),

          private?Node?addConditionWaiter()?{
          ????Node?t?=?lastWaiter;
          ????//?If?lastWaiter?is?cancelled,?clean?out.
          ????if?(t?!=?null?&&?t.waitStatus?!=?Node.CONDITION)?{
          ????????unlinkCancelledWaiters();
          ????????t?=?lastWaiter;
          ????}
          ????Node?node?=?new?Node(Thread.currentThread(),?Node.CONDITION);
          ????if?(t?==?null)
          ????????firstWaiter?=?node;
          ????else
          ????????t.nextWaiter?=?node;
          ????lastWaiter?=?node;
          ????return?node;
          }

          我們依然用上面的demo來演示,此時,線程A獲取鎖并調(diào)用Condition.await()方法后,AQS內(nèi)部的數(shù)據(jù)結(jié)構(gòu)會變成這樣:

          在Condition隊(duì)列中插入對應(yīng)的結(jié)點(diǎn)后,線程A會釋放所持有的資源,走到while循環(huán)那層邏輯,

          while?(!isOnSyncQueue(node))?{
          ?LockSupport.park(this);
          ?if?((interruptMode?=?checkInterruptWhileWaiting(node))?!=?0)
          ??break;
          }

          isOnSyncQueue方法的會判斷當(dāng)前的線程節(jié)點(diǎn)是不是在同步隊(duì)列中,這個時候此結(jié)點(diǎn)還在Condition隊(duì)列中,所以該方法返回false,這樣的話循環(huán)會一直持續(xù)下去,線程被掛起,等待被喚醒,此時,線程A的流程暫時停止了。

          當(dāng)線程A調(diào)用await()方法掛起的時候,線程B獲取到了線程A釋放的資源,然后執(zhí)行signal()方法:

          signal

          public?final?void?signal()?{
          ????if?(!isHeldExclusively())
          ????????throw?new?IllegalMonitorStateException();
          ????Node?first?=?firstWaiter;
          ????if?(first?!=?null)
          ????????doSignal(first);
          }

          先判斷當(dāng)前線程是否為獲取鎖的線程,如果不是則直接拋出異常。接著調(diào)用doSignal()方法來喚醒線程。

          private?void?doSignal(Node?first)?{
          ?//?循環(huán),從隊(duì)列一直往后找不為空的首結(jié)點(diǎn)
          ????do?{
          ????????if?(?(firstWaiter?=?first.nextWaiter)?==?null)
          ????????????lastWaiter?=?null;
          ????????first.nextWaiter?=?null;
          ????}?while?(!transferForSignal(first)?&&
          ?????????????(first?=?firstWaiter)?!=?null);
          }

          final?boolean?transferForSignal(Node?node)?{
          ?//?CAS循環(huán),將結(jié)點(diǎn)的waitStatus改為0
          ????if?(!compareAndSetWaitStatus(node,?Node.CONDITION,?0))
          ????????return?false;
          ?//?上面已經(jīng)分析過,此方法會把當(dāng)前結(jié)點(diǎn)加入到等待隊(duì)列中,并返回前驅(qū)結(jié)點(diǎn)
          ????Node?p?=?enq(node);
          ????int?ws?=?p.waitStatus;
          ????if?(ws?>?0?||?!compareAndSetWaitStatus(p,?ws,?Node.SIGNAL))
          ????????LockSupport.unpark(node.thread);
          ????return?true;
          }

          doSignal的代碼中可以看出,這時候程序?qū)ふ业氖荂ondition等待隊(duì)列中首結(jié)點(diǎn)firstWaiter的結(jié)點(diǎn),此時該結(jié)點(diǎn)指向的是線程A的結(jié)點(diǎn),所以之后的流程作用的都是線程A的結(jié)點(diǎn)。

          這里分析下transferForSignal方法,先通過CAS自旋將結(jié)點(diǎn)waitStatus改為0,然后就把結(jié)點(diǎn)放入到同步隊(duì)列 (此隊(duì)列不是Condition的等待隊(duì)列) 中,然后再用CAS將同步隊(duì)列中該結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)waitStatus改為Node.SIGNAL,也就是-1,此時AQS的數(shù)據(jù)結(jié)構(gòu)大概如下(額.....少畫了個箭頭,大家就當(dāng)head結(jié)點(diǎn)是線程A結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)就好):

          回到await()方法,當(dāng)線程A的結(jié)點(diǎn)被加入同步隊(duì)列中時,isOnSyncQueue()會返回true,跳出循環(huán),

          while?(!isOnSyncQueue(node))?{
          ????????LockSupport.park(this);
          ????????if?((interruptMode?=?checkInterruptWhileWaiting(node))?!=?0)
          ????????????break;
          ????}
          ????if?(acquireQueued(node,?savedState)?&&?interruptMode?!=?THROW_IE)
          ????????interruptMode?=?REINTERRUPT;
          ????if?(node.nextWaiter?!=?null)?//?clean?up?if?cancelled
          ????????unlinkCancelledWaiters();
          ????if?(interruptMode?!=?0)
          ????????reportInterruptAfterWait(interruptMode);

          接著執(zhí)行acquireQueued()方法,這里就不用多說了吧,嘗試重新獲取鎖,如果獲取鎖失敗繼續(xù)會被掛起,直到另外線程釋放鎖才被喚醒。

          所以,當(dāng)線程B釋放完鎖后,線程A被喚醒,繼續(xù)嘗試獲取鎖,至此流程結(jié)束。

          對于這整個通信過程,我們可以畫一張流程圖展示下:

          總結(jié)

          說完了Condition的使用和底層運(yùn)行機(jī)制,我們再來總結(jié)下它跟普通 wait/notify 的比較,一般這也是問的比較多的,Condition大概有以下兩點(diǎn)優(yōu)勢:

          • Condition 需要結(jié)合 Lock 進(jìn)行控制,使用的時候要注意一定要對應(yīng)的unlock(),可以對多個不同條件進(jìn)行控制,只要new 多個 Condition對象就可以為多個線程控制通信,wait/notify 只能和 synchronized 關(guān)鍵字一起使用,并且只能喚醒一個或者全部的等待隊(duì)列;
          • Condition 有類似于 await 的機(jī)制,因此不會產(chǎn)生加鎖方式而產(chǎn)生的死鎖出現(xiàn),同時底層實(shí)現(xiàn)的是 park/unpark 的機(jī)制,因此也不會產(chǎn)生先喚醒再掛起的死鎖,一句話就是不會產(chǎn)生死鎖,但是 wait/notify 會產(chǎn)生先喚醒再掛起的死鎖。

          最后

          對AQS的源碼分析到這里就全部結(jié)束了,雖然還有很多知識點(diǎn)沒講解,比如公平鎖/非公平鎖下AQS是怎么作用的,篇幅所限,部分知識點(diǎn)沒有擴(kuò)展還請見諒,盡管如此,如果您能看完文章的話,相信對AQS也算是有足夠的了解了。

          回顧本篇文章,我們不難發(fā)現(xiàn),無論是獨(dú)占還是共享模式,或者結(jié)合是Condition工具使用,AQS本質(zhì)上的同步功能都是通過對鎖和隊(duì)列中結(jié)點(diǎn)的操作來實(shí)現(xiàn)的,從設(shè)計(jì)上講,AQS的組成結(jié)構(gòu)并不算復(fù)雜,底層的運(yùn)轉(zhuǎn)機(jī)制也不會很繞,所以,大家如果看源碼的時候覺得有些困難的話也不用灰心,多看幾遍,順便畫個圖之類的,理清下流程還是沒什么問題的。

          當(dāng)然,自己看得懂是一回事,寫出來讓別人看懂又是另一回事了,就像這篇文章,我花了好長的時間來準(zhǔn)備,又是畫圖又是理流程的,期間還參考了不少網(wǎng)上大神的博文,肝了幾天才算是成文了。雖然我知道本文不算什么高質(zhì)文,但我也算是費(fèi)盡心力了,寫技術(shù)文真是挺累的,大家看的覺得不錯的話還請幫忙轉(zhuǎn)發(fā)下或點(diǎn)個贊吧!這也是對我最好的鼓勵了






          關(guān)注Java技術(shù)棧看更多干貨



          戳原文,獲取精選面試題!
          瀏覽 63
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  影音先锋成人AV | se综合| 黄网在线免费观看 | 丁香六月婷婷五月 | 亚洲精品无码在线观看 |