<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          我工作三年了,該懂并發(fā)了(干貨)

          共 1347字,需瀏覽 3分鐘

           ·

          2020-08-31 08:54

          點擊藍色“程序員cxuan?”關(guān)注我喲

          加個“星標”,歡迎來撩

          這是程序員cxuan的第36期分享


          本文的組織形式如下,主要會介紹到同步容器類,操作系統(tǒng)的并發(fā)工具,Java 開發(fā)工具包(只是簡單介紹一下,后面會有源碼分析)。同步工具類有哪些。

          下面我們就來介紹一下 Java 并發(fā)中都涉及哪些模塊,這些并發(fā)模塊都是 Java 并發(fā)類庫所提供的。

          同步容器類

          同步容器主要包括兩類,一種是本來就是線程安全實現(xiàn)的容器,這類容器有 Vector、Hashtable、Stack,這類容器的方法上都加了 synchronized 鎖,是線程安全的實現(xiàn)。

          Vector、Hashtable、Stack 這些容器我們現(xiàn)在幾乎都不在使用,因為這些容器在多線程環(huán)境下的效率不高。

          還有一類是由 Collections.synchronizedxxx 實現(xiàn)的非線程安全的容器,使用 Collections.synchronized 會把它們封裝起來編程線程安全的容器,舉出兩個例子

          • Collections.synchronizedList
          • Collections.synchronizedMap

          我們可以通過 Collections 源碼可以看出這些線程安全的實現(xiàn)

          要不為啥要稱 Collections 為集合工具類呢?Collections 會把這些容器類的狀態(tài)封裝起來,并對每個同步方法進行同步,使得每次只有一個線程能夠訪問容器的狀態(tài)。

          其中每個 synchronized xxx都是相當于創(chuàng)建了一個靜態(tài)內(nèi)部類。

          雖然同步容器類都是線程安全的,但是在某些情況下需要額外的客戶端加鎖來保證一些復(fù)合操作的安全性,復(fù)合操作就是有兩個及以上的方法組成的操作,比如最典型的就是 若沒有則添加,用偽代碼表示則是

          if(a?==?null){
          ??a?=?get();
          }

          比如可以用來判斷 Map 中是否有某個 key,如果沒有則添加進 Map 中。這些復(fù)合操作在沒有客戶端加鎖的情況下是線程安全的,但是當多個線程并發(fā)修改容器時,可能會表現(xiàn)出意料之外的行為。例如下面這段代碼

          public?class?TestVector?implements?Runnable{

          ????static?Vector?vector?=?new?Vector();
          ????static?void?addVector(){
          ????????for(int?i?=?0;i?10000
          ;i++){
          ????????????vector.add(i);
          ????????}
          ????}

          ????static?Object?getVector(){
          ????????int?index?=?vector.size()?-?1;
          ????????return?vector.get(index);
          ????}

          ????static?void?removeVector(){
          ????????int?index?=?vector.size()?-?1;
          ????????vector.remove(index);
          ????}

          ????@Override
          ????public?void?run()?{
          ????????getVector();
          ????}

          ????public?static?void?main(String[]?args)?{
          ????????TestVector?testVector?=?new?TestVector();
          ????????testVector.addVector();
          ????????Thread?t1?=?new?Thread(()?->?{
          ????????????for(int?i?=?0;i?????????????????getVector();
          ????????????}
          ????????});

          ????????Thread?t2?=?new?Thread(()?->?{
          ????????????for(int?i?=?0;i?????????????????removeVector();
          ????????????}
          ????????});

          ????????t1.start();
          ????????t2.start();
          ????}

          }

          這些方法看似沒有問題,因為 Vector 能夠保證線程安全性,無論多少個線程訪問 Vector 也不會造成 Vector 的內(nèi)部產(chǎn)生破壞,但是從整個系統(tǒng)來說,是存在線程安全性的,事實上你運行一下,也會發(fā)現(xiàn)報錯。

          會出現(xiàn)

          如果線程 A 在包含這么多元素的基礎(chǔ)上調(diào)用 getVector 方法,會得到一個數(shù)值,getVector 只是取得該元素,而并不是從 vector 中移除,removeVector 方法是得到一個元素進行移除,這段代碼的不安全因素就是,因為線程的時間片是亂序的,而且 getVector 和 removeVector 并不會保證互斥,所以在 removeVector 方法把某個值比如 6666 移除后,vector 中就不存在這個 6666 的元素,此時 getVector 方法取得 6666 ,就會拋出數(shù)組越界異常。為什么是數(shù)組越界異常呢?可以看一下 vector 的源碼

          如果用圖表示的話,則會是下面這樣。

          所以,從系統(tǒng)的層面來看,上面這段代碼也要保證線程安全性才可以,也就是在客戶端加鎖 實現(xiàn),只要我們讓復(fù)合操作使用一把鎖,那么這些操作就和其他單獨的操作一樣都是原子性的。如下面例子所示

          static?Object?getVector(){
          ??synchronized?(vector){
          ????int?index?=?vector.size()?-?1;
          ????return?vector.get(index);
          ??}
          }

          static?void?removeVector(){
          ??synchronized?(vector)?{
          ????int?index?=?vector.size()?-?1;
          ????vector.remove(index);
          ??}
          }

          也可以通過鎖住 .class 來保證原子性操作,也能達到同樣的效果。

          static?Object?getVector(){
          ??synchronized?(TestVector.class){
          ????int?index?=?vector.size()?-?1;
          ????return?vector.get(index);
          ??}
          }

          static?void?removeVector(){
          ??synchronized?(TestVector.class)?{
          ????int?index?=?vector.size()?-?1;
          ????vector.remove(index);
          ??}
          }

          在調(diào)用 size 和 get 之間,Vector 的長度可能會發(fā)生變化,這種變化在對 Vector 進行排序時出現(xiàn),如下所示

          for(int?i?=?0;i??doSomething(vector.get(i));
          }

          這種迭代的操作正確性取決于運氣,即在調(diào)用 size 和 get 之間會修改 Vector,在單線程環(huán)境中,這種假設(shè)完全成立,但是再有其他線程并發(fā)修改 Vector 時,則可能會導(dǎo)致麻煩。

          我們?nèi)耘f可以通過客戶端加鎖的方式來避免這種情況

          synchronized(vector){
          ??for(int?i?=?0;i????doSomething(vector.get(i));
          ??}??
          }

          這種方式為客戶端的可靠性提供了保證,但是犧牲了伸縮性,而且這種在遍歷過程中進行加鎖,也不是我們所希望看到的。

          fail-fast

          針對上面這種情況,很多集合類都提供了一種 fail-fast 機制,因為大部分集合內(nèi)部都是使用 Iterator 進行遍歷,在循環(huán)中使用同步鎖的開銷會很大,而 Iterator 的創(chuàng)建是輕量級的,所以在集合內(nèi)部如果有并發(fā)修改的操作,集合會進行快速失敗,也就是 fail-fast。當他們發(fā)現(xiàn)容器在迭代過程中被修改時,會拋出 ConcurrentModificationException 異常,這種快速失敗不是一種完備的處理機制,而只是 善意的捕獲并發(fā)錯誤。

          如果查看過 ConcurrentModificationException 的注解,你會發(fā)現(xiàn),ConcurrentModificationException 拋出的原則由兩種,如下

          造成這種異常的原因是由于多個線程在遍歷集合的同時對集合類內(nèi)部進行了修改,這也就是 fail-fast 機制。

          該注解還聲明了另外一種方式

          這個問題也是很經(jīng)典的一個問題,我們使用 ArrayList 來舉例子。如下代碼所示

          public?static?void?main(String[]?args)?{
          ??List?list?=?new?ArrayList<>();
          ??for?(int?i?=?0?;?i?10
          ?;?i++?)?{
          ????list.add(i?+?"");
          ??}
          ??Iterator?iterator?=?list.iterator();
          ??int?i?=?0?;
          ??while(iterator.hasNext())?{
          ????if?(i?==?3)?{
          ??????list.remove(3);
          ????}
          ????System.out.println(iterator.next());
          ????i?++;
          ??}
          }

          該段代碼會發(fā)生異常,因為在 ArrayList 內(nèi)部,有兩個屬性,一個是 modCount ,一個是 expectedModCount ,ArrayList 在 remove 等對集合結(jié)構(gòu)的元素造成數(shù)量上的操作會有 checkForComodification 的判斷,如下所示,這也是這段代碼的錯誤原因。

          fail-safe

          fail-safe 是 Java 中的一種 安全失敗 機制,它表示的是在遍歷時不是直接在原集合上進行訪問,而是先復(fù)制原有集合內(nèi)容,在拷貝的集合上進行遍歷。由于迭代時是對原集合的拷貝進行遍歷,所以在遍歷過程中對原集合所作的修改并不能被迭代器檢測到,所以不會觸發(fā) ConcurrentModificationException。java.util.concurrent 包下的容器都是安全失敗的,可以在多線程條件下使用,并發(fā)修改。

          比如 CopyOnWriteArrayList, 它就是一種 fail-safe 機制的集合,它就不會出現(xiàn)異常,例如如下操作

          List?integers?=?new?CopyOnWriteArrayList<>();
          integers.add(1);
          integers.add(2);
          integers.add(3);
          Iterator?itr?=?integers.iterator();
          while?(itr.hasNext())?{
          ????Integer?a?=?itr.next();
          ????integers.remove(a);
          }

          CopyOnWriteArrayList 就是 ArrayList 的一種線程安全的變體,CopyOnWriteArrayList 中的所有可變操作比如 add 和 set 等等都是通過對數(shù)組進行全新復(fù)制來實現(xiàn)的。

          操作系統(tǒng)中的并發(fā)工具

          講到并發(fā)容器,就不得不提操作系統(tǒng)級別實現(xiàn)了哪些進程/線程間的并發(fā)容器,說白了其實就是數(shù)據(jù)結(jié)構(gòu)的設(shè)計。下面我們就來一起看一下操作系統(tǒng)級別的并發(fā)工具

          信號量

          信號量是 E.W.Dijkstra 在 1965 年提出的一種方法,它使用一個整形變量來累計喚醒次數(shù),以供之后使用。在他的觀點中,有一個新的變量類型稱作 信號量(semaphore)。一個信號量的取值可以是 0 ,或任意正數(shù)。0 表示的是不需要任何喚醒,任意的正數(shù)表示的就是喚醒次數(shù)。

          Dijkstra 提出了信號量有兩個操作,現(xiàn)在通常使用 downup(分別可以用 sleep 和 wakeup 來表示)。down 這個指令的操作會檢查值是否大于 0 。如果大于 0 ,則將其值減 1 ;若該值為 0 ,則進程將睡眠,而且此時 down 操作將會繼續(xù)執(zhí)行。檢查數(shù)值、修改變量值以及可能發(fā)生的睡眠操作均為一個單一的、不可分割的 原子操作(atomic action) 完成。

          互斥量

          如果不需要信號量的計數(shù)能力時,可以使用信號量的一個簡單版本,稱為 mutex(互斥量)?;コ饬康膬?yōu)勢就在于在一些共享資源和一段代碼中保持互斥。由于互斥的實現(xiàn)既簡單又有效,這使得互斥量在實現(xiàn)用戶空間線程包時非常有用。

          互斥量是一個處于兩種狀態(tài)之一的共享變量:解鎖(unlocked)加鎖(locked)。這樣,只需要一個二進制位來表示它,不過一般情況下,通常會用一個 整型(integer) 來表示。0 表示解鎖,其他所有的值表示加鎖,比 1 大的值表示加鎖的次數(shù)。

          mutex 使用兩個過程,當一個線程(或者進程)需要訪問關(guān)鍵區(qū)域時,會調(diào)用 mutex_lock 進行加鎖。如果互斥鎖當前處于解鎖狀態(tài)(表示關(guān)鍵區(qū)域可用),則調(diào)用成功,并且調(diào)用線程可以自由進入關(guān)鍵區(qū)域。

          另一方面,如果 mutex 互斥量已經(jīng)鎖定的話,調(diào)用線程會阻塞直到關(guān)鍵區(qū)域內(nèi)的線程執(zhí)行完畢并且調(diào)用了 mutex_unlock 。如果多個線程在 mutex 互斥量上阻塞,將隨機選擇一個線程并允許它獲得鎖。


          Futexes

          隨著并行的增加,有效的同步(synchronization)鎖定(locking) 對于性能來說是非常重要的。如果進程等待時間很短,那么自旋鎖(Spin lock) 是非常有效;但是如果等待時間比較長,那么這會浪費 CPU 周期。如果進程很多,那么阻塞此進程,并僅當鎖被釋放的時候讓內(nèi)核解除阻塞是更有效的方式。不幸的是,這種方式也會導(dǎo)致另外的問題:它可以在進程競爭頻繁的時候運行良好,但是在競爭不是很激烈的情況下內(nèi)核切換的消耗會非常大,而且更困難的是,預(yù)測鎖的競爭數(shù)量更不容易。

          有一種有趣的解決方案是把兩者的優(yōu)點結(jié)合起來,提出一種新的思想,稱為 futex,或者是 快速用戶空間互斥(fast user space mutex),是不是聽起來很有意思?

          futex 是 Linux 中的特性實現(xiàn)了基本的鎖定(很像是互斥鎖)而且避免了陷入內(nèi)核中,因為內(nèi)核的切換的開銷非常大,這樣做可以大大提高性能。futex 由兩部分組成:內(nèi)核服務(wù)和用戶庫。內(nèi)核服務(wù)提供了了一個 等待隊列(wait queue) 允許多個進程在鎖上排隊等待。除非內(nèi)核明確的對他們解除阻塞,否則它們不會運行。


          Pthreads 中的互斥量

          Pthreads 提供了一些功能用來同步線程。最基本的機制是使用互斥量變量,可以鎖定和解鎖,用來保護每個關(guān)鍵區(qū)域。希望進入關(guān)鍵區(qū)域的線程首先要嘗試獲取 mutex。如果 mutex 沒有加鎖,線程能夠馬上進入并且互斥量能夠自動鎖定,從而阻止其他線程進入。如果 mutex 已經(jīng)加鎖,調(diào)用線程會阻塞,直到 mutex 解鎖。如果多個線程在相同的互斥量上等待,當互斥量解鎖時,只有一個線程能夠進入并且重新加鎖。這些鎖并不是必須的,程序員需要正確使用它們。

          下面是與互斥量有關(guān)的函數(shù)調(diào)用

          和我們想象中的一樣,mutex 能夠被創(chuàng)建和銷毀,扮演這兩個角色的分別是 Phread_mutex_initPthread_mutex_destroy。mutex 也可以通過 Pthread_mutex_lock 來進行加鎖,如果互斥量已經(jīng)加鎖,則會阻塞調(diào)用者。還有一個調(diào)用Pthread_mutex_trylock 用來嘗試對線程加鎖,當 mutex 已經(jīng)被加鎖時,會返回一個錯誤代碼而不是阻塞調(diào)用者。這個調(diào)用允許線程有效的進行忙等。最后,Pthread_mutex_unlock 會對 mutex 解鎖并且釋放一個正在等待的線程。

          除了互斥量以外,Pthreads 還提供了第二種同步機制:條件變量(condition variables) 。mutex 可以很好的允許或阻止對關(guān)鍵區(qū)域的訪問。條件變量允許線程由于未滿足某些條件而阻塞。絕大多數(shù)情況下這兩種方法是一起使用的。下面我們進一步來研究線程、互斥量、條件變量之間的關(guān)聯(lián)。

          下面再來重新認識一下生產(chǎn)者和消費者問題:一個線程將東西放在一個緩沖區(qū)內(nèi),由另一個線程將它們?nèi)〕?。如果生產(chǎn)者發(fā)現(xiàn)緩沖區(qū)沒有空槽可以使用了,生產(chǎn)者線程會阻塞起來直到有一個線程可以使用。生產(chǎn)者使用 mutex 來進行原子性檢查從而不受其他線程干擾。但是當發(fā)現(xiàn)緩沖區(qū)已經(jīng)滿了以后,生產(chǎn)者需要一種方法來阻塞自己并在以后被喚醒。這便是條件變量做的工作。

          下面是一些與條件變量有關(guān)的最重要的 pthread 調(diào)用

          上表中給出了一些調(diào)用用來創(chuàng)建和銷毀條件變量。條件變量上的主要屬性是 Pthread_cond_waitPthread_cond_signal。前者阻塞調(diào)用線程,直到其他線程發(fā)出信號為止(使用后者調(diào)用)。阻塞的線程通常需要等待喚醒的信號以此來釋放資源或者執(zhí)行某些其他活動。只有這樣阻塞的線程才能繼續(xù)工作。條件變量允許等待與阻塞原子性的進程。Pthread_cond_broadcast 用來喚醒多個阻塞的、需要等待信號喚醒的線程。

          需要注意的是,條件變量(不像是信號量)不會存在于內(nèi)存中。如果將一個信號量傳遞給一個沒有線程等待的條件變量,那么這個信號就會丟失,這個需要注意

          管程

          為了能夠編寫更加準確無誤的程序,Brinch Hansen 和 Hoare 提出了一個更高級的同步原語叫做 管程(monitor)。管程有一個很重要的特性,即在任何時候管程中只能有一個活躍的進程,這一特性使管程能夠很方便的實現(xiàn)互斥操作。管程是編程語言的特性,所以編譯器知道它們的特殊性,因此可以采用與其他過程調(diào)用不同的方法來處理對管程的調(diào)用。通常情況下,當進程調(diào)用管程中的程序時,該程序的前幾條指令會檢查管程中是否有其他活躍的進程。如果有的話,調(diào)用進程將被掛起,直到另一個進程離開管程才將其喚醒。如果沒有活躍進程在使用管程,那么該調(diào)用進程才可以進入。

          進入管程中的互斥由編譯器負責,但是一種通用做法是使用 互斥量(mutex)二進制信號量(binary semaphore)。由于編譯器而不是程序員在操作,因此出錯的幾率會大大降低。在任何時候,編寫管程的程序員都無需關(guān)心編譯器是如何處理的。他只需要知道將所有的臨界區(qū)轉(zhuǎn)換成為管程過程即可。絕不會有兩個進程同時執(zhí)行臨界區(qū)中的代碼。

          即使管程提供了一種簡單的方式來實現(xiàn)互斥,但在我們看來,這還不夠。因為我們還需要一種在進程無法執(zhí)行被阻塞。在生產(chǎn)者-消費者問題中,很容易將針對緩沖區(qū)滿和緩沖區(qū)空的測試放在管程程序中,但是生產(chǎn)者在發(fā)現(xiàn)緩沖區(qū)滿的時候該如何阻塞呢?

          解決的辦法是引入條件變量(condition variables) 以及相關(guān)的兩個操作 waitsignal。當一個管程程序發(fā)現(xiàn)它不能運行時(例如,生產(chǎn)者發(fā)現(xiàn)緩沖區(qū)已滿),它會在某個條件變量(如 full)上執(zhí)行 wait 操作。這個操作造成調(diào)用進程阻塞,并且還將另一個以前等在管程之外的進程調(diào)入管程。在前面的 pthread 中我們已經(jīng)探討過條件變量的實現(xiàn)細節(jié)了。另一個進程,比如消費者可以通過執(zhí)行 signal 來喚醒阻塞的調(diào)用進程。

          通過臨界區(qū)自動的互斥,管程比信號量更容易保證并行編程的正確性。但是管程也有缺點,我們前面說到過管程是一個編程語言的概念,編譯器必須要識別管程并用某種方式對其互斥作出保證。C、Pascal 以及大多數(shù)其他編程語言都沒有管程,所以不能依靠編譯器來遵守互斥規(guī)則。

          與管程和信號量有關(guān)的另一個問題是,這些機制都是設(shè)計用來解決訪問共享內(nèi)存的一個或多個 CPU 上的互斥問題的。通過將信號量放在共享內(nèi)存中并用 TSLXCHG 指令來保護它們,可以避免競爭。但是如果是在分布式系統(tǒng)中,可能同時具有多個 CPU 的情況,并且每個 CPU 都有自己的私有內(nèi)存呢,它們通過網(wǎng)絡(luò)相連,那么這些原語將會失效。因為信號量太低級了,而管程在少數(shù)幾種編程語言之外無法使用,所以還需要其他方法。

          消息傳遞

          上面提到的其他方法就是 消息傳遞(messaage passing)。這種進程間通信的方法使用兩個原語 sendreceive ,它們像信號量而不像管程,是系統(tǒng)調(diào)用而不是語言級別。示例如下

          send(destination,?&message);

          receive(source,?&message);

          send 方法用于向一個給定的目標發(fā)送一條消息,receive 從一個給定的源接收一條消息。如果沒有消息,接受者可能被阻塞,直到接收一條消息或者帶著錯誤碼返回。

          消息傳遞系統(tǒng)現(xiàn)在面臨著許多信號量和管程所未涉及的問題和設(shè)計難點,尤其對那些在網(wǎng)絡(luò)中不同機器上的通信狀況。例如,消息有可能被網(wǎng)絡(luò)丟失。為了防止消息丟失,發(fā)送方和接收方可以達成一致:一旦接受到消息后,接收方馬上回送一條特殊的 確認(acknowledgement) 消息。如果發(fā)送方在一段時間間隔內(nèi)未收到確認,則重發(fā)消息。

          現(xiàn)在考慮消息本身被正確接收,而返回給發(fā)送著的確認消息丟失的情況。發(fā)送者將重發(fā)消息,這樣接受者將收到兩次相同的消息。

          對于接收者來說,如何區(qū)分新的消息和一條重發(fā)的老消息是非常重要的。通常采用在每條原始消息中嵌入一個連續(xù)的序號來解決此問題。如果接受者收到一條消息,它具有與前面某一條消息一樣的序號,就知道這條消息是重復(fù)的,可以忽略。

          消息系統(tǒng)還必須處理如何命名進程的問題,以便在發(fā)送或接收調(diào)用中清晰的指明進程。身份驗證(authentication) 也是一個問題,比如客戶端怎么知道它是在與一個真正的文件服務(wù)器通信,從發(fā)送方到接收方的信息有可能被中間人所篡改。

          屏障

          最后一個同步機制是準備用于進程組而不是進程間的生產(chǎn)者-消費者情況的。在某些應(yīng)用中劃分了若干階段,并且規(guī)定,除非所有的進程都就緒準備著手下一個階段,否則任何進程都不能進入下一個階段,可以通過在每個階段的結(jié)尾安裝一個 屏障(barrier) 來實現(xiàn)這種行為。當一個進程到達屏障時,它會被屏障所攔截,直到所有的屏障都到達為止。屏障可用于一組進程同步,如下圖所示

          在上圖中我們可以看到,有四個進程接近屏障,這意味著每個進程都在進行運算,但是還沒有到達每個階段的結(jié)尾。過了一段時間后,A、B、D 三個進程都到達了屏障,各自的進程被掛起,但此時還不能進入下一個階段呢,因為進程 B 還沒有執(zhí)行完畢。結(jié)果,當最后一個 C 到達屏障后,這個進程組才能夠進入下一個階段。

          避免鎖:讀-復(fù)制-更新

          最快的鎖是根本沒有鎖。問題在于沒有鎖的情況下,我們是否允許對共享數(shù)據(jù)結(jié)構(gòu)的并發(fā)讀寫進行訪問。答案當然是不可以。假設(shè)進程 A 正在對一個數(shù)字數(shù)組進行排序,而進程 B 正在計算其平均值,而此時你進行 A 的移動,會導(dǎo)致 B 會多次讀到重復(fù)值,而某些值根本沒有遇到過。

          然而,在某些情況下,我們可以允許寫操作來更新數(shù)據(jù)結(jié)構(gòu),即便還有其他的進程正在使用。竅門在于確保每個讀操作要么讀取舊的版本,要么讀取新的版本,例如下面的樹

          上面的樹中,讀操作從根部到葉子遍歷整個樹。加入一個新節(jié)點 X 后,為了實現(xiàn)這一操作,我們要讓這個節(jié)點在樹中可見之前使它"恰好正確":我們對節(jié)點 X 中的所有值進行初始化,包括它的子節(jié)點指針。然后通過原子寫操作,使 X 稱為 A 的子節(jié)點。所有的讀操作都不會讀到前后不一致的版本

          在上面的圖中,我們接著移除 B 和 D。首先,將 A 的左子節(jié)點指針指向 C 。所有原本在 A 中的讀操作將會后續(xù)讀到節(jié)點 C ,而永遠不會讀到 B 和 D。也就是說,它們將只會讀取到新版數(shù)據(jù)。同樣,所有當前在 B 和 D 中的讀操作將繼續(xù)按照原始的數(shù)據(jù)結(jié)構(gòu)指針并且讀取舊版數(shù)據(jù)。所有操作均能正確運行,我們不需要鎖住任何東西。而不需要鎖住數(shù)據(jù)就能夠移除 B 和 D 的主要原因就是 讀-復(fù)制-更新(Ready-Copy-Update,RCU),將更新過程中的移除和再分配過程分離開。

          Java 并發(fā)工具包

          JDK 1.5 提供了許多種并發(fā)容器來改進同步容器的性能,同步容器將所有對容器狀態(tài)的訪問都串行化,以實現(xiàn)他們之間的線程安全性。這種方法的代價是嚴重降低了并發(fā)性能,當多個線程爭搶容器鎖的同時,嚴重降低吞吐量。

          下面我們就來一起認識一下 Java 中都用了哪些并發(fā)工具

          Java 并發(fā)工具綜述

          在 Java 5.0 中新增加了 ConcurrentHashMap 用來替代基于散列的 Map 容器;新增加了 CopyOnWriteArrayListCopyOnWriteArraySet ?來分別替代 ArrayList 和 Set 接口實現(xiàn)類;還新增加了兩種容器類型,分別是 QueueBlockingQueue, Queue 是隊列的意思,它有一些實現(xiàn)分別是傳統(tǒng)的先進先出隊列 ConcurrentLinkedQueue以及并發(fā)優(yōu)先級隊列 PriorityQueue。Queue 是一個先入先出的隊列,它的操作不會阻塞,如果隊列為空那么獲取元素的操作會返回空值。PriorityQueue 擴展了 Queue,增加了可阻塞的插入和獲取等操作。如果隊列為空,那么獲取元素的操作將一直阻塞,直到隊列中出現(xiàn)一個可用的元素為止。如果隊列已滿,那么插入操作則一直阻塞,直到隊列中有可用的空間為止。

          Java 6.0 還引入了 ConcurrentSkipListMapConcurrentSkipListSet 分別作為同步的 SortedMap 和 SortedSet 的并發(fā)替代品。下面我們就展開探討了,設(shè)計不到底層源碼,因為本篇文章主要目的就是為了描述一下有哪些東西以及用了哪些東西。

          ConcurrentHashMap

          我們先來看一下 ConcurrentHashMap 在并發(fā)集合中的位置

          可以看到,ConcurrentHashMap 繼承了 AbstractMap 接口并實現(xiàn)了 ConcurrentMap 和 Serializable 接口,AbstractMap 和 ConcurrentMap 都是 Map 的實現(xiàn)類,只不過 AbstractMap 是抽象實現(xiàn)。

          ConcurrentHashMap 和 Hashtable 的構(gòu)造非常相似,只不過 Hashtable 容器在激烈競爭的場景中會表現(xiàn)出效率低下的現(xiàn)象,這是因為所有訪問 Hashtable 的線程都想獲取同一把鎖,如果容器里面有多把鎖,并且每一把鎖都只用來鎖定一段數(shù)據(jù),那么當多個線程訪問不同的數(shù)據(jù)段時,就不存在競爭關(guān)系。這就是 ConcurreentHashMap 采用的 分段鎖 實現(xiàn)。在這種鎖實現(xiàn)中,任意數(shù)量的讀取線程可以并發(fā)的訪問 Map,執(zhí)行讀取操作的線程和執(zhí)行寫入的線程可以并發(fā)的訪問 Map,并且在讀取的同時也可以并發(fā)修改 Map。

          ConcurrentHashMap 分段鎖實現(xiàn)帶來的結(jié)果是,在并發(fā)環(huán)境下可以實現(xiàn)更高的吞吐量,在單線程環(huán)境下只損失非常小的性能。

          你知道 HashMap 是具有 fail-fast 機制的,也就是說它是一種強一致性的集合,在數(shù)據(jù)不一致的情況下會拋出 ConcurrentModificationException 異常,而 ConcurrentHashMap 是一種 弱一致性 的集合,在并發(fā)修改其內(nèi)部結(jié)構(gòu)時,它不會拋出 ConcurrentModificationException 異常,弱一致性能夠容忍并發(fā)修改。

          在 HashMap 中,我們一般使用的 size、empty、containsKey 等方法都是標準方法,其返回的結(jié)果是一定的,包含就是包含,不包含就是不包含,可以作為判斷條件;而 ConcurrentHashMap 中的這些方法只是參考方法,它不是一個 精確值,像是 size、empty 這些方法在并發(fā)場景下用處很小,因為他們的返回值總是在不斷變化,所以這些操作的需求就被弱化了。

          在 ConcurrentHashMap 中沒有實現(xiàn)對 Map 加鎖從而實現(xiàn)獨占訪問。在線程安全的 Map 實現(xiàn) HashtableCollections.synchronizedMap 中都實現(xiàn)了獨占訪問,因此只能單個線程修改 Map 。ConcurrentHashMap 與這些 Map 容器相比,具有更多的優(yōu)勢和更少的劣勢,只有當需要獨占訪問的需求時才會使用 Hashtable 或者是 Collections.synchronizedMap ,否則其他并發(fā)場景下,應(yīng)該使用 ConcurrentHashMap。

          ConcurrentMap

          ConcurrentMap 是一個接口,它繼承了 Map 接口并提供了 Map 接口中四個新的方法,這四個方法都是 原子性 方法,進一步擴展了 Map 的功能。

          public?interface?ConcurrentMap<K,?V>?extends?Map<K,?V>?{
          ?
          ??//?僅當?key?沒有相應(yīng)的映射值時才插入
          ??V?putIfAbsent(K?key,?V?value);
          ??
          ??//?僅當?key?被映射到?value?時才移除
          ??boolean?remove(Object?key,?Object?value);
          ??
          ??//?僅當?key?被映射到?value?時才移除
          ??V?replace(K?key,?V?value);
          ??
          ??//?僅當?key?被映射到?oldValue?時才替換為?newValue
          ??boolean?replace(K?key,?V?oldValue,?V?newValue);
          ??
          }

          ConcurrentNavigableMap

          java.util.concurrent.ConcurrentNavigableMap 類是 java.util.NavigableMap 的子類,它支持并發(fā)訪問,并且允許其視圖的并發(fā)訪問。

          什么是視圖呢?視圖就是集合中的一段數(shù)據(jù)序列,ConcurrentNavigableMap 中支持使用 headMap、subMap、tailMap 返回的視圖。與其重新解釋一下 NavigableMap 中找到的所有方法,不如看一下 ConcurrentNavigableMap 中添加的方法

          • headMap 方法:headMap 方法返回一個嚴格小于給定鍵的視圖
          • tailMap 方法:tailMap 方法返回包含大于或等于給定鍵的視圖。
          • subMap 方法:subMap 方法返回給定兩個參數(shù)的視圖

          ConcurrentNavigableMap 接口包含一些可能有用的其他方法

          • descendingKeySet()
          • descendingMap()
          • navigableKeySet()

          更多關(guān)于方法的描述這里就不再贅述了,讀者朋友們可自行查閱 javadoc

          ConcurrentSkipListMap

          ConcurrentSkipListMap 是線程安全的有序的哈希表,適用于高并發(fā)的場景。

          ConcurrentSkipListMap 的底層數(shù)據(jù)結(jié)構(gòu)是基于跳表實現(xiàn)的。ConcurrentSkipListMap 可以提供 Comparable 內(nèi)部排序或者是 Comparator 外部排序,具體取決于使用哪個構(gòu)造函數(shù)。

          ConcurrentSkipListSet

          ConcurrentSkipListSet 是線程安全的有序的集合,適用于高并發(fā)的場景。ConcurrentSkipListSet 底層是通過 ConcurrentNavigableMap 來實現(xiàn)的,它是一個有序的線程安全的集合。

          ConcurrentSkipListSet有序的,基于元素的自然排序或者通過比較器確定的順序;

          ConcurrentSkipListSet是線程安全的;

          CopyOnWriteArrayList

          CopyOnWriteArrayList 是 ArrayList 的變體,在 CopyOnWriteArrayList 中,所有可變操作比如 add、set 其實都是重新創(chuàng)建了一個副本,通過對數(shù)組進行復(fù)制而實現(xiàn)的。

          CopyOnWriteArrayList 其內(nèi)部有一個指向數(shù)組的引用,數(shù)組是不會被修改的,每次并發(fā)修改 CopyOnWriteArrayList 都相當于重新創(chuàng)建副本,CopyOnWriteArrayList 是一種 fail-safe 機制的,它不會拋出 ConcurrentModificationException 異常,并且返回元素與迭代器創(chuàng)建時的元素相同。

          每次并發(fā)寫操作都會創(chuàng)建新的副本,這個過程存在一定的開銷,所以,一般在規(guī)模很大時,讀操作要遠遠多于寫操作時,為了保證線程安全性,會使用 CopyOnWriteArrayList。

          類似的,CopyOnWriteArraySet 的作用也相當于替代了 Set 接口。


          BlockingQueue

          BlockingQueue 譯為 阻塞隊列,它是 JDK 1.5 添加的新的工具類,它繼承于 Queue 隊列,并擴展了 Queue 的功能。

          BlockingQueue 在檢索元素時會等待隊列變成非空,并在存儲元素時會等待隊列變?yōu)榭捎?。BlockingQueue 的方法有四種實現(xiàn)形式,以不同的方式來處理。

          • 第一種是拋出異常
          • 特殊值:第二種是根據(jù)情況會返回 null 或者 false
          • 阻塞:第三種是無限期的阻塞當前線程直到操作變?yōu)榭捎煤?/span>
          • 超時:第四種是給定一個最大的超時時間,超過后才會放棄

          BlockingQueue 不允許添加 null 元素,在其實現(xiàn)類的方法 add、put 或者 offer 后時添加 null 會拋出空指針異常。BlockingQueue 會有容量限制。在任意時間內(nèi),它都會有一個 remainCapacity,超過該值之前,任意 put 元素都會阻塞。

          BlockingQueue 一般用于實現(xiàn)生產(chǎn)者 - 消費者 隊列,如下圖所示

          BlockingQueue 有多種實現(xiàn),下面我們一起來認識一下這些容器。

          其中 LinkedBlockingQueueArrayBlockingQueue 是 FIFO 先入先出隊列,二者分別和 ?LinkedListArrayList 對應(yīng),比同步 List 具有更好的并發(fā)性能。PriorityBlockingQueue 是一個優(yōu)先級排序的阻塞隊列,如果你希望按照某種順序而不是 FIFO 處理元素時這個隊列將非常有用。正如其他有序的容器一樣,PriorityBlockingQueue 既可以按照自然順序來比較元素,也可以使用 Comparator 比較器進行外部元素比較。SynchronousQueue 它維護的是一組線程而不是一組隊列,實際上它不是一個隊列,它的每個 insert 操作必須等待其他相關(guān)線程的 remove 方法后才能執(zhí)行,反之亦然。

          LinkedBlockingQueue

          LinkedBlockingQueue 是一種 BlockingQueue 的實現(xiàn)。

          它是一種基于鏈表的構(gòu)造、先入先出的有界阻塞隊列。隊列的 head 也就是頭元素是在隊列中等待時間最長的元素;隊列的 tail也就是隊尾元素是隊列中等待時間最短的元素。新的元素會被插入到隊尾中,檢索操作將獲取隊列中的頭部元素。鏈表隊列通常比基于數(shù)組的隊列具有更高的吞吐量,但是在大多數(shù)并發(fā)應(yīng)用程序中,可預(yù)測的性能較差。

          ArrayBlockingQueue

          ArrayBlockingQueue 是一個用數(shù)組實現(xiàn)的有界隊列,此隊列順序按照先入先出的原則對元素進行排序。

          默認情況下不保證線程公平的訪問隊列,所謂公平訪問隊列指的是阻塞的線程,可以按照阻塞的先后順序訪問,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的。有可能先阻塞的線程最后才訪問隊列。

          PriorityBlockingQueue

          PriorityBlockingQueue 是一個支持優(yōu)先級的阻塞隊列,默認情況下的元素采取自然順序生序或者降序,也可以自己定義 Comparator 進行外部排序。但需要注意的是不能保證同優(yōu)先級元素的順序。

          DelayQueue

          DelayQueue 是一個支持延時獲取元素的無阻塞隊列,其中的元素只能在延遲到期后才能使用,DelayQueue 中的隊列頭是延遲最長時間的元素,如果沒有延遲,則沒有 head 頭元素,poll 方法會返回 null。判斷的依據(jù)就是 getDelay(TimeUnit.NANOSECONDS) 方法返回一個值小于或者等于 0 就會發(fā)生過期。

          TransferQueue

          TransferQueue 繼承于 BlockingQueue,它是一個接口,一個 BlockingQueue 是一個生產(chǎn)者可能等待消費者接受元素,TransferQueue 則更進一步,生產(chǎn)者會一直阻塞直到所添加到隊列的元素被某一個消費者所消費,新添加的transfer 方法用來實現(xiàn)這種約束。

          TransferQueue 有下面這些方法:兩個 tryTransfer 方法,一個是非阻塞的,另一個是帶有 timeout 參數(shù)設(shè)置超時時間的。還有兩個輔助方法 hasWaitingConsumergetWaitingConcusmerCount。

          LinkedTransferQueue

          一個無界的基于鏈表的 TransferQueue。這個隊列對任何給定的生產(chǎn)者進行 FIFO 排序,head 是隊列中存在時間最長的元素。tail 是隊列中存在時間最短的元素。

          BlockingDeque

          與 BlockingQueue 相對的還有 BlockingDeque 和 Deque,它們是 JDK1.6 被提出的,分別對 Queue 和 BlockingQueue 做了擴展。

          Deque 是一個雙端隊列,分別實現(xiàn)了在隊列頭和隊列尾的插入。Deque 的實現(xiàn)有 ArrayDeque、ConcurrentLinkedDeque,BlockingDeque 的實現(xiàn)有 LinkedBlockingDeque 。

          阻塞模式一般用于生產(chǎn)者 - 消費者隊列,而雙端隊列適用于工作密取。在工作密取的設(shè)計中,每個消費者都有各自的雙端隊列,如果一個消費者完成了自己雙端隊列的任務(wù),就會去其他雙端隊列的末尾進行消費。密取方式要比傳統(tǒng)的生產(chǎn)者 - 消費者隊列具有更高的可伸縮性,這是因為每個工作密取的工作者都有自己的雙端隊列,不存在競爭的情況。

          ArrayDeque

          ArrayDeque 是 Deque 的可動態(tài)調(diào)整大小的數(shù)組實現(xiàn),其內(nèi)部沒有容量限制,他們會根據(jù)需要進行增長。ArrayDeque 不是線程安全的,如果沒有外部加鎖的情況下,不支持多線程訪問。ArrayDeque 禁止空元素,這個類作為棧使用時要比 Stack 快,作為 queue 使用時要比 LinkedList 快。

          除了 remove、removeFirstOccurrence、removeLastOccurrence、contains、interator.remove 外,大部分的 ArrayDeque 都以恒定的開銷運行。

          注意:ArrayDeque 是 fail-fast 的,如果創(chuàng)建了迭代器之后,卻使用了迭代器外部的 remove 等修改方法,那么這個類將會拋出 ConcurrentModificationException 異常。

          ConcurrentLinkedDeque

          ConcurrentLinkedDeque 是 JDK1.7 引入的雙向鏈表的無界并發(fā)隊列。它與 ConcurrentLinkedQueue 的區(qū)別是 ConcurrentLinkedDeque 同時支持 FIFO 和 FILO 兩種操作方式,即可以從隊列的頭和尾同時操作(插入/刪除)。ConcurrentLinkedDeque 也支持 happen-before 原則。ConcurrentLinkedDeque 不允許空元素。

          LinkedBlockingDeque

          LinkedBlockingDeque 是一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列,即可以從隊列的兩端插入和移除元素。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。LinkedBlockingDeque 把初始容量和構(gòu)造函數(shù)綁定,這樣能夠有效過度拓展。初始容量如果沒有指定,就取的是 Integer.MAX_VALUE,這也是 LinkedBlockingDeque 的默認構(gòu)造函數(shù)。

          同步工具類

          同步工具類可以是任何一個對象,只要它根據(jù)自身狀態(tài)來協(xié)調(diào)線程的控制流。阻塞隊列可以作為同步控制類,其他類型的同步工具類還包括 信號量(Semaphore)柵欄(Barrier)閉鎖(Latch)。下面我們就來一起認識一下這些工具類

          Semaphore

          Semaphore 翻譯過來就是 信號量,信號量是什么?它其實就是一種信號,在操作系統(tǒng)中,也有信號量的這個概念,在進程間通信的時候,我們就會談到信號量進行通信。還有在 Linux 操作系統(tǒng)采取中斷時,也會向進程發(fā)出中斷信號,根據(jù)進程的種類和信號的類型判斷是否應(yīng)該結(jié)束進程。

          在 Java 中,Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。

          Semaphore 管理著一組許可(permit),許可的初始數(shù)量由構(gòu)造函數(shù)來指定。在獲取某個資源之前,應(yīng)該先從信號量獲取許可(permit),以確保資源是否可用。當線程完成對資源的操作后,會把它放在池中并向信號量返回一個許可,從而允許其他線程訪問資源,這叫做釋放許可。如果沒有許可的話,那么 acquire 將會阻塞直到有許可(中斷或者操作超時)為止。release 方法將返回一個許可信號量。

          Semaphore 可以用來實現(xiàn)流量控制,例如常用的數(shù)據(jù)庫連接池,線程請求資源時,如果數(shù)據(jù)庫連接池為空則阻塞線程,直接返回失敗,如果連接池不為空時解除阻塞。

          CountDownLatch

          閉鎖(Latch) 是一種同步工具類,它可以延遲線程的進度以直到其到達終止狀態(tài)。閉鎖的作用相當于是一扇門,在閉鎖達到結(jié)束狀態(tài)前,門是一直關(guān)著的,沒有任何線程能夠通過。當閉鎖到達結(jié)束狀態(tài)后,這扇門會打開并且允許任何線程通過,然后就一直保持打開狀態(tài)。

          CountDownLatch 就是閉鎖的一種實現(xiàn)。它可以使一個或者多個線程等待一組事件的發(fā)生。閉鎖有一個計數(shù)器,閉鎖需要對計數(shù)器進行初始化,表示需要等待的次數(shù),閉鎖在調(diào)用 await 處進行等待,其他線程在調(diào)用 countDown 把閉鎖 count 次數(shù)進行遞減,直到遞減為 0 ,喚醒 await。如下代碼所示

          public?class?TCountDownLatch?{

          ????public?static?void?main(String[]?args)?{
          ????????CountDownLatch?latch?=?new?CountDownLatch(5);
          ????????Increment?increment?=?new?Increment(latch);
          ????????Decrement?decrement?=?new?Decrement(latch);

          ????????new?Thread(increment).start();
          ????????new?Thread(decrement).start();

          ????????try?{
          ????????????Thread.sleep(6000);
          ????????}?catch?(InterruptedException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}
          }
          class?Decrement?implements?Runnable?{

          ????CountDownLatch?countDownLatch;

          ????public?Decrement(CountDownLatch?countDownLatch){
          ????????this.countDownLatch?=?countDownLatch;
          ????}

          ????@Override
          ????public?void?run()?{
          ????????try?{

          ????????????for(long?i?=?countDownLatch.getCount();i?>?0;i--){
          ????????????????Thread.sleep(1000);
          ????????????????System.out.println("countdown");
          ????????????????this.countDownLatch.countDown();
          ????????????}

          ????????}?catch?(InterruptedException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}
          }
          class?Increment?implements?Runnable?{

          ????CountDownLatch?countDownLatch;

          ????public?Increment(CountDownLatch?countDownLatch){
          ????????this.countDownLatch?=?countDownLatch;
          ????}

          ????@Override
          ????public?void?run()?{
          ????????try?{
          ????????????System.out.println("await");
          ????????????countDownLatch.await();
          ????????}?catch?(InterruptedException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????????System.out.println("Waiter?Released");
          ????}
          }

          Future

          我們常見的創(chuàng)建多線程的方式有兩種,一種是繼承 Thread 類,一種是實現(xiàn) Runnable 接口。這兩種方式都沒有返回值。相對的,創(chuàng)建多線程還有其他三種方式,那就是使用 Callable接口、 Future 接口和 FutureTask 類。Callable 我們之前聊過,這里就不再描述了,我們主要來描述一下 Future 和 FutureTask 接口。

          Future 就是對具體的 Runnable 或者 Callable 任務(wù)的執(zhí)行結(jié)果進行一系列的操作,必要時可通過 get 方法獲取執(zhí)行結(jié)果,這個方法會阻塞直到執(zhí)行結(jié)束。Future 中的主要方法有

          public?interface?Future<V>?{
          ????boolean?cancel(boolean?mayInterruptIfRunning);
          ????boolean?isCancelled();
          ????boolean?isDone();
          ????V?get()?throws?InterruptedException,?ExecutionException;
          ????V?get(long?timeout,?TimeUnit?unit)
          ????????throws?InterruptedException,?ExecutionException,?TimeoutException
          ;
          }
          • cancel(boolean mayInterruptIfRunning) : 嘗試取消任務(wù)的執(zhí)行。如果任務(wù)已經(jīng)完成、已經(jīng)被取消或者由于某些原因而無法取消,那么這個嘗試會失敗。如果取消成功,或者在調(diào)用 cancel 時此任務(wù)尚未開始,那么此任務(wù)永遠不會執(zhí)行。如果任務(wù)已經(jīng)開始,那么 mayInterruptIfRunning 參數(shù)會確定是否中斷執(zhí)行任務(wù)以便于嘗試停止該任務(wù)。這個方法返回后,會對 isDone 的后續(xù)調(diào)用也返回 true,如果 cancel 返回 true,那么后續(xù)的調(diào)用 isCancelled 也會返回 true。
          • boolean isCancelled():如果此任務(wù)在正常完成之前被取消,則返回 true。
          • boolean isDone():如果任務(wù)完成,返回 true。
          • V get() throws InterruptedException, ExecutionException:等待必要的計算完成,然后檢索其結(jié)果
          • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException :必要時最多等待給定時間以完成計算,然后檢索其結(jié)果。

          因為Future只是一個接口,所以是無法直接用來創(chuàng)建對象使用的,因此就有了下面的FutureTask。

          FutureTask

          FutureTask 實現(xiàn)了 RunnableFuture 接口,RunnableFuture 接口是什么呢?

          RunnableFuture 接口又繼承了 Runnable 接口和 Future 接口。納尼?在 Java 中不是只允許單繼承么,是的,單繼承更多的是說的類與類之間的繼承關(guān)系,子類繼承父類,擴展父類的接口,這個過程是單向的,就是為了解決多繼承引起的過渡引用問題。而接口之間的繼承是接口的擴展,在 Java 編程思想中也印證了這一點

          對 RunnableFuture 接口的解釋是:成功執(zhí)行的 run 方法會使 Future 接口的完成并允許訪問其結(jié)果。所以它既可以作為 Runnable 被線程執(zhí)行,又可以作為 Future 得到 Callable 的返回值。

          FutureTask 也可以用作閉鎖,它可以處于以下三種狀態(tài)

          • 等待運行
          • 正在運行
          • 運行完成

          FutureTask 在 Executor 框架中表示異步任務(wù),此外還可以表示一些時間較長的計算,這些計算可以在使用計算結(jié)果之前啟動。

          FutureTask 具體的源碼我后面會單獨出文章進行描述。

          Barrier

          我們上面聊到了通過閉鎖來啟動一組相關(guān)的操作,使用閉鎖來等待一組事件的執(zhí)行。閉鎖是一種一次性對象,一旦進入終止狀態(tài)后,就不能被 重置。

          Barrier 的特點和閉鎖也很類似,它也是阻塞一組線程直到某個事件發(fā)生。柵欄與閉鎖的區(qū)別在于,所有線程必須同時到達柵欄的位置,才能繼續(xù)執(zhí)行,就像我們上面操作系統(tǒng)給出的這幅圖一樣。

          ABCD 四條線程,必須同時到達 Barrier,然后 手牽手一起走過幸福的殿堂。

          當線程到達 Barrier 的位置時會調(diào)用 await 方法,這個方法會阻塞直到所有線程都到達 Barrier 的位置,如果所有線程都到達 Barrier 的位置,那么 Barrier 將會打開使所有線程都被釋放,而 Barrier 將被重置以等待下次使用。如果調(diào)用 await 方法導(dǎo)致超時,或者 await 阻塞的線程被中斷,那么 Barrier 就被認為被打破,所有阻塞的 await 都會拋出 BrokenBarrierException 。如果成功通過柵欄后,await 方法返回一個唯一索引號,可以利用這些索引號選舉一個新的 leader,來處理一下其他工作。

          public?class?TCyclicBarrier?{

          ????public?static?void?main(String[]?args)?{

          ????????Runnable?runnable?=?()?->?System.out.println("Barrier?1?開始...");

          ????????Runnable?runnable2?=?()?->?System.out.println("Barrier?2?開始...");

          ????????CyclicBarrier?barrier1?=?new?CyclicBarrier(2,runnable);
          ????????CyclicBarrier?barrier2?=?new?CyclicBarrier(2,runnable2);

          ????????CyclicBarrierRunnable?b1?=?new?CyclicBarrierRunnable(barrier1,barrier2);
          ????????CyclicBarrierRunnable?b2?=?new?CyclicBarrierRunnable(barrier1,barrier2);

          ????????new?Thread(b1).start();
          ????????new?Thread(b2).start();
          ????}

          }

          class?CyclicBarrierRunnable?implements?Runnable?{

          ????CyclicBarrier?barrier1;
          ????CyclicBarrier?barrier2;

          ????public?CyclicBarrierRunnable(CyclicBarrier?barrier1,CyclicBarrier?barrier2){
          ????????this.barrier1?=?barrier1;
          ????????this.barrier2?=?barrier2;
          ????}

          ????@Override
          ????public?void?run()?{

          ????????try?{
          ????????????Thread.sleep(1000);
          ????????????System.out.println(Thread.currentThread().getName()?+?"等待?barrier1"?);
          ????????????barrier1.await();

          ????????????Thread.sleep(1000);
          ????????????System.out.println(Thread.currentThread().getName()?+?"等待?barrier2"?);
          ????????????barrier2.await();

          ????????}?catch?(InterruptedException?|?BrokenBarrierException?e)?{
          ????????????e.printStackTrace();
          ????????}

          ????????System.out.println(Thread.currentThread().getName()?+
          ????????????????"?做完了!");

          ????}
          }

          Exchanger

          與 Barrier 相關(guān)聯(lián)的還有一個工具類就是 Exchanger, Exchanger 是一個用于線程間協(xié)作的工具類。Exchanger用于進行線程間的數(shù)據(jù)交換。

          它提供一個同步點,在這個同步點兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過exchange 方法交換數(shù)據(jù), 如果第一個線程先執(zhí)行 exchange方法,它會一直等待第二個線程也執(zhí)行 exchange,當兩個線程都到達同步點時,這兩個線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方。因此使用Exchanger 的重點是成對的線程使用 exchange() 方法,當有一對線程達到了同步點,就會進行交換數(shù)據(jù)。因此該工具類的線程對象是成對的。

          下面通過一段例子代碼來講解一下

          public?class?TExchanger?{

          ????public?static?void?main(String[]?args)?{

          ????????Exchanger?exchanger?=?new?Exchanger();

          ????????ExchangerRunnable?exchangerRunnable?=?new?ExchangerRunnable(exchanger,"A");
          ????????ExchangerRunnable?exchangerRunnable2?=?new?ExchangerRunnable(exchanger,"B");

          ????????new?Thread(exchangerRunnable).start();
          ????????new?Thread(exchangerRunnable2).start();
          ????}
          }


          class?ExchangerRunnable?implements?Runnable?{

          ????Exchanger?exchanger;
          ????Object?object;

          ????public?ExchangerRunnable(Exchanger?exchanger,Object?object){
          ????????this.exchanger?=?exchanger;
          ????????this.object?=?object;
          ????}


          ????@Override
          ????public?void?run()?{

          ????????Object?previous?=?object;

          ????????try?{
          ????????????object?=?this.exchanger.exchange(object);
          ????????????System.out.println(
          ????????????????????Thread.currentThread().getName()?+?"改變前是"?+?previous?+?"改變后是"?+?object);
          ????????}?catch?(InterruptedException?e)?{
          ????????????e.printStackTrace();
          ????????}

          ????}
          }

          總結(jié)

          本篇文章我們從同步容器類入手,主要講了 fail-fastfail-safe 機制,這兩個機制在并發(fā)編程中非常重要。然后我們從操作系統(tǒng)的角度,聊了聊操作系統(tǒng)層面實現(xiàn)安全性的幾種方式,然后從操作系統(tǒng) -> 并發(fā)我們聊了聊 Java 中的并發(fā)工具包有哪些,以及構(gòu)建并發(fā)的幾種工具類。

          如果這篇文章幫助到你,小伙伴們不妨點贊、再看、轉(zhuǎn)發(fā),三連是讓我繼續(xù)更文的最大動力!


          往期精選

          我的文章是最棒的狗糧

          今天懟我的面試官竟然和我在同一個群里...

          寫給自己的 25 歲

          2w字 + 40張圖帶你參透并發(fā)編程!

          完了,這個硬件成精了,它竟然繞過了 CPU......


          瀏覽 44
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线三级在线观看网站 | 激情无码网站 | 99玖玖视频 | 青娱乐在线免费视频 | 国产av一区二区三本 |