同步鎖基本原理與實(shí)現(xiàn)
為充分利用機(jī)器性能,人們發(fā)明了多線程。但同時(shí)帶來(lái)了線程安全問(wèn)題,于是人們又發(fā)明了同步鎖。
這個(gè)問(wèn)題自然人人知道,但你真的了解同步鎖嗎?還是說(shuō)你會(huì)用其中的上鎖與解鎖功能?
今天我們就一起來(lái)深入看同步鎖的原理和實(shí)現(xiàn)吧!
一:同步鎖的職責(zé)
同步鎖的職責(zé)可以說(shuō)就一個(gè),限制資源的使用(線程安全從屬)。
它一般至少會(huì)包含兩個(gè)功能: 1. 給資源加鎖;2. 給資源解鎖;另外,它一般還有 等待/通知 即 wait/notify 的功能;
同步鎖的應(yīng)用場(chǎng)景:多個(gè)線程同時(shí)操作一個(gè)事務(wù)必須保證正確性;一個(gè)資源只能同時(shí)由一線程訪問(wèn)操作;一個(gè)資源最多只能接入k的并發(fā)訪問(wèn);保證訪問(wèn)的順序性;
同步鎖的實(shí)現(xiàn)方式:操作系統(tǒng)調(diào)度實(shí)現(xiàn);應(yīng)用自行實(shí)現(xiàn);CAS自旋;
同步鎖的幾個(gè)問(wèn)題:
為什么它能保證線程安全?
鎖等待耗CPU嗎?
使用鎖后性能下降嚴(yán)重的原因是啥?
二:同步鎖的實(shí)現(xiàn)一:lock/unlock
其實(shí)對(duì)于應(yīng)用層來(lái)說(shuō),非常多就是 lock/unlock , 這也是鎖的核心。
AQS 是java中很多鎖實(shí)現(xiàn)的基礎(chǔ),因?yàn)樗帘瘟撕芏喾彪s而底層的阻塞操作,為上層抽象出易用的接口。
我們就以AQS作為跳板,先來(lái)看一下上鎖的過(guò)程。為不至于陷入具體鎖的業(yè)務(wù)邏輯中,我們先以最簡(jiǎn)單的 CountDownLatch 看看。
// 先看看 CountDownLatch 的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu),可以說(shuō)是不能再簡(jiǎn)單了,就繼承了 AQS,然后簡(jiǎn)單覆寫(xiě)了幾個(gè)必要方法。// java.util.concurrent.CountDownLatch.Sync/*** Synchronization control For CountDownLatch.* Uses AQS state to represent count.*/private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}int getCount() {return getState();}protected int tryAcquireShared(int acquires) {// 只有一種情況會(huì)獲取鎖成功,即 state == 0 的時(shí)候return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;// 原始的鎖數(shù)量是在初始化時(shí)指定的不可變的,每次釋放一個(gè)鎖標(biāo)識(shí)int nextc = c-1;if (compareAndSetState(c, nextc))// 只有一情況會(huì)釋放鎖成功,即本次釋放后 state == 0return nextc == 0;}}}private final Sync sync;
重點(diǎn)1,我們看看上鎖過(guò)程,即 await() 的調(diào)用。
public void await() throws InterruptedException {// 調(diào)用 AQS 的接口,由AQS實(shí)現(xiàn)了鎖的骨架邏輯sync.acquireSharedInterruptibly(1);}// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly/*** Acquires in shared mode, aborting if interrupted. Implemented* by first checking interrupt status, then invoking at least once* {@link #tryAcquireShared}, returning on success. Otherwise the* thread is queued, possibly repeatedly blocking and unblocking,* invoking {@link #tryAcquireShared} until success or the thread* is interrupted.* @param arg the acquire argument.* This value is conveyed to {@link #tryAcquireShared} but is* otherwise uninterpreted and can represent anything* you like.* @throws InterruptedException if the current thread is interrupted*/public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 首先嘗試獲取鎖,如果成功就不用阻塞了// 而從上面的邏輯我們看到,獲取鎖相當(dāng)之簡(jiǎn)單,所以,獲取鎖本身并沒(méi)有太多的性能消耗喲// 如果獲取鎖失敗,則會(huì)進(jìn)行稍后嘗試,這應(yīng)該是復(fù)雜而精巧的if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}/*** Acquires in shared interruptible mode.* @param arg the acquire argument*/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 首先將當(dāng)前線程添加排隊(duì)隊(duì)尾,此處會(huì)保證線程安全,稍后我們可以看到final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {// 獲取其上一節(jié)點(diǎn),如果上一節(jié)點(diǎn)是頭節(jié)點(diǎn),就代表當(dāng)前線程可以再次嘗試獲取鎖了final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 先檢測(cè)是否需要阻塞,然后再進(jìn)行阻塞等待,阻塞由 LockSupport 底層支持// 如果阻塞后,將不會(huì)主動(dòng)喚醒,只會(huì)由 unlock 時(shí),主動(dòng)被通知// 因此,此處即是獲取鎖的最終等待點(diǎn)// 操作系統(tǒng)將不會(huì)再次調(diào)度到本線程,直到獲取到鎖if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}// 如此線程安全地添加當(dāng)前線程到隊(duì)尾?CAS 保證/*** Creates and enqueues node for current thread and given mode.** @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared* @return the new node*/private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}/*** Inserts node into queue, initializing if necessary. See picture above.* @param node the node to insert* @return node's predecessor*/private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}// 檢測(cè)是否需要進(jìn)行阻塞/*** Checks and updates status for a node that failed to acquire.* Returns true if thread should block. This is the main signal* control in all acquire loops. Requires that pred == node.prev.** @param pred node's predecessor holding status* @param node the node* @return {@code true} if thread should block*/private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/// 只有前置節(jié)點(diǎn)是 SIGNAL 狀態(tài)的節(jié)點(diǎn),才需要進(jìn)行 阻塞等待,當(dāng)然前置節(jié)點(diǎn)會(huì)在下一次循環(huán)中被設(shè)置好return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}// park 阻塞實(shí)現(xiàn)/*** Convenience method to park and then check if interrupted** @return {@code true} if interrupted*/private final boolean parkAndCheckInterrupt() {// 將當(dāng)前 AQS 實(shí)例作為鎖對(duì)象 blocker, 進(jìn)行操作系統(tǒng)調(diào)用阻塞, 所以所有等待鎖的線程將會(huì)在同一個(gè)鎖前提下執(zhí)行LockSupport.park(this);return Thread.interrupted();}
如上,上鎖過(guò)程是比較簡(jiǎn)單明了的。加入一隊(duì)列,然后由操作系統(tǒng)將線程調(diào)出。(那么操作系統(tǒng)是如何把線程調(diào)出的呢?有興趣自行研究)
重點(diǎn)2. 解鎖過(guò)程,即 countDown() 調(diào)用
public void countDown() {// 同樣直接調(diào)用 AQS 的接口,由AQS實(shí)現(xiàn)了鎖的釋放骨架邏輯sync.releaseShared(1);}// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared/*** Releases in shared mode. Implemented by unblocking one or more* threads if {@link #tryReleaseShared} returns true.** @param arg the release argument. This value is conveyed to* {@link #tryReleaseShared} but is otherwise uninterpreted* and can represent anything you like.* @return the value returned from {@link #tryReleaseShared}*/public final boolean releaseShared(int arg) {// 調(diào)用業(yè)務(wù)實(shí)現(xiàn)的釋放邏輯,如果成功,再執(zhí)行底層的釋放,如隊(duì)列移除,線程通知等等// 在 CountDownLatch 的實(shí)現(xiàn)中,只有 state == 0 時(shí)才會(huì)成功,所以它只會(huì)執(zhí)行一次底層釋放// 這也是我們認(rèn)為 CountDownLatch 能夠做到多線程同時(shí)執(zhí)行的效果的原因之一if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}/*** Release action for shared mode -- signals successor and ensures* propagation. (Note: For exclusive mode, release just amounts* to calling unparkSuccessor of head if it needs signal.)*/private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;// 隊(duì)列不為空才進(jìn)行釋放if (h != null && h != tail) {int ws = h.waitStatus;// 看過(guò)上面的 lock 邏輯,我們知道只要在阻塞狀態(tài),一定是 Node.SIGNALif (ws == Node.SIGNAL) {// 狀態(tài)改變成功,才進(jìn)行后續(xù)的喚醒邏輯// 因?yàn)橄雀淖儬顟B(tài)成功,才算是線程安全的,再進(jìn)行喚醒,否則進(jìn)入下一次循環(huán)再檢查if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases// 將頭節(jié)點(diǎn)的下一節(jié)點(diǎn)喚醒,如有必要unparkSuccessor(h);}// 這里的 propagates, 是要傳播啥呢??// 為什么只喚醒了一個(gè)線程,其他線程也可以動(dòng)了?else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}/*** Wakes up node's successor, if one exists.** @param node the node*/private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/// 喚醒下一個(gè)節(jié)點(diǎn)// 但如果下一節(jié)點(diǎn)已經(jīng)取消等待了,那么就找下一個(gè)沒(méi)最近的沒(méi)被取消的線程進(jìn)行喚醒// 喚醒只是針對(duì)一個(gè)線程的喲Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
重要3. 線程解鎖的傳播性?
因?yàn)閺纳弦还?jié)的講解中,我們看到,當(dāng)用戶調(diào)用 countDown 時(shí),僅僅是讓操作系統(tǒng)喚醒了 head 的下一個(gè)節(jié)點(diǎn)線程或者最近未取消的節(jié)點(diǎn)。那么,從哪里來(lái)的所有線程都獲取了鎖從而運(yùn)行呢?
其實(shí)是在 獲取鎖的過(guò)程中,還有一點(diǎn)我們未看清:
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared/*** Acquires in shared uninterruptible mode.* @param arg the acquire argument*/private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {// 當(dāng)countDown被調(diào)用后,head節(jié)點(diǎn)被喚醒,執(zhí)行int r = tryAcquireShared(arg);if (r >= 0) {// 獲取到鎖后,設(shè)置node為下一個(gè)頭節(jié)點(diǎn),并把喚醒狀態(tài)傳播下去,而這里面肯定會(huì)做一些喚醒其他線程的操作,請(qǐng)看下文setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}/*** Sets head of queue, and checks if successor may be waiting* in shared mode, if so propagating if either propagate > 0 or* PROPAGATE status was set.** @param node the node* @param propagate the return value from a tryAcquireShared*/private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:* Propagation was indicated by caller,* or was recorded (as h.waitStatus either before* or after setHead) by a previous operation* (note: this uses sign-check of waitStatus because* PROPAGATE status may transition to SIGNAL.)* and* The next node is waiting in shared mode,* or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {// 如果有必要,則做一次喚醒下一線程的操作// 在 countDown() 不會(huì)觸發(fā)此操作,所以這里只是一個(gè)內(nèi)部調(diào)用傳播Node s = node.next;if (s == null || s.isShared())// 此處鎖釋放邏輯如上,總之,又是另一次的喚醒觸發(fā)doReleaseShared();}}
到此,我們明白了它是怎么做到一個(gè)鎖釋放,所有線程可通行的。也從根本上回答了我們猜想,所有線程同時(shí)并發(fā)運(yùn)行。然而并沒(méi)有,它只是通過(guò)喚醒傳播性來(lái)依次喚醒各個(gè)等待線程的。從絕對(duì)時(shí)間性上來(lái)講,都是有先后關(guān)系的。以后可別再淺顯說(shuō)是同時(shí)執(zhí)行了喲。
三、 鎖的切換:wait/notify
上面看出,針對(duì)一個(gè)lock/unlock 的過(guò)程還是很簡(jiǎn)單的,由操作系統(tǒng)負(fù)責(zé)大頭,實(shí)現(xiàn)代碼也并不多。
但是針對(duì)稍微有點(diǎn)要求的場(chǎng)景,就會(huì)進(jìn)行條件式的操作。比如:持有某個(gè)鎖運(yùn)行一段代碼,但是,運(yùn)行時(shí)發(fā)現(xiàn)某條件不滿足,需要進(jìn)行等待而不能直接結(jié)束,直到條件成立。即所謂的 wait 操作。
乍一看,wait/notify 與 lock/unlock 很像,其實(shí)不然。區(qū)分主要是 lock/unlock 是針對(duì)整個(gè)代碼段的,而 wait/notify 則是針對(duì)某個(gè)條件的,即獲取了鎖不代表?xiàng)l件成立了,但是條件成立了一定要在鎖的前提下才能進(jìn)行安全操作。
那么,是否 wait/notify 也一樣的實(shí)現(xiàn)簡(jiǎn)單呢?比如java的最基礎(chǔ)類 Object 類就提供了 wait/notify 功能。
我們既然想一探究竟,還是以并發(fā)包下的實(shí)現(xiàn)作為基礎(chǔ)吧,畢竟 java 才是我們的強(qiáng)項(xiàng)。
本次,咱們以 ArrayBlockingQueue#put/take 作為基礎(chǔ)看下這種場(chǎng)景的使用先。
ArrayBlockingQueue 的put/take 特性就是,put當(dāng)隊(duì)列滿時(shí),一直阻塞,直到有可用位置才繼續(xù)運(yùn)行下一步。而take當(dāng)隊(duì)列為空時(shí)一樣阻塞,直到隊(duì)列里有數(shù)據(jù)才運(yùn)行下一步。這種場(chǎng)景使用鎖主不好搞了,因?yàn)檫@是一個(gè)條件判斷。put/take 如下:
// java.util.concurrent.ArrayBlockingQueue#put/*** Inserts the specified element at the tail of this queue, waiting* for space to become available if the queue is full.** @throws InterruptedException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 當(dāng)隊(duì)列滿時(shí),一直等待while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}// java.util.concurrent.ArrayBlockingQueue#takepublic E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {// 當(dāng)隊(duì)列為空時(shí)一直等待while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}
看起來(lái)相當(dāng)簡(jiǎn)單,完全符合人類思維。只是,這里使用的兩個(gè)變量進(jìn)行控制流程 notFull,notEmpty. 這兩個(gè)變量是如何進(jìn)行關(guān)聯(lián)的呢?
在這之前,我們還需要補(bǔ)充下上面的例子,即 notFull.await(), notEmpty.await(); 被阻塞了,何時(shí)才能運(yùn)行呢?如上代碼在各自的入隊(duì)和出隊(duì)完成之后進(jìn)行通知就可以了。
// 與 put 對(duì)應(yīng),入隊(duì)完成后,隊(duì)列自然就不為空了,通知下 notEmpty 就好了/*** Inserts element at current put position, advances, and signals.* Call only when holding lock.*/private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;// 我已放入一個(gè)元素,不為空了notEmpty.signal();}// 與 take 對(duì)應(yīng),出隊(duì)完成后,自然就不可能是滿的了,至少一個(gè)空余空間。/*** Extracts element at current take position, advances, and signals.* Call only when holding lock.*/private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();// 我已移除一個(gè)元素,肯定沒(méi)有滿了,你們繼續(xù)放入吧notFull.signal();return x;}
是不是超級(jí)好理解。是的。不過(guò),我們不是想看 ArrayBlockingQueue 是如何實(shí)現(xiàn)的,我們是要論清 wait/notify 是如何實(shí)現(xiàn)的。因?yàn)楫吘?,他們不是一個(gè)鎖那么簡(jiǎn)單。
// 三個(gè)鎖的關(guān)系,即 notEmpty, notFull 都是 ReentrantLock 的條件鎖,相當(dāng)于是其子集吧/** Main lock guarding all access */final ReentrantLock lock;/** Condition for waiting takes */private final Condition notEmpty;/** Condition for waiting puts */private final Condition notFull;public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}// lock.newCondition() 是什么鬼?它是 AQS 中實(shí)現(xiàn)的 ConditionObject// java.util.concurrent.locks.ReentrantLock#newConditionpublic Condition newCondition() {return sync.newCondition();}// java.util.concurrent.locks.ReentrantLock.Sync#newConditionfinal ConditionObject newCondition() {// AQS 中定義return new ConditionObject();}
接下來(lái),我們要帶著幾個(gè)疑問(wèn)來(lái)看這個(gè) Condition 的對(duì)象:
1. 它的 wait/notify 是如何實(shí)現(xiàn)的?
2. 它是如何與互相進(jìn)行聯(lián)系的?
3. 為什么 wait/notify 必須要在外面的lock獲取之后才能執(zhí)行?
4. 它與Object的wait/notify 有什么相同和不同點(diǎn)?
能夠回答了上面的問(wèn)題,基本上對(duì)其原理與實(shí)現(xiàn)也就理解得差不多了。
重點(diǎn)1. wait/notify 是如何實(shí)現(xiàn)的?
我們從上面可以看到,它是通過(guò)調(diào)用 await()/signal() 實(shí)現(xiàn)的,到底做事如何,且看下面。
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()/*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 添加當(dāng)前線程到 等待線程隊(duì)列中,有 lastWaiter/firstWaiter 維護(hù)Node node = addConditionWaiter();// 釋放當(dāng)前l(fā)ock中持有的鎖,詳情且看下文int savedState = fullyRelease(node);// 從以下開(kāi)始,將不再保證線程安全性,因?yàn)楫?dāng)前的鎖已經(jīng)釋放,其他線程將會(huì)重新競(jìng)爭(zhēng)鎖使用int interruptMode = 0;// 循環(huán)判定,如果當(dāng)前節(jié)點(diǎn)不在 sync 同步隊(duì)列中,那么就反復(fù)阻塞自己// 所以判斷是否在 同步隊(duì)列上,是很重要的while (!isOnSyncQueue(node)) {// 沒(méi)有在同步隊(duì)列,阻塞LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 當(dāng)條件被滿足后,需要重新競(jìng)爭(zhēng)鎖,詳情看下文// 競(jìng)爭(zhēng)到鎖后,原樣返回到 wait 的原點(diǎn),繼續(xù)執(zhí)行業(yè)務(wù)邏輯if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;// 下面是異常處理,忽略if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}/*** Invokes release with current state value; returns saved state.* Cancels node and throws exception on failure.* @param node the condition node for this wait* @return previous sync state*/final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();// 預(yù)期的,都是釋放鎖成功,如果失敗,說(shuō)明當(dāng)前線程并并未獲取到鎖,引發(fā)異常if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {// tryRelease 由客戶端自定義實(shí)現(xiàn)if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}// 如何判定當(dāng)前線程是否在同步隊(duì)列中或者可以進(jìn)行同步隊(duì)列?/*** Returns true if a node, always one that was initially placed on* a condition queue, is now waiting to reacquire on sync queue.* @param node the node* @return true if is reacquiring*/final boolean isOnSyncQueue(Node node) {// 如果上一節(jié)點(diǎn)還沒(méi)有被移除,當(dāng)前節(jié)點(diǎn)就不能被加入到同步隊(duì)列if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 如果當(dāng)前節(jié)點(diǎn)的下游節(jié)點(diǎn)已經(jīng)存在,則它自身必定已經(jīng)被移到同步隊(duì)列中if (node.next != null) // If has successor, it must be on queuereturn true;/** node.prev can be non-null, but not yet on queue because* the CAS to place it on queue can fail. So we have to* traverse from tail to make sure it actually made it. It* will always be near the tail in calls to this method, and* unless the CAS failed (which is unlikely), it will be* there, so we hardly ever traverse much.*/// 最終直接從同步隊(duì)列中查找,如果找到,則自身已經(jīng)在同步隊(duì)列中return findNodeFromTail(node);}/*** Returns true if node is on sync queue by searching backwards from tail.* Called only when needed by isOnSyncQueue.* @return true if present*/private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}// 當(dāng)條件被滿足后,需要重新競(jìng)爭(zhēng)鎖,以保證外部的鎖語(yǔ)義,因?yàn)橹白约阂呀?jīng)將鎖主動(dòng)釋放// 這個(gè)鎖與 lock/unlock 時(shí)的一毛一樣,沒(méi)啥可講的// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}
總結(jié)一下 wait 的邏輯:
1. 前提:自身已獲取到外部鎖;
2. 將當(dāng)前線程添加到 ConditionQueue 等待隊(duì)列中;
3. 釋放已獲取到的鎖;
4. 反復(fù)檢查進(jìn)入等待,直到當(dāng)前節(jié)點(diǎn)被移動(dòng)到同步隊(duì)列中;
5. 條件滿足被喚醒,重新競(jìng)爭(zhēng)外部鎖,成功則返回,否則繼續(xù)阻塞;(外部鎖是同一個(gè),這也是要求兩個(gè)對(duì)象必須存在依賴關(guān)系的原因)
6. wait前線程持有鎖,wait后線程持有鎖,沒(méi)有一點(diǎn)外部鎖變化;
重點(diǎn)2. 厘清了 wait, 接下來(lái),我們看 signal() 通知喚醒的實(shí)現(xiàn):
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal/*** Moves the longest-waiting thread, if one exists, from the* wait queue for this condition to the wait queue for the* owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/public final void signal() {// 只有獲取鎖的實(shí)例,才可以進(jìn)行signal,否則你拿什么去保證線程安全呢if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;// 通知 firstWaiterif (first != null)doSignal(first);}/*** Removes and transfers nodes until hit non-cancelled one or* null. Split out from signal in part to encourage compilers* to inline the case of no waiters.* @param first (non-null) the first node on condition queue*/private void doSignal(Node first) {// 最多只轉(zhuǎn)移一個(gè) 節(jié)點(diǎn)do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}// 將一個(gè)節(jié)點(diǎn)從 等待隊(duì)列 移動(dòng)到 同步隊(duì)列中,即可參與下一輪競(jìng)爭(zhēng)// 只有確實(shí)移動(dòng)成功才會(huì)返回 true// 說(shuō)明:當(dāng)前線程是持有鎖的線程// java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal/*** Transfers a node from a condition queue onto sync queue.* Returns true if successful.* @param node the node* @return true if successfully transferred (else the node was* cancelled before signal)*/final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/// 同步隊(duì)列由 head/tail 指針維護(hù)Node p = enq(node);int ws = p.waitStatus;// 注意,此處正常情況下并不會(huì)喚醒等待線程,僅是將隊(duì)列轉(zhuǎn)移。// 因?yàn)楫?dāng)前線程的鎖保護(hù)區(qū)域并未完成,完成后自然會(huì)喚醒其他等待線程// 否則將會(huì)存在當(dāng)前線程任務(wù)還未執(zhí)行完成,卻被其他線程搶了先去,那接下來(lái)的任務(wù)當(dāng)如何??if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
總結(jié)一下,notify 的功能原理如下:
1. 前提:自身已獲取到外部鎖;
2. 轉(zhuǎn)移下一個(gè)等待隊(duì)列的節(jié)點(diǎn)到同步隊(duì)列中;
3. 如果遇到下一節(jié)點(diǎn)被取消情況,順延到再下一節(jié)點(diǎn)直到為空,至多轉(zhuǎn)移一個(gè)節(jié)點(diǎn);
4. 正常情況下不做線程的喚醒操作;
所以,實(shí)現(xiàn) wait/notify, 最關(guān)鍵的就是維護(hù)兩個(gè)隊(duì)列,等待隊(duì)列與同步隊(duì)列,而且都要求是在有外部鎖保證的情況下執(zhí)行。
到此,我們也能回答一個(gè)問(wèn)題:為什么wait/notify一定要在鎖模式下才能運(yùn)行?
因?yàn)閣ait是等待條件成立,此時(shí)必定存在競(jìng)爭(zhēng)需要做保護(hù),而它自身又必須釋放鎖以使外部條件可成立,且后續(xù)需要做恢復(fù)動(dòng)作;而notify之后可能還有后續(xù)工作必須保障安全,notify只是鎖的一個(gè)子集。。。
四、通知所有線程的實(shí)現(xiàn):notifyAll
有時(shí)條件成立后,可以允許所有線程通行,這時(shí)就可以進(jìn)行 notifyAll, 那么如果達(dá)到通知所有的目的呢?是一起通知還是??
以下是 AQS 中的實(shí)現(xiàn):
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAllpublic final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}/*** Removes and transfers all nodes.* @param first (non-null) the first node on condition queue*/private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}
可以看到,它是通過(guò)遍歷所有節(jié)點(diǎn),依次轉(zhuǎn)移等待隊(duì)列到同步隊(duì)列(通知)的,原本就沒(méi)有人能同時(shí)干幾件事的!
本文從java實(shí)現(xiàn)的角度去解析同步鎖的原理與實(shí)現(xiàn),但并不局限于java。道理總是相通的,只是像操作系統(tǒng)這樣的大佬,能干的活更純粹:比如讓cpu根本不用調(diào)度一個(gè)線程。

騰訊、阿里、滴滴后臺(tái)面試題匯總總結(jié) — (含答案)
面試:史上最全多線程面試題 !
最新阿里內(nèi)推Java后端面試題
JVM難學(xué)?那是因?yàn)槟銢](méi)認(rèn)真看完這篇文章

關(guān)注作者微信公眾號(hào) —《JAVA爛豬皮》
了解更多java后端架構(gòu)知識(shí)以及最新面試寶典


看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者源源不斷出文的動(dòng)力
作者:等你歸去來(lái)
出處:https://www.cnblogs.com/yougewe/p/11922194.html
