同步組件Semaphore源碼解析
Semaphore概述及案例學(xué)習(xí)
Semaphore信號(hào)量用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理地使用公共資源。
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10); //10個(gè)許可證數(shù)量,最大并發(fā)數(shù)為10
public static void main(String[] args) {
for(int i = 0; i < THREAD_COUNT; i ++){ //執(zhí)行30個(gè)線程
threadPool.execute(new Runnable() {
@Override
public void run() {
s.tryAcquire(); //嘗試獲取一個(gè)許可證
System.out.println("save data");
s.release(); //使用完之后歸還許可證
}
});
}
threadPool.shutdown();
}
}
創(chuàng)建一個(gè)大小為30的線程池,但是信號(hào)量規(guī)定在10,保證許可證數(shù)量為10。
每次線程調(diào)用
tryAcquire()或者acquire()方法都會(huì)原子性的遞減許可證的數(shù)量,release()會(huì)原子性遞增許可證數(shù)量。
類圖結(jié)構(gòu)及重要字段

public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
// permits指定初始化信號(hào)量個(gè)數(shù)
Sync(int permits) {
setState(permits);
}
// ...
}
static final class NonfairSync extends Sync {...}
static final class FairSync extends Sync {...}
// 默認(rèn)采用非公平策略
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 可以指定公平策略
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//...
}
基于AQS,類似于ReentrantLock,Sync繼承自AQS,有公平策略和非公平策略兩種實(shí)現(xiàn)。
類似于CountDownLatch,state在這里也是通過構(gòu)造器指定,表示初始化信號(hào)量的個(gè)數(shù)。
本篇文章閱讀需要建立在一定的AQS基礎(chǔ)之上,這邊推薦幾篇前置文章,可以瞅一眼:
void acquire()
調(diào)用該方法時(shí),表示希望獲取一個(gè)信號(hào)量資源,相當(dāng)于acquire(1)。
如果當(dāng)前信號(hào)量個(gè)數(shù)大于0,CAS將當(dāng)前信號(hào)量值減1,成功后直接返回。
如果當(dāng)前信號(hào)量個(gè)數(shù)等于0,則當(dāng)前線程將被置入AQS的阻塞隊(duì)列。
該方法是響應(yīng)中斷的,其他線程調(diào)用了該線程的interrupt()方法,將會(huì)拋出中斷異常返回。
// Semaphore.java
public void acquire() throws InterruptedException {
// 傳遞的 arg 為 1 , 獲取1個(gè)信號(hào)量資源
sync.acquireSharedInterruptibly(1);
}
// AQS.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 線程被 中斷, 拋出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 子類實(shí)現(xiàn), 公平和非公平兩種策略
if (tryAcquireShared(arg) < 0)
// 如果獲取失敗, 則置入阻塞隊(duì)列,
// 再次進(jìn)行嘗試, 嘗試失敗則掛起當(dāng)前線程
doAcquireSharedInterruptibly(arg);
}
非公平
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
// 這里直接調(diào)用Sync定義的 非公平共享模式獲取方法
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 獲取當(dāng)前信號(hào)量的值
int available = getState();
// 減去需要獲取的值, 得到剩余的信號(hào)量個(gè)數(shù)
int remaining = available - acquires;
// 不剩了,表示當(dāng)前信號(hào)量個(gè)數(shù)不能滿足需求, 返回負(fù)數(shù), 線程置入AQS阻塞
// 還有的剩, CAS設(shè)置當(dāng)前信號(hào)量值為剩余值, 并返回剩余值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
你會(huì)發(fā)現(xiàn),非公平策略是無法保證【AQS隊(duì)列中阻塞的線程】和【當(dāng)前線程】獲取的順序的,當(dāng)前線程是有可能在排隊(duì)的線程之前就拿到資源,產(chǎn)生插隊(duì)現(xiàn)象。
公平策略就不一樣了,它會(huì)通過hasQueuedPredecessors()方法看看隊(duì)列中是否存在前驅(qū)節(jié)點(diǎn),以保證公平性。
公平策略
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果隊(duì)列中在此之前已經(jīng)有線程在排隊(duì)了,直接放棄獲取
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
void acquire(int permits)
在acquire()的基礎(chǔ)上,指定了獲取信號(hào)量的數(shù)量permits。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
void acquireUninterruptibly()
該方法與acquire()類似,但是不響應(yīng)中斷。
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
void acquireUninterruptibly(int permits)
該方法與acquire(permits)類似,但是不響應(yīng)中斷。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
boolean tryAcquire()
tryAcquire和acquire非公平策略公用一個(gè)邏輯,但是區(qū)別在于,如果獲取信號(hào)量失敗,或者CAS失敗,將會(huì)直接返回false,而不會(huì)置入阻塞隊(duì)列中。
一般try開頭的方法的特點(diǎn)就是這樣,嘗試一下,成功是最好,失敗也不至于被阻塞,而是立刻返回false。
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
boolean tryAcquire(int permits)
相比于普通的tryAcquire(),指定了permits的值。
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
相比于tryAcquire(int permits),增加了超時(shí)控制。
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
void release()
將信號(hào)量值加1,如果有線程因?yàn)檎{(diào)用acquire方法而被阻塞在AQS阻塞隊(duì)列中,將根據(jù)公平策略選擇一個(gè)信號(hào)量個(gè)數(shù)滿足需求的線程喚醒,線程喚醒后也會(huì)嘗試獲取新增的信號(hào)量。
參考文章:Java并發(fā)包源碼學(xué)習(xí)系列:AQS共享模式獲取與釋放資源
// Semaphore.java
public void release() {
sync.releaseShared(1);
}
// AQS.java
public final boolean releaseShared(int arg) {
// 嘗試釋放鎖
if (tryReleaseShared(arg)) {
// 釋放成功, 喚醒AQS隊(duì)列里面最先掛起的線程
// https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838
doReleaseShared();
return true;
}
return false;
}
// Semaphore#Sync.java
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 獲取當(dāng)前信號(hào)量
int current = getState();
// 期望加上releases
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS操作,更新
if (compareAndSetState(current, next))
return true;
}
}
}
void release(int permits)
和release()相比指定了permits的值。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
其他方法
Semaphore還提供其他一些方法,實(shí)現(xiàn)比較簡(jiǎn)單,這邊就簡(jiǎn)單寫一下吧:
// 返回此信號(hào)量中當(dāng)前可用的許可證數(shù)量, 其實(shí)就是得到當(dāng)前的 state值 getState()
public int availablePermits() {
return sync.getPermits();
}
// 將state更新為0, 返回0
public int drainPermits() {
return sync.drainPermits();
}
// 減少reduction個(gè)許可證
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
// 判斷公平策略
public boolean isFair() {
return sync instanceof FairSync;
}
// 判斷是否有線程證在等待獲取許可證
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
// 返回正在等待獲取許可證的線程數(shù)
public final int getQueueLength() {
return sync.getQueueLength();
}
// 返回所有等待獲取許可證的線程集合
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
總結(jié)
Semaphore信號(hào)量用來控制同時(shí)訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個(gè)線程,以保證合理地使用公共資源。
基于AQS,類似于ReentrantLock,Sync繼承自AQS,有公平策略和非公平策略兩種實(shí)現(xiàn)。
類似于CountDownLatch,state在這里也是通過構(gòu)造器指定,表示初始化信號(hào)量的個(gè)數(shù)。
每次線程調(diào)用tryAcquire()或者acquire()方法都會(huì)原子性的遞減許可證的數(shù)量,release()會(huì)原子性遞增許可證數(shù)量,只要有許可證就可以重復(fù)使用

剩下的就不會(huì)給大家一展出來了,以上資料按照一下操作即可獲得
——將文章進(jìn)行轉(zhuǎn)發(fā)和評(píng)論,關(guān)注公眾號(hào)【Java烤豬皮】,關(guān)注后繼續(xù)后臺(tái)回復(fù)領(lǐng)取口令“ 666 ”即可免費(fèi)領(lǐng)文章取中所提供的資料。

騰訊、阿里、滴滴后臺(tái)試題匯集總結(jié) — (含答案)
面試:史上最全多線程序面試題!
最新阿里內(nèi)推Java后端試題
JVM難學(xué)?那是因?yàn)槟銢]有真正看完整這篇文章

關(guān)注作者微信公眾號(hào) — 《JAVA烤豬皮》
了解了更多java后端架構(gòu)知識(shí)以及最新面試寶典
看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者來源不斷出文的動(dòng)力~
