Dubbo 是如何控制并發(fā)數(shù)和限流的?
ExecuteLimitFilter
ExecuteLimitFilter是通過信號(hào)量來實(shí)現(xiàn)的對(duì)服務(wù)端的并發(fā)數(shù)的控制。
ExecuteLimitFilter執(zhí)行流程:
首先會(huì)去獲得服務(wù)提供者每服務(wù)每方法最大可并行執(zhí)行請(qǐng)求數(shù) 如果每服務(wù)每方法最大可并行執(zhí)行請(qǐng)求數(shù)大于零,那么就基于基于服務(wù) URL + 方法維度獲取一個(gè)RpcStatus實(shí)例 通過RpcStatus實(shí)例獲取一個(gè)信號(hào)量,若果獲取的這個(gè)信號(hào)量調(diào)用tryAcquire返回false,則拋出異常 如果沒有拋異常,那么久調(diào)用RpcStatus靜態(tài)方法beginCount,給這個(gè) URL + 方法維度開始計(jì)數(shù) 調(diào)用服務(wù) 調(diào)用結(jié)束后計(jì)數(shù)調(diào)用RpcStatus靜態(tài)方法endCount,計(jì)數(shù)結(jié)束 釋放信號(hào)量
ExecuteLimitFilter
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
Semaphore executesLimit = null;
boolean acquireResult = false;
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
if (max > 0) {
RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
// if (count.getActive() >= max) {
/**
* http://manzhizhen.iteye.com/blog/2386408
* use semaphore for concurrency control (to limit thread number)
*/
executesLimit = count.getSemaphore(max);
if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
long begin = System.currentTimeMillis();
boolean isSuccess = true;
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
isSuccess = false;
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
} finally {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
if(acquireResult) {
executesLimit.release();
}
}
}
我們接下來看看RpcStatus這個(gè)類
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
public static RpcStatus getStatus(URL url, String methodName) {
String uri = url.toIdentityString();
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
if (map == null) {
METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
map = METHOD_STATISTICS.get(uri);
}
RpcStatus status = map.get(methodName);
if (status == null) {
map.putIfAbsent(methodName, new RpcStatus());
status = map.get(methodName);
}
return status;
}
這個(gè)方法很簡單,大概就是給RpcStatus這個(gè)類里面的靜態(tài)屬性METHOD_STATISTICS里面設(shè)值。外層的map是以u(píng)rl為key,里層的map是以方法名為key。
private volatile int executesPermits;
public Semaphore getSemaphore(int maxThreadNum) {
if(maxThreadNum <= 0) {
return null;
}
if (executesLimit == null || executesPermits != maxThreadNum) {
synchronized (this) {
if (executesLimit == null || executesPermits != maxThreadNum) {
executesLimit = new Semaphore(maxThreadNum);
executesPermits = maxThreadNum;
}
}
}
return executesLimit;
}
這個(gè)方法是獲取信號(hào)量,如果這個(gè)實(shí)例里面的信號(hào)量是空的,那么就添加一個(gè),如果不是空的就返回。
TPSLimiter
TpsLimitFilter 過濾器,用于服務(wù)提供者中,提供限流的功能。
配置方式:
dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" >
<dubbo:parameter key="tps" value="100" />
</dubbo:service>
源碼分析
TpsLimitFilter
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
"Failed to invoke service " +
invoker.getInterface().getName() +
"." +
invocation.getMethodName() +
" because exceed max service tps.");
}
return invoker.invoke(invocation);
}
invoke方法調(diào)用了DefaultTPSLimiter的isAllowable,我們進(jìn)入到isAllowable方法看一下
DefaultTPSLimiter
private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
//獲取tps這個(gè)參數(shù)設(shè)置的大小
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
//獲取tps.interval這個(gè)參數(shù)設(shè)置的大小,默認(rèn)60秒
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey();
if (rate > 0) {
StatItem statItem = stats.get(serviceKey);
if (statItem == null) {
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
return statItem.isAllowable();
} else {
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}
return true;
}
若要限流,調(diào)用 StatItem#isAllowable(url, invocation) 方法,根據(jù) TPS 限流規(guī)則判斷是否限制此次調(diào)用。
StatItem
private long lastResetTime;
private long interval;
private AtomicInteger token;
private int rate;
public boolean isAllowable() {
long now = System.currentTimeMillis();
// 若到達(dá)下一個(gè)周期,恢復(fù)可用種子數(shù),設(shè)置最后重置時(shí)間。
if (now > lastResetTime + interval) {
token.set(rate);// 回復(fù)可用種子數(shù)
lastResetTime = now;// 最后重置時(shí)間
}
// CAS ,直到或得到一個(gè)種子,或者沒有足夠種子
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {
flag = token.compareAndSet(value, value - 1);
value = token.get();
}
return flag;
}
作者:luozhiyun
出處:www.cnblogs.com/luozhiyun/p/10960593.html

往期資源 需要請(qǐng)自取
喜歡就"在看"唄^_^
評(píng)論
圖片
表情
