<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          快速上手JUC下常見并發(fā)容器

          共 10603字,需瀏覽 22分鐘

           ·

          2020-12-30 13:57

          Python實戰(zhàn)社群

          Java實戰(zhàn)社群

          長按識別下方二維碼,按需求添加

          掃碼關(guān)注添加客服

          進(jìn)Python社群▲

          掃碼關(guān)注添加客服

          進(jìn)Java社群


          作者丨sowhat1412

          來源丨sowhat1412


          多線程環(huán)境下Java提供的一些簡單容器都無法使用了,此時要用到JUC中的容器,由于?ConcurrentHashMap?是高頻考點,用到也比較多因此著重寫過了,其余的容器就看今天咯。

          跳表知識點

          簡而言之跳表就是多層鏈表的結(jié)合體,跳表分為許多層(level),每一層都可以看作是數(shù)據(jù)的索引,這些索引的意義就是加快跳表查找數(shù)據(jù)速度。每一層的數(shù)據(jù)都是有序的,上一層數(shù)據(jù)是下一層數(shù)據(jù)的子集,并且第一層(level 1)包含了全部的數(shù)據(jù);層次越高,跳躍性越大,包含的數(shù)據(jù)越少。并且隨便插入一個數(shù)據(jù)該數(shù)據(jù)是否會是跳表索引完全隨機(jī)的跟玩骰子一樣,redis中的zset底層就是跳表數(shù)據(jù)結(jié)構(gòu)。并且跳表的速度幾乎接近紅黑樹了。跳表包含一個表頭,它查找數(shù)據(jù)時,是從上往下,從左往右進(jìn)行查找?,F(xiàn)在“需要找出值為37的節(jié)點”為例,來對比說明跳表和普遍的鏈表。

          1. 沒有跳表查詢 比如我查詢數(shù)據(jù)37,如果沒有上面的索引時候路線如下圖:
          2. 有跳表查詢 有跳表查詢37的時候路線如下圖:延伸思考:既然跳表實現(xiàn)簡單速度挺好ConcurrentHashMap為什么不直接用跳表用紅黑樹?先說下在HashMap中一般空間利用率就在40%作用,而ConcurrentHashMap空間利用率只能達(dá)到10%~20%。如果這個時候再不節(jié)省空間還用跳表替換紅黑樹。那么就涼涼了。

          ConcurrentSkipListMap

          我們在存儲 kv 的時候一般有三種容器可以使用,TreeMap、ConcurrentSkipListMap、HashMap三種容器。其中TreeMap可以理解為紅黑樹在Java中的具體實現(xiàn)(紅黑樹、2-3-4樹也是賊好玩的一個知識點,懶的寫了,如果讀者想看再寫不遲)。ConcurrentSkipListMap主要就是利用跳表的思維來實現(xiàn)速度的提升,他們區(qū)別跟性能對比如下:

          1. TreeMap基于紅黑樹(平衡二叉查找樹)實現(xiàn)的,時間復(fù)雜度平均能達(dá)到O(log n),多線程不安全。
          2. HashMap是基于散列表實現(xiàn)的,時間復(fù)雜度平均能達(dá)到O(1),多線程不安全。
          3. ConcurrentSkipListMap是基于跳表實現(xiàn)的,時間復(fù)雜度平均能達(dá)到O(log n),多線程安全。
          4. 紅黑樹涉及各種旋轉(zhuǎn)操作比較復(fù)雜,HashMap底層數(shù)組+ 鏈表+ 紅黑樹,跳表實現(xiàn)起來就很簡單了。

          結(jié)論:

          1. 當(dāng)數(shù)據(jù)量增加時,HashMap會引起散列沖突,解決沖突需要多花費一些時間代價,故在f(n)=1向上浮動。隨著數(shù)據(jù)量的增加,HashMap的時間花費小且穩(wěn)定,充分秉承著空間換時間的思想,在單線程的環(huán)境下比TreeMap和ConcurrentSkipListMap在插入和查找上有很大的優(yōu)勢。
          2. 如果必須有序且多線程就用ConcurrentSkipListMap,如果單線程不需要考慮是否有序就用HashMap。

          其中ConcurrentSkipListMap基礎(chǔ)結(jié)構(gòu)圖如下:

          ConcurrentSkipListSet

          Set是一個無序的數(shù)據(jù)集合,TreeSet的底層是通過TreeMap實現(xiàn)的,思想其實跟HashMapHashSet類似,TreeSet就是只有Key的TreeMap 。TreeSet是通過紅黑樹來實現(xiàn)的速度可達(dá)到O(log n)但是線程也是不安全的。ConcurrentSkipListSet是基于跳表實現(xiàn)的線程安全的ListSet。

          ConcurrentLinkedQueue

          可以認(rèn)為是LinkedList的多線程安全升級版。一個基于鏈表節(jié)點的無界線程安全隊列。此隊列按照 FIFO原則對元素進(jìn)行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素。當(dāng)多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當(dāng)?shù)倪x擇,底層用了很多sun.misc.Unsafe UNSAFE硬件級別的原子操作。此隊列不允許使用 null 元素。

          1. offer(E e) ?:將指定元素插入此隊列的尾部。
          2. add(E e): 跟offer 功能一樣將指定元素插入此隊列的尾部, add方法體調(diào)用的就是offer.
          3. poll() : 獲取并移除此隊列的頭,如果此隊列為空,則返回 null
          4. peek() : 獲取但不移除此隊列的頭,如果此隊列為空,則返回 null
          5. remove(Object o) : 從隊列中移除指定元素的單個實例(如果存在)

          CopyOnWriteArrayList

          CopyOnWrite 寫時復(fù)制的容器。通俗的理解是當(dāng)我們往一個容器添加元素的時候,不直接往當(dāng)前容器添加,而是先將當(dāng)前容器進(jìn)行Copy,復(fù)制出一個新的容器,然后新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器。這樣做的好處是我們可以對容器進(jìn)行并發(fā)的,而不需要加鎖,因為當(dāng)前容器不會添加任何元素,但是寫的時候還是要鎖的!所以寫時復(fù)制容器也是一種讀寫分離的思想,讀和寫不同的容器。如果讀的時候有多個線程正在向容器添加數(shù)據(jù),讀還是會讀到舊的數(shù)據(jù),因為寫的時候不會鎖住舊的,只能保證最終一致性Redis中執(zhí)行bgsave時候就是用的此機(jī)制。這種機(jī)制寫一次就要copy一份。多個線程要執(zhí)行寫操作必須等上一個線程執(zhí)行完畢。如果用讀寫鎖我在寫的時候你是無法讀的,鎖無法降級的。 CopyOnWriteArrayList底層用的ReentrantLock()來實現(xiàn)加鎖,這又印證了AQS占據(jù)JUC半壁江山。

          優(yōu)點

          對于一些讀多寫少的數(shù)據(jù),這種做法的確很不錯,例如配置、黑名單、物流地址等變化非常少的數(shù)據(jù),這是一種無鎖的實現(xiàn)??梢詭臀覀儗崿F(xiàn)程序更高的并發(fā)。

          缺點

          這種實現(xiàn)只是保證數(shù)據(jù)的最終一致性,在添加到拷貝數(shù)據(jù)而還沒進(jìn)行替換的時候,讀到的仍然是舊數(shù)據(jù)。如果對象比較大,頻繁地進(jìn)行替換會消耗內(nèi)存,從而引發(fā)Java的GC問題,這個時候,我們應(yīng)該考慮其他的容器,例如ConcurrentHashMap。

          CopyOnWriteArraySet

          CopyOnWriteArraySet是基于CopyOnWriteArrayList實現(xiàn)的,只有add的方法稍微有些不同,因為CopyOnWriteArraySetSet也就是不能有重復(fù)的元素,故在CopyOnWriteArraySet中用了addIfAbsent(e)這樣的方法。

          BlockingQueue

          在JUC包中BlockingQueue很好的解決了多線程中,如何高效安全傳輸數(shù)據(jù)的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。BlockingQueue即阻塞隊列,它是基于 ReentrantLock 實現(xiàn)的,BlockingQueue阻塞隊列的概念:

          1. 當(dāng)隊列滿的時候,插入元素的線程被阻塞,直達(dá)隊列不滿。
          2. 隊列為空的時候,獲取元素的線程被阻塞,直到隊列不空。

          生產(chǎn)者和消費者模式概念:

          1、生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。
          2、在多線程開發(fā)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。
          3、為了解決這種生產(chǎn)消費能力不均衡的問題,便有了生產(chǎn)者和消費者模式。生產(chǎn)者和消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強(qiáng)耦合問題。生產(chǎn)者和消費者彼此之間不直接通信,而是通過阻塞隊列來進(jìn)行通信,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。

          BlockingQueue是個接口,主要又有若干方法。常用方法:

          方法拋出異常返回值一直阻塞超時退出
          插入方法addofferputoffer(time)
          移除方法removepolltakepoll(time)
          檢查 方法elementpeekN/AN/A

          1、add(E e):在不違反容量限制的情況下,可立即將指定元素插入此隊列,成功返回true,當(dāng)無可用空間時候,返回IllegalStateException異常。
          2offer(E e):在不違反容量限制的情況下,可立即將指定元素插入此隊列,成功返回true,當(dāng)無可用空間時候,返回false。
          3、put(E e):直接在隊列中插入元素,當(dāng)無可用空間時候,阻塞等待。
          4、offer(E e, long time, timeunit unit):將給定元素在給定的時間內(nèi)設(shè)置到隊列中,如果設(shè)置成功返回true, 否則返回false。
          5、?E take():獲取并移除隊列頭部的元素,無元素時候阻塞等待。
          6、?E poll( long time, timeunit unit):獲取并移除隊列頭部的元素,無元素時候阻塞等待指定時間。
          7、remove(Object o) :若隊列為空,拋出NoSuchElementException異常
          8、E poll():若隊列為空,返回null

          BlockingQueue是一個接口,它的實現(xiàn)類有ArrayBlockingQueue、LinkedBlockingDeque、PriorityBlockingQueue、DelayQueue、SynchronousQueue 、LinkedTransferQueueLinkedBlockingQueue等,它們的區(qū)別主要體現(xiàn)在存儲結(jié)構(gòu)上或?qū)υ夭僮魃系牟煌菍τ趖ake與put操作的原理,卻是類似的。

          ArrayBlockingQueue

          一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列,按照先進(jìn)先出原則,其中有界也就意味著,它不能夠存儲無限多數(shù)量的對象,要求設(shè)定初始大小。數(shù)組類型:

          /**?The?queued?items?*/
          final?Object[]?items;

          唯一全局鎖

          //?這是一個掌管所有訪問操作的鎖。全局共享。都會使用這個鎖。
          final?ReentrantLock?lock;

          兩個等待隊列

          ????/**?Condition?for?waiting?takes?*/
          ????private?final?Condition?notEmpty;
          ????/**?Condition?for?waiting?puts?*/
          ????private?final?Condition?notFull;

          put 方法

          ????public?void?put(E?e)?throws?InterruptedException?{
          ????????checkNotNull(e);
          ????????final?ReentrantLock?lock?=?this.lock;?//?唯一鎖
          ????????lock.lockInterruptibly();//?加鎖
          ????????try?{
          ????????????while?(count?==?items.length)
          ????????????????notFull.await();//await?讓出操作權(quán)
          ????????????enqueue(e);//?被喚醒就加入隊列。
          ????????}?finally?{
          ????????????lock.unlock();//?解鎖
          ????????}
          ????}

          take方法

          ????public?E?take()?throws?InterruptedException?{
          ????????final?ReentrantLock?lock?=?this.lock;?//?加鎖
          ????????lock.lockInterruptibly();
          ????????try?{
          ????????????while?(count?==?0)//為空則釋放當(dāng)前鎖
          ????????????????notEmpty.await();
          ????????????return?dequeue();//?獲得鎖被喚醒了則返回數(shù)據(jù)
          ????????}?finally?{
          ????????????lock.unlock();//?釋放鎖
          ????????}
          ????}
          LinkedBlockingQueue

          LinkedBlockingQueue是一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列,按照先進(jìn)先出原則,可以不設(shè)定初始大小,默認(rèn)Integer.MAX_VALUE,為了避免隊列過大造成機(jī)器負(fù)載或者內(nèi)存爆滿的情況出現(xiàn),在使用的時候一般建議手動傳一個隊列的大小。

          //節(jié)點類,用于存儲數(shù)據(jù)
          static?class?Node<E>?{
          ????E?item;
          ????Node?next;
          ????Node(E?x)?{?item?=?x;?}
          }

          //?阻塞隊列的大小,默認(rèn)為Integer.MAX_VALUE?
          private?final?int?capacity;
          //當(dāng)前阻塞隊列中的元素個數(shù)?
          private?final?AtomicInteger?count?=?new?AtomicInteger();
          //?阻塞隊列的頭結(jié)點
          transient?Node?head;
          //?阻塞隊列的尾節(jié)點
          private?transient?Node?last;
          //?獲取并移除元素時使用的鎖,如take,?poll
          private?final?ReentrantLock?takeLock?=?new?ReentrantLock();
          //??notEmpty條件對象,當(dāng)隊列沒有數(shù)據(jù)時用于?掛起?執(zhí)行刪除的線程
          private?final?Condition?notEmpty?=?takeLock.newCondition();
          //?添加元素時使用的鎖如?put,?offer
          private?final?ReentrantLock?putLock?=?new?ReentrantLock();
          //?notFull條件對象,當(dāng)隊列數(shù)據(jù)已滿時用于?掛起?執(zhí)行添加的線程
          private?final?Condition?notFull?=?putLock.newCondition();

          添加到LinkedBlockingQueue隊列中的數(shù)據(jù)都將被封裝成Node節(jié)點,添加的鏈表隊列中,其中head和last分別指向隊列的頭結(jié)點和尾結(jié)點。與ArrayBlockingQueue不同的是,LinkedBlockingQueue內(nèi)部分別使用了takeLockputLock 對并發(fā)進(jìn)行控制,也就是說,添加和刪除操作并不是互斥操作,可以同時進(jìn)行,這樣也就可以大大提高吞吐量。

          ArrayBlockingQueue和LinkedBlockingQueue對比:

          1. 實現(xiàn):ArrayBlockingQueue底層上數(shù)組,ArrayBlockingQueue用Node包裝后的鏈表(包含包裝導(dǎo)致更大更冗余易觸發(fā)GC)
          2. 初始化:ArrayBlockingQueue必須要求有初始值,ArrayBlockingQueue沒有強(qiáng)制要求。
          3. 鎖上:ArrayBlockingQueue實現(xiàn)的隊列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,而ArrayBlockingQueue實現(xiàn)的隊列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高隊列的吞吐量。
          SynchronousQueue

          一個不存儲元素的阻塞隊列。每一個put操作都要等待一個take操作請求才會put數(shù)據(jù)。

          PriorityBlockingQueue

          PriorityBlockingQueue是一個支持優(yōu)先級的無界阻塞隊列,直到系統(tǒng)資源耗盡。默認(rèn)情況下元素采用自然順序升序排列。也可以自定義類繼承comparable實現(xiàn)compareTo()方法來指定元素排序規(guī)則,或者初始化PriorityBlockingQueue時,指定構(gòu)造參數(shù)Comparator來對元素進(jìn)行排序。但需要注意的是不能保證同優(yōu)先級元素的順序。PriorityBlockingQueue也是基最小二叉堆實現(xiàn),使用基于CAS實現(xiàn)的自旋鎖來控制隊列的動態(tài)擴(kuò)容,保證了擴(kuò)容操作不會阻塞take操作的執(zhí)行

          LinkedTransferQueue

          LinkedTransferQueue是一個由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊列。該類實現(xiàn)了一個TransferQueue接口,相對于其他阻塞隊列,LinkedTransferQueue多了tryTransfertransfer方法。

          public?interface?TransferQueue<E>?extends?BlockingQueue<E>?{
          ????//?如果可能,立即將元素轉(zhuǎn)移給等待的消費者。?
          ????//?更確切地說,如果存在消費者已經(jīng)等待接收它(在 take 或 timed poll(long,TimeUnit)poll)中,則?立即傳送指定?的元素,否則返回 false。
          ????boolean?tryTransfer(E?e);

          ????//?將元素轉(zhuǎn)移給消費者,如果需要的話等待。?
          ????//?更準(zhǔn)確地說,如果存在一個消費者已經(jīng)等待接收它(在 take 或timed poll(long,TimeUnit)poll)中,則立即傳送指定的元素,否則?等待?直到?元素由消費者接收。
          ????void?transfer(E?e)?throws?InterruptedException;

          ????//?上面方法的基礎(chǔ)上設(shè)置超時時間
          ????boolean?tryTransfer(E?e,?long?timeout,?TimeUnit?unit)?throws?InterruptedException;

          ????//?如果至少有一位消費者在等待,則返回?true
          ????boolean?hasWaitingConsumer();

          ????//?返回等待消費者人數(shù)的估計值
          ????int?getWaitingConsumerCount();
          }
          LinkedBlockingDeque

          LinkedBlockingDeque一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列,注意Deque的存在。可以從隊列的頭和尾都可以插入和移除元素,可以實現(xiàn)工作密取,比如ForkJoin底層任務(wù)隊列。方法名帶了first對頭部操作,帶了last從尾部操作。另外方法調(diào)用的時候默認(rèn)

          add=addLast; remove=removeFirst; take=takeFirst

          ??public?boolean?add(E?e)?{
          ????????addLast(e);?//?等價
          ????????return?true;
          ????}
          ????public?E?remove()?{
          ????????return?removeFirst();
          ????}
          ????public?E?take()?throws?InterruptedException?{
          ????????return?takeFirst();
          ????}
          DelayQueue

          DelayQueue 一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列。支持延時獲取的元素的阻塞隊列,元素必須要實現(xiàn)Delayed接口。放入隊列中的元素只有在指定的timeout后才可以取出,也就是說隊列中元素的順序是按到期時間排序的,而非它們進(jìn)入隊列的順序。排在隊列頭部的元素是最早到期的,越往后到期時間赿晚。適用場景:實現(xiàn)自己的緩存系統(tǒng),訂單到期,限時支付等。

          demo 加深印象

          任務(wù):一個訂單系統(tǒng),通過阻塞隊列延時功能實現(xiàn),需要(訂單類,包裝訂單類,生產(chǎn)者,消費者,測試)

          1. 訂單類
          public?class?Order?{
          ?private?final?String?orderNo;//訂單的編號
          ?private?final?double?orderMoney;//訂單的金額
          ?public?Order(String?orderNo,?double?orderMoney)?{
          ??super();
          ??this.orderNo?=?orderNo;
          ??this.orderMoney?=?orderMoney;
          ?}
          ?public?String?getOrderNo()?{
          ??return?orderNo;
          ?}
          ?public?double?getOrderMoney()?{
          ??return?orderMoney;
          ?}
          }
          1. 包裝類
          //?類說明:存放到隊列的元素
          public?class?ItemVo<T>?implements?Delayed?{

          ????private?long?activeTime;//到期時間,單位毫秒
          ????private?T?object;

          ????//activeTime是個過期時長
          ????public?ItemVo(long?activeTime,?T?object)?{
          ????????super();
          ????????this.activeTime?=?TimeUnit.NANOSECONDS.convert(activeTime,?TimeUnit.MILLISECONDS)?+?System.nanoTime();
          ????????//?將傳入的時長轉(zhuǎn)換為超時的時刻
          ????????this.object?=?object;
          ????}

          ????public?T?getObject()?{
          ????????return?object;
          ????}

          ????//按照剩余時間排序
          ????@Override
          ????public?int?compareTo(Delayed?o)?{
          ????????long?d?=?getDelay(TimeUnit.NANOSECONDS)?-?o.getDelay(TimeUnit.NANOSECONDS);
          ????????return?(d?==?0)???0?:?((d?>?0)???1?:?-1);
          ????}

          ????//返回元素的剩余時間
          ????@Override
          ????public?long?getDelay(TimeUnit?unit)?{
          ????????long?d?=?unit.convert(this.activeTime?-?System.nanoTime(),?TimeUnit.NANOSECONDS);
          ????????return?d;
          ????}
          }
          1. 生產(chǎn)者
          public?class?PutOrder?implements?Runnable?{
          ????private?DelayQueue>?queue;
          ????public?PutOrder(DelayQueue>?queue)?{
          ????????super();
          ????????this.queue?=?queue;
          ????}
          ????@Override
          ????public?void?run()?{
          ????????//5秒到期
          ????????Order?ordeTb?=?new?Order("TBSoWhat",?14);
          ????????ItemVo?itemTb?=?new?ItemVo(5000,?ordeTb);
          ????????queue.offer(itemTb);?//插入
          ????????System.out.println("訂單5秒后到期:"?+?ordeTb.getOrderNo());
          ????????//8秒到期
          ????????Order?ordeJd?=?new?Order("JDSoWhat",?12);
          ????????ItemVo?itemJd?=?new?ItemVo(8000,?ordeJd);
          ????????queue.offer(itemJd);//?插入
          ????????System.out.println("訂單8秒后到期:"?+?ordeJd.getOrderNo());
          ????}
          }
          1. 消費者
          public?class?FetchOrder?implements?Runnable?{
          ?private?DelayQueue>?queue;
          ?public?FetchOrder(DelayQueue>?queue)?{
          ??super();
          ??this.queue?=?queue;
          ?}
          ?@Override
          ?public?void?run()?{
          ??while(true)?{
          ???try?{
          ????ItemVo?item?=?queue.take();
          ????Order?order?=?(Order)item.getObject();
          ????System.out.println("get?from?queue:"+order.getOrderNo());
          ???}?catch?(InterruptedException?e)?{
          ????e.printStackTrace();
          ???}
          ??}
          ?}?
          }
          1. 測試延時功能
          public?class?Test
          {
          ?public?static?void?main(String[]?args)?throws?InterruptedException
          ?
          {
          ??DelayQueue>?queue?=?new?DelayQueue<>();
          ??new?Thread(new?PutOrder(queue)).start();
          ??new?Thread(new?FetchOrder(queue)).start();
          ??//每隔1秒,打印個數(shù)字
          ??for?(int?i?=?1;?i?10;?i++)
          ??{
          ???Thread.sleep(1000);
          ???System.out.println(i?*?1000);
          ??}
          ?}
          }


          ForkJoin

          1.Fork/Join流程:

          ForkJoin是一種分治的思想,在1.7中引入JDK中?,F(xiàn)實生活中的快排,隊排,MapReduce都是思想的 實現(xiàn),意思是在必要的情況下,將一個大任務(wù),進(jìn)行拆分(fork) 成若干個子任務(wù)(拆到不能再拆,這里就是指我們制定的拆分的臨界值),再將一個個小任務(wù)的結(jié)果進(jìn)行join匯總。

          2. 工作竊取模式

          從上述Fork/Join框架的描述可以看出,我們需要一些線程來執(zhí)行Fork出的任務(wù),在實際中,如果每次都創(chuàng)建新的線程執(zhí)行任務(wù),對系統(tǒng)資源的開銷會很大,所以Fork/Join框架利用了線程池來調(diào)度任務(wù)。

          另外,這里可以思考一個問題,既然由線程池調(diào)度,根據(jù)我們之前學(xué)習(xí)普通/計劃線程池的經(jīng)驗,必然存在兩個要素:

          工作線程
          任務(wù)隊列

          一般的線程池只有一個任務(wù)隊列,但是對于Fork/Join框架來說,由于Fork出的各個子任務(wù)其實是平行關(guān)系,為了提高效率,減少線程競爭,應(yīng)該將這些平行的任務(wù)放到中去,如上不同的隊列圖中,大任務(wù)分解成三個子任務(wù):子任務(wù)1、子任務(wù)2,那么就創(chuàng)建兩個任務(wù)隊列,然后再創(chuàng)建3個工作線程與隊列一一對應(yīng)。

          那么為什么需要使用工作竊取算法呢?假如我們需要做一個比較大的任務(wù),我們可以把這個任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競爭,于是把這些子任務(wù)分別放到不同的隊列里,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務(wù),線程和隊列一一對應(yīng),比如A線程負(fù)責(zé)處理A隊列里的任務(wù)。但是有的線程會先把自己隊列里的任務(wù)干完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務(wù)來執(zhí)行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會使用雙端隊列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊列的尾部拿任務(wù)執(zhí)行滿足這一需求的任務(wù)隊列其實就是JUC框架中介紹過的雙端阻塞隊列 LinkedBlockingDeque。

          工作竊取算法的優(yōu)點是充分利用線程進(jìn)行并行計算,并減少了線程間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務(wù)時。并且消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列。并且在進(jìn)行RR跟上下文切換也會耗時的,所以不一定是多線程就一定 比單線程速度快。彈性而定,看任務(wù)量。

          3. demo演示

          ForkJoin有兩種繼承方式,RecursiveTask有返回值,RecursiveAction無返回值 任務(wù)需求:假設(shè)有個非常大的long[]數(shù)組,通過FJ框架求解數(shù)組所有元素的和。任務(wù)類定義,因為需要返回結(jié)果,所以繼承RecursiveTask,并覆寫compute方法。任務(wù)的fork通過ForkJoinTask的fork方法執(zhí)行,join方法方法用于等待任務(wù)執(zhí)行后返回:

          public?class?ForkJoinWork?extends?RecursiveTask<Long>?{
          ????private?Long?start;//起始值
          ????private?Long?end;//結(jié)束值
          ????public?static?final?Long?critical?=?100000L;//臨界值

          ????public?ForkJoinWork(Long?start,?Long?end)?{
          ????????this.start?=?start;
          ????????this.end?=?end;
          ????}

          ????@Override
          ????protected?Long?compute()?{
          ????????//?return?null;
          ????????//判斷是否是拆分完畢
          ????????Long?lenth?=?end?-?start;???//起始值差值
          ????????if?(lenth?<=?critical)?{
          ????????????//如果拆分完畢就相加
          ????????????Long?sum?=?0L;
          ????????????for?(Long?i?=?start;?i?<=?end;?i++)?{
          ????????????????sum?+=?i;
          ????????????}
          ????????????return?sum;
          ????????}?else?{
          ????????????//沒有拆分完畢就開始拆分
          ????????????Long?middle?=?(end?+?start)?/?2;//計算的兩個值的中間值
          ????????????ForkJoinWork?right?=?new?ForkJoinWork(start,?middle);
          ????????????right.fork();//拆分,并壓入線程隊列
          ????????????ForkJoinWork?left?=?new?ForkJoinWork(middle?+?1,?end);
          ????????????left.fork();//拆分,并壓入線程隊列

          ????????????//合并
          ????????????return?right.join()?+?left.join();
          ????????}

          ????}
          }

          測試:

          public?class?ForkJoinWorkTest?{
          ????@Test
          ????public?void?test()?{
          ????????//ForkJoin實現(xiàn)
          ????????long?l?=?System.currentTimeMillis();
          ????????ForkJoinPool?forkJoinPool?=?new?ForkJoinPool();//實現(xiàn)ForkJoin?就必須有ForkJoinPool的支持
          ????????ForkJoinTask?task?=?new?ForkJoinWork(0L,?10000000000L);//參數(shù)為起始值與結(jié)束值
          ????????Long?invoke?=?forkJoinPool.invoke(task);
          ????????long?l1?=?System.currentTimeMillis();
          ????????System.out.println("invoke?=?"?+?invoke?+?"??time:?"?+?(l1?-?l));
          ????????//invoke?=?-5340232216128654848??time:?56418
          ????????//ForkJoinWork?forkJoinWork?=?new?ForkJoinWork(0L,?10000000000L);
          ????}
          ????@Test
          ????public?void?test2()?{
          ????????//普通線程實現(xiàn)
          ????????Long?x?=?0L;
          ????????Long?y?=?10000000000L;
          ????????long?l?=?System.currentTimeMillis();
          ????????for?(Long?i?=?0L;?i?<=?y;?i++)?{
          ????????????x?+=?i;
          ????????}
          ????????long?l1?=?System.currentTimeMillis();
          ????????System.out.println("invoke?=?"?+?x?+?"??time:?"?+?(l1?-?l));
          ????????//invoke?=?-5340232216128654848??time:?64069
          ????}

          ????@Test
          ????public?void?test3()?{
          ????????//Java?8?并行流的實現(xiàn)
          ????????long?l?=?System.currentTimeMillis();
          ????????long?reduce?=?LongStream.rangeClosed(0,?10000000000L).parallel().reduce(0,?Long::sum);
          ????????long?l1?=?System.currentTimeMillis();
          ????????System.out.println("invoke?=?"?+?reduce?+?"??time:?"?+?(l1?-?l));
          ????????//invoke?=?-5340232216128654848??time:?2152
          ????}
          }

          結(jié)論:Java 8 就為我們提供了一個并行流來實現(xiàn)ForkJoin實現(xiàn)的功能??梢钥吹讲⑿辛鞅茸约簩崿F(xiàn)ForkJoin還要快。

          Java 8 中將并行流進(jìn)行了優(yōu)化,我們可以很容易的對數(shù)據(jù)進(jìn)行并行流的操作,Stream API可以聲明性的通過parallel()與sequential()在并行流與串行流中隨意切換!

          核心組件

          F/J框架的實現(xiàn)非常復(fù)雜,內(nèi)部大量運用了位操作和無鎖算法,撇開這些實現(xiàn)細(xì)節(jié)不談,該框架主要涉及三大核心組件:ForkJoinPool(線程池)、ForkJoinTask(任務(wù))、ForkJoinWorkerThread(工作線程),外加WorkQueue(任務(wù)隊列):

          1. ForkJoinPool:ExecutorService的實現(xiàn)類,負(fù)責(zé)工作線程的管理、任務(wù)隊列的維護(hù),以及控制整個任務(wù)調(diào)度流程;
          2. ForkJoinTask:Future接口的實現(xiàn)類,fork是其核心方法,用于分解任務(wù)并異步執(zhí)行;而join方法在任務(wù)結(jié)果計算完畢之后才會運行,用來合并或返回計算結(jié)果;
          3. ForkJoinWorkerThread:Thread的子類,作為線程池中的工作線程(Worker)執(zhí)行任務(wù);
          4. WorkQueue:任務(wù)隊列,用于保存任務(wù);
          ForkJoinPool

          ForkJoinPool作為Executors框架的一員,從外部看與其它線程池并沒有什么區(qū)別,僅僅是ExecutorService的一個實現(xiàn)類:

          ForkJoinPool的主要工作如下:

          1. 接受外部任務(wù)的提交(外部調(diào)用ForkJoinPool的invoke/execute/submit方法提交任務(wù));
          2. 接受ForkJoinTask自身fork出的子任務(wù)的提交;
          3. 任務(wù)隊列數(shù)組(WorkQueue[])的初始化和管理;工作線程(Worker)的創(chuàng)建/管理。

          注意:ForkJoinPool提供了3類外部提交任務(wù)的方法:invoke、execute、submit,它們的主要區(qū)別在于任務(wù)的執(zhí)行方式上。

          1. 通過invoke方法提交的任務(wù),調(diào)用線程直到任務(wù)執(zhí)行完成才會返回,也就是說這是一個同步方法,且有返回結(jié)果;
          2. 通過execute方法提交的任務(wù),調(diào)用線程會立即返回,也就是說這是一個異步方法,且沒有返回結(jié)果;
          3. 通過submit方法提交的任務(wù),調(diào)用線程會立即返回,也就是說這是一個異步方法,且返回結(jié)果(返回Future實現(xiàn)類,可以通過get獲取結(jié)果)。

          注意:ForkJoinPool支持兩種模式:

          同步模式(默認(rèn)方式) 異步模式

          這里的同步/異步并不是指F/J框架本身是采用同步模式還是采用異步模式工作,而是指其中的工作線程的工作方式。在F/J框架中,每個工作線程(Worker)都有一個屬于自己的任務(wù)隊列(WorkQueue),這是一個底層采用數(shù)組實現(xiàn)的雙向隊列。同步是指:對于工作線程(Worker)自身隊列中的任務(wù),采用后進(jìn)先出(LIFO)的方式執(zhí)行;異步是指:對于工作線程(Worker)自身隊列中的任務(wù),采用先進(jìn)先出(FIFO)的方式執(zhí)行

          ForkJoinTask

          從Fork/Join框架的描述上來看,“任務(wù)”必須要滿足一定的條件:

          支持Fork,即任務(wù)自身的分解 支持Join,即任務(wù)結(jié)果的合并

          因此JUC提供了一個抽象類 ForkJoinTask,來作為該類Fork/Join任務(wù)的抽象定義

          ForkJoinTask實現(xiàn)了Future接口,是一個異步任務(wù),我們在使用Fork/Join框架時,一般需要使用線程池來調(diào)度任務(wù),線程池內(nèi)部調(diào)度的其實都是ForkJoinTask任務(wù)(即使提交的是一個Runnable或Callable任務(wù),也會被適配成ForkJoinTask)。除了ForkJoinTask,F(xiàn)ork/Join框架還提供了兩個它的抽象實現(xiàn),我們在自定義ForkJoin任務(wù)時,一般繼承這兩個類:

          RecursiveAction:表示具有返回結(jié)果的ForkJoin任務(wù) RecursiveTask:表示沒有返回結(jié)果的ForkJoin任務(wù)

          ForkJoinWorkerThread

          Fork/Join框架中,每個工作線程(Worker)都有一個自己的任務(wù)隊列(WorkerQueue), 所以需要對一般的Thread做些特性化處理,J.U.C提供了ForkJoinWorkerThread類作為ForkJoinPool中的工作線程:

          public?class?ForkJoinWorkerThread?extends?Thread?{
          ????
          ????final?ForkJoinPool?pool;????????????????????//?該工作線程歸屬的線程池
          ????final?ForkJoinPool.WorkQueue?workQueue;?????//?對應(yīng)的任務(wù)隊列
          ?
          ????protected?ForkJoinWorkerThread(ForkJoinPool?pool)?{
          ????????super("aForkJoinWorkerThread");?????????//?指定工作線程名稱
          ????????this.pool?=?pool;
          ????????this.workQueue?=?pool.registerWorker(this);
          ????}
          ??
          ????//?...
          }

          ForkJoinWorkerThread 在構(gòu)造過程中,會。同時,它會通過ForkJoinPool的registerWorker方保存所屬線程池信息和與自己綁定的任務(wù)隊列信息法將自己注冊到線程池中。

          WorkQueue

          任務(wù)隊列(WorkQueue)是ForkJoinPool與其它線程池區(qū)別最大的地方,在ForkJoinPool內(nèi)部,維護(hù)著一個WorkQueue[]數(shù)組,它會在外部首次提交任務(wù))時進(jìn)行初始化:

          volatile?WorkQueue[]?workQueues;?//?main?registry

          當(dāng)通過線程池的外部方法(submit、invoke、execute)提交任務(wù)時,如果WorkQueue[]沒有初始化,則會進(jìn)行初始化;然后根據(jù)數(shù)組大小和線程隨機(jī)數(shù)(ThreadLocalRandom.probe)等信息,計算出任務(wù)隊列所在的數(shù)組索引(這個索引一定是偶數(shù)),如果索引處沒有任務(wù)隊列,則初始化一個,再將任務(wù)入隊。也就是說,通過外部方法提交的任務(wù)一定是在偶數(shù)隊列,沒有綁定工作線程。WorkQueue作為ForkJoinPool的內(nèi)部類,表示一個雙端隊列雙端隊列,既可以作為棧使用(LIFO),也可以作為隊列使用(FIFO)。ForkJoinPool的“工作竊取”正是利用了這個特點,當(dāng)工作線程從自己的隊列中獲取任務(wù)時,默認(rèn)總是以棧操作(LIFO)的方式從棧頂取任務(wù);當(dāng)工作線程嘗試竊取其它任務(wù)隊列中的任務(wù)時,則是FIFO的方式。

          線程池中的每個工作線程(ForkJoinWorkerThread)都有一個自己的任務(wù)隊列(WorkQueue),工作線程優(yōu)先處理自身隊列中的任務(wù)(LIFO或FIFO順序,由線程池構(gòu)造時的參數(shù) mode 決定),自身隊列為空時,以FIFO的順序隨機(jī)竊取其它隊列中的任務(wù)。

          F/J框架的核心來自于它的工作竊取調(diào)度策略,可以總結(jié)為以下幾點:

          1. 每個Worker線程利用它自己的任務(wù)隊列維護(hù)可執(zhí)行任務(wù);
          2. 任務(wù)隊列是一種雙端隊列,支持LIFO的push和pop操作,也支持FIFO的take操作;
          3. 任務(wù)fork的子任務(wù),只會push到它所在線程(調(diào)用fork方法的線程)的隊列;
          4. 工作線程既可以使用LIFO通過pop處理自己隊列中的任務(wù),也可以FIFO通過poll處理自己隊列中的任務(wù),具體取決于構(gòu)造線程池時的asyncMode參數(shù);
          5. 當(dāng)工作線程自己隊列中沒有待處理任務(wù)時,它嘗試去隨機(jī)讀?。ǜ`?。┢渌蝿?wù)隊列的base端的任務(wù);
          6. 當(dāng)線程進(jìn)入join操作,它也會去處理其它工作線程的隊列中的任務(wù)(自己的已經(jīng)處理完了),直到目標(biāo)任務(wù)完成(通過isDone方法);
          7. 當(dāng)一個工作線程沒有任務(wù)了,并且嘗試從其它隊列竊取也失敗了,它讓出資源(通過使用yields, sleeps或者其它優(yōu)先級調(diào)整)并且隨后會再次激活,直到所有工作線程都空閑了——此時,它們都阻塞在等待另一個頂層線程的調(diào)用。

          CountDownLatch

          CountDownLatch是一個非常實用的多線程控制工具類,可以簡單聯(lián)想到下課倒計時一起開飯,百米賽跑一起跑。常用的就下面幾個方法:

          CountDownLatch(int?count)?//實例化一個倒計數(shù)器,count指定計數(shù)個數(shù)
          countDown()?//?計數(shù)減一
          await()?//等待,當(dāng)計數(shù)減到0時,所有線程并行執(zhí)行

          CountDownLatch在我們工作的多個場景被使用,算是用的很頻繁的了,比如我們的API接口響應(yīng)時間被要求在200ms以內(nèi),但是如果一個接口內(nèi)部依賴多個三方/外部服務(wù),那串行調(diào)用接口的RT必然很久,所以個人用的最多的是接口RT優(yōu)化場景,內(nèi)部服務(wù)并行調(diào)用。

          對于倒計數(shù)器,一種典型的場景就是火箭發(fā)射。在火箭發(fā)射前,為了保證萬無一失,往往還要進(jìn)行各項設(shè)備、儀器的檢測。只有等到所有的檢查完畢后,引擎才能點火。那么在檢測環(huán)節(jié)當(dāng)然是多個檢測項可以的。同時進(jìn)行代碼:

          /**
          ?*?@Description:?倒計時器示例:火箭發(fā)射
          ?*/

          public?class?CountDownLatchDemo?implements?Runnable{
          ????static?final?CountDownLatch?latch?=?new?CountDownLatch(10);
          ????static?final?CountDownLatchDemo?demo?=?new?CountDownLatchDemo();

          ????@Override
          ????public?void?run()?{
          ????????//?模擬檢查任務(wù)
          ????????try?{
          ????????????Thread.sleep(new?Random().nextInt(10)?*?1000);
          ????????????System.out.println("檢查完畢");
          ????????}?catch?(InterruptedException?e)?{
          ????????????e.printStackTrace();
          ????????}?finally?{
          ????????????//計數(shù)減一
          ????????????//放在finally避免任務(wù)執(zhí)行過程出現(xiàn)異常,導(dǎo)致countDown()不能被執(zhí)行
          ????????????latch.countDown();
          ????????}
          ????}
          ????public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????????ExecutorService?exec?=?Executors.newFixedThreadPool(10);
          ????????for?(int?i=0;?i<10;?i++){
          ????????????exec.submit(demo);
          ????????}
          ????????//?等待檢查
          ????????latch.await();?//?外部主線程main 方法來等待下面運行?。?!
          ????????//?發(fā)射火箭
          ????????System.out.println("Fire!");
          ????????//?關(guān)閉線程池
          ????????exec.shutdown();
          ????}
          }

          上述代碼中我們先生成了一個CountDownLatch實例。計數(shù)數(shù)量為10,這表示需要有10個線程來完成任務(wù),等待在CountDownLatch上的線程才能繼續(xù)執(zhí)行。latch.countDown(); 方法作用是通知CountDownLatch有一個線程已經(jīng)準(zhǔn)備完畢,倒計數(shù)器可以減一了。atch.await()方法要求主線程等待所有10個檢查任務(wù)全部準(zhǔn)備好才一起并行執(zhí)行。latch.countDown()的調(diào)用不一定非要開啟線程執(zhí)行,即使你在主線程中下面這樣寫效果也是一樣。

          ?for?(int?i?=?0;?i?10;?i++)?{
          ?????countDownLatch.countDown();
          ?}

          CyclicBarrier

          這個類的中文意思是循環(huán)柵欄。大概的意思就是一個可循環(huán)利用的屏障。它的作用就是會讓所有線程都等待完成后才會繼續(xù)下一步行動。舉個例子,就像生活中我們會約朋友們到某個餐廳一起吃飯,有些朋友可能會早到,有些朋友可能會晚到,但是這個餐廳規(guī)定必須等到所有人到齊之后才會讓我們進(jìn)去。這里的朋友們就是各個線程,餐廳就是 CyclicBarrier。構(gòu)造方法

          public?CyclicBarrier(int?parties)
          public?CyclicBarrier(int?parties,?Runnable?barrierAction)

          parties 是參與線程的個數(shù) 第二個構(gòu)造方法有一個 Runnable 參數(shù),這個參數(shù)的意思是到達(dá)線程最后一個要做的任務(wù)

          重要方法:

          public?int?await()?throws?InterruptedException,?BrokenBarrierException
          public?int?await(long?timeout,?TimeUnit?unit)?throws?InterruptedException,?BrokenBarrierException,?TimeoutException

          線程調(diào)用 await() 表示自己已經(jīng)到達(dá)柵欄 BrokenBarrierException 表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個線程 await() 時被中斷或者超時

          demo:一個線程組的線程需要等待所有線程完成任務(wù)后再繼續(xù)執(zhí)行下一次任務(wù)

          public?class?CyclicBarrierTest?{
          ????public?static?void?main(String[]?args)?{
          ????????//定義一個計數(shù)器,當(dāng)計數(shù)器的值累加到30,輸出"放行"
          ????????CyclicBarrier?cyclicBarrier?=?new?CyclicBarrier(30,()->{
          ????????????System.out.println("放行");
          ????????});
          ????????for?(int?i?=?1;?i?<=?90;?i++)?{
          ????????????final?int?temp?=?i;
          ????????????new?Thread(()->{
          ????????????????System.out.println("-->"+temp);
          ????????????????try?{
          ????????????????????cyclicBarrier.await();
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?catch?(BrokenBarrierException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}).start();

          ????????}
          ????}
          }

          上面的結(jié)果會出現(xiàn)3次放行哦。

          CyclicBarrier 與 CountDownLatch 區(qū)別

          1、CountDownLatch 是一次性的,CyclicBarrier 是可循環(huán)利用的
          2、CountDownLatch 參與的線程的職責(zé)是不一樣的,有的在倒計時,有的在等待倒計時結(jié)束。
          3、CyclicBarrier 參與的線程職責(zé)是一樣的。CountDownLatch 做減法計算,count=0,喚醒阻塞線程,CyclicBarrier 做加法計算,count=屏障值(parties),喚醒阻塞線程。
          4、最重要:CountDownLatch的放行由第三者控制,CyclicBarrier是由一組線程本身來控制的, CountDownLatch放行條件>=線程數(shù)。CyclicBarrier放行條件=線程數(shù)。

          Semaphore

          用途:控制同時訪問某個特定資源的線程數(shù)據(jù),用來流量控制。一個超市只能容納5個人購物,其余人排隊。


          public?class?SemaphoreTest?{
          ????public?static?void?main(String[]?args)?{
          ????????//同時只能進(jìn)5個人
          ????????Semaphore?semaphore?=?new?Semaphore(5);
          ????????for?(int?i?=?0;?i?15;?i++)?{
          ????????????new?Thread(()?->?{
          ????????????????try?{
          ????????????????????//獲得許可
          ????????????????????semaphore.acquire();?//?已經(jīng)進(jìn)店人+1
          ????????????????????System.out.println(Thread.currentThread().getName()?+?"進(jìn)店購物");
          ????????????????????TimeUnit.SECONDS.sleep(5);
          ????????????????????System.out.println(Thread.currentThread().getName()?+?"出店");
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}?finally?{
          ????????????????????//釋放許可
          ????????????????????semaphore.release();?//已經(jīng)進(jìn)店人?-1?
          ????????????????}
          ????????????},?String.valueOf(i)).start();
          ????????}
          ????}
          }

          實現(xiàn)數(shù)據(jù)庫連接池 數(shù)據(jù)庫連接實現(xiàn):

          public?class?SqlConnectImpl?implements?Connection{
          ?
          ?/*拿一個數(shù)據(jù)庫連接*/
          ????public?static?final?Connection?fetchConnection(){
          ????????return?new?SqlConnectImpl();
          ????}
          }

          連接池的實現(xiàn):

          public?class?DBPoolSemaphore?{

          ????private?final?static?int?POOL_SIZE?=?10;
          ????private?final?Semaphore?useful,?useless;//useful表示可用的數(shù)據(jù)庫連接,useless表示已用的數(shù)據(jù)庫連接

          ????public?DBPoolSemaphore()?{
          ????????this.useful?=?new?Semaphore(POOL_SIZE);
          ????????this.useless?=?new?Semaphore(0);
          ????}

          ????//存放數(shù)據(jù)庫連接的容器
          ????private?static?LinkedList?pool?=?new?LinkedList();

          ????//初始化池
          ????static?{
          ????????for?(int?i?=?0;?i?????????????pool.addLast(SqlConnectImpl.fetchConnection());
          ????????}
          ????}

          ????/*歸還連接*/
          ????public?void?returnConnect(Connection?connection)?throws?InterruptedException?{
          ????????if?(connection?!=?null)?{
          ????????????System.out.println("當(dāng)前有"?+?useful.getQueueLength()?+?"個線程等待數(shù)據(jù)庫連接?。?
          ????????????????????+?"可用連接數(shù):"?+?useful.availablePermits());
          ????????????useless.acquire();//?可用連接?+1
          ????????????synchronized?(pool)?{
          ????????????????pool.addLast(connection);
          ????????????}
          ????????????useful.release();?//?已用連接?-1
          ????????}
          ????}

          ????/*從池子拿連接*/
          ????public?Connection?takeConnect()?throws?InterruptedException?{
          ????????useful.acquire();?//?可用連接-1
          ????????Connection?conn;
          ????????synchronized?(pool)?{
          ????????????conn?=?pool.removeFirst();
          ????????}
          ????????useless.release();?//?以用連接+1
          ????????return?conn;
          ????}
          }

          測試代碼:

          public?class?AppTest?{

          ????private?static?DBPoolSemaphore?dbPool?=?new?DBPoolSemaphore();

          ????//業(yè)務(wù)線程
          ????private?static?class?BusiThread?extends?Thread?{
          ????????@Override
          ????????public?void?run()?{
          ????????????Random?r?=?new?Random();//讓每個線程持有連接的時間不一樣
          ????????????long?start?=?System.currentTimeMillis();
          ????????????try?{
          ????????????????Connection?connect?=?dbPool.takeConnect();
          ????????????????System.out.println("Thread_"?+?Thread.currentThread().getId()
          ????????????????????????+?"_獲取數(shù)據(jù)庫連接共耗時【"?+?(System.currentTimeMillis()?-?start)?+?"】ms.");
          ????????????????SleepTools.ms(100?+?r.nextInt(100));//模擬業(yè)務(wù)操作,線程持有連接查詢數(shù)據(jù)
          ????????????????System.out.println("查詢數(shù)據(jù)完成,歸還連接!");
          ????????????????dbPool.returnConnect(connect);
          ????????????}?catch?(InterruptedException?e)?{
          ????????????}
          ????????}
          ????}

          ????public?static?void?main(String[]?args)?{
          ????????for?(int?i?=?0;?i?50;?i++)?{
          ????????????Thread?thread?=?new?BusiThread();
          ????????????thread.start();
          ????????}
          ????}
          }

          Exchange

          兩個線程間的數(shù)據(jù)交換,局限性比較大。Exchange是 阻塞形式的,兩個線程要都到達(dá)執(zhí)行Exchange函數(shù)才會交換。


          public?class?UseExchange?{
          ????private?static?final?Exchanger>?exchange
          ????????????=?new?Exchanger>();

          ????public?static?void?main(String[]?args)?{

          ????????//第一個線程
          ????????new?Thread(new?Runnable()?{
          ????????????@Override
          ????????????public?void?run()?{
          ????????????????Set?setA?=?new?HashSet();//存放數(shù)據(jù)的容器
          ????????????????try?{
          ????????????????????setA.add("liu");
          ????????????????????setA.add("Liu");
          ????????????????????setA.add("LIU");
          ????????????????????setA?=?exchange.exchange(setA);//交換set
          ????????????????????/*處理交換后的數(shù)據(jù)*/
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????}
          ????????????}
          ????????}).start();

          ????????//第二個線程
          ????????new?Thread(new?Runnable()?{
          ????????????@Override
          ????????????public?void?run()?{
          ????????????????Set?setB?=?new?HashSet();//存放數(shù)據(jù)的容器
          ????????????????try?{
          ????????????????????setB.add("jin");
          ????????????????????setB.add("Jie");
          ????????????????????setB.add("JIN");
          ????????????????????setB?=?exchange.exchange(setB);//交換set
          ????????????????????/*處理交換后的數(shù)據(jù)*/
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????}
          ????????????}
          ????????}).start();
          ????}
          }

          Callable,Future,FutureTask

          這三個組合使用,一般我們可以將耗時任務(wù)用子線程去執(zhí)行,同時執(zhí)行我們自己的主線程任務(wù)。主線程執(zhí)行任務(wù)完畢后再調(diào)Future.get()來獲得子線程任務(wù)。說明:

          Callable有返回值可拋出異常,其中返回值有Future獲得。Future 獲得返回值。FutureTask實現(xiàn)Future跟Runnable。1.7前用AQS實現(xiàn)的,1.8以后不再是。

          Future主要函數(shù)功能:

          1. isDone,結(jié)束,正常還是異常結(jié)束,或者自己取消,都返回true;
          2. isCancelled 任務(wù)完成前被取消,返回true;
          3. cancel(boolean):
          • 任務(wù)還沒開始,返回false
          • 任務(wù)已經(jīng)啟動,cancel(true)
          • 中斷正在運行的任務(wù),中斷成功,返回true
          • cancel(false),不會去中斷已經(jīng)運行的任務(wù)
          • 任務(wù)已經(jīng)結(jié)束,返回false

          Future樣例:

          public?class?UseFuture?{

          ????/*實現(xiàn)Callable接口,允許有返回值*/
          ????private?static?class?UseCallable?implements?Callable<Integer>?{
          ????????private?int?sum;
          ????????@Override
          ????????public?Integer?call()?throws?Exception?
          ????????????System.out.println("Callable子線程開始計算")
          ;
          ????????????Thread.sleep(2000);
          ????????????for?(int?i?=?0;?i?5000;?i++)?{
          ????????????????sum?=?sum?+?i;
          ????????????}
          ????????????System.out.println("Callable子線程計算完成,結(jié)果="?+?sum);
          ????????????return?sum;
          ????????}
          ????}

          ????public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException?{
          ????????UseCallable?useCallable?=?new?UseCallable();
          ????????FutureTask?futureTask?=?new?FutureTask(useCallable);
          ????????new?Thread(futureTask).start();
          ????????Random?r?=?new?Random();
          ????????SleepTools.second(1);
          ????????if?(r.nextBoolean())?{??//?方法調(diào)用返回下一個偽均勻分布的boolean值
          ????????????System.out.println("Get?UseCallable?result?=?"?+?futureTask.get());
          ????????}?else?{
          ????????????System.out.println("中斷計算");
          ????????????futureTask.cancel(true);
          ????????}
          ????}
          }
          程序員專欄
          ?掃碼關(guān)注填加客服?
          長按識別下方二維碼進(jìn)群

          近期精彩內(nèi)容推薦:??

          ?又一個程序員,被抓捕!(真實事件)

          ?程序員有個可愛女朋友是種什么體驗?

          ?“12306”的架構(gòu)到底有多牛逼?

          ?csv文件讀寫亂碼問題的一個簡單解決方法




          在看點這里好文分享給更多人↓↓

          瀏覽 25
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  狠狠躁日日躁夜夜躁A片无码视频 | 大香蕉在线电影 | 久久精品三级 | 一级a一级a爱片免费免免高潮按摩 | 国产成人小视频 |