快速上手JUC下常見并發(fā)容器
Python實戰(zhàn)社群
Java實戰(zhàn)社群
長按識別下方二維碼,按需求添加
掃碼關(guān)注添加客服
進(jìn)Python社群▲
掃碼關(guān)注添加客服
進(jìn)Java社群▲
作者丨sowhat1412
來源丨sowhat1412

多線程環(huán)境下Java提供的一些簡單容器都無法使用了,此時要用到JUC中的容器,由于?ConcurrentHashMap?是高頻考點,用到也比較多因此著重寫過了,其余的容器就看今天咯。
跳表知識點
簡而言之跳表就是多層鏈表的結(jié)合體,跳表分為許多層(level),每一層都可以看作是數(shù)據(jù)的索引,這些索引的意義就是加快跳表查找數(shù)據(jù)速度。每一層的數(shù)據(jù)都是有序的,上一層數(shù)據(jù)是下一層數(shù)據(jù)的子集,并且第一層(level 1)包含了全部的數(shù)據(jù);層次越高,跳躍性越大,包含的數(shù)據(jù)越少。并且隨便插入一個數(shù)據(jù)該數(shù)據(jù)是否會是跳表索引完全隨機(jī)的跟玩骰子一樣,redis中的zset底層就是跳表數(shù)據(jù)結(jié)構(gòu)。并且跳表的速度幾乎接近紅黑樹了。跳表包含一個表頭,它查找數(shù)據(jù)時,是從上往下,從左往右進(jìn)行查找?,F(xiàn)在“需要找出值為37的節(jié)點”為例,來對比說明跳表和普遍的鏈表。
沒有跳表查詢 比如我查詢數(shù)據(jù)37,如果沒有上面的索引時候路線如下圖: 
有跳表查詢 有跳表查詢37的時候路線如下圖:
延伸思考:既然跳表實現(xiàn)簡單速度挺好ConcurrentHashMap為什么不直接用跳表用紅黑樹?先說下在HashMap中一般空間利用率就在40%作用,而ConcurrentHashMap空間利用率只能達(dá)到10%~20%。如果這個時候再不節(jié)省空間還用跳表替換紅黑樹。那么就涼涼了。
ConcurrentSkipListMap
我們在存儲 kv 的時候一般有三種容器可以使用,TreeMap、ConcurrentSkipListMap、HashMap三種容器。其中TreeMap可以理解為紅黑樹在Java中的具體實現(xiàn)(紅黑樹、2-3-4樹也是賊好玩的一個知識點,懶的寫了,如果讀者想看再寫不遲)。ConcurrentSkipListMap主要就是利用跳表的思維來實現(xiàn)速度的提升,他們區(qū)別跟性能對比如下:
TreeMap基于紅黑樹(平衡二叉查找樹)實現(xiàn)的,時間復(fù)雜度平均能達(dá)到O(log n),多線程不安全。HashMap是基于散列表實現(xiàn)的,時間復(fù)雜度平均能達(dá)到O(1),多線程不安全。ConcurrentSkipListMap是基于跳表實現(xiàn)的,時間復(fù)雜度平均能達(dá)到O(log n),多線程安全。紅黑樹涉及各種旋轉(zhuǎn)操作比較復(fù)雜,HashMap底層數(shù)組+ 鏈表+ 紅黑樹,跳表實現(xiàn)起來就很簡單了。
結(jié)論:
當(dāng)數(shù)據(jù)量增加時,HashMap會引起散列沖突,解決沖突需要多花費一些時間代價,故在f(n)=1向上浮動。隨著數(shù)據(jù)量的增加,HashMap的時間花費小且穩(wěn)定,充分秉承著空間換時間的思想,在單線程的環(huán)境下比TreeMap和ConcurrentSkipListMap在插入和查找上有很大的優(yōu)勢。 如果必須有序且多線程就用ConcurrentSkipListMap,如果單線程不需要考慮是否有序就用HashMap。
其中ConcurrentSkipListMap基礎(chǔ)結(jié)構(gòu)圖如下:
ConcurrentSkipListSet
Set是一個無序的數(shù)據(jù)集合,TreeSet的底層是通過TreeMap實現(xiàn)的,思想其實跟HashMap和HashSet類似,TreeSet就是只有Key的TreeMap 。TreeSet是通過紅黑樹來實現(xiàn)的速度可達(dá)到O(log n)但是線程也是不安全的。ConcurrentSkipListSet是基于跳表實現(xiàn)的線程安全的ListSet。
ConcurrentLinkedQueue
可以認(rèn)為是LinkedList的多線程安全升級版。一個基于鏈表節(jié)點的無界線程安全隊列。此隊列按照 FIFO原則對元素進(jìn)行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素。當(dāng)多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當(dāng)?shù)倪x擇,底層用了很多sun.misc.Unsafe UNSAFE硬件級別的原子操作。此隊列不允許使用 null 元素。
offer(E e) ?:將指定元素插入此隊列的尾部。 add(E e): 跟offer 功能一樣將指定元素插入此隊列的尾部, add方法體調(diào)用的就是offer. poll() : 獲取并移除此隊列的頭,如果此隊列為空,則返回 null peek() : 獲取但不移除此隊列的頭,如果此隊列為空,則返回 null remove(Object o) : 從隊列中移除指定元素的單個實例(如果存在)
CopyOnWriteArrayList
CopyOnWrite 寫時復(fù)制的容器。通俗的理解是當(dāng)我們往一個容器添加元素的時候,不直接往當(dāng)前容器添加,而是先將當(dāng)前容器進(jìn)行Copy,復(fù)制出一個新的容器,然后新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器。這樣做的好處是我們可以對容器進(jìn)行并發(fā)的讀,而不需要加鎖,因為當(dāng)前容器不會添加任何元素,但是寫的時候還是要鎖的!所以寫時復(fù)制容器也是一種讀寫分離的思想,讀和寫不同的容器。如果讀的時候有多個線程正在向容器添加數(shù)據(jù),讀還是會讀到舊的數(shù)據(jù),因為寫的時候不會鎖住舊的,只能保證最終一致性。Redis中執(zhí)行bgsave時候就是用的此機(jī)制。這種機(jī)制寫一次就要copy一份。多個線程要執(zhí)行寫操作必須等上一個線程執(zhí)行完畢。如果用讀寫鎖我在寫的時候你是無法讀的,鎖無法降級的。 CopyOnWriteArrayList底層用的ReentrantLock()來實現(xiàn)加鎖,這又印證了AQS占據(jù)JUC半壁江山。
優(yōu)點
對于一些讀多寫少的數(shù)據(jù),這種做法的確很不錯,例如配置、黑名單、物流地址等變化非常少的數(shù)據(jù),這是一種無鎖的實現(xiàn)??梢詭臀覀儗崿F(xiàn)程序更高的并發(fā)。
缺點
這種實現(xiàn)只是保證數(shù)據(jù)的最終一致性,在添加到拷貝數(shù)據(jù)而還沒進(jìn)行替換的時候,讀到的仍然是舊數(shù)據(jù)。如果對象比較大,頻繁地進(jìn)行替換會消耗內(nèi)存,從而引發(fā)Java的GC問題,這個時候,我們應(yīng)該考慮其他的容器,例如ConcurrentHashMap。
CopyOnWriteArraySet
CopyOnWriteArraySet是基于CopyOnWriteArrayList實現(xiàn)的,只有add的方法稍微有些不同,因為CopyOnWriteArraySet是Set也就是不能有重復(fù)的元素,故在CopyOnWriteArraySet中用了addIfAbsent(e)這樣的方法。
BlockingQueue
在JUC包中BlockingQueue很好的解決了多線程中,如何高效安全傳輸數(shù)據(jù)的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。BlockingQueue即阻塞隊列,它是基于 ReentrantLock 實現(xiàn)的,BlockingQueue阻塞隊列的概念:
當(dāng)隊列滿的時候,插入元素的線程被阻塞,直達(dá)隊列不滿。 隊列為空的時候,獲取元素的線程被阻塞,直到隊列不空。
生產(chǎn)者和消費者模式概念:
1、生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。
2、在多線程開發(fā)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。
3、為了解決這種生產(chǎn)消費能力不均衡的問題,便有了生產(chǎn)者和消費者模式。生產(chǎn)者和消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強(qiáng)耦合問題。生產(chǎn)者和消費者彼此之間不直接通信,而是通過阻塞隊列來進(jìn)行通信,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。
BlockingQueue是個接口,主要又有若干方法。
常用方法:
| 方法 | 拋出異常 | 返回值 | 一直阻塞 | 超時退出 |
|---|---|---|---|---|
| 插入方法 | add | offer | put | offer(time) |
| 移除方法 | remove | poll | take | poll(time) |
| 檢查 方法 | element | peek | N/A | N/A |
1、add(E e):在不違反容量限制的情況下,可立即將指定元素插入此隊列,成功返回true,當(dāng)無可用空間時候,返回IllegalStateException異常。
2、offer(E e):在不違反容量限制的情況下,可立即將指定元素插入此隊列,成功返回true,當(dāng)無可用空間時候,返回false。
3、put(E e):直接在隊列中插入元素,當(dāng)無可用空間時候,阻塞等待。
4、offer(E e, long time, timeunit unit):將給定元素在給定的時間內(nèi)設(shè)置到隊列中,如果設(shè)置成功返回true, 否則返回false。
5、?E take():獲取并移除隊列頭部的元素,無元素時候阻塞等待。
6、?E poll( long time, timeunit unit):獲取并移除隊列頭部的元素,無元素時候阻塞等待指定時間。
7、remove(Object o) :若隊列為空,拋出NoSuchElementException異常
8、E poll():若隊列為空,返回null
BlockingQueue是一個接口,它的實現(xiàn)類有ArrayBlockingQueue、LinkedBlockingDeque、PriorityBlockingQueue、DelayQueue、SynchronousQueue 、LinkedTransferQueue、LinkedBlockingQueue等,它們的區(qū)別主要體現(xiàn)在存儲結(jié)構(gòu)上或?qū)υ夭僮魃系牟煌菍τ趖ake與put操作的原理,卻是類似的。
ArrayBlockingQueue
一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列,按照先進(jìn)先出原則,其中有界也就意味著,它不能夠存儲無限多數(shù)量的對象,要求設(shè)定初始大小。數(shù)組類型:
/**?The?queued?items?*/
final?Object[]?items;
唯一全局鎖
//?這是一個掌管所有訪問操作的鎖。全局共享。都會使用這個鎖。
final?ReentrantLock?lock;
兩個等待隊列
????/**?Condition?for?waiting?takes?*/
????private?final?Condition?notEmpty;
????/**?Condition?for?waiting?puts?*/
????private?final?Condition?notFull;
put 方法
????public?void?put(E?e)?throws?InterruptedException?{
????????checkNotNull(e);
????????final?ReentrantLock?lock?=?this.lock;?//?唯一鎖
????????lock.lockInterruptibly();//?加鎖
????????try?{
????????????while?(count?==?items.length)
????????????????notFull.await();//await?讓出操作權(quán)
????????????enqueue(e);//?被喚醒就加入隊列。
????????}?finally?{
????????????lock.unlock();//?解鎖
????????}
????}
take方法
????public?E?take()?throws?InterruptedException?{
????????final?ReentrantLock?lock?=?this.lock;?//?加鎖
????????lock.lockInterruptibly();
????????try?{
????????????while?(count?==?0)//為空則釋放當(dāng)前鎖
????????????????notEmpty.await();
????????????return?dequeue();//?獲得鎖被喚醒了則返回數(shù)據(jù)
????????}?finally?{
????????????lock.unlock();//?釋放鎖
????????}
????}
LinkedBlockingQueue
LinkedBlockingQueue是一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列,按照先進(jìn)先出原則,可以不設(shè)定初始大小,默認(rèn)Integer.MAX_VALUE,為了避免隊列過大造成機(jī)器負(fù)載或者內(nèi)存爆滿的情況出現(xiàn),在使用的時候一般建議手動傳一個隊列的大小。
//節(jié)點類,用于存儲數(shù)據(jù)
static?class?Node<E>?{
????E?item;
????Node?next;
????Node(E?x)?{?item?=?x;?}
}
//?阻塞隊列的大小,默認(rèn)為Integer.MAX_VALUE?
private?final?int?capacity;
//當(dāng)前阻塞隊列中的元素個數(shù)?
private?final?AtomicInteger?count?=?new?AtomicInteger();
//?阻塞隊列的頭結(jié)點
transient?Node?head;
//?阻塞隊列的尾節(jié)點
private?transient?Node?last;
//?獲取并移除元素時使用的鎖,如take,?poll
private?final?ReentrantLock?takeLock?=?new?ReentrantLock();
//??notEmpty條件對象,當(dāng)隊列沒有數(shù)據(jù)時用于?掛起?執(zhí)行刪除的線程
private?final?Condition?notEmpty?=?takeLock.newCondition();
//?添加元素時使用的鎖如?put,?offer
private?final?ReentrantLock?putLock?=?new?ReentrantLock();
//?notFull條件對象,當(dāng)隊列數(shù)據(jù)已滿時用于?掛起?執(zhí)行添加的線程
private?final?Condition?notFull?=?putLock.newCondition();
添加到LinkedBlockingQueue隊列中的數(shù)據(jù)都將被封裝成Node節(jié)點,添加的鏈表隊列中,其中head和last分別指向隊列的頭結(jié)點和尾結(jié)點。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內(nèi)部分別使用了takeLock 和putLock 對并發(fā)進(jìn)行控制,也就是說,添加和刪除操作并不是互斥操作,可以同時進(jìn)行,這樣也就可以大大提高吞吐量。
ArrayBlockingQueue和LinkedBlockingQueue對比:
實現(xiàn):ArrayBlockingQueue底層上數(shù)組,ArrayBlockingQueue用Node包裝后的鏈表(包含包裝導(dǎo)致更大更冗余易觸發(fā)GC) 初始化:ArrayBlockingQueue必須要求有初始值,ArrayBlockingQueue沒有強(qiáng)制要求。 鎖上:ArrayBlockingQueue實現(xiàn)的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,而ArrayBlockingQueue實現(xiàn)的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量。
SynchronousQueue
一個不存儲元素的阻塞隊列。每一個put操作都要等待一個take操作請求才會put數(shù)據(jù)。
PriorityBlockingQueue
PriorityBlockingQueue是一個支持優(yōu)先級的無界阻塞隊列,直到系統(tǒng)資源耗盡。默認(rèn)情況下元素采用自然順序升序排列。也可以自定義類繼承comparable實現(xiàn)compareTo()方法來指定元素排序規(guī)則,或者初始化PriorityBlockingQueue時,指定構(gòu)造參數(shù)Comparator來對元素進(jìn)行排序。但需要注意的是不能保證同優(yōu)先級元素的順序。PriorityBlockingQueue也是基最小二叉堆實現(xiàn),使用基于CAS實現(xiàn)的自旋鎖來控制隊列的動態(tài)擴(kuò)容,保證了擴(kuò)容操作不會阻塞take操作的執(zhí)行
LinkedTransferQueue
LinkedTransferQueue是一個由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊列。該類實現(xiàn)了一個TransferQueue接口,相對于其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。
public?interface?TransferQueue<E>?extends?BlockingQueue<E>?{
????//?如果可能,立即將元素轉(zhuǎn)移給等待的消費者。?
????//?更確切地說,如果存在消費者已經(jīng)等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,則?立即傳送指定?的元素,否則返回 false。
????boolean?tryTransfer(E?e);
????//?將元素轉(zhuǎn)移給消費者,如果需要的話等待。?
????//?更準(zhǔn)確地說,如果存在一個消費者已經(jīng)等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,則立即傳送指定的元素,否則?等待?直到?元素由消費者接收。
????void?transfer(E?e)?throws?InterruptedException;
????//?上面方法的基礎(chǔ)上設(shè)置超時時間
????boolean?tryTransfer(E?e,?long?timeout,?TimeUnit?unit)?throws?InterruptedException;
????//?如果至少有一位消費者在等待,則返回?true
????boolean?hasWaitingConsumer();
????//?返回等待消費者人數(shù)的估計值
????int?getWaitingConsumerCount();
}
LinkedBlockingDeque
LinkedBlockingDeque一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列,注意Deque的存在。可以從隊列的頭和尾都可以插入和移除元素,可以實現(xiàn)工作密取,比如ForkJoin底層任務(wù)隊列。方法名帶了first對頭部操作,帶了last從尾部操作。另外方法調(diào)用的時候默認(rèn)
add=addLast; remove=removeFirst; take=takeFirst
??public?boolean?add(E?e)?{
????????addLast(e);?//?等價
????????return?true;
????}
????public?E?remove()?{
????????return?removeFirst();
????}
????public?E?take()?throws?InterruptedException?{
????????return?takeFirst();
????}
DelayQueue
DelayQueue 一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列。支持延時獲取的元素的阻塞隊列,元素必須要實現(xiàn)Delayed接口。放入隊列中的元素只有在指定的timeout后才可以取出,也就是說隊列中元素的順序是按到期時間排序的,而非它們進(jìn)入隊列的順序。排在隊列頭部的元素是最早到期的,越往后到期時間赿晚。適用場景:實現(xiàn)自己的緩存系統(tǒng),訂單到期,限時支付等。
demo 加深印象
任務(wù):一個訂單系統(tǒng),通過阻塞隊列延時功能實現(xiàn),需要(訂單類,包裝訂單類,生產(chǎn)者,消費者,測試)
訂單類
public?class?Order?{
?private?final?String?orderNo;//訂單的編號
?private?final?double?orderMoney;//訂單的金額
?public?Order(String?orderNo,?double?orderMoney)?{
??super();
??this.orderNo?=?orderNo;
??this.orderMoney?=?orderMoney;
?}
?public?String?getOrderNo()?{
??return?orderNo;
?}
?public?double?getOrderMoney()?{
??return?orderMoney;
?}
}
包裝類
//?類說明:存放到隊列的元素
public?class?ItemVo<T>?implements?Delayed?{
????private?long?activeTime;//到期時間,單位毫秒
????private?T?object;
????//activeTime是個過期時長
????public?ItemVo(long?activeTime,?T?object)?{
????????super();
????????this.activeTime?=?TimeUnit.NANOSECONDS.convert(activeTime,?TimeUnit.MILLISECONDS)?+?System.nanoTime();
????????//?將傳入的時長轉(zhuǎn)換為超時的時刻
????????this.object?=?object;
????}
????public?T?getObject()?{
????????return?object;
????}
????//按照剩余時間排序
????@Override
????public?int?compareTo(Delayed?o)?{
????????long?d?=?getDelay(TimeUnit.NANOSECONDS)?-?o.getDelay(TimeUnit.NANOSECONDS);
????????return?(d?==?0)???0?:?((d?>?0)???1?:?-1);
????}
????//返回元素的剩余時間
????@Override
????public?long?getDelay(TimeUnit?unit)?{
????????long?d?=?unit.convert(this.activeTime?-?System.nanoTime(),?TimeUnit.NANOSECONDS);
????????return?d;
????}
}
生產(chǎn)者
public?class?PutOrder?implements?Runnable?{
????private?DelayQueue>?queue;
????public?PutOrder(DelayQueue>?queue) ?{
????????super();
????????this.queue?=?queue;
????}
????@Override
????public?void?run()?{
????????//5秒到期
????????Order?ordeTb?=?new?Order("TBSoWhat",?14);
????????ItemVo?itemTb?=?new?ItemVo(5000,?ordeTb);
????????queue.offer(itemTb);?//插入
????????System.out.println("訂單5秒后到期:"?+?ordeTb.getOrderNo());
????????//8秒到期
????????Order?ordeJd?=?new?Order("JDSoWhat",?12);
????????ItemVo?itemJd?=?new?ItemVo(8000,?ordeJd);
????????queue.offer(itemJd);//?插入
????????System.out.println("訂單8秒后到期:"?+?ordeJd.getOrderNo());
????}
}
消費者
public?class?FetchOrder?implements?Runnable?{
?private?DelayQueue>?queue;
?public?FetchOrder(DelayQueue>?queue) ?{
??super();
??this.queue?=?queue;
?}
?@Override
?public?void?run()?{
??while(true)?{
???try?{
????ItemVo?item?=?queue.take();
????Order?order?=?(Order)item.getObject();
????System.out.println("get?from?queue:"+order.getOrderNo());
???}?catch?(InterruptedException?e)?{
????e.printStackTrace();
???}
??}
?}?
}
測試延時功能
public?class?Test
{
?public?static?void?main(String[]?args)?throws?InterruptedException
?{
??DelayQueue>?queue?=?new?DelayQueue<>();
??new?Thread(new?PutOrder(queue)).start();
??new?Thread(new?FetchOrder(queue)).start();
??//每隔1秒,打印個數(shù)字
??for?(int?i?=?1;?i?10;?i++)
??{
???Thread.sleep(1000);
???System.out.println(i?*?1000);
??}
?}
}

ForkJoin
1.Fork/Join流程:
ForkJoin是一種分治的思想,在1.7中引入JDK中?,F(xiàn)實生活中的快排,隊排,MapReduce都是思想的 實現(xiàn),意思是在必要的情況下,將一個大任務(wù),進(jìn)行拆分(fork) 成若干個子任務(wù)(拆到不能再拆,這里就是指我們制定的拆分的臨界值),再將一個個小任務(wù)的結(jié)果進(jìn)行join匯總。
2. 工作竊取模式
從上述Fork/Join框架的描述可以看出,我們需要一些線程來執(zhí)行Fork出的任務(wù),在實際中,如果每次都創(chuàng)建新的線程執(zhí)行任務(wù),對系統(tǒng)資源的開銷會很大,所以Fork/Join框架利用了線程池來調(diào)度任務(wù)。
另外,這里可以思考一個問題,既然由線程池調(diào)度,根據(jù)我們之前學(xué)習(xí)普通/計劃線程池的經(jīng)驗,必然存在兩個要素:
工作線程
任務(wù)隊列
一般的線程池只有一個任務(wù)隊列,但是對于Fork/Join框架來說,由于Fork出的各個子任務(wù)其實是平行關(guān)系,為了提高效率,減少線程競爭,應(yīng)該將這些平行的任務(wù)放到中去,如上不同的隊列圖中,大任務(wù)分解成三個子任務(wù):子任務(wù)1、子任務(wù)2,那么就創(chuàng)建兩個任務(wù)隊列,然后再創(chuàng)建3個工作線程與隊列一一對應(yīng)。
那么為什么需要使用工作竊取算法呢?假如我們需要做一個比較大的任務(wù),我們可以把這個任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競爭,于是把這些子任務(wù)分別放到不同的隊列里,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務(wù),線程和隊列一一對應(yīng),比如A線程負(fù)責(zé)處理A隊列里的任務(wù)。但是有的線程會先把自己隊列里的任務(wù)干完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務(wù)來執(zhí)行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會使用雙端隊列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊列的尾部拿任務(wù)執(zhí)行。
滿足這一需求的任務(wù)隊列其實就是JUC框架中介紹過的雙端阻塞隊列 LinkedBlockingDeque。
工作竊取算法的優(yōu)點是充分利用線程進(jìn)行并行計算,并減少了線程間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務(wù)時。并且消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列。并且在進(jìn)行RR跟上下文切換也會耗時的,所以不一定是多線程就一定 比單線程速度快。彈性而定,看任務(wù)量。
3. demo演示
ForkJoin有兩種繼承方式,RecursiveTask有返回值,RecursiveAction無返回值
任務(wù)需求:假設(shè)有個非常大的long[]數(shù)組,通過FJ框架求解數(shù)組所有元素的和。任務(wù)類定義,因為需要返回結(jié)果,所以繼承RecursiveTask,并覆寫compute方法。任務(wù)的fork通過ForkJoinTask的fork方法執(zhí)行,join方法方法用于等待任務(wù)執(zhí)行后返回:
public?class?ForkJoinWork?extends?RecursiveTask<Long>?{
????private?Long?start;//起始值
????private?Long?end;//結(jié)束值
????public?static?final?Long?critical?=?100000L;//臨界值
????public?ForkJoinWork(Long?start,?Long?end)?{
????????this.start?=?start;
????????this.end?=?end;
????}
????@Override
????protected?Long?compute()?{
????????//?return?null;
????????//判斷是否是拆分完畢
????????Long?lenth?=?end?-?start;???//起始值差值
????????if?(lenth?<=?critical)?{
????????????//如果拆分完畢就相加
????????????Long?sum?=?0L;
????????????for?(Long?i?=?start;?i?<=?end;?i++)?{
????????????????sum?+=?i;
????????????}
????????????return?sum;
????????}?else?{
????????????//沒有拆分完畢就開始拆分
????????????Long?middle?=?(end?+?start)?/?2;//計算的兩個值的中間值
????????????ForkJoinWork?right?=?new?ForkJoinWork(start,?middle);
????????????right.fork();//拆分,并壓入線程隊列
????????????ForkJoinWork?left?=?new?ForkJoinWork(middle?+?1,?end);
????????????left.fork();//拆分,并壓入線程隊列
????????????//合并
????????????return?right.join()?+?left.join();
????????}
????}
}
測試:
public?class?ForkJoinWorkTest?{
????@Test
????public?void?test()?{
????????//ForkJoin實現(xiàn)
????????long?l?=?System.currentTimeMillis();
????????ForkJoinPool?forkJoinPool?=?new?ForkJoinPool();//實現(xiàn)ForkJoin?就必須有ForkJoinPool的支持
????????ForkJoinTask?task?=?new?ForkJoinWork(0L,?10000000000L);//參數(shù)為起始值與結(jié)束值
????????Long?invoke?=?forkJoinPool.invoke(task);
????????long?l1?=?System.currentTimeMillis();
????????System.out.println("invoke?=?"?+?invoke?+?"??time:?"?+?(l1?-?l));
????????//invoke?=?-5340232216128654848??time:?56418
????????//ForkJoinWork?forkJoinWork?=?new?ForkJoinWork(0L,?10000000000L);
????}
????@Test
????public?void?test2()?{
????????//普通線程實現(xiàn)
????????Long?x?=?0L;
????????Long?y?=?10000000000L;
????????long?l?=?System.currentTimeMillis();
????????for?(Long?i?=?0L;?i?<=?y;?i++)?{
????????????x?+=?i;
????????}
????????long?l1?=?System.currentTimeMillis();
????????System.out.println("invoke?=?"?+?x?+?"??time:?"?+?(l1?-?l));
????????//invoke?=?-5340232216128654848??time:?64069
????}
????@Test
????public?void?test3()?{
????????//Java?8?并行流的實現(xiàn)
????????long?l?=?System.currentTimeMillis();
????????long?reduce?=?LongStream.rangeClosed(0,?10000000000L).parallel().reduce(0,?Long::sum);
????????long?l1?=?System.currentTimeMillis();
????????System.out.println("invoke?=?"?+?reduce?+?"??time:?"?+?(l1?-?l));
????????//invoke?=?-5340232216128654848??time:?2152
????}
}
結(jié)論:Java 8 就為我們提供了一個并行流來實現(xiàn)ForkJoin實現(xiàn)的功能??梢钥吹讲⑿辛鞅茸约簩崿F(xiàn)ForkJoin還要快。
Java 8 中將并行流進(jìn)行了優(yōu)化,我們可以很容易的對數(shù)據(jù)進(jìn)行并行流的操作,Stream API可以聲明性的通過parallel()與sequential()在并行流與串行流中隨意切換!
核心組件
F/J框架的實現(xiàn)非常復(fù)雜,內(nèi)部大量運用了位操作和無鎖算法,撇開這些實現(xiàn)細(xì)節(jié)不談,該框架主要涉及三大核心組件:ForkJoinPool(線程池)、ForkJoinTask(任務(wù))、ForkJoinWorkerThread(工作線程),外加WorkQueue(任務(wù)隊列):
ForkJoinPool:ExecutorService的實現(xiàn)類,負(fù)責(zé)工作線程的管理、任務(wù)隊列的維護(hù),以及控制整個任務(wù)調(diào)度流程; ForkJoinTask:Future接口的實現(xiàn)類,fork是其核心方法,用于分解任務(wù)并異步執(zhí)行;而join方法在任務(wù)結(jié)果計算完畢之后才會運行,用來合并或返回計算結(jié)果; ForkJoinWorkerThread:Thread的子類,作為線程池中的工作線程(Worker)執(zhí)行任務(wù); WorkQueue:任務(wù)隊列,用于保存任務(wù);
ForkJoinPool
ForkJoinPool作為Executors框架的一員,從外部看與其它線程池并沒有什么區(qū)別,僅僅是ExecutorService的一個實現(xiàn)類:
ForkJoinPool的主要工作如下:
接受外部任務(wù)的提交(外部調(diào)用ForkJoinPool的invoke/execute/submit方法提交任務(wù)); 接受ForkJoinTask自身fork出的子任務(wù)的提交; 任務(wù)隊列數(shù)組(WorkQueue[])的初始化和管理;工作線程(Worker)的創(chuàng)建/管理。
注意:ForkJoinPool提供了3類外部提交任務(wù)的方法:invoke、execute、submit,它們的主要區(qū)別在于任務(wù)的執(zhí)行方式上。
通過invoke方法提交的任務(wù),調(diào)用線程直到任務(wù)執(zhí)行完成才會返回,也就是說這是一個 同步方法,且有返回結(jié)果;通過execute方法提交的任務(wù),調(diào)用線程會立即返回,也就是說這是一個 異步方法,且沒有返回結(jié)果;通過submit方法提交的任務(wù),調(diào)用線程會立即返回,也就是說這是一個 異步方法,且有返回結(jié)果(返回Future實現(xiàn)類,可以通過get獲取結(jié)果)。
注意:ForkJoinPool支持兩種模式:
同步模式(默認(rèn)方式) 異步模式
這里的同步/異步并不是指F/J框架本身是采用同步模式還是采用異步模式工作,而是指其中的工作線程的工作方式。在F/J框架中,每個工作線程(Worker)都有一個屬于自己的任務(wù)隊列(WorkQueue),這是一個底層采用數(shù)組實現(xiàn)的雙向隊列。同步是指:對于工作線程(Worker)自身隊列中的任務(wù),采用后進(jìn)先出(LIFO)的方式執(zhí)行;異步是指:對于工作線程(Worker)自身隊列中的任務(wù),采用先進(jìn)先出(FIFO)的方式執(zhí)行
ForkJoinTask
從Fork/Join框架的描述上來看,“任務(wù)”必須要滿足一定的條件:
支持Fork,即任務(wù)自身的分解 支持Join,即任務(wù)結(jié)果的合并
因此JUC提供了一個抽象類 ForkJoinTask,來作為該類Fork/Join任務(wù)的抽象定義
ForkJoinTask實現(xiàn)了Future接口,是一個異步任務(wù),我們在使用Fork/Join框架時,一般需要使用線程池來調(diào)度任務(wù),線程池內(nèi)部調(diào)度的其實都是ForkJoinTask任務(wù)(即使提交的是一個Runnable或Callable任務(wù),也會被適配成ForkJoinTask)。除了ForkJoinTask,F(xiàn)ork/Join框架還提供了兩個它的抽象實現(xiàn),我們在自定義ForkJoin任務(wù)時,一般繼承這兩個類:
RecursiveAction:表示具有返回結(jié)果的ForkJoin任務(wù) RecursiveTask:表示沒有返回結(jié)果的ForkJoin任務(wù)
ForkJoinWorkerThread
Fork/Join框架中,每個工作線程(Worker)都有一個自己的任務(wù)隊列(WorkerQueue), 所以需要對一般的Thread做些特性化處理,J.U.C提供了ForkJoinWorkerThread類作為ForkJoinPool中的工作線程:
public?class?ForkJoinWorkerThread?extends?Thread?{
????
????final?ForkJoinPool?pool;????????????????????//?該工作線程歸屬的線程池
????final?ForkJoinPool.WorkQueue?workQueue;?????//?對應(yīng)的任務(wù)隊列
?
????protected?ForkJoinWorkerThread(ForkJoinPool?pool)?{
????????super("aForkJoinWorkerThread");?????????//?指定工作線程名稱
????????this.pool?=?pool;
????????this.workQueue?=?pool.registerWorker(this);
????}
??
????//?...
}
ForkJoinWorkerThread 在構(gòu)造過程中,會。同時,它會通過ForkJoinPool的registerWorker方保存所屬線程池信息和與自己綁定的任務(wù)隊列信息法將自己注冊到線程池中。
WorkQueue
任務(wù)隊列(WorkQueue)是ForkJoinPool與其它線程池區(qū)別最大的地方,在ForkJoinPool內(nèi)部,維護(hù)著一個WorkQueue[]數(shù)組,它會在外部首次提交任務(wù))時進(jìn)行初始化:
volatile?WorkQueue[]?workQueues;?//?main?registry
當(dāng)通過線程池的外部方法(submit、invoke、execute)提交任務(wù)時,如果WorkQueue[]沒有初始化,則會進(jìn)行初始化;然后根據(jù)數(shù)組大小和線程隨機(jī)數(shù)(ThreadLocalRandom.probe)等信息,計算出任務(wù)隊列所在的數(shù)組索引(這個索引一定是偶數(shù)),如果索引處沒有任務(wù)隊列,則初始化一個,再將任務(wù)入隊。也就是說,通過外部方法提交的任務(wù)一定是在偶數(shù)隊列,沒有綁定工作線程。WorkQueue作為ForkJoinPool的內(nèi)部類,表示一個雙端隊列雙端隊列,既可以作為棧使用(LIFO),也可以作為隊列使用(FIFO)。ForkJoinPool的“工作竊取”正是利用了這個特點,當(dāng)工作線程從自己的隊列中獲取任務(wù)時,默認(rèn)總是以棧操作(LIFO)的方式從棧頂取任務(wù);當(dāng)工作線程嘗試竊取其它任務(wù)隊列中的任務(wù)時,則是FIFO的方式。
線程池中的每個工作線程(ForkJoinWorkerThread)都有一個自己的任務(wù)隊列(WorkQueue),工作線程優(yōu)先處理自身隊列中的任務(wù)(LIFO或FIFO順序,由線程池構(gòu)造時的參數(shù) mode 決定),自身隊列為空時,以FIFO的順序隨機(jī)竊取其它隊列中的任務(wù)。
F/J框架的核心來自于它的工作竊取及調(diào)度策略,可以總結(jié)為以下幾點:
每個Worker線程利用它自己的任務(wù)隊列維護(hù)可執(zhí)行任務(wù); 任務(wù)隊列是一種雙端隊列,支持LIFO的push和pop操作,也支持FIFO的take操作; 任務(wù)fork的子任務(wù),只會push到它所在線程(調(diào)用fork方法的線程)的隊列; 工作線程既可以使用LIFO通過pop處理自己隊列中的任務(wù),也可以FIFO通過poll處理自己隊列中的任務(wù),具體取決于構(gòu)造線程池時的asyncMode參數(shù); 當(dāng)工作線程自己隊列中沒有待處理任務(wù)時,它嘗試去隨機(jī)讀?。ǜ`?。┢渌蝿?wù)隊列的base端的任務(wù); 當(dāng)線程進(jìn)入join操作,它也會去處理其它工作線程的隊列中的任務(wù)(自己的已經(jīng)處理完了),直到目標(biāo)任務(wù)完成(通過isDone方法); 當(dāng)一個工作線程沒有任務(wù)了,并且嘗試從其它隊列竊取也失敗了,它讓出資源(通過使用yields, sleeps或者其它優(yōu)先級調(diào)整)并且隨后會再次激活,直到所有工作線程都空閑了——此時,它們都阻塞在等待另一個頂層線程的調(diào)用。
CountDownLatch
CountDownLatch是一個非常實用的多線程控制工具類,可以簡單聯(lián)想到下課倒計時一起開飯,百米賽跑一起跑。常用的就下面幾個方法:
CountDownLatch(int?count)?//實例化一個倒計數(shù)器,count指定計數(shù)個數(shù)
countDown()?//?計數(shù)減一
await()?//等待,當(dāng)計數(shù)減到0時,所有線程并行執(zhí)行
CountDownLatch在我們工作的多個場景被使用,算是用的很頻繁的了,比如我們的API接口響應(yīng)時間被要求在200ms以內(nèi),但是如果一個接口內(nèi)部依賴多個三方/外部服務(wù),那串行調(diào)用接口的RT必然很久,所以個人用的最多的是接口RT優(yōu)化場景,內(nèi)部服務(wù)并行調(diào)用。
對于倒計數(shù)器,一種典型的場景就是火箭發(fā)射。在火箭發(fā)射前,為了保證萬無一失,往往還要進(jìn)行各項設(shè)備、儀器的檢測。只有等到所有的檢查完畢后,引擎才能點火。那么在檢測環(huán)節(jié)當(dāng)然是多個檢測項可以的。同時進(jìn)行代碼:
/**
?*?@Description:?倒計時器示例:火箭發(fā)射
?*/
public?class?CountDownLatchDemo?implements?Runnable{
????static?final?CountDownLatch?latch?=?new?CountDownLatch(10);
????static?final?CountDownLatchDemo?demo?=?new?CountDownLatchDemo();
????@Override
????public?void?run()?{
????????//?模擬檢查任務(wù)
????????try?{
????????????Thread.sleep(new?Random().nextInt(10)?*?1000);
????????????System.out.println("檢查完畢");
????????}?catch?(InterruptedException?e)?{
????????????e.printStackTrace();
????????}?finally?{
????????????//計數(shù)減一
????????????//放在finally避免任務(wù)執(zhí)行過程出現(xiàn)異常,導(dǎo)致countDown()不能被執(zhí)行
????????????latch.countDown();
????????}
????}
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????ExecutorService?exec?=?Executors.newFixedThreadPool(10);
????????for?(int?i=0;?i<10;?i++){
????????????exec.submit(demo);
????????}
????????//?等待檢查
????????latch.await();?//?外部主線程main 方法來等待下面運行?。?!
????????//?發(fā)射火箭
????????System.out.println("Fire!");
????????//?關(guān)閉線程池
????????exec.shutdown();
????}
}
上述代碼中我們先生成了一個CountDownLatch實例。計數(shù)數(shù)量為10,這表示需要有10個線程來完成任務(wù),等待在CountDownLatch上的線程才能繼續(xù)執(zhí)行。latch.countDown(); 方法作用是通知CountDownLatch有一個線程已經(jīng)準(zhǔn)備完畢,倒計數(shù)器可以減一了。atch.await()方法要求主線程等待所有10個檢查任務(wù)全部準(zhǔn)備好才一起并行執(zhí)行。latch.countDown()的調(diào)用不一定非要開啟線程執(zhí)行,即使你在主線程中下面這樣寫效果也是一樣。
?for?(int?i?=?0;?i?10;?i++)?{
?????countDownLatch.countDown();
?}
CyclicBarrier
這個類的中文意思是循環(huán)柵欄。大概的意思就是一個可循環(huán)利用的屏障。它的作用就是會讓所有線程都等待完成后才會繼續(xù)下一步行動。舉個例子,就像生活中我們會約朋友們到某個餐廳一起吃飯,有些朋友可能會早到,有些朋友可能會晚到,但是這個餐廳規(guī)定必須等到所有人到齊之后才會讓我們進(jìn)去。這里的朋友們就是各個線程,餐廳就是 CyclicBarrier。構(gòu)造方法
public?CyclicBarrier(int?parties)
public?CyclicBarrier(int?parties,?Runnable?barrierAction)
parties 是參與線程的個數(shù) 第二個構(gòu)造方法有一個 Runnable 參數(shù),這個參數(shù)的意思是到達(dá)線程
最后一個要做的任務(wù)
重要方法:
public?int?await()?throws?InterruptedException,?BrokenBarrierException
public?int?await(long?timeout,?TimeUnit?unit)?throws?InterruptedException,?BrokenBarrierException,?TimeoutException
線程調(diào)用 await() 表示自己已經(jīng)到達(dá)柵欄 BrokenBarrierException 表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個線程 await() 時被中斷或者超時
demo:一個線程組的線程需要等待所有線程完成任務(wù)后再繼續(xù)執(zhí)行下一次任務(wù)
public?class?CyclicBarrierTest?{
????public?static?void?main(String[]?args)?{
????????//定義一個計數(shù)器,當(dāng)計數(shù)器的值累加到30,輸出"放行"
????????CyclicBarrier?cyclicBarrier?=?new?CyclicBarrier(30,()->{
????????????System.out.println("放行");
????????});
????????for?(int?i?=?1;?i?<=?90;?i++)?{
????????????final?int?temp?=?i;
????????????new?Thread(()->{
????????????????System.out.println("-->"+temp);
????????????????try?{
????????????????????cyclicBarrier.await();
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?catch?(BrokenBarrierException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}).start();
????????}
????}
}
上面的結(jié)果會出現(xiàn)3次放行哦。
CyclicBarrier 與 CountDownLatch 區(qū)別
1、CountDownLatch 是一次性的,CyclicBarrier 是可循環(huán)利用的
2、CountDownLatch 參與的線程的職責(zé)是不一樣的,有的在倒計時,有的在等待倒計時結(jié)束。
3、CyclicBarrier 參與的線程職責(zé)是一樣的。CountDownLatch 做減法計算,count=0,喚醒阻塞線程,CyclicBarrier 做加法計算,count=屏障值(parties),喚醒阻塞線程。
4、最重要:CountDownLatch的放行由第三者控制,CyclicBarrier是由一組線程本身來控制的, CountDownLatch放行條件>=線程數(shù)。CyclicBarrier放行條件=線程數(shù)。
Semaphore
用途:控制同時訪問某個特定資源的線程數(shù)據(jù),用來流量控制。一個超市只能容納5個人購物,其余人排隊。
public?class?SemaphoreTest?{
????public?static?void?main(String[]?args)?{
????????//同時只能進(jìn)5個人
????????Semaphore?semaphore?=?new?Semaphore(5);
????????for?(int?i?=?0;?i?15;?i++)?{
????????????new?Thread(()?->?{
????????????????try?{
????????????????????//獲得許可
????????????????????semaphore.acquire();?//?已經(jīng)進(jìn)店人+1
????????????????????System.out.println(Thread.currentThread().getName()?+?"進(jìn)店購物");
????????????????????TimeUnit.SECONDS.sleep(5);
????????????????????System.out.println(Thread.currentThread().getName()?+?"出店");
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}?finally?{
????????????????????//釋放許可
????????????????????semaphore.release();?//已經(jīng)進(jìn)店人?-1?
????????????????}
????????????},?String.valueOf(i)).start();
????????}
????}
}
實現(xiàn)數(shù)據(jù)庫連接池 數(shù)據(jù)庫連接實現(xiàn):
public?class?SqlConnectImpl?implements?Connection{
?
?/*拿一個數(shù)據(jù)庫連接*/
????public?static?final?Connection?fetchConnection(){
????????return?new?SqlConnectImpl();
????}
}
連接池的實現(xiàn):
public?class?DBPoolSemaphore?{
????private?final?static?int?POOL_SIZE?=?10;
????private?final?Semaphore?useful,?useless;//useful表示可用的數(shù)據(jù)庫連接,useless表示已用的數(shù)據(jù)庫連接
????public?DBPoolSemaphore()?{
????????this.useful?=?new?Semaphore(POOL_SIZE);
????????this.useless?=?new?Semaphore(0);
????}
????//存放數(shù)據(jù)庫連接的容器
????private?static?LinkedList?pool?=?new?LinkedList();
????//初始化池
????static?{
????????for?(int?i?=?0;?i?????????????pool.addLast(SqlConnectImpl.fetchConnection());
????????}
????}
????/*歸還連接*/
????public?void?returnConnect(Connection?connection)?throws?InterruptedException?{
????????if?(connection?!=?null)?{
????????????System.out.println("當(dāng)前有"?+?useful.getQueueLength()?+?"個線程等待數(shù)據(jù)庫連接?。?
????????????????????+?"可用連接數(shù):"?+?useful.availablePermits());
????????????useless.acquire();//?可用連接?+1
????????????synchronized?(pool)?{
????????????????pool.addLast(connection);
????????????}
????????????useful.release();?//?已用連接?-1
????????}
????}
????/*從池子拿連接*/
????public?Connection?takeConnect()?throws?InterruptedException?{
????????useful.acquire();?//?可用連接-1
????????Connection?conn;
????????synchronized?(pool)?{
????????????conn?=?pool.removeFirst();
????????}
????????useless.release();?//?以用連接+1
????????return?conn;
????}
}
測試代碼:
public?class?AppTest?{
????private?static?DBPoolSemaphore?dbPool?=?new?DBPoolSemaphore();
????//業(yè)務(wù)線程
????private?static?class?BusiThread?extends?Thread?{
????????@Override
????????public?void?run()?{
????????????Random?r?=?new?Random();//讓每個線程持有連接的時間不一樣
????????????long?start?=?System.currentTimeMillis();
????????????try?{
????????????????Connection?connect?=?dbPool.takeConnect();
????????????????System.out.println("Thread_"?+?Thread.currentThread().getId()
????????????????????????+?"_獲取數(shù)據(jù)庫連接共耗時【"?+?(System.currentTimeMillis()?-?start)?+?"】ms.");
????????????????SleepTools.ms(100?+?r.nextInt(100));//模擬業(yè)務(wù)操作,線程持有連接查詢數(shù)據(jù)
????????????????System.out.println("查詢數(shù)據(jù)完成,歸還連接!");
????????????????dbPool.returnConnect(connect);
????????????}?catch?(InterruptedException?e)?{
????????????}
????????}
????}
????public?static?void?main(String[]?args)?{
????????for?(int?i?=?0;?i?50;?i++)?{
????????????Thread?thread?=?new?BusiThread();
????????????thread.start();
????????}
????}
}
Exchange
兩個線程間的數(shù)據(jù)交換,局限性比較大。Exchange是 阻塞形式的,兩個線程要都到達(dá)執(zhí)行Exchange函數(shù)才會交換。
public?class?UseExchange?{
????private?static?final?Exchanger>?exchange
????????????=?new?Exchanger>();
????public?static?void?main(String[]?args)?{
????????//第一個線程
????????new?Thread(new?Runnable()?{
????????????@Override
????????????public?void?run()?{
????????????????Set?setA?=?new?HashSet();//存放數(shù)據(jù)的容器
????????????????try?{
????????????????????setA.add("liu");
????????????????????setA.add("Liu");
????????????????????setA.add("LIU");
????????????????????setA?=?exchange.exchange(setA);//交換set
????????????????????/*處理交換后的數(shù)據(jù)*/
????????????????}?catch?(InterruptedException?e)?{
????????????????}
????????????}
????????}).start();
????????//第二個線程
????????new?Thread(new?Runnable()?{
????????????@Override
????????????public?void?run()?{
????????????????Set?setB?=?new?HashSet();//存放數(shù)據(jù)的容器
????????????????try?{
????????????????????setB.add("jin");
????????????????????setB.add("Jie");
????????????????????setB.add("JIN");
????????????????????setB?=?exchange.exchange(setB);//交換set
????????????????????/*處理交換后的數(shù)據(jù)*/
????????????????}?catch?(InterruptedException?e)?{
????????????????}
????????????}
????????}).start();
????}
}
Callable,Future,FutureTask
這三個組合使用,一般我們可以將耗時任務(wù)用子線程去執(zhí)行,同時執(zhí)行我們自己的主線程任務(wù)。主線程執(zhí)行任務(wù)完畢后再調(diào)Future.get()來獲得子線程任務(wù)。
說明:
Callable有返回值可拋出異常,其中返回值有Future獲得。Future 獲得返回值。FutureTask實現(xiàn)Future跟Runnable。1.7前用AQS實現(xiàn)的,1.8以后不再是。
Future主要函數(shù)功能:
isDone,結(jié)束,正常還是異常結(jié)束,或者自己取消,都返回true; isCancelled 任務(wù)完成前被取消,返回true; cancel(boolean):
任務(wù)還沒開始,返回false 任務(wù)已經(jīng)啟動,cancel(true) 中斷正在運行的任務(wù),中斷成功,返回true cancel(false),不會去中斷已經(jīng)運行的任務(wù) 任務(wù)已經(jīng)結(jié)束,返回false
Future樣例:
public?class?UseFuture?{
????/*實現(xiàn)Callable接口,允許有返回值*/
????private?static?class?UseCallable?implements?Callable<Integer>?{
????????private?int?sum;
????????@Override
????????public?Integer?call()?throws?Exception?
????????????System.out.println("Callable子線程開始計算");
????????????Thread.sleep(2000);
????????????for?(int?i?=?0;?i?5000;?i++)?{
????????????????sum?=?sum?+?i;
????????????}
????????????System.out.println("Callable子線程計算完成,結(jié)果="?+?sum);
????????????return?sum;
????????}
????}
????public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException?{
????????UseCallable?useCallable?=?new?UseCallable();
????????FutureTask?futureTask?=?new?FutureTask(useCallable);
????????new?Thread(futureTask).start();
????????Random?r?=?new?Random();
????????SleepTools.second(1);
????????if?(r.nextBoolean())?{??//?方法調(diào)用返回下一個偽均勻分布的boolean值
????????????System.out.println("Get?UseCallable?result?=?"?+?futureTask.get());
????????}?else?{
????????????System.out.println("中斷計算");
????????????futureTask.cancel(true);
????????}
????}
}


近期精彩內(nèi)容推薦:??


