Semaphore 信號量限流,這東西真管用嗎?
最近參與公司的服務治理項目,主要目的是為了保證生產(chǎn)服務高可用性,以及高穩(wěn)定性。
為了更好的參與的這個項目,這段時間一直在充電學習這方面的相關知識,包括限流,熔斷,服務降級等等。
那在學習限流的時候,看到網(wǎng)上很多文章中直接使用了JDK 中 Semaphore 實現(xiàn)了限流器。
雖然到達的限流的目的,但是實際上其還是存在很大缺陷。

那你如果沒有經(jīng)過完整測試,直接將這套限流方式照搬過來,發(fā)到了生產(chǎn)環(huán)境,那就等著背這口大鍋吧。
好了,今天我們主要來聊聊 Semaphore ,文章主要內(nèi)容如下圖所示:

semaphore
限流的方式有很多,從類型上分類,一般可以分為兩種:
并發(fā)數(shù)限流 QPS 限流
并發(fā)數(shù)限流就是限制同一時刻的最大并發(fā)請求數(shù),而 QPS 限流指的是限制一段時間內(nèi)請求數(shù)。
那我們今天的講的 semaphore 限流其實屬于第一類,通過限制并發(fā)數(shù),到達限流的目的。
semaphore中文翻譯為信號量,它其實是并發(fā)領域中一個重要編程模型,幾乎所有支持并發(fā)編程的語言都支持信號量這個機制。
JDK 并發(fā)包下 Semaphore 類就是信號量的實現(xiàn)類,它的模型比較簡單,如下圖所示:

Semaphore 內(nèi)部有一個計數(shù)器,我們使用的時候,需要提前初始化。
初始化之后,我們就可以調用 acquire方法,獲取信號量,這時計數(shù)器將會減 1。如果此時計數(shù)器值小于 0,則會將當前線程阻塞,并且加入到等待隊列,否則當前線程繼續(xù)執(zhí)行。
執(zhí)行結束之后,調用 release方法,釋放信號量,計數(shù)器將會加 1。那如果此時計數(shù)器值的小于或等于0,則會喚醒的等待隊列一個線程,然后將其移出隊列。
并發(fā)流量通過 Semaphore進行限流,只有拿到信號量才能繼續(xù)執(zhí)行,保證后端資源訪問數(shù)總是在安全范圍。

Semaphore 限流
Semaphore 限流常見使用方式
了解完 Semaphore 基本原理之后,我們就來實現(xiàn)一個限流器。
public class ConcurrencyLimit {
private Semaphore semaphore;
private ConcurrencyLimit() {
}
public static ConcurrencyLimit create(int permits) {
ConcurrencyLimit concurrencyLimit = new ConcurrencyLimit();
concurrencyLimit.semaphore = new Semaphore(permits);
return concurrencyLimit;
}
public void acquire() throws InterruptedException {
this.semaphore.acquire();
}
public void release() {
this.semaphore.release();
}
public boolean tryAcquire() {
return this.semaphore.tryAcquire();
}
}
限流器底層直接使用 Semaphore,我們寫個例子實際測試一下:
ConcurrencyLimit limit = ConcurrencyLimit.create(5);
ExecutorService executorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("limit-%d")
.build());
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
try {
limit.acquire();
System.out.println(Thread.currentThread().getName() + " START");
// 模擬內(nèi)部耗時
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(500));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + " END");
limit.release();
}
});
}
碎碎念:這里要注意了,
Semaphore的acquire方法與release方法,一定要成對出現(xiàn)。如果調用
acquire,最后別忘了調用release,可能會導致程序發(fā)生假死等詭異的情況。
輸出結果如下:

可以看到,同一時刻,最多只有 5 個線程開始執(zhí)行任務,起到限流了目的。
其實隨便搜下 Semaphore限流,可以看到實現(xiàn)方式跟上面差不多。
那這上面的限流實現(xiàn)真的沒問題嗎?
「可以說有,也可以說沒有,這主要還是要看限流器使用場景?!?/strong>
Semaphore 限流缺陷
如果我們換一個場景,將這個限流器用在一個 Web 服務,我們來看下高并發(fā)情況下會有什么問題。
@Slf4j
@Component
public class LimitInterceptor extends HandlerInterceptorAdapter {
ConcurrencyLimit concurrencyLimit;
public LimitInterceptor() {
this.concurrencyLimit = ConcurrencyLimit.create(10);
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
concurrencyLimit.acquire();
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
concurrencyLimit.release();
}
}
如上代碼所示,我們在 SpringMVC 的攔截器中使用限流器。
任一請求執(zhí)行的時候,首先將會經(jīng)過 Interceptor攔截器中 preHandle 方法,在這里面我們調用獲取信號量方法。
當請求邏輯完成之后,內(nèi)部將會調用攔截器的 afterCompletion,我們在這里釋放信號量。
在服務請求內(nèi),休眠 100ms,模擬內(nèi)部接口耗時。

下面使用壓測神器 「jmeter」 同時發(fā)起 500 個并發(fā)請求,模擬高并發(fā)的情況。

壓測結果如下圖所示:

從報表數(shù)據(jù)可以看到,雖然我們內(nèi)部耗時僅僅只有 100 毫秒,但是接口平均請求耗時已經(jīng)到達了 「2.4s」,P99 的耗時更是到達了 「4.4s」。
響應時間增長圖如下所示:

可以看到并發(fā)數(shù)越大,接口響應時間也越大。
如果這個限流器真的應用在生產(chǎn)環(huán)境,可能剛發(fā)布上線的時候,流量比較小,接口響應一切正常。
后面一旦碰到請求數(shù)變大,接口響應時間將會拉長,然后客戶請求出現(xiàn)大規(guī)模的超時。
當壓力繼續(xù)增大,服務端可能就沒辦法再接受新的請求。
那為什么會這樣?
主要是因為 Semaphore#acquire方法如果沒有獲取到信號量,是會阻塞線程的,然后線程進入等待隊列。
默認情況下 Semaphore 使用不公平鎖競爭,那在高并發(fā)請求下,線程競爭資源比較激烈,有的線程可能運氣比較好,直接拿到信號量,那這部分請求接口耗時將會是正常。
但是有部分線程可能運氣不佳,直接被阻塞,一直等到最后才能拿到信號量,才能執(zhí)行。
優(yōu)化 Semaphore 限流
我們目前使用的大多數(shù)服務,追求的就是一個「低延遲,高吞吐」,那這類服務到達限流線之后,就應該直接拒絕,響應響應錯誤信息,快速結束請求。
那 Semaphore 實際還提供另一個tryAcquire 方法,這個方法如果拿不到信號量,將會直接返回 false,比較符合這種場景。
下面優(yōu)化一下上面的限流代碼,主要修改一下攔截器內(nèi) preHandle 使用的方法。
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (!concurrencyLimit.tryAcquire()) {
response.getWriter().println("ERROR");
return false;
}
return true;
}
那通過這種方式,我們可以快速返回錯誤信息,不用讓調用者一直等待。
再使用 jmeter 模擬高并發(fā)請求,結果如下:

可以看到這次響應時間就沒有上一次那么夸張。
總結
一切拋開業(yè)務的架構設計都是耍流氓!
我們可以使用 Semaphore 快速實現(xiàn)一個限流器,不過使用過程一定注意使用場景,謹慎測試,切勿直接復制亂搬網(wǎng)上的代碼。
那像大多數(shù)的互聯(lián)網(wǎng)服務來講,快速響應才是最重要的,所以限流一定不能使用 Semaphore#acquire阻塞式方法。
而像有些后臺離線服務,不追求快速響應,只需要完成即可,那這類我們可以使用 Semaphore#acquire,將線程阻塞直到完成任務。
