<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java高并發(fā)編程基礎(chǔ)三大利器之Semaphore

          共 21672字,需瀏覽 44分鐘

           ·

          2021-03-07 16:51

          Python實戰(zhàn)社群

          Java實戰(zhàn)社群

          長按識別下方二維碼,按需求添加

          掃碼關(guān)注添加客服

          進(jìn)Python社群▲

          掃碼關(guān)注添加客服

          進(jìn)Java社群


          作者丨java金融

          來源丨java金融

          引言

          最近可以進(jìn)行個稅申報了,還沒有申報的同學(xué)可以趕緊去試試哦。不過我反正是從上午到下午(3月1日)一直都沒有成功的進(jìn)行申報,一進(jìn)行申報 就返回“當(dāng)前訪問人數(shù)過多,請稍后再試”。為什么有些人就能夠申報成功,有些人就直接返回失敗。這很明顯申報處理資源是有限的, 只能等別人處理完了在來處理你的,你如果運氣好可能重試幾次就輪到你了,如果運氣不好可能重試一天也可能輪不到你。我反正已經(jīng)是放棄了,等到夜深人靜的時候再來試試。作為一個程序員我們肯定知道這是個稅申請app的限流操作,如果還有不懂什么 是限流操作的可以參考下這個文章《高并發(fā)系統(tǒng)三大利器之限流》。比如個稅申報系統(tǒng)每臺機器只最多分別最多只能處理1000個請求,再多的請求就會把機器打掛。如果是多余的請求就把這些請求拒絕掉。直接給你返回一句溫馨提示:“當(dāng)前訪問人數(shù)過多,請稍后再試”,如果要實現(xiàn)這個功能大家想想可以通過哪些方法算法來實現(xiàn)。

          共享鎖、獨占鎖

          學(xué)習(xí)semaphore之前我們必須要先了解下什么是共享鎖。在上一篇文章《Java高并發(fā)編程基礎(chǔ)之AQS》我們介紹了公平鎖與非公平鎖的區(qū)別。

          • 共享鎖:它是允許多個線程同時獲取鎖,并發(fā)的訪問共享資源
          • 獨占鎖:也有人把它叫做“獨享鎖”,它是是獨占的,排他的,只能被一個線程可持有, 當(dāng)獨占鎖已經(jīng)被某個線程持有時,其他線程只能等待它被釋放后,才能去爭鎖,并且同一時刻只有一個線程能爭鎖成功。

          什么是Semaphore

          在《Java并發(fā)編程藝術(shù)》這一書中是這么說的:

          Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。很多年以來,我都覺得從字面上很難理解Semaphore所表達(dá)的含義,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量,只允許同時有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會看到綠燈,可以開進(jìn)這條馬路,后面的車會看到紅燈,不能駛?cè)隭X馬路,但是如果前一百輛中有五輛車已經(jīng)離開了XX馬路,那么后面就允許有5輛車駛?cè)腭R路,這個例子里說的車就是線程,駛?cè)腭R路就表示線程在執(zhí)行,離開馬路就表示線程執(zhí)行完成,看見紅燈就表示線程被阻塞,不能執(zhí)行。

          • Semaphore機制是提供給線程搶占式獲取許可,所以他可以實現(xiàn)公平或者非公平,類似于ReentrantLock。說了這么多我們來個實際的例子看一看,比如我們?nèi)ネ\噲鐾\嚕\噲隹偣仓挥?code style="font-size: 14px;overflow-wrap: break-word;padding: 2px 4px;border-radius: 4px;margin-right: 2px;margin-left: 2px;color: rgb(30, 107, 184);background-color: rgba(27, 31, 35, 0.05);font-family: "Operator Mono", Consolas, Monaco, Menlo, monospace;word-break: break-all;">5個車位,但是現(xiàn)在有8輛汽車來停車,剩下的3輛汽車要么等其他汽車開走后進(jìn)行停車,或者去找別的停車位?
          /**
           * @author: 公眾號【Java金融】
           */

          public class SemaphoreTest {
              public static void main(String[] args) throws InterruptedException {
                   // 初始化五個車位
                  Semaphore semaphore = new Semaphore(5);
                  // 等所有車子
                  final CountDownLatch latch = new CountDownLatch(8);
                  for (int i = 0; i < 8; i++) {
                      int finalI = i;
                      if (i == 5) {
                          Thread.sleep(1000);
                          new Thread(() -> {
                              stopCarNotWait(semaphore, finalI);
                              latch.countDown();
                          }).start();
                          continue;
                      }
                      new Thread(() -> {
                          stopCarWait(semaphore, finalI);
                          latch.countDown();
                      }).start();
                  }
                  latch.await();
                  log("總共還剩:" + semaphore.availablePermits() + "個車位");
              }

              private static void stopCarWait(Semaphore semaphore, int finalI) {
                  String format = String.format("車牌號%d", finalI);
                  try {
                      semaphore.acquire(1);
                      log(format + "找到車位了,去停車了");
                      Thread.sleep(10000);
                  } catch (Exception e) {
                      e.printStackTrace();
                  } finally {
                      semaphore.release(1);
                      log(format + "開走了");
                  }
              }

              private static void stopCarNotWait(Semaphore semaphore, int finalI) {
                   String format = String.format("車牌號%d", finalI);
                  try {
                      if (semaphore.tryAcquire()) {
                          log(format + "找到車位了,去停車了");
                          Thread.sleep(10000);
                          log(format + "開走了");
                          semaphore.release();
                      } else {
                          log(format + "沒有停車位了,不在這里等了去其他地方停車去了");
                      }
                  } catch (Exception e) {
                      e.printStackTrace();
                  }

              }

              public static void log(String content) {
                  // 格式化
                  DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                  // 當(dāng)前時間
                  LocalDateTime now = LocalDateTime.now();
                  System.out.println(now.format(fmTime) + "  "+content);
              }
          }
          2021-03-01 18:54:57  車牌號0找到車位了,去停車了
          2021-03-01 18:54:57  車牌號3找到車位了,去停車了
          2021-03-01 18:54:57  車牌號2找到車位了,去停車了
          2021-03-01 18:54:57  車牌號1找到車位了,去停車了
          2021-03-01 18:54:57  車牌號4找到車位了,去停車了
          2021-03-01 18:54:58  車牌號5沒有停車位了,不在這里等了去其他地方停車去了
          2021-03-01 18:55:07  車牌號7找到車位了,去停車了
          2021-03-01 18:55:07  車牌號6找到車位了,去停車了
          2021-03-01 18:55:07  車牌號2開走了
          2021-03-01 18:55:07  車牌號0開走了
          2021-03-01 18:55:07  車牌號3開走了
          2021-03-01 18:55:07  車牌號4開走了
          2021-03-01 18:55:07  車牌號1開走了
          2021-03-01 18:55:17  車牌號7開走了
          2021-03-01 18:55:17  車牌號6開走了
          2021-03-01 18:55:17  總共還剩:5個車位

          從輸出結(jié)果我們可以看到車牌號5這輛車看見沒有車位了,就不在這個地方傻傻的等了,而是去其他地方了,但是車牌號6車牌號7分別需要等到車庫開出兩輛車空出兩個車位后才停進(jìn)去。這就體現(xiàn)了Semaphoreacquire 方法如果沒有獲取到憑證它就會阻塞,而tryAcquire方法如果沒有獲取到憑證不會阻塞的。

          semaphore在dubbo中的應(yīng)用

          Dubbo中可以給Provider配置線程池大小來控制系統(tǒng)提供服務(wù)的最大并行度,默認(rèn)是200

          <dubbo:provider  threads="200"/>

          比如我現(xiàn)在這個訂單系統(tǒng)有三個接口,分別為創(chuàng)單、取消訂單、修改訂單。這三個接口加起來的并發(fā)是200但是創(chuàng)單接口是核心接口,我想讓它多分點線程來執(zhí)行 讓它可以有最大150個線程,取消訂單和修改訂單分別最大25個線程執(zhí)行就可以了。dubbo提供了executes這一屬性來實現(xiàn)這個功能

          <dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/>
          <dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/>
          <dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>

          我們可以看看dubbo內(nèi)部是如何來executes的,具體實現(xiàn)是在ExecuteLimitFilter這個類我們可以

           public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
                  URL url = invoker.getUrl();
                  String methodName = invocation.getMethodName();
                  Semaphore executesLimit = null;
                  boolean acquireResult = false;
                  int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
                  if (max > 0) {
                      RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
                      // 如果當(dāng)前使用的線程數(shù)量已經(jīng)大于等于設(shè)置的閾值,那么直接拋出異常
          //            if (count.getActive() >= max) {
          // throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
                      /**
                       * http://manzhizhen.iteye.com/blog/2386408
                       * use semaphore for concurrency control (to limit thread number)
                       */

                       
                      executesLimit = count.getSemaphore(max);
                      if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                          throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
                      }
                  }
                  long begin = System.currentTimeMillis();
                  boolean isSuccess = true;
                  // 計數(shù)器+1
                  RpcStatus.beginCount(url, methodName);
                  try {
                      Result result = invoker.invoke(invocation);
                      return result;
                  } catch (Throwable t) {
                      isSuccess = false;
                      if (t instanceof RuntimeException) {
                          throw (RuntimeException) t;
                      } else {
                          throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
                      }
                  } finally {
                     // 計數(shù)器-1
                      RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
                      if(acquireResult) {
                          executesLimit.release();
                      }
                  }
              }

          從上述代碼我們也可以看出早期這個是沒有采用Semaphore來實現(xiàn)的,而是直接采用被注釋的  if (count.getActive() >= max) 這個來來實現(xiàn)的,由于這個count.getActive() >= max 和這個計數(shù)加1不是原子性的,所以會有問題,具體bug號可以看https://github.com/apache/dubbo/pull/582后面才采用上述代碼用Semaphore來修復(fù)非原子性問題。具體更詳細(xì)的分析可以參見代碼的鏈接。不過現(xiàn)在最新版本(2.7.9)我看是采用采用自旋加上和CAS來實現(xiàn)的。

          Semaphore

          上面就是對Semaphore一個簡單的使用以及dubbo中用到的例子,說句實話Semaphore在工作中用的還是比較少的,不過面試又有可能會被問到,所以還是有必要來一起學(xué)習(xí)一下它。我們前面《Java高并發(fā)編程基礎(chǔ)之AQS》通過ReentrantLock 一起學(xué)習(xí)了下AQS,其實Semaphore同樣也是通過AQS來是實現(xiàn)的,我們可以一起來對照下獨占鎖的方法,基本上都是有方法一一相對應(yīng)的。這里有兩點稍微需要注意的地方:

          • 在獨占鎖模式中,我們只有在獲取了獨占鎖的節(jié)點釋放鎖時,才會喚醒后繼節(jié)點,因為獨占鎖只能被一個線程持有,如果它還沒有被釋放,就沒有必要去喚醒它的后繼節(jié)點。
          • 在共享鎖模式下,當(dāng)一個節(jié)點獲取到了共享鎖,我們在獲取成功后就可以喚醒后繼節(jié)點了,而不需要等到該節(jié)點釋放鎖的時候,這是因為共享鎖可以被多個線程同時持有,一個鎖獲取到了,則后繼的節(jié)點都可以直接來獲取。因此,在共享鎖模式下,在獲取鎖和釋放鎖結(jié)束時,都會喚醒后繼節(jié)點。

          獲取憑證

          我們同樣還是通過非公平鎖的模式來獲取憑證 我們可以看下acquire的核心方法

            public final void acquireSharedInterruptibly(int arg)
                     throws InterruptedException 
          {
                 if (Thread.interrupted())
                     throw new InterruptedException();
                 if (tryAcquireShared(arg) < 0)
                     doAcquireSharedInterruptibly(arg);
             }
              protected int tryAcquireShared(int acquires) {
                      return nonfairTryAcquireShared(acquires);
             }
           
           // 主要看下這個方法,這個方法返回的值也就是tryAcquireShared返回的值,因為tryAcquireShared->nonfairTryAcquireShared
              final int nonfairTryAcquireShared(int acquires) {
                    //自旋
              for (;;) {
                   //Semaphore用AQS的state變量的值代表可用許可數(shù)
                   int available = getState();
                   //可用許可數(shù)減去本次需要獲取的許可數(shù)即為剩余許可數(shù)
                   int remaining = available - acquires;
                   //如果剩余許可數(shù)小于0或者CAS將當(dāng)前可用許可數(shù)設(shè)置為剩余許可數(shù)成功,則返回成功許可數(shù)
                   if (remaining < 0 ||
                       compareAndSetState(available, remaining))
                       return remaining;
               }
          • 當(dāng)tryAcquireShared 獲取返回許可書小于0時說明獲取許可失敗需要進(jìn)入doAcquireSharedInterruptibly這個方法去休眠。
          • 當(dāng)tryAcquireShared 獲取返回許可書小于0時說明獲取許可成功直接結(jié)束。

          doAcquireSharedInterruptibly


           private void doAcquireSharedInterruptibly(int arg)
                  throws InterruptedException 
          {
                  // 獨占鎖的acquireQueued調(diào)用的是addWaiter(Node.EXCLUSIVE),
                  // 而共享鎖調(diào)用的是addWaiter(Node.SHARED),表明了該節(jié)點處于共享模式
                  final Node node = addWaiter(Node.SHARED);
                  boolean failed = true;
                  try {
                      for (;;) {
                          final Node p = node.predecessor();
                          if (p == head) {
                              int r = tryAcquireShared(arg);
                              if (r >= 0) {
                                  setHeadAndPropagate(node, r);
                                  p.next = null// help GC
                                  failed = false;
                                  return;
                              }
                          }
                          if (shouldParkAfterFailedAcquire(p, node) &&
                              parkAndCheckInterrupt())
                              throw new InterruptedException();
                      }
                  } finally {
                      if (failed)
                          cancelAcquire(node);
                  }
              }

          這個方法是不是跟我們上篇文章講的AQS的獨占鎖的acquireQueued很像,不過獨占鎖它是直接調(diào)用了用了setHead(node)方法,而共享鎖調(diào)用的是setHeadAndPropagate(node, r)這個方法除了調(diào)用setHead 里面還調(diào)用了doReleaseShared(喚醒后繼節(jié)點)

              private void setHeadAndPropagate(Node node, int propagate) {
                  Node h = head; // Record old head for check below
                  setHead(node);
                  if (propagate > 0 || h == null || h.waitStatus < 0 ||
                      (h = head) == null || h.waitStatus < 0) {
                      Node s = node.next;
                      if (s == null || s.isShared())
                          doReleaseShared();
                  }
              }

          其他的方法基本上是和ReentrantLock來實現(xiàn)的獨占鎖差不多,我相信大家對源碼分析感興趣的應(yīng)該也不多,其他更多細(xì)節(jié)問題還是需要自己親自動手去看源碼的。

          總結(jié)

          • 當(dāng)信號量Semaphore初始化設(shè)置許可證為1 時,它也可以當(dāng)作互斥鎖使用。其中0、1就相當(dāng)于它的狀態(tài),當(dāng)=1時表示其他線程可以獲取,當(dāng)=0時,排他,即其他線程必須要等待。
          • SemaphoreJUC包中的一個很簡單的工具類,用來實現(xiàn)多線程下對于資源的同一時刻的訪問線程數(shù)限制
          • Semaphore中存在一個【許可】的概念,即訪問資源之前,先要獲得許可,如果當(dāng)前許可數(shù)量為0,那么線程阻塞,直到獲得許可
          • Semaphore內(nèi)部使用AQS實現(xiàn),由抽象內(nèi)部類Sync繼承了AQS。因為Semaphore天生就是共享的場景,所以其內(nèi)部實際上類似于共享鎖的實現(xiàn)
          • 共享鎖的調(diào)用框架和獨占鎖很相似,它們最大的不同在于獲取鎖的邏輯——共享鎖可以被多個線程同時持有,而獨占鎖同一時刻只能被一個線程持有。
          • 由于共享鎖同一時刻可以被多個線程持有,因此當(dāng)頭節(jié)點獲取到共享鎖時,可以立即喚醒后繼節(jié)點來爭鎖,而不必等到釋放鎖的時候。因此,共享鎖觸發(fā)喚醒后繼節(jié)點的行為可能有兩處,一處在當(dāng)前節(jié)點成功獲得共享鎖后,一處在當(dāng)前節(jié)點釋放共享鎖后。
          • 采用semaphore來進(jìn)行限流的話會產(chǎn)生突刺現(xiàn)象

          指在一定時間內(nèi)的一小段時間內(nèi)就用完了所有資源,后大部分時間中無資源可用。比如在限流方法中的計算器算法,設(shè)置1s內(nèi)的最大請求數(shù)為100,在前100ms已經(jīng)有了100個請求,則后面900ms將無法處理請求,這就是突刺現(xiàn)象

          結(jié)束

          • 由于自己才疏學(xué)淺,難免會有紕漏,假如你發(fā)現(xiàn)了錯誤的地方,還望留言給我指出來,我會對其加以修正。
          • 如果你覺得文章還不錯,你的轉(zhuǎn)發(fā)、分享、贊賞、點贊、留言就是對我最大的鼓勵。
          • 感謝您的閱讀,十分歡迎并感謝您的關(guān)注。
          • 站在巨人的肩膀上摘蘋果:
            https://segmentfault.com/a/1190000016447307
          程序員專欄
           掃碼關(guān)注填加客服 
          長按識別下方二維碼進(jìn)群

          近期精彩內(nèi)容推薦:  

           華為正式宣布養(yǎng)豬,網(wǎng)友:支持華為自救!

           入職騰訊第九年,我辭職了!

           Windows藍(lán)屏為什么是藍(lán)底白字?

           955 互聯(lián)網(wǎng)公司白名單來了!




          在看點這里好文分享給更多人↓↓

          瀏覽 34
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲天堂三级片 | 五月天黄色电影网站 | 成人AV一区二区三区四区 | 精品视频天天在线免费 | 中文字幕日产乱码中 |