JUC并發(fā)編程之ReentrantLock非公平鎖源碼詳解

下圖就是基于AQS所實(shí)現(xiàn)的一些比較強(qiáng)大的功能類,例如公平與非公平鎖,信號(hào)量,線程池


// 標(biāo)識(shí)拿到鎖的是哪個(gè)線程private transient Thread exclusiveOwnerThread;
// 標(biāo)識(shí)頭節(jié)點(diǎn)private transient volatile Node head;// 標(biāo)識(shí)尾節(jié)點(diǎn)private transient volatile Node tail;// 同步狀態(tài),為0時(shí),說(shuō)明可以搶鎖private volatile int state;
// 記錄Node狀態(tài)volatile int waitStatus;// 標(biāo)識(shí)Node是哪個(gè)線程所持有volatile Node prev;// 前驅(qū)節(jié)點(diǎn)(當(dāng)前Node的上一個(gè)是誰(shuí))volatile Node next;// 后繼節(jié)點(diǎn)(當(dāng)前Node的個(gè)一個(gè)是誰(shuí))volatile Thread thread;
下面是一個(gè)簡(jiǎn)單的lock案例,使用lock需要注意點(diǎn),我們獲取鎖需要被try包裹,釋放鎖一定要在finally里面,為什么需要這么做呢?是因?yàn)槿绻鄠€(gè)線程同時(shí)進(jìn)來(lái)?yè)屾i,當(dāng)其中一個(gè)線程搶到鎖后,執(zhí)行完業(yè)務(wù)邏輯還沒(méi)來(lái)得及釋放鎖就發(fā)生了異常,就會(huì)導(dǎo)致鎖沒(méi)有被釋放,從而該業(yè)務(wù)邏輯一直被卡住。所以為了避免這一現(xiàn)象的發(fā)生,我們的釋放鎖一定要寫(xiě)在finally里面啦~
public class ReentrantLockTest {// 初始化Lock類static final Lock lock = new ReentrantLock();static int sum = 0;public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 3; i++) {Thread thread = new Thread(new Runnable() {@Overridepublic void run() {try {// 獲取鎖lock.lock();for (int i1 = 0; i1 < 10000; i1++) {sum++;}} catch (Exception e) {e.printStackTrace();} finally {//釋放鎖lock.unlock();}}});thread.start();}TimeUnit.SECONDS.sleep(1);System.out.println(sum);}}
在ReentLock里面,其中有兩個(gè)構(gòu)造方法值得我們注意


public void lock() {sync.lock();}

首先嘗試以最快的方式獲取鎖,當(dāng)多個(gè)線程同時(shí)進(jìn)來(lái)了,只會(huì)有其中一個(gè)線程以CAS的方式將state的值更新為1,前提條件是只有當(dāng)state的值為0的時(shí)候才能更新成功,同時(shí)會(huì)記錄獲取鎖的線程,若獲取鎖失敗,則執(zhí)行acquire方法
static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {//以cas的方式將AQS中的state屬性值從0更新為1if (compareAndSetState(0, 1))//如果更新成功,則將當(dāng)前線程設(shè)置為持有鎖的線程setExclusiveOwnerThread(Thread.currentThread());else//獲取鎖失敗,則進(jìn)入該方法acquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}
protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
在ReentrantLock中,它的加鎖鉤子方法如下所示,如果不進(jìn)行重寫(xiě)該方法,則強(qiáng)制拋出異常。
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}
找到關(guān)鍵點(diǎn) -> 非公平類加鎖入口

如下則是非公平鎖的實(shí)現(xiàn)方式,在其底層調(diào)用了nonfairTryAcquire()方法
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
final boolean nonfairTryAcquire(int acquires) {//記錄當(dāng)前線程final Thread current = Thread.currentThread();//獲取當(dāng)前state的值int c = getState();//如果state的值為0,則證明沒(méi)有線程搶占鎖if (c == 0) {// 通過(guò)cas進(jìn)行加鎖操作if (compareAndSetState(0, acquires)) {// 記錄加鎖線程setExclusiveOwnerThread(current);return true;}}// 如果鎖已經(jīng)被搶占,且當(dāng)前線程就是持有鎖的線程,則說(shuō)明該鎖被重入else if (current == getExclusiveOwnerThread()) {//計(jì)算要更新后的state值int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");//給state設(shè)置值,該方式為非同步setState(nextc);//重入鎖獲取成功return true;}//獲取鎖失敗return false;}
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
private Node addWaiter(Node mode) {//創(chuàng)建一個(gè)節(jié)點(diǎn),將線程實(shí)例封裝到Node節(jié)點(diǎn)內(nèi)部,當(dāng)前mode這里是為null的Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure//獲取末尾節(jié)點(diǎn)Node pred = tail;//如果末尾節(jié)點(diǎn)不為空,則證明前面已經(jīng)有線程排隊(duì)啦if (pred != null) {//將末尾節(jié)點(diǎn)的引用交給當(dāng)前線程的前節(jié)點(diǎn)node.prev = pred;// 通過(guò)cas,將當(dāng)前線程設(shè)置為鏈表的尾節(jié)點(diǎn)if (compareAndSetTail(pred, node)) {// 將前尾節(jié)點(diǎn)的next引用,指向當(dāng)前節(jié)點(diǎn),那么當(dāng)前節(jié)點(diǎn)就是鏈表尾節(jié)點(diǎn)了pred.next = node;return node;}}//構(gòu)建鏈表核心方法enq(node);return node;}
private Node enq(final Node node) {//自旋for (;;) {//獲取當(dāng)前雙向鏈表的末尾節(jié)點(diǎn)Node t = tail;//如果當(dāng)前還沒(méi)有雙向鏈表,則進(jìn)行構(gòu)建鏈表if (t == null) { // Must initialize//初始化一個(gè)head節(jié)點(diǎn),空的node節(jié)點(diǎn)if (compareAndSetHead(new Node()))//將head節(jié)點(diǎn)賦值給tail節(jié)點(diǎn),這樣的頭尾節(jié)點(diǎn)都已生成好了tail = head;} else {//自旋第二次會(huì)進(jìn)來(lái)//頭節(jié)點(diǎn)設(shè)置為 空節(jié)點(diǎn)node.prev = t;//通過(guò)cas,將當(dāng)前節(jié)點(diǎn)設(shè)置為末尾節(jié)點(diǎn)if (compareAndSetTail(t, node)) {//設(shè)置尾節(jié)點(diǎn)引用t.next = node;return t;}}}}

那么到此獲取鎖失敗的線程,通過(guò)構(gòu)建鏈表,然后假如到同步隊(duì)列的邏輯到此就結(jié)束了,但是線程加入同步隊(duì)列后會(huì)做什么我們并不清楚,這部分我們并不清楚,所以我們接著退回到acquire方法來(lái),我們進(jìn)入到acquireQueued方法內(nèi)部,看看它里面做了些啥邏輯
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
第一個(gè)if判斷中,首先會(huì)判斷當(dāng)前線程的前驅(qū)節(jié)點(diǎn)是否為頭節(jié)點(diǎn),如果是則嘗試獲取鎖,獲取鎖成功則將當(dāng)前線程節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn),為什么必須是前驅(qū)節(jié)點(diǎn)才嘗試去獲取鎖,因?yàn)檎G闆r下,頭節(jié)點(diǎn)才是持有鎖的線程,頭節(jié)點(diǎn)線程釋放掉鎖后,會(huì)喚醒后驅(qū)節(jié)點(diǎn)的線程,這個(gè)時(shí)候被喚醒后才會(huì)進(jìn)行獲取鎖。舉個(gè)很形象的例子,就比如火車站窗口排隊(duì)買(mǎi)火車票,此時(shí)有人正在購(gòu)買(mǎi)火車票,而排在第二個(gè)人可以知道第一個(gè)人是否購(gòu)買(mǎi)完成,如果買(mǎi)完了我就可以去買(mǎi)票了啊,如果沒(méi)買(mǎi)完,那么第二個(gè)人后面的人就只能老老實(shí)實(shí)的進(jìn)行排隊(duì)購(gòu)票啊。
那么第二個(gè)if判斷就是如果前面有人在排隊(duì)購(gòu)票,那么我就只能老老實(shí)實(shí)排在后面進(jìn)行等待唄。再舉個(gè)很形象的例子哈,有一款很好玩的游戲,因?yàn)檫@款游戲還沒(méi)發(fā)售需要進(jìn)行預(yù)約,此時(shí)我并不知道它什么時(shí)候才會(huì)發(fā)售,所以就先預(yù)約排隊(duì)嘛,當(dāng)要發(fā)售了,就會(huì)有工作人員去通知你,你預(yù)約的游戲可以已經(jīng)發(fā)售了,你可以進(jìn)行購(gòu)買(mǎi)了,這就是排隊(duì)與通知的。
//Node: 為線程1的節(jié)點(diǎn) arg:1final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;//只有獲取到鎖的線程才會(huì)跳出循環(huán)for (;;) {//獲取當(dāng)前線程節(jié)點(diǎn)的 prev 節(jié)點(diǎn)final Node p = node.predecessor();// 第二個(gè)線程 第一個(gè)條件為true , 第二個(gè)條件為false,因?yàn)榈诙€(gè)條件是 搶鎖邏輯if (p == head && tryAcquire(arg)) {setHead(node); //將當(dāng)前節(jié)點(diǎn)更新為頭節(jié)點(diǎn)p.next = null; // help GCfailed = false;return interrupted; //正常情況下死循環(huán)的唯一出口}// 將上一個(gè)節(jié)點(diǎn)設(shè)置為獨(dú)占鎖-1,條件返回fasle,第二次自旋進(jìn)來(lái)if (shouldParkAfterFailedAcquire(p, node) && //判斷線程是否需要被阻塞//阻塞當(dāng)前線程parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
我們可以前驅(qū)節(jié)點(diǎn)pred的狀態(tài)會(huì)進(jìn)行不同處理
其實(shí)這個(gè)方法主要作用就是確保當(dāng)前結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)的狀態(tài)為SIGNAL,SIGNAL意味著線程釋放鎖后會(huì)喚醒后面阻塞的線程。畢竟只有確保能夠被喚醒,當(dāng)前線程才能放心的阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL) //狀態(tài)為SIGNAL/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) { //狀態(tài)為CANCELLED/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else { //狀態(tài)為初始化狀態(tài)(ReentrentLock語(yǔ)境下)/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
public void unlock() {sync.release(1);}
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
protected final boolean tryRelease(int releases) {int c = getState() - releases; //計(jì)算待更新的state值if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) { //待更新的state值為0,說(shuō)明持有鎖的線程未重入,一旦釋放鎖其他線程將能獲取free = true;setExclusiveOwnerThread(null); //清除鎖的持有線程標(biāo)記}setState(c); //更新state值return free;}
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
private void unparkSuccessor(Node node) {// 先獲取head節(jié)點(diǎn)的狀態(tài),應(yīng)該是等于-1,原因在shouldParkAfterFailedAcquire方法中有體現(xiàn)int ws = node.waitStatus;// 由于-1會(huì)小于0,所以更新改為0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 獲取第一個(gè)正常排隊(duì)的節(jié)點(diǎn)Node s = node.next;//正常解鎖流程不會(huì)走該if判斷if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 正常來(lái)說(shuō)第一個(gè)排隊(duì)的節(jié)點(diǎn)不應(yīng)該為空,所以直接把第一個(gè)排隊(duì)的線程喚醒if (s != null)LockSupport.unpark(s.thread);}
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
我是黎明大大,我知道我沒(méi)有驚世的才華,也沒(méi)有超于凡人的能力,但畢竟我還有一個(gè)不屈服,敢于選擇向命運(yùn)沖鋒的靈魂,和一個(gè)就是傷痕累累也要義無(wú)反顧走下去的心。
如果您覺(jué)得本文對(duì)您有幫助,還請(qǐng)關(guān)注點(diǎn)贊一波,后期將不間斷更新更多技術(shù)文章

●JUC并發(fā)編程之Synchronized關(guān)鍵字詳解
●JUC并發(fā)編程之MESI緩存一致協(xié)議詳解

