Java8為什么要新增StampedLock票據(jù)鎖(郵戳鎖)
如下圖所示:

我們首先看Stampedlock有哪些屬性先,源碼如下:
private?static?final?long?serialVersionUID?=?-6001602636862214147L;
/**?獲取服務(wù)器CPU核數(shù)?*/
private?static?final?int?NCPU?=?Runtime.getRuntime().availableProcessors();
/**?線程入隊列前自旋次數(shù)?*/
private?static?final?int?SPINS?=?(NCPU?>?1)???1?<<?6?:?0;
/**?隊列頭結(jié)點自旋獲取鎖最大失敗次數(shù)后再次進(jìn)入隊列?*/
private?static?final?int?HEAD_SPINS?=?(NCPU?>?1)???1?<<?10?:?0;
/**?重新阻塞前的最大重試次數(shù)?*/
private?static?final?int?MAX_HEAD_SPINS?=?(NCPU?>?1)???1?<<?16?:?0;
/**?The?period?for?yielding?when?waiting?for?overflow?spinlock?*/
private?static?final?int?OVERFLOW_YIELD_RATE?=?7;?//?must?be?power?2?-?1
/**?溢出之前用于閱讀器計數(shù)的位數(shù)?*/
private?static?final?int?LG_READERS?=?7;
//?鎖定狀態(tài)和stamp操作的值
private?static?final?long?RUNIT?=?1L;?//?每次獲取讀鎖?進(jìn)行+1
private?static?final?long?WBIT??=?1L?<<?LG_READERS;??//寫狀態(tài)?1000?0000
private?static?final?long?RBITS?=?WBIT?-?1L;??//溢出保護(hù)?0111?1111
private?static?final?long?RFULL?=?RBITS?-?1L;?//最大讀線程數(shù)?0111?1110
private?static?final?long?ABITS?=?RBITS?|?WBIT;???//?掩碼?前8位都為1??1111?1111
private?static?final?long?SBITS?=?~RBITS;?//?掩碼?1...?1000?0000
//鎖state初始值,第9位為1,避免算術(shù)時和0沖突
private?static?final?long?ORIGIN?=?WBIT?<<?1;
//?來自取消獲取方法的特殊值,因此調(diào)用者可以拋出IE
private?static?final?long?INTERRUPTED?=?1L;
//?WNode節(jié)點的status值
private?static?final?int?WAITING???=?-1;
private?static?final?int?CANCELLED?=??1;
//?WNode節(jié)點的讀寫模式
private?static?final?int?RMODE?=?0;
private?static?final?int?WMODE?=?1;
/**?Wait?nodes?*/
static?final?class?WNode?{
????volatile?WNode?prev;
????volatile?WNode?next;
????volatile?WNode?cowait;????//?讀模式使用該節(jié)點形成棧
????volatile?Thread?thread;???//?non-null?while?possibly?parked
????volatile?int?status;??????//?0,?WAITING,?or?CANCELLED
????final?int?mode;???????????//?RMODE?or?WMODE
????WNode(int?m,?WNode?p)?{?mode?=?m;?prev?=?p;?}
}
/**?CLH隊頭節(jié)點?*/
private?transient?volatile?WNode?whead;
/**?CLH隊尾節(jié)點?*/
private?transient?volatile?WNode?wtail;
//?views
transient?ReadLockView?readLockView;
transient?WriteLockView?writeLockView;
transient?ReadWriteLockView?readWriteLockView;
/**?鎖隊列狀態(tài),?當(dāng)處于寫模式時第8位為1,讀模式時前7為為1-126(附加的readerOverflow用于當(dāng)讀者超過126時)?*/
private?transient?volatile?long?state;
/**?將state超過?RFULL=126的值放到readerOverflow字段中?*/
private?transient?int?readerOverflow;
StampedLockd源碼中的WNote就是等待鏈表隊列,每一個WNode標(biāo)識一個等待線程,whead為CLH隊列頭,wtail為CLH隊列尾,state為鎖的狀態(tài)。long型即64位,倒數(shù)第八位標(biāo)識寫鎖狀態(tài),如果為1,標(biāo)識寫鎖占用!下面圍繞這個state來講述鎖操作。
常量標(biāo)識
3種模式可以通過檢查區(qū)分 (m = stamp & ABITS):- 寫模式: m == WBIT
- 樂觀讀模式: m == 0L (即使讀鎖已經(jīng)被持有)
- 悲觀讀模式: m > 0L && m <= RFULL (同步狀態(tài)的拷貝,,但是stamp中的 * read hold count除了用來決定是哪個模式以外不會被使用)
- RBIT =0111 1111(即127)
- SBIT =1000 0000(后7位表示當(dāng)前正在讀取的線程數(shù)量,清0)
- 寫鎖writeLock是一個獨占鎖,同時只有一個線程可以獲取該鎖,當(dāng)一個線程獲取該鎖后,其他請求讀鎖和寫鎖的線程必須等待,這跟ReentrantReadWriteLock 的寫鎖很相似,不過要注意的是StampedLock的寫鎖是不可重入鎖,
/**
?*?
?*獲取寫鎖,獲取失敗會一直阻塞,直到獲得鎖成功
?*?@return?可以用來解鎖或轉(zhuǎn)換模式的戳記(128的整數(shù))
?*/
public?long?writeLock()?{
????long?s,?next;??
????return?((((s?=?state)?&?ABITS)?==?0L?&&???//?完全沒有任何鎖(沒有讀鎖和寫鎖)的時候可以通過
?????????????U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?WBIT))???//第8位置為1
????????????next?:?acquireWrite(false,?0L));
}
writeLock():典型的cas操作,如果STATE等于s,設(shè)置寫鎖位為1(s+WBIT)。acquireWrite跟acquireRead邏輯類似,先自旋嘗試、加入等待隊列、直至最終Unsafe.park()掛起線程。
private?long?acquireWrite(boolean?interruptible,?long?deadline)?{
????WNode?node?=?null,?p;
????for?(int?spins?=?-1;;)?{?//?入隊時自旋
????????long?m,?s,?ns;
????????//無鎖
????????if?((m?=?(s?=?state)?&?ABITS)?==?0L)?{
????????????if?(U.compareAndSwapLong(this,?STATE,?s,?ns?=?s?+?WBIT))
????????????????return?ns;
????????}
????????else?if?(spins?<?0)
????????????//持有寫鎖,并且隊列為空
????????????spins?=?(m?==?WBIT?&&?wtail?==?whead)???SPINS?:?0;
????????else?if?(spins?>?0)?{
????????????//恒成立
????????????if?(LockSupport.nextSecondarySeed()?>=?0)
????????????????--spins;
????????}
????????else?if?((p?=?wtail)?==?null)?{?
????????????//初始化隊列,寫鎖入隊列
????????????WNode?hd?=?new?WNode(WMODE,?null);
????????????if?(U.compareAndSwapObject(this,?WHEAD,?null,?hd))
????????????????wtail?=?hd;
????????}
????????else?if?(node?==?null)
????????????//不為空,寫鎖入隊列
????????????node?=?new?WNode(WMODE,?p);
????????else?if?(node.prev?!=?p)
????????????node.prev?=?p;
????????else?if?(U.compareAndSwapObject(this,?WTAIL,?p,?node))?{
????????????p.next?=?node;
????????????break;//入隊列成功退出循環(huán)
????????}
????}
????for?(int?spins?=?-1;;)?{
????????WNode?h,?np,?pp;?int?ps;
????????//前驅(qū)節(jié)點為頭節(jié)點
????????if?((h?=?whead)?==?p)?{
????????????if?(spins?<?0)
????????????????spins?=?HEAD_SPINS;
????????????else?if?(spins?<?MAX_HEAD_SPINS)
????????????????spins?<<=?1;
????????????for?(int?k?=?spins;;)?{?//?spin?at?head
????????????????long?s,?ns;
????????????????//無鎖
????????????????if?(((s?=?state)?&?ABITS)?==?0L)?{
????????????????????if?(U.compareAndSwapLong(this,?STATE,?s,
?????????????????????????????????????????????ns?=?s?+?WBIT))?{
????????????????????????//當(dāng)前節(jié)點設(shè)置為頭結(jié)點
????????????????????????whead?=?node;
????????????????????????node.prev?=?null;
????????????????????????return?ns;
????????????????????}
????????????????}
????????????????else?if?(LockSupport.nextSecondarySeed()?>=?0?&&
?????????????????????????--k?<=?0)
????????????????????break;
????????????}
????????}
????????else?if?(h?!=?null)?{?//?help?release?stale?waiters
????????????WNode?c;?Thread?w;
????????????//頭結(jié)點為讀鎖將棧中所有讀鎖線程喚醒
????????????while?((c?=?h.cowait)?!=?null)?{
????????????????if?(U.compareAndSwapObject(h,?WCOWAIT,?c,?c.cowait)?&&
????????????????????(w?=?c.thread)?!=?null)
????????????????????U.unpark(w);
????????????}
????????}
????????//
????????if?(whead?==?h)?{
????????????if?((np?=?node.prev)?!=?p)?{
????????????????if?(np?!=?null)
????????????????????(p?=?np).next?=?node;???//?stale
????????????}
????????????else?if?((ps?=?p.status)?==?0)
????????????????//前驅(qū)節(jié)點置為等待狀態(tài)
????????????????U.compareAndSwapInt(p,?WSTATUS,?0,?WAITING);
????????????else?if?(ps?==?CANCELLED)?{
????????????????if?((pp?=?p.prev)?!=?null)?{
????????????????????node.prev?=?pp;
????????????????????pp.next?=?node;
????????????????}
????????????}
????????????else?{
????????????????long?time;?//?0?argument?to?park?means?no?timeout
????????????????if?(deadline?==?0L)
????????????????????time?=?0L;
????????????????else?if?((time?=?deadline?-?System.nanoTime())?<=?0L)
????????????????????return?cancelWaiter(node,?node,?false);
????????????????Thread?wt?=?Thread.currentThread();
????????????????U.putObject(wt,?PARKBLOCKER,?this);
????????????????node.thread?=?wt;
????????????????if?(p.status?<?0?&&?(p?!=?h?||?(state?&?ABITS)?!=?0L)?&&
????????????????????whead?==?h?&&?node.prev?==?p)
????????????????????U.park(false,?time);??//?emulate?LockSupport.park
????????????????node.thread?=?null;
????????????????U.putObject(wt,?PARKBLOCKER,?null);
????????????????if?(interruptible?&&?Thread.interrupted())
????????????????????return?cancelWaiter(node,?node,?true);
????????????}
????????}
????}
}
并且StampedLock還提供了非阻塞tryWriteLock方法,源碼如下:
/**
?*?沒有任何鎖時則獲取寫鎖,否則返回0
?*
?*?@return?可以用來解鎖或轉(zhuǎn)換模式的戳記(128的整數(shù)),獲取失敗返回0
?*/
public?long?tryWriteLock()?{
????long?s,?next;
????return?((((s?=?state)?&?ABITS)?==?0L?&&
?????????????U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?WBIT))??
????????????next?:?0L);
}
/**
?*?unit時間內(nèi)獲得寫鎖成功返回狀態(tài)值,失敗返回0,或拋出InterruptedException
?*?@return?0:獲得鎖失敗
?*?@throws?InterruptedException?線程獲得鎖之前調(diào)用interrupt()方法拋出的異常
?*/
public?long?tryWriteLock(long?time,?TimeUnit?unit)
????throws?InterruptedException?{
????long?nanos?=?unit.toNanos(time);
????if?(!Thread.interrupted())?{
????????long?next,?deadline;
????????if?((next?=?tryWriteLock())?!=?0L)
????????????//獲得鎖成功
????????????return?next;
????????if?(nanos?<=?0L)
????????????//超時返回0
????????????return?0L;
????????if?((deadline?=?System.nanoTime()?+?nanos)?==?0L)
????????????deadline?=?1L;
????????if?((next?=?acquireWrite(true,?deadline))?!=?INTERRUPTED)
????????????//規(guī)定時間內(nèi)獲得鎖結(jié)果
????????????return?next;
????}
????throw?new?InterruptedException();
}
當(dāng)釋放該鎖的時候需要調(diào)用unlockWrite方法并傳遞獲取鎖的時候的stamp參數(shù)。源碼如下:
/**
?*?state匹配stamp則釋放寫鎖,
?*?@throws?IllegalMonitorStateException??不匹配則拋出異常
?*/
public?void?unlockWrite(long?stamp)?{
????WNode?h;
????//state不匹配stamp??或者?沒有寫鎖
????if?(state?!=?stamp?||?(stamp?&?WBIT)?==?0L)
????????throw?new?IllegalMonitorStateException();
????//state?+=?WBIT,?第8位置為0,但state?&?SBITS?會循環(huán),一共有4個值
????state?=?(stamp?+=?WBIT)?==?0L???ORIGIN?:?stamp;
????if?((h?=?whead)?!=?null?&&?h.status?!=?0)
????????//喚醒繼承者節(jié)點線程
????????release(h);
}
unlockWrite():釋放鎖與加鎖動作相反。將寫標(biāo)記位清零,如果state溢出,則退回到初始值;
- 「悲觀鎖readLock」,是個共享鎖,在沒有線程獲取獨占寫鎖的情況下,同時多個線程可以獲取該鎖;如果已經(jīng)有線程持有寫鎖,其他線程請求獲取該鎖會被阻塞,這類似ReentrantReadWriteLock 的讀鎖(不同在于這里的讀鎖是不可重入鎖)。
/**
?*??悲觀讀鎖,非獨占鎖,為獲得鎖一直處于阻塞狀態(tài),直到獲得鎖為止
?*/
public?long?readLock()?{
????long?s?=?state,?next;??
????//?隊列為空???&&?沒有寫鎖同時讀鎖數(shù)小于126??&&?CAS修改狀態(tài)成功??????則狀態(tài)加1并返回,否則自旋獲取讀鎖
????return?((whead?==?wtail?&&?(s?&?ABITS)?<?RFULL?&&
?????????????U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?RUNIT))??
????????????next?:?acquireRead(false,?0L));
}
樂觀鎖失敗后鎖升級為readLock():嘗試state+1,用于統(tǒng)計讀線程的數(shù)量,如果失敗,進(jìn)入acquireRead()進(jìn)行自旋,通過CAS獲取鎖。
如果自旋失敗,入CLH隊列,然后再自旋,如果成功獲得讀鎖,則激活cowait隊列中的讀線程Unsafe.unpark(),如果最終依然失敗,則Unsafe().park()掛起當(dāng)前線程。
/**
?*?@param?interruptible?是否允許中斷
?*?@param?標(biāo)識超時限時(0代表不限時),然后進(jìn)入循環(huán)。
?*?@return?next?state,?or?INTERRUPTED
?*/
private?long?acquireRead(boolean?interruptible,?long?deadline)?{
????WNode?node?=?null,?p;
????//自旋
????for?(int?spins?=?-1;;)?{
????????WNode?h;
????????//判斷隊列為空
????????if?((h?=?whead)?==?(p?=?wtail))?{
????????????//定義?long?m,s,ns,并循環(huán)
????????????for?(long?m,?s,?ns;;)?{
????????????????//將state超過?RFULL=126的值放到readerOverflow字段中
????????????????if?((m?=?(s?=?state)?&?ABITS)?<?RFULL??
????????????????????U.compareAndSwapLong(this,?STATE,?s,?ns?=?s?+?RUNIT)?:
????????????????????(m?<?WBIT?&&?(ns?=?tryIncReaderOverflow(s))?!=?0L))
????????????????????//獲取鎖成功返回
????????????????????return?ns;
????????????????//state高8位大于0,那么說明當(dāng)前鎖已經(jīng)被寫鎖獨占,那么我們嘗試自旋??+?隨機(jī)的方式來探測狀態(tài)
????????????????else?if?(m?>=?WBIT)?{
????????????????????if?(spins?>?0)?{
????????????????????????if?(LockSupport.nextSecondarySeed()?>=?0)
????????????????????????????--spins;
????????????????????}
????????????????????else?{
????????????????????????if?(spins?==?0)?{
????????????????????????????WNode?nh?=?whead,?np?=?wtail;
????????????????????????????//一直獲取鎖失敗,或者有線程入隊列了退出內(nèi)循環(huán)自旋,后續(xù)進(jìn)入隊列
????????????????????????????if?((nh?==?h?&&?np?==?p)?||?(h?=?nh)?!=?(p?=?np))
????????????????????????????????break;
????????????????????????}
????????????????????????//自旋?SPINS?次
????????????????????????spins?=?SPINS;
????????????????????}
????????????????}
????????????}
????????}
????????if?(p?==?null)?{?
????????????//初始隊列
????????????WNode?hd?=?new?WNode(WMODE,?null);
????????????if?(U.compareAndSwapObject(this,?WHEAD,?null,?hd))
????????????????wtail?=?hd;
????????}
????????//當(dāng)前節(jié)點為空則構(gòu)建當(dāng)前節(jié)點,模式為RMODE,前驅(qū)節(jié)點為p即尾節(jié)點。
????????else?if?(node?==?null)
????????????node?=?new?WNode(RMODE,?p);
????????//當(dāng)前隊列為空即只有一個節(jié)點(whead=wtail)或者當(dāng)前尾節(jié)點的模式不是RMODE,那么我們會嘗試在尾節(jié)點后面添加該節(jié)點作為尾節(jié)點,然后跳出外層循環(huán)
????????else?if?(h?==?p?||?p.mode?!=?RMODE)?{
????????????if?(node.prev?!=?p)
????????????????node.prev?=?p;
????????????else?if?(U.compareAndSwapObject(this,?WTAIL,?p,?node))?{
????????????????p.next?=?node;
????????????????//入隊列成功,退出自旋
????????????????break;
????????????}
????????}
????????//隊列不為空并且是RMODE模式,?添加該節(jié)點到尾節(jié)點的cowait鏈(實際上構(gòu)成一個讀線程stack)中
????????else?if?(!U.compareAndSwapObject(p,?WCOWAIT,
?????????????????????????????????????????node.cowait?=?p.cowait,?node))
????????????//失敗處理
????????????node.cowait?=?null;
????????else?{
????????????//通過CAS方法將該節(jié)點node添加至尾節(jié)點的cowait鏈中,node成為cowait中的頂元素,cowait構(gòu)成了一個LIFO隊列。
????????????//循環(huán)
????????????for?(;;)?{
????????????????WNode?pp,?c;?Thread?w;
????????????????//嘗試unpark頭元素(whead)的cowait中的第一個元素,假如是讀鎖會通過循環(huán)釋放cowait鏈
????????????????if?((h?=?whead)?!=?null?&&?(c?=?h.cowait)?!=?null?&&
????????????????????U.compareAndSwapObject(h,?WCOWAIT,?c,?c.cowait)?&&
????????????????????(w?=?c.thread)?!=?null)?
????????????????????U.unpark(w);
????????????????//node所在的根節(jié)點p的前驅(qū)就是whead或者p已經(jīng)是whead或者p的前驅(qū)為null
????????????????if?(h?==?(pp?=?p.prev)?||?h?==?p?||?pp?==?null)?{
????????????????????long?m,?s,?ns;
????????????????????do?{
????????????????????????//根據(jù)state再次積極的嘗試獲取鎖
????????????????????????if?((m?=?(s?=?state)?&?ABITS)?<?RFULL??
????????????????????????????U.compareAndSwapLong(this,?STATE,?s,
?????????????????????????????????????????????????ns?=?s?+?RUNIT)?:
????????????????????????????(m?<?WBIT?&&
?????????????????????????????(ns?=?tryIncReaderOverflow(s))?!=?0L))
????????????????????????????return?ns;
????????????????????}?while?(m?<?WBIT);//條件為讀模式
????????????????}
????????????????if?(whead?==?h?&&?p.prev?==?pp)?{
????????????????????long?time;
????????????????????if?(pp?==?null?||?h?==?p?||?p.status?>?0)?{
????????????????????????//這樣做的原因是被其他線程闖入奪取了鎖,或者p已經(jīng)被取消
????????????????????????node?=?null;?//?throw?away
????????????????????????break;
????????????????????}
????????????????????if?(deadline?==?0L)
????????????????????????time?=?0L;
????????????????????else?if?((time?=?deadline?-?System.nanoTime())?<=?0L)
????????????????????????return?cancelWaiter(node,?p,?false);
????????????????????Thread?wt?=?Thread.currentThread();
????????????????????U.putObject(wt,?PARKBLOCKER,?this);
????????????????????node.thread?=?wt;
????????????????????if?((h?!=?pp?||?(state?&?ABITS)?==?WBIT)?&&
????????????????????????whead?==?h?&&?p.prev?==?pp)
????????????????????????U.park(false,?time);
????????????????????node.thread?=?null;
????????????????????U.putObject(wt,?PARKBLOCKER,?null);
????????????????????//出現(xiàn)的中斷情況下取消當(dāng)前節(jié)點的cancelWaiter操作
????????????????????if?(interruptible?&&?Thread.interrupted())
????????????????????????return?cancelWaiter(node,?p,?true);
????????????????}
????????????}
????????}
????}
????for?(int?spins?=?-1;;)?{
????????WNode?h,?np,?pp;?int?ps;
????????if?((h?=?whead)?==?p)?{
????????????if?(spins?<?0)
????????????????spins?=?HEAD_SPINS;
????????????else?if?(spins?<?MAX_HEAD_SPINS)
????????????????spins?<<=?1;
????????????for?(int?k?=?spins;;)?{?//?spin?at?head
????????????????long?m,?s,?ns;
????????????????if?((m?=?(s?=?state)?&?ABITS)?<?RFULL??
????????????????????U.compareAndSwapLong(this,?STATE,?s,?ns?=?s?+?RUNIT)?:
????????????????????(m?<?WBIT?&&?(ns?=?tryIncReaderOverflow(s))?!=?0L))?{
????????????????????WNode?c;?Thread?w;
????????????????????whead?=?node;
????????????????????node.prev?=?null;
????????????????????while?((c?=?node.cowait)?!=?null)?{
????????????????????????if?(U.compareAndSwapObject(node,?WCOWAIT,
???????????????????????????????????????????????????c,?c.cowait)?&&
????????????????????????????(w?=?c.thread)?!=?null)
????????????????????????????U.unpark(w);
????????????????????}
????????????????????return?ns;
????????????????}
????????????????else?if?(m?>=?WBIT?&&
?????????????????????????LockSupport.nextSecondarySeed()?>=?0?&&?--k?<=?0)
????????????????????break;
????????????}
????????}
????????else?if?(h?!=?null)?{
????????????WNode?c;?Thread?w;
????????????while?((c?=?h.cowait)?!=?null)?{
????????????????if?(U.compareAndSwapObject(h,?WCOWAIT,?c,?c.cowait)?&&
????????????????????(w?=?c.thread)?!=?null)
????????????????????U.unpark(w);
????????????}
????????}
????????if?(whead?==?h)?{
????????????if?((np?=?node.prev)?!=?p)?{
????????????????if?(np?!=?null)
????????????????????(p?=?np).next?=?node;???//?stale
????????????}
????????????else?if?((ps?=?p.status)?==?0)
????????????????U.compareAndSwapInt(p,?WSTATUS,?0,?WAITING);
????????????else?if?(ps?==?CANCELLED)?{
????????????????if?((pp?=?p.prev)?!=?null)?{
????????????????????node.prev?=?pp;
????????????????????pp.next?=?node;
????????????????}
????????????}
????????????else?{
????????????????long?time;
????????????????if?(deadline?==?0L)
????????????????????time?=?0L;
????????????????else?if?((time?=?deadline?-?System.nanoTime())?<=?0L)
????????????????????return?cancelWaiter(node,?node,?false);
????????????????Thread?wt?=?Thread.currentThread();
????????????????U.putObject(wt,?PARKBLOCKER,?this);
????????????????node.thread?=?wt;
????????????????if?(p.status?<?0?&&
????????????????????(p?!=?h?||?(state?&?ABITS)?==?WBIT)?&&
????????????????????whead?==?h?&&?node.prev?==?p)
????????????????????U.park(false,?time);
????????????????node.thread?=?null;
????????????????U.putObject(wt,?PARKBLOCKER,?null);
????????????????if?(interruptible?&&?Thread.interrupted())
????????????????????return?cancelWaiter(node,?node,?true);
????????????}
????????}
????}
}
并且StampedLock還提供了非阻塞tryReadLock方法,源碼如下:
/**
*?可以立即獲得鎖,則獲取讀鎖,否則返回0
*/
public?long?tryReadLock()?{
????for?(;;)?{
????????long?s,?m,?next;
????????//持有寫鎖返回0
????????if?((m?=?(s?=?state)?&?ABITS)?==?WBIT)
????????????return?0L;
????????//讀線程數(shù)?<?RFULL,CAS變更狀態(tài)
????????else?if?(m?<?RFULL)?{
????????????if?(U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?RUNIT))
????????????????return?next;
????????}
????????//將state超過?RFULL的值放到readerOverflow字段
????????else?if?((next?=?tryIncReaderOverflow(s))?!=?0L)
????????????return?next;
????}
}
/**
?*?unit時間內(nèi)獲得讀鎖成功返回狀態(tài)值,失敗返回0,或拋出InterruptedException
?*/
public?long?tryReadLock(long?time,?TimeUnit?unit)
????throws?InterruptedException?{
????long?s,?m,?next,?deadline;
????long?nanos?=?unit.toNanos(time);
????if?(!Thread.interrupted())?{
????????if?((m?=?(s?=?state)?&?ABITS)?!=?WBIT)?{
????????????if?(m?<?RFULL)?{
????????????????if?(U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?RUNIT))
????????????????????return?next;
????????????}
????????????else?if?((next?=?tryIncReaderOverflow(s))?!=?0L)
????????????????return?next;
????????}
????????if?(nanos?<=?0L)
????????????return?0L;
????????if?((deadline?=?System.nanoTime()?+?nanos)?==?0L)
????????????deadline?=?1L;
????????if?((next?=?acquireRead(true,?deadline))?!=?INTERRUPTED)
????????????return?next;
????}
????throw?new?InterruptedException();
}
StampedLock的悲觀讀鎖readLock 當(dāng)釋放該鎖時候需要 unlockRead 并傳遞參數(shù) stamp。源碼如下:
/**
?*?state匹配stamp則釋放讀鎖,
?*/
public?void?unlockRead(long?stamp)?{
????long?s,?m;?WNode?h;
????for?(;;)?{
????????//不匹配拋出異常
????????if?(((s?=?state)?&?SBITS)?!=?(stamp?&?SBITS)?||
????????????(stamp?&?ABITS)?==?0L?||?(m?=?s?&?ABITS)?==?0L?||?m?==?WBIT)
????????????throw?new?IllegalMonitorStateException();
????????//小于最大記錄數(shù)值
????????if?(m?<?RFULL)?{
????????????if?(U.compareAndSwapLong(this,?STATE,?s,?s?-?RUNIT))?{
????????????????if?(m?==?RUNIT?&&?(h?=?whead)?!=?null?&&?h.status?!=?0)
????????????????????release(h);
????????????????break;
????????????}
????????}
????????//否則readerOverflow減一
????????else?if?(tryDecReaderOverflow(s)?!=?0L)
????????????break;
????}
}
- 「樂觀讀鎖 tryOptimisticRead」,是相對于悲觀鎖來說的,在操作數(shù)據(jù)前并沒有通過 CAS 設(shè)置鎖的狀態(tài),僅僅是通過位運算測試;如果當(dāng)前沒有線程持有寫鎖,則簡單的返回一個非 0 的 stamp 版本信息,
/**
?*?獲取樂觀讀鎖,返回郵票stamp
?*/
public?long?tryOptimisticRead()?{
????long?s;??//有寫鎖返回0.???否則返回256
????return?(((s?=?state)?&?WBIT)?==?0L)???(s?&?SBITS)?:?0L;
}
tryOptimisticRead():如果當(dāng)前沒有寫鎖占用,返回state(后7位清0,即清0讀線程數(shù)),如果有寫鎖,返回0,即失敗。
/**
?*?驗證從調(diào)用tryOptimisticRead開始到現(xiàn)在這段時間內(nèi)有無寫鎖占用過鎖資源,有寫鎖獲得過鎖資源則返回false.?stamp為0返回false.
?*?@return?從返回stamp開始,沒有寫鎖獲得過鎖資源返回true,否則返回false
?*/
public?boolean?validate(long?stamp)?{
????//強(qiáng)制讀取操作和驗證操作在一些情況下的內(nèi)存排序問題
????U.loadFence();
????//當(dāng)持有寫鎖后再釋放寫鎖,該校驗也不成立,返回false
????return?(stamp?&?SBITS)?==?(state?&?SBITS);
}
「StamedLock還支持這三種鎖在一定條件下進(jìn)行相互轉(zhuǎn)換,例如long tryConvertToWriteLock(long stamp)期望把stamp標(biāo)示的鎖升級為寫鎖,這個函數(shù)會在下面幾種情況下返回一個有效的 stamp(也就是晉升寫鎖成功):」
-
當(dāng)前鎖已經(jīng)是寫鎖模式了。
-
當(dāng)前鎖處于讀鎖模式,并且沒有其他線程是讀鎖模式
-
當(dāng)前處于樂觀讀模式,并且當(dāng)前寫鎖可用。
源碼如下:
/**
?*?state匹配stamp時,?執(zhí)行下列操作之一.?
?*???1、stamp?已經(jīng)持有寫鎖,直接返回.??
?*???2、讀模式,但是沒有更多的讀取者,并返回一個寫鎖stamp.
?*???3、有一個樂觀讀鎖,只在即時可用的前提下返回一個寫鎖stamp
?*???4、其他情況都返回0
?*/
public?long?tryConvertToWriteLock(long?stamp)?{
????long?a?=?stamp?&?ABITS,?m,?s,?next;
????//state匹配stamp
????while?(((s?=?state)?&?SBITS)?==?(stamp?&?SBITS))?{
????????//沒有鎖
????????if?((m?=?s?&?ABITS)?==?0L)?{
????????????if?(a?!=?0L)
????????????????break;
????????????//CAS修改狀態(tài)為持有寫鎖,并返回
????????????if?(U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?WBIT))
????????????????return?next;
????????}
????????//持有寫鎖
????????else?if?(m?==?WBIT)?{
????????????if?(a?!=?m)
????????????????//其他線程持有寫鎖
????????????????break;
????????????//當(dāng)前線程已經(jīng)持有寫鎖
????????????return?stamp;
????????}
????????//有一個讀鎖
????????else?if?(m?==?RUNIT?&&?a?!=?0L)?{
????????????//釋放讀鎖,并嘗試持有寫鎖
????????????if?(U.compareAndSwapLong(this,?STATE,?s,
?????????????????????????????????????next?=?s?-?RUNIT?+?WBIT))
????????????????return?next;
????????}
????????else
????????????break;
????}
????return?0L;
}
/**
*???state匹配stamp時,?執(zhí)行下列操作之一.
????1、stamp?表示持有寫鎖,釋放寫鎖,并持有讀鎖
????2?stamp?表示持有讀鎖?,返回該讀鎖
????3?有一個樂觀讀鎖,只在即時可用的前提下返回一個讀鎖stamp
????4、其他情況都返回0,表示失敗
?*
?*/
public?long?tryConvertToReadLock(long?stamp)?{
????long?a?=?stamp?&?ABITS,?m,?s,?next;?WNode?h;
????//state匹配stamp
????while?(((s?=?state)?&?SBITS)?==?(stamp?&?SBITS))?{
????????//沒有鎖
????????if?((m?=?s?&?ABITS)?==?0L)?{
????????????if?(a?!=?0L)
????????????????break;
????????????else?if?(m?<?RFULL)?{
????????????????if?(U.compareAndSwapLong(this,?STATE,?s,?next?=?s?+?RUNIT))
????????????????????return?next;
????????????}
????????????else?if?((next?=?tryIncReaderOverflow(s))?!=?0L)
????????????????return?next;
????????}
????????//寫鎖
????????else?if?(m?==?WBIT)?{
????????????//非當(dāng)前線程持有寫鎖
????????????if?(a?!=?m)
????????????????break;
????????????//釋放寫鎖持有讀鎖
????????????state?=?next?=?s?+?(WBIT?+?RUNIT);
????????????if?((h?=?whead)?!=?null?&&?h.status?!=?0)
????????????????release(h);
????????????return?next;
????????}
????????//持有讀鎖
????????else?if?(a?!=?0L?&&?a?<?WBIT)
????????????return?stamp;
????????else
????????????break;
????}
????return?0L;
}
校驗這個戳是否有效validate():比較當(dāng)前stamp和發(fā)生樂觀鎖得到的stamp比較,不一致則失敗。
還有一個轉(zhuǎn)換成樂觀鎖tryConvertToOptimisticRead(long stamp) ,這里就不講了,道理都差不多。 另外 StampedLock 的讀寫鎖都是不可重入鎖,所以當(dāng)獲取鎖后釋放鎖前,不應(yīng)該再調(diào)用會獲取鎖的操作,以避免產(chǎn)生死鎖。 當(dāng)多個線程同時嘗試獲取讀鎖和寫鎖的時候,誰先獲取鎖沒有一定的規(guī)則,完全都是盡力而為,是隨機(jī)的,并且該鎖不是直接實現(xiàn) Lock 或 ReadWriteLock 接口,而是內(nèi)部自己維護(hù)了一個雙向阻塞隊列。 「下面通過 JDK8 里面提供的一個管理二維點的例子講解來加深對上面講解的理解。代碼如下所示:」
package?com.hjc;
import?java.util.concurrent.locks.StampedLock;
/**
?*?Created?by?cong?on?2018/6/16.
?*/
public?class?Point?{
????//?成員變量
????private?double?x,?y;
????//?鎖實例
????private?final?StampedLock?sl?=?new?StampedLock();
????//?排它鎖-寫鎖(writeLock)
????void?move(double?deltaX,?double?deltaY)?{
????????long?stamp?=?sl.writeLock();
????????try?{
????????????x?+=?deltaX;
????????????y?+=?deltaY;
????????}?finally?{
????????????sl.unlockWrite(stamp);
????????}
????}
????//?樂觀讀鎖(tryOptimisticRead)
????double?distanceFromOrigin()?{
????????//?嘗試獲取樂觀讀鎖(1)
????????long?stamp?=?sl.tryOptimisticRead();
????????//?將全部變量拷貝到方法體棧內(nèi)(2)
????????double?currentX?=?x,?currentY?=?y;
????????//?檢查在(1)獲取到讀鎖票據(jù)后,鎖有沒被其它寫線程排它性搶占(3)
????????if?(!sl.validate(stamp))?{
????????????//?如果被搶占則獲取一個共享讀鎖(悲觀獲取)(4)
????????????stamp?=?sl.readLock();
????????????try?{
????????????????//?將全部變量拷貝到方法體棧內(nèi)(5)
????????????????currentX?=?x;
????????????????currentY?=?y;
????????????}?finally?{
????????????????//?釋放共享讀鎖(6)
????????????????sl.unlockRead(stamp);
????????????}
????????}
????????//?返回計算結(jié)果(7)
????????return?Math.sqrt(currentX?*?currentX?+?currentY?*?currentY);
????}
????//?使用悲觀鎖獲取讀鎖,并嘗試轉(zhuǎn)換為寫鎖
????void?moveIfAtOrigin(double?newX,?double?newY)?{
????????//?這里可以使用樂觀讀鎖替換(1)
????????long?stamp?=?sl.readLock();
????????try?{
????????????//?如果當(dāng)前點在原點則移動(2)
????????????while?(x?==?0.0?&&?y?==?0.0)?{
????????????????//?嘗試將獲取的讀鎖升級為寫鎖(3)
????????????????long?ws?=?sl.tryConvertToWriteLock(stamp);
????????????????//?升級成功,則更新票據(jù),并設(shè)置坐標(biāo)值,然后退出循環(huán)(4)
????????????????if?(ws?!=?0L)?{
????????????????????stamp?=?ws;
????????????????????x?=?newX;
????????????????????y?=?newY;
????????????????????break;
????????????????}?else?{
????????????????????//?讀鎖升級寫鎖失敗則釋放讀鎖,顯示獲取獨占寫鎖,然后循環(huán)重試(5)
????????????????????sl.unlockRead(stamp);
????????????????????stamp?=?sl.writeLock();
????????????????}
????????????}
????????}?finally?{
????????????//?釋放鎖(6)
????????????sl.unlock(stamp);
????????}
????}
????
}
如上代碼 Point 類里面有兩個成員變量(x,y) 來標(biāo)示一個點的二維坐標(biāo),和三個操作坐標(biāo)變量的方法,另外實例化了一個 StampedLock 對象用來保證操作的原子性。
首先分析下 move 方法,該函數(shù)作用是使用參數(shù)的增量值,改變當(dāng)前 point 坐標(biāo)的位置;代碼先獲取到了寫鎖,然后對 point 坐標(biāo)進(jìn)行修改,然后釋放鎖。該鎖是排它鎖,這保證了其它線程調(diào)用 move 函數(shù)時候會被阻塞,也保證了其它線程不能獲取讀鎖,讀取坐標(biāo)的值,直到當(dāng)前線程顯示釋放了寫鎖, 也就是保證了對變量 x,y 操作的原子性和數(shù)據(jù)一致性。 接下來再看 distanceFromOrigin 方法,該方法作用是計算當(dāng)前位置到原點(坐標(biāo)為 0,0)的距離,代碼(1)首先嘗試獲取樂觀讀鎖,如果當(dāng)前沒有其它線程獲取到了寫鎖,那么(1)會返回一個非 0 的 stamp 用來表示版本信息,代碼(2)拷貝坐標(biāo)變量到本地方法棧里面。 代碼(3)檢查在(1)獲取到的 stamp 值是否還有效,之所以還要在此校驗是因為代碼(1)獲取讀鎖時候并沒有通過 CAS 操作修改鎖的狀態(tài),而是簡單的通過與或操作返回了一個版本信息,這里校驗是看在在獲取版本信息到現(xiàn)在的時間段里面是否有其它線程持有了寫鎖,如果有則之前獲取的版本信息就無效了。 這里如果校驗成功則執(zhí)行(7)使用本地方法棧里面的值進(jìn)行計算然后返回。需要注意的是在代碼(3) 校驗成功后,代碼(7)計算期間,其它線程可能獲取到了寫鎖并且修改了 x,y 的值,而當(dāng)前線程執(zhí)行代碼(7)進(jìn)行計算時候采用的還是修改前值的拷貝,也就是操作的值是對之前值的一個拷貝,一個快照,并不是最新的值。 「也許我們會想,代碼(2) 和(3)能否互換?。」 答案是明顯不能的,如果位置換了,那么首先執(zhí)行validate ,假設(shè)驗證通過了,要拷貝x,y 值到本地方法棧,而在拷貝的過程中很有可能其他線程已經(jīng)修改過了 x,y 中的一個,這就造成了數(shù)據(jù)的不一致性了。 「那么你可能還會這樣會想,即使不交換代碼 (2) 和(3),在拷貝 x,y 值到本地方法棧里面時,也會存在其他線程修改了x,y中的一個值,這不也會存在問題嗎?」 這個確實會存在,但是別忘記了拷貝后還有一道validate,如果這時候有線程修改了x,y 中的值,那么肯定是有線程在調(diào)用 validate 前,調(diào)用 sl.tryOptimisticRead 后獲取了寫鎖,那么進(jìn)行 validate 時候就會失敗。 好了知道這么多原理后,我們就會驚嘆這也是樂觀讀設(shè)計的精妙之處也是使用時候容易出問題的地方。下面繼續(xù)分析 validate 失敗后會執(zhí)行代碼(4)獲取悲觀讀鎖,如果這時候其他線程持有寫鎖則代碼(4)會導(dǎo)致的當(dāng)前線程阻塞直到其它線程釋放了寫鎖。 如果這時候沒有其他線程獲取到寫鎖,那么當(dāng)前線程就可以獲取到讀鎖,然后執(zhí)行代碼(5)重新拷貝新的坐標(biāo)值到本地方法棧,然后就是代碼(6)釋放了鎖,拷貝的時候由于加了讀鎖,所以拷貝期間其它線程獲取寫鎖時候會被阻塞, 這保證了數(shù)據(jù)的一致性,另外這里 x,y 沒有被聲明為 volatie,會不會存在內(nèi)存不可見性問題那?答案是不會,因為加鎖的語義保證了內(nèi)存可見性, 最后代碼(7)使用方法棧里面數(shù)據(jù)計算返回,同理這里在計算時候使用的數(shù)據(jù)也可能不是最新的,其它寫線程可能已經(jīng)修改過原來的 x,y 值了。 最后一個方法 moveIfAtOrigin 作用是如果當(dāng)前坐標(biāo)為原點則移動到指定的位置。代碼(1)獲取悲觀讀鎖,保證其它線程不能獲取寫鎖修改 x,y 值,然后代碼(2)判斷如果當(dāng)前點在原點則更新坐標(biāo), 代碼(3) 嘗試升級讀鎖為寫鎖,這里升級不一定成功,因為多個線程都可以同時獲取悲觀讀鎖,當(dāng)多個線程都執(zhí)行到(3)時候只有一個可以升級成功,升級成功則返回非 0 的 stamp,否非返回 0。 這里假設(shè)當(dāng)前線程升級成功,然后執(zhí)行步驟(4)更新 stamp 值和坐標(biāo)值,然后退出循環(huán)。如果升級失敗則執(zhí)行步驟(5)首先釋放讀鎖然后申請寫鎖,獲取到寫鎖后在循環(huán)重新設(shè)置坐標(biāo)值。最后步驟(6) 釋放鎖。 「使用樂觀讀鎖還是很容易犯錯誤的,必須要嚴(yán)謹(jǐn),必須要保證如下的使用順序,用偽代碼作為講解,如下:」
long?stamp?=?lock.tryOptimisticRead();?//非阻塞獲取版本信息
copyVaraibale2ThreadMemory();//拷貝變量到線程本地堆棧
if(!lock.validate(stamp)){?//?校驗
????long?stamp?=?lock.readLock();//獲取讀鎖
????try?{
????????copyVaraibale2ThreadMemory();//拷貝變量到線程本地堆棧
?????}?finally?{
???????lock.unlock(stamp);//釋放悲觀鎖
????}
}
useThreadMemoryVarables();//使用線程本地堆棧里面的數(shù)據(jù)進(jìn)行操作
「總結(jié):」StampedLock 提供的讀寫鎖與 ReentrantReadWriteLock 類似,只是前者的都是不可重入鎖。但是前者通過提供樂觀讀鎖在多線程多讀的情況下提供更好的性能,這是因為獲取樂觀讀鎖時候不需要進(jìn)行 CAS 操作設(shè)置鎖的狀態(tài),而只是簡單的測試狀態(tài)。
來源:cnblogs.com/huangjuncong/p/9191760.htm
