Java多線程之Semaphore信號(hào)量
這里就JUC包中的Semaphore類做相關(guān)介紹

概述
JUC包中的Semaphore信號(hào)量作為一個(gè)并發(fā)工具類。其基本思想很簡(jiǎn)單,對(duì)于一個(gè)信號(hào)量實(shí)例而言,其含有指定數(shù)量的許可。每當(dāng)訪問資源前,需先向其申請(qǐng)?jiān)S可。并在處理完畢后釋放許可,以供后續(xù)申請(qǐng)。其實(shí),這個(gè)使用方式就很像現(xiàn)實(shí)世界的停車場(chǎng),即停車場(chǎng)有空余車位,車才可以進(jìn)車;否則要么等待要么離開(尋找下一個(gè)停車場(chǎng))。當(dāng)車從停車場(chǎng)的車位駛離時(shí),則會(huì)將相應(yīng)的車位就會(huì)空余出來。在整個(gè)過程停車場(chǎng)的車位資源是有限的固定的。常見的使用場(chǎng)景是對(duì)業(yè)務(wù)所使用的線程數(shù)進(jìn)行控制,即所謂基于線程數(shù)的限流方式。其常用方法及功能如下所示
//?創(chuàng)建一個(gè)指定許可數(shù)的非公平信號(hào)量
public?Semaphore(int?permits);
//?創(chuàng)建一個(gè)指定許可數(shù)的公平/非公平信號(hào)量
public?Semaphore(int?permits,?boolean?fair);
//?釋放一個(gè)許可
public?void?release();
//?釋放指定數(shù)量的許可
public?void?release(int?permits);
//?當(dāng)前剩余可用的許可數(shù)量
public?int?availablePermits();
/***************************?獲取許可?******************************/
//?阻塞等待,直到獲取一個(gè)許可
public?void?acquire()?throws?InterruptedException;
//?阻塞等待,直到獲取全部所需數(shù)量的許可
public?void?acquire(int?permits)?throws?InterruptedException;
//?阻塞等待(忽略InterruptedException異常),直到獲取一個(gè)許可
public?void?acquireUninterruptibly();
//?阻塞等待(忽略InterruptedException異常),直到獲取全部所需數(shù)量的許可
public?void?acquireUninterruptibly(int?permits);
//?非阻塞式獲取一個(gè)許可,?ture:?獲取成功;?false:?獲取失敗
public?boolean?tryAcquire();
//?非阻塞式獲取全部所需數(shù)量的許可,?ture:?獲取成功;?false:?獲取失敗
public?boolean?tryAcquire(int?permits);
//?支持超時(shí)機(jī)制的tryAcquire方法,?獲取一個(gè)許可,?ture:?獲取成功;?false:?獲取失敗
public?boolean?tryAcquire(long?timeout,?TimeUnit?unit)?throws?InterruptedException;
//?支持超時(shí)機(jī)制的tryAcquire方法,?獲取全部所需數(shù)量的許可,?ture:?獲取成功;?false:?獲取失敗
public?boolean?tryAcquire(int?permits,?long?timeout,?TimeUnit?unit)?throws?InterruptedException;
//?一次性獲取所有剩余可用的許可,?返回成功獲取的許可數(shù)
public?int?drainPermits();
/******************************************************************/
可以看到,對(duì)于信號(hào)量而言,其支持公平和非公平兩種類型。默認(rèn)為非公平的。值得一提的是,對(duì)于tryAcquire()方法而言,其是非阻塞的。并且一旦存在可用的許可,會(huì)立即分配給它。不論是否存在其他正在等待許可的線程。即使當(dāng)前這個(gè)信號(hào)量實(shí)例是公平的,換言之tryAcquire()方法會(huì)破壞公平信號(hào)量實(shí)例的公平性。如果既期望使用非阻塞方式,又期望不破壞公平信號(hào)量的公平性,可以使用它的超時(shí)機(jī)制版本,同時(shí)將超時(shí)時(shí)間設(shè)為0。即 tryAcquire(0, TimeUnit.SECONDS) 。方法tryAcquire(int permits)同理,此處不再贅述
基本實(shí)踐
這里通過一個(gè)簡(jiǎn)單的實(shí)例,來進(jìn)行展示其基本的使用流程
public?class?SemaphoreTest?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss");
????//?系統(tǒng)最大的并發(fā)處理量
????private?static?Integer?maxLimit?=?5;
????@Test
????public?void?test1()?{
????????System.out.println("----------------------?系統(tǒng)上線?----------------------");
????????Semaphore?semaphore?=?new?Semaphore(maxLimit,?true);
????????ExecutorService?threadPool?=?Executors.newFixedThreadPool(10);
????????IntStream.rangeClosed(1,8)
????????????.mapToObj(?num?->?new?UserReq("用戶#"+num,?semaphore)?)
????????????.forEach(?threadPool::execute?);
????????//?主線程等待所有任務(wù)執(zhí)行完畢
????????try{?Thread.sleep(?120*1000?);?}?catch?(Exception?e)?{}
????????System.out.println("----------------------?系統(tǒng)下線?----------------------");
????}
????
????/**
?????*?打印信息
?????*?@param?msg
?????*/
????private?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?log?=?"["+time+"]?"+?msg;
????????System.out.println(log);
????}
????@AllArgsConstructor
????private?static?class?UserReq?implements?Runnable{
????????private?String?name;
????????private?Semaphore?semaphore;
????????@Override
????????public?void?run()?{
????????????//?模擬用戶不定時(shí)發(fā)起請(qǐng)求
????????????try{?Thread.sleep(RandomUtils.nextLong(500,?2000));?}?catch?(Exception?e)?{}
????????????String?msg?=?name?+?":?發(fā)起請(qǐng)求,?系統(tǒng)可用資源數(shù):?"?+?semaphore.availablePermits();
????????????info(msg);
????????????//?阻塞等待,直到獲取許可
????????????try?{
????????????????semaphore.acquire();
????????????}catch?(InterruptedException?e)?{
????????????????System.out.println(?"Happen?Exception:?"?+?e.getMessage());
????????????}
????????????info(name?+?":?系統(tǒng)開始處理請(qǐng)求");
????????????//?模擬業(yè)務(wù)耗時(shí)
????????????try{?Thread.sleep(RandomUtils.nextInt(5,?20)*1000);?}?catch?(Exception?e)?{}
????????????//?用戶請(qǐng)求處理完畢,釋放許可
????????????semaphore.release();
????????????info(name?+?":?系統(tǒng)處理完畢");
????????}
????}
}
測(cè)試結(jié)果如下,符合預(yù)期

實(shí)現(xiàn)原理
構(gòu)造器
Semaphore信號(hào)量類的實(shí)現(xiàn)過程同樣依賴于AQS。具體地,其是對(duì)AQS中共享鎖的使用。在構(gòu)建Semaphore實(shí)例過程時(shí),一方面,通過sync變量持有AQS的實(shí)現(xiàn)類Sync,同時(shí)按公平性與否進(jìn)一步地可細(xì)分為NonfairSync、FairSync;另一方面,通過AQS的state字段來存儲(chǔ)許可的數(shù)量
public?class?Semaphore?implements?java.io.Serializable?{
????private?final?Sync?sync;
????public?Semaphore(int?permits)?{
????????sync?=?new?NonfairSync(permits);
????}
????public?Semaphore(int?permits,?boolean?fair)?{
????????sync?=?fair???new?FairSync(permits)?:?new?NonfairSync(permits);
????}
????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
????????Sync(int?permits)?{
????????????setState(permits);
????????}
????}
????static?final?class?NonfairSync?extends?Sync?{
????????NonfairSync(int?permits)?{
????????????super(permits);
????????}
????}
????static?final?class?FairSync?extends?Sync?{
????????FairSync(int?permits)?{
????????????super(permits);
????????}
????}
}
acquire方法
首先來看Semaphore的acquire()方法。其委托sync調(diào)用AQS的acquireSharedInterruptibly方法。而在AQS中通過調(diào)用tryAcquireShared方法判斷是否需要阻塞調(diào)用線程。具體地,在Semaphore的NonfairSync、FairSync內(nèi)部類分別實(shí)現(xiàn)了該tryAcquireShared方法的兩個(gè)版本:非公平、公平??梢钥吹絻煞N實(shí)現(xiàn)基本一致。tryAcquireShared如果返回負(fù)值,則說明當(dāng)前許可數(shù)不夠,當(dāng)前線程需要進(jìn)入AQS阻塞隊(duì)列;反之則獲取成功。只是在公平版本的實(shí)現(xiàn)中,會(huì)調(diào)用AQS的hasQueuedPredecessors方法來判斷是否有其他線程已經(jīng)在AQS隊(duì)列中進(jìn)行排隊(duì)。如果有,則tryAcquireShared直接返回-1,即當(dāng)前調(diào)用線程放棄獲取,轉(zhuǎn)而準(zhǔn)備進(jìn)入AQS隊(duì)列以保障公平性
public?class?Semaphore?implements?java.io.Serializable?{
????public?void?acquire()?throws?InterruptedException?{
????????sync.acquireSharedInterruptibly(1);
????}
????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
????????//?非公平信號(hào)量獲取許可
????????final?int?nonfairTryAcquireShared(int?acquires)?{
????????????for?(;;)?{
????????????????int?available?=?getState();
????????????????int?remaining?=?available?-?acquires;
????????????????if?(remaining?0?||
????????????????????compareAndSetState(available,?remaining))
????????????????????return?remaining;
????????????}
????????}
????}????
????static?final?class?NonfairSync?extends?Sync?{
????????protected?int?tryAcquireShared(int?acquires)?{
????????????return?nonfairTryAcquireShared(acquires);
????????}
????}
????static?final?class?FairSync?extends?Sync?{
????????//?公平信號(hào)量獲取許可
????????protected?int?tryAcquireShared(int?acquires)?{
????????????for?(;;)?{
????????????????if?(hasQueuedPredecessors())
????????????????//?對(duì)于公平性實(shí)現(xiàn)而言,?如果AQS隊(duì)列存在排隊(duì)的節(jié)點(diǎn)
????????????????//?則直接返回-1,?即進(jìn)入AQS隊(duì)列進(jìn)行排隊(duì)以保證公平性
????????????????????return?-1;
????????????????//?通過訪問AQS的state字段,?獲取當(dāng)前可用的許可數(shù)量????
????????????????int?available?=?getState();
????????????????//?計(jì)算剩余可用的許可數(shù)量
????????????????int?remaining?=?available?-?acquires;
????????????????if?(remaining?0?||
????????????????????compareAndSetState(available,?remaining))
????????????????????return?remaining;
????????????}
????????}
????}
}
...
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????public?final?void?acquireSharedInterruptibly(int?arg)?throws?InterruptedException?{
????????//?線程被中斷則直接拋出異常
????????if?(Thread.interrupted())
????????????throw?new?InterruptedException();
????????if?(tryAcquireShared(arg)?0)
????????????doAcquireSharedInterruptibly(arg);
????}
????
????//?需要子類去實(shí)現(xiàn)
????protected?int?tryAcquireShared(int?arg)?{
????????throw?new?UnsupportedOperationException();
????}
}
release方法
Semaphore的release()方法類似。其同樣是委托sync調(diào)用AQS的releaseShared方法。然后AQS執(zhí)行tryReleaseShared方法,如果該方法返回true,則會(huì)進(jìn)一步調(diào)用AQS的doReleaseShared方法來喚醒AQS隊(duì)列中其他線程??梢钥吹皆赟emaphore的Sync內(nèi)部類中,tryReleaseShared總是會(huì)返回true。其實(shí)現(xiàn)過程也很簡(jiǎn)單,如下所示
public?class?Semaphore?implements?java.io.Serializable?{
????public?void?release()?{
????????sync.releaseShared(1);
????}
????abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{
????????protected?final?boolean?tryReleaseShared(int?releases)?{
????????????for?(;;)?{
????????????????//?通過訪問AQS的state字段,?獲取當(dāng)前可用的許可數(shù)量????
????????????????int?current?=?getState();
????????????????//?將釋放的許可數(shù)添加到當(dāng)前可用許可數(shù)量上
????????????????int?next?=?current?+?releases;
????????????????if?(next?//?overflow
????????????????????throw?new?Error("Maximum?permit?count?exceeded");
????????????????//?通過CAS的方式更新state字段
????????????????if?(compareAndSetState(current,?next))
????????????????????return?true;
????????????}
????????}
????}
}
...
public?abstract?class?AbstractQueuedSynchronizer?extends?AbstractOwnableSynchronizer?implements?java.io.Serializable?{
????public?final?boolean?releaseShared(int?arg)?{
????????if?(tryReleaseShared(arg))?{
????????????doReleaseShared();
????????????return?true;
????????}
????????return?false;
????}
????
????//?需要子類去實(shí)現(xiàn)
????protected?boolean?tryReleaseShared(int?arg)?{
????????throw?new?UnsupportedOperationException();
????}
}
參考文獻(xiàn)
Java并發(fā)編程之美 翟陸續(xù)、薛賓田著
