JUC 中 4 個(gè)常用的并發(fā)工具類
CountDownLatch
CountDownLatch是我目前使用比較多的類,CountDownLatch初始化時(shí)會(huì)給定一個(gè)計(jì)數(shù),然后每次調(diào)用countDown() 計(jì)數(shù)減1,
當(dāng)計(jì)數(shù)未到達(dá)0之前調(diào)用await() 方法會(huì)阻塞直到計(jì)數(shù)減到0;
使用場(chǎng)景:多用于劃分任務(wù)由多個(gè)線程執(zhí)行,例如:最近寫個(gè)豆瓣爬蟲,需要爬取每個(gè)電影的前五頁短評(píng),可以劃分成五個(gè)線程來處理數(shù)據(jù)。通過latch.await()保證全部完成再返回。
????public?void?latch()?throws?InterruptedException?{
????????int?count=?5;
????????CountDownLatch?latch?=?new?CountDownLatch(count);
????????for?(int?x=0;x????????????new?Worker(x*20,latch).start();
????????}
????????latch.await();
????????System.out.println("全部執(zhí)行完畢");
????}
????
????class?Worker?extends?Thread{
????????Integer?start;
????????CountDownLatch?latch;
????????public?Worker(Integer?start,CountDownLatch?latch){
????????????this.start=start;
????????????this.latch=latch;
????????}????????@Override
????????public?void?run()?{
????????????System.out.println(start+"?已執(zhí)行");
????????????latch.countDown();
????????}
????}
輸出如下:
20?已執(zhí)行
0?已執(zhí)行
40?已執(zhí)行
60?已執(zhí)行
80?已執(zhí)行
全部執(zhí)行完畢
CyclicBarrier
它允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn) (common barrier point)也就是阻塞在調(diào)用cyclicBarrier.await()的地方。
看上去CyclicBarrier 跟CountDownLatch 功能上類似,在官方doc上CountDownLatch的描述上就說明了,CountDownLatch 的計(jì)數(shù)無法被重置,
如果需要重置計(jì)數(shù),請(qǐng)考慮使用CyclicBarrier。
CyclicBarrier初始時(shí)還可添加一個(gè)Runnable的參數(shù), 此Runnable在CyclicBarrier的數(shù)目達(dá)到后,所有其它線程被喚醒前被最后一個(gè)進(jìn)入 CyclicBarrier 的線程執(zhí)行
使用場(chǎng)景:類似CyclicBarrier,但是 CyclicBarrier提供了幾個(gè)countdownlatch 沒有的方法以應(yīng)付更復(fù)雜的場(chǎng)景,例如:
getNumberWaiting() 獲取阻塞線程數(shù)量,
isBroken() 用來知道阻塞的線程是否被中斷等方法。
reset() 將屏障重置為其初始狀態(tài)。如果所有參與者目前都在屏障處等待,則它們將返回,同時(shí)拋出一個(gè) BrokenBarrierException。
????public?void?latch()?throws?InterruptedException?{
????????int?count?=?5;
????????CyclicBarrier?cb?=?new?CyclicBarrier(count,?new?Runnable()?{
????????????@Override
????????????public?void?run()?{
????????????????System.out.println("全部執(zhí)行完畢");
????????????}
????????});
????????ExecutorService?executorService?=?Executors.newFixedThreadPool(count);
????????while?(true){
????????????for?(int?x=0;x????????????????executorService.execute(new?Worker(x,cb));
????????????}
????????}
????}????
????
????class?Worker?extends?Thread?{
????????Integer?start;
????????CyclicBarrier?cyclicBarrier;????????public?Worker(Integer?start,?CyclicBarrier?cyclicBarrier)?{
????????????this.start?=?start;
????????????this.cyclicBarrier?=?cyclicBarrier;
????????}????????@Override
????????public?void?run()?{
????????????System.out.println(start?+?"?已執(zhí)行");
????????????try?{
????????????????cyclicBarrier.await();
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}?catch?(BrokenBarrierException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
輸出如下:
0?已執(zhí)行
3?已執(zhí)行
4?已執(zhí)行
2?已執(zhí)行
1?已執(zhí)行
全部執(zhí)行完畢
0?已執(zhí)行
1?已執(zhí)行
2?已執(zhí)行
3?已執(zhí)行
4?已執(zhí)行
全部執(zhí)行完畢
Semaphore
Semaphore 信號(hào)量維護(hù)了一個(gè)許可集,每次使用時(shí)執(zhí)行acquire()從Semaphore獲取許可,如果沒有則會(huì)阻塞,每次使用完執(zhí)行release()釋放許可。
使用場(chǎng)景:Semaphore對(duì)用于對(duì)資源的控制,比如數(shù)據(jù)連接有限,使用Semaphore限制訪問數(shù)據(jù)庫的線程數(shù)。
????public?void?latch()?throws?InterruptedException,?IOException?{
????????int?count?=?5;
????????Semaphore?semaphore?=?new?Semaphore(1);
????????ExecutorService?executorService?=?Executors.newFixedThreadPool(count);
????????????for?(int?x=0;x????????????????executorService.execute(new?Worker(x,semaphore));
????????????}
????????System.in.read();
????}????
????
????class?Worker?extends?Thread?{
????????Integer?start;
????????Semaphore?semaphore;????????public?Worker(Integer?start,?Semaphore?semaphore)?{
????????????this.start?=?start;
????????????this.semaphore?=?semaphore;
????????}????????@Override
????????public?void?run()?throws?IllegalArgumentException?{
????????????try?{
????????????????System.out.println(start?+?"?準(zhǔn)備執(zhí)行");
????????????????TimeUnit.SECONDS.sleep(1);
????????????????semaphore.acquire();
????????????????System.out.println(start?+?"?已經(jīng)執(zhí)行");
????????????????semaphore.release();
????????????????System.out.println(start?+?"?已經(jīng)釋放");
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}????????}
????}
輸出如下:
0?準(zhǔn)備執(zhí)行
2?準(zhǔn)備執(zhí)行
1?準(zhǔn)備執(zhí)行
3?準(zhǔn)備執(zhí)行
4?準(zhǔn)備執(zhí)行
2?已經(jīng)執(zhí)行
2?已經(jīng)釋放
4?已經(jīng)執(zhí)行
4?已經(jīng)釋放
1?已經(jīng)執(zhí)行
1?已經(jīng)釋放
0?已經(jīng)執(zhí)行
0?已經(jīng)釋放
3?已經(jīng)執(zhí)行
3?已經(jīng)釋放
Exchanger
Exchanger 用于兩個(gè)線程間的數(shù)據(jù)交換,它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn)兩個(gè)線程可以交換彼此的數(shù)據(jù)。
使用場(chǎng)景:兩個(gè)線程相互等待處理結(jié)果并進(jìn)行數(shù)據(jù)傳遞。
????public?void?latch()?throws?InterruptedException,?IOException?{
????????int?count?=?5;
????????Exchanger?exchanger?=?new?Exchanger<>();
????????ExecutorService?executorService?=?Executors.newFixedThreadPool(count);
????????????for?(int?x=0;x????????????????executorService.execute(new?Worker(x,exchanger));
????????????}
????????System.in.read();
????}????
????
????class?Worker?extends?Thread?{
????????Integer?start;
????????Exchanger??exchanger;????????public?Worker(Integer?start,?Exchanger?exchanger) ?{
????????????this.start?=?start;
????????????this.exchanger?=?exchanger;
????????}????????@Override
????????public?void?run()?throws?IllegalArgumentException?{
????????????try?{
????????????????System.out.println(Thread.currentThread().getName()?+?"?準(zhǔn)備執(zhí)行");
????????????????TimeUnit.SECONDS.sleep(start);
????????????????System.out.println(Thread.currentThread().getName()?+?"?等待交換");
????????????????String?value?=?exchanger.exchange(Thread.currentThread().getName());
????????????????System.out.println(Thread.currentThread().getName()?+?"?交換得到數(shù)據(jù)為:"+value);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}????????}
????}
輸出如下:
pool-1-thread-1?準(zhǔn)備執(zhí)行
pool-1-thread-1?等待交換
pool-1-thread-3?準(zhǔn)備執(zhí)行
pool-1-thread-2?準(zhǔn)備執(zhí)行
pool-1-thread-5?準(zhǔn)備執(zhí)行
pool-1-thread-4?準(zhǔn)備執(zhí)行
pool-1-thread-2?等待交換
pool-1-thread-1 交換得到數(shù)據(jù)為:pool-1-thread-2
pool-1-thread-2 交換得到數(shù)據(jù)為:pool-1-thread-1
pool-1-thread-3?等待交換
pool-1-thread-4?等待交換
pool-1-thread-4 交換得到數(shù)據(jù)為:pool-1-thread-3
pool-1-thread-3 交換得到數(shù)據(jù)為:pool-1-thread-4
pool-1-thread-5?等待交換
Exchanger必須成對(duì)出現(xiàn),否則會(huì)像上面代碼執(zhí)行結(jié)果那樣,pool-1-thread-5一直阻塞等待與其交換數(shù)據(jù)的線程,為了避免這一現(xiàn)象,可以使用exchange(V x, long timeout, TimeUnit unit)設(shè)置最大等待時(shí)長
原文出處:https://www.shuzhiduo.com/A/kPzOYlXa5x/
