<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          同步組件Semaphore源碼解析

          共 9077字,需瀏覽 19分鐘

           ·

          2023-06-25 22:36

          走過路過不要錯(cuò)過

          點(diǎn)擊藍(lán)字關(guān)注我們


          Semaphore概述及案例學(xué)習(xí)

          Semaphore信號(hào)量用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理地使用公共資源。

          public class SemaphoreTest {

          private static final int THREAD_COUNT = 30;
          private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

          private static Semaphore s = new Semaphore(10); //10個(gè)許可證數(shù)量,最大并發(fā)數(shù)為10

          public static void main(String[] args) {
          for(int i = 0; i < THREAD_COUNT; i ++){ //執(zhí)行30個(gè)線程
          threadPool.execute(new Runnable() {
          @Override
          public void run() {
          s.tryAcquire(); //嘗試獲取一個(gè)許可證
          System.out.println("save data");
          s.release(); //使用完之后歸還許可證
          }
          });
          }
          threadPool.shutdown();
          }
          }


          • 創(chuàng)建一個(gè)大小為30的線程池,但是信號(hào)量規(guī)定在10,保證許可證數(shù)量為10。

          • 每次線程調(diào)用tryAcquire()或者acquire()方法都會(huì)原子性的遞減許可證的數(shù)量,release()會(huì)原子性遞增許可證數(shù)量。

          類圖結(jié)構(gòu)及重要字段

          public class Semaphore implements java.io.Serializable {
          private static final long serialVersionUID = -3222578661600680210L;
          /** All mechanics via AbstractQueuedSynchronizer subclass */
          private final Sync sync;

          abstract static class Sync extends AbstractQueuedSynchronizer {
          // permits指定初始化信號(hào)量個(gè)數(shù)
          Sync(int permits) {
          setState(permits);
          }
          // ...
          }

          static final class NonfairSync extends Sync {...}

          static final class FairSync extends Sync {...}

          // 默認(rèn)采用非公平策略
          public Semaphore(int permits) {
          sync = new NonfairSync(permits);
          }

          // 可以指定公平策略
          public Semaphore(int permits, boolean fair) {
          sync = fair ? new FairSync(permits) : new NonfairSync(permits);
          }

          //...
          }


          • 基于AQS,類似于ReentrantLock,Sync繼承自AQS,有公平策略和非公平策略兩種實(shí)現(xiàn)。

          • 類似于CountDownLatch,state在這里也是通過構(gòu)造器指定,表示初始化信號(hào)量的個(gè)數(shù)。

          本篇文章閱讀需要建立在一定的AQS基礎(chǔ)之上,這邊推薦幾篇前置文章,可以瞅一眼:

          void acquire()

          調(diào)用該方法時(shí),表示希望獲取一個(gè)信號(hào)量資源,相當(dāng)于acquire(1)。

          如果當(dāng)前信號(hào)量個(gè)數(shù)大于0,CAS將當(dāng)前信號(hào)量值減1,成功后直接返回。

          如果當(dāng)前信號(hào)量個(gè)數(shù)等于0,則當(dāng)前線程將被置入AQS的阻塞隊(duì)列。

          該方法是響應(yīng)中斷的,其他線程調(diào)用了該線程的interrupt()方法,將會(huì)拋出中斷異常返回。

              // Semaphore.java
          public void acquire() throws InterruptedException {
          // 傳遞的 arg 為 1 , 獲取1個(gè)信號(hào)量資源
          sync.acquireSharedInterruptibly(1);
          }
          // AQS.java
          public final void acquireSharedInterruptibly(int arg)
          throws InterruptedException {
          // 線程被 中斷, 拋出中斷異常
          if (Thread.interrupted())
          throw new InterruptedException();
          // 子類實(shí)現(xiàn), 公平和非公平兩種策略
          if (tryAcquireShared(arg) < 0)
          // 如果獲取失敗, 則置入阻塞隊(duì)列,
          // 再次進(jìn)行嘗試, 嘗試失敗則掛起當(dāng)前線程
          doAcquireSharedInterruptibly(arg);
          }


          非公平

              static final class NonfairSync extends Sync {
          private static final long serialVersionUID = -2694183684443567898L;

          NonfairSync(int permits) {
          super(permits);
          }

          protected int tryAcquireShared(int acquires) {
          // 這里直接調(diào)用Sync定義的 非公平共享模式獲取方法
          return nonfairTryAcquireShared(acquires);
          }
          }

          abstract static class Sync extends AbstractQueuedSynchronizer {

          final int nonfairTryAcquireShared(int acquires) {
          for (;;) {
          // 獲取當(dāng)前信號(hào)量的值
          int available = getState();
          // 減去需要獲取的值, 得到剩余的信號(hào)量個(gè)數(shù)
          int remaining = available - acquires;
          // 不剩了,表示當(dāng)前信號(hào)量個(gè)數(shù)不能滿足需求, 返回負(fù)數(shù), 線程置入AQS阻塞
          // 還有的剩, CAS設(shè)置當(dāng)前信號(hào)量值為剩余值, 并返回剩余值
          if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
          }
          }
          }


          你會(huì)發(fā)現(xiàn),非公平策略是無法保證【AQS隊(duì)列中阻塞的線程】和【當(dāng)前線程】獲取的順序的,當(dāng)前線程是有可能在排隊(duì)的線程之前就拿到資源,產(chǎn)生插隊(duì)現(xiàn)象。

          公平策略就不一樣了,它會(huì)通過hasQueuedPredecessors()方法看看隊(duì)列中是否存在前驅(qū)節(jié)點(diǎn),以保證公平性。

          公平策略

              static final class FairSync extends Sync {
          private static final long serialVersionUID = 2014338818796000944L;

          FairSync(int permits) {
          super(permits);
          }

          protected int tryAcquireShared(int acquires) {
          for (;;) {
          // 如果隊(duì)列中在此之前已經(jīng)有線程在排隊(duì)了,直接放棄獲取
          if (hasQueuedPredecessors())
          return -1;
          int available = getState();
          int remaining = available - acquires;
          if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
          }
          }
          }


          void acquire(int permits)

          在acquire()的基礎(chǔ)上,指定了獲取信號(hào)量的數(shù)量permits。

              public void acquire(int permits) throws InterruptedException {
          if (permits < 0) throw new IllegalArgumentException();
          sync.acquireSharedInterruptibly(permits);
          }


          void acquireUninterruptibly()

          該方法與acquire()類似,但是不響應(yīng)中斷。

              public void acquireUninterruptibly() {
          sync.acquireShared(1);
          }

          public final void acquireShared(int arg) {
          if (tryAcquireShared(arg) < 0)
          doAcquireShared(arg);
          }


          void acquireUninterruptibly(int permits)

          該方法與acquire(permits)類似,但是不響應(yīng)中斷。

              public void acquireUninterruptibly(int permits) {
          if (permits < 0) throw new IllegalArgumentException();
          sync.acquireShared(permits);
          }


          boolean tryAcquire()

          tryAcquire和acquire非公平策略公用一個(gè)邏輯,但是區(qū)別在于,如果獲取信號(hào)量失敗,或者CAS失敗,將會(huì)直接返回false,而不會(huì)置入阻塞隊(duì)列中。

          一般try開頭的方法的特點(diǎn)就是這樣,嘗試一下,成功是最好,失敗也不至于被阻塞,而是立刻返回false。

              public boolean tryAcquire() {
          return sync.nonfairTryAcquireShared(1) >= 0;
          }
          abstract static class Sync extends AbstractQueuedSynchronizer {
          final int nonfairTryAcquireShared(int acquires) {
          for (;;) {
          int available = getState();
          int remaining = available - acquires;
          if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
          }
          }
          }


          boolean tryAcquire(int permits)

          相比于普通的tryAcquire(),指定了permits的值。

              public boolean tryAcquire(int permits) {
          if (permits < 0) throw new IllegalArgumentException();
          return sync.nonfairTryAcquireShared(permits) >= 0;
          }


          boolean tryAcquire(int permits, long timeout, TimeUnit unit)

          相比于tryAcquire(int permits),增加了超時(shí)控制。

              public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
          throws InterruptedException {
          if (permits < 0) throw new IllegalArgumentException();
          return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
          }


          void release()

          將信號(hào)量值加1,如果有線程因?yàn)檎{(diào)用acquire方法而被阻塞在AQS阻塞隊(duì)列中,將根據(jù)公平策略選擇一個(gè)信號(hào)量個(gè)數(shù)滿足需求的線程喚醒,線程喚醒后也會(huì)嘗試獲取新增的信號(hào)量。

          參考文章:Java并發(fā)包源碼學(xué)習(xí)系列:AQS共享模式獲取與釋放資源

              // Semaphore.java
          public void release() {
          sync.releaseShared(1);
          }
          // AQS.java
          public final boolean releaseShared(int arg) {
          // 嘗試釋放鎖
          if (tryReleaseShared(arg)) {
          // 釋放成功, 喚醒AQS隊(duì)列里面最先掛起的線程
          // https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838
          doReleaseShared();
          return true;
          }
          return false;
          }
          // Semaphore#Sync.java
          abstract static class Sync extends AbstractQueuedSynchronizer {
          protected final boolean tryReleaseShared(int releases) {
          for (;;) {
          // 獲取當(dāng)前信號(hào)量
          int current = getState();
          // 期望加上releases
          int next = current + releases;
          if (next < current) // overflow
          throw new Error("Maximum permit count exceeded");
          // CAS操作,更新
          if (compareAndSetState(current, next))
          return true;
          }
          }
          }


          void release(int permits)

          release()相比指定了permits的值。

              public void release(int permits) {
          if (permits < 0) throw new IllegalArgumentException();
          sync.releaseShared(permits);
          }


          其他方法

          Semaphore還提供其他一些方法,實(shí)現(xiàn)比較簡(jiǎn)單,這邊就簡(jiǎn)單寫一下吧:

              // 返回此信號(hào)量中當(dāng)前可用的許可證數(shù)量, 其實(shí)就是得到當(dāng)前的 state值  getState()
          public int availablePermits() {
          return sync.getPermits();
          }

          // 將state更新為0, 返回0
          public int drainPermits() {
          return sync.drainPermits();
          }

          // 減少reduction個(gè)許可證
          protected void reducePermits(int reduction) {
          if (reduction < 0) throw new IllegalArgumentException();
          sync.reducePermits(reduction);
          }

          // 判斷公平策略
          public boolean isFair() {
          return sync instanceof FairSync;
          }

          // 判斷是否有線程證在等待獲取許可證
          public final boolean hasQueuedThreads() {
          return sync.hasQueuedThreads();
          }

          // 返回正在等待獲取許可證的線程數(shù)
          public final int getQueueLength() {
          return sync.getQueueLength();
          }

          // 返回所有等待獲取許可證的線程集合
          protected Collection<Thread> getQueuedThreads() {
          return sync.getQueuedThreads();
          }


          總結(jié)

          Semaphore信號(hào)量用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理地使用公共資源。

          • 基于AQS,類似于ReentrantLock,Sync繼承自AQS,有公平策略和非公平策略兩種實(shí)現(xiàn)。

          • 類似于CountDownLatch,state在這里也是通過構(gòu)造器指定,表示初始化信號(hào)量的個(gè)數(shù)。

          每次線程調(diào)用tryAcquire()或者acquire()方法都會(huì)原子性的遞減許可證的數(shù)量,release()會(huì)原子性遞增許可證數(shù)量,只要有許可證就可以重復(fù)使用




          想進(jìn)大廠的小伙伴請(qǐng)注意,

          大廠面試的套路很神奇,

          早做準(zhǔn)備對(duì)大家更有好處,

          埋頭刷題效率低,

          看面經(jīng)會(huì)更有效率!

          小編準(zhǔn)備了一份大廠常問面經(jīng)匯總集

          剩下的就不會(huì)給大家一展出來了,以上資料按照一下操作即可獲得

          ——將文章進(jìn)行轉(zhuǎn)發(fā)評(píng)論,關(guān)注公眾號(hào)【Java烤豬皮】,關(guān)注后繼續(xù)后臺(tái)回復(fù)領(lǐng)取口令“ 666 ”即可免費(fèi)領(lǐng)文章取中所提供的資料。




          往期精品推薦



          騰訊、阿里、滴滴后臺(tái)試題匯集總結(jié) — (含答案)

          面試:史上最全多線程序面試題!

          最新阿里內(nèi)推Java后端試題

          JVM難學(xué)?那是因?yàn)槟銢]有真正看完整這篇文章


          結(jié)束


          關(guān)注作者微信公眾號(hào) — 《JAVA烤豬皮》


          了解了更多java后端架構(gòu)知識(shí)以及最新面試寶典



          看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者來源不斷出文的動(dòng)力~

          瀏覽 54
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  伊人成人网站在线观看 | 超级A片在线观看 | 操逼A片 中文字幕乱妇无码Av在线 | 亚洲 在线观看 | 日韩 欧美中文字幕第一页在线 |