<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一文搞懂AQS及其組件的核心原理

          共 5712字,需瀏覽 12分鐘

           ·

          2020-10-10 23:34

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時間送達(dá)

          ? 作者?|??夜勿語

          來源 |? urlify.cn/JNVRby

          66套java從入門到精通實(shí)戰(zhàn)課程分享

          前言

          JDK1.5以前只有synchronized同步鎖,并且效率非常低,因此大神Doug Lea自己寫了一套并發(fā)框架,這套框架的核心就在于AbstractQueuedSynchronizer類(即AQS),性能非常高,所以被引入JDK包中,即JUC。那么AQS是怎么實(shí)現(xiàn)的呢?本篇就是對AQS及其相關(guān)組件進(jìn)行分析,了解其原理,并領(lǐng)略大神的優(yōu)美而又精簡的代碼。

          AbstractQueuedSynchronizer

          AQS是JUC下最核心的類,沒有之一,所以我們先來分析一下這個類的數(shù)據(jù)結(jié)構(gòu)。

          AQS內(nèi)部是使用了雙向鏈表將等待線程鏈接起來,當(dāng)發(fā)生并發(fā)競爭的時候,就會初始化該隊(duì)列并讓線程進(jìn)入睡眠等待喚醒,同時每個節(jié)點(diǎn)會根據(jù)是否為共享鎖標(biāo)記狀態(tài)為共享模式獨(dú)占模式。這個數(shù)據(jù)結(jié)構(gòu)需要好好理解并牢牢記住,下面分析的組件都將基于此實(shí)現(xiàn)。

          Lock

          Lock是一個接口,提供了加/解鎖的通用API,JUC主要提供了兩種鎖,ReentrantLock和ReentrantReadWriteLock,前者是重入鎖,實(shí)現(xiàn)Lock接口,后者是讀寫鎖,本身并沒有實(shí)現(xiàn)Lock接口,而是其內(nèi)部類ReadLock或WriteLock實(shí)現(xiàn)了Lock接口。先來看看Lock都提供了哪些接口:

          //?普通加鎖,不可打斷;未獲取到鎖進(jìn)入AQS阻塞
          void?lock();

          //?可打斷鎖
          void?lockInterruptibly()?throws?InterruptedException;

          //?嘗試加鎖,未獲取到鎖不阻塞,返回標(biāo)識
          boolean?tryLock();

          //?帶超時時間的嘗試加鎖
          boolean?tryLock(long?time,?TimeUnit?unit)?throws?InterruptedException;

          //?解鎖
          void?unlock();

          //?創(chuàng)建一個條件隊(duì)列
          Condition?newCondition();

          看到這里讀者們可以先思考下,自己如何來實(shí)現(xiàn)上面這些接口。

          ReentrantLock

          加鎖

          synchronizedReentrantLock都是可重入的,后者使用更加靈活,也提供了更多的高級特性,但其本質(zhì)的實(shí)現(xiàn)原理是差不多的(據(jù)說synchronized是借鑒了ReentrantLock的實(shí)現(xiàn)原理)。ReentrantLock提供了兩個構(gòu)造方法:

          ?public?ReentrantLock()?{
          ????????sync?=?new?NonfairSync();
          ????}

          ????public?ReentrantLock(boolean?fair)?{
          ????????sync?=?fair???new?FairSync()?:?new?NonfairSync();
          ????}

          有參構(gòu)造是根據(jù)參數(shù)創(chuàng)建公平鎖非公平鎖,而無參構(gòu)造默認(rèn)則是非公平鎖,因?yàn)榉枪芥i性能非常高,并且大部分業(yè)務(wù)并不需要使用公平鎖。至于為什么非公平鎖性能很高,咱們接著往下看。

          非公平鎖/公平鎖

          lock

          非公平鎖和公平鎖在實(shí)現(xiàn)上基本一致,只有個別的地方不同,因此下面會采用對比分析方法進(jìn)行分析。
          從lock方法開始:

          ????public?void?lock()?{
          ????????sync.lock();
          ????}

          實(shí)際上是委托給了內(nèi)部類Sync,該類實(shí)現(xiàn)了AQS(其它組件實(shí)現(xiàn)方法也基本上都是這個套路);由于有公平和非公平兩種模式,因此該類又實(shí)現(xiàn)了兩個子類:FairSyncNonfairSync

          ?//?非公平鎖
          ????final?void?lock()?{
          ????????if?(compareAndSetState(0,?1))
          ????????????setExclusiveOwnerThread(Thread.currentThread());
          ????????else
          ????????????acquire(1);
          ????}

          ?//?公平鎖
          ????final?void?lock()?{
          ???????acquire(1);
          ????}

          這里就是公平鎖和非公平鎖的第一個不同,非公平鎖首先會調(diào)用CAS將state從0改為1,如果能改成功則表示獲取到鎖,直接將exclusiveOwnerThread設(shè)置為當(dāng)前線程,不用再進(jìn)行后續(xù)操作;否則則同公平鎖一樣調(diào)用acquire方法獲取鎖,這個是在AQS中實(shí)現(xiàn)的模板方法:

          ????public?final?void?acquire(int?arg)?{
          ????????if?(!tryAcquire(arg)?&&
          ????????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
          ????????????selfInterrupt();
          ????}
          tryAcquire

          這里兩種鎖唯一不同的實(shí)現(xiàn)就是tryAcquire方法,先來看非公平鎖的實(shí)現(xiàn):

          ????protected?final?boolean?tryAcquire(int?acquires)?{
          ????????return?nonfairTryAcquire(acquires);
          ????}

          ????final?boolean?nonfairTryAcquire(int?acquires)?{
          ????????final?Thread?current?=?Thread.currentThread();
          ????????int?c?=?getState();
          ????????if?(c?==?0)?{
          ????????????if?(compareAndSetState(0,?acquires))?{
          ????????????????setExclusiveOwnerThread(current);
          ????????????????return?true;
          ????????????}
          ????????}
          ????????else?if?(current?==?getExclusiveOwnerThread())?{
          ????????????int?nextc?=?c?+?acquires;
          ????????????if?(nextc?????????????????throw?new?Error("Maximum?lock?count?exceeded");
          ????????????setState(nextc);
          ????????????return?true;
          ????????}
          ????????return?false;
          ????}

          state=0表示還沒有被線程持有鎖,直接通過CAS修改,能修改成功的就獲取到鎖,修改失敗的線程先判斷exclusiveOwnerThread是不是當(dāng)前線程,是則state+1,表示重入次數(shù)+1并返回true,加鎖成功,否則則返回false表示嘗試加鎖失敗并調(diào)用acquireQueued入隊(duì)。

          ????protected?final?boolean?tryAcquire(int?acquires)?{
          ????????final?Thread?current?=?Thread.currentThread();
          ????????int?c?=?getState();
          ????????if?(c?==?0)?{
          ????????????if?(!hasQueuedPredecessors()?&&
          ????????????????compareAndSetState(0,?acquires))?{
          ????????????????setExclusiveOwnerThread(current);
          ????????????????return?true;
          ????????????}
          ????????}
          ????????else?if?(current?==?getExclusiveOwnerThread())?{
          ????????????int?nextc?=?c?+?acquires;
          ????????????if?(nextc?????????????????throw?new?Error("Maximum?lock?count?exceeded");
          ????????????setState(nextc);
          ????????????return?true;
          ????????}
          ????????return?false;
          ????}

          ????public?final?boolean?hasQueuedPredecessors()?{
          ????????Node?t?=?tail;?//?Read?fields?in?reverse?initialization?order
          ????????Node?h?=?head;
          ????????Node?s;
          ????????//?首尾不相等且頭結(jié)點(diǎn)線程不是當(dāng)前線程則表示需要進(jìn)入隊(duì)列
          ????????return?h?!=?t?&&
          ????????????((s?=?h.next)?==?null?||?s.thread?!=?Thread.currentThread());
          ????}

          上面就是公平鎖的嘗試獲取鎖的代碼,可以看到基本和非公平鎖的代碼是一樣的,區(qū)別在于首次加鎖需要判斷是否已經(jīng)有隊(duì)列存在,沒有才去加鎖,有則直接返回false。

          addWaiter

          接著來看addWaiter方法,當(dāng)嘗試加鎖失敗時,首先就會調(diào)用該方法創(chuàng)建一個Node節(jié)點(diǎn)并添加到隊(duì)列中去。

          ????private?Node?addWaiter(Node?mode)?{
          ????????Node?node?=?new?Node(Thread.currentThread(),?mode);
          ????????Node?pred?=?tail;
          ????????//?尾節(jié)點(diǎn)不為null表示已經(jīng)存在隊(duì)列,直接將當(dāng)前線程作為尾節(jié)點(diǎn)
          ????????if?(pred?!=?null)?{
          ????????????node.prev?=?pred;
          ????????????if?(compareAndSetTail(pred,?node))?{
          ????????????????pred.next?=?node;
          ????????????????return?node;
          ????????????}
          ????????}
          ????????//?尾結(jié)點(diǎn)不存在則表示還沒有初始化隊(duì)列,需要初始化隊(duì)列
          ????????enq(node);
          ????????return?node;
          ????}

          ????private?Node?enq(final?Node?node)?{
          ??//?自旋
          ????????for?(;;)?{
          ????????????Node?t?=?tail;
          ????????????if?(t?==?null)?{?//?只會有一個線程設(shè)置頭節(jié)點(diǎn)成功?
          ????????????????if?(compareAndSetHead(new?Node()))
          ????????????????????tail?=?head;
          ????????????}?else?{?//?其它設(shè)置頭節(jié)點(diǎn)失敗的都會自旋設(shè)置尾節(jié)點(diǎn)
          ????????????????node.prev?=?t;
          ????????????????if?(compareAndSetTail(t,?node))?{
          ????????????????????t.next?=?node;
          ????????????????????return?t;
          ????????????????}
          ????????????}
          ????????}
          ????}

          這里首先傳入了一個獨(dú)占模式的空節(jié)點(diǎn),并根據(jù)該節(jié)點(diǎn)和當(dāng)前線程創(chuàng)建了一個Node,然后判斷是否已經(jīng)存在隊(duì)列,若存在則直接入隊(duì),否則調(diào)用enq方法初始化隊(duì)列,提高效率。
          此處還有一個非常細(xì)節(jié)的地方,為什么設(shè)置尾節(jié)點(diǎn)時都要先將之前的尾節(jié)點(diǎn)設(shè)置為node.pre的值呢,而不是在CAS之后再設(shè)置?比如像下面這樣:

          if?(compareAndSetTail(pred,?node))?{
          ?node.prev?=?pred;
          ????pred.next?=?node;
          ????return?node;
          }

          因?yàn)槿绻@樣做的話,在CAS設(shè)置完tail后會存在一瞬間的tail.pre=null的情況,而Doug Lea正是考慮到這種情況,不論何時獲取tail.pre都不會為null。

          acquireQueued

          接著看acquireQueued方法:

          ????final?boolean?acquireQueued(final?Node?node,?int?arg)?{
          ?????//?為true表示存在需要取消加鎖的節(jié)點(diǎn),僅從這段代碼可以看出,
          ?????//?除非發(fā)生異常,否則不會存在需要取消加鎖的節(jié)點(diǎn)。
          ????????boolean?failed?=?true;
          ????????try?{
          ?????????//?打斷標(biāo)記,因?yàn)檎{(diào)用的是lock方法,所以是不可打斷的
          ?????????//?(但實(shí)際上是打斷了的,只不過這里采用了一種**靜默**處理方式,稍后分析)
          ????????????boolean?interrupted?=?false;
          ????????????for?(;;)?{
          ????????????????final?Node?p?=?node.predecessor();
          ????????????????if?(p?==?head?&&?tryAcquire(arg))?{
          ????????????????????setHead(node);
          ????????????????????p.next?=?null;?//?help?GC
          ????????????????????failed?=?false;
          ????????????????????return?interrupted;
          ????????????????}
          ????????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
          ????????????????????parkAndCheckInterrupt())
          ????????????????????interrupted?=?true;
          ????????????}
          ????????}?finally?{
          ????????????if?(failed)
          ????????????????cancelAcquire(node);
          ????????}
          ????}

          ????private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
          ????????int?ws?=?pred.waitStatus;
          ????????if?(ws?==?Node.SIGNAL)
          ????????????return?true;
          ????????????
          ????????if?(ws?>?0)?{
          ????????????do?{
          ????????????????node.prev?=?pred?=?pred.prev;
          ????????????}?while?(pred.waitStatus?>?0);
          ????????????pred.next?=?node;
          ????????}?else?{
          ????????????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);
          ????????}
          ????????return?false;
          ????}

          ????private?final?boolean?parkAndCheckInterrupt()?{
          ????????LockSupport.park(this);
          ????????return?Thread.interrupted();
          ????}

          這里就是隊(duì)列中線程加鎖/睡眠的核心邏輯,首先判斷剛剛調(diào)用addWaiter方法添加到隊(duì)列的節(jié)點(diǎn)是否是頭節(jié)點(diǎn),如果是則再次嘗試加鎖,這個剛剛分析過了,非公平鎖在這里就會再次搶一次鎖,搶鎖成功則設(shè)置為head節(jié)點(diǎn)并返回打斷標(biāo)記;否則則和公平鎖一樣調(diào)用shouldParkAfterFailedAcquire判斷是否應(yīng)該調(diào)用park方法進(jìn)入睡眠。

          park細(xì)節(jié)

          為什么在park前需要這么一個判斷呢?因?yàn)楫?dāng)前節(jié)點(diǎn)的線程進(jìn)入park后只能被前一個節(jié)點(diǎn)喚醒,那前一個節(jié)點(diǎn)怎么知道有沒有后繼節(jié)點(diǎn)需要喚醒呢?因此當(dāng)前節(jié)點(diǎn)在park前需要給前一個節(jié)點(diǎn)設(shè)置一個標(biāo)識,即將waitStatus設(shè)置為Node.SIGNAL(-1),然后自旋一次再走一遍剛剛的流程,若還是沒有獲取到鎖,則調(diào)用parkAndCheckInterrupt進(jìn)入睡眠狀態(tài)。

          打斷

          讀者可能會比較好奇Thread.interrupted這個方法是做什么用的。

          ????public?static?boolean?interrupted()?{
          ????????return?currentThread().isInterrupted(true);
          ????}

          這個是用來判斷當(dāng)前線程是否被打斷過,并清除打斷標(biāo)記(若是被打斷過則會返回true,并將打斷標(biāo)記設(shè)置為false),所以調(diào)用lock方法時,通過interrupt也是會打斷睡眠的線程的,只是Doug Lea做了一個假象,讓用戶無感知;但有些場景又需要知道該線程是否被打斷過,所以acquireQueued最終會返回interrupted打斷標(biāo)記,如果是被打斷過,則返回的true,并在acquire方法中調(diào)用selfInterrupt再次打斷當(dāng)前線程(將打斷標(biāo)記設(shè)置為true)。
          這里我們對比看看lockInterruptibly的實(shí)現(xiàn):

          ????public?void?lockInterruptibly()?throws?InterruptedException?{
          ????????sync.acquireInterruptibly(1);
          ????}

          ????public?final?void?acquireInterruptibly(int?arg)
          ????????????throws?InterruptedException?{
          ????????if?(Thread.interrupted())
          ????????????throw?new?InterruptedException();
          ????????if?(!tryAcquire(arg))
          ????????????doAcquireInterruptibly(arg);
          ????}

          ????private?void?doAcquireInterruptibly(int?arg)
          ????????throws?InterruptedException?{
          ????????final?Node?node?=?addWaiter(Node.EXCLUSIVE);
          ????????boolean?failed?=?true;
          ????????try?{
          ????????????for?(;;)?{
          ????????????????final?Node?p?=?node.predecessor();
          ????????????????if?(p?==?head?&&?tryAcquire(arg))?{
          ????????????????????setHead(node);
          ????????????????????p.next?=?null;?//?help?GC
          ????????????????????failed?=?false;
          ????????????????????return;
          ????????????????}
          ????????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
          ????????????????????parkAndCheckInterrupt())
          ????????????????????throw?new?InterruptedException();
          ????????????}
          ????????}?finally?{
          ????????????if?(failed)
          ????????????????cancelAcquire(node);
          ????????}
          ????}

          可以看到區(qū)別就在于使用lockInterruptibly加鎖被打斷后,是直接拋出InterruptedException異常,我們可以捕獲這個異常進(jìn)行相應(yīng)的處理。

          取消

          最后來看看cancelAcquire是如何取消加鎖的,該情況比較特殊,簡單了解下即可:

          ????private?void?cancelAcquire(Node?node)?{
          ????????if?(node?==?null)
          ????????????return;

          ??//?首先將線程置空
          ????????node.thread?=?null;

          ??//?waitStatus?>?0表示節(jié)點(diǎn)處于取消狀態(tài),則直接將當(dāng)前節(jié)點(diǎn)的pre指向在此之前的最后一個有效節(jié)點(diǎn)
          ????????Node?pred?=?node.prev;
          ????????while?(pred.waitStatus?>?0)
          ????????????node.prev?=?pred?=?pred.prev;
          ??
          ??//?保存前一個節(jié)點(diǎn)的下一個節(jié)點(diǎn),如果在此之前存在取消節(jié)點(diǎn),這里就是之前取消被取消節(jié)點(diǎn)的頭節(jié)點(diǎn)
          ????????Node?predNext?=?pred.next;
          ????????
          ????????node.waitStatus?=?Node.CANCELLED;

          ??//?當(dāng)前節(jié)點(diǎn)是tail節(jié)點(diǎn),則替換尾節(jié)點(diǎn),替換成功則將新的尾結(jié)點(diǎn)的下一個節(jié)點(diǎn)設(shè)置為null;
          ??//?否則需要判斷是將當(dāng)前節(jié)點(diǎn)的下一個節(jié)點(diǎn)賦值給最后一個有效節(jié)點(diǎn),還是喚醒下一個節(jié)點(diǎn)。
          ????????if?(node?==?tail?&&?compareAndSetTail(node,?pred))?{
          ????????????compareAndSetNext(pred,?predNext,?null);
          ????????}?else?{
          ????????????int?ws;
          ????????????if?(pred?!=?head?&&
          ????????????????((ws?=?pred.waitStatus)?==?Node.SIGNAL?||
          ?????????????????(ws?<=?0?&&?compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL)))?&&
          ????????????????pred.thread?!=?null)?{
          ????????????????Node?next?=?node.next;
          ????????????????if?(next?!=?null?&&?next.waitStatus?<=?0)
          ????????????????????compareAndSetNext(pred,?predNext,?next);
          ????????????}?else?{
          ????????????????unparkSuccessor(node);
          ????????????}

          ????????????node.next?=?node;?//?help?GC
          ????????}
          ????}

          解鎖

          ????public?void?unlock()?{
          ????????sync.release(1);
          ????}

          ????public?final?boolean?release(int?arg)?{
          ????????if?(tryRelease(arg))?{
          ????????????Node?h?=?head;
          ????????????if?(h?!=?null?&&?h.waitStatus?!=?0)
          ????????????????unparkSuccessor(h);
          ????????????return?true;
          ????????}
          ????????return?false;
          ????}

          ????protected?final?boolean?tryRelease(int?releases)?{
          ????????int?c?=?getState()?-?releases;
          ????????if?(Thread.currentThread()?!=?getExclusiveOwnerThread())
          ????????????throw?new?IllegalMonitorStateException();
          ????????boolean?free?=?false;
          ????????if?(c?==?0)?{
          ????????????free?=?true;
          ????????????setExclusiveOwnerThread(null);
          ????????}
          ????????setState(c);
          ????????return?free;
          ????}

          ????private?void?unparkSuccessor(Node?node)?{
          ????????int?ws?=?node.waitStatus;
          ????????if?(ws?????????????compareAndSetWaitStatus(node,?ws,?0);

          ????????Node?s?=?node.next;
          ????????//?并發(fā)情況下,可能已經(jīng)被其它線程喚醒或已經(jīng)取消,則從后向前找到最后一個有效節(jié)點(diǎn)并喚醒
          ????????if?(s?==?null?||?s.waitStatus?>?0)?{
          ????????????s?=?null;
          ????????????for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)
          ????????????????if?(t.waitStatus?<=?0)
          ????????????????????s?=?t;
          ????????}
          ????????if?(s?!=?null)
          ????????????LockSupport.unpark(s.thread);
          ????}

          解鎖就比較簡單了,先調(diào)用tryReleasestate執(zhí)行減一操作,如果state==0,則表示完全釋放鎖;若果存在后繼節(jié)點(diǎn),則調(diào)用unparkSuccessor喚醒后繼節(jié)點(diǎn),喚醒后的節(jié)點(diǎn)的waitStatus會重新被設(shè)置為0.
          只是這里有一個小細(xì)節(jié),為什么是從后向前找呢?因?yàn)槲覀冊陂_始說過,設(shè)置尾節(jié)點(diǎn)保證了node.pre不會為null,但pre.next仍有可能是null,所以這里只能從后向前找到最后一個有效節(jié)點(diǎn)。

          小結(jié)


          上面是ReentrantLock的加鎖流程,可以看到整個流程不算復(fù)雜,只是判斷和跳轉(zhuǎn)比較多,主要是Doug Lea將代碼和性能都優(yōu)化到了極致,代碼非常精簡,但細(xì)節(jié)卻非常多。另外通過上面的分析,我們也可以發(fā)現(xiàn),公平鎖和非公平鎖的區(qū)別就在于非公平鎖不管是否有線程在排隊(duì),先搶三次鎖,而公平鎖則會判斷是否存在隊(duì)列,有線程在排隊(duì)則直接進(jìn)入隊(duì)列排隊(duì);另外線程在park被喚醒后非公平鎖還會搶鎖,公平鎖仍然需要排隊(duì),所以非公平鎖的性能比公平鎖高很多,大部分情況下我們使用非公平鎖即可。

          ReentrantReadWriteLock

          ReentrantLock是一把獨(dú)占鎖,只支持重入,不支持共享,所以JUC包下還提供了讀寫鎖,這把鎖支持讀讀并發(fā),但讀寫、寫寫都是互斥的。
          讀寫鎖也是基于AQS實(shí)現(xiàn)的,也包含了一個繼承自AQS的內(nèi)部類Sync,同樣也有公平和非公平兩種模式,下面主要討論非公平模式下的讀寫鎖實(shí)現(xiàn)。
          讀寫鎖實(shí)現(xiàn)相對比較復(fù)雜,在ReentrantLock中就是使用的int型的state屬性來表示鎖被某個線程占有和重入次數(shù),而ReentrantReadWriteLock分為了讀和寫兩種鎖,要怎么用一個字段表示兩種鎖的狀態(tài)呢?Doug Lea大師將state字段分為了高二字節(jié)和低二字節(jié),即高16位用來表示讀鎖狀態(tài),低16位則用來表示寫鎖,如下圖:

          因?yàn)樽x寫鎖狀態(tài)都只用了兩個字節(jié),所以可重入的次數(shù)最多是65535,當(dāng)然正常情況下重入是不可能達(dá)到這么多的。
          那它是怎么實(shí)現(xiàn)的呢?還是先從構(gòu)造方法開始:

          ????public?ReentrantReadWriteLock()?{
          ????????this(false);
          ????}

          ????public?ReentrantReadWriteLock(boolean?fair)?{
          ????????sync?=?fair???new?FairSync()?:?new?NonfairSync();
          ????????readerLock?=?new?ReadLock(this);
          ????????writerLock?=?new?WriteLock(this);
          ????}

          同樣默認(rèn)就是非公平鎖,同時還創(chuàng)建了readerLockwriterLock兩個對象,我們只需要像下面這樣就能獲取到讀寫鎖:

          ????private?static?ReentrantReadWriteLock?lock?=?new?ReentrantReadWriteLock();
          ????private?static?Lock?r?=?lock.readLock();
          ????private?static?Lock?w?=?lock.writeLock();

          寫鎖

          由于寫鎖的加鎖過程相對更簡單,下面先從寫鎖加鎖開始分析,入口在ReentrantReadWriteLock#WriteLock.lock()方法,點(diǎn)進(jìn)去看,發(fā)現(xiàn)還是使用的AQS中的acquire方法:

          ????public?final?void?acquire(int?arg)?{
          ????????if?(!tryAcquire(arg)?&&
          ????????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
          ????????????selfInterrupt();
          ????}

          所以不同的地方也只有tryAcquire方法,我們重點(diǎn)分析這個方法就行:

          ?static?final?int?SHARED_SHIFT???=?16;
          ?//?65535
          ?static?final?int?MAX_COUNT??????=?(1?<?//?低16位是1111....1111
          ?static?final?int?EXCLUSIVE_MASK?=?(1?<?//?得到c低16位的值
          ?static?int?exclusiveCount(int?c)?{?return?c?&?EXCLUSIVE_MASK;?}

          ????protected?final?boolean?tryAcquire(int?acquires)?{
          ????????Thread?current?=?Thread.currentThread();
          ????????int?c?=?getState();
          ????????//?獲取寫鎖加鎖和重入的次數(shù)
          ????????int?w?=?exclusiveCount(c);
          ????????if?(c?!=?0)?{?//?已經(jīng)有線程持有鎖
          ?????????//?這里有兩種情況:1.?c!=0?&&?w==0表示有線程獲取了讀鎖,不論是否是當(dāng)前線程,直接返回false
          ?????????//?也就是說讀-寫鎖是不支持升級重入的(但支持寫-讀降級),原因后文會詳細(xì)分析;
          ?????????//?2.?c!=0?&&?w!=0?&&?current?!=?getExclusiveOwnerThread()表示有其它線程持有了寫鎖,寫寫互斥
          ????????????if?(w?==?0?||?current?!=?getExclusiveOwnerThread())
          ????????????????return?false;

          ???//?超出65535,拋異常
          ????????????if?(w?+?exclusiveCount(acquires)?>?MAX_COUNT)
          ????????????????throw?new?Error("Maximum?lock?count?exceeded");
          ????????????//?否則寫鎖的次數(shù)直接加1
          ????????????setState(c?+?acquires);
          ????????????return?true;
          ????????}

          ??//?c==0才會走到這,但這時存在兩種情況,有隊(duì)列和無隊(duì)列,所以公平鎖和非公平鎖處理不同,
          ??//?前者需要判斷是否存在隊(duì)列,有則嘗試加鎖失敗,無則加鎖成功,而非公平鎖直接使用CAS加鎖即可
          ????????if?(writerShouldBlock()?||
          ????????????!compareAndSetState(c,?c?+?acquires))
          ????????????return?false;
          ????????setExclusiveOwnerThread(current);
          ????????return?true;
          ????}

          寫鎖嘗試加鎖的過程就分析完了,其余的部分上文已經(jīng)講過,這里不再贅述。

          讀鎖

          ????public?void?lock()?{
          ????????sync.acquireShared(1);
          ????}

          ????public?final?void?acquireShared(int?arg)?{
          ????????if?(tryAcquireShared(arg)?????????????doAcquireShared(arg);
          ????}

          讀鎖在加鎖開始就和其它鎖不同,調(diào)用的是acquireShared方法,意為獲取共享鎖。

          ?static?final?int?SHARED_UNIT????=?(1?<?//?右移16位得到讀鎖狀態(tài)的值
          ?static?int?sharedCount(int?c)????{?return?c?>>>?SHARED_SHIFT;?}
          ?
          ????protected?final?int?tryAcquireShared(int?unused)?{
          ?????????Thread?current?=?Thread.currentThread();
          ?????????int?c?=?getState();
          ?????????//?為什么讀寫互斥?因?yàn)樽x鎖一上來就判斷了是否有其它線程持有了寫鎖(當(dāng)前線程持有寫鎖再獲取讀鎖是可以的)
          ?????????if?(exclusiveCount(c)?!=?0?&&
          ?????????????getExclusiveOwnerThread()?!=?current)
          ?????????????return?-1;
          ?????????int?r?=?sharedCount(c);
          ?????????//?公平鎖判斷是否存在隊(duì)列,非公平鎖判斷第一個節(jié)點(diǎn)是不是EXCLUSIVE模式,是的話會返回true
          ?????????//?返回false則需要判斷讀鎖加鎖次數(shù)是否超過65535,沒有則使用CAS給讀鎖+1
          ?????????if?(!readerShouldBlock()?&&
          ?????????????r??????????????compareAndSetState(c,?c?+?SHARED_UNIT))?{
          ?????????????if?(r?==?0)?{
          ??????????????//?第一個讀鎖線程就是當(dāng)前線程
          ?????????????????firstReader?=?current;
          ?????????????????firstReaderHoldCount?=?1;
          ?????????????}?else?if?(firstReader?==?current)?{
          ??????????????//?記錄讀鎖的重入
          ?????????????????firstReaderHoldCount++;
          ?????????????}?else?{
          ??????????????//?獲取最后一次加讀鎖的重入次數(shù)記錄器HoldCounter
          ?????????????????HoldCounter?rh?=?cachedHoldCounter;
          ?????????????????if?(rh?==?null?||?rh.tid?!=?getThreadId(current))
          ??????????????????//?當(dāng)前線程第一次重入需要初始化,以及當(dāng)前線程和緩存的最后一次記錄器的線程id不同,需要從ThreadLocalHoldCounter拿到對應(yīng)的記錄器
          ?????????????????????cachedHoldCounter?=?rh?=?readHolds.get();
          ?????????????????else?if?(rh.count?==?0)
          ??????????????????//?緩存到ThreadLocal
          ?????????????????????readHolds.set(rh);
          ?????????????????rh.count++;
          ?????????????}
          ?????????????return?1;
          ?????????}
          ?????????return?fullTryAcquireShared(current);
          ?????}

          這段代碼有點(diǎn)復(fù)雜,首先需要保證讀寫互斥,然后進(jìn)行初次加鎖,若加鎖失敗就會調(diào)用fullTryAcquireShared方法進(jìn)行兜底處理。在初次加鎖中與寫鎖不同的是,寫鎖的state可以直接用來記錄寫鎖的重入次數(shù),因?yàn)閷憣懟コ猓x鎖是共享的,state用來記錄讀鎖的加鎖次數(shù)了,重入次數(shù)該怎么記錄呢?重入是指同一線程,那么是不是可以使用ThreadLocl來保存呢?沒錯,Doug Lea就是這么處理的,新增了一個HoldCounter類,這個類只有線程id和重入次數(shù)兩個字段,當(dāng)線程重入的時候就會初始化這個類并保存在ThreadLocalHoldCounter類中,這個類就是繼承ThreadLocl的,用來初始化HoldCounter對象并保存。
          這里還有個小細(xì)節(jié),為什么要使用cachedHoldCounter緩存最后一次加讀鎖的HoldCounter?因?yàn)榇蟛糠智闆r下,重入和釋放鎖的線程很有可能就是最后一次加鎖的線程,所以這樣做能夠提高加解鎖的效率,Doug Lea真是把性能優(yōu)化到了極致。
          上面只是初次加鎖,有可能會加鎖失敗,就會進(jìn)入到fullTryAcquireShared方法:

          ????final?int?fullTryAcquireShared(Thread?current)?{
          ????????HoldCounter?rh?=?null;
          ????????for?(;;)?{
          ????????????int?c?=?getState();
          ????????????if?(exclusiveCount(c)?!=?0)?{
          ????????????????if?(getExclusiveOwnerThread()?!=?current)
          ????????????????????return?-1;
          ????????????}?else?if?(readerShouldBlock())?{
          ????????????????if?(firstReader?==?current)?{
          ????????????????????//?assert?firstReaderHoldCount?>?0;
          ????????????????}?else?{
          ????????????????????if?(rh?==?null)?{
          ????????????????????????rh?=?cachedHoldCounter;
          ????????????????????????if?(rh?==?null?||?rh.tid?!=?getThreadId(current))?{
          ????????????????????????????rh?=?readHolds.get();
          ????????????????????????????if?(rh.count?==?0)
          ????????????????????????????????readHolds.remove();
          ????????????????????????}
          ????????????????????}
          ????????????????????if?(rh.count?==?0)
          ????????????????????????return?-1;
          ????????????????}
          ????????????}
          ????????????if?(sharedCount(c)?==?MAX_COUNT)
          ????????????????throw?new?Error("Maximum?lock?count?exceeded");
          ????????????if?(compareAndSetState(c,?c?+?SHARED_UNIT))?{
          ????????????????if?(sharedCount(c)?==?0)?{
          ????????????????????firstReader?=?current;
          ????????????????????firstReaderHoldCount?=?1;
          ????????????????}?else?if?(firstReader?==?current)?{
          ????????????????????firstReaderHoldCount++;
          ????????????????}?else?{
          ????????????????????if?(rh?==?null)
          ????????????????????????rh?=?cachedHoldCounter;
          ????????????????????if?(rh?==?null?||?rh.tid?!=?getThreadId(current))
          ????????????????????????rh?=?readHolds.get();
          ????????????????????else?if?(rh.count?==?0)
          ????????????????????????readHolds.set(rh);
          ????????????????????rh.count++;
          ????????????????????cachedHoldCounter?=?rh;?//?cache?for?release
          ????????????????}
          ????????????????return?1;
          ????????????}
          ????????}
          ????}

          這個方法中代碼和tryAcquireShared基本上一致,只是采用了自旋的方式,處理初次加鎖中的漏網(wǎng)之魚,讀者們可自行閱讀分析。
          上面兩個方法若返回大于0則表示加鎖成功,小于0則會調(diào)用doAcquireShared方法,這個就和之前分析的acquireQueued差不多了:

          ????private?void?doAcquireShared(int?arg)?{
          ?????//?先添加一個SHARED類型的節(jié)點(diǎn)到隊(duì)列
          ????????final?Node?node?=?addWaiter(Node.SHARED);
          ????????boolean?failed?=?true;
          ????????try?{
          ????????????boolean?interrupted?=?false;
          ????????????for?(;;)?{
          ????????????????final?Node?p?=?node.predecessor();
          ????????????????if?(p?==?head)?{
          ?????????????????//?再次嘗試加讀鎖
          ????????????????????int?r?=?tryAcquireShared(arg);
          ????????????????????if?(r?>=?0)?{
          ?????????????????????//?設(shè)置head節(jié)點(diǎn)以及傳播喚醒后面的讀線程
          ????????????????????????setHeadAndPropagate(node,?r);
          ????????????????????????p.next?=?null;?//?help?GC
          ????????????????????????if?(interrupted)
          ????????????????????????????selfInterrupt();
          ????????????????????????failed?=?false;
          ????????????????????????return;
          ????????????????????}
          ????????????????}
          ????????????????//?只有前一個節(jié)點(diǎn)的waitStatus=-1時才會park,=0或者-3(先不考慮-2和1的情況)都會設(shè)置為-1后再次自旋嘗試加鎖,若還是加鎖失敗就會park
          ????????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
          ????????????????????parkAndCheckInterrupt())
          ????????????????????interrupted?=?true;
          ????????????}
          ????????}?finally?{
          ????????????if?(failed)
          ????????????????cancelAcquire(node);
          ????????}
          ????}

          ????private?void?setHeadAndPropagate(Node?node,?int?propagate)?{
          ?????//?設(shè)置頭節(jié)點(diǎn)
          ????????Node?h?=?head;?//?Record?old?head?for?check?below
          ????????setHead(node);
          ????????
          ????????//?propagate是tryAcquireShared的返回值,當(dāng)前線程加鎖成功還要去喚醒后繼的共享節(jié)點(diǎn)
          ????????//?(其余的判斷比較復(fù)雜,筆者也還未想明白,知道的讀者可以指點(diǎn)一下)
          ????????if?(propagate?>?0?||?h?==?null?||?h.waitStatus?????????????(h?=?head)?==?null?||?h.waitStatus?????????????Node?s?=?node.next;
          ????????????//?判斷后繼節(jié)點(diǎn)是否是共享節(jié)點(diǎn)
          ????????????if?(s?==?null?||?s.isShared())
          ????????????????doReleaseShared();
          ????????}
          ????}

          ????private?void?doReleaseShared()?{
          ????????for?(;;)?{
          ????????????Node?h?=?head;
          ????????????//?存在后繼節(jié)點(diǎn)
          ????????????if?(h?!=?null?&&?h?!=?tail)?{
          ????????????????int?ws?=?h.waitStatus;
          ????????????????if?(ws?==?Node.SIGNAL)?{
          ?????????????????//?當(dāng)前一個節(jié)點(diǎn)加鎖成功后自然需要將-1改回0,并喚醒后繼線程,同時自旋將0改為-2讓喚醒傳播下去
          ????????????????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))
          ????????????????????????continue;????????
          ????????????????????unparkSuccessor(h);
          ????????????????}
          ????????????????//?設(shè)置頭節(jié)點(diǎn)的waitStatus=-2,使得喚醒可以傳播下去
          ????????????????else?if?(ws?==?0?&&
          ?????????????????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))
          ????????????????????continue;?????????????
          ????????????}
          ????????????if?(h?==?head)??????????
          ????????????????break;
          ????????}
          ????}

          ????private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
          ????????int?ws?=?pred.waitStatus;
          ????????if?(ws?==?Node.SIGNAL)
          ????????????return?true;
          ????????if?(ws?>?0)?{
          ????????????do?{
          ????????????????node.prev?=?pred?=?pred.prev;
          ????????????}?while?(pred.waitStatus?>?0);
          ????????????pred.next?=?node;
          ????????}?else?{
          ????????????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);
          ????????}
          ????????return?false;
          ????}

          這里的邏輯也非常的繞,當(dāng)多個線程同時調(diào)用addWaiter添加到隊(duì)列中后,并且假設(shè)這些節(jié)點(diǎn)的第一個節(jié)點(diǎn)的前一個節(jié)點(diǎn)就是head節(jié)點(diǎn),那么第一個節(jié)點(diǎn)就能加鎖成功(假設(shè)都是SHARED節(jié)點(diǎn)),其余的節(jié)點(diǎn)在第一個節(jié)點(diǎn)設(shè)置頭節(jié)點(diǎn)之前都會進(jìn)入shouldParkAfterFailedAcquire方法,這時候waitStatus都等于0,所以繼續(xù)自旋不會park,若再次加鎖還失敗就會park(因?yàn)檫@時候waitStatus=-1),但都是讀線程的情況下一般都不會出現(xiàn),因?yàn)?strong>setHeadAndPropagate第一步就是修改head,所以其余SHARED節(jié)點(diǎn)最終都能加鎖成功并一直將喚醒傳播下去。
          以上就是讀寫鎖加鎖過程,解鎖比較簡單,這里就不詳細(xì)分析了。

          小結(jié)

          讀寫鎖將state分為了高二字節(jié)和低二字節(jié),分別存儲讀鎖和寫鎖的狀態(tài),實(shí)現(xiàn)更為的復(fù)雜,在使用上還有幾點(diǎn)需要注意:

          • 讀讀共享,但是在讀中間穿插了寫的話,后面的讀都會被阻塞,直到前面的寫釋放鎖后,后面的讀才會共享,相關(guān)原理看完前文不難理解。

          • 讀寫鎖只支持降級重入,不支持升級重入。因?yàn)槿绻С稚壷厝氲脑挘菚霈F(xiàn)死鎖的。如下面這段代碼:

          ????private?static?void?rw()?{
          ????????r.lock();
          ????????try?{
          ????????????log.info("獲取到讀鎖");
          ????????????w.lock();
          ????????????try?{
          ????????????????log.info("獲取到寫鎖");
          ????????????}?finally?{
          ????????????????w.unlock();
          ????????????}
          ????????}?finally?{
          ????????????r.unlock();
          ????????}
          ????}

          多個線程訪問都能獲取到讀鎖,但讀寫互斥,彼此都要等待對方的讀鎖釋放才能獲取到寫鎖,這就造成了死鎖。
          ReentrantReadWriteLock在某些場景下性能上不算高,因此Doug Lea在JDK1.8的時候又提供了一把高性能的讀寫鎖StampedLock,前者讀寫鎖都是悲觀鎖,而后者提供了新的模式——樂觀鎖,但它不是基于AQS實(shí)現(xiàn)的,本文不進(jìn)行分析。

          Condition

          Lock接口中還有一個方法newCondition,這個方法就是創(chuàng)建一個條件隊(duì)列:

          ????public?Condition?newCondition()?{
          ????????return?sync.newCondition();
          ????}

          ????final?ConditionObject?newCondition()?{
          ????????return?new?ConditionObject();
          ????}

          所謂條件隊(duì)列就是創(chuàng)建一個新的ConditionObject對象,這個對象的數(shù)據(jù)結(jié)構(gòu)在開篇就看過了,包含兩個節(jié)點(diǎn)字段,每當(dāng)調(diào)用Condition#await方法時就會在對應(yīng)的Condition對象中排隊(duì)等待:

          ????public?final?void?await()?throws?InterruptedException?{
          ????????if?(Thread.interrupted())
          ????????????throw?new?InterruptedException();
          ????????//?加入條件隊(duì)列
          ????????Node?node?=?addConditionWaiter();
          ????????//?因?yàn)镃ondition.await必須配合Lock.lock使用,所以await時就是將已獲得鎖的線程全部釋放掉
          ????????int?savedState?=?fullyRelease(node);
          ????????int?interruptMode?=?0;
          ????????//?判斷是在同步隊(duì)列還是條件隊(duì)列,后者則直接park
          ????????while?(!isOnSyncQueue(node))?{
          ????????????LockSupport.park(this);
          ????????????//?獲取打斷處理方式(拋出異常或重設(shè)標(biāo)記)
          ????????????if?((interruptMode?=?checkInterruptWhileWaiting(node))?!=?0)
          ????????????????break;
          ????????}
          ????????//?調(diào)用aqs的方法
          ????????if?(acquireQueued(node,?savedState)?&&?interruptMode?!=?THROW_IE)
          ????????????interruptMode?=?REINTERRUPT;
          ????????if?(node.nextWaiter?!=?null)?//?clean?up?if?cancelled
          ?????????//?清除掉已經(jīng)進(jìn)入同步隊(duì)列的節(jié)點(diǎn)
          ????????????unlinkCancelledWaiters();
          ????????if?(interruptMode?!=?0)
          ????????????reportInterruptAfterWait(interruptMode);
          ????}

          ????private?Node?addConditionWaiter()?{
          ????????Node?t?=?lastWaiter;
          ????????//?清除狀態(tài)為取消的節(jié)點(diǎn)
          ????????if?(t?!=?null?&&?t.waitStatus?!=?Node.CONDITION)?{
          ????????????unlinkCancelledWaiters();
          ????????????t?=?lastWaiter;
          ????????}

          ??//?創(chuàng)建一個CONDITION狀態(tài)的節(jié)點(diǎn)并添加到隊(duì)列末尾
          ????????Node?node?=?new?Node(Thread.currentThread(),?Node.CONDITION);
          ????????if?(t?==?null)
          ????????????firstWaiter?=?node;
          ????????else
          ????????????t.nextWaiter?=?node;
          ????????lastWaiter?=?node;
          ????????return?node;
          ????}

          await方法實(shí)現(xiàn)比較簡單,大部分代碼都是上文分析過的,這里不再重復(fù)。接著來看signal方法:

          ????public?final?void?signal()?{
          ????????if?(!isHeldExclusively())
          ????????????throw?new?IllegalMonitorStateException();
          ????????//?從條件隊(duì)列第一個節(jié)點(diǎn)開始喚醒
          ????????Node?first?=?firstWaiter;
          ????????if?(first?!=?null)
          ????????????doSignal(first);
          ????}

          ????private?void?doSignal(Node?first)?{
          ????????do?{
          ????????????if?(?(firstWaiter?=?first.nextWaiter)?==?null)
          ????????????????lastWaiter?=?null;
          ????????????first.nextWaiter?=?null;
          ????????}?while?(!transferForSignal(first)?&&
          ?????????????????(first?=?firstWaiter)?!=?null);
          ????}

          ????final?boolean?transferForSignal(Node?node)?{
          ?????//?修改waitStatus狀態(tài),如果修改失敗,則說明該節(jié)點(diǎn)已經(jīng)從條件隊(duì)列轉(zhuǎn)移到了同步隊(duì)列
          ????????if?(!compareAndSetWaitStatus(node,?Node.CONDITION,?0))
          ????????????return?false;
          ??
          ??//?上面修改成功,則將該節(jié)點(diǎn)添加到同步隊(duì)列末尾,并返回之前的尾結(jié)點(diǎn)
          ????????Node?p?=?enq(node);
          ????????int?ws?=?p.waitStatus;
          ????????if?(ws?>?0?||?!compareAndSetWaitStatus(p,?ws,?Node.SIGNAL))
          ?????????//?unpark當(dāng)前線程,結(jié)合await方法看
          ????????????LockSupport.unpark(node.thread);
          ????????return?true;
          ????}

          signal的邏輯也比較簡單,就是喚醒條件隊(duì)列中的第一個節(jié)點(diǎn),主要是要結(jié)合await的代碼一起理解。

          其它組件

          上文分析的鎖都是用來實(shí)現(xiàn)并發(fā)安全控制的,而對于多線程協(xié)作JUC又基于AQS提供了CountDownLatch、CyclicBarrier、Semaphore等組件,下面一一分析。

          CountDownLatch

          CountDownLatch在創(chuàng)建的時候就需要指定一個計(jì)數(shù):

          CountDownLatch?countDownLatch?=?new?CountDownLatch(5);

          然后在需要等待的地方調(diào)用countDownLatch.await()方法,然后在其它線程完成任務(wù)后調(diào)用countDownLatch.countDown()方法,每調(diào)用一次該計(jì)數(shù)就會減一,直到計(jì)數(shù)為0時,await的地方就會自動喚醒,繼續(xù)后面的工作,所以CountDownLatch適用于一個線程等待多個線程的場景,那它是怎么實(shí)現(xiàn)的呢?讀者們可以結(jié)合上文自己先思考下。

          ????public?CountDownLatch(int?count)?{
          ????????if?(count?"count?);
          ????????this.sync?=?new?Sync(count);
          ????}

          ????Sync(int?count)?{
          ????????setState(count);
          ????}

          與前面講的鎖一樣,也有一個內(nèi)部類Sync繼承自AQS,并且在構(gòu)造時就將傳入的計(jì)數(shù)設(shè)置到了state屬性,看到這里不難猜到CountDownLatch的實(shí)現(xiàn)原理了。

          ????public?void?await()?throws?InterruptedException?{
          ????????sync.acquireSharedInterruptibly(1);
          ????}

          ????public?final?void?acquireSharedInterruptibly(int?arg)
          ????????????throws?InterruptedException?{
          ????????if?(Thread.interrupted())
          ????????????throw?new?InterruptedException();
          ????????if?(tryAcquireShared(arg)?????????????doAcquireSharedInterruptibly(arg);
          ????}

          ????protected?int?tryAcquireShared(int?acquires)?{
          ????????return?(getState()?==?0)???1?:?-1;
          ????}

          在await方法中使用的是可打斷的方式獲取的共享鎖,同樣除了tryAcquireShared方法,其余的都是復(fù)用的之前分析過的代碼,而tryAcquireShared就是判斷state是否等于0,不等于就阻塞。

          ????public?void?countDown()?{
          ????????sync.releaseShared(1);
          ????}

          ????public?final?boolean?releaseShared(int?arg)?{
          ????????if?(tryReleaseShared(arg))?{
          ????????????doReleaseShared();
          ????????????return?true;
          ????????}
          ????????return?false;
          ????}
          ????
          ????protected?boolean?tryReleaseShared(int?releases)?{
          ????????for?(;;)?{
          ????????????int?c?=?getState();
          ????????????if?(c?==?0)
          ????????????????return?false;
          ????????????int?nextc?=?c-1;
          ????????????if?(compareAndSetState(c,?nextc))
          ????????????????return?nextc?==?0;
          ????????}
          ????}

          而調(diào)用countDown就更簡單了,每次對state遞減,直到為0時才會調(diào)用doReleaseShared釋放阻塞的線程。
          最后需要注意的是CountDownLatch的計(jì)數(shù)是不支持重置的,每次使用都要新建一個。

          CyclicBarrier

          CyclicBarrier和CountDownLatch使用差不多,不過它只有await方法。CyclicBarrier在創(chuàng)建時同樣需要指定一個計(jì)數(shù),當(dāng)調(diào)用await的次數(shù)達(dá)到計(jì)數(shù)時,所有線程就會同時喚醒,相當(dāng)于設(shè)置了一個“起跑線”,需要等所有運(yùn)動員都到達(dá)這個“起跑線”后才能一起開跑。另外它還支持重置計(jì)數(shù),提供了reset方法。

          ????public?CyclicBarrier(int?parties)?{
          ????????this(parties,?null);
          ????}

          ????public?CyclicBarrier(int?parties,?Runnable?barrierAction)?{
          ????????if?(parties?<=?0)?throw?new?IllegalArgumentException();
          ????????this.parties?=?parties;
          ????????this.count?=?parties;
          ????????this.barrierCommand?=?barrierAction;
          ????}

          CyclicBarrier提供了兩個構(gòu)造方法,我們可以傳入一個Runnable類型的回調(diào)函數(shù),當(dāng)達(dá)到計(jì)數(shù)時,由最后一個調(diào)用await的線程觸發(fā)執(zhí)行。

          ????public?int?await()?throws?InterruptedException,?BrokenBarrierException?{
          ????????try?{
          ????????????return?dowait(false,?0L);
          ????????}?catch?(TimeoutException?toe)?{
          ????????????throw?new?Error(toe);?//?cannot?happen
          ????????}
          ????}

          ????private?int?dowait(boolean?timed,?long?nanos)
          ????????throws?InterruptedException,?BrokenBarrierException,
          ???????????????TimeoutException?{
          ????????final?ReentrantLock?lock?=?this.lock;
          ????????lock.lock();
          ????????try?{
          ????????????final?Generation?g?=?generation;

          ????????????if?(g.broken)
          ????????????????throw?new?BrokenBarrierException();

          ???//?是否打斷,打斷會喚醒所有條件隊(duì)列中的線程
          ????????????if?(Thread.interrupted())?{
          ????????????????breakBarrier();
          ????????????????throw?new?InterruptedException();
          ????????????}

          ???//?計(jì)數(shù)為0時,喚醒條件隊(duì)列中的所有線程
          ????????????int?index?=?--count;
          ????????????if?(index?==?0)?{??//?tripped
          ????????????????boolean?ranAction?=?false;
          ????????????????try?{
          ????????????????????final?Runnable?command?=?barrierCommand;
          ????????????????????if?(command?!=?null)
          ????????????????????????command.run();
          ????????????????????ranAction?=?true;
          ????????????????????nextGeneration();
          ????????????????????return?0;
          ????????????????}?finally?{
          ????????????????????if?(!ranAction)
          ????????????????????????breakBarrier();
          ????????????????}
          ????????????}

          ????????????for?(;;)?{
          ????????????????try?{
          ?????????????????//?不帶超時時間直接進(jìn)入條件隊(duì)列等待
          ????????????????????if?(!timed)
          ????????????????????????trip.await();
          ????????????????????else?if?(nanos?>?0L)
          ????????????????????????nanos?=?trip.awaitNanos(nanos);
          ????????????????}?catch?(InterruptedException?ie)?{
          ????????????????????if?(g?==?generation?&&?!?g.broken)?{
          ????????????????????????breakBarrier();
          ????????????????????????throw?ie;
          ????????????????????}?else?{
          ????????????????????????Thread.currentThread().interrupt();
          ????????????????????}
          ????????????????}

          ????????????????if?(g.broken)
          ????????????????????throw?new?BrokenBarrierException();

          ????????????????if?(g?!=?generation)
          ????????????????????return?index;

          ????????????????if?(timed?&&?nanos?<=?0L)?{
          ????????????????????breakBarrier();
          ????????????????????throw?new?TimeoutException();
          ????????????????}
          ????????????}
          ????????}?finally?{
          ????????????lock.unlock();
          ????????}
          ????}

          ????private?void?nextGeneration()?{
          ????????//?signal?completion?of?last?generation
          ????????trip.signalAll();
          ????????//?set?up?next?generation
          ????????count?=?parties;
          ????????generation?=?new?Generation();
          ????}

          這里邏輯比較清晰,就是使用了ReentrantLock以及Condition來實(shí)現(xiàn)。在構(gòu)造方法中我們可以看到保存了兩個變量count和parties,每次調(diào)用await都會對count變量遞減,count不為0時都會進(jìn)入到trip條件隊(duì)列中等待,否則就會通過signalAll方法喚醒所有的線程,并將parties重新賦值給count。
          reset方法很簡單,這里不詳細(xì)分析了。

          Semaphore

          Semaphore是信號的意思,或者說許可,可以用來控制最大并發(fā)量。初始定義好有幾個信號,然后在需要獲取信號的地方調(diào)用acquire方法,執(zhí)行完成后,需要調(diào)用release方法回收信號。

          ????public?Semaphore(int?permits)?{
          ????????sync?=?new?NonfairSync(permits);
          ????}
          ???
          ????public?Semaphore(int?permits,?boolean?fair)?{
          ????????sync?=?fair???new?FairSync(permits)?:?new?NonfairSync(permits);
          ????}

          它也有兩個構(gòu)造方法,可以指定公平或是非公平,而permits就是state的值。

          ????public?void?acquire()?throws?InterruptedException?{
          ????????sync.acquireSharedInterruptibly(1);
          ????}

          ?//?非公平方式
          ????final?int?nonfairTryAcquireShared(int?acquires)?{
          ????????for?(;;)?{
          ????????????int?available?=?getState();
          ????????????int?remaining?=?available?-?acquires;
          ????????????if?(remaining?????????????????compareAndSetState(available,?remaining))
          ????????????????return?remaining;
          ????????}
          ????}

          ?//?公平方式
          ????protected?int?tryAcquireShared(int?acquires)?{
          ????????for?(;;)?{
          ????????????if?(hasQueuedPredecessors())
          ????????????????return?-1;
          ????????????int?available?=?getState();
          ????????????int?remaining?=?available?-?acquires;
          ????????????if?(remaining?????????????????compareAndSetState(available,?remaining))
          ????????????????return?remaining;
          ????????}
          ????}

          acquire方法和CountDownLatch是一樣的,只是tryAcquireShared區(qū)分了公平和非公平方式。獲取到信號相當(dāng)于加共享鎖成功,否則則進(jìn)入隊(duì)列阻塞等待;而release方法和讀鎖解鎖方式也是一樣的,只是每次release都會將state+1。

          總結(jié)

          本文詳細(xì)分析了AQS的核心原理、鎖的實(shí)現(xiàn)以及常用的相關(guān)組件,掌握其原理能讓我們準(zhǔn)確的使用JUC下面的鎖以及線程協(xié)作組件。另外AQS代碼設(shè)計(jì)是非常精良的,有非常多的細(xì)節(jié),精簡的代碼中把所有的情況都考慮到了,細(xì)細(xì)體味對我們自身編碼能力也會有很大的提高。
          文章錯誤和不清楚的地方歡迎批評指出,另外超時相關(guān)的API本文都未涉及到,讀者可自行分析。





          ??? ?



          感謝點(diǎn)贊支持下哈?

          瀏覽 64
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  av超碰在线| 五月激情六月丁香 | 国产精品www | 久久久久久久艹 | 免费观看黄色在线视频 |