<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java 中的 AQS 到底是什么?高級面試必問!

          共 5581字,需瀏覽 12分鐘

           ·

          2020-10-14 06:06

          Java技術(shù)棧

          www.javastack.cn

          關(guān)注閱讀更多優(yōu)質(zhì)文章



          作者:夜勿語
          來源:www.cnblogs.com/yewy/p/13773799.html

          前言

          JDK1.5以前只有synchronized同步鎖,并且效率非常低,因此大神Doug Lea自己寫了一套并發(fā)框架,這套框架的核心就在于AbstractQueuedSynchronizer類(即AQS),性能非常高,所以被引入JDK包中,即JUC。

          那么AQS是怎么實(shí)現(xiàn)的呢?

          本篇就是對AQS及其相關(guān)組件進(jìn)行分析,了解其原理,并領(lǐng)略大神的優(yōu)美而又精簡的代碼。

          AbstractQueuedSynchronizer

          AQS是JUC下最核心的類,沒有之一,所以我們先來分析一下這個類的數(shù)據(jù)結(jié)構(gòu)。

          AQS內(nèi)部是使用了雙向鏈表將等待線程鏈接起來,當(dāng)發(fā)生并發(fā)競爭的時候,就會初始化該隊列并讓線程進(jìn)入睡眠等待喚醒,同時每個節(jié)點(diǎn)會根據(jù)是否為共享鎖標(biāo)記狀態(tài)為共享模式獨(dú)占模式

          這個數(shù)據(jù)結(jié)構(gòu)需要好好理解并牢牢記住,下面分析的組件都將基于此實(shí)現(xiàn)。

          Lock

          Lock是一個接口,提供了加/解鎖的通用API,JUC主要提供了兩種鎖,ReentrantLock和ReentrantReadWriteLock,前者是重入鎖,實(shí)現(xiàn)Lock接口,后者是讀寫鎖,本身并沒有實(shí)現(xiàn)Lock接口,而是其內(nèi)部類ReadLock或WriteLock實(shí)現(xiàn)了Lock接口。

          先來看看Lock都提供了哪些接口:

          //?普通加鎖,不可打斷;未獲取到鎖進(jìn)入AQS阻塞??
          void?lock();??
          ??
          //?可打斷鎖??
          void?lockInterruptibly()?throws?InterruptedException;??
          ??
          //?嘗試加鎖,未獲取到鎖不阻塞,返回標(biāo)識??
          boolean?tryLock();??
          ??
          //?帶超時時間的嘗試加鎖??
          boolean?tryLock(long?time,?TimeUnit?unit)?throws?InterruptedException;??
          ??
          //?解鎖??
          void?unlock();??
          ??
          //?創(chuàng)建一個條件隊列??
          Condition?newCondition();??

          看到這里讀者們可以先思考下,自己如何來實(shí)現(xiàn)上面這些接口。

          ReentrantLock

          加鎖

          synchronizedReentrantLock都是可重入的,后者使用更加靈活,也提供了更多的高級特性,但其本質(zhì)的實(shí)現(xiàn)原理是差不多的(據(jù)說synchronized是借鑒了ReentrantLock的實(shí)現(xiàn)原理)。

          ReentrantLock提供了兩個構(gòu)造方法:

          public?ReentrantLock()?{??
          ????sync?=?new?NonfairSync();??
          }??

          public?ReentrantLock(boolean?fair)?{??
          ????sync?=?fair???new?FairSync()?:?new?NonfairSync();??
          }??

          有參構(gòu)造是根據(jù)參數(shù)創(chuàng)建公平鎖非公平鎖,而無參構(gòu)造默認(rèn)則是非公平鎖,因為非公平鎖性能非常高,并且大部分業(yè)務(wù)并不需要使用公平鎖。

          至于為什么非公平鎖性能很高,咱們接著往下看。

          非公平鎖/公平鎖

          lock

          非公平鎖和公平鎖在實(shí)現(xiàn)上基本一致,只有個別的地方不同,因此下面會采用對比分析方法進(jìn)行分析。

          從lock方法開始:

          public?void?lock()?{??
          ????sync.lock();??
          }??

          實(shí)際上是委托給了內(nèi)部類Sync,該類實(shí)現(xiàn)了AQS(其它組件實(shí)現(xiàn)方法也基本上都是這個套路);由于有公平和非公平兩種模式,因此該類又實(shí)現(xiàn)了兩個子類:FairSyncNonfairSync

          //?非公平鎖??
          final?void?lock()?{??
          ????if?(compareAndSetState(0,?1))??
          ????????setExclusiveOwnerThread(Thread.currentThread());??
          ????else??
          ????????acquire(1);??
          }??

          //?公平鎖??
          final?void?lock()?{??
          ???acquire(1);??
          }??

          這里就是公平鎖和非公平鎖的第一個不同,非公平鎖首先會調(diào)用CAS將state從0改為1,如果能改成功則表示獲取到鎖,直接將exclusiveOwnerThread設(shè)置為當(dāng)前線程,不用再進(jìn)行后續(xù)操作;否則則同公平鎖一樣調(diào)用acquire方法獲取鎖,這個是在AQS中實(shí)現(xiàn)的模板方法:

          public?final?void?acquire(int?arg)?{??
          ????if?(!tryAcquire(arg)?&&??
          ????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))??
          ????????selfInterrupt();??
          }??
          tryAcquire

          這里兩種鎖唯一不同的實(shí)現(xiàn)就是tryAcquire方法,先來看非公平鎖的實(shí)現(xiàn):

          protected?final?boolean?tryAcquire(int?acquires)?{??
          ????return?nonfairTryAcquire(acquires);??
          }??

          final?boolean?nonfairTryAcquire(int?acquires)?{??
          ????final?Thread?current?=?Thread.currentThread();??
          ????int?c?=?getState();??
          ????if?(c?==?0)?{??
          ????????if?(compareAndSetState(0,?acquires))?{??
          ????????????setExclusiveOwnerThread(current);??
          ????????????return?true;??
          ????????}??
          ????}??
          ????else?if?(current?==?getExclusiveOwnerThread())?{??
          ????????int?nextc?=?c?+?acquires;??
          ????????if?(nextc?????????????throw?new?Error("Maximum?lock?count?exceeded");??
          ????????setState(nextc);??
          ????????return?true;??
          ????}??
          ????return?false;??
          }??

          state=0表示還沒有被線程持有鎖,直接通過CAS修改,能修改成功的就獲取到鎖,修改失敗的線程先判斷exclusiveOwnerThread是不是當(dāng)前線程,是則state+1,表示重入次數(shù)+1并返回true,加鎖成功,否則則返回false表示嘗試加鎖失敗并調(diào)用acquireQueued入隊。

          protected?final?boolean?tryAcquire(int?acquires)?{??
          ????final?Thread?current?=?Thread.currentThread();??
          ????int?c?=?getState();??
          ????if?(c?==?0)?{??
          ????????if?(!hasQueuedPredecessors()?&&??
          ????????????compareAndSetState(0,?acquires))?{??
          ????????????setExclusiveOwnerThread(current);??
          ????????????return?true;??
          ????????}??
          ????}??
          ????else?if?(current?==?getExclusiveOwnerThread())?{??
          ????????int?nextc?=?c?+?acquires;??
          ????????if?(nextc?????????????throw?new?Error("Maximum?lock?count?exceeded");??
          ????????setState(nextc);??
          ????????return?true;??
          ????}??
          ????return?false;??
          }??

          public?final?boolean?hasQueuedPredecessors()?{??
          ????Node?t?=?tail;?//?Read?fields?in?reverse?initialization?order??
          ????Node?h?=?head;??
          ????Node?s;??
          ????//?首尾不相等且頭結(jié)點(diǎn)線程不是當(dāng)前線程則表示需要進(jìn)入隊列??
          ????return?h?!=?t?&&??
          ????????((s?=?h.next)?==?null?||?s.thread?!=?Thread.currentThread());??
          }??

          上面就是公平鎖的嘗試獲取鎖的代碼,可以看到基本和非公平鎖的代碼是一樣的,區(qū)別在于首次加鎖需要判斷是否已經(jīng)有隊列存在,沒有才去加鎖,有則直接返回false。

          addWaiter

          接著來看addWaiter方法,當(dāng)嘗試加鎖失敗時,首先就會調(diào)用該方法創(chuàng)建一個Node節(jié)點(diǎn)并添加到隊列中去。

          private?Node?addWaiter(Node?mode)?{??
          ????Node?node?=?new?Node(Thread.currentThread(),?mode);??
          ????Node?pred?=?tail;??
          ????//?尾節(jié)點(diǎn)不為null表示已經(jīng)存在隊列,直接將當(dāng)前線程作為尾節(jié)點(diǎn)??
          ????if?(pred?!=?null)?{??
          ????????node.prev?=?pred;??
          ????????if?(compareAndSetTail(pred,?node))?{??
          ????????????pred.next?=?node;??
          ????????????return?node;??
          ????????}??
          ????}??
          ????//?尾結(jié)點(diǎn)不存在則表示還沒有初始化隊列,需要初始化隊列??
          ????enq(node);??
          ????return?node;??
          }??

          private?Node?enq(final?Node?node)?{??
          //?自旋??
          ????for?(;;)?{??
          ????????Node?t?=?tail;??
          ????????if?(t?==?null)?{?//?只會有一個線程設(shè)置頭節(jié)點(diǎn)成功???
          ????????????if?(compareAndSetHead(new?Node()))??
          ????????????????tail?=?head;??
          ????????}?else?{?//?其它設(shè)置頭節(jié)點(diǎn)失敗的都會自旋設(shè)置尾節(jié)點(diǎn)??
          ????????????node.prev?=?t;??
          ????????????if?(compareAndSetTail(t,?node))?{??
          ????????????????t.next?=?node;??
          ????????????????return?t;??
          ????????????}??
          ????????}??
          ????}??
          }??

          這里首先傳入了一個獨(dú)占模式的空節(jié)點(diǎn),并根據(jù)該節(jié)點(diǎn)和當(dāng)前線程創(chuàng)建了一個Node,然后判斷是否已經(jīng)存在隊列,若存在則直接入隊,否則調(diào)用enq方法初始化隊列,提高效率。

          此處還有一個非常細(xì)節(jié)的地方,為什么設(shè)置尾節(jié)點(diǎn)時都要先將之前的尾節(jié)點(diǎn)設(shè)置為node.pre的值呢,而不是在CAS之后再設(shè)置?

          比如像下面這樣:

          if?(compareAndSetTail(pred,?node))?{??
          ?node.prev?=?pred;??
          ????pred.next?=?node;??
          ????return?node;??
          }??

          因為如果這樣做的話,在CAS設(shè)置完tail后會存在一瞬間的tail.pre=null的情況,而Doug Lea正是考慮到這種情況,不論何時獲取tail.pre都不會為null。

          acquireQueued

          接著看acquireQueued方法:

          final?boolean?acquireQueued(final?Node?node,?int?arg)?{??
          ?//?為true表示存在需要取消加鎖的節(jié)點(diǎn),僅從這段代碼可以看出,??
          ?//?除非發(fā)生異常,否則不會存在需要取消加鎖的節(jié)點(diǎn)。??
          ????boolean?failed?=?true;??
          ????try?{??
          ?????//?打斷標(biāo)記,因為調(diào)用的是lock方法,所以是不可打斷的??
          ?????//?(但實(shí)際上是打斷了的,只不過這里采用了一種**靜默**處理方式,稍后分析)??
          ????????boolean?interrupted?=?false;??
          ????????for?(;;)?{??
          ????????????final?Node?p?=?node.predecessor();??
          ????????????if?(p?==?head?&&?tryAcquire(arg))?{??
          ????????????????setHead(node);??
          ????????????????p.next?=?null;?//?help?GC??
          ????????????????failed?=?false;??
          ????????????????return?interrupted;??
          ????????????}??
          ????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&??
          ????????????????parkAndCheckInterrupt())??
          ????????????????interrupted?=?true;??
          ????????}??
          ????}?finally?{??
          ????????if?(failed)??
          ????????????cancelAcquire(node);??
          ????}??
          }??

          private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{??
          ????int?ws?=?pred.waitStatus;??
          ????if?(ws?==?Node.SIGNAL)??
          ????????return?true;??

          ????if?(ws?>?0)?{??
          ????????do?{??
          ????????????node.prev?=?pred?=?pred.prev;??
          ????????}?while?(pred.waitStatus?>?0);??
          ????????pred.next?=?node;??
          ????}?else?{??
          ????????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);??
          ????}??
          ????return?false;??
          }??

          private?final?boolean?parkAndCheckInterrupt()?{??
          ????LockSupport.park(this);??
          ????return?Thread.interrupted();??
          }??

          這里就是隊列中線程加鎖/睡眠的核心邏輯,首先判斷剛剛調(diào)用addWaiter方法添加到隊列的節(jié)點(diǎn)是否是頭節(jié)點(diǎn),如果是則再次嘗試加鎖,這個剛剛分析過了,非公平鎖在這里就會再次搶一次鎖,搶鎖成功則設(shè)置為head節(jié)點(diǎn)并返回打斷標(biāo)記;否則則和公平鎖一樣調(diào)用shouldParkAfterFailedAcquire判斷是否應(yīng)該調(diào)用park方法進(jìn)入睡眠。

          park細(xì)節(jié)

          為什么在park前需要這么一個判斷呢?因為當(dāng)前節(jié)點(diǎn)的線程進(jìn)入park后只能被前一個節(jié)點(diǎn)喚醒,那前一個節(jié)點(diǎn)怎么知道有沒有后繼節(jié)點(diǎn)需要喚醒呢?

          因此當(dāng)前節(jié)點(diǎn)在park前需要給前一個節(jié)點(diǎn)設(shè)置一個標(biāo)識,即將waitStatus設(shè)置為Node.SIGNAL(-1),然后自旋一次再走一遍剛剛的流程,若還是沒有獲取到鎖,則調(diào)用parkAndCheckInterrupt進(jìn)入睡眠狀態(tài)。

          打斷

          讀者可能會比較好奇Thread.interrupted這個方法是做什么用的。

          public?static?boolean?interrupted()?{??
          ????return?currentThread().isInterrupted(true);??
          }

          這個是用來判斷當(dāng)前線程是否被打斷過,并清除打斷標(biāo)記(若是被打斷過則會返回true,并將打斷標(biāo)記設(shè)置為false),所以調(diào)用lock方法時,通過interrupt也是會打斷睡眠的線程的,只是Doug Lea做了一個假象,讓用戶無感知。

          但有些場景又需要知道該線程是否被打斷過,所以acquireQueued最終會返回interrupted打斷標(biāo)記,如果是被打斷過,則返回的true,并在acquire方法中調(diào)用selfInterrupt再次打斷當(dāng)前線程(將打斷標(biāo)記設(shè)置為true)。

          推薦閱讀:一文搞懂 Java 線程中斷

          這里我們對比看看lockInterruptibly的實(shí)現(xiàn):

          public?void?lockInterruptibly()?throws?InterruptedException?{??
          ????sync.acquireInterruptibly(1);??
          }??

          public?final?void?acquireInterruptibly(int?arg)??
          ????????throws?InterruptedException?{??
          ????if?(Thread.interrupted())??
          ????????throw?new?InterruptedException();??
          ????if?(!tryAcquire(arg))??
          ????????doAcquireInterruptibly(arg);??
          }??

          private?void?doAcquireInterruptibly(int?arg)??
          ????throws?InterruptedException?{??
          ????final?Node?node?=?addWaiter(Node.EXCLUSIVE);??
          ????boolean?failed?=?true;??
          ????try?{??
          ????????for?(;;)?{??
          ????????????final?Node?p?=?node.predecessor();??
          ????????????if?(p?==?head?&&?tryAcquire(arg))?{??
          ????????????????setHead(node);??
          ????????????????p.next?=?null;?//?help?GC??
          ????????????????failed?=?false;??
          ????????????????return;??
          ????????????}??
          ????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&??
          ????????????????parkAndCheckInterrupt())??
          ????????????????throw?new?InterruptedException();??
          ????????}??
          ????}?finally?{??
          ????????if?(failed)??
          ????????????cancelAcquire(node);??
          ????}??
          }??

          可以看到區(qū)別就在于使用lockInterruptibly加鎖被打斷后,是直接拋出InterruptedException異常,我們可以捕獲這個異常進(jìn)行相應(yīng)的處理。

          推薦閱讀:Java 異常處理的 10 個良心建議

          取消

          最后來看看cancelAcquire是如何取消加鎖的,該情況比較特殊,簡單了解下即可:

          private?void?cancelAcquire(Node?node)?{??
          ????if?(node?==?null)??
          ????????return;??

          //?首先將線程置空??
          ????node.thread?=?null;??

          //?waitStatus?>?0表示節(jié)點(diǎn)處于取消狀態(tài),則直接將當(dāng)前節(jié)點(diǎn)的pre指向在此之前的最后一個有效節(jié)點(diǎn)??
          ????Node?pred?=?node.prev;??
          ????while?(pred.waitStatus?>?0)??
          ????????node.prev?=?pred?=?pred.prev;??

          //?保存前一個節(jié)點(diǎn)的下一個節(jié)點(diǎn),如果在此之前存在取消節(jié)點(diǎn),這里就是之前取消被取消節(jié)點(diǎn)的頭節(jié)點(diǎn)??
          ????Node?predNext?=?pred.next;??

          ????node.waitStatus?=?Node.CANCELLED;??

          //?當(dāng)前節(jié)點(diǎn)是tail節(jié)點(diǎn),則替換尾節(jié)點(diǎn),替換成功則將新的尾結(jié)點(diǎn)的下一個節(jié)點(diǎn)設(shè)置為null;??
          //?否則需要判斷是將當(dāng)前節(jié)點(diǎn)的下一個節(jié)點(diǎn)賦值給最后一個有效節(jié)點(diǎn),還是喚醒下一個節(jié)點(diǎn)。??
          ????if?(node?==?tail?&&?compareAndSetTail(node,?pred))?{??
          ????????compareAndSetNext(pred,?predNext,?null);??
          ????}?else?{??
          ????????int?ws;??
          ????????if?(pred?!=?head?&&??
          ????????????((ws?=?pred.waitStatus)?==?Node.SIGNAL?||??
          ?????????????(ws?<=?0?&&?compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL)))?&&??
          ????????????pred.thread?!=?null)?{??
          ????????????Node?next?=?node.next;??
          ????????????if?(next?!=?null?&&?next.waitStatus?<=?0)??
          ????????????????compareAndSetNext(pred,?predNext,?next);??
          ????????}?else?{??
          ????????????unparkSuccessor(node);??
          ????????}??

          ????????node.next?=?node;?//?help?GC??
          ????}??
          }??

          解鎖

          public?void?unlock()?{??
          ????sync.release(1);??
          }??

          public?final?boolean?release(int?arg)?{??
          ????if?(tryRelease(arg))?{??
          ????????Node?h?=?head;??
          ????????if?(h?!=?null?&&?h.waitStatus?!=?0)??
          ????????????unparkSuccessor(h);??
          ????????return?true;??
          ????}??
          ????return?false;??
          }??

          protected?final?boolean?tryRelease(int?releases)?{??
          ????int?c?=?getState()?-?releases;??
          ????if?(Thread.currentThread()?!=?getExclusiveOwnerThread())??
          ????????throw?new?IllegalMonitorStateException();??
          ????boolean?free?=?false;??
          ????if?(c?==?0)?{??
          ????????free?=?true;??
          ????????setExclusiveOwnerThread(null);??
          ????}??
          ????setState(c);??
          ????return?free;??
          }??

          private?void?unparkSuccessor(Node?node)?{??
          ????int?ws?=?node.waitStatus;??
          ????if?(ws?????????compareAndSetWaitStatus(node,?ws,?0);??

          ????Node?s?=?node.next;??
          ????//?并發(fā)情況下,可能已經(jīng)被其它線程喚醒或已經(jīng)取消,則從后向前找到最后一個有效節(jié)點(diǎn)并喚醒??
          ????if?(s?==?null?||?s.waitStatus?>?0)?{??
          ????????s?=?null;??
          ????????for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)??
          ????????????if?(t.waitStatus?<=?0)??
          ????????????????s?=?t;??
          ????}??
          ????if?(s?!=?null)??
          ????????LockSupport.unpark(s.thread);??
          }??

          解鎖就比較簡單了,先調(diào)用tryReleasestate執(zhí)行減一操作,如果state==0,則表示完全釋放鎖;若果存在后繼節(jié)點(diǎn),則調(diào)用unparkSuccessor喚醒后繼節(jié)點(diǎn),喚醒后的節(jié)點(diǎn)的waitStatus會重新被設(shè)置為0。

          只是這里有一個小細(xì)節(jié),為什么是從后向前找呢?因為我們在開始說過,設(shè)置尾節(jié)點(diǎn)保證了node.pre不會為null,但pre.next仍有可能是null,所以這里只能從后向前找到最后一個有效節(jié)點(diǎn)。

          小結(jié)

          上面是ReentrantLock的加鎖流程,可以看到整個流程不算復(fù)雜,只是判斷和跳轉(zhuǎn)比較多,主要是Doug Lea將代碼和性能都優(yōu)化到了極致,代碼非常精簡,但細(xì)節(jié)卻非常多。

          這篇《到底什么是重入鎖,一次搞清楚!推薦看下,關(guān)注公眾號Java技術(shù)棧回復(fù)java獲取更多Java及多線程教程。

          另外通過上面的分析,我們也可以發(fā)現(xiàn),公平鎖和非公平鎖的區(qū)別就在于非公平鎖不管是否有線程在排隊,先搶三次鎖,而公平鎖則會判斷是否存在隊列,有線程在排隊則直接進(jìn)入隊列排隊;另外線程在park被喚醒后非公平鎖還會搶鎖,公平鎖仍然需要排隊,所以非公平鎖的性能比公平鎖高很多,大部分情況下我們使用非公平鎖即可。

          ReentrantReadWriteLock

          ReentrantLock是一把獨(dú)占鎖,只支持重入,不支持共享,所以JUC包下還提供了讀寫鎖,這把鎖支持讀讀并發(fā),但讀寫、寫寫都是互斥的。

          讀寫鎖也是基于AQS實(shí)現(xiàn)的,也包含了一個繼承自AQS的內(nèi)部類Sync,同樣也有公平和非公平兩種模式,下面主要討論非公平模式下的讀寫鎖實(shí)現(xiàn)。

          讀寫鎖實(shí)現(xiàn)相對比較復(fù)雜,在ReentrantLock中就是使用的int型的state屬性來表示鎖被某個線程占有和重入次數(shù),而ReentrantReadWriteLock分為了讀和寫兩種鎖,要怎么用一個字段表示兩種鎖的狀態(tài)呢?

          Doug Lea大師將state字段分為了高二字節(jié)和低二字節(jié),即高16位用來表示讀鎖狀態(tài),低16位則用來表示寫鎖,如下圖:

          因為讀寫鎖狀態(tài)都只用了兩個字節(jié),所以可重入的次數(shù)最多是65535,當(dāng)然正常情況下重入是不可能達(dá)到這么多的。

          那它是怎么實(shí)現(xiàn)的呢?還是先從構(gòu)造方法開始:

          public?ReentrantReadWriteLock()?{??
          ????this(false);??
          }??

          public?ReentrantReadWriteLock(boolean?fair)?{??
          ????sync?=?fair???new?FairSync()?:?new?NonfairSync();??
          ????readerLock?=?new?ReadLock(this);??
          ????writerLock?=?new?WriteLock(this);??
          }??

          同樣默認(rèn)就是非公平鎖,同時還創(chuàng)建了readerLockwriterLock兩個對象,我們只需要像下面這樣就能獲取到讀寫鎖:

          private?static?ReentrantReadWriteLock?lock?=?new?ReentrantReadWriteLock();??
          private?static?Lock?r?=?lock.readLock();??
          private?static?Lock?w?=?lock.writeLock();??

          寫鎖

          由于寫鎖的加鎖過程相對更簡單,下面先從寫鎖加鎖開始分析,入口在ReentrantReadWriteLock#WriteLock.lock()方法,點(diǎn)進(jìn)去看,發(fā)現(xiàn)還是使用的AQS中的acquire方法:

          public?final?void?acquire(int?arg)?{??
          ????if?(!tryAcquire(arg)?&&??
          ????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))??
          ????????selfInterrupt();??
          }??

          所以不同的地方也只有tryAcquire方法,我們重點(diǎn)分析這個方法就行:

          static?final?int?SHARED_SHIFT???=?16;??
          //?65535??
          static?final?int?MAX\_COUNT??????=?(1?<//?低16位是1111....1111??
          static?final?int?EXCLUSIVE\_MASK?=?(1?<//?得到c低16位的值??
          static?int?exclusiveCount(int?c)?{?return?c?&?EXCLUSIVE_MASK;?}??

          protected?final?boolean?tryAcquire(int?acquires)?{??
          ????Thread?current?=?Thread.currentThread();??
          ????int?c?=?getState();??
          ????//?獲取寫鎖加鎖和重入的次數(shù)??
          ????int?w?=?exclusiveCount(c);??
          ????if?(c?!=?0)?{?//?已經(jīng)有線程持有鎖??
          ?????//?這里有兩種情況:1. c!=0?&& w==0表示有線程獲取了讀鎖,不論是否是當(dāng)前線程,直接返回false,??
          ?????//?也就是說讀-寫鎖是不支持升級重入的(但支持寫-讀降級),原因后文會詳細(xì)分析;??
          ?????//?2.?c!=0?&&?w!=0?&&?current?!=?getExclusiveOwnerThread()表示有其它線程持有了寫鎖,寫寫互斥??
          ????????if?(w?==?0?||?current?!=?getExclusiveOwnerThread())??
          ????????????return?false;??

          //?超出65535,拋異常??
          ????????if?(w?+?exclusiveCount(acquires)?>?MAX_COUNT)??
          ????????????throw?new?Error("Maximum?lock?count?exceeded");??
          ????????//?否則寫鎖的次數(shù)直接加1??
          ????????setState(c?+?acquires);??
          ????????return?true;??
          ????}??

          //?c==0才會走到這,但這時存在兩種情況,有隊列和無隊列,所以公平鎖和非公平鎖處理不同,??
          //?前者需要判斷是否存在隊列,有則嘗試加鎖失敗,無則加鎖成功,而非公平鎖直接使用CAS加鎖即可??
          ????if?(writerShouldBlock()?||??
          ????????!compareAndSetState(c,?c?+?acquires))??
          ????????return?false;??
          ????setExclusiveOwnerThread(current);??
          ????return?true;??
          }??
          ?
          ?
          ?

          寫鎖嘗試加鎖的過程就分析完了,其余的部分上文已經(jīng)講過,這里不再贅述。

          讀鎖

          public?void?lock()?{??
          ????sync.acquireShared(1);??
          }??

          public?final?void?acquireShared(int?arg)?{??
          ????if?(tryAcquireShared(arg)?????????doAcquireShared(arg);??
          }???

          讀鎖在加鎖開始就和其它鎖不同,調(diào)用的是acquireShared方法,意為獲取共享鎖。

          static?final?int?SHARED\_UNIT????=?(1?<//?右移16位得到讀鎖狀態(tài)的值??
          static?int?sharedCount(int?c)????{?return?c?>>>?SHARED_SHIFT;?}??

          protected?final?int?tryAcquireShared(int?unused)?{??
          ?????Thread?current?=?Thread.currentThread();??
          ?????int?c?=?getState();??
          ?????//?為什么讀寫互斥?因為讀鎖一上來就判斷了是否有其它線程持有了寫鎖(當(dāng)前線程持有寫鎖再獲取讀鎖是可以的)??
          ?????if?(exclusiveCount(c)?!=?0?&&??
          ?????????getExclusiveOwnerThread()?!=?current)??
          ?????????return?-1;??
          ?????int?r?=?sharedCount(c);??
          ?????//?公平鎖判斷是否存在隊列,非公平鎖判斷第一個節(jié)點(diǎn)是不是EXCLUSIVE模式,是的話會返回true??
          ?????//?返回false則需要判斷讀鎖加鎖次數(shù)是否超過65535,沒有則使用CAS給讀鎖+1??
          ?????if?(!readerShouldBlock()?&&??
          ?????????r??????????compareAndSetState(c,?c?+?SHARED_UNIT))?{??
          ?????????if?(r?==?0)?{??
          ??????????//?第一個讀鎖線程就是當(dāng)前線程??
          ?????????????firstReader?=?current;??
          ?????????????firstReaderHoldCount?=?1;??
          ?????????}?else?if?(firstReader?==?current)?{??
          ??????????//?記錄讀鎖的重入??
          ?????????????firstReaderHoldCount++;??
          ?????????}?else?{??
          ??????????//?獲取最后一次加讀鎖的重入次數(shù)記錄器HoldCounter??
          ?????????????HoldCounter?rh?=?cachedHoldCounter;??
          ?????????????if?(rh?==?null?||?rh.tid?!=?getThreadId(current))??
          ??????????????//?當(dāng)前線程第一次重入需要初始化,以及當(dāng)前線程和緩存的最后一次記錄器的線程id不同,需要從ThreadLocalHoldCounter拿到對應(yīng)的記錄器??
          ?????????????????cachedHoldCounter?=?rh?=?readHolds.get();??
          ?????????????else?if?(rh.count?==?0)??
          ??????????????//?緩存到ThreadLocal??
          ?????????????????readHolds.set(rh);??
          ?????????????rh.count++;??
          ?????????}??
          ?????????return?1;??
          ?????}??
          ?????return?fullTryAcquireShared(current);??
          }??

          這段代碼有點(diǎn)復(fù)雜,首先需要保證讀寫互斥,然后進(jìn)行初次加鎖,若加鎖失敗就會調(diào)用fullTryAcquireShared方法進(jìn)行兜底處理。在初次加鎖中與寫鎖不同的是,寫鎖的state可以直接用來記錄寫鎖的重入次數(shù),因為寫寫互斥,但讀鎖是共享的,state用來記錄讀鎖的加鎖次數(shù)了,重入次數(shù)該怎么記錄呢?

          重入是指同一線程,那么是不是可以使用ThreadLocl來保存呢?沒錯,Doug Lea就是這么處理的,新增了一個HoldCounter類,這個類只有線程id和重入次數(shù)兩個字段,當(dāng)線程重入的時候就會初始化這個類并保存在ThreadLocalHoldCounter類中,這個類就是繼承ThreadLocl的,用來初始化HoldCounter對象并保存。

          這里還有個小細(xì)節(jié),為什么要使用cachedHoldCounter緩存最后一次加讀鎖的HoldCounter

          因為大部分情況下,重入和釋放鎖的線程很有可能就是最后一次加鎖的線程,所以這樣做能夠提高加解鎖的效率,Doug Lea真是把性能優(yōu)化到了極致。

          上面只是初次加鎖,有可能會加鎖失敗,就會進(jìn)入到fullTryAcquireShared方法:

          final?int?fullTryAcquireShared(Thread?current)?{??
          ????HoldCounter?rh?=?null;??
          ????for?(;;)?{??
          ????????int?c?=?getState();??
          ????????if?(exclusiveCount(c)?!=?0)?{??
          ????????????if?(getExclusiveOwnerThread()?!=?current)??
          ????????????????return?-1;??
          ????????}?else?if?(readerShouldBlock())?{??
          ????????????if?(firstReader?==?current)?{??
          ????????????????//?assert?firstReaderHoldCount?>?0;??
          ????????????}?else?{??
          ????????????????if?(rh?==?null)?{??
          ????????????????????rh?=?cachedHoldCounter;??
          ????????????????????if?(rh?==?null?||?rh.tid?!=?getThreadId(current))?{??
          ????????????????????????rh?=?readHolds.get();??
          ????????????????????????if?(rh.count?==?0)??
          ????????????????????????????readHolds.remove();??
          ????????????????????}??
          ????????????????}??
          ????????????????if?(rh.count?==?0)??
          ????????????????????return?-1;??
          ????????????}??
          ????????}??
          ????????if?(sharedCount(c)?==?MAX_COUNT)??
          ????????????throw?new?Error("Maximum?lock?count?exceeded");??
          ????????if?(compareAndSetState(c,?c?+?SHARED_UNIT))?{??
          ????????????if?(sharedCount(c)?==?0)?{??
          ????????????????firstReader?=?current;??
          ????????????????firstReaderHoldCount?=?1;??
          ????????????}?else?if?(firstReader?==?current)?{??
          ????????????????firstReaderHoldCount++;??
          ????????????}?else?{??
          ????????????????if?(rh?==?null)??
          ????????????????????rh?=?cachedHoldCounter;??
          ????????????????if?(rh?==?null?||?rh.tid?!=?getThreadId(current))??
          ????????????????????rh?=?readHolds.get();??
          ????????????????else?if?(rh.count?==?0)??
          ????????????????????readHolds.set(rh);??
          ????????????????rh.count++;??
          ????????????????cachedHoldCounter?=?rh;?//?cache?for?release??
          ????????????}??
          ????????????return?1;??
          ????????}??
          ????}??
          }??

          這個方法中代碼和tryAcquireShared基本上一致,只是采用了自旋的方式,處理初次加鎖中的漏網(wǎng)之魚,讀者們可自行閱讀分析。

          上面兩個方法若返回大于0則表示加鎖成功,小于0則會調(diào)用doAcquireShared方法,這個就和之前分析的acquireQueued差不多了:

          private?void?doAcquireShared(int?arg)?{??
          ?//?先添加一個SHARED類型的節(jié)點(diǎn)到隊列??
          ????final?Node?node?=?addWaiter(Node.SHARED);??
          ????boolean?failed?=?true;??
          ????try?{??
          ????????boolean?interrupted?=?false;??
          ????????for?(;;)?{??
          ????????????final?Node?p?=?node.predecessor();??
          ????????????if?(p?==?head)?{??
          ?????????????//?再次嘗試加讀鎖??
          ????????????????int?r?=?tryAcquireShared(arg);??
          ????????????????if?(r?>=?0)?{??
          ?????????????????//?設(shè)置head節(jié)點(diǎn)以及傳播喚醒后面的讀線程??
          ????????????????????setHeadAndPropagate(node,?r);??
          ????????????????????p.next?=?null;?//?help?GC??
          ????????????????????if?(interrupted)??
          ????????????????????????selfInterrupt();??
          ????????????????????failed?=?false;??
          ????????????????????return;??
          ????????????????}??
          ????????????}??
          ????????????//?只有前一個節(jié)點(diǎn)的waitStatus=-1時才會park,=0或者-3(先不考慮-2和1的情況)都會設(shè)置為-1后再次自旋嘗試加鎖,若還是加鎖失敗就會park??
          ????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&??
          ????????????????parkAndCheckInterrupt())??
          ????????????????interrupted?=?true;??
          ????????}??
          ????}?finally?{??
          ????????if?(failed)??
          ????????????cancelAcquire(node);??
          ????}??
          }??

          private?void?setHeadAndPropagate(Node?node,?int?propagate)?{??
          ?//?設(shè)置頭節(jié)點(diǎn)??
          ????Node?h?=?head;?//?Record?old?head?for?check?below??
          ????setHead(node);??

          ????//?propagate是tryAcquireShared的返回值,當(dāng)前線程加鎖成功還要去喚醒后繼的共享節(jié)點(diǎn)??
          ????//?(其余的判斷比較復(fù)雜,筆者也還未想明白,知道的讀者可以指點(diǎn)一下)??
          ????if?(propagate?>?0?||?h?==?null?||?h.waitStatus?????????(h?=?head)?==?null?||?h.waitStatus?????????Node?s?=?node.next;??
          ????????//?判斷后繼節(jié)點(diǎn)是否是共享節(jié)點(diǎn)??
          ????????if?(s?==?null?||?s.isShared())??
          ????????????doReleaseShared();??
          ????}??
          }??

          private?void?doReleaseShared()?{??
          ????for?(;;)?{??
          ????????Node?h?=?head;??
          ????????//?存在后繼節(jié)點(diǎn)??
          ????????if?(h?!=?null?&&?h?!=?tail)?{??
          ????????????int?ws?=?h.waitStatus;??
          ????????????if?(ws?==?Node.SIGNAL)?{??
          ?????????????//?當(dāng)前一個節(jié)點(diǎn)加鎖成功后自然需要將-1改回0,并喚醒后繼線程,同時自旋將0改為-2讓喚醒傳播下去??
          ????????????????if?(!compareAndSetWaitStatus(h,?Node.SIGNAL,?0))??
          ????????????????????continue;??????????
          ????????????????unparkSuccessor(h);??
          ????????????}??
          ????????????//?設(shè)置頭節(jié)點(diǎn)的waitStatus=-2,使得喚醒可以傳播下去??
          ????????????else?if?(ws?==?0?&&??
          ?????????????????????!compareAndSetWaitStatus(h,?0,?Node.PROPAGATE))??
          ????????????????continue;???????????????
          ????????}??
          ????????if?(h?==?head)????????????
          ????????????break;??
          ????}??
          }??

          private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{??
          ????int?ws?=?pred.waitStatus;??
          ????if?(ws?==?Node.SIGNAL)??
          ????????return?true;??
          ????if?(ws?>?0)?{??
          ????????do?{??
          ????????????node.prev?=?pred?=?pred.prev;??
          ????????}?while?(pred.waitStatus?>?0);??
          ????????pred.next?=?node;??
          ????}?else?{??
          ????????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);??
          ????}??
          ????return?false;??
          }??

          這里的邏輯也非常的繞,當(dāng)多個線程同時調(diào)用addWaiter添加到隊列中后,并且假設(shè)這些節(jié)點(diǎn)的第一個節(jié)點(diǎn)的前一個節(jié)點(diǎn)就是head節(jié)點(diǎn),那么第一個節(jié)點(diǎn)就能加鎖成功(假設(shè)都是SHARED節(jié)點(diǎn)),其余的節(jié)點(diǎn)在第一個節(jié)點(diǎn)設(shè)置頭節(jié)點(diǎn)之前都會進(jìn)入shouldParkAfterFailedAcquire方法,這時候waitStatus都等于0,所以繼續(xù)自旋不會park,若再次加鎖還失敗就會park(因為這時候waitStatus=-1),但都是讀線程的情況下一般都不會出現(xiàn),因為setHeadAndPropagate第一步就是修改head,所以其余SHARED節(jié)點(diǎn)最終都能加鎖成功并一直將喚醒傳播下去。

          以上就是讀寫鎖加鎖過程,解鎖比較簡單,這里就不詳細(xì)分析了。

          小結(jié)

          讀寫鎖將state分為了高二字節(jié)和低二字節(jié),分別存儲讀鎖和寫鎖的狀態(tài),實(shí)現(xiàn)更為的復(fù)雜,在使用上還有幾點(diǎn)需要注意:

          • 讀讀共享,但是在讀中間穿插了寫的話,后面的讀都會被阻塞,直到前面的寫釋放鎖后,后面的讀才會共享,相關(guān)原理看完前文不難理解。

          • 讀寫鎖只支持降級重入,不支持升級重入。因為如果支持升級重入的話,是會出現(xiàn)死鎖的。如下面這段代碼:

          private?static?void?rw()?{??
          ????r.lock();??
          ????try?{??
          ????????log.info("獲取到讀鎖");??
          ????????w.lock();??
          ????????try?{??
          ????????????log.info("獲取到寫鎖");??
          ????????}?finally?{??
          ????????????w.unlock();??
          ????????}??
          ????}?finally?{??
          ????????r.unlock();??
          ????}??
          }??

          多個線程訪問都能獲取到讀鎖,但讀寫互斥,彼此都要等待對方的讀鎖釋放才能獲取到寫鎖,這就造成了死鎖。

          ReentrantReadWriteLock在某些場景下性能上不算高,因此Doug Lea在JDK1.8的時候又提供了一把高性能的讀寫鎖StampedLock,前者讀寫鎖都是悲觀鎖,而后者提供了新的模式——樂觀鎖,但它不是基于AQS實(shí)現(xiàn)的,本文不進(jìn)行分析。

          Condition

          Lock接口中還有一個方法newCondition,這個方法就是創(chuàng)建一個條件隊列:

          public?Condition?newCondition()?{??
          ????return?sync.newCondition();??
          }??

          final?ConditionObject?newCondition()?{??
          ????return?new?ConditionObject();??
          }??

          所謂條件隊列就是創(chuàng)建一個新的ConditionObject對象,這個對象的數(shù)據(jù)結(jié)構(gòu)在開篇就看過了,包含兩個節(jié)點(diǎn)字段,每當(dāng)調(diào)用Condition#await方法時就會在對應(yīng)的Condition對象中排隊等待:

          public?final?void?await()?throws?InterruptedException?{??
          ????if?(Thread.interrupted())??
          ????????throw?new?InterruptedException();??
          ????//?加入條件隊列??
          ????Node?node?=?addConditionWaiter();??
          ????//?因為Condition.await必須配合Lock.lock使用,所以await時就是將已獲得鎖的線程全部釋放掉??
          ????int?savedState?=?fullyRelease(node);??
          ????int?interruptMode?=?0;??
          ????//?判斷是在同步隊列還是條件隊列,后者則直接park??
          ????while?(!isOnSyncQueue(node))?{??
          ????????LockSupport.park(this);??
          ????????//?獲取打斷處理方式(拋出異常或重設(shè)標(biāo)記)??
          ????????if?((interruptMode?=?checkInterruptWhileWaiting(node))?!=?0)??
          ????????????break;??
          ????}??
          ????//?調(diào)用aqs的方法??
          ????if?(acquireQueued(node,?savedState)?&&?interruptMode?!=?THROW_IE)??
          ????????interruptMode?=?REINTERRUPT;??
          ????if?(node.nextWaiter?!=?null)?//?clean?up?if?cancelled??
          ?????//?清除掉已經(jīng)進(jìn)入同步隊列的節(jié)點(diǎn)??
          ????????unlinkCancelledWaiters();??
          ????if?(interruptMode?!=?0)??
          ????????reportInterruptAfterWait(interruptMode);??
          }??

          private?Node?addConditionWaiter()?{??
          ????Node?t?=?lastWaiter;??
          ????//?清除狀態(tài)為取消的節(jié)點(diǎn)??
          ????if?(t?!=?null?&&?t.waitStatus?!=?Node.CONDITION)?{??
          ????????unlinkCancelledWaiters();??
          ????????t?=?lastWaiter;??
          ????}??

          //?創(chuàng)建一個CONDITION狀態(tài)的節(jié)點(diǎn)并添加到隊列末尾??
          ????Node?node?=?new?Node(Thread.currentThread(),?Node.CONDITION);??
          ????if?(t?==?null)??
          ????????firstWaiter?=?node;??
          ????else??
          ????????t.nextWaiter?=?node;??
          ????lastWaiter?=?node;??
          ????return?node;??
          }??

          await方法實(shí)現(xiàn)比較簡單,大部分代碼都是上文分析過的,這里不再重復(fù)。接著來看signal方法:

          public?final?void?signal()?{??
          ????if?(!isHeldExclusively())??
          ????????throw?new?IllegalMonitorStateException();??
          ????//?從條件隊列第一個節(jié)點(diǎn)開始喚醒??
          ????Node?first?=?firstWaiter;??
          ????if?(first?!=?null)??
          ????????doSignal(first);??
          }??

          private?void?doSignal(Node?first)?{??
          ????do?{??
          ????????if?(?(firstWaiter?=?first.nextWaiter)?==?null)??
          ????????????lastWaiter?=?null;??
          ????????first.nextWaiter?=?null;??
          ????}?while?(!transferForSignal(first)?&&??
          ?????????????(first?=?firstWaiter)?!=?null);??
          }??

          final?boolean?transferForSignal(Node?node)?{??
          ?//?修改waitStatus狀態(tài),如果修改失敗,則說明該節(jié)點(diǎn)已經(jīng)從條件隊列轉(zhuǎn)移到了同步隊列??
          ????if?(!compareAndSetWaitStatus(node,?Node.CONDITION,?0))??
          ????????return?false;??

          //?上面修改成功,則將該節(jié)點(diǎn)添加到同步隊列末尾,并返回之前的尾結(jié)點(diǎn)??
          ????Node?p?=?enq(node);??
          ????int?ws?=?p.waitStatus;??
          ????if?(ws?>?0?||?!compareAndSetWaitStatus(p,?ws,?Node.SIGNAL))??
          ?????//?unpark當(dāng)前線程,結(jié)合await方法看??
          ????????LockSupport.unpark(node.thread);??
          ????return?true;??
          }???

          signal的邏輯也比較簡單,就是喚醒條件隊列中的第一個節(jié)點(diǎn),主要是要結(jié)合await的代碼一起理解。

          其它組件

          上文分析的鎖都是用來實(shí)現(xiàn)并發(fā)安全控制的,而對于多線程協(xié)作JUC又基于AQS提供了CountDownLatch、CyclicBarrier、Semaphore等組件,下面一一分析。

          CountDownLatch

          CountDownLatch在創(chuàng)建的時候就需要指定一個計數(shù):

          CountDownLatch?countDownLatch?=?new?CountDownLatch(5);??

          然后在需要等待的地方調(diào)用countDownLatch.await()方法,然后在其它線程完成任務(wù)后調(diào)用countDownLatch.countDown()方法,每調(diào)用一次該計數(shù)就會減一,直到計數(shù)為0時,await的地方就會自動喚醒,繼續(xù)后面的工作,所以CountDownLatch適用于一個線程等待多個線程的場景,那它是怎么實(shí)現(xiàn)的呢?

          讀者們可以結(jié)合上文自己先思考下。

          public?CountDownLatch(int?count)?{??
          ????if?(count?"count?);??
          ????this.sync?=?new?Sync(count);??
          }??

          Sync(int?count)?{??
          ????setState(count);??
          }??

          與前面講的鎖一樣,也有一個內(nèi)部類Sync繼承自AQS,并且在構(gòu)造時就將傳入的計數(shù)設(shè)置到了state屬性,看到這里不難猜到CountDownLatch的實(shí)現(xiàn)原理了。

          public?void?await()?throws?InterruptedException?{??
          ????sync.acquireSharedInterruptibly(1);??
          }??

          public?final?void?acquireSharedInterruptibly(int?arg)??
          ????????throws?InterruptedException?{??
          ????if?(Thread.interrupted())??
          ????????throw?new?InterruptedException();??
          ????if?(tryAcquireShared(arg)?????????doAcquireSharedInterruptibly(arg);??
          }??

          protected?int?tryAcquireShared(int?acquires)?{??
          ????return?(getState()?==?0)???1?:?-1;??
          }??
          ?

          在await方法中使用的是可打斷的方式獲取的共享鎖,同樣除了tryAcquireShared方法,其余的都是復(fù)用的之前分析過的代碼,而tryAcquireShared就是判斷state是否等于0,不等于就阻塞。

          public?void?countDown()?{??
          ????sync.releaseShared(1);??
          }??

          public?final?boolean?releaseShared(int?arg)?{??
          ????if?(tryReleaseShared(arg))?{??
          ????????doReleaseShared();??
          ????????return?true;??
          ????}??
          ????return?false;??
          }??

          protected?boolean?tryReleaseShared(int?releases)?{??
          ????for?(;;)?{??
          ????????int?c?=?getState();??
          ????????if?(c?==?0)??
          ????????????return?false;??
          ????????int?nextc?=?c-1;??
          ????????if?(compareAndSetState(c,?nextc))??
          ????????????return?nextc?==?0;??
          ????}??
          }??

          而調(diào)用countDown就更簡單了,每次對state遞減,直到為0時才會調(diào)用doReleaseShared釋放阻塞的線程。

          最后需要注意的是CountDownLatch的計數(shù)是不支持重置的,每次使用都要新建一個。這篇倒計時器CountDownLatch》舍得看一下

          CyclicBarrier

          CyclicBarrier和CountDownLatch使用差不多,不過它只有await方法。CyclicBarrier在創(chuàng)建時同樣需要指定一個計數(shù),當(dāng)調(diào)用await的次數(shù)達(dá)到計數(shù)時,所有線程就會同時喚醒,相當(dāng)于設(shè)置了一個“起跑線”,需要等所有運(yùn)動員都到達(dá)這個“起跑線”后才能一起開跑。

          另外它還支持重置計數(shù),提供了reset方法。

          public?CyclicBarrier(int?parties)?{??
          ????this(parties,?null);??
          }??

          public?CyclicBarrier(int?parties,?Runnable?barrierAction)?{??
          ????if?(parties?<=?0)?throw?new?IllegalArgumentException();??
          ????this.parties?=?parties;??
          ????this.count?=?parties;??
          ????this.barrierCommand?=?barrierAction;??
          }??

          CyclicBarrier提供了兩個構(gòu)造方法,我們可以傳入一個Runnable類型的回調(diào)函數(shù),當(dāng)達(dá)到計數(shù)時,由最后一個調(diào)用await的線程觸發(fā)執(zhí)行。

          public?int?await()?throws?InterruptedException,?BrokenBarrierException?{??
          ????try?{??
          ????????return?dowait(false,?0L);??
          ????}?catch?(TimeoutException?toe)?{??
          ????????throw?new?Error(toe);?//?cannot?happen??
          ????}??
          }??

          private?int?dowait(boolean?timed,?long?nanos)??
          ????throws?InterruptedException,?BrokenBarrierException,??
          ???????????TimeoutException?{??
          ????final?ReentrantLock?lock?=?this.lock;??
          ????lock.lock();??
          ????try?{??
          ????????final?Generation?g?=?generation;??

          ????????if?(g.broken)??
          ????????????throw?new?BrokenBarrierException();??

          //?是否打斷,打斷會喚醒所有條件隊列中的線程??
          ????????if?(Thread.interrupted())?{??
          ????????????breakBarrier();??
          ????????????throw?new?InterruptedException();??
          ????????}??

          //?計數(shù)為0時,喚醒條件隊列中的所有線程??
          ????????int?index?=?--count;??
          ????????if?(index?==?0)?{??//?tripped??
          ????????????boolean?ranAction?=?false;??
          ????????????try?{??
          ????????????????final?Runnable?command?=?barrierCommand;??
          ????????????????if?(command?!=?null)??
          ????????????????????command.run();??
          ????????????????ranAction?=?true;??
          ????????????????nextGeneration();??
          ????????????????return?0;??
          ????????????}?finally?{??
          ????????????????if?(!ranAction)??
          ????????????????????breakBarrier();??
          ????????????}??
          ????????}??

          ????????for?(;;)?{??
          ????????????try?{??
          ?????????????//?不帶超時時間直接進(jìn)入條件隊列等待??
          ????????????????if?(!timed)??
          ????????????????????trip.await();??
          ????????????????else?if?(nanos?>?0L)??
          ????????????????????nanos?=?trip.awaitNanos(nanos);??
          ????????????}?catch?(InterruptedException?ie)?{??
          ????????????????if?(g?==?generation?&&?!?g.broken)?{??
          ????????????????????breakBarrier();??
          ????????????????????throw?ie;??
          ????????????????}?else?{??
          ????????????????????Thread.currentThread().interrupt();??
          ????????????????}??
          ????????????}??

          ????????????if?(g.broken)??
          ????????????????throw?new?BrokenBarrierException();??

          ????????????if?(g?!=?generation)??
          ????????????????return?index;??

          ????????????if?(timed?&&?nanos?<=?0L)?{??
          ????????????????breakBarrier();??
          ????????????????throw?new?TimeoutException();??
          ????????????}??
          ????????}??
          ????}?finally?{??
          ????????lock.unlock();??
          ????}??
          }??

          private?void?nextGeneration()?{??
          ????//?signal?completion?of?last?generation??
          ????trip.signalAll();??
          ????//?set?up?next?generation??
          ????count?=?parties;??
          ????generation?=?new?Generation();??
          }??

          這里邏輯比較清晰,就是使用了ReentrantLock以及Condition來實(shí)現(xiàn)。在構(gòu)造方法中我們可以看到保存了兩個變量count和parties,每次調(diào)用await都會對count變量遞減,count不為0時都會進(jìn)入到trip條件隊列中等待,否則就會通過signalAll方法喚醒所有的線程,并將parties重新賦值給count。

          reset方法很簡單,這里不詳細(xì)分析了。

          Semaphore

          Semaphore是信號的意思,或者說許可,可以用來控制最大并發(fā)量。初始定義好有幾個信號,然后在需要獲取信號的地方調(diào)用acquire方法,執(zhí)行完成后,需要調(diào)用release方法回收信號。

          public?Semaphore(int?permits)?{??
          ????sync?=?new?NonfairSync(permits);??
          }??

          public?Semaphore(int?permits,?boolean?fair)?{??
          ????sync?=?fair???new?FairSync(permits)?:?new?NonfairSync(permits);??
          }??

          它也有兩個構(gòu)造方法,可以指定公平或是非公平,而permits就是state的值。

          public?void?acquire()?throws?InterruptedException?{??
          ????sync.acquireSharedInterruptibly(1);??
          }??

          //?非公平方式??
          final?int?nonfairTryAcquireShared(int?acquires)?{??
          ????for?(;;)?{??
          ????????int?available?=?getState();??
          ????????int?remaining?=?available?-?acquires;??
          ????????if?(remaining?????????????compareAndSetState(available,?remaining))??
          ????????????return?remaining;??
          ????}??
          }??

          //?公平方式??
          protected?int?tryAcquireShared(int?acquires)?{??
          ????for?(;;)?{??
          ????????if?(hasQueuedPredecessors())??
          ????????????return?-1;??
          ????????int?available?=?getState();??
          ????????int?remaining?=?available?-?acquires;??
          ????????if?(remaining?????????????compareAndSetState(available,?remaining))??
          ????????????return?remaining;??
          ????}??
          }??

          acquire方法和CountDownLatch是一樣的,只是tryAcquireShared區(qū)分了公平和非公平方式。獲取到信號相當(dāng)于加共享鎖成功,否則則進(jìn)入隊列阻塞等待;而release方法和讀鎖解鎖方式也是一樣的,只是每次release都會將state+1。

          總結(jié)

          本文詳細(xì)分析了AQS的核心原理、鎖的實(shí)現(xiàn)以及常用的相關(guān)組件,掌握其原理能讓我們準(zhǔn)確的使用JUC下面的鎖以及線程協(xié)作組件。

          另外AQS代碼設(shè)計是非常精良的,有非常多的細(xì)節(jié),精簡的代碼中把所有的情況都考慮到了,細(xì)細(xì)體味對我們自身編碼能力也會有很大的提高。

          文章錯誤和不清楚的地方歡迎批評指出,另外超時相關(guān)的API本文都未涉及到,讀者可自行分析。





          關(guān)注Java技術(shù)棧看更多干貨



          戳原文,獲取精選面試題!
          瀏覽 46
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费看做爱的网站 | 黑人大香蕉 | 91女人18毛片水多国产 | 亚洲第一网站视频香蕉视频 | 日日躁夜夜躁 |