<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Dubbo 是如何控制并發(fā)數(shù)和限流的?

          共 12056字,需瀏覽 25分鐘

           ·

          2021-05-14 12:14

          ExecuteLimitFilter

          ExecuteLimitFilter ,在服務(wù)提供者,通過 <dubbo:service /> 的 "executes" 統(tǒng)一配置項(xiàng)開啟:表示每服務(wù)的每方法最大可并行執(zhí)行請(qǐng)求數(shù)。

          ExecuteLimitFilter是通過信號(hào)量來實(shí)現(xiàn)的對(duì)服務(wù)端的并發(fā)數(shù)的控制。

          ExecuteLimitFilter執(zhí)行流程:

          1. 首先會(huì)去獲得服務(wù)提供者每服務(wù)每方法最大可并行執(zhí)行請(qǐng)求數(shù)
          2. 如果每服務(wù)每方法最大可并行執(zhí)行請(qǐng)求數(shù)大于零,那么就基于基于服務(wù) URL + 方法維度獲取一個(gè)RpcStatus實(shí)例
          3. 通過RpcStatus實(shí)例獲取一個(gè)信號(hào)量,若果獲取的這個(gè)信號(hào)量調(diào)用tryAcquire返回false,則拋出異常
          4. 如果沒有拋異常,那么久調(diào)用RpcStatus靜態(tài)方法beginCount,給這個(gè) URL + 方法維度開始計(jì)數(shù)
          5. 調(diào)用服務(wù)
          6. 調(diào)用結(jié)束后計(jì)數(shù)調(diào)用RpcStatus靜態(tài)方法endCount,計(jì)數(shù)結(jié)束
          7. 釋放信號(hào)量

          ExecuteLimitFilter

          @Override
          public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
              URL url = invoker.getUrl();
              String methodName = invocation.getMethodName();
              Semaphore executesLimit = null;
              boolean acquireResult = false;
              int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
              if (max > 0) {
                  RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
                  //            if (count.getActive() >= max) {
                  /**
                       * http://manzhizhen.iteye.com/blog/2386408
                       * use semaphore for concurrency control (to limit thread number)
                       */

                  executesLimit = count.getSemaphore(max);
                  if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                      throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
                  }
              }
              long begin = System.currentTimeMillis();
              boolean isSuccess = true;
              RpcStatus.beginCount(url, methodName);
              try {
                  Result result = invoker.invoke(invocation);
                  return result;
              } catch (Throwable t) {
                  isSuccess = false;
                  if (t instanceof RuntimeException) {
                      throw (RuntimeException) t;
                  } else {
                      throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
                  }
              } finally {
                  RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
                  if(acquireResult) {
                      executesLimit.release();
                  }
              }
          }

          我們接下來看看RpcStatus這個(gè)類

          private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();

          public static RpcStatus getStatus(URL url, String methodName) {
              String uri = url.toIdentityString();
              ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
              if (map == null) {
                  METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
                  map = METHOD_STATISTICS.get(uri);
              }
              RpcStatus status = map.get(methodName);
              if (status == null) {
                  map.putIfAbsent(methodName, new RpcStatus());
                  status = map.get(methodName);
              }
              return status;
          }

          這個(gè)方法很簡單,大概就是給RpcStatus這個(gè)類里面的靜態(tài)屬性METHOD_STATISTICS里面設(shè)值。外層的map是以u(píng)rl為key,里層的map是以方法名為key。

          private volatile int executesPermits;
          public Semaphore getSemaphore(int maxThreadNum) {
              if(maxThreadNum <= 0) {
                  return null;
              }

              if (executesLimit == null || executesPermits != maxThreadNum) {
                  synchronized (this) {
                      if (executesLimit == null || executesPermits != maxThreadNum) {
                          executesLimit = new Semaphore(maxThreadNum);
                          executesPermits = maxThreadNum;
                      }
                  }
              }

              return executesLimit;
          }

          這個(gè)方法是獲取信號(hào)量,如果這個(gè)實(shí)例里面的信號(hào)量是空的,那么就添加一個(gè),如果不是空的就返回。

          TPSLimiter

          TpsLimitFilter 過濾器,用于服務(wù)提供者中,提供限流的功能。

          配置方式:

          通過 <dubbo:parameter key="tps" value="" /> 配置項(xiàng),添加到 <dubbo:service /> 或 <dubbo:provider /> 或 <dubbo:protocol /> 中開啟,例如:
          dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" >
          <dubbo:parameter key="tps" value="100" />
          </dubbo:service>
          通過 <dubbo:parameter key="tps.interval" value="" /> 配置項(xiàng),設(shè)置 TPS 周期。

          源碼分析

          TpsLimitFilter

          private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

          @Override
          public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

              if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
                  throw new RpcException(
                      "Failed to invoke service " +
                      invoker.getInterface().getName() +
                      "." +
                      invocation.getMethodName() +
                      " because exceed max service tps.");
              }

              return invoker.invoke(invocation);
          }

          invoke方法調(diào)用了DefaultTPSLimiter的isAllowable,我們進(jìn)入到isAllowable方法看一下

          DefaultTPSLimiter

          private final ConcurrentMap<String, StatItem> stats
              = new ConcurrentHashMap<String, StatItem>();
          @Override
          public boolean isAllowable(URL url, Invocation invocation) {
              //獲取tps這個(gè)參數(shù)設(shè)置的大小
              int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
              //獲取tps.interval這個(gè)參數(shù)設(shè)置的大小,默認(rèn)60秒
              long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
                                               Constants.DEFAULT_TPS_LIMIT_INTERVAL);
              String serviceKey = url.getServiceKey();
              if (rate > 0) {
                  StatItem statItem = stats.get(serviceKey);
                  if (statItem == null) {
                      stats.putIfAbsent(serviceKey,
                                        new StatItem(serviceKey, rate, interval));
                      statItem = stats.get(serviceKey);
                  }
                  return statItem.isAllowable();
              } else {
                  StatItem statItem = stats.get(serviceKey);
                  if (statItem != null) {
                      stats.remove(serviceKey);
                  }
              }

              return true;
          }

          若要限流,調(diào)用 StatItem#isAllowable(url, invocation) 方法,根據(jù) TPS 限流規(guī)則判斷是否限制此次調(diào)用。

          StatItem

          private long lastResetTime;

          private long interval;

          private AtomicInteger token;

          private int rate;

          public boolean isAllowable() {
              long now = System.currentTimeMillis();
              // 若到達(dá)下一個(gè)周期,恢復(fù)可用種子數(shù),設(shè)置最后重置時(shí)間。
              if (now > lastResetTime + interval) {
                  token.set(rate);// 回復(fù)可用種子數(shù)
                  lastResetTime = now;// 最后重置時(shí)間
              }
              // CAS ,直到或得到一個(gè)種子,或者沒有足夠種子
              int value = token.get();
              boolean flag = false;
              while (value > 0 && !flag) {
                  flag = token.compareAndSet(value, value - 1);
                  value = token.get();
              }

              return flag;
          }

          作者:luozhiyun

          出處:www.cnblogs.com/luozhiyun/p/10960593.html

          往期資源  需要請(qǐng)自取

          Java項(xiàng)目分享 最新整理全集,找項(xiàng)目不累啦 03版

          臥槽!字節(jié)跳動(dòng)《算法中文手冊(cè)》火了,完整版 PDF 開放下載!

          字節(jié)跳動(dòng)總結(jié)的設(shè)計(jì)模式 PDF 火了,完整版開放下載!

          堪稱神級(jí)的Spring Boot手冊(cè),從基礎(chǔ)入門到實(shí)戰(zhàn)進(jìn)階

          臥槽!阿里大佬總結(jié)的《圖解Java》火了,完整版PDF開放下載!

          喜歡就"在看"唄^_^

          瀏覽 45
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美性爱日韩 | www.papapa | 天堂免费在线视频 | 国产高潮久久 | 豆花视频一区二区三区入口 |