JUC 常用4大并發(fā)工具類,值得一看 !
點擊上方藍色字體,選擇“標星公眾號”
優(yōu)質(zhì)文章,第一時間送達
什么是JUC?
JUC就是java.util.concurrent包,這個包俗稱JUC,里面都是解決并發(fā)問題的一些東西
該包的位置位于java下面的rt.jar包下面
4大常用并發(fā)工具類:
CountDownLatch
CyclicBarrier
Semaphore
ExChanger?
CountDownLatch:
CountDownLatch,俗稱閉鎖,作用是類似加強版的Join,是讓一組線程等待其他的線程完成工作以后才執(zhí)行
就比如在啟動框架服務(wù)的時候,我們主線程需要在環(huán)境線程初始化完成之后才能啟動,這時候我們就可以實現(xiàn)使用CountDownLatch來完成
/**
?????*?Constructs?a?{@code?CountDownLatch}?initialized?with?the?given?count.
?????*
?????*?@param?count?the?number?of?times?{@link?#countDown}?must?be?invoked
?????*????????before?threads?can?pass?through?{@link?#await}
?????*?@throws?IllegalArgumentException?if?{@code?count}?is?negative
?????*/
????public?CountDownLatch(int?count)?{
????????if?(count?0)?throw?new?IllegalArgumentException("count?0");
????????this.sync?=?new?Sync(count);
????}
在源碼中可以看到,創(chuàng)建CountDownLatch時,需要傳入一個int類型的參數(shù),將決定在執(zhí)行次扣減之后,等待的線程被喚醒

?通過這個類圖就可以知道其實CountDownLatch并沒有多少東西
方法介紹:
CountDownLatch:初始化方法
await:等待方法,同時帶參數(shù)的是超時重載方法
countDown:每執(zhí)行一次,計數(shù)器減一,就是初始化傳入的數(shù)字,也代表著一個線程完成了任務(wù)
getCount:獲取當前值
toString:這個就不用說了
里面的Sync是一個內(nèi)部類,外面的方法其實都是操作這個內(nèi)部類的,這個內(nèi)部類繼承了AQS,實現(xiàn)的標準方法,AQS將在后面的章節(jié)寫

?
主線程中創(chuàng)建CountDownLatch(3),然后主線程await阻塞,然后線程A,B,C各自完成了任務(wù),調(diào)用了countDown,之后,每個線程調(diào)用一次計數(shù)器就會減一,初始是3,然后A線程調(diào)用后變成2,B線程調(diào)用后變成1,C線程調(diào)用后,變成0,這時就會喚醒正在await的主線程,然后主線程繼續(xù)執(zhí)行
說一千道一萬,不如代碼寫幾行,上代碼:
休眠工具類,之后的代碼都會用到
package?org.dance.tools;
import?java.util.concurrent.TimeUnit;
/**
?*?類說明:線程休眠輔助工具類
?*/
public?class?SleepTools?{
????/**
?????*?按秒休眠
?????*?@param?seconds?秒數(shù)
?????*/
????public?static?final?void?second(int?seconds)?{
????????try?{
????????????TimeUnit.SECONDS.sleep(seconds);
????????}?catch?(InterruptedException?e)?{
????????}
????}
????/**
?????*?按毫秒數(shù)休眠
?????*?@param?seconds?毫秒數(shù)
?????*/
????public?static?final?void?ms(int?seconds)?{
????????try?{
????????????TimeUnit.MILLISECONDS.sleep(seconds);
????????}?catch?(InterruptedException?e)?{
????????}
????}
}
package?org.dance.day2.util;
import?org.dance.tools.SleepTools;
import?java.util.concurrent.CountDownLatch;
/**
?*?CountDownLatch的使用,有五個線程,6個扣除點
?*?扣除完成后主線程和業(yè)務(wù)線程,才能執(zhí)行工作
?*??扣除點一般都是大于等于需要初始化的線程的
?*?@author?ZYGisComputer
?*/
public?class?UseCountDownLatch?{
????/**
?????*?設(shè)置為6個扣除點
?????*/
????static?CountDownLatch?countDownLatch?=?new?CountDownLatch(6);
????/**
?????*?初始化線程
?????*/
????private?static?class?InitThread?implements?Runnable?{
????????@Override
????????public?void?run()?{
????????????System.out.println("thread_"?+?Thread.currentThread().getId()?+?"?ready?init?work?.....");
????????????//?執(zhí)行扣減?扣減不代表結(jié)束
????????????countDownLatch.countDown();
????????????for?(int?i?=?0;?i?2;?i++)?{
????????????????System.out.println("thread_"?+?Thread.currentThread().getId()?+?".....continue?do?its?work");
????????????}
????????}
????}
????/**
?????*?業(yè)務(wù)線程
?????*/
????private?static?class?BusiThread?implements?Runnable?{
????????@Override
????????public?void?run()?{
????????????//?業(yè)務(wù)線程需要在等初始化完畢后才能執(zhí)行
????????????try?{
????????????????countDownLatch.await();
????????????????for?(int?i?=?0;?i?3;?i++)?{
????????????????????System.out.println("BusiThread?"?+?Thread.currentThread().getId()?+?"?do?business-----");
????????????????}
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
????public?static?void?main(String[]?args)?{
????????//?創(chuàng)建單獨的初始化線程
????????new?Thread(){
????????????@Override
????????????public?void?run()?{
????????????????SleepTools.ms(1);
????????????????System.out.println("thread_"?+?Thread.currentThread().getId()?+?"?ready?init?work?step?1st.....");
????????????????//?扣減一次
????????????????countDownLatch.countDown();
????????????????System.out.println("begin?stop?2nd.....");
????????????????SleepTools.ms(1);
????????????????System.out.println("thread_"?+?Thread.currentThread().getId()?+?"?ready?init?work?step?2nd.....");
????????????????//?扣減一次
????????????????countDownLatch.countDown();
????????????}
????????}.start();
????????//?啟動業(yè)務(wù)線程
????????new?Thread(new?BusiThread()).start();
????????//?啟動初始化線程
????????for?(int?i?=?0;?i?<=?3;?i++)?{
????????????new?Thread(new?InitThread()).start();
????????}
????????//?主線程進入等待
????????try?{
????????????countDownLatch.await();
????????????System.out.println("Main?do?ites?work.....");
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}
????}
}
返回結(jié)果:
thread_13?ready?init?work?.....
thread_13.....continue?do?its?work
thread_13.....continue?do?its?work
thread_14?ready?init?work?.....
thread_14.....continue?do?its?work
thread_14.....continue?do?its?work
thread_15?ready?init?work?.....
thread_15.....continue?do?its?work
thread_11?ready?init?work?step?1st.....
begin?stop?2nd.....
thread_16?ready?init?work?.....
thread_16.....continue?do?its?work
thread_16.....continue?do?its?work
thread_15.....continue?do?its?work
thread_11?ready?init?work?step?2nd.....
Main?do?ites?work.....
BusiThread?12?do?business-----
BusiThread?12?do?business-----
BusiThread?12?do?business-----
通過返回結(jié)果就可以很直接的看到業(yè)務(wù)線程是在初始化線程完全跑完之后,才開始執(zhí)行的
?
CyclicBarrier:
CyclicBarrier,俗稱柵欄鎖,作用是讓一組線程到達某個屏障,被阻塞,一直到組內(nèi)的最后一個線程到達,然后屏障開放,接著,所有的線程繼續(xù)運行
這個感覺和CountDownLatch有點相似,但是其實是不一樣的,所謂的差別,將在下面詳解
CyclicBarrier的構(gòu)造參數(shù)有兩個
/**
?????*?Creates?a?new?{@code?CyclicBarrier}?that?will?trip?when?the
?????*?given?number?of?parties?(threads)?are?waiting?upon?it,?and
?????*?does?not?perform?a?predefined?action?when?the?barrier?is?tripped.
?????*
?????*?@param?parties?the?number?of?threads?that?must?invoke?{@link?#await}
?????*????????before?the?barrier?is?tripped
?????*?@throws?IllegalArgumentException?if?{@code?parties}?is?less?than?1
?????*/
????public?CyclicBarrier(int?parties)?{
????????this(parties,?null);
????}
/**
?????*?Creates?a?new?{@code?CyclicBarrier}?that?will?trip?when?the
?????*?given?number?of?parties?(threads)?are?waiting?upon?it,?and?which
?????*?will?execute?the?given?barrier?action?when?the?barrier?is?tripped,
?????*?performed?by?the?last?thread?entering?the?barrier.
?????*
?????*?@param?parties?the?number?of?threads?that?must?invoke?{@link?#await}
?????*????????before?the?barrier?is?tripped
?????*?@param?barrierAction?the?command?to?execute?when?the?barrier?is
?????*????????tripped,?or?{@code?null}?if?there?is?no?action
?????*?@throws?IllegalArgumentException?if?{@code?parties}?is?less?than?1
?????*/
????public?CyclicBarrier(int?parties,?Runnable?barrierAction)?{
????????if?(parties?<=?0)?throw?new?IllegalArgumentException();
????????this.parties?=?parties;
????????this.count?=?parties;
????????this.barrierCommand?=?barrierAction;
????}
很明顯能感覺出來,上面的構(gòu)造參數(shù)調(diào)用了下面的構(gòu)造參數(shù),是一個構(gòu)造方法重載
首先這個第一個參數(shù)也樹Int類型的,傳入的是執(zhí)行線程的個數(shù),這個數(shù)量和CountDownLatch不一樣,這個數(shù)量是需要和線程數(shù)量吻合的,CountDownLatch則不一樣,CountDownLatch可以大于等于,而CyclicBarrier只能等于,然后是第二個參數(shù),第二個參數(shù)是barrierAction,這個參數(shù)是當屏障開放后,執(zhí)行的任務(wù)線程,如果當屏障開放后需要執(zhí)行什么任務(wù),可以寫在這個線程中

?
?
主線程創(chuàng)建CyclicBarrier(3,barrierAction),然后由線程開始執(zhí)行,線程A,B執(zhí)行完成后都調(diào)用了await,然后他們都在一個屏障前阻塞者,需要等待線程C也,執(zhí)行完成,調(diào)用await之后,然后三個線程都達到屏障后,屏障開放,然后線程繼續(xù)執(zhí)行,并且barrierAction在屏障開放的一瞬間也開始執(zhí)行
上代碼:
package?org.dance.day2.util;
import?org.dance.tools.SleepTools;
import?java.util.Map;
import?java.util.Random;
import?java.util.concurrent.BrokenBarrierException;
import?java.util.concurrent.ConcurrentHashMap;
import?java.util.concurrent.CyclicBarrier;
/**
?*?CyclicBarrier的使用
?*
?*?@author?ZYGisComputer
?*/
public?class?UseCyclicBarrier?{
????/**
?????*?存放子線程工作結(jié)果的安全容器
?????*/
????private?static?ConcurrentHashMap?resultMap?=?new?ConcurrentHashMap<>();
????private?static?CyclicBarrier?cyclicBarrier?=?new?CyclicBarrier(5,new?CollectThread());
????/**
?????*?結(jié)果打印線程
?????*?用來演示CyclicBarrier的第二個參數(shù),barrierAction
?????*/
????private?static?class?CollectThread?implements?Runnable?{
????????@Override
????????public?void?run()?{
????????????StringBuffer?result?=?new?StringBuffer();
????????????for?(Map.Entry?workResult?:?resultMap.entrySet())?{
????????????????result.append("["?+?workResult.getValue()?+?"]");
????????????}
????????????System.out.println("the?result?=?"?+?result);
????????????System.out.println("do?other?business.....");
????????}
????}
????/**
?????*?工作子線程
?????*?用于CyclicBarrier的一組線程
?????*/
????private?static?class?SubThread?implements?Runnable?{
????????@Override
????????public?void?run()?{
????????????//?獲取當前線程的ID
????????????long?id?=?Thread.currentThread().getId();
????????????//?放入統(tǒng)計容器中
????????????resultMap.put(String.valueOf(id),?id);
????????????Random?random?=?new?Random();
????????????try?{
????????????????if?(random.nextBoolean())?{
????????????????????Thread.sleep(1000?+?id);
????????????????????System.out.println("Thread_"+id+".....?do?something");
????????????????}
????????????????System.out.println(id+"?is?await");
????????????????cyclicBarrier.await();
????????????????Thread.sleep(1000+id);
????????????????System.out.println("Thread_"+id+".....do?its?business");
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}?catch?(BrokenBarrierException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
????public?static?void?main(String[]?args)?{
????????for?(int?i?=?0;?i?<=?4;?i++)?{
????????????Thread?thread?=?new?Thread(new?SubThread());
????????????thread.start();
????????}
????}
}
返回結(jié)果:
11?is?await
14?is?await
15?is?await
Thread_12.....?do?something
12?is?await
Thread_13.....?do?something
13?is?await
the?result?=?[11][12][13][14][15]
do?other?business.....
Thread_11.....do?its?business
Thread_12.....do?its?business
Thread_13.....do?its?business
Thread_14.....do?its?business
Thread_15.....do?its?business
通過返回結(jié)果可以看出前面的11 14 15三個線程沒有進入if語句塊,在執(zhí)行到await的時候進入了等待,而另外12 13兩個線程進入到了if語句塊當中,多休眠了1秒多,然后當5個線程同時到達await的時候,屏障開放,執(zhí)行了barrierAction線程,然后線程組繼續(xù)執(zhí)行
解釋一下CountDownLatch和CyclicBarrier的卻別吧!
首先就是CountDownLatch的構(gòu)造參數(shù)傳入的數(shù)量一般都是大于等于線程,數(shù)量的,因為他是有第三方控制的,可以扣減多次,然后就是CyclicBarrier的構(gòu)造參數(shù)第一個參數(shù)傳入的數(shù)量一定是等于線程的個數(shù)的,因為他是由一組線程自身控制的
區(qū)別
CountDownLatch CyclicBarrier
控制 ? ? ? ?第三方控制 ? ?自身控制
傳入數(shù)量 大于等于線程數(shù)量? ? ? ?等于線程數(shù)量
?
Semaphore:
Semaphore,俗稱信號量,作用于控制同時訪問某個特定資源的線程數(shù)量,用在流量控制
一說特定資源控制,那么第一時間就想到了數(shù)據(jù)庫連接..
之前用等待超時模式寫了一個數(shù)據(jù)庫連接池,打算用這個Semaphone也寫一個
/**
?????*?Creates?a?{@code?Semaphore}?with?the?given?number?of
?????*?permits?and?nonfair?fairness?setting.
?????*
?????*?@param?permits?the?initial?number?of?permits?available.
?????*????????This?value?may?be?negative,?in?which?case?releases
?????*????????must?occur?before?any?acquires?will?be?granted.
?????*/
????public?Semaphore(int?permits)?{
????????sync?=?new?NonfairSync(permits);
????}
在源碼中可以看到在構(gòu)建Semaphore信號量的時候,需要傳入許可證的數(shù)量,這個數(shù)量就是資源的最大允許的訪問的線程數(shù)
接下里用信號量實現(xiàn)一個數(shù)據(jù)庫連接池
連接對象
package?org.dance.day2.util.pool;
import?org.dance.tools.SleepTools;
import?java.sql.*;
import?java.util.Map;
import?java.util.Properties;
import?java.util.concurrent.Executor;
/**
?*?數(shù)據(jù)庫連接
?*?@author?ZYGisComputer
?*/
public?class?SqlConnection?implements?Connection?{
????/**
?????*?獲取數(shù)據(jù)庫連接
?????*?@return
?????*/
????public?static?final?Connection?fetchConnection(){
????????return?new?SqlConnection();
????}
????@Override
????public?void?commit()?throws?SQLException?{
????????SleepTools.ms(70);
????}
????@Override
????public?Statement?createStatement()?throws?SQLException?{
????????SleepTools.ms(1);
????????return?null;
????}
????@Override
????public?PreparedStatement?prepareStatement(String?sql)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?CallableStatement?prepareCall(String?sql)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?String?nativeSQL(String?sql)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?void?setAutoCommit(boolean?autoCommit)?throws?SQLException?{
????}
????@Override
????public?boolean?getAutoCommit()?throws?SQLException?{
????????return?false;
????}
????@Override
????public?void?rollback()?throws?SQLException?{
????}
????@Override
????public?void?close()?throws?SQLException?{
????}
????@Override
????public?boolean?isClosed()?throws?SQLException?{
????????return?false;
????}
????@Override
????public?DatabaseMetaData?getMetaData()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?void?setReadOnly(boolean?readOnly)?throws?SQLException?{
????}
????@Override
????public?boolean?isReadOnly()?throws?SQLException?{
????????return?false;
????}
????@Override
????public?void?setCatalog(String?catalog)?throws?SQLException?{
????}
????@Override
????public?String?getCatalog()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?void?setTransactionIsolation(int?level)?throws?SQLException?{
????}
????@Override
????public?int?getTransactionIsolation()?throws?SQLException?{
????????return?0;
????}
????@Override
????public?SQLWarning?getWarnings()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?void?clearWarnings()?throws?SQLException?{
????}
????@Override
????public?Statement?createStatement(int?resultSetType,?int?resultSetConcurrency)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?PreparedStatement?prepareStatement(String?sql,?int?resultSetType,?int?resultSetConcurrency)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?CallableStatement?prepareCall(String?sql,?int?resultSetType,?int?resultSetConcurrency)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?Map>?getTypeMap()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?void?setTypeMap(Map>?map)?throws?SQLException?{
????}
????@Override
????public?void?setHoldability(int?holdability)?throws?SQLException?{
????}
????@Override
????public?int?getHoldability()?throws?SQLException?{
????????return?0;
????}
????@Override
????public?Savepoint?setSavepoint()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?Savepoint?setSavepoint(String?name)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?void?rollback(Savepoint?savepoint)?throws?SQLException?{
????}
????@Override
????public?void?releaseSavepoint(Savepoint?savepoint)?throws?SQLException?{
????}
????@Override
????public?Statement?createStatement(int?resultSetType,?int?resultSetConcurrency,?int?resultSetHoldability)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?PreparedStatement?prepareStatement(String?sql,?int?resultSetType,?int?resultSetConcurrency,?int?resultSetHoldability)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?CallableStatement?prepareCall(String?sql,?int?resultSetType,?int?resultSetConcurrency,?int?resultSetHoldability)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?PreparedStatement?prepareStatement(String?sql,?int?autoGeneratedKeys)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?PreparedStatement?prepareStatement(String?sql,?int[]?columnIndexes)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?PreparedStatement?prepareStatement(String?sql,?String[]?columnNames)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?Clob?createClob()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?Blob?createBlob()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?NClob?createNClob()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?SQLXML?createSQLXML()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?boolean?isValid(int?timeout)?throws?SQLException?{
????????return?false;
????}
????@Override
????public?void?setClientInfo(String?name,?String?value)?throws?SQLClientInfoException?{
????}
????@Override
????public?void?setClientInfo(Properties?properties)?throws?SQLClientInfoException?{
????}
????@Override
????public?String?getClientInfo(String?name)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?Properties?getClientInfo()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?Array?createArrayOf(String?typeName,?Object[]?elements)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?Struct?createStruct(String?typeName,?Object[]?attributes)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?void?setSchema(String?schema)?throws?SQLException?{
????}
????@Override
????public?String?getSchema()?throws?SQLException?{
????????return?null;
????}
????@Override
????public?void?abort(Executor?executor)?throws?SQLException?{
????}
????@Override
????public?void?setNetworkTimeout(Executor?executor,?int?milliseconds)?throws?SQLException?{
????}
????@Override
????public?int?getNetworkTimeout()?throws?SQLException?{
????????return?0;
????}
????@Override
????public??T?unwrap(Class?iface)?throws?SQLException?{
????????return?null;
????}
????@Override
????public?boolean?isWrapperFor(Class>?iface)?throws?SQLException?{
????????return?false;
????}
}
連接池對象
package?org.dance.day2.util.pool;
import?java.sql.Connection;
import?java.util.ArrayList;
import?java.util.HashSet;
import?java.util.Iterator;
import?java.util.LinkedList;
import?java.util.concurrent.Semaphore;
/**
?*?使用信號量控制數(shù)據(jù)庫的鏈接和釋放
?*
?*?@author?ZYGisComputer
?*/
public?class?DBPoolSemaphore?{
????/**
?????*?池容量
?????*/
????private?final?static?int?POOL_SIZE?=?10;
????/**
?????*?useful?代表可用連接
?????*?useless?代表已用連接
?????*??為什么要使用兩個Semaphore呢?是因為,在連接池中不只有連接本身是資源,空位也是資源,也需要記錄
?????*/
????private?final?Semaphore?useful,?useless;
????/**
?????*?連接池
?????*/
????private?final?static?LinkedList?POOL?=?new?LinkedList<>();
????/**
?????*?使用靜態(tài)塊初始化池
?????*/
????static?{
????????for?(int?i?=?0;?i?????????????POOL.addLast(SqlConnection.fetchConnection());
????????}
????}
????public?DBPoolSemaphore()?{
????????//?初始可用的許可證等于池容量
????????useful?=?new?Semaphore(POOL_SIZE);
????????//?初始不可用的許可證容量為0
????????useless?=?new?Semaphore(0);
????}
????/**
?????*?獲取數(shù)據(jù)庫連接
?????*
?????*?@return?連接對象
?????*/
????public?Connection?takeConnection()?throws?InterruptedException?{
????????//?可用許可證減一
????????useful.acquire();
????????Connection?connection;
????????synchronized?(POOL)?{
????????????connection?=?POOL.removeFirst();
????????}
????????//?不可用許可證數(shù)量加一
????????useless.release();
????????return?connection;
????}
????/**
?????*?釋放鏈接
?????*
?????*?@param?connection?連接對象
?????*/
????public?void?returnConnection(Connection?connection)?throws?InterruptedException?{
????????if(null!=connection){
????????????//?打印日志
????????????System.out.println("當前有"+useful.getQueueLength()+"個線程等待獲取連接,,"
????????????????????+"可用連接有"+useful.availablePermits()+"個");
????????????//?不可用許可證減一
????????????useless.acquire();
????????????synchronized?(POOL){
????????????????POOL.addLast(connection);
????????????}
????????????//?可用許可證加一
????????????useful.release();
????????}
????}
}
測試類:
package?org.dance.day2.util.pool;
import?org.dance.tools.SleepTools;
import?java.sql.Connection;
import?java.util.Random;
/**
?*?測試Semaphore
?*?@author?ZYGisComputer
?*/
public?class?UseSemaphore?{
????/**
?????*?連接池
?????*/
????public?static?final?DBPoolSemaphore?pool?=?new?DBPoolSemaphore();
????private?static?class?BusiThread?extends?Thread{
????????@Override
????????public?void?run()?{
????????????//?隨機數(shù)工具類?為了讓每個線程持有連接的時間不一樣
????????????Random?random?=?new?Random();
????????????long?start?=?System.currentTimeMillis();
????????????try?{
????????????????Connection?connection?=?pool.takeConnection();
????????????????System.out.println("Thread_"+Thread.currentThread().getId()+
????????????????????????"_獲取數(shù)據(jù)庫連接耗時["+(System.currentTimeMillis()-start)+"]ms.");
????????????????//?模擬使用連接查詢數(shù)據(jù)
????????????????SleepTools.ms(100+random.nextInt(100));
????????????????System.out.println("查詢數(shù)據(jù)完成歸還連接");
????????????????pool.returnConnection(connection);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????}
????}
????public?static?void?main(String[]?args)?{
????????for?(int?i?=?0;?i?50;?i++)?{
????????????BusiThread?busiThread?=?new?BusiThread();
????????????busiThread.start();
????????}
????}
}
測試返回結(jié)果:
Thread_11_獲取數(shù)據(jù)庫連接耗時[0]ms.
Thread_12_獲取數(shù)據(jù)庫連接耗時[0]ms.
Thread_13_獲取數(shù)據(jù)庫連接耗時[0]ms.
Thread_14_獲取數(shù)據(jù)庫連接耗時[0]ms.
Thread_15_獲取數(shù)據(jù)庫連接耗時[0]ms.
Thread_16_獲取數(shù)據(jù)庫連接耗時[0]ms.
Thread_17_獲取數(shù)據(jù)庫連接耗時[0]ms.
Thread_18_獲取數(shù)據(jù)庫連接耗時[0]ms.
Thread_19_獲取數(shù)據(jù)庫連接耗時[0]ms.
Thread_20_獲取數(shù)據(jù)庫連接耗時[0]ms.
查詢數(shù)據(jù)完成歸還連接
當前有40個線程等待獲取連接,,可用連接有0個
Thread_21_獲取數(shù)據(jù)庫連接耗時[112]ms.
查詢數(shù)據(jù)完成歸還連接
...................查詢數(shù)據(jù)完成歸還連接
當前有2個線程等待獲取連接,,可用連接有0個
Thread_59_獲取數(shù)據(jù)庫連接耗時[637]ms.
查詢數(shù)據(jù)完成歸還連接
當前有1個線程等待獲取連接,,可用連接有0個
Thread_60_獲取數(shù)據(jù)庫連接耗時[660]ms.
查詢數(shù)據(jù)完成歸還連接
當前有0個線程等待獲取連接,,可用連接有0個
查詢數(shù)據(jù)完成歸還連接...................
當前有0個線程等待獲取連接,,可用連接有8個
查詢數(shù)據(jù)完成歸還連接
當前有0個線程等待獲取連接,,可用連接有9個
通過執(zhí)行結(jié)果可以很明確的看到,一上來就有10個線程獲取到了連接,,然后后面的40個線程進入阻塞,然后只有釋放鏈接之后,等待的線程就會有一個拿到,然后越后面的線程等待的時間就越長,然后一直到所有的線程執(zhí)行完畢
最后打印的可用連接有九個不是因為少了一個是因為在釋放之前打印的,不是錯誤
從結(jié)果中可以看到,我們對連接池中的資源的到了控制,這就是信號量的流量控制
?
Exchanger:
Exchanger,俗稱交換器,用于在線程之間交換數(shù)據(jù),但是比較受限,因為只能兩個線程之間交換數(shù)據(jù)
/**
?????*?Creates?a?new?Exchanger.
?????*/
????public?Exchanger()?{
????????participant?=?new?Participant();
????}
這個構(gòu)造函數(shù)沒有什么好說的,也沒有入?yún)?只有在創(chuàng)建的時候指定一下需要交換的數(shù)據(jù)的泛型即可,下面看代碼
package?org.dance.day2.util;
import?java.util.HashSet;
import?java.util.Set;
import?java.util.concurrent.Exchanger;
/**
?*?線程之間交換數(shù)據(jù)
?*?@author?ZYGisComputer
?*/
public?class?UseExchange?{
????private?static?final?Exchanger>?exchanger?=?new?Exchanger<>();
????public?static?void?main(String[]?args)?{
????????new?Thread(){
????????????@Override
????????????public?void?run()?{
????????????????Set?aSet?=?new?HashSet<>();
????????????????aSet.add("A");
????????????????aSet.add("B");
????????????????aSet.add("C");
????????????????try?{
????????????????????Set?exchange?=?exchanger.exchange(aSet);
????????????????????for?(String?s?:?exchange)?{
????????????????????????System.out.println("aSet"+s);
????????????????????}
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}.start();
????????new?Thread(){
????????????@Override
????????????public?void?run()?{
????????????????Set?bSet?=?new?HashSet<>();
????????????????bSet.add("1");
????????????????bSet.add("2");
????????????????bSet.add("3");
????????????????try?{
????????????????????Set?exchange?=?exchanger.exchange(bSet);
????????????????????for?(String?s?:?exchange)?{
????????????????????????System.out.println("bSet"+s);
????????????????????}
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}.start();
????}
}
執(zhí)行結(jié)果:
bSetA
bSetB
bSetC
aSet1
aSet2
aSet3
通過執(zhí)行結(jié)果可以清晰的看到,兩個線程中的數(shù)據(jù)發(fā)生了交換,這就是Exchanger的線程數(shù)據(jù)交換了
以上就是JUC的4大常用并發(fā)工具類了
??作者?|??彼岸舞
來源 |??cnblogs.com/flower-dance/p/13714006.html

