Dubbo 是如何控制并發(fā)數(shù)和限流的?
點擊關(guān)注公眾號,Java干貨及時送達
ExecuteLimitFilter
ExecuteLimitFilter ,在服務(wù)提供者,通過
ExecuteLimitFilter是通過信號量來實現(xiàn)的對服務(wù)端的并發(fā)數(shù)的控制。
ExecuteLimitFilter執(zhí)行流程:
首先會去獲得服務(wù)提供者每服務(wù)每方法最大可并行執(zhí)行請求數(shù) 如果每服務(wù)每方法最大可并行執(zhí)行請求數(shù)大于零,那么就基于基于服務(wù) URL + 方法維度獲取一個RpcStatus實例 通過RpcStatus實例獲取一個信號量,若果獲取的這個信號量調(diào)用tryAcquire返回false,則拋出異常 如果沒有拋異常,那么久調(diào)用RpcStatus靜態(tài)方法beginCount,給這個 URL + 方法維度開始計數(shù) 調(diào)用服務(wù) 調(diào)用結(jié)束后計數(shù)調(diào)用RpcStatus靜態(tài)方法endCount,計數(shù)結(jié)束 釋放信號量
ExecuteLimitFilter
@Override
public?Result?invoke(Invoker>?invoker,?Invocation?invocation)?throws?RpcException?{
????URL?url?=?invoker.getUrl();
????String?methodName?=?invocation.getMethodName();
????Semaphore?executesLimit?=?null;
????boolean?acquireResult?=?false;
????int?max?=?url.getMethodParameter(methodName,?Constants.EXECUTES_KEY,?0);
????if?(max?>?0)?{
????????RpcStatus?count?=?RpcStatus.getStatus(url,?invocation.getMethodName());
????????//????????????if?(count.getActive()?>=?max)?{
????????/**
?????????????*?http://manzhizhen.iteye.com/blog/2386408
?????????????*?use?semaphore?for?concurrency?control?(to?limit?thread?number)
?????????????*/
????????executesLimit?=?count.getSemaphore(max);
????????if(executesLimit?!=?null?&&?!(acquireResult?=?executesLimit.tryAcquire()))?{
????????????throw?new?RpcException("Failed?to?invoke?method?"?+?invocation.getMethodName()?+?"?in?provider?"?+?url?+?",?cause:?The?service?using?threads?greater?than??+?max?+?"\"?/>?limited.");
????????}
????}
????long?begin?=?System.currentTimeMillis();
????boolean?isSuccess?=?true;
????RpcStatus.beginCount(url,?methodName);
????try?{
????????Result?result?=?invoker.invoke(invocation);
????????return?result;
????}?catch?(Throwable?t)?{
????????isSuccess?=?false;
????????if?(t?instanceof?RuntimeException)?{
????????????throw?(RuntimeException)?t;
????????}?else?{
????????????throw?new?RpcException("unexpected?exception?when?ExecuteLimitFilter",?t);
????????}
????}?finally?{
????????RpcStatus.endCount(url,?methodName,?System.currentTimeMillis()?-?begin,?isSuccess);
????????if(acquireResult)?{
????????????executesLimit.release();
????????}
????}
}
我們接下來看看RpcStatus這個類
private?static?final?ConcurrentMap>?METHOD_STATISTICS?=?new?ConcurrentHashMap>();
public?static?RpcStatus?getStatus(URL?url,?String?methodName)?{
????String?uri?=?url.toIdentityString();
????ConcurrentMap?map?=?METHOD_STATISTICS.get(uri);
????if?(map?==?null)?{
????????METHOD_STATISTICS.putIfAbsent(uri,?new?ConcurrentHashMap());
????????map?=?METHOD_STATISTICS.get(uri);
????}
????RpcStatus?status?=?map.get(methodName);
????if?(status?==?null)?{
????????map.putIfAbsent(methodName,?new?RpcStatus());
????????status?=?map.get(methodName);
????}
????return?status;
}
這個方法很簡單,大概就是給RpcStatus這個類里面的靜態(tài)屬性METHOD_STATISTICS里面設(shè)值。外層的map是以url為key,里層的map是以方法名為key。
private?volatile?int?executesPermits;
public?Semaphore?getSemaphore(int?maxThreadNum)?{
????if(maxThreadNum?<=?0)?{
????????return?null;
????}
????if?(executesLimit?==?null?||?executesPermits?!=?maxThreadNum)?{
????????synchronized?(this)?{
????????????if?(executesLimit?==?null?||?executesPermits?!=?maxThreadNum)?{
????????????????executesLimit?=?new?Semaphore(maxThreadNum);
????????????????executesPermits?=?maxThreadNum;
????????????}
????????}
????}
????return?executesLimit;
}
這個方法是獲取信號量,如果這個實例里面的信號量是空的,那么就添加一個,如果不是空的就返回。另外,關(guān)注公眾號Java技術(shù)棧,在后臺回復(fù):面試,可以獲取我整理的 Dubbo 系列面試題和答案。
TPSLimiter
TpsLimitFilter 過濾器,用于服務(wù)提供者中,提供限流的功能。
配置方式:
dubbo:service?interface="com.alibaba.dubbo.demo.DemoService"?ref="demoServiceImpl"?protocol="injvm"?>
"tps"?value="100"?/>
源碼分析
TpsLimitFilter
private?final?TPSLimiter?tpsLimiter?=?new?DefaultTPSLimiter();
@Override
public?Result?invoke(Invoker>?invoker,?Invocation?invocation)?throws?RpcException?{
????if?(!tpsLimiter.isAllowable(invoker.getUrl(),?invocation))?{
????????throw?new?RpcException(
????????????"Failed?to?invoke?service?"?+
????????????invoker.getInterface().getName()?+
????????????"."?+
????????????invocation.getMethodName()?+
????????????"?because?exceed?max?service?tps.");
????}
????return?invoker.invoke(invocation);
}
invoke方法調(diào)用了DefaultTPSLimiter的isAllowable,我們進入到isAllowable方法看一下
DefaultTPSLimiter
private?final?ConcurrentMap?stats
????=?new?ConcurrentHashMap();
@Override
public?boolean?isAllowable(URL?url,?Invocation?invocation)?{
????//獲取tps這個參數(shù)設(shè)置的大小
????int?rate?=?url.getParameter(Constants.TPS_LIMIT_RATE_KEY,?-1);
????//獲取tps.interval這個參數(shù)設(shè)置的大小,默認60秒
????long?interval?=?url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
?????????????????????????????????????Constants.DEFAULT_TPS_LIMIT_INTERVAL);
????String?serviceKey?=?url.getServiceKey();
????if?(rate?>?0)?{
????????StatItem?statItem?=?stats.get(serviceKey);
????????if?(statItem?==?null)?{
????????????stats.putIfAbsent(serviceKey,
??????????????????????????????new?StatItem(serviceKey,?rate,?interval));
????????????statItem?=?stats.get(serviceKey);
????????}
????????return?statItem.isAllowable();
????}?else?{
????????StatItem?statItem?=?stats.get(serviceKey);
????????if?(statItem?!=?null)?{
????????????stats.remove(serviceKey);
????????}
????}
????return?true;
}
若要限流,調(diào)用 StatItem#isAllowable(url, invocation) 方法,根據(jù) TPS 限流規(guī)則判斷是否限制此次調(diào)用。
StatItem
private?long?lastResetTime;
private?long?interval;
private?AtomicInteger?token;
private?int?rate;
public?boolean?isAllowable()?{
????long?now?=?System.currentTimeMillis();
????//?若到達下一個周期,恢復(fù)可用種子數(shù),設(shè)置最后重置時間。
????if?(now?>?lastResetTime?+?interval)?{
????????token.set(rate);//?回復(fù)可用種子數(shù)
????????lastResetTime?=?now;//?最后重置時間
????}
????//?CAS?,直到或得到一個種子,或者沒有足夠種子
????int?value?=?token.get();
????boolean?flag?=?false;
????while?(value?>?0?&&?!flag)?{
????????flag?=?token.compareAndSet(value,?value?-?1);
????????value?=?token.get();
????}
????return?flag;
}
關(guān)注公眾號Java技術(shù)棧,在后臺回復(fù):面試,可以獲取我整理的 Dubbo 系列面試題和答案。
作者:luozhiyun
出處:www.cnblogs.com/luozhiyun/p/10960593.html






關(guān)注Java技術(shù)棧看更多干貨


