<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          AQS解析與實(shí)戰(zhàn)

          共 12900字,需瀏覽 26分鐘

           ·

          2021-05-21 20:18

          走過(guò)路過(guò)不要錯(cuò)過(guò)

          點(diǎn)擊藍(lán)字關(guān)注我們


          前言


          前段時(shí)間在面試,發(fā)現(xiàn)面試官都有問(wèn)到同步器AQS的相關(guān)問(wèn)題。AQS為Java中幾乎所有的鎖和同步器提供一個(gè)基礎(chǔ)框架,派生出如ReentrantLock、Semaphore、CountDownLatch等AQS全家桶。本文基于AQS原理的幾個(gè)核心點(diǎn),談?wù)剬?duì)AbstractQueuedSynchronizer的理解,并實(shí)現(xiàn)一個(gè)自定義同步器。


          AQS原理面試題的核心回答要點(diǎn)


          1. state 狀態(tài)的維護(hù)。

          2. CLH隊(duì)列

          3. ConditionObject通知

          4. 模板方法設(shè)計(jì)模式

          5. 獨(dú)占與共享模式。

          6. 自定義同步器。

          7. AQS全家桶的一些延伸,如:ReentrantLock等。


          AQS的類圖結(jié)構(gòu)


          AQS全稱是AbstractQueuedSynchronizer,即抽象同步隊(duì)列。下面看一下AQS的類圖結(jié)構(gòu):




          為了方便下面幾個(gè)關(guān)鍵點(diǎn)的理解,大家先熟悉一下AQS的類圖結(jié)構(gòu)。


          state 狀態(tài)的維護(hù)

          在AQS中維持了一個(gè)單一的共享狀態(tài)state,來(lái)實(shí)現(xiàn)同步器同步??匆幌聅tate的相關(guān)代碼如下:

          state源碼


          /** * The synchronization state. */private volatile int state;
          /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */protected final int getState() {return state; }
          /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */protected final void setState(int newState) { state = newState; }
          /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);  }

          state 源碼設(shè)計(jì)幾個(gè)回答要點(diǎn):

          • state用volatile修飾,保證多線程中的可見(jiàn)性。

          • getState()和setState()方法采用final修飾,限制AQS的子類重寫(xiě)它們兩。

          • compareAndSetState()方法采用樂(lè)觀鎖思想的CAS算法,也是采用final修飾的,不允許子類重寫(xiě)。


          CLH隊(duì)列


          談到CLH隊(duì)列,我們結(jié)合以上state狀態(tài),先來(lái)看一下AQS原理圖





          CLH(Craig, Landin, and Hagersten locks) 同步隊(duì)列 是一個(gè)FIFO雙向隊(duì)列,其內(nèi)部通過(guò)節(jié)點(diǎn)head和tail記錄隊(duì)首和隊(duì)尾元素,隊(duì)列元素的類型為Node。AQS依賴它來(lái)完成同步狀態(tài)state的管理,當(dāng)前線程如果獲取同步狀態(tài)失敗時(shí),AQS則會(huì)將當(dāng)前線程已經(jīng)等待狀態(tài)等信息構(gòu)造成一個(gè)節(jié)點(diǎn)(Node)并將其加入到CLH同步隊(duì)列,同時(shí)會(huì)阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時(shí),會(huì)把首節(jié)點(diǎn)喚醒(公平鎖),使其再次嘗試獲取同步狀態(tài)。


          Node節(jié)點(diǎn)


          CLH同步隊(duì)列中,一個(gè)節(jié)點(diǎn)表示一個(gè)線程,它保存著線程的引用(thread)、狀態(tài)(waitStatus)、前驅(qū)節(jié)點(diǎn)(prev)、后繼節(jié)點(diǎn)(next),condition隊(duì)列的后續(xù)節(jié)點(diǎn)(nextWaiter)如下圖:





          waitStatus幾種狀態(tài)狀態(tài):





          我們?cè)倏匆幌翪LH隊(duì)列入列以及出列的代碼:


          入列


          CLH隊(duì)列入列就是tail指向新節(jié)點(diǎn)、新節(jié)點(diǎn)的prev指向當(dāng)前最后的節(jié)點(diǎn),當(dāng)前最后一個(gè)節(jié)點(diǎn)的next指向當(dāng)前節(jié)點(diǎn)。addWaiter方法如下:

          //構(gòu)造Nodeprivate Node addWaiter(Node mode) {  Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure(快速嘗試添加尾節(jié)點(diǎn))        Node pred = tail;if (pred != null) {            node.prev = pred;//CAS設(shè)置尾節(jié)點(diǎn)if (compareAndSetTail(pred, node)) {                pred.next = node;return node;            }        }//多次嘗試        enq(node);return node;        }

          由以上代碼可得,addWaiter設(shè)置尾節(jié)點(diǎn)失敗的話,調(diào)用enq(Node node)方法設(shè)置尾節(jié)點(diǎn),enq方法如下:

          private Node enq(final Node node) {//死循環(huán)嘗試,知道成功為止for (;;) {            Node t = tail;//tail 不存在,設(shè)置為首節(jié)點(diǎn)if (t == null) { // Must initializeif (compareAndSetHead(new Node()))                    tail = head;            } else {                node.prev = t;if (compareAndSetTail(t, node)) {                    t.next = node;return t;                }            }        }    }

          出列

          首節(jié)點(diǎn)的線程釋放同步狀態(tài)后,將會(huì)喚醒它的后繼節(jié)點(diǎn)(next),而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn)??梢钥匆幌乱韵聝啥卧创a:

            Node h = head;if (h != null && h.waitStatus != 0)  unparkSuccessor(h);
           private void unparkSuccessor(Node node) {        /*         * If status is negative (i.e., possibly needing signal) try         * to clear in anticipation of signalling.  It is OK if this         * fails or if status is changed by waiting thread.         */        int ws = node.waitStatus;if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);
          /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next;if (s == null || s.waitStatus > 0) { s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0) s = t; }if (s != null) LockSupport.unpark(s.thread);    }

          CLH核心幾個(gè)回答要點(diǎn)

          • 雙向鏈表入列出列

          • CAS算法設(shè)置尾節(jié)點(diǎn)+死循環(huán)自旋。


          ConditionObject


          ConditionObject簡(jiǎn)介


          我們都知道,synchronized控制同步的時(shí)候,可以配合Object的wait()、notify(),notifyAll() 系列方法可以實(shí)現(xiàn)等待/通知模式。而Lock呢?它提供了條件Condition接口,配合await(),signal(),signalAll() 等方法也可以實(shí)現(xiàn)等待/通知機(jī)制。ConditionObject實(shí)現(xiàn)了Condition接口,給AQS提供條件變量的支持 。


          Condition隊(duì)列與CLH隊(duì)列的那些事


          我們先來(lái)看一下圖:





          ConditionObject隊(duì)列與CLH隊(duì)列的愛(ài)恨情仇:

          • 調(diào)用了await()方法的線程,會(huì)被加入到conditionObject等待隊(duì)列中,并且喚醒CLH隊(duì)列中head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)。

          • 線程在某個(gè)ConditionObject對(duì)象上調(diào)用了singnal()方法后,等待隊(duì)列中的firstWaiter會(huì)被加入到AQS的CLH隊(duì)列中,等待被喚醒。

          • 當(dāng)線程調(diào)用unLock()方法釋放鎖時(shí),CLH隊(duì)列中的head節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)(在本例中是firtWaiter),會(huì)被喚醒。

          區(qū)別:

          • ConditionObject對(duì)象都維護(hù)了一個(gè)單獨(dú)的等待隊(duì)列 ,AQS所維護(hù)的CLH隊(duì)列是同步隊(duì)列,它們節(jié)點(diǎn)類型相同,都是Node。

          獨(dú)占與共享模式。


          AQS支持兩種同步模式:獨(dú)占式和共享式。


          獨(dú)占式


          同一時(shí)刻僅有一個(gè)線程持有同步狀態(tài),如ReentrantLock。又可分為公平鎖和非公平鎖。

          公平鎖: 按照線程在隊(duì)列中的排隊(duì)順序,有禮貌的,先到者先拿到鎖。

          非公平鎖: 當(dāng)線程要獲取鎖時(shí),無(wú)視隊(duì)列順序直接去搶鎖,不講道理的,誰(shuí)搶到就是誰(shuí)的。

          acquire(int arg)是獨(dú)占式獲取同步狀態(tài)的方法,我們來(lái)看一下源碼:

          • acquire(long arg)方法

          public final void acquire(long arg) {if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }
          • addWaiter方法

          //構(gòu)造Nodeprivate Node addWaiter(Node mode) {  Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failure(快速嘗試添加尾節(jié)點(diǎn))        Node pred = tail;if (pred != null) {            node.prev = pred;//CAS設(shè)置尾節(jié)點(diǎn)if (compareAndSetTail(pred, node)) {                pred.next = node;return node;            }        }//多次嘗試        enq(node);return node;        }
          • acquireQueued(final Node node, long arg)方法

          final boolean acquireQueued(final Node node, long arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {                    setHead(node);                    p.next = null; // help GC                    failed = false;return interrupted;                }if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {if (failed)                cancelAcquire(node);        }    }
          • selfInterrupt()方法

          static void selfInterrupt() {        Thread.currentThread().interrupt();    }

          結(jié)合源代碼,可得acquire(int arg)方法流程圖,如下:





          共享式


          多個(gè)線程可同時(shí)執(zhí)行,如Semaphore/CountDownLatch等都是共享式的產(chǎn)物。

          acquireShared(long arg)是共享式獲取同步狀態(tài)的方法,可以看一下源碼:

          public final void acquireShared(long arg) {if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);    }

          由上可得,先調(diào)用tryAcquireShared(int arg)方法嘗試獲取同步狀態(tài),如果獲取失敗,調(diào)用doAcquireShared(int arg)自旋方式獲取同步狀態(tài),方法源碼如下:

          private void doAcquireShared(long arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {long r = tryAcquireShared(arg);if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GCif (interrupted)                            selfInterrupt();                        failed = false;return;                    }                }if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {if (failed)                cancelAcquire(node);        }    }

          AQS的模板方法設(shè)計(jì)模式


          模板方法模式


          模板方法模式:在一個(gè)方法中定義一個(gè)算法的骨架,而將一些步驟延遲到子類中。模板方法使得子類可以在不改變算法結(jié)構(gòu)的情況下,重新定義算法中的某些步驟。


          模板方法模式生活中的例子:假設(shè)我們要去北京旅游,那么我們可以坐高鐵或者飛機(jī),或者火車(chē),那么定義交通方式的抽象類,可以有以下模板:買(mǎi)票->安檢->乘坐xx交通工具->到達(dá)北京。讓子類繼承該抽象類,實(shí)現(xiàn)對(duì)應(yīng)的模板方法。



          AQS定義的一些模板方法如下:


          isHeldExclusively()//該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
          tryAcquire(int)//獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
          tryRelease(int)//獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。tryAcquireShared(int)//共享方式。嘗試獲取資源。負(fù)數(shù)表示失?。?表示成功,但沒(méi)有剩余可用資源;正數(shù)表示成功,且有剩余資源。
          tryReleaseShared(int)//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。

          簡(jiǎn)言之,就是AQS提供tryAcquire,tryAcquireShared等模板方法,給子類實(shí)現(xiàn)自定義的同步器。


          自定義同步器。


          基于以上分析,我們都知道state,CLH隊(duì)列,ConditionObject隊(duì)列 等這些關(guān)鍵點(diǎn),你要實(shí)現(xiàn)自定義鎖的話,首先需要確定你要實(shí)現(xiàn)的是獨(dú)占鎖還是共享鎖,定義原子變量state的含義,再定義一個(gè)內(nèi)部類去繼承AQS,重寫(xiě)對(duì)應(yīng)的模板方法。


          我們來(lái)看一下基于 AQS 實(shí)現(xiàn)的不可重入的獨(dú)占鎖的demo,來(lái)自《Java并發(fā)編程之美》:

          public class NonReentrantLock implements Lock,Serializable{
          //內(nèi)部類,自定義同步器static class Sync extends AbstractQueuedSynchronizer {//是否鎖已經(jīng)被持有public boolean isHeldExclusively() {return getState() == 1; }//如果state為0 則嘗試獲取鎖public boolean tryAcquire(int arg) {assert arg== 1 ;//CAS設(shè)置狀態(tài),能保證操作的原子性,當(dāng)前為狀態(tài)為0,操作成功狀態(tài)改為1if(compareAndSetState(0, 1)){//設(shè)置當(dāng)前獨(dú)占的線程 setExclusiveOwnerThread(Thread.currentThread());return true; }return false; }//嘗試釋放鎖,設(shè)置state為0public boolean tryRelease(int arg) {assert arg ==1;//如果同步器同步器狀態(tài)等于0,則拋出監(jiān)視器非法狀態(tài)異常if(getState() == 0)throw new IllegalMonitorStateException();//設(shè)置獨(dú)占鎖的線程為null setExclusiveOwnerThread(null);//設(shè)置同步狀態(tài)為0 setState(0);return true; }//返回Condition,每個(gè)Condition都包含了一個(gè)Condition隊(duì)列Condition newCondition(){return new ConditionObject(); } }//創(chuàng)建一個(gè)Sync來(lái)做具體的工作private final Sync sync= new Sync ();
          @Overridepublic void lock() { sync.acquire(1); }
          public boolean isLocked() {return sync.isHeldExclusively(); }@Overridepublic void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
          @Overridepublic boolean tryLock() {return sync.tryAcquire(1); }
          @Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time)); }
          @Overridepublic void unlock() { sync.release(1); }

          @Overridepublic Condition newCondition() {return sync.newCondition(); }    }

          NonReentrantLockDemoTest:

          public class NonReentrantLockDemoTest {
          private static NonReentrantLock nonReentrantLock = new NonReentrantLock();
          public static void main(String[] args) {for (int i = 0; i < 10; i++) { Thread thread = new Thread(() -> { nonReentrantLock.lock();try { System.out.println(Thread.currentThread().getName()); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } finally { nonReentrantLock.unlock(); } }); thread.start(); } }}

          運(yùn)行結(jié)果:





          AQS全家桶實(shí)戰(zhàn)


          AQS派生出如ReentrantLock、Semaphore等AQS全家桶,接下來(lái)可以看一下它們的使用案例。


          ReentrantLock


          ReentrantLock介紹


          • ReentrantLock為重入鎖,能夠?qū)蚕碣Y源能夠重復(fù)加鎖,是實(shí)現(xiàn)Lock接口的一個(gè)類。

          • ReentrantLock支持公平鎖和非公平鎖兩種方式


          ReentrantLock案例


          使用ReentrantLock來(lái)實(shí)現(xiàn)個(gè)簡(jiǎn)單線程安全的list,如下:

          public class ReentrantLockList {// 線程不安全的listprivate ArrayList<String> array = new ArrayList<>();//獨(dú)占鎖private volatile ReentrantLock lock = new ReentrantLock();
          //添加元素public void add(String e){lock.lock();try { array.add(e); }finally {lock.unlock(); } }
          //刪除元素public void remove(String e){lock.lock();try { array.remove(e); }finally {lock.unlock(); } }//獲取元素public String get(int index){lock.lock();try {return array.get(index); }finally {lock.unlock(); } }}

          Semaphore

          Semaphore介紹

          • Semaphore也叫信號(hào)量,可以用來(lái)控制資源并發(fā)訪問(wèn)的線程數(shù)量,通過(guò)協(xié)調(diào)各個(gè)線程,以保證合理的使用資源。

          Semaphore案例

          Java多線程有一到比較經(jīng)典的面試題:ABC三個(gè)線程順序輸出,循環(huán)10遍。

          public class ABCSemaphore {
          private static Semaphore A = new Semaphore(1);private static Semaphore B = new Semaphore(1);private static Semaphore C = new Semaphore(1);

          static class ThreadA extends Thread {
          @Overridepublic void run() {try {for (int i = 0; i < 10; i++) { A.acquire(); System.out.print("A"); B.release(); } } catch (InterruptedException e) { e.printStackTrace(); } }
          }
          static class ThreadB extends Thread {
          @Overridepublic void run() {try {for (int i = 0; i < 10; i++) { B.acquire(); System.out.print("B"); C.release(); } } catch (InterruptedException e) { e.printStackTrace(); } }
          }
          static class ThreadC extends Thread {
          @Overridepublic void run() {try {for (int i = 0; i < 10; i++) { C.acquire(); System.out.print("C"); A.release(); } } catch (InterruptedException e) { e.printStackTrace(); } }
          }
          public static void main(String[] args) throws InterruptedException {// 開(kāi)始只有A可以獲取, BC都不可以獲取, 保證了A最先執(zhí)行 B.acquire(); C.acquire();new ThreadA().start();new ThreadB().start();new ThreadC().start();    }





          往期精彩推薦



          騰訊、阿里、滴滴后臺(tái)面試題匯總總結(jié) — (含答案)

          面試:史上最全多線程面試題 !

          最新阿里內(nèi)推Java后端面試題

          JVM難學(xué)?那是因?yàn)槟銢](méi)認(rèn)真看完這篇文章


          END


          關(guān)注作者微信公眾號(hào) —《JAVA爛豬皮》


          了解更多java后端架構(gòu)知識(shí)以及最新面試寶典


          你點(diǎn)的每個(gè)好看,我都認(rèn)真當(dāng)成了


          看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者源源不斷出文的動(dòng)力

          瀏覽 61
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  成人内射生活片 | 大香蕉电影大香蕉网 | 午夜福利久久 | 午夜福利视频在线 | 国内精品视频在线 |