給力!Java 并發(fā)之 Semaphore 源碼剖析!
本文精選自 Doocs 開源社區(qū)旗下“源碼獵人”項目,作者 AmyliaY。
項目將會持續(xù)更新,歡迎 Star 關(guān)注。
項目地址:https://github.com/doocs/source-code-hunter
Semaphore 信號量,可用于控制一定時間內(nèi),并發(fā)執(zhí)行的線程數(shù),基于 AQS 實現(xiàn)??蓱?yīng)用于網(wǎng)關(guān)限流、資源限制 (如 最大可發(fā)起連接數(shù))。由于?release()?釋放許可時,未對釋放許可數(shù)做限制,所以可以通過該方法增加總的許可數(shù)量。
獲取許可支持公平和非公平模式,默認(rèn)非公平模式。公平模式無論是否有許可,都會先判斷是否有線程在排隊,如果有線程排隊,則進(jìn)入排隊,否則嘗試獲取許可;非公平模式無論許可是否充足,直接嘗試獲取許可。
不多廢話,下面直接挖源碼。
核心內(nèi)部類 Sync
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;/* 賦值state為總許可數(shù) */Sync(int permits) {setState(permits);}/* 剩余許可數(shù) */final int getPermits() {return getState();}/* 自旋 + CAS非公平獲取 */final int nonfairTryAcquireShared(int acquires) {for (;;) {// 剩余可用許可數(shù)int available = getState();// 本次獲取許可后,剩余許可int remaining = available - acquires;// 如果獲取后,剩余許可大于0,則CAS更新剩余許可,否則獲取失敗失敗if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}/*** 自旋 + CAS 釋放許可* 由于未對釋放許可數(shù)做限制,所以可以通過release動態(tài)增加許可數(shù)量*/protected final boolean tryReleaseShared(int releases) {for (;;) {// 當(dāng)前剩余許可int current = getState();// 許可更新值int next = current + releases;// 如果許可更新值為負(fù)數(shù),說明許可數(shù)量溢出,拋出錯誤if (next < current) // overflowthrow new Error("Maximum permit count exceeded");// CAS更新許可數(shù)量if (compareAndSetState(current, next))return true;}}/* 自旋 + CAS 減少許可數(shù)量 */final void reducePermits(int reductions) {for (;;) {// 當(dāng)前剩余許可int current = getState();// 更新值int next = current - reductions;// 較少許可數(shù)錯誤,拋出異常if (next > current) // underflowthrow new Error("Permit count underflow");// CAS更新許可數(shù)if (compareAndSetState(current, next))return;}}/* 丟棄所有許可 */final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}}/*** 非公平模式*/static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}/*** 公平模式*/static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}/*** 公平模式獲取許可* 公平模式不論許可是否充足,都會判斷同步隊列中是否有線程在等地,如果有,獲取失敗,排隊阻塞*/protected int tryAcquireShared(int acquires) {for (;;) {// 如果有線程在排隊,立即返回if (hasQueuedPredecessors())return -1;// 自旋 + cas獲取許可int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}
主要 API
public class Semaphore implements java.io.Serializable {private static final long serialVersionUID = -3222578661600680210L;/** All mechanics via AbstractQueuedSynchronizer subclass */private final Sync sync;/*** 根據(jù)給定的 總許可數(shù)permits,創(chuàng)建 Semaphore*/public Semaphore(int permits) {sync = new NonfairSync(permits);}/*** fair為true表示使用公平鎖模式,false使用非公平鎖*/public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}// --------------------- 獲取許可 --------------------/* 獲取指定數(shù)量的許可 */public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}/* 獲取一個許可 */public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0) // 獲取許可,剩余許可>=0,則獲取許可成功,<0獲取許可失敗,進(jìn)入排隊doAcquireSharedInterruptibly(arg);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}/*** @return 剩余許可數(shù)量。非負(fù)數(shù),獲取許可成功,負(fù)數(shù),獲取許可失敗*/final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}/*** 獲取許可失敗,當(dāng)前線程進(jìn)入同步隊列,排隊阻塞*/private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 創(chuàng)建同步隊列節(jié)點,并入隊final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {// 如果當(dāng)前節(jié)點是第二個節(jié)點,嘗試獲取鎖final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 阻塞當(dāng)前線程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}// --------------------- 釋放歸還許可 -------------------------/* 釋放指定數(shù)量的許可 */public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}/* 釋放一個許可 */public void release() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {// 歸還許可成功if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}/*** 釋放許可* 由于未對釋放許可數(shù)做限制,所以可以通過release動態(tài)增加許可數(shù)量*/protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}private void doReleaseShared() {// 自旋,喚醒等待的第一個線程(其他線程將由第一個線程向后傳遞喚醒)for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases// 喚醒第一個等待線程unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}}
全文完!
希望本文對大家有所幫助。如果感覺本文有幫助,有勞轉(zhuǎn)發(fā)或點一下“在看”!讓更多人收獲知識!
長按識別下圖二維碼,關(guān)注公眾號「Doocs 開源社區(qū)」,第一時間跟你們分享好玩、實用的技術(shù)文章與業(yè)內(nèi)最新資訊。
評論
圖片
表情
