<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          同步鎖基本原理與實(shí)現(xiàn)

          共 28715字,需瀏覽 58分鐘

           ·

          2021-03-26 16:05

          走過(guò)路過(guò)不要錯(cuò)過(guò)

          點(diǎn)擊藍(lán)字關(guān)注我們

          為充分利用機(jī)器性能,人們發(fā)明了多線程。但同時(shí)帶來(lái)了線程安全問(wèn)題,于是人們又發(fā)明了同步鎖。

          這個(gè)問(wèn)題自然人人知道,但你真的了解同步鎖嗎?還是說(shuō)你會(huì)用其中的上鎖與解鎖功能?

          今天我們就一起來(lái)深入看同步鎖的原理和實(shí)現(xiàn)吧!

          一:同步鎖的職責(zé)

          同步鎖的職責(zé)可以說(shuō)就一個(gè),限制資源的使用(線程安全從屬)。

          它一般至少會(huì)包含兩個(gè)功能: 1. 給資源加鎖;2. 給資源解鎖;另外,它一般還有 等待/通知 即 wait/notify 的功能;

          同步鎖的應(yīng)用場(chǎng)景:多個(gè)線程同時(shí)操作一個(gè)事務(wù)必須保證正確性;一個(gè)資源只能同時(shí)由一線程訪問(wèn)操作;一個(gè)資源最多只能接入k的并發(fā)訪問(wèn);保證訪問(wèn)的順序性;

          同步鎖的實(shí)現(xiàn)方式:操作系統(tǒng)調(diào)度實(shí)現(xiàn);應(yīng)用自行實(shí)現(xiàn);CAS自旋;

          同步鎖的幾個(gè)問(wèn)題:

          為什么它能保證線程安全?

          鎖等待耗CPU嗎?

          使用鎖后性能下降嚴(yán)重的原因是啥?

          二:同步鎖的實(shí)現(xiàn)一:lock/unlock

          其實(shí)對(duì)于應(yīng)用層來(lái)說(shuō),非常多就是 lock/unlock , 這也是鎖的核心。

          AQS 是java中很多鎖實(shí)現(xiàn)的基礎(chǔ),因?yàn)樗帘瘟撕芏喾彪s而底層的阻塞操作,為上層抽象出易用的接口。

          我們就以AQS作為跳板,先來(lái)看一下上鎖的過(guò)程。為不至于陷入具體鎖的業(yè)務(wù)邏輯中,我們先以最簡(jiǎn)單的 CountDownLatch 看看。

              // 先看看 CountDownLatch 的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu),可以說(shuō)是不能再簡(jiǎn)單了,就繼承了 AQS,然后簡(jiǎn)單覆寫(xiě)了幾個(gè)必要方法。    // java.util.concurrent.CountDownLatch.Sync    /**     * Synchronization control For CountDownLatch.     * Uses AQS state to represent count.     */    private static final class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 4982264981922014374L;
          Sync(int count) { setState(count); }
          int getCount() { return getState(); }
          protected int tryAcquireShared(int acquires) { // 只有一種情況會(huì)獲取鎖成功,即 state == 0 的時(shí)候 return (getState() == 0) ? 1 : -1; }
          protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; // 原始的鎖數(shù)量是在初始化時(shí)指定的不可變的,每次釋放一個(gè)鎖標(biāo)識(shí) int nextc = c-1; if (compareAndSetState(c, nextc)) // 只有一情況會(huì)釋放鎖成功,即本次釋放后 state == 0 return nextc == 0; } } } private final Sync sync;

          重點(diǎn)1,我們看看上鎖過(guò)程,即 await() 的調(diào)用。

              public void await() throws InterruptedException {        // 調(diào)用 AQS 的接口,由AQS實(shí)現(xiàn)了鎖的骨架邏輯        sync.acquireSharedInterruptibly(1);    }
          // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly /** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 首先嘗試獲取鎖,如果成功就不用阻塞了 // 而從上面的邏輯我們看到,獲取鎖相當(dāng)之簡(jiǎn)單,所以,獲取鎖本身并沒(méi)有太多的性能消耗喲 // 如果獲取鎖失敗,則會(huì)進(jìn)行稍后嘗試,這應(yīng)該是復(fù)雜而精巧的 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
          /** * Acquires in shared interruptible mode. * @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 首先將當(dāng)前線程添加排隊(duì)隊(duì)尾,此處會(huì)保證線程安全,稍后我們可以看到 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 獲取其上一節(jié)點(diǎn),如果上一節(jié)點(diǎn)是頭節(jié)點(diǎn),就代表當(dāng)前線程可以再次嘗試獲取鎖了 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 先檢測(cè)是否需要阻塞,然后再進(jìn)行阻塞等待,阻塞由 LockSupport 底層支持 // 如果阻塞后,將不會(huì)主動(dòng)喚醒,只會(huì)由 unlock 時(shí),主動(dòng)被通知 // 因此,此處即是獲取鎖的最終等待點(diǎn) // 操作系統(tǒng)將不會(huì)再次調(diào)度到本線程,直到獲取到鎖 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
          // 如此線程安全地添加當(dāng)前線程到隊(duì)尾?CAS 保證 /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
          // 檢測(cè)是否需要進(jìn)行阻塞 /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ // 只有前置節(jié)點(diǎn)是 SIGNAL 狀態(tài)的節(jié)點(diǎn),才需要進(jìn)行 阻塞等待,當(dāng)然前置節(jié)點(diǎn)會(huì)在下一次循環(huán)中被設(shè)置好 return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
          // park 阻塞實(shí)現(xiàn) /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { // 將當(dāng)前 AQS 實(shí)例作為鎖對(duì)象 blocker, 進(jìn)行操作系統(tǒng)調(diào)用阻塞, 所以所有等待鎖的線程將會(huì)在同一個(gè)鎖前提下執(zhí)行 LockSupport.park(this); return Thread.interrupted(); }

          如上,上鎖過(guò)程是比較簡(jiǎn)單明了的。加入一隊(duì)列,然后由操作系統(tǒng)將線程調(diào)出。(那么操作系統(tǒng)是如何把線程調(diào)出的呢?有興趣自行研究)

          重點(diǎn)2. 解鎖過(guò)程,即 countDown() 調(diào)用

              public void countDown() {        // 同樣直接調(diào)用 AQS 的接口,由AQS實(shí)現(xiàn)了鎖的釋放骨架邏輯        sync.releaseShared(1);    }    // java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared    /**     * Releases in shared mode.  Implemented by unblocking one or more     * threads if {@link #tryReleaseShared} returns true.     *     * @param arg the release argument.  This value is conveyed to     *        {@link #tryReleaseShared} but is otherwise uninterpreted     *        and can represent anything you like.     * @return the value returned from {@link #tryReleaseShared}     */    public final boolean releaseShared(int arg) {        // 調(diào)用業(yè)務(wù)實(shí)現(xiàn)的釋放邏輯,如果成功,再執(zhí)行底層的釋放,如隊(duì)列移除,線程通知等等        // 在 CountDownLatch 的實(shí)現(xiàn)中,只有 state == 0 時(shí)才會(huì)成功,所以它只會(huì)執(zhí)行一次底層釋放        // 這也是我們認(rèn)為 CountDownLatch 能夠做到多線程同時(shí)執(zhí)行的效果的原因之一        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }
          /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; // 隊(duì)列不為空才進(jìn)行釋放 if (h != null && h != tail) { int ws = h.waitStatus; // 看過(guò)上面的 lock 邏輯,我們知道只要在阻塞狀態(tài),一定是 Node.SIGNAL if (ws == Node.SIGNAL) { // 狀態(tài)改變成功,才進(jìn)行后續(xù)的喚醒邏輯 // 因?yàn)橄雀淖儬顟B(tài)成功,才算是線程安全的,再進(jìn)行喚醒,否則進(jìn)入下一次循環(huán)再檢查 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 將頭節(jié)點(diǎn)的下一節(jié)點(diǎn)喚醒,如有必要 unparkSuccessor(h); } // 這里的 propagates, 是要傳播啥呢?? // 為什么只喚醒了一個(gè)線程,其他線程也可以動(dòng)了? else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } /** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
          /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 喚醒下一個(gè)節(jié)點(diǎn) // 但如果下一節(jié)點(diǎn)已經(jīng)取消等待了,那么就找下一個(gè)沒(méi)最近的沒(méi)被取消的線程進(jìn)行喚醒 // 喚醒只是針對(duì)一個(gè)線程的喲 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

          重要3. 線程解鎖的傳播性?

          因?yàn)閺纳弦还?jié)的講解中,我們看到,當(dāng)用戶調(diào)用 countDown 時(shí),僅僅是讓操作系統(tǒng)喚醒了 head 的下一個(gè)節(jié)點(diǎn)線程或者最近未取消的節(jié)點(diǎn)。那么,從哪里來(lái)的所有線程都獲取了鎖從而運(yùn)行呢?

          其實(shí)是在 獲取鎖的過(guò)程中,還有一點(diǎn)我們未看清:


          // java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared /** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { // 當(dāng)countDown被調(diào)用后,head節(jié)點(diǎn)被喚醒,執(zhí)行 int r = tryAcquireShared(arg); if (r >= 0) { // 獲取到鎖后,設(shè)置node為下一個(gè)頭節(jié)點(diǎn),并把喚醒狀態(tài)傳播下去,而這里面肯定會(huì)做一些喚醒其他線程的操作,請(qǐng)看下文 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
          /** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { // 如果有必要,則做一次喚醒下一線程的操作 // 在 countDown() 不會(huì)觸發(fā)此操作,所以這里只是一個(gè)內(nèi)部調(diào)用傳播 Node s = node.next; if (s == null || s.isShared()) // 此處鎖釋放邏輯如上,總之,又是另一次的喚醒觸發(fā) doReleaseShared(); } }

          到此,我們明白了它是怎么做到一個(gè)鎖釋放,所有線程可通行的。也從根本上回答了我們猜想,所有線程同時(shí)并發(fā)運(yùn)行。然而并沒(méi)有,它只是通過(guò)喚醒傳播性來(lái)依次喚醒各個(gè)等待線程的。從絕對(duì)時(shí)間性上來(lái)講,都是有先后關(guān)系的。以后可別再淺顯說(shuō)是同時(shí)執(zhí)行了喲。

          三、 鎖的切換:wait/notify

          上面看出,針對(duì)一個(gè)lock/unlock 的過(guò)程還是很簡(jiǎn)單的,由操作系統(tǒng)負(fù)責(zé)大頭,實(shí)現(xiàn)代碼也并不多。

          但是針對(duì)稍微有點(diǎn)要求的場(chǎng)景,就會(huì)進(jìn)行條件式的操作。比如:持有某個(gè)鎖運(yùn)行一段代碼,但是,運(yùn)行時(shí)發(fā)現(xiàn)某條件不滿足,需要進(jìn)行等待而不能直接結(jié)束,直到條件成立。即所謂的 wait 操作。

          乍一看,wait/notify 與 lock/unlock 很像,其實(shí)不然。區(qū)分主要是 lock/unlock 是針對(duì)整個(gè)代碼段的,而 wait/notify 則是針對(duì)某個(gè)條件的,即獲取了鎖不代表?xiàng)l件成立了,但是條件成立了一定要在鎖的前提下才能進(jìn)行安全操作。

          那么,是否 wait/notify 也一樣的實(shí)現(xiàn)簡(jiǎn)單呢?比如java的最基礎(chǔ)類 Object 類就提供了 wait/notify 功能。

          我們既然想一探究竟,還是以并發(fā)包下的實(shí)現(xiàn)作為基礎(chǔ)吧,畢竟 java 才是我們的強(qiáng)項(xiàng)。

          本次,咱們以  ArrayBlockingQueue#put/take 作為基礎(chǔ)看下這種場(chǎng)景的使用先。

          ArrayBlockingQueue 的put/take 特性就是,put當(dāng)隊(duì)列滿時(shí),一直阻塞,直到有可用位置才繼續(xù)運(yùn)行下一步。而take當(dāng)隊(duì)列為空時(shí)一樣阻塞,直到隊(duì)列里有數(shù)據(jù)才運(yùn)行下一步。這種場(chǎng)景使用鎖主不好搞了,因?yàn)檫@是一個(gè)條件判斷。put/take 如下:

              // java.util.concurrent.ArrayBlockingQueue#put    /**     * Inserts the specified element at the tail of this queue, waiting     * for space to become available if the queue is full.     *     * @throws InterruptedException {@inheritDoc}     * @throws NullPointerException {@inheritDoc}     */    public void put(E e) throws InterruptedException {        checkNotNull(e);        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            // 當(dāng)隊(duì)列滿時(shí),一直等待            while (count == items.length)                notFull.await();            enqueue(e);        } finally {            lock.unlock();        }    }
          // java.util.concurrent.ArrayBlockingQueue#take public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 當(dāng)隊(duì)列為空時(shí)一直等待 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }

          看起來(lái)相當(dāng)簡(jiǎn)單,完全符合人類思維。只是,這里使用的兩個(gè)變量進(jìn)行控制流程 notFull,notEmpty. 這兩個(gè)變量是如何進(jìn)行關(guān)聯(lián)的呢?

          在這之前,我們還需要補(bǔ)充下上面的例子,即 notFull.await(), notEmpty.await(); 被阻塞了,何時(shí)才能運(yùn)行呢?如上代碼在各自的入隊(duì)和出隊(duì)完成之后進(jìn)行通知就可以了。

              // 與 put 對(duì)應(yīng),入隊(duì)完成后,隊(duì)列自然就不為空了,通知下 notEmpty 就好了    /**     * Inserts element at current put position, advances, and signals.     * Call only when holding lock.     */    private void enqueue(E x) {        // assert lock.getHoldCount() == 1;        // assert items[putIndex] == null;        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        // 我已放入一個(gè)元素,不為空了        notEmpty.signal();    }    // 與 take 對(duì)應(yīng),出隊(duì)完成后,自然就不可能是滿的了,至少一個(gè)空余空間。    /**     * Extracts element at current take position, advances, and signals.     * Call only when holding lock.     */    private E dequeue() {        // assert lock.getHoldCount() == 1;        // assert items[takeIndex] != null;        final Object[] items = this.items;        @SuppressWarnings("unchecked")        E x = (E) items[takeIndex];        items[takeIndex] = null;        if (++takeIndex == items.length)            takeIndex = 0;        count--;        if (itrs != null)            itrs.elementDequeued();        // 我已移除一個(gè)元素,肯定沒(méi)有滿了,你們繼續(xù)放入吧        notFull.signal();        return x;    }

          是不是超級(jí)好理解。是的。不過(guò),我們不是想看 ArrayBlockingQueue 是如何實(shí)現(xiàn)的,我們是要論清 wait/notify 是如何實(shí)現(xiàn)的。因?yàn)楫吘?,他們不是一個(gè)鎖那么簡(jiǎn)單。

              // 三個(gè)鎖的關(guān)系,即 notEmpty, notFull 都是 ReentrantLock 的條件鎖,相當(dāng)于是其子集吧    /** Main lock guarding all access */    final ReentrantLock lock;
          /** Condition for waiting takes */ private final Condition notEmpty;
          /** Condition for waiting puts */ private final Condition notFull;
          public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } // lock.newCondition() 是什么鬼?它是 AQS 中實(shí)現(xiàn)的 ConditionObject // java.util.concurrent.locks.ReentrantLock#newCondition public Condition newCondition() { return sync.newCondition(); } // java.util.concurrent.locks.ReentrantLock.Sync#newCondition final ConditionObject newCondition() { // AQS 中定義 return new ConditionObject(); }

          接下來(lái),我們要帶著幾個(gè)疑問(wèn)來(lái)看這個(gè) Condition 的對(duì)象:

              1. 它的 wait/notify 是如何實(shí)現(xiàn)的?
              2. 它是如何與互相進(jìn)行聯(lián)系的?
              3. 為什么 wait/notify 必須要在外面的lock獲取之后才能執(zhí)行?
              4. 它與Object的wait/notify 有什么相同和不同點(diǎn)?

          能夠回答了上面的問(wèn)題,基本上對(duì)其原理與實(shí)現(xiàn)也就理解得差不多了。


          重點(diǎn)1. wait/notify 是如何實(shí)現(xiàn)的?

          我們從上面可以看到,它是通過(guò)調(diào)用 await()/signal() 實(shí)現(xiàn)的,到底做事如何,且看下面。

                  // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()        /**         * Implements interruptible condition wait.         * <ol>         * <li> If current thread is interrupted, throw InterruptedException.         * <li> Save lock state returned by {@link #getState}.         * <li> Invoke {@link #release} with saved state as argument,         *      throwing IllegalMonitorStateException if it fails.         * <li> Block until signalled or interrupted.         * <li> Reacquire by invoking specialized version of         *      {@link #acquire} with saved state as argument.         * <li> If interrupted while blocked in step 4, throw InterruptedException.         * </ol>         */        public final void await() throws InterruptedException {            if (Thread.interrupted())                throw new InterruptedException();            // 添加當(dāng)前線程到 等待線程隊(duì)列中,有 lastWaiter/firstWaiter 維護(hù)            Node node = addConditionWaiter();            // 釋放當(dāng)前l(fā)ock中持有的鎖,詳情且看下文            int savedState = fullyRelease(node);            // 從以下開(kāi)始,將不再保證線程安全性,因?yàn)楫?dāng)前的鎖已經(jīng)釋放,其他線程將會(huì)重新競(jìng)爭(zhēng)鎖使用            int interruptMode = 0;            // 循環(huán)判定,如果當(dāng)前節(jié)點(diǎn)不在 sync 同步隊(duì)列中,那么就反復(fù)阻塞自己            // 所以判斷是否在 同步隊(duì)列上,是很重要的            while (!isOnSyncQueue(node)) {                // 沒(méi)有在同步隊(duì)列,阻塞                LockSupport.park(this);                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)                    break;            }            // 當(dāng)條件被滿足后,需要重新競(jìng)爭(zhēng)鎖,詳情看下文            // 競(jìng)爭(zhēng)到鎖后,原樣返回到 wait 的原點(diǎn),繼續(xù)執(zhí)行業(yè)務(wù)邏輯            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)                interruptMode = REINTERRUPT;            // 下面是異常處理,忽略            if (node.nextWaiter != null) // clean up if cancelled                unlinkCancelledWaiters();            if (interruptMode != 0)                reportInterruptAfterWait(interruptMode);        }    /**     * Invokes release with current state value; returns saved state.     * Cancels node and throws exception on failure.     * @param node the condition node for this wait     * @return previous sync state     */    final int fullyRelease(Node node) {        boolean failed = true;        try {            int savedState = getState();            // 預(yù)期的,都是釋放鎖成功,如果失敗,說(shuō)明當(dāng)前線程并并未獲取到鎖,引發(fā)異常            if (release(savedState)) {                failed = false;                return savedState;            } else {                throw new IllegalMonitorStateException();            }        } finally {            if (failed)                node.waitStatus = Node.CANCELLED;        }    }    /**     * Releases in exclusive mode.  Implemented by unblocking one or     * more threads if {@link #tryRelease} returns true.     * This method can be used to implement method {@link Lock#unlock}.     *     * @param arg the release argument.  This value is conveyed to     *        {@link #tryRelease} but is otherwise uninterpreted and     *        can represent anything you like.     * @return the value returned from {@link #tryRelease}     */    public final boolean release(int arg) {        // tryRelease 由客戶端自定義實(shí)現(xiàn)        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;    }
          // 如何判定當(dāng)前線程是否在同步隊(duì)列中或者可以進(jìn)行同步隊(duì)列? /** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ final boolean isOnSyncQueue(Node node) { // 如果上一節(jié)點(diǎn)還沒(méi)有被移除,當(dāng)前節(jié)點(diǎn)就不能被加入到同步隊(duì)列 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果當(dāng)前節(jié)點(diǎn)的下游節(jié)點(diǎn)已經(jīng)存在,則它自身必定已經(jīng)被移到同步隊(duì)列中 if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ // 最終直接從同步隊(duì)列中查找,如果找到,則自身已經(jīng)在同步隊(duì)列中 return findNodeFromTail(node); }
          /** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
          // 當(dāng)條件被滿足后,需要重新競(jìng)爭(zhēng)鎖,以保證外部的鎖語(yǔ)義,因?yàn)橹白约阂呀?jīng)將鎖主動(dòng)釋放 // 這個(gè)鎖與 lock/unlock 時(shí)的一毛一樣,沒(méi)啥可講的 // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

          總結(jié)一下 wait 的邏輯:

          1. 前提:自身已獲取到外部鎖;
          2. 將當(dāng)前線程添加到 ConditionQueue 等待隊(duì)列中;
          3. 釋放已獲取到的鎖;
          4. 反復(fù)檢查進(jìn)入等待,直到當(dāng)前節(jié)點(diǎn)被移動(dòng)到同步隊(duì)列中;
          5. 條件滿足被喚醒,重新競(jìng)爭(zhēng)外部鎖,成功則返回,否則繼續(xù)阻塞;(外部鎖是同一個(gè),這也是要求兩個(gè)對(duì)象必須存在依賴關(guān)系的原因)
          6. wait前線程持有鎖,wait后線程持有鎖,沒(méi)有一點(diǎn)外部鎖變化;


          重點(diǎn)2. 厘清了 wait, 接下來(lái),我們看 signal() 通知喚醒的實(shí)現(xiàn):

                  // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal        /**         * Moves the longest-waiting thread, if one exists, from the         * wait queue for this condition to the wait queue for the         * owning lock.         *         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}         *         returns {@code false}         */        public final void signal() {            // 只有獲取鎖的實(shí)例,才可以進(jìn)行signal,否則你拿什么去保證線程安全呢            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            Node first = firstWaiter;            // 通知 firstWaiter             if (first != null)                doSignal(first);        }
          /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { // 最多只轉(zhuǎn)移一個(gè) 節(jié)點(diǎn) do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } // 將一個(gè)節(jié)點(diǎn)從 等待隊(duì)列 移動(dòng)到 同步隊(duì)列中,即可參與下一輪競(jìng)爭(zhēng) // 只有確實(shí)移動(dòng)成功才會(huì)返回 true // 說(shuō)明:當(dāng)前線程是持有鎖的線程 // java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;
          /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ // 同步隊(duì)列由 head/tail 指針維護(hù) Node p = enq(node); int ws = p.waitStatus; // 注意,此處正常情況下并不會(huì)喚醒等待線程,僅是將隊(duì)列轉(zhuǎn)移。 // 因?yàn)楫?dāng)前線程的鎖保護(hù)區(qū)域并未完成,完成后自然會(huì)喚醒其他等待線程 // 否則將會(huì)存在當(dāng)前線程任務(wù)還未執(zhí)行完成,卻被其他線程搶了先去,那接下來(lái)的任務(wù)當(dāng)如何?? if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

          總結(jié)一下,notify 的功能原理如下:

              1. 前提:自身已獲取到外部鎖;
              2. 轉(zhuǎn)移下一個(gè)等待隊(duì)列的節(jié)點(diǎn)到同步隊(duì)列中;
              3. 如果遇到下一節(jié)點(diǎn)被取消情況,順延到再下一節(jié)點(diǎn)直到為空,至多轉(zhuǎn)移一個(gè)節(jié)點(diǎn);
              4. 正常情況下不做線程的喚醒操作;

          所以,實(shí)現(xiàn) wait/notify, 最關(guān)鍵的就是維護(hù)兩個(gè)隊(duì)列,等待隊(duì)列與同步隊(duì)列,而且都要求是在有外部鎖保證的情況下執(zhí)行。

          到此,我們也能回答一個(gè)問(wèn)題:為什么wait/notify一定要在鎖模式下才能運(yùn)行?

          因?yàn)閣ait是等待條件成立,此時(shí)必定存在競(jìng)爭(zhēng)需要做保護(hù),而它自身又必須釋放鎖以使外部條件可成立,且后續(xù)需要做恢復(fù)動(dòng)作;而notify之后可能還有后續(xù)工作必須保障安全,notify只是鎖的一個(gè)子集。。。

          四、通知所有線程的實(shí)現(xiàn):notifyAll

          有時(shí)條件成立后,可以允許所有線程通行,這時(shí)就可以進(jìn)行 notifyAll, 那么如果達(dá)到通知所有的目的呢?是一起通知還是??

          以下是 AQS 中的實(shí)現(xiàn):

                  // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll        public final void signalAll() {            if (!isHeldExclusively())                throw new IllegalMonitorStateException();            Node first = firstWaiter;            if (first != null)                doSignalAll(first);        }        /**         * Removes and transfers all nodes.         * @param first (non-null) the first node on condition queue         */        private void doSignalAll(Node first) {            lastWaiter = firstWaiter = null;            do {                Node next = first.nextWaiter;                first.nextWaiter = null;                transferForSignal(first);                first = next;            } while (first != null);        }

          可以看到,它是通過(guò)遍歷所有節(jié)點(diǎn),依次轉(zhuǎn)移等待隊(duì)列到同步隊(duì)列(通知)的,原本就沒(méi)有人能同時(shí)干幾件事的!

          本文從java實(shí)現(xiàn)的角度去解析同步鎖的原理與實(shí)現(xiàn),但并不局限于java。道理總是相通的,只是像操作系統(tǒng)這樣的大佬,能干的活更純粹:比如讓cpu根本不用調(diào)度一個(gè)線程。




          往期精彩推薦



          騰訊、阿里、滴滴后臺(tái)面試題匯總總結(jié) — (含答案)

          面試:史上最全多線程面試題 !

          最新阿里內(nèi)推Java后端面試題

          JVM難學(xué)?那是因?yàn)槟銢](méi)認(rèn)真看完這篇文章


          END


          關(guān)注作者微信公眾號(hào) —《JAVA爛豬皮》


          了解更多java后端架構(gòu)知識(shí)以及最新面試寶典


          你點(diǎn)的每個(gè)好看,我都認(rèn)真當(dāng)成了


          看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者源源不斷出文的動(dòng)力


          作者:等你歸去來(lái)

          出處:https://www.cnblogs.com/yougewe/p/11922194.html

          瀏覽 44
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产18禁黄网站禁片免费视频 | 看全色黄大色大片 | 深夜精品福利 | 亚洲无码一区在线 | wwwjizz国产 |