<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          1.5w字,30圖帶你徹底掌握 AQS!

          共 10467字,需瀏覽 21分鐘

           ·

          2020-10-31 01:51

          前言

          AQS( AbstractQueuedSynchronizer )是一個(gè)用來構(gòu)建鎖和同步器(所謂同步,是指線程之間的通信、協(xié)作)的框架,Lock 包中的各種鎖(如常見的 ReentrantLock, ReadWriteLock), concurrent?包中的各種同步器(如 CountDownLatch, Semaphore, CyclicBarrier)都是基于 AQS 來構(gòu)建,所以理解 AQS 的實(shí)現(xiàn)原理至關(guān)重要,AQS 也是面試中區(qū)分侯選人的常見考點(diǎn),我們務(wù)必要掌握,本文將用循序漸近地介紹 AQS,相信大家看完一定有收獲。文章目錄如下

          1. 鎖原理 - 信號量 vs 管程

          2. AQS 實(shí)現(xiàn)原理

          3. AQS 源碼剖析

          4. 如何利用 AQS 自定義一個(gè)互斥鎖

          ?

          鎖原理 - 信號量 vs 管程

          在并發(fā)編程領(lǐng)域,有兩大核心問題:互斥同步,互斥即同一時(shí)刻只允許一個(gè)線程訪問共享資源,同步,即線程之間如何通信、協(xié)作,一般這兩大問題可以通過信號量管程來解決。

          信號量

          信號量(Semaphore)是操作系統(tǒng)提供的一種進(jìn)程間常見的通信方式,主要用來協(xié)調(diào)并發(fā)程序?qū)蚕碣Y源的訪問,操作系統(tǒng)可以保證對信號量操作的原子性。它是怎么實(shí)現(xiàn)的呢。

          • 信號量由一個(gè)共享整型變量 S 和兩個(gè)原子操作 PV 組成,S 只能通過 P 和 V 操作來改變

          • P 操作:即請求資源,意味著 S 要減 1,如果 S < ?0, 則表示沒有資源了,此時(shí)線程要進(jìn)入等待隊(duì)列(同步隊(duì)列)等待

          • V 操作: ?即釋放資源,意味著 S 要加 1, 如果 S 小于等于 0,說明等待隊(duì)列里有線程,此時(shí)就需要喚醒線程。

          示意圖如下

          信號量機(jī)制的引入解決了進(jìn)程同步和互斥問題,但信號量的大量同步操作分散在各個(gè)進(jìn)程中不便于管理,還有可能導(dǎo)致系統(tǒng)死鎖。如:生產(chǎn)者消費(fèi)者問題中將P、V顛倒可能死鎖(見文末參考鏈接),另外條件越多,需要的信號量就越多,需要更加謹(jǐn)慎地處理信號量之間的處理順序,否則很容易造成死鎖現(xiàn)象。

          基于信號量給編程帶來的隱患,于是有了提出了對開發(fā)者更加友好的并發(fā)編程模型-管程

          管程

          Dijkstra 于 1971 年提出:把所有進(jìn)程對某一種臨界資源的同步操作都集中起來,構(gòu)成一個(gè)所謂的秘書進(jìn)程。凡要訪問該臨界資源的進(jìn)程,都需先報(bào)告秘書,由秘書來實(shí)現(xiàn)諸進(jìn)程對同一臨界資源的互斥使用,這種機(jī)制就是管程。

          管程是一種在信號量機(jī)制上進(jìn)行改進(jìn)的并發(fā)編程模型,解決了信號量在臨界區(qū)的 PV 操作上配對的麻煩,把配對的 PV 操作集中在一起而形成的并發(fā)編程方法理論,極大降低了使用和理解成本。

          管程由四部分組成:

          1. 管程內(nèi)部的共享變量。

          2. 管程內(nèi)部的條件變量。

          3. 管程內(nèi)部并行執(zhí)行的進(jìn)程。

          4. 對于局部與管程內(nèi)部的共享數(shù)據(jù)設(shè)置初始值的語句。

          由此可見,管程就是一個(gè)對象監(jiān)視器。任何線程想要訪問該資源(共享變量),就要排隊(duì)進(jìn)入監(jiān)控范圍。進(jìn)入之后,接受檢查,不符合條件,則要繼續(xù)等待,直到被通知,然后繼續(xù)進(jìn)入監(jiān)視器。

          需要注意的事,信號量和管程兩者是等價(jià)的,信號量可以實(shí)現(xiàn)管程,管程也可以實(shí)現(xiàn)信號量,只是兩者的表現(xiàn)形式不同而已,管程對開發(fā)者更加友好。

          兩者的區(qū)別如下

          管程為了解決信號量在臨界區(qū)的 PV 操作上的配對的麻煩,把配對的 PV 操作集中在一起,并且加入了條件變量的概念,使得在多條件下線程間的同步實(shí)現(xiàn)變得更加簡單。

          怎么理解管程中的入口等待隊(duì)列,共享變量,條件變量等概念,有時(shí)候技術(shù)上的概念較難理解,我們可以借助生活中的場景來幫助我們理解,就以我們的就醫(yī)場景為例來簡單說明一下,正常的就醫(yī)流程如下:

          1. 病人去掛號后,去侯診室等待叫號

          2. 叫到自己時(shí),就可以進(jìn)入就診室就診了

          3. 就診時(shí),有兩種情況,一種是醫(yī)生很快就確定病人的病,并作出診斷,診斷完成后,就通知下一位病人進(jìn)來就診,一種是醫(yī)生無法確定病因,需要病人去做個(gè)驗(yàn)血 / CT 檢查才能確定病情,于是病人就先去驗(yàn)個(gè)血 / ?CT

          4. 病人驗(yàn)完血 / 做完 CT 后,重新取號,等待叫號(進(jìn)入入口等待隊(duì)列)

          5. 病人等到自己的號,病人又重新拿著驗(yàn)血 / CT 報(bào)告去找醫(yī)生就診

          整個(gè)流程如下

          那么管程是如何解決互斥同步的呢

          首先來看互斥,上文中醫(yī)生即共享資源(也即共享變量),就診室即為臨界區(qū),病人即線程,任何病人如果想要訪問臨界區(qū),必須首先獲取共享資源(即醫(yī)生),入口一次只允許一個(gè)線程經(jīng)過,在共享資源被占有的情況下,如果再有線程想占有共享資源,就需要到等待隊(duì)列去等候,等到獲取共享資源的線程釋放資源后,等待隊(duì)列中的線程就可以去競爭共享資源了,這樣就解決了互斥問題,所以本質(zhì)上管程是通過將共享資源及其對共享資源的操作(線程安全地獲取和釋放)封裝起來來保證互斥性的。

          再來看同步,同步是通過文中的條件變量及其等待隊(duì)列實(shí)現(xiàn)的,同步的實(shí)現(xiàn)分兩種情況

          1. 病人進(jìn)入就診室后,無需做驗(yàn)血 / CT 等操作,于是醫(yī)生診斷完成后,就會釋放共享資源(解鎖)去通知(notify,notifyAll)入口等待隊(duì)列的下一個(gè)病人,下一個(gè)病人聽到叫號后就能看醫(yī)生了。

          2. 如果病人進(jìn)入就診室后需要做驗(yàn)血 / CT 等操作,會去驗(yàn)血 / CT 隊(duì)列(條件隊(duì)列)排隊(duì), 同時(shí)釋放共享變量(醫(yī)生),通知入口等待隊(duì)列的其他病人(線程)去獲取共享變量(醫(yī)生),獲得許可的線程執(zhí)行完臨界區(qū)的邏輯后會喚醒條件變量等待隊(duì)列中的線程,將它放到入口等待隊(duì)列中 ,等到其獲取共享變量(醫(yī)生)時(shí),即可進(jìn)入入口(臨界區(qū))處理。

          在 Java 里,鎖大多是依賴于管程來實(shí)現(xiàn)的,以大家熟悉的內(nèi)置鎖 synchronized 為例,它的實(shí)現(xiàn)原理如下。

          可以看到 synchronized 鎖也是基于管程實(shí)現(xiàn)的,只不過它只有且只有一個(gè)條件變量(就是鎖對象本身)而已,這也是為什么JDK 要實(shí)現(xiàn) Lock 鎖的原因之一,Lock 支持多個(gè)條件變量。

          通過這樣的類比,相信大家對管程的工作機(jī)制有了比較清晰的認(rèn)識,為啥要花這么大的力氣介紹管程呢,一來管程是解決并發(fā)問題的萬能鑰匙,二來?AQS 是基于 Java 并發(fā)包中管程的一種實(shí)現(xiàn),所以理解管程對我們理解 AQS 會大有幫助,接下來我們就來看看 AQS 是如何工作的。

          ?

          AQS 實(shí)現(xiàn)原理

          AQS 全稱是 AbstractQueuedSynchronizer,是一個(gè)用來構(gòu)建同步器的框架,它維護(hù)了一個(gè)共享資源 state 和一個(gè) FIFO 的等待隊(duì)列(即上文中管程的入口等待隊(duì)列),底層利用了 CAS 機(jī)制來保證操作的原子性。

          AQS 實(shí)現(xiàn)鎖的主要原理如下:

          以實(shí)現(xiàn)獨(dú)占鎖為例(即當(dāng)前資源只能被一個(gè)線程占有),其實(shí)現(xiàn)原理如下:state 初始化 0,在多線程條件下,線程要執(zhí)行臨界區(qū)的代碼,必須首先獲取 state,某個(gè)線程獲取成功之后, state 加 1,其他線程再獲取的話由于共享資源已被占用,所以會到 FIFO 等待隊(duì)列去等待,等占有 state 的線程執(zhí)行完臨界區(qū)的代碼釋放資源( state 減 1)后,會喚醒 FIFO 中的下一個(gè)等待線程(head 中的下一個(gè)結(jié)點(diǎn))去獲取 state。

          state 由于是多線程共享變量,所以必須定義成 volatile,以保證 state 的可見性, 同時(shí)雖然 volatile 能保證可見性,但不能保證原子性,所以 AQS 提供了對 state 的原子操作方法,保證了線程安全。

          另外 AQS 中實(shí)現(xiàn)的 FIFO 隊(duì)列(CLH 隊(duì)列)其實(shí)是雙向鏈表實(shí)現(xiàn)的,由 head, tail 節(jié)點(diǎn)表示,head 結(jié)點(diǎn)代表當(dāng)前占用的線程,其他節(jié)點(diǎn)由于暫時(shí)獲取不到鎖所以依次排隊(duì)等待鎖釋放。

          所以我們不難明白 AQS 的如下定義:

          public?abstract?class?AbstractQueuedSynchronizer
          ??extends?AbstractOwnableSynchronizer
          ????implements?java.io.Serializable?
          {
          ????//?以下為雙向鏈表的首尾結(jié)點(diǎn),代表入口等待隊(duì)列
          ????private?transient?volatile?Node?head;
          ????private?transient?volatile?Node?tail;
          ????//?共享變量?state
          ????private?volatile?int?state;
          ????//?cas?獲取?/?釋放?state,保證線程安全地獲取鎖
          ????protected?final?boolean?compareAndSetState(int?expect,?int?update)?{
          ????????//?See?below?for?intrinsics?setup?to?support?this
          ????????return?unsafe.compareAndSwapInt(this,?stateOffset,?expect,?update);
          ????}
          ????//?...
          ?}

          ?

          AQS 源碼剖析

          ReentrantLock 是我們比較常用的一種鎖,也是基于 AQS 實(shí)現(xiàn)的,所以接下來我們就來分析一下 ReentrantLock 鎖的實(shí)現(xiàn)來一探 AQS 究竟。本文將會采用圖文并茂的方式讓大家理解 AQS 的實(shí)現(xiàn)原理,大家在學(xué)習(xí)過程中,可以多類比一下上文中就診的例子,相信會有助于理解。

          首先我們要知道 ReentrantLock 是獨(dú)占鎖,也有公平和非公平兩種鎖模式,什么是獨(dú)占與有共享模式,什么又是公平鎖與非公平鎖

          與獨(dú)占鎖對應(yīng)的是共享鎖,這兩者有什么區(qū)別呢

          獨(dú)占鎖:即其他線程只有在占有鎖的線程釋放后才能競爭鎖,有且只有一個(gè)線程能競爭成功(醫(yī)生只有一個(gè),一次只能看一個(gè)病人)

          共享鎖:即共享資源可以被多個(gè)線程同時(shí)占有,直到共享資源被占用完畢(多個(gè)醫(yī)生,可以看多個(gè)病人),常見的有讀寫鎖 ReadWriteLock, CountdownLatch,兩者的區(qū)別如下


          什么是公平鎖與非公平鎖

          還是以就醫(yī)為例,所謂公平鎖即大家取號后老老實(shí)實(shí)按照先來后到的順序在侯診室依次等待叫號,如果是非公平鎖呢,新來的病人(線程)很霸道,不取號排隊(duì) ,直接去搶先看病,占有醫(yī)生(不一定成功)

          公平鎖與非公平鎖


          本文我們將會重點(diǎn)分析獨(dú)占,非公平模式的源碼實(shí)現(xiàn),不分析共享模式與 Condition 的實(shí)現(xiàn),因?yàn)槠饰隽霜?dú)占鎖的實(shí)現(xiàn),由于原理都是相似的,再分析共享與 Condition 就不難了。

          首先我們先來看下 ReentrantLock 的使用方法

          //?1.?初始化可重入鎖
          private?ReentrantLock?lock?=?new?ReentrantLock();
          public?void?run()?{
          ????//?加鎖
          ????lock.lock();
          ????try?{
          ????????//?2.?執(zhí)行臨界區(qū)代碼
          ????}?catch?(InterruptedException?e)?{
          ????????e.printStackTrace();
          ????}?finally?{
          ????????//?3.?解鎖
          ????????lock.unlock();
          ????}
          }

          第一步是初始化可重入鎖,可以看到默認(rèn)使用的是非公平鎖機(jī)制

          public?ReentrantLock()?{
          ????sync?=?new?NonfairSync();
          }

          當(dāng)然你也可以用如下構(gòu)造方法來指定使用公平鎖:

          public?ReentrantLock(boolean?fair)?{
          ????sync?=?fair???new?FairSync()?:?new?NonfairSync();
          }

          畫外音: FairSync 和 NonfairSync 是 ReentrantLock 實(shí)現(xiàn)的內(nèi)部類,分別指公平和非公平模式,ReentrantLock ReentrantLock 的加鎖(lock),解鎖(unlock)在內(nèi)部具體都是調(diào)用的 FairSync,NonfairSync 的加鎖和解鎖方法。

          幾個(gè)類的關(guān)系如下:

          我們先來剖析下非公平鎖(NonfairSync)的實(shí)現(xiàn)方式,來看上述示例代碼的第二步:加鎖,由于默認(rèn)的是非公平鎖的加鎖,所以我們來分析下非公平鎖是如何加鎖的


          可以看到 lock 方法主要有兩步

          1. 使用 CAS 來獲取 state 資源,如果成功設(shè)置 1,代表 state 資源獲取鎖成功,此時(shí)記錄下當(dāng)前占用 state 的線程?setExclusiveOwnerThread(Thread.currentThread());

          2. 如果 CAS 設(shè)置 state 為 1 失敗(代表獲取鎖失敗),則執(zhí)行 acquire(1) 方法,這個(gè)方法是 AQS 提供的方法,如下

          public?final?void?acquire(int?arg)?{
          ????if?(!tryAcquire(arg)?&&
          ????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
          ????????selfInterrupt();
          }

          tryAcquire 剖析

          首先 調(diào)用 tryAcquire 嘗試著獲取 state,如果成功,則跳過后面的步驟。如果失敗,則執(zhí)行 acquireQueued 將線程加入 CLH 等待隊(duì)列中。

          先來看下 tryAcquire 方法,這個(gè)方法是 AQS 提供的一個(gè)模板方法,最終由其 AQS 具體的實(shí)現(xiàn)類(Sync)實(shí)現(xiàn),由于執(zhí)行的是非公平鎖邏輯,執(zhí)行的代碼如下:

          final?boolean?nonfairTryAcquire(int?acquires)?{
          ????final?Thread?current?=?Thread.currentThread();
          ????int?c?=?getState();

          ????if?(c?==?0)?{
          ????????//?如果?c?等于0,表示此時(shí)資源是空閑的(即鎖是釋放的),再用?CAS?獲取鎖
          ????????if?(compareAndSetState(0,?acquires))?{
          ????????????setExclusiveOwnerThread(current);
          ????????????return?true;
          ????????}
          ????}
          ????else?if?(current?==?getExclusiveOwnerThread())?{
          ????????//?此條件表示之前已有線程獲得鎖,且此線程再一次獲得了鎖,獲取資源次數(shù)再加?1,這也映證了?ReentrantLock?為可重入鎖
          ????????int?nextc?=?c?+?acquires;
          ????????if?(nextc?0
          )?//?overflow
          ????????????throw?new?Error("Maximum?lock?count?exceeded");
          ????????setState(nextc);
          ????????return?true;
          ????}
          ????return?false;
          }

          此段代碼可知鎖的獲取主要分兩種情況

          1. state 為 0 時(shí),代表鎖已經(jīng)被釋放,可以去獲取,于是使用 CAS 去重新獲取鎖資源,如果獲取成功,則代表競爭鎖成功,使用 setExclusiveOwnerThread(current) 記錄下此時(shí)占有鎖的線程,看到這里的 CAS,大家應(yīng)該不難理解為啥當(dāng)前實(shí)現(xiàn)是非公平鎖了,因?yàn)殛?duì)列中的線程與新線程都可以 CAS 獲取鎖啊,新來的線程不需要排隊(duì)

          2. 如果 state 不為 0,代表之前已有線程占有了鎖,如果此時(shí)的線程依然是之前占有鎖的線程(current == getExclusiveOwnerThread() 為 true),代表此線程再一次占有了鎖(可重入鎖),此時(shí)更新 state,記錄下鎖被占有的次數(shù)(鎖的重入次數(shù)),這里的 setState 方法不需要使用 CAS 更新,因?yàn)榇藭r(shí)的鎖就是當(dāng)前線程占有的,其他線程沒有機(jī)會進(jìn)入這段代碼執(zhí)行。所以此時(shí)更新 state 是線程安全的。

          假設(shè)當(dāng)前 state = 0,即鎖不被占用,現(xiàn)在有 T1, T2, T3 這三個(gè)線程要去競爭鎖

          假設(shè)現(xiàn)在 T1 獲取鎖成功,則兩種情況分別為 1、 T1 首次獲取鎖成功

          2、 T1 再次獲取鎖成功,state 再加 1,表示鎖被重入了兩次,當(dāng)前如果 T1一直申請占用鎖成功,state 會一直累加

          acquireQueued 剖析

          如果 tryAcquire(arg) 執(zhí)行失敗,代表獲取鎖失敗,則執(zhí)行 acquireQueued 方法,將線程加入 FIFO 等待隊(duì)列

          public?final?void?acquire(int?arg)?{
          ????if?(!tryAcquire(arg)?&&
          ????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
          ????????selfInterrupt();
          }

          所以接下來我們來看看 acquireQueued 的執(zhí)行邏輯,首先會調(diào)用?addWaiter(Node.EXCLUSIVE)??將包含有當(dāng)前線程的 Node 節(jié)點(diǎn)入隊(duì), Node.EXCLUSIVE 代表此結(jié)點(diǎn)為獨(dú)占模式

          再來看下 addWaiter 是如何實(shí)現(xiàn)的

          private?Node?addWaiter(Node?mode)?{
          ????Node?node?=?new?Node(Thread.currentThread(),?mode);
          ????Node?pred?=?tail;
          ????//?如果尾結(jié)點(diǎn)不為空,則用?CAS?將獲取鎖失敗的線程入隊(duì)
          ????if?(pred?!=?null)?{
          ????????node.prev?=?pred;
          ????????if?(compareAndSetTail(pred,?node))?{
          ????????????pred.next?=?node;
          ????????????return?node;
          ????????}
          ????}
          ????//?如果結(jié)點(diǎn)為空,執(zhí)行?enq?方法
          ????enq(node);
          ????return?node;
          }

          這段邏輯比較清楚,首先是獲取 FIFO 隊(duì)列的尾結(jié)點(diǎn),如果尾結(jié)點(diǎn)存在,則采用 CAS 的方式將等待線程入隊(duì),如果尾結(jié)點(diǎn)為空則執(zhí)行 enq 方法

          private?Node?enq(final?Node?node)?{
          ????for?(;;)?{
          ????????Node?t?=?tail;
          ????????if?(t?==?null)?{
          ????????????//?尾結(jié)點(diǎn)為空,說明?FIFO?隊(duì)列未初始化,所以先初始化其頭結(jié)點(diǎn)
          ????????????if?(compareAndSetHead(new?Node()))
          ????????????????tail?=?head;
          ????????}?else?{
          ????????????//?尾結(jié)點(diǎn)不為空,則將等待線程入隊(duì)
          ????????????node.prev?=?t;
          ????????????if?(compareAndSetTail(t,?node))?{
          ????????????????t.next?=?node;
          ????????????????return?t;
          ????????????}
          ????????}
          ????}
          }

          首先判斷 tail 是否為空,如果為空說明 FIFO 隊(duì)列的 head,tail 還未構(gòu)建,此時(shí)先構(gòu)建頭結(jié)點(diǎn),構(gòu)建之后再用 CAS 的方式將此線程結(jié)點(diǎn)入隊(duì)

          使用 CAS 創(chuàng)建 head 節(jié)點(diǎn)的時(shí)候只是簡單調(diào)用了 new Node() 方法,并不像其他節(jié)點(diǎn)那樣記錄 thread,這是為啥

          因?yàn)?head 結(jié)點(diǎn)為虛結(jié)點(diǎn),它只代表當(dāng)前有線程占用了 state,至于占用 state 的是哪個(gè)線程,其實(shí)是調(diào)用了上文的 setExclusiveOwnerThread(current) ,即記錄在 exclusiveOwnerThread 屬性里。

          執(zhí)行完 addWaiter 后,線程入隊(duì)成功,現(xiàn)在就要看最后一個(gè)最關(guān)鍵的方法 acquireQueued 了,這個(gè)方法有點(diǎn)難以理解,先不急,我們先用三個(gè)線程來模擬一下之前的代碼對應(yīng)的步驟

          1、假設(shè) T1 獲取鎖成功,由于此時(shí) FIFO 未初始化,所以先創(chuàng)建 head 結(jié)點(diǎn)


          2、此時(shí) T2 或 T3 再去競爭 state 失敗,入隊(duì),如下圖:


          好了,現(xiàn)在問題來了, T2,T3 入隊(duì)后怎么處理,是馬上阻塞嗎,馬上阻塞意味著線程從運(yùn)行態(tài)轉(zhuǎn)為阻塞態(tài) ,涉及到用戶態(tài)向內(nèi)核態(tài)的切換,而且喚醒后也要從內(nèi)核態(tài)轉(zhuǎn)為用戶態(tài),開銷相對比較大,所以 AQS 對這種入隊(duì)的線程采用的方式是讓它們自旋來競爭鎖,如下圖示

          不過聰明的你可能發(fā)現(xiàn)了一個(gè)問題,如果當(dāng)前鎖是獨(dú)占鎖,如果鎖一直被被 T1 占有, T2,T3 一直自旋沒太大意義,反而會占用 CPU,影響性能,所以更合適的方式是它們自旋一兩次競爭不到鎖后識趣地阻塞以等待前置節(jié)點(diǎn)釋放鎖后再來喚醒它。

          另外如果鎖在自旋過程中被中斷了,或者自旋超時(shí)了,應(yīng)該處于「取消」?fàn)顟B(tài)。

          基于每個(gè) Node 可能所處的狀態(tài),AQS 為其定義了一個(gè)變量 waitStatus,根據(jù)這個(gè)變量值對相應(yīng)節(jié)點(diǎn)進(jìn)行相關(guān)的操作,我們一起來看這看這個(gè)變量都有哪些值,是時(shí)候看一個(gè) Node 結(jié)點(diǎn)的屬性定義了

          static?final?class?Node?{
          ????static?final?Node?SHARED?=?new?Node();//標(biāo)識等待節(jié)點(diǎn)處于共享模式
          ????static?final?Node?EXCLUSIVE?=?null;//標(biāo)識等待節(jié)點(diǎn)處于獨(dú)占模式

          ????static?final?int?CANCELLED?=?1;?//由于超時(shí)或中斷,節(jié)點(diǎn)已被取消
          ????static?final?int?SIGNAL?=?-1;??//?節(jié)點(diǎn)阻塞(park)必須在其前驅(qū)結(jié)點(diǎn)為?SIGNAL?的狀態(tài)下才能進(jìn)行,如果結(jié)點(diǎn)為?SIGNAL,則其釋放鎖或取消后,可以通過?unpark?喚醒下一個(gè)節(jié)點(diǎn),
          ????static?final?int?CONDITION?=?-2;//表示線程在等待條件變量(先獲取鎖,加入到條件等待隊(duì)列,然后釋放鎖,等待條件變量滿足條件;只有重新獲取鎖之后才能返回)
          ????static?final?int?PROPAGATE?=?-3;//表示后續(xù)結(jié)點(diǎn)會傳播喚醒的操作,共享模式下起作用

          ????//等待狀態(tài):對于condition節(jié)點(diǎn),初始化為CONDITION;其它情況,默認(rèn)為0,通過CAS操作原子更新
          ????volatile?int?waitStatus;

          通過狀態(tài)的定義,我們可以猜測一下 AQS 對線程自旋的處理:如果當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)不為 head,且它的狀態(tài)為 SIGNAL,則結(jié)點(diǎn)進(jìn)入阻塞狀態(tài)。來看下代碼以映證我們的猜測:

          final?boolean?acquireQueued(final?Node?node,?int?arg)?{
          ????boolean?failed?=?true;
          ????try?{
          ????????boolean?interrupted?=?false;
          ????????for?(;;)?{
          ????????????final?Node?p?=?node.predecessor();
          ????????????//?如果前一個(gè)節(jié)點(diǎn)是?head,則嘗試自旋獲取鎖
          ????????????if?(p?==?head?&&?tryAcquire(arg))?{
          ????????????????//??將?head?結(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn),原?head?結(jié)點(diǎn)出隊(duì)
          ????????????????setHead(node);
          ????????????????p.next?=?null;?//?help?GC
          ????????????????failed?=?false;
          ????????????????return?interrupted;
          ????????????}
          ????????????//?如果前一個(gè)節(jié)點(diǎn)不是?head?或者競爭鎖失敗,則進(jìn)入阻塞狀態(tài)
          ????????????if?(shouldParkAfterFailedAcquire(p,?node)?&&
          ????????????????parkAndCheckInterrupt())
          ????????????????interrupted?=?true;
          ????????}
          ????}?finally?{
          ????????if?(failed)
          ????????????//?如果線程自旋中因?yàn)楫惓5仍颢@取鎖最終失敗,則調(diào)用此方法
          ????????????cancelAcquire(node);
          ????}
          }

          先來看第一種情況,如果當(dāng)前結(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)是 head 結(jié)點(diǎn),且獲取鎖(tryAcquire)成功的處理

          可以看到主要的處理就是把 head 指向當(dāng)前節(jié)點(diǎn),并且讓原 head 結(jié)點(diǎn)出隊(duì),這樣由于原 head 不可達(dá), 會被垃圾回收。

          注意其中 setHead 的處理

          private?void?setHead(Node?node)?{
          ????head?=?node;
          ????node.thread?=?null;
          ????node.prev?=?null;
          }

          將 head 設(shè)置成當(dāng)前結(jié)點(diǎn)后,要把節(jié)點(diǎn)的 thread, pre 設(shè)置成 null,因?yàn)橹胺治鲞^了,head 是虛節(jié)點(diǎn),不保存除 waitStatus(結(jié)點(diǎn)狀態(tài))的其他信息,所以這里把 thread ,pre 置為空,因?yàn)檎加墟i的線程由 exclusiveThread 記錄了,如果 head 再記錄 thread 不僅多此一舉,反而在釋放鎖的時(shí)候要多操作一遍 head 的 thread 釋放。

          如果前一個(gè)節(jié)點(diǎn)不是 head 或者競爭鎖失敗,則首先調(diào)用 ?shouldParkAfterFailedAcquire 方法判斷鎖是否應(yīng)該停止自旋進(jìn)入阻塞狀態(tài):

          private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
          ????int?ws?=?pred.waitStatus;
          ????????
          ????if?(ws?==?Node.SIGNAL)
          ???????//?1.?如果前置頂點(diǎn)的狀態(tài)為?SIGNAL,表示當(dāng)前節(jié)點(diǎn)可以阻塞了
          ????????return?true;
          ????if?(ws?>?0)?{
          ????????//?2.?移除取消狀態(tài)的結(jié)點(diǎn)
          ????????do?{
          ????????????node.prev?=?pred?=?pred.prev;
          ????????}?while?(pred.waitStatus?>?0);
          ????????pred.next?=?node;
          ????}?else?{
          ????????//?3.?如果前置節(jié)點(diǎn)的?ws?不為?0,則其設(shè)置為?SIGNAL,
          ????????compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL);
          ????}
          ????return?false;
          }

          這一段代碼有點(diǎn)繞,需要稍微動點(diǎn)腦子,按以上步驟一步步來看

          1、 首先我們要明白,根據(jù)之前 Node 類的注釋,如果前驅(qū)節(jié)點(diǎn)為 SIGNAL,則當(dāng)前節(jié)點(diǎn)可以進(jìn)入阻塞狀態(tài)。

          如圖示:T2,T3 的前驅(qū)節(jié)點(diǎn)的 waitStatus 都為 SIGNAL,所以 T2,T3 此時(shí)都可以阻塞。

          2、如果前驅(qū)節(jié)點(diǎn)為取消狀態(tài),則前驅(qū)節(jié)點(diǎn)需要移除,這些采用了一個(gè)更巧妙的方法,把所有當(dāng)前節(jié)點(diǎn)之前所有 waitStatus 為取消狀態(tài)的節(jié)點(diǎn)全部移除,假設(shè)有四個(gè)線程如下,T2,T3 為取消狀態(tài),則執(zhí)行邏輯后如下圖所示,T2, T3 節(jié)點(diǎn)會被 GC。


          3、如果前驅(qū)節(jié)點(diǎn)小于等于 0,則需要首先將其前驅(qū)節(jié)點(diǎn)置為 SIGNAL,因?yàn)榍拔奈覀兎治鲞^,當(dāng)前節(jié)點(diǎn)進(jìn)入阻塞的一個(gè)條件是前驅(qū)節(jié)點(diǎn)必須為 SIGNAL,這樣下一次自旋后發(fā)現(xiàn)前驅(qū)節(jié)點(diǎn)為 SIGNAL,就會返回 true(即步驟 1)

          shouldParkAfterFailedAcquire 返回 true 代表線程可以進(jìn)入阻塞中斷,那么下一步 parkAndCheckInterrupt 就該讓線程阻塞了

          private?final?boolean?parkAndCheckInterrupt()?{
          ????//?阻塞線程
          ????LockSupport.park(this);
          ????//?返回線程是否中斷過,并且清除中斷狀態(tài)(在獲得鎖后會補(bǔ)一次中斷)
          ????return?Thread.interrupted();
          }

          這里的阻塞線程很容易理解,但為啥要判斷線程是否中斷過呢,因?yàn)槿绻€程在阻塞期間收到了中斷,喚醒(轉(zhuǎn)為運(yùn)行態(tài))獲取鎖后(acquireQueued 為 true)需要補(bǔ)一個(gè)中斷,如下所示:

          public?final?void?acquire(int?arg)?{
          ????if?(!tryAcquire(arg)?&&
          ????????acquireQueued(addWaiter(Node.EXCLUSIVE),?arg))
          ????????//?如果是因?yàn)橹袛鄦拘训木€程,獲取鎖后需要補(bǔ)一下中斷
          ????????selfInterrupt();
          }

          至此,獲取鎖的流程已經(jīng)分析完畢,不過還有一個(gè)疑惑我們還沒解開:前文不是說 Node 狀態(tài)為取消狀態(tài)會被取消嗎,那 Node 什么時(shí)候會被設(shè)置為取消狀態(tài)呢。

          回頭看 acquireQueued

          final?boolean?acquireQueued(final?Node?node,?int?arg)?{
          ????boolean?failed?=?true;
          ????try?{
          ????????//?省略自旋獲取鎖代碼????????
          ????}?finally?{
          ????????if?(failed)
          ????????????//?如果線程自旋中因?yàn)楫惓5仍颢@取鎖最終失敗,則調(diào)用此方法
          ????????????cancelAcquire(node);
          ????}
          }

          看最后一個(gè) cancelAcquire 方法,如果線程自旋中因?yàn)楫惓5仍颢@取鎖最終失敗,則會調(diào)用此方法

          private?void?cancelAcquire(Node?node)?{
          ????//?如果節(jié)點(diǎn)為空,直接返回
          ????if?(node?==?null)
          ????????return;
          ????//?由于線程要被取消了,所以將?thread?線程清掉
          ????node.thread?=?null;

          ????//?下面這步表示將?node?的?pre?指向之前第一個(gè)非取消狀態(tài)的結(jié)點(diǎn)(即跳過所有取消狀態(tài)的結(jié)點(diǎn)),waitStatus?>?0?表示當(dāng)前結(jié)點(diǎn)狀態(tài)為取消狀態(tài)
          ????Node?pred?=?node.prev;
          ????while?(pred.waitStatus?>?0)
          ????????node.prev?=?pred?=?pred.prev;

          ????//?獲取經(jīng)過過濾后的?pre?的?next?結(jié)點(diǎn),這一步主要用在后面的?CAS?設(shè)置?pre?的?next?節(jié)點(diǎn)上
          ????Node?predNext?=?pred.next;
          ????//?將當(dāng)前結(jié)點(diǎn)設(shè)置為取消狀態(tài)
          ????node.waitStatus?=?Node.CANCELLED;

          ????//?如果當(dāng)前取消結(jié)點(diǎn)為尾結(jié)點(diǎn),使用?CAS?則將尾結(jié)點(diǎn)設(shè)置為其前驅(qū)節(jié)點(diǎn),如果設(shè)置成功,則尾結(jié)點(diǎn)的?next?指針設(shè)置為空
          ????if?(node?==?tail?&&?compareAndSetTail(node,?pred))?{
          ????????compareAndSetNext(pred,?predNext,?null);
          ????}?else?{
          ????//?這一步看得有點(diǎn)繞,我們想想,如果當(dāng)前節(jié)點(diǎn)取消了,那是不是要把當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn),但是我們之前也說了,要喚醒或阻塞結(jié)點(diǎn),須在其前驅(qū)節(jié)點(diǎn)的狀態(tài)為 SIGNAL 的條件才能操作,所以在設(shè)置 pre 的 next 節(jié)點(diǎn)時(shí)要保證 pre 結(jié)點(diǎn)的狀態(tài)為 SIGNAL,想通了這一點(diǎn)相信你不難理解以下代碼。
          ????????int?ws;
          ????????if?(pred?!=?head?&&
          ????????????((ws?=?pred.waitStatus)?==?Node.SIGNAL?||
          ?????????????(ws?<=?0?&&?compareAndSetWaitStatus(pred,?ws,?Node.SIGNAL)))?&&
          ????????????pred.thread?!=?null)?{
          ????????????Node?next?=?node.next;
          ????????????if?(next?!=?null?&&?next.waitStatus?<=?0)
          ????????????????compareAndSetNext(pred,?predNext,?next);
          ????????}?else?{
          ????????//?如果?pre?為?head,或者??pre?的狀態(tài)設(shè)置?SIGNAL?失敗,則直接喚醒后繼結(jié)點(diǎn)去競爭鎖,之前我們說過,?SIGNAL?的結(jié)點(diǎn)取消(或釋放鎖)后可以喚醒后繼結(jié)點(diǎn)
          ????????????unparkSuccessor(node);
          ????????}
          ????????node.next?=?node;?//?help?GC
          ????}
          }

          這一段代碼有點(diǎn)繞,我們一個(gè)個(gè)來看,首先考慮以下情況

          1、首先第一步當(dāng)前節(jié)點(diǎn)之前有取消結(jié)點(diǎn)時(shí),則邏輯如下

          2、如果當(dāng)前結(jié)點(diǎn)既非頭結(jié)點(diǎn)的后繼結(jié)點(diǎn),也非尾結(jié)點(diǎn),即步驟 1 所示,則最終結(jié)果如下

          這里肯定有人問,這種情況下當(dāng)前節(jié)點(diǎn)與它的前驅(qū)結(jié)點(diǎn)無法被 GC 啊,還記得我們上文分析鎖自旋時(shí)的處理嗎,再看下以下代碼

          private?static?boolean?shouldParkAfterFailedAcquire(Node?pred,?Node?node)?{
          ????int?ws?=?pred.waitStatus;
          ????//?省略無關(guān)代碼
          ????if?(ws?>?0)?{
          ????????//?移除取消狀態(tài)的結(jié)點(diǎn)
          ????????do?{
          ????????????node.prev?=?pred?=?pred.prev;
          ????????}?while?(pred.waitStatus?>?0);
          ????????pred.next?=?node;
          ????}?
          ????return?false;
          }

          這段代碼會將 node 的 pre 指向之前 waitStatus 為非 CANCEL 的節(jié)點(diǎn)

          所以當(dāng) T4 執(zhí)行這段代碼時(shí),會變成如下情況

          可以看到此時(shí)中間的兩個(gè) CANCEL 節(jié)點(diǎn)不可達(dá)了,會被 GC

          3、如果當(dāng)前結(jié)點(diǎn)為 tail 結(jié)點(diǎn),則結(jié)果如下,這種情況下當(dāng)前結(jié)點(diǎn)不可達(dá),會被 GC

          4、如果當(dāng)前結(jié)點(diǎn)為 head 的后繼結(jié)點(diǎn)時(shí),如下


          結(jié)果中的 CANCEL 結(jié)點(diǎn)同樣會在 tail 結(jié)點(diǎn)自旋調(diào)用 shouldParkAfterFailedAcquire 后不可達(dá),如下

          自此我們終于分析完了鎖的獲取流程,接下來我們來看看鎖是如何釋放的。

          鎖釋放

          不管是公平鎖還是非公平鎖,最終都是調(diào)的 AQS 的如下模板方法來釋放鎖

          //?java.util.concurrent.locks.AbstractQueuedSynchronizer

          public?final?boolean?release(int?arg)?{
          ????//?鎖釋放是否成功
          ????if?(tryRelease(arg))?{
          ????????Node?h?=?head;
          ????????if?(h?!=?null?&&?h.waitStatus?!=?0)
          ????????????unparkSuccessor(h);
          ????????return?true;
          ????}
          ????return?false;
          }

          tryRelease 方法定義在了 AQS 的子類 Sync 方法里

          //?java.util.concurrent.locks.ReentrantLock.Sync

          protected?final?boolean?tryRelease(int?releases)?{
          ????int?c?=?getState()?-?releases;
          ????//?只有持有鎖的線程才能釋放鎖,所以如果當(dāng)前鎖不是持有鎖的線程,則拋異常
          ????if?(Thread.currentThread()?!=?getExclusiveOwnerThread())
          ????????throw?new?IllegalMonitorStateException();
          ????boolean?free?=?false;
          ????//?說明線程持有的鎖全部釋放了,需要釋放?exclusiveOwnerThread?的持有線程
          ????if?(c?==?0)?{
          ????????free?=?true;
          ????????setExclusiveOwnerThread(null);
          ????}
          ????setState(c);
          ????return?free;
          }

          鎖釋放成功后該干嘛,顯然是喚醒之后 head 之后節(jié)點(diǎn),讓它來競爭鎖

          //?java.util.concurrent.locks.AbstractQueuedSynchronizer

          public?final?boolean?release(int?arg)?{
          ????//?鎖釋放是否成功
          ????if?(tryRelease(arg))?{
          ????????Node?h?=?head;
          ????????if?(h?!=?null?&&?h.waitStatus?!=?0)
          ????????????//?鎖釋放成功后,喚醒?head?之后的節(jié)點(diǎn),讓它來競爭鎖
          ????????????unparkSuccessor(h);
          ????????return?true;
          ????}
          ????return?false;
          }

          這里釋放鎖的條件為啥是 h != null && h.waitStatus != 0 呢。

          1. 如果 h == null, 這有兩種可能,一種是一個(gè)線程在競爭鎖,現(xiàn)在它釋放了,當(dāng)然沒有所謂的喚醒后繼節(jié)點(diǎn),一種是其他線程正在運(yùn)行競爭鎖,只是還未初始化頭節(jié)點(diǎn),既然其他線程正在運(yùn)行,也就無需執(zhí)行喚醒操作

          2. 如果 h != null 且 h.waitStatus == 0,說明 head 的后繼節(jié)點(diǎn)正在自旋競爭鎖,也就是說線程是運(yùn)行狀態(tài)的,無需喚醒。

          3. 如果 h != null 且 h.waitStatus < 0, 此時(shí) waitStatus 值可能為 SIGNAL,或 PROPAGATE,這兩種情況說明后繼結(jié)點(diǎn)阻塞需要被喚醒

          來看一下喚醒方法 unparkSuccessor:

          private?void?unparkSuccessor(Node?node)?{
          ????//?獲取?head?的?waitStatus(假設(shè)其為?SIGNAL),并用?CAS?將其置為?0,為啥要做這一步呢,之前我們分析過多次,其實(shí)?waitStatus?=?SIGNAL(
          ????int?ws?=?node.waitStatus;
          ????if?(ws?0
          )
          ????????compareAndSetWaitStatus(node,?ws,?0);

          ????//?以下操作為獲取隊(duì)列第一個(gè)非取消狀態(tài)的結(jié)點(diǎn),并將其喚醒
          ????Node?s?=?node.next;
          ????//?s?狀態(tài)為非空,或者其為取消狀態(tài),說明?s?是無效節(jié)點(diǎn),此時(shí)需要執(zhí)行?if?里的邏輯
          ????if?(s?==?null?||?s.waitStatus?>?0)?{
          ????????s?=?null;
          ????????//?以下操作為從尾向前獲取最后一個(gè)非取消狀態(tài)的結(jié)點(diǎn)
          ????????for?(Node?t?=?tail;?t?!=?null?&&?t?!=?node;?t?=?t.prev)
          ????????????if?(t.waitStatus?<=?0)
          ????????????????s?=?t;
          ????}
          ????if?(s?!=?null)
          ????????LockSupport.unpark(s.thread);
          }

          這里的尋找隊(duì)列的第一個(gè)非取消狀態(tài)的節(jié)點(diǎn)為啥要從后往前找呢,因?yàn)楣?jié)點(diǎn)入隊(duì)并不是原子操作,如下

          線程自旋時(shí)時(shí)是先執(zhí)行 node.pre = pred, 然后再執(zhí)行 pred.next = node,如果 unparkSuccessor 剛好在這兩者之間執(zhí)行,此時(shí)是找不到 ?head 的后繼節(jié)點(diǎn)的,如下



          如何利用 AQS 自定義一個(gè)互斥鎖

          AQS 通過提供 state 及 FIFO 隊(duì)列的管理,為我們提供了一套通用的實(shí)現(xiàn)鎖的底層方法,基本上定義一個(gè)鎖,都是轉(zhuǎn)為在其內(nèi)部定義 AQS 的子類,調(diào)用 AQS 的底層方法來實(shí)現(xiàn)的,由于 AQS 在底層已經(jīng)為了定義好了這些獲取 state 及 FIFO 隊(duì)列的管理工作,我們要實(shí)現(xiàn)一個(gè)鎖就比較簡單了,我們可以基于 AQS 來實(shí)現(xiàn)一個(gè)非可重入的互斥鎖,如下所示

          public?class?Mutex??{

          ????private?Sync?sync?=?new?Sync();
          ????
          ????public?void?lock?()?{
          ????????sync.acquire(1);
          ????}
          ????
          ????public?void?unlock?()?{
          ????????sync.release(1);
          ????}

          ????private?static?class?Sync?extends?AbstractQueuedSynchronizer?{
          ????????@Override
          ????????protected?boolean?tryAcquire?(int?arg)?{
          ????????????return?compareAndSetState(0,?1);
          ????????}

          ????????@Override
          ????????protected?boolean?tryRelease?(int?arg)?{
          ????????????setState(0);
          ????????????return?true;
          ????????}

          ????????@Override
          ????????protected?boolean?isHeldExclusively?()?{
          ????????????return?getState()?==?1;
          ????????}
          ????}
          }

          可以看到區(qū)區(qū)幾行代碼就實(shí)現(xiàn)了,確實(shí)很方便。

          ?

          總結(jié)

          本文通過圖文并茂的方式幫助大家梳理了一遍 AQS 的實(shí)現(xiàn)方式,相信大家看完對 AQS 應(yīng)該有了比較深入的認(rèn)識,首先要明白鎖的實(shí)現(xiàn)原理,信號量及管程,理解了管程的設(shè)計(jì)思路對 AQS 有了一個(gè)概念上的認(rèn)識,再去讀源碼就會用管程的概念去套,也就更容易理解了,另外大家可以多類比一下生活中的場景,如就醫(yī)場景,通過類似的方式學(xué)習(xí)能讓我們更好地理解相關(guān)技術(shù)的設(shè)計(jì)思路。

          巨人的肩膀
          • 【操作系統(tǒng)】生產(chǎn)者消費(fèi)者問題 https://blog.csdn.net/liushall/article/details/81569609
          • https://time.geekbang.org/column/article/86089
          • https://www.cnblogs.com/binarylei/p/12544002.html
          • https://zhuanlan.zhihu.com/p/150573153
          • https://blog.csdn.net/anlian523/article/details/106448512
          • https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html


          有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)

          歡迎大家關(guān)注Java之道公眾號


          好文章,我在看??

          瀏覽 68
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄色一及片自拍 | 国产精品高潮无套内谢 | 五月色婷婷国产 | 国产超碰在线 | 极品漂亮人妻找猛男3p |