<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Dubbo 是如何控制并發(fā)數(shù)和限流的?

          共 5139字,需瀏覽 11分鐘

           ·

          2021-02-24 14:18

          點擊關(guān)注公眾號,Java干貨及時送達

          ExecuteLimitFilter

          ExecuteLimitFilter ,在服務(wù)提供者,通過 的 "executes" 統(tǒng)一配置項開啟:表示每服務(wù)的每方法最大可并行執(zhí)行請求數(shù)。

          ExecuteLimitFilter是通過信號量來實現(xiàn)的對服務(wù)端的并發(fā)數(shù)的控制。

          ExecuteLimitFilter執(zhí)行流程:

          1. 首先會去獲得服務(wù)提供者每服務(wù)每方法最大可并行執(zhí)行請求數(shù)
          2. 如果每服務(wù)每方法最大可并行執(zhí)行請求數(shù)大于零,那么就基于基于服務(wù) URL + 方法維度獲取一個RpcStatus實例
          3. 通過RpcStatus實例獲取一個信號量,若果獲取的這個信號量調(diào)用tryAcquire返回false,則拋出異常
          4. 如果沒有拋異常,那么久調(diào)用RpcStatus靜態(tài)方法beginCount,給這個 URL + 方法維度開始計數(shù)
          5. 調(diào)用服務(wù)
          6. 調(diào)用結(jié)束后計數(shù)調(diào)用RpcStatus靜態(tài)方法endCount,計數(shù)結(jié)束
          7. 釋放信號量

          ExecuteLimitFilter

          @Override
          public?Result?invoke(Invoker?invoker,?Invocation?invocation)?throws?RpcException?{
          ????URL?url?=?invoker.getUrl();
          ????String?methodName?=?invocation.getMethodName();
          ????Semaphore?executesLimit?=?null;
          ????boolean?acquireResult?=?false;
          ????int?max?=?url.getMethodParameter(methodName,?Constants.EXECUTES_KEY,?0);
          ????if?(max?>?0)?{
          ????????RpcStatus?count?=?RpcStatus.getStatus(url,?invocation.getMethodName());
          ????????//????????????if?(count.getActive()?>=?max)?{
          ????????/**
          ?????????????*?http://manzhizhen.iteye.com/blog/2386408
          ?????????????*?use?semaphore?for?concurrency?control?(to?limit?thread?number)
          ?????????????*/

          ????????executesLimit?=?count.getSemaphore(max);
          ????????if(executesLimit?!=?null?&&?!(acquireResult?=?executesLimit.tryAcquire()))?{
          ????????????throw?new?RpcException("Failed?to?invoke?method?"?+?invocation.getMethodName()?+?"?in?provider?"?+?url?+?",?cause:?The?service?using?threads?greater?than??+?max?+?"\"?/>?limited.");
          ????????}
          ????}
          ????long?begin?=?System.currentTimeMillis();
          ????boolean?isSuccess?=?true;
          ????RpcStatus.beginCount(url,?methodName);
          ????try?{
          ????????Result?result?=?invoker.invoke(invocation);
          ????????return?result;
          ????}?catch?(Throwable?t)?{
          ????????isSuccess?=?false;
          ????????if?(t?instanceof?RuntimeException)?{
          ????????????throw?(RuntimeException)?t;
          ????????}?else?{
          ????????????throw?new?RpcException("unexpected?exception?when?ExecuteLimitFilter",?t);
          ????????}
          ????}?finally?{
          ????????RpcStatus.endCount(url,?methodName,?System.currentTimeMillis()?-?begin,?isSuccess);
          ????????if(acquireResult)?{
          ????????????executesLimit.release();
          ????????}
          ????}
          }

          我們接下來看看RpcStatus這個類

          private?static?final?ConcurrentMap>?METHOD_STATISTICS?=?new?ConcurrentHashMap>();

          public?static?RpcStatus?getStatus(URL?url,?String?methodName)?{
          ????String?uri?=?url.toIdentityString();
          ????ConcurrentMap?map?=?METHOD_STATISTICS.get(uri);
          ????if?(map?==?null)?{
          ????????METHOD_STATISTICS.putIfAbsent(uri,?new?ConcurrentHashMap());
          ????????map?=?METHOD_STATISTICS.get(uri);
          ????}
          ????RpcStatus?status?=?map.get(methodName);
          ????if?(status?==?null)?{
          ????????map.putIfAbsent(methodName,?new?RpcStatus());
          ????????status?=?map.get(methodName);
          ????}
          ????return?status;
          }

          這個方法很簡單,大概就是給RpcStatus這個類里面的靜態(tài)屬性METHOD_STATISTICS里面設(shè)值。外層的map是以url為key,里層的map是以方法名為key。

          private?volatile?int?executesPermits;
          public?Semaphore?getSemaphore(int?maxThreadNum)?{
          ????if(maxThreadNum?<=?0)?{
          ????????return?null;
          ????}

          ????if?(executesLimit?==?null?||?executesPermits?!=?maxThreadNum)?{
          ????????synchronized?(this)?{
          ????????????if?(executesLimit?==?null?||?executesPermits?!=?maxThreadNum)?{
          ????????????????executesLimit?=?new?Semaphore(maxThreadNum);
          ????????????????executesPermits?=?maxThreadNum;
          ????????????}
          ????????}
          ????}

          ????return?executesLimit;
          }

          這個方法是獲取信號量,如果這個實例里面的信號量是空的,那么就添加一個,如果不是空的就返回。另外,關(guān)注公眾號Java技術(shù)棧,在后臺回復(fù):面試,可以獲取我整理的 Dubbo 系列面試題和答案。

          TPSLimiter

          TpsLimitFilter 過濾器,用于服務(wù)提供者中,提供限流的功能。

          配置方式:

          通過 配置項,添加到 中開啟,例如:
          dubbo:service?interface="com.alibaba.dubbo.demo.DemoService"?ref="demoServiceImpl"?protocol="injvm"?>
          "tps"?value="100"?/>

          通過 配置項,設(shè)置 TPS 周期。

          源碼分析

          TpsLimitFilter

          private?final?TPSLimiter?tpsLimiter?=?new?DefaultTPSLimiter();

          @Override
          public?Result?invoke(Invoker?invoker,?Invocation?invocation)?throws?RpcException?{

          ????if?(!tpsLimiter.isAllowable(invoker.getUrl(),?invocation))?{
          ????????throw?new?RpcException(
          ????????????"Failed?to?invoke?service?"?+
          ????????????invoker.getInterface().getName()?+
          ????????????"."?+
          ????????????invocation.getMethodName()?+
          ????????????"?because?exceed?max?service?tps.");
          ????}

          ????return?invoker.invoke(invocation);
          }

          invoke方法調(diào)用了DefaultTPSLimiter的isAllowable,我們進入到isAllowable方法看一下

          DefaultTPSLimiter

          private?final?ConcurrentMap?stats
          ????=?new?ConcurrentHashMap();
          @Override
          public?boolean?isAllowable(URL?url,?Invocation?invocation)?{
          ????//獲取tps這個參數(shù)設(shè)置的大小
          ????int?rate?=?url.getParameter(Constants.TPS_LIMIT_RATE_KEY,?-1);
          ????//獲取tps.interval這個參數(shù)設(shè)置的大小,默認60秒
          ????long?interval?=?url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
          ?????????????????????????????????????Constants.DEFAULT_TPS_LIMIT_INTERVAL);
          ????String?serviceKey?=?url.getServiceKey();
          ????if?(rate?>?0)?{
          ????????StatItem?statItem?=?stats.get(serviceKey);
          ????????if?(statItem?==?null)?{
          ????????????stats.putIfAbsent(serviceKey,
          ??????????????????????????????new?StatItem(serviceKey,?rate,?interval));
          ????????????statItem?=?stats.get(serviceKey);
          ????????}
          ????????return?statItem.isAllowable();
          ????}?else?{
          ????????StatItem?statItem?=?stats.get(serviceKey);
          ????????if?(statItem?!=?null)?{
          ????????????stats.remove(serviceKey);
          ????????}
          ????}

          ????return?true;
          }

          若要限流,調(diào)用 StatItem#isAllowable(url, invocation) 方法,根據(jù) TPS 限流規(guī)則判斷是否限制此次調(diào)用。

          StatItem

          private?long?lastResetTime;

          private?long?interval;

          private?AtomicInteger?token;

          private?int?rate;

          public?boolean?isAllowable()?{
          ????long?now?=?System.currentTimeMillis();
          ????//?若到達下一個周期,恢復(fù)可用種子數(shù),設(shè)置最后重置時間。
          ????if?(now?>?lastResetTime?+?interval)?{
          ????????token.set(rate);//?回復(fù)可用種子數(shù)
          ????????lastResetTime?=?now;//?最后重置時間
          ????}
          ????//?CAS?,直到或得到一個種子,或者沒有足夠種子
          ????int?value?=?token.get();
          ????boolean?flag?=?false;
          ????while?(value?>?0?&&?!flag)?{
          ????????flag?=?token.compareAndSet(value,?value?-?1);
          ????????value?=?token.get();
          ????}

          ????return?flag;
          }

          關(guān)注公眾號Java技術(shù)棧,在后臺回復(fù):面試,可以獲取我整理的 Dubbo 系列面試題和答案。

          作者:luozhiyun
          出處:www.cnblogs.com/luozhiyun/p/10960593.html






          關(guān)注Java技術(shù)棧看更多干貨



          戳原文,獲取精選面試題!
          瀏覽 53
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  夜夜操夜夜爽 | 中文无码在线观看中文字幕av中文 | 日韩成人在线电影 | 成人一级黄色电影 | 精品白浆 |