<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          dubbo是如何控制并發(fā)數(shù)和限流的?

          共 5542字,需瀏覽 12分鐘

           ·

          2020-12-18 17:51

          點(diǎn)擊上方「藍(lán)字」關(guān)注我們

          0x01: ExecuteLimitFilter

          ExecuteLimitFilter ,在服務(wù)提供者,通過 的 "executes" 統(tǒng)一配置項(xiàng)開啟:
          表示每服務(wù)的每方法最大可并行執(zhí)行請求數(shù)。

          ExecuteLimitFilter是通過信號(hào)量來實(shí)現(xiàn)的對服務(wù)端的并發(fā)數(shù)的控制。

          ExecuteLimitFilter執(zhí)行流程:

          1. 首先會(huì)去獲得服務(wù)提供者每服務(wù)每方法最大可并行執(zhí)行請求數(shù)

          2. 如果每服務(wù)每方法最大可并行執(zhí)行請求數(shù)大于零,那么就基于基于服務(wù) URL + 方法維度獲取一個(gè)RpcStatus實(shí)例

          3. 通過RpcStatus實(shí)例獲取一個(gè)信號(hào)量,若果獲取的這個(gè)信號(hào)量調(diào)用tryAcquire返回false,則拋出異常

          4. 如果沒有拋異常,那么調(diào)用RpcStatus靜態(tài)方法beginCount,給這個(gè) URL + 方法維度開始計(jì)數(shù)

          5. 調(diào)用服務(wù)

          6. 調(diào)用結(jié)束后計(jì)數(shù)調(diào)用RpcStatus靜態(tài)方法endCount,計(jì)數(shù)結(jié)束

          7. 釋放信號(hào)量

          ExecuteLimitFilter

          @Override
          ????public?Result?invoke(Invoker?invoker,?Invocation?invocation)?throws?RpcException?{
          ????????URL?url?=?invoker.getUrl();
          ????????String?methodName?=?invocation.getMethodName();
          ????????Semaphore?executesLimit?=?null;
          ????????boolean?acquireResult?=?false;
          ????????int?max?=?url.getMethodParameter(methodName,?Constants.EXECUTES_KEY,?0);
          ????????if?(max?>?0)?{
          ????????????RpcStatus?count?=?RpcStatus.getStatus(url,?invocation.getMethodName());
          //????????????if?(count.getActive()?>=?max)?{
          ????????????/**
          ?????????????*?http://manzhizhen.iteye.com/blog/2386408
          ?????????????*?use?semaphore?for?concurrency?control?(to?limit?thread?number)
          ?????????????*/

          ????????????executesLimit?=?count.getSemaphore(max);
          ????????????if(executesLimit?!=?null?&&?!(acquireResult?=?executesLimit.tryAcquire()))?{
          ????????????????throw?new?RpcException("Failed?to?invoke?method?"?+?invocation.getMethodName()?+?"?in?provider?"?+?url?+?",?cause:?The?service?using?threads?greater?than??+?max?+?"\"?/>?limited.");
          ????????????}
          ????????}
          ????????long?begin?=?System.currentTimeMillis();
          ????????boolean?isSuccess?=?true;
          ????????RpcStatus.beginCount(url,?methodName);
          ????????try?{
          ????????????Result?result?=?invoker.invoke(invocation);
          ????????????return?result;
          ????????}?catch?(Throwable?t)?{
          ????????????isSuccess?=?false;
          ????????????if?(t?instanceof?RuntimeException)?{
          ????????????????throw?(RuntimeException)?t;
          ????????????}?else?{
          ????????????????throw?new?RpcException("unexpected?exception?when?ExecuteLimitFilter",?t);
          ????????????}
          ????????}?finally?{
          ????????????RpcStatus.endCount(url,?methodName,?System.currentTimeMillis()?-?begin,?isSuccess);
          ????????????if(acquireResult)?{
          ????????????????executesLimit.release();
          ????????????}
          ????????}
          ????}

          我們接下來看看RpcStatus這個(gè)類

          ?private?static?final?ConcurrentMap<String,?ConcurrentMap<String,?RpcStatus>>?METHOD_STATISTICS?=?new?ConcurrentHashMap<String,?ConcurrentMap<String,?RpcStatus>>();

          ????public?static?RpcStatus?getStatus(URL?url,?String?methodName)?{
          ????????String?uri?=?url.toIdentityString();
          ????????ConcurrentMap<String,?RpcStatus>?map?=?METHOD_STATISTICS.get(uri);
          ????????if?(map?==?null)?{
          ????????????METHOD_STATISTICS.putIfAbsent(uri,?new?ConcurrentHashMap<String,?RpcStatus>());
          ????????????map?=?METHOD_STATISTICS.get(uri);
          ????????}
          ????????RpcStatus?status?=?map.get(methodName);
          ????????if?(status?==?null)?{
          ????????????map.putIfAbsent(methodName,?new?RpcStatus());
          ????????????status?=?map.get(methodName);
          ????????}
          ????????return?status;
          ????}

          這個(gè)方法很簡單,大概就是給RpcStatus這個(gè)類里面的靜態(tài)屬性METHOD_STATISTICS里面設(shè)值。外層的map是以url為key,里層的map是以方法名為key。

          ?private?volatile?int?executesPermits;
          ????public?Semaphore?getSemaphore(int?maxThreadNum)?{
          ????????if(maxThreadNum?<=?0)?{
          ????????????return?null;
          ????????}

          ????????if?(executesLimit?==?null?||?executesPermits?!=?maxThreadNum)?{
          ????????????synchronized?(this)?{
          ????????????????if?(executesLimit?==?null?||?executesPermits?!=?maxThreadNum)?{
          ????????????????????executesLimit?=?new?Semaphore(maxThreadNum);
          ????????????????????executesPermits?=?maxThreadNum;
          ????????????????}
          ????????????}
          ????????}

          ????????return?executesLimit;
          ????}

          這個(gè)方法是獲取信號(hào)量,如果這個(gè)實(shí)例里面的信號(hào)量是空的,那么就添加一個(gè),如果不是空的就返回。


          0x02: TPSLimiter

          TpsLimitFilter 過濾器,用于服務(wù)提供者中,提供限流的功能。

          配置方式:

          • 通過 配置項(xiàng),添加到 中開啟,例如:

          dubbo:service?interface="com.alibaba.dubbo.demo.DemoService"?ref="demoServiceImpl"?protocol="injvm"?>
          ????"tps"
          ?value="100"?/>

          • 通過 配置項(xiàng),設(shè)置 TPS 周期。

          源碼分析

          TpsLimitFilter

          ?private?final?TPSLimiter?tpsLimiter?=?new?DefaultTPSLimiter();

          ????@Override
          ????public?Result?invoke(Invoker?invoker,?Invocation?invocation)?throws?RpcException?{

          ????????if?(!tpsLimiter.isAllowable(invoker.getUrl(),?invocation))?{
          ????????????throw?new?RpcException(
          ????????????????????"Failed?to?invoke?service?"?+
          ????????????????????????????invoker.getInterface().getName()?+
          ????????????????????????????"."?+
          ????????????????????????????invocation.getMethodName()?+
          ????????????????????????????"?because?exceed?max?service?tps.");
          ????????}

          ????????return?invoker.invoke(invocation);
          ????}

          invoke方法調(diào)用了DefaultTPSLimiter的isAllowable,我們進(jìn)入到isAllowable方法看一下

          DefaultTPSLimiter

          private?final?ConcurrentMap?stats
          ????????????=?new?ConcurrentHashMap();
          ????@Override
          ????public?boolean?isAllowable(URL?url,?Invocation?invocation)?
          {
          ????????//獲取tps這個(gè)參數(shù)設(shè)置的大小
          ????????int?rate?=?url.getParameter(Constants.TPS_LIMIT_RATE_KEY,?-1);
          ????????//獲取tps.interval這個(gè)參數(shù)設(shè)置的大小,默認(rèn)60秒
          ????????long?interval?=?url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
          ????????????????Constants.DEFAULT_TPS_LIMIT_INTERVAL);
          ????????String?serviceKey?=?url.getServiceKey();
          ????????if?(rate?>?0)?{
          ????????????StatItem?statItem?=?stats.get(serviceKey);
          ????????????if?(statItem?==?null)?{
          ????????????????stats.putIfAbsent(serviceKey,
          ????????????????????????new?StatItem(serviceKey,?rate,?interval));
          ????????????????statItem?=?stats.get(serviceKey);
          ????????????}
          ????????????return?statItem.isAllowable();
          ????????}?else?{
          ????????????StatItem?statItem?=?stats.get(serviceKey);
          ????????????if?(statItem?!=?null)?{
          ????????????????stats.remove(serviceKey);
          ????????????}
          ????????}

          ????????return?true;
          ????}

          若要限流,調(diào)用 StatItem#isAllowable(url, invocation) 方法,根據(jù) TPS 限流規(guī)則判斷是否限制此次調(diào)用。

          StatItem

          ??private?long?lastResetTime;

          ????private?long?interval;

          ????private?AtomicInteger?token;

          ????private?int?rate;

          ????public?boolean?isAllowable()?{
          ????????long?now?=?System.currentTimeMillis();
          ?????????//?若到達(dá)下一個(gè)周期,恢復(fù)可用種子數(shù),設(shè)置最后重置時(shí)間。
          ????????if?(now?>?lastResetTime?+?interval)?{
          ????????????token.set(rate);//?回復(fù)可用種子數(shù)
          ????????????lastResetTime?=?now;//?最后重置時(shí)間
          ????????}
          ????????//?CAS?,直到或得到一個(gè)種子,或者沒有足夠種子
          ????????int?value?=?token.get();
          ????????boolean?flag?=?false;
          ????????while?(value?>?0?&&?!flag)?{
          ????????????flag?=?token.compareAndSet(value,?value?-?1);
          ????????????value?=?token.get();
          ????????}

          ????????return?flag;
          ????}


          source:?https://www.cnblogs.com/luozhiyun/p/10960593.html


          掃碼二維碼

          獲取更多精彩

          Java樂園

          有用!分享+在看?


          瀏覽 25
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产在线精品视频豆花 | 豆花视频成人 | 91精品在线播放 | 天天躁日日躁狠狠躁欧美男男 | 日韩又大又粗精品 |