<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          StampedLock——比讀寫鎖更快的鎖

          共 2389字,需瀏覽 5分鐘

           ·

          2020-12-28 09:28

          ?寫給自己看,說給別人聽。你好,這是think123的第84篇原創(chuàng)文章



          簡介

          ReentrantReadWriteLock支持讀寫鎖,StampedLock支持寫鎖、悲觀鎖讀和樂觀讀(無鎖)。

          其中寫鎖、悲觀讀鎖的語義和ReentrantReadWriteLock中的寫鎖、讀鎖語義一樣,都是允許多個線程同時獲取悲觀鎖讀,但是只允許一個線程獲取寫鎖,寫鎖和悲觀讀鎖是互斥的。

          不同的是:StampedLock 里的寫鎖和悲觀讀鎖加鎖成功之后,都會返回一個 stamp;然后解鎖的時候,需要傳入這個 stamp。

          以下為官方使用例子

          public?class?Point?{

          ??private?final?StampedLock?sl?=?new?StampedLock();
          ??private?double?x,?y;

          ??void?move(double?deltaX,?double?deltaY)?{
          ????long?stamp?=?sl.writeLock();
          ????try?{
          ??????x?+=?deltaX;
          ??????y?+=?deltaY;
          ????}?finally?{
          ??????sl.unlockWrite(stamp);
          ????}
          ??}

          ??double?distanceFromOrigin()?{
          ????long?stamp?=?sl.tryOptimisticRead();
          ????double?currentX?=?x,?currentY?=?y;
          ????if?(!sl.validate(stamp))?{
          ????????stamp?=?sl.readLock();
          ????????try?{
          ??????????currentX?=?x;
          ??????????currentY?=?y;
          ????????}?finally?{
          ??????????sl.unlockRead(stamp);
          ????????}
          ????}
          ????//?如果處理業(yè)務需要保持互斥,那么就用互斥鎖,如果不需要保持互斥才可以
          ????//?用讀寫鎖。一般來講緩存是不需要保持互斥性的,能接受瞬間的不一致
          ????return?Math.sqrt(currentX?*?currentX?+?currentY?*?currentY);
          ??}

          ??//?StampedLock?支持鎖的降級(通過?tryConvertToReadLock()方法)
          ??//?升級(通過tryConvertToWriteLock()方法)
          ??void?moveIfAtOrigin(double?newX,?double?newY)?{
          ????long?stamp?=?sl.readLock();
          ????try?{
          ??????while?(x?==?0.0?&&?y?==?0.0)?{
          ????????long?ws?=?sl.tryConvertToWriteLock(stamp);
          ????????if?(ws?!=?0L)?{
          ??????????stamp?=?ws;
          ??????????x?=?newX;
          ??????????y?=?newY;
          ??????????break;
          ????????}?else?{
          ??????????sl.unlockRead(stamp);
          ??????????stamp?=?sl.writeLock();
          ????????}
          ??????}
          ????}?finally?{
          ??????sl.unlock(stamp);
          ????}
          ??}
          }

          主要數(shù)據結構

          雖然StampedLock沒有使用AQS,不過它的數(shù)據結構中還是用到了CLH隊列。

          WNode是CLH隊列的節(jié)點,其源碼如下


          ??static?final?class?WNode?{
          ????//?前驅節(jié)點
          ????volatile?WNode?prev;

          ????//?后繼節(jié)點
          ????volatile?WNode?next;

          ????//?獲取讀鎖的列表
          ????volatile?WNode?cowait;

          ????//?線程
          ????volatile?Thread?thread;

          ????//?節(jié)點狀態(tài)。0,?WAITING,?or?CANCELLED
          ????volatile?int?status;
          ????
          ????//?讀模式或者寫模式。RMODE?or?WMODE
          ????final?int?mode;??
          ????WNode(int?m,?WNode?p)?{?mode?=?m;?prev?=?p;?}
          ??}

          StampedLock中其中重要屬性如下

          ?//CLH隊列頭結點
          ??private?transient?volatile?WNode?whead;

          ??//?CLH隊列尾節(jié)點
          ??private?transient?volatile?WNode?wtail;

          ??//?鎖狀態(tài)
          ??private?transient?volatile?long?state;
          ??
          ??//?讀鎖次數(shù)的額外計數(shù)器
          ??private?transient?int?readerOverflow;

          ??//?讀鎖的位數(shù)
          ??private?static?final?int?LG_READERS?=?7;

          ??//?計算state值常量
          ??private?static?final?long?RUNIT?=?1L;
          ??//?寫鎖標志位,十進制:128?二進制:1000?0000
          ??private?static?final?long?WBIT??=?1L?<
          ??//?讀狀態(tài)標志位。?十進制:127??二進制:?0111?1111
          ??private?static?final?long?RBITS?=?WBIT?-?1L;

          ??//?state狀態(tài)中記錄讀鎖快滿了的值,126
          ??private?static?final?long?RFULL?=?RBITS?-?1L;

          ??//?用來獲取讀寫狀態(tài)。?十進制:255?二進制:1111?1111
          ??private?static?final?long?ABITS?=?RBITS?|?WBIT;

          ??//?-128?(1....1?1000?0000)
          ??private?static?final?long?SBITS?=?~RBITS;

          ??//?狀態(tài)state的初始值?256?二進制:?00001?0000?0000
          ??private?static?final?long?ORIGIN?=?WBIT?<1;


          ??//?鎖的狀態(tài)。使用state來控制當前是讀鎖,還是寫鎖
          ??private?transient?volatile?long?state;
          ??
          ??//?額外的讀鎖計數(shù)器
          ??private?transient?int?readerOverflow;

          ??//?state初始值為256
          ??public?StampedLock()?{
          ??????state?=?ORIGIN;
          ??}

          看過我之前分析AQS源碼的應該對這個CLH很熟悉了。同樣的StampedLock也是通過這個stae變量來進行讀寫鎖判斷的。這個state承載了三個部分的內容

          狀態(tài)位

          StampedLock中state是long類型,占64位,它被劃為了三個部分來使用。低7位作為讀鎖標志位,可以由多個線程共享,每有一個線程申請讀鎖成功,低7位就加1。第8位是寫鎖位,由線程獨占。 其余位是stamp位,用來記錄寫鎖狀態(tài)的變化(版本號),每使用一次寫鎖,stamp位就會加1。

          同時如果讀鎖數(shù)量超過了126之后,超出的次數(shù)使用readerOverflow來進行計數(shù)。

          當出現(xiàn)并發(fā)的情況的時候,CLH隊列的排隊情況是怎樣的呢?

          比如,線程w1獲取了寫鎖,一直未釋放。此時有4個線程分別獲取讀鎖(獲取順序是R0-->R1-->R2-->R3),又有線程W2獲取寫鎖,最后還有R4,R5,R6三個線程獲取讀鎖,那么此時隊列的排隊情況如下

          CLH

          因為讀鎖是可以被多個線程獲取的,如果同一時間有多個線程來獲取讀鎖卻獲取不到時,這個時候第一個獲取讀鎖的線程會被加入到鏈表中,而后面的的讀線程會被加入到cowait棧中, 可以認為cowait是一條副鏈。

          這里的cowait可以理解為協(xié)同等待,表示將這些獲取讀鎖的線程作為一個整體來獲取鎖。

          獲取樂觀讀鎖

          public?long?tryOptimisticRead()?{
          ????long?s;
          ????return?(((s?=?state)?&?WBIT)?==?0L)???(s?&?SBITS)?:?0L;
          }

          樂觀讀鎖邏輯比較簡單,就是判斷寫鎖是否被占用(判斷state第8位的值是否為1),如果寫鎖被占用則返回0,否則返回stamp位。

          state初始值為256(1?0000?0000)

          ??01000?0000(WBIT)
          &?10000?0000(state)
          ==============
          ??00000?0000

          ?1?0000?0000(state)
          ?1?1000?0000(SBITS)
          ?==============
          ?1?0000?0000

          檢測樂觀讀鎖

          public?boolean?validate(long?stamp)?{

          ??//?通過UNSafe插入內存屏障,禁止重排序
          ??U.loadFence();
          ??return?(stamp?&?SBITS)?==?(state?&?SBITS);
          }

          返回true: 表示期間沒有寫鎖,不關心讀鎖。

          返回false: 表示期間有寫鎖發(fā)生

          SBITS為-128,用二進制表示是:1111 1111 1111 1000 0000

          x?xxxx?xxxx(stamp)
          1?1000?0000

          1?yyyy?yyyy(state)
          1?1000?0000

          SBITS后7位都是0,也就是不關心讀鎖,我們只關心stamp位和寫位。

          當我們獲取樂觀讀時,如果此時已經有了寫鎖,那么返回stamp值為0,此時進行驗證肯定為false。

          獲取普通讀鎖

          public?long?readLock()?{
          ??long?s?=?state,?next;
          ??//?whead?==?wtail時,隊列為空,表示當前沒有線程在排隊
          ??return?((whead?==?wtail?&&?(s?&?ABITS)????????????U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?RUNIT))??
          ???????????//?acquireRead的第一個參數(shù)為false,標識不處理線程中斷
          ??????????next?:?acquireRead(false,?0L));
          }

          當獲取讀鎖成功時,會將state值加1。當條件不滿足時(隊列不為空,CAS失敗,或者讀鎖的個數(shù)已經大于等于126),都會進入到acquireRead()方法,這個方法主要分為兩個大的for循環(huán),代碼比較長我就不貼出來了。

          它的主要邏輯如下:

          1. 如果當前隊列沒有排隊的線程,該線程是第一個準備排隊的元素,就有很大機會獲取到鎖,所以會先自旋嘗試獲取到鎖(自旋次數(shù)為64),如果獲取到了將state值加1,如果讀鎖次數(shù)已經超出了126,則使用readerOverflow記錄超出的讀鎖數(shù)目。如果未獲取到鎖,則進入第2步
          ?private?long?tryIncReaderOverflow(long?s)?{
          ?
          ??//?如果讀鎖的記錄數(shù)等于126,
          ??if?((s?&?ABITS)?==?RFULL)?{
          ??????//?將state的值加1
          ??????//?CAS之后state的后七位的值是127,state的整個值是383(1?0111?1111)。
          ??????if?(U.compareAndSwapLong(this,?STATE,?s,?s?|?RBITS))?{
          ??????????//?將readOverflow的值加1
          ??????????++readerOverflow;
          ??????????state?=?s;
          ??????????return?s;
          ??????}
          ??}
          ??else?if?((LockSupport.nextSecondarySeed()?&
          ????????????OVERFLOW_YIELD_RATE)?==?0)
          ??????Thread.yield();
          ??return?0L;
          }
          1. 初始化入隊排隊節(jié)點,形成鏈表關系。和AQS一樣的是head節(jié)點被當成哨兵節(jié)點或者正持有鎖正在運行的線程(虛擬)。所以head節(jié)點的thread為null,mode的值為WMODE。這樣會形成head --> node <-- tail這樣的節(jié)點關系

          2. 如果又有新的線程獲取讀鎖失敗,會將新的線程加入到cowait棧中

          3. 對于不是第一個獲取讀鎖的線程,它會先加入到cowait這個棧中,然后再通過CAS獲取鎖,如果獲取失敗就被阻塞,等待被喚醒或者由于超時中斷被取消。

          4. 會對第一個獲取讀鎖的線程進行優(yōu)待,因為它前面沒有在排隊的線程,所以這一層還是在自旋獲取鎖,只是自旋次數(shù)再增加,首先會自旋1024次獲得鎖,如果還未獲取到,在自旋2048次。 如果還未等到鎖釋放,就阻塞當前線程,等待被喚醒,直到獲得鎖或者超時中斷被取消。

          第1-4步屬于一個大的循環(huán),第5步驟屬于另外一個大的循環(huán)。

          同時當獲取讀鎖線程被喚醒獲取到鎖后,它同時也會喚醒掛在它身上的cowait棧中的線程。

          從分析中可以看到StampedLock中通過大量的自旋操作可以一定程度避免線程阻塞,只要線程執(zhí)行操作夠快,釋放鎖比較及時,可以說幾乎不會存在阻塞。

          釋放讀鎖

          釋放讀鎖的代碼比較簡單,主要操作如下

          1. 首先判斷stamp是否合法,如果不合法則拋出IllegalMonitorStateException異常
          2. 如果讀鎖個數(shù)小于126,則通過CAS將state值減去1,如果釋放的是最后一個讀鎖,則需要喚醒隊列中的節(jié)點。
          3. 如果讀鎖個數(shù)已經溢出了,則將readerOverflow減去1
          public?void?unlockRead(long?stamp)?{
          ??long?s,?m;?WNode?h;
          ??//?自旋
          ??for?(;;)?{
          ??????//?判斷stamp是否合法
          ??????if?(((s?=?state)?&?SBITS)?!=?(stamp?&?SBITS)?||
          ??????????(stamp?&?ABITS)?==?0L?||?(m?=?s?&?ABITS)?==?0L?||?m?==?WBIT)
          ??????????throw?new?IllegalMonitorStateException();
          ??????//?讀鎖個數(shù)小于126
          ??????if?(m???????????if?(U.compareAndSwapLong(this,?STATE,?s,?s?-?RUNIT))?{
          ??????????????//?釋放最后一個讀鎖時,需要喚醒下一個節(jié)點
          ??????????????if?(m?==?RUNIT?&&?(h?=?whead)?!=?null?&&?h.status?!=?0)
          ??????????????????release(h);
          ??????????????break;
          ??????????}
          ??????}
          ??????//?讀鎖個數(shù)飽和溢出,需要減少readerOverflow
          ??????else?if?(tryDecReaderOverflow(s)?!=?0L)
          ??????????break;
          ??}
          }

          獲取寫鎖

          public?long?writeLock()?{
          ??long?s,?next;
          ??return?((((s?=?state)?&?ABITS)?==?0L?&&
          ???????????U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?WBIT))??
          ??????????next?:?acquireWrite(false,?0L));
          }

          如果state中的第8位等于0并且CAS設置state值成功,則獲取鎖成功,否則進入到acquireWrite方法。

          acquireWrite的邏輯也分為兩部分,分別是兩個for循環(huán)。

          第一個for循環(huán)邏輯如下

          ?for?(int?spins?=?-1;;)?{?//?spin?while?enqueuing
          ????long?m,?s,?ns;

          ????//?如果當前寫鎖標志位仍然為0,即沒有其他線程獲取到寫鎖
          ????if?((m?=?(s?=?state)?&?ABITS)?==?0L)?{
          ????????//?CAS將state的值加128,實質是將寫鎖狀態(tài)位加1
          ????????if?(U.compareAndSwapLong(this,?STATE,?s,?ns?=?s?+?WBIT))
          ????????????return?ns;
          ????}
          ????//?其他線程獲取到了寫鎖,則設定自旋次數(shù)為64次
          ????else?if?(spins?0)
          ????????spins?=?(m?==?WBIT?&&?wtail?==?whead)???SPINS?:?0;
          ????else?if?(spins?>?0)?{
          ????????//?隨機減少自旋次數(shù)
          ????????if?(LockSupport.nextSecondarySeed()?>=?0)
          ????????????--spins;
          ????}
          ????//?自旋仍未獲取到寫鎖則執(zhí)行下面的邏輯
          ????//?初始化CLH隊列(主要是whead,wtail節(jié)點)
          ????//?p指向了wtail節(jié)點
          ????else?if?((p?=?wtail)?==?null)?{
          ????????WNode?hd?=?new?WNode(WMODE,?null);
          ????????if?(U.compareAndSwapObject(this,?WHEAD,?null,?hd))
          ????????????wtail?=?hd;
          ????}
          ????//?初始化節(jié)點
          ????else?if?(node?==?null)
          ????????node?=?new?WNode(WMODE,?p);

          ????//?最終形成?whead?-->?node?<--?wtail
          ????else?if?(node.prev?!=?p)
          ????????node.prev?=?p;
          ????else?if?(U.compareAndSwapObject(this,?WTAIL,?p,?node))?{
          ????????p.next?=?node;
          ????????//?形成鏈表關系之后自旋結束
          ????????break;
          ????}
          }

          第二個for循環(huán)代碼也比較長,我就不貼出來了。它的核心邏輯如下

          1. 如果仍然只有當前線程在等待鎖,則會先自旋1024次去獲取寫鎖,如果獲取失敗,則在自旋2048次再次去獲取。如果都獲取失敗,則進入第二步。

          2. 阻塞當前線程等待被喚醒

          釋放寫鎖

          public?void?unlockWrite(long?stamp)?{
          ??WNode?h;
          ??//?驗證stamp是否合法
          ??if?(state?!=?stamp?||?(stamp?&?WBIT)?==?0L)
          ????throw?new?IllegalMonitorStateException();

          ??//?釋放寫鎖,stamp位(版本號)加1
          ??//?stamp+WBIT會將state的第8位置為0,就相當于釋放了寫鎖
          ??state?=?(stamp?+=?WBIT)?==?0L???ORIGIN?:?stamp;

          ??//?如果頭結點不為空,并且狀態(tài)不為0,,則調用release方法喚醒下一個節(jié)點
          ??if?((h?=?whead)?!=?null?&&?h.status?!=?0)
          ??????release(h);
          }
          private?void?release(WNode?h)?{
          ??if?(h?!=?null)?{
          ????WNode?q;?Thread?w;
          ????U.compareAndSwapInt(h,?WSTATUS,?WAITING,?0);
          ????//?如果頭結點的下一個節(jié)點為空或者其狀態(tài)為已取消
          ????if?((q?=?h.next)?==?null?||?q.status?==?CANCELLED)?{
          ????????//?從尾結點向前遍歷找到可用節(jié)點
          ????????for?(WNode?t?=?wtail;?t?!=?null?&&?t?!=?h;?t?=?t.prev)
          ????????????if?(t.status?<=?0)
          ????????????????q?=?t;
          ????}
          ????//?喚醒q節(jié)點所在的線程
          ????if?(q?!=?null?&&?(w?=?q.thread)?!=?null)
          ????????U.unpark(w);
          ??}
          }

          釋放寫鎖過程總結如下

          1. 將state的第8位置為0(釋放寫鎖),并將stamp位(version)加1
          2. 喚醒下一個節(jié)點

          鎖的降級(tryConvertToReadLock)

          tryConvertToReadLock方法可以用于鎖的降級。不過并不是只有再獲取讀鎖時才能調用該方法。

          public?long?tryConvertToReadLock(long?stamp)?{

          ??//?a為鎖標識,m則是最新的鎖標識
          ??long?a?=?stamp?&?ABITS,?m,?s,?next;?WNode?h;

          ??//?state的寫鎖標志位和版本號一致(有可能寫鎖標志位是0,即可能是讀鎖)
          ??while?(((s?=?state)?&?SBITS)?==?(stamp?&?SBITS))?{

          ??????//?還未添加任何鎖標識
          ??????if?((m?=?s?&?ABITS)?==?0L)?{
          ??????????if?(a?!=?0L)
          ??????????????break;
          ??????????//?讀鎖次數(shù)小于126
          ??????????else?if?(m???????????????//?將state的值加1
          ??????????????if?(U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?RUNIT))
          ??????????????????return?next;
          ??????????}
          ??????????//?讀鎖次數(shù)已經超出了,使用額外字段記錄
          ??????????else?if?((next?=?tryIncReaderOverflow(s))?!=?0L)
          ??????????????return?next;
          ??????}
          ??????//?當前是寫鎖狀態(tài)
          ??????else?if?(m?==?WBIT)?{
          ??????????if?(a?!=?m)
          ??????????????break;
          ??????????//?將讀鎖次數(shù)加1,寫鎖標志置為0,stamp位加1。即加129
          ??????????state?=?next?=?s?+?(WBIT?+?RUNIT);
          ??????????//?釋放鎖
          ??????????if?((h?=?whead)?!=?null?&&?h.status?!=?0)
          ??????????????release(h);
          ??????????return?next;
          ??????}
          ??????//?已經是讀鎖了直接返回
          ??????else?if?(a?!=?0L?&&?a???????????return?stamp;
          ??????else
          ??????????break;
          ??}
          ??//?轉換失敗
          ??return?0L;
          }

          它的主要邏輯如下,如果當前線程還未加任何鎖,則加上寫鎖并返回最新的stamp值。如果當前線程已經是寫鎖,則釋放寫鎖,并更新state的值(讀鎖加1,寫鎖狀態(tài)為置為0,version加1)。 如果當前線程是讀鎖則直接返回stamp值。如果上面條件都不滿足,則轉換失敗。

          鎖的升級(tryConvertToWriteLock)

          public?long?tryConvertToWriteLock(long?stamp)?{
          ??long?a?=?stamp?&?ABITS,?m,?s,?next;

          ??//?state的寫鎖標志位和版本號一致(有可能寫鎖標志位是0,即可能是讀鎖)
          ??while?(((s?=?state)?&?SBITS)?==?(stamp?&?SBITS))?{

          ??????//?還未添加任何鎖標識
          ??????if?((m?=?s?&?ABITS)?==?0L)?{
          ??????????if?(a?!=?0L)
          ??????????????break;
          ??????????//?將寫鎖狀態(tài)位置為0
          ??????????if?(U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?WBIT))
          ??????????????return?next;
          ??????}
          ??????//?如果當前已經是寫鎖狀態(tài),則直接返回
          ??????else?if?(m?==?WBIT)?{
          ??????????if?(a?!=?m)
          ??????????????break;
          ??????????return?stamp;
          ??????}
          ??????//?如果當前是唯一讀鎖,則轉換為寫鎖
          ??????else?if?(m?==?RUNIT?&&?a?!=?0L)?{
          ??????????if?(U.compareAndSwapLong(this,?STATE,?s,
          ???????????????????????????????????next?=?s?-?RUNIT?+?WBIT))
          ??????????????return?next;
          ??????}
          ??????else
          ??????????break;
          ??}
          ??return?0L;
          }

          鎖的升級和鎖的降級的邏輯類似,這里就不再讀過介紹了。

          總結以及注意

          • StampedLock沒有使用AQS,而是依靠自己實現(xiàn)的同步狀態(tài)(long state占64位)和變異的CLH隊列

          • StampedLock使用state的低7位標識讀鎖數(shù)量(超出126的使用readerOverflow字段記錄),第8位標識寫鎖,高56位記錄鎖的版本,每次釋放/獲取寫鎖版本號都會加1

          • StampedLock中讀鎖和讀鎖不阻塞,讀鎖寫鎖相互阻塞,寫鎖和寫鎖也相互阻塞

          • StampedLock的連續(xù)多個讀鎖線程,只有第一個是在CLH隊列中,后面的會掛在第一個線程的cowait棧中

          • StampedLock喚醒第一個讀線程后,讀線程會喚醒它cowait棧的所有讀線程(acquireRead()方法中)

          • StampedLock不支持公平鎖,也不支持Condition

          • StampedLock支持鎖的降級和鎖的升級

          • StampedLock中的樂觀讀操作是無鎖的

          • StampedLock中使用了大量的自旋+CAS操作,適合持有鎖時間比較短的任務,持有鎖時間長的話不僅會浪費CPU而且仍然會阻塞自己。

          • 線程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上時,此時調用該阻塞線程的 interrupt() 方法,會導致 CPU 飆升。所以,使用 StampedLock 一定不要調用中斷操作,如果需要支持中斷功能,一定使用可中斷的悲觀讀鎖 readLockInterruptibly() 和寫鎖 writeLockInterruptibly()



          瀏覽 38
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  豆花视频一区二区三区在线观看 | 超碰国内自拍 | 丰满肥臀无码一区二区三区 | 成人a一级毛片免费看 | 香蕉大伊人 |