JUC并發(fā)編程之CountDownLatch源碼詳解

如上這段話可能不是那么好理解,我舉一個(gè)生活中很通俗的例子:

以及上述所說的自駕游場(chǎng)景,當(dāng)然大家也可以自己發(fā)揮想象,項(xiàng)目中是否有場(chǎng)景能夠用到該工具類。
/*** @author sunny* @date 2021/07/06 09:30* @description*/@Slf4jpublic class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {test01();// test02();// test03();}/*** 模擬高并發(fā)場(chǎng)景** @throws InterruptedException*/public static void test01() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 10; i++) {new Thread(() -> {try {log.info("{}:線程已就緒,當(dāng)前時(shí)間戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());countDownLatch.await();log.info("{}:線程已釋放,,當(dāng)前時(shí)間戳:{}", Thread.currentThread().getName(), System.currentTimeMillis());} catch (InterruptedException e) {e.printStackTrace();}}).start();}TimeUnit.SECONDS.sleep(3);System.out.println("\n ========================= \n");countDownLatch.countDown();}/*** 模擬家庭旅游場(chǎng)景*/public static void test02() {CountDownLatch countDownLatch = new CountDownLatch(11);for (int i = 0; i < 10; i++) {new Thread(() -> {log.info("{}:已經(jīng)上車了", Thread.currentThread().getName());countDownLatch.countDown();}).start();}new Thread(() -> {try {TimeUnit.SECONDS.sleep(5);log.info("{}:五秒后已經(jīng)上車了", Thread.currentThread().getName());countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}).start();try {TimeUnit.SECONDS.sleep(2);System.out.println("\n ========================= \n");countDownLatch.await();log.info("開車旅游啦");} catch (InterruptedException e) {e.printStackTrace();}}/*** 模擬去峨眉山游玩,但是因?yàn)橹型居械缆沸枰蘼罚孕枰却尥旰蟛拍艹霭l(fā)* 那么修路的過程中*/public static void test03() throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(10);for (int i = 1; i <= 9; i++) {new Thread(() -> {try {log.info("{}:線程,我要準(zhǔn)備前往峨眉山,路被堵住了", Thread.currentThread().getName());countDownLatch.await();log.info("{}:線程,道路終于可以通行了", Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}).start();}TimeUnit.SECONDS.sleep(3);log.info("======================準(zhǔn)備開始動(dòng)工======================");for (int i = 1; i <= 9; i++) {Thread.sleep(300);int fi = i;new Thread(() -> {countDownLatch.countDown();log.info("{}:線程,已開工:{}天,剩余:{}天", Thread.currentThread().getName(), fi, countDownLatch.getCount());}).start();}TimeUnit.SECONDS.sleep(6);new Thread(() -> {log.info("{}:線程,已開工", Thread.currentThread().getName());countDownLatch.countDown();}).start();TimeUnit.SECONDS.sleep(8);for (int i = 0; i < 5; i++) {new Thread(() -> {try {countDownLatch.await();log.info("{}:線程,去峨眉山游玩啦", Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}).start();}}}
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count); // 更新 state 值}
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 構(gòu)建雙向鏈表 或 入隊(duì)操作boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); //獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)if (p == head) {int r = tryAcquireShared(arg); // 嘗試獲取令牌if (r >= 0) { // 獲取令牌成功setHeadAndPropagate(node, r); //傳播鏈表p.next = null; // help GC 將前驅(qū)節(jié)點(diǎn)的引用指向?yàn)镹ULL,待垃圾回收器回收failed = false;return; // 獲取令牌成功,退出自旋}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) // 阻塞當(dāng)前線程throw new InterruptedException();}} finally {// 如果某個(gè)線程被中斷,非正常流程退出則將當(dāng)前線程的節(jié)點(diǎn)設(shè)置為cancel狀態(tài)if (failed)cancelAcquire(node);}}
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); // 封裝節(jié)點(diǎn)// Try the fast path of enq; backup to full enq on failureNode pred = tail; // 獲取末尾節(jié)點(diǎn)if (pred != null) {node.prev = pred; // 當(dāng)前節(jié)點(diǎn)的前驅(qū)引用指向?yàn)閜redif (compareAndSetTail(pred, node)) { // 將當(dāng)前節(jié)點(diǎn)設(shè)置為鏈表末尾節(jié)點(diǎn)pred.next = node; // 原末尾節(jié)點(diǎn)后驅(qū)引用指向?yàn)楫?dāng)前節(jié)點(diǎn)return node;}}enq(node);return node;}


private Node enq(final Node node) {for (;;) {Node t = tail; // 獲取末尾節(jié)點(diǎn)if (t == null) { // Must initialize // 構(gòu)建雙向鏈表if (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED); // 構(gòu)建雙向鏈表 或 入隊(duì)操作boolean failed = true;try {for (;;) { // 自旋final Node p = node.predecessor(); //獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)if (p == head) {int r = tryAcquireShared(arg); // 嘗試獲取令牌if (r >= 0) { // 獲取令牌成功setHeadAndPropagate(node, r); //傳播鏈表p.next = null; // help GC 將前驅(qū)節(jié)點(diǎn)的引用指向?yàn)镹ULL,待垃圾回收器回收failed = false;return; // 獲取令牌成功,退出自旋}}if (shouldParkAfterFailedAcquire(p, node) && //判斷線程是否需要被阻塞parkAndCheckInterrupt()) // 阻塞當(dāng)前線程throw new InterruptedException();}} finally {// 如果某個(gè)線程被中斷,非正常流程退出則將當(dāng)前線程的節(jié)點(diǎn)設(shè)置為cancel狀態(tài)if (failed)cancelAcquire(node);}}
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);/** Try to signal next queued node if:* Propagation was indicated by caller,* or was recorded (as h.waitStatus either before* or after setHead) by a previous operation* (note: this uses sign-check of waitStatus because* PROPAGATE status may transition to SIGNAL.)* and* The next node is waiting in shared mode,* or we don't know, because it appears null** The conservatism in both of these checks may cause* unnecessary wake-ups, but only when there are multiple* racing acquires/releases, so most need signals now or soon* anyway.*/if (propagate > 0 || h == null || h.waitStatus < 0 || // 還有令牌可獲取 || 頭節(jié)點(diǎn)狀態(tài)處于等待狀態(tài)(h = head) == null || h.waitStatus < 0) {Node s = node.next; // 獲取當(dāng)前下一節(jié)點(diǎn)if (s == null || s.isShared()) // 判斷下節(jié)點(diǎn)是否為共享節(jié)點(diǎn)doReleaseShared(); // 傳播~~ 具體傳播什么呢???}}
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}

private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) { // 自旋 可以理解為傳播 【加自旋的原因,可能同時(shí)有多個(gè)令牌被釋放,那么在這里就可以喚醒后續(xù)所有節(jié)點(diǎn)去獲取令牌,就不用在前面再去判斷是否要去喚醒后驅(qū)節(jié)點(diǎn)了。 如果沒有獲取到令牌也沒關(guān)系,后面還是會(huì)將沒有搶到的線程進(jìn)行阻塞住】Node h = head;if (h != null && h != tail) { // 頭節(jié)點(diǎn)不為null 其 頭非等于尾節(jié)點(diǎn) 則證明當(dāng)前鏈表還有多個(gè)節(jié)點(diǎn)int ws = h.waitStatus; // 獲取head的節(jié)點(diǎn)狀態(tài)if (ws == Node.SIGNAL) { // 如果當(dāng)前節(jié)點(diǎn)狀態(tài)為SIGNAL,就代表后驅(qū)節(jié)點(diǎn)正在被阻塞著if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 通過cas將狀態(tài)從等待更換為非等待,然后取反的話,將下一個(gè)節(jié)點(diǎn)喚醒continue; // loop to recheck casesunparkSuccessor(h); // 喚醒線程 去獲取令牌}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 如果節(jié)點(diǎn)狀態(tài)已經(jīng)為0,則會(huì)將節(jié)點(diǎn)的狀態(tài)更新為PROPAGATE PROPAGATE:表示下一次共享式同步狀態(tài)獲取將會(huì)被無條件地傳播下去continue; // loop on failed CAS}if (h == head) // loop if head changedbreak; // 跳出當(dāng)前循環(huán)}}
private void unparkSuccessor(Node node) {// 先獲取head節(jié)點(diǎn)的狀態(tài),應(yīng)該是等于-1,原因在shouldParkAfterFailedAcquire方法中有體現(xiàn)int ws = node.waitStatus;// 由于-1會(huì)小于0,所以更新改為0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 獲取第一個(gè)正常排隊(duì)的節(jié)點(diǎn)Node s = node.next;//正常解鎖流程不會(huì)走該if判斷if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}// 正常來說第一個(gè)排隊(duì)的節(jié)點(diǎn)不應(yīng)該為空,所以直接把第一個(gè)排隊(duì)的線程喚醒if (s != null)LockSupport.unpark(s.thread);}
sync.releaseShared(1);public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { //通用釋放令牌doReleaseShared(); //喚醒后驅(qū)節(jié)點(diǎn)return true;}return false;}
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
JUC并發(fā)編程之CountDownLatch源碼講解視頻
我是黎明大大,我知道我沒有驚世的才華,也沒有超于凡人的能力,但畢竟我還有一個(gè)不屈服,敢于選擇向命運(yùn)沖鋒的靈魂,和一個(gè)就是傷痕累累也要義無反顧走下去的心。
如果您覺得本文對(duì)您有幫助,還請(qǐng)關(guān)注點(diǎn)贊一波,后期將不間斷更新更多技術(shù)文章

●JUC并發(fā)編程之ReentrantLock非公平鎖源碼詳解
●JUC并發(fā)編程之Synchronized關(guān)鍵字詳解
●JUC并發(fā)編程之MESI緩存一致協(xié)議詳解

評(píng)論
圖片
表情
