<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          給力!Java 并發(fā)之 Semaphore 源碼剖析!

          共 1038字,需瀏覽 3分鐘

           ·

          2020-08-23 11:28

          本文精選自 Doocs 開源社區(qū)旗下“源碼獵人”項目,作者 AmyliaY。

          項目將會持續(xù)更新,歡迎 Star 關(guān)注。

          項目地址:https://github.com/doocs/source-code-hunter

          Semaphore 信號量,可用于控制一定時間內(nèi),并發(fā)執(zhí)行的線程數(shù),基于 AQS 實現(xiàn)??蓱?yīng)用于網(wǎng)關(guān)限流、資源限制 (如 最大可發(fā)起連接數(shù))。由于?release()?釋放許可時,未對釋放許可數(shù)做限制,所以可以通過該方法增加總的許可數(shù)量。

          獲取許可支持公平和非公平模式,默認(rèn)非公平模式。公平模式無論是否有許可,都會先判斷是否有線程在排隊,如果有線程排隊,則進(jìn)入排隊,否則嘗試獲取許可;非公平模式無論許可是否充足,直接嘗試獲取許可。

          不多廢話,下面直接挖源碼。

          核心內(nèi)部類 Sync

          abstract static class Sync extends AbstractQueuedSynchronizer {    private static final long serialVersionUID = 1192457210091910933L;    /* 賦值state為總許可數(shù) */    Sync(int permits) {        setState(permits);    }    /* 剩余許可數(shù) */    final int getPermits() {        return getState();    }    /* 自旋 + CAS非公平獲取 */    final int nonfairTryAcquireShared(int acquires) {        for (;;) {            // 剩余可用許可數(shù)            int available = getState();            // 本次獲取許可后,剩余許可            int remaining = available - acquires;            // 如果獲取后,剩余許可大于0,則CAS更新剩余許可,否則獲取失敗失敗            if (remaining < 0 ||                compareAndSetState(available, remaining))                return remaining;        }    }    /**     * 自旋 + CAS 釋放許可     * 由于未對釋放許可數(shù)做限制,所以可以通過release動態(tài)增加許可數(shù)量     */    protected final boolean tryReleaseShared(int releases) {        for (;;) {            // 當(dāng)前剩余許可            int current = getState();            // 許可更新值            int next = current + releases;            // 如果許可更新值為負(fù)數(shù),說明許可數(shù)量溢出,拋出錯誤            if (next < current) // overflow                throw new Error("Maximum permit count exceeded");            // CAS更新許可數(shù)量            if (compareAndSetState(current, next))                return true;        }    }    /* 自旋 + CAS 減少許可數(shù)量 */    final void reducePermits(int reductions) {        for (;;) {            // 當(dāng)前剩余許可            int current = getState();            // 更新值            int next = current - reductions;               // 較少許可數(shù)錯誤,拋出異常            if (next > current) // underflow                throw new Error("Permit count underflow");            // CAS更新許可數(shù)            if (compareAndSetState(current, next))                return;        }    }    /* 丟棄所有許可 */    final int drainPermits() {        for (;;) {            int current = getState();            if (current == 0 || compareAndSetState(current, 0))                return current;        }    }}/** * 非公平模式 */static final class NonfairSync extends Sync {    private static final long serialVersionUID = -2694183684443567898L;    NonfairSync(int permits) {        super(permits);    }    protected int tryAcquireShared(int acquires) {        return nonfairTryAcquireShared(acquires);    }}/** * 公平模式 */static final class FairSync extends Sync {    private static final long serialVersionUID = 2014338818796000944L;    FairSync(int permits) {        super(permits);    }    /**     * 公平模式獲取許可     * 公平模式不論許可是否充足,都會判斷同步隊列中是否有線程在等地,如果有,獲取失敗,排隊阻塞     */    protected int tryAcquireShared(int acquires) {        for (;;) {            // 如果有線程在排隊,立即返回            if (hasQueuedPredecessors())                return -1;            // 自旋 + cas獲取許可            int available = getState();            int remaining = available - acquires;            if (remaining < 0 ||                compareAndSetState(available, remaining))                return remaining;        }    }}

          主要 API

          public class Semaphore implements java.io.Serializable {    private static final long serialVersionUID = -3222578661600680210L;    /** All mechanics via AbstractQueuedSynchronizer subclass */    private final Sync sync;    /**     * 根據(jù)給定的 總許可數(shù)permits,創(chuàng)建 Semaphore     */    public Semaphore(int permits) {        sync = new NonfairSync(permits);    }    /**     * fairtrue表示使用公平鎖模式,false使用非公平鎖     */    public Semaphore(int permits, boolean fair) {        sync = fair ? new FairSync(permits) : new NonfairSync(permits);    }    // --------------------- 獲取許可 --------------------    /* 獲取指定數(shù)量的許可    */    public void acquire(int permits) throws InterruptedException {        if (permits < 0) throw new IllegalArgumentException();        sync.acquireSharedInterruptibly(permits);    }    /* 獲取一個許可    */    public void acquire() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }    public final void acquireSharedInterruptibly(int arg)        throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        if (tryAcquireShared(arg) < 0) // 獲取許可,剩余許可>=0,則獲取許可成功,<0獲取許可失敗,進(jìn)入排隊            doAcquireSharedInterruptibly(arg);    }    protected int tryAcquireShared(int acquires) {        return nonfairTryAcquireShared(acquires);    }    /**     * @return 剩余許可數(shù)量。非負(fù)數(shù),獲取許可成功,負(fù)數(shù),獲取許可失敗     */    final int nonfairTryAcquireShared(int acquires) {        for (;;) {            int available = getState();            int remaining = available - acquires;            if (remaining < 0 ||                compareAndSetState(available, remaining))                return remaining;        }    }    /**     * 獲取許可失敗,當(dāng)前線程進(jìn)入同步隊列,排隊阻塞     */    private void doAcquireSharedInterruptibly(int arg)        throws InterruptedException {        // 創(chuàng)建同步隊列節(jié)點,并入隊        final Node node = addWaiter(Node.SHARED);        boolean failed = true;        try {            for (;;) {                // 如果當(dāng)前節(jié)點是第二個節(jié)點,嘗試獲取鎖                final Node p = node.predecessor();                if (p == head) {                    int r = tryAcquireShared(arg);                    if (r >= 0) {                        setHeadAndPropagate(node, r);                        p.next = null; // help GC                        failed = false;                        return;                    }                }                // 阻塞當(dāng)前線程                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    throw new InterruptedException();            }        } finally {            if (failed)                cancelAcquire(node);        }    }    // --------------------- 釋放歸還許可 -------------------------    /* 釋放指定數(shù)量的許可 */    public void release(int permits) {        if (permits < 0) throw new IllegalArgumentException();        sync.releaseShared(permits);    }    /* 釋放一個許可 */    public void release() {        sync.releaseShared(1);    }    public final boolean releaseShared(int arg) {        // 歸還許可成功        if (tryReleaseShared(arg)) {            doReleaseShared();            return true;        }        return false;    }    /**     * 釋放許可     * 由于未對釋放許可數(shù)做限制,所以可以通過release動態(tài)增加許可數(shù)量     */    protected final boolean tryReleaseShared(int releases) {        for (;;) {            int current = getState();            int next = current + releases;            if (next < current) // overflow                throw new Error("Maximum permit count exceeded");            if (compareAndSetState(current, next))                return true;        }    }    private void doReleaseShared() {        // 自旋,喚醒等待的第一個線程(其他線程將由第一個線程向后傳遞喚醒)        for (;;) {            Node h = head;            if (h != null && h != tail) {                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                        continue;            // loop to recheck cases                    // 喚醒第一個等待線程                    unparkSuccessor(h);                }                else if (ws == 0 &&                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                    continue;                // loop on failed CAS            }            if (h == head)                   // loop if head changed                break;        }    }}

          全文完!

          希望本文對大家有所幫助。如果感覺本文有幫助,有勞轉(zhuǎn)發(fā)或點一下“在看”!讓更多人收獲知識!


          長按識別下圖二維碼,關(guān)注公眾號「Doocs 開源社區(qū)」,第一時間跟你們分享好玩、實用的技術(shù)文章與業(yè)內(nèi)最新資訊。



          瀏覽 54
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  99在线免费视频观看 | 久久久伊人网 | 黄色视频免费国产 | 美女天天干 | 狼友网址 |