<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          用Java自己動(dòng)手實(shí)現(xiàn)一個(gè)阻塞隊(duì)列

          共 75076字,需瀏覽 151分鐘

           ·

          2021-04-25 10:04

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

            作者 |  小熊餐館

          來源 |  urlify.cn/IbIB7j

          1. 阻塞隊(duì)列介紹

                顧名思義,阻塞隊(duì)列是一個(gè)具備先進(jìn)先出特性的隊(duì)列結(jié)構(gòu),從隊(duì)列末尾插入數(shù)據(jù),從隊(duì)列頭部取出數(shù)據(jù)。而阻塞隊(duì)列與普通隊(duì)列的最大不同在于阻塞隊(duì)列提供了阻塞式的同步插入、取出數(shù)據(jù)的功能(阻塞入隊(duì)put/阻塞出隊(duì)take)。

                使用put插入數(shù)據(jù)時(shí),如果隊(duì)列空間已滿并不直接返回,而是令當(dāng)前操作的線程陷入阻塞態(tài)(生產(chǎn)者線程),等待著阻塞隊(duì)列中的元素被其它線程(消費(fèi)者線程)取走,令隊(duì)列重新變得不滿時(shí)被喚醒再次嘗試插入數(shù)據(jù)。使用take取出數(shù)據(jù)時(shí),如果隊(duì)列空間為空并不直接返回,而是令當(dāng)前操作的線程陷入阻塞態(tài)(消費(fèi)者線程),等待其它線程(生產(chǎn)者線程)插入新元素,令隊(duì)列非空時(shí)被喚醒再次嘗試取出數(shù)據(jù)。

                阻塞隊(duì)列主要用于解決并發(fā)場景下消費(fèi)者線程與生產(chǎn)者線程處理速度不一致的問題。例如jdk的線程池實(shí)現(xiàn)中,線程池核心線程(消費(fèi)者線程)處理速度一定的情況下,如果業(yè)務(wù)方線程提交的任務(wù)過多導(dǎo)致核心線程處理不過來時(shí),將任務(wù)暫時(shí)放進(jìn)阻塞隊(duì)列等待核心線程消費(fèi)(阻塞隊(duì)列未滿);由于核心線程常駐的原因,當(dāng)業(yè)務(wù)方線程提交的任務(wù)較少,核心線程消費(fèi)速度高于業(yè)務(wù)方生產(chǎn)速度時(shí),核心線程作為消費(fèi)者會(huì)阻塞在阻塞隊(duì)列的take方法中,避免無謂的浪費(fèi)cpu資源。

                由于阻塞隊(duì)列在內(nèi)部實(shí)現(xiàn)了協(xié)調(diào)生產(chǎn)者/消費(fèi)者的機(jī)制而不需要外部使用者過多的考慮并發(fā)同步問題,極大的降低了生產(chǎn)者/消費(fèi)者場景下程序的復(fù)雜度。

          2. 自己實(shí)現(xiàn)阻塞隊(duì)列    

                下面我們自己動(dòng)手一步步的實(shí)現(xiàn)幾個(gè)不同版本、效率由低到高的的阻塞隊(duì)列,來加深對阻塞隊(duì)列工作原理的理解。

          阻塞隊(duì)列接口

                為了降低復(fù)雜度,我們的阻塞隊(duì)列只提供最基礎(chǔ)的出隊(duì)、入隊(duì)和判空接口。

          /**
           * 阻塞隊(duì)列
           * 1. 首先是一個(gè)先進(jìn)先出的隊(duì)列
           * 2. 提供特別的api,在入隊(duì)時(shí)如果隊(duì)列已滿令當(dāng)前操作線程阻塞;在出隊(duì)時(shí)如果隊(duì)列為空令當(dāng)前操作線程阻塞
           * 3. 單個(gè)元素的插入、刪除操作是線程安全的
           */
          public interface MyBlockingQueue<E> {
             /**
               * 插入特定元素e,加入隊(duì)尾
               * 隊(duì)列已滿時(shí)阻塞當(dāng)前線程,直到隊(duì)列中元素被其它線程刪除并插入成功
               * */
              void put(E e) throws InterruptedException;

              /**
               * 隊(duì)列頭部的元素出隊(duì)(返回頭部元素,將其從隊(duì)列中刪除)
               * 隊(duì)列為空時(shí)阻塞當(dāng)前線程,直到隊(duì)列被其它元素插入新元素并出隊(duì)成功
               * */
              E take() throws InterruptedException;

              /**
               * 隊(duì)列是否為空
               * */
              boolean isEmpty();
          }

          2.1 v1版本(最基本的隊(duì)列實(shí)現(xiàn))

                博客中所實(shí)現(xiàn)的阻塞隊(duì)列底層是使用數(shù)組承載數(shù)據(jù)的(ArrayBlockingQueue),內(nèi)部提供了私有方法enqueue和dequeue來實(shí)現(xiàn)原始的內(nèi)部入隊(duì)和出隊(duì)操作。

                最初始的v1版本中,我們只實(shí)現(xiàn)最基本的FIFO隊(duì)列功能,其put和take方法只是簡單的調(diào)用了enqueue和dequeue,因此v1版本中其入隊(duì)、出隊(duì)不是阻塞的,也無法保障線程安全,十分簡陋。

                后續(xù)的版本中,我們會(huì)以v1版本為基礎(chǔ),實(shí)現(xiàn)阻塞調(diào)用以及線程安全的特性,并且對所實(shí)現(xiàn)的阻塞隊(duì)列性能進(jìn)行不斷的優(yōu)化。

          /**
           * 數(shù)組作為底層結(jié)構(gòu)的阻塞隊(duì)列 v1版本
           */
          public class MyArrayBlockingQueueV1<E> implements MyBlockingQueue<E> {

              /**
               * 隊(duì)列默認(rèn)的容量大小
               * */
              private static final int DEFAULT_CAPACITY = 16;

              /**
               * 承載隊(duì)列元素的底層數(shù)組
               * */
              private final Object[] elements;

              /**
               * 當(dāng)前頭部元素的下標(biāo)
               * */
              private int head;

              /**
               * 下一個(gè)元素插入時(shí)的下標(biāo)
               * */
              private int tail;

              /**
               * 隊(duì)列中元素個(gè)數(shù)
               * */
              private int count;

              //=================================================構(gòu)造方法======================================================
              public MyArrayBlockingQueueV1() {
                  // 設(shè)置數(shù)組大小為默認(rèn)
                  this.elements = new Object[DEFAULT_CAPACITY];

                  // 初始化隊(duì)列 頭部,尾部下標(biāo)
                  this.head = 0;
                  this.tail = 0;
              }

              public MyArrayBlockingQueueV1(int initCapacity) {
                  assert initCapacity > 0;
                  this.elements = new Object[initCapacity];

                  // 初始化隊(duì)列 頭部,尾部下標(biāo)
                  this.head = 0;
                  this.tail = 0;
              }

              /**
               * 下標(biāo)取模
               * */
              private int getMod(int logicIndex){
                  int innerArrayLength = this.elements.length;

                  // 由于隊(duì)列下標(biāo)邏輯上是循環(huán)的
                  if(logicIndex < 0){
                      // 當(dāng)邏輯下標(biāo)小于零時(shí)

                      // 真實(shí)下標(biāo) = 邏輯下標(biāo) + 加上當(dāng)前數(shù)組長度
                      return logicIndex + innerArrayLength;
                  } else if(logicIndex >= innerArrayLength){
                      // 當(dāng)邏輯下標(biāo)大于數(shù)組長度時(shí)

                      // 真實(shí)下標(biāo) = 邏輯下標(biāo) - 減去當(dāng)前數(shù)組長度
                      return logicIndex - innerArrayLength;
                  } else {
                      // 真實(shí)下標(biāo) = 邏輯下標(biāo)
                      return logicIndex;
                  }
              }

              /**
               * 入隊(duì)
               * */
              private void enqueue(E e){
                  // 存放新插入的元素
                  this.elements[this.tail] = e;
                  // 尾部插入新元素后 tail下標(biāo)后移一位
                  this.tail = getMod(this.tail + 1);

                  this.count++;
              }

              /**
               * 出隊(duì)
               * */
              private E dequeue(){
                  // 暫存需要被刪除的數(shù)據(jù)
                  E dataNeedRemove = (E)this.elements[this.head];
                  // 將當(dāng)前頭部元素引用釋放
                  this.elements[this.head] = null;
                  // 頭部下標(biāo) 后移一位
                  this.head = getMod(this.head + 1);

                  this.count--;

                  return dataNeedRemove;
              }

              @Override
              public void put(E e){
                  enqueue(e);
              }

              @Override
              public E take() {
                 return dequeue();
              }

              @Override
              public boolean isEmpty() {
                  return this.count == 0;
              }
          }

           2.2 v2版本(實(shí)現(xiàn)同步阻塞和線程安全的特性)

               前面提到阻塞調(diào)用的出隊(duì)、入隊(duì)的功能是阻塞隊(duì)列區(qū)別于普通隊(duì)列的關(guān)鍵特性。阻塞調(diào)用實(shí)現(xiàn)的方式有很多,其中最容易理解的一種方式便是無限循環(huán)的輪詢,直到出隊(duì)/入隊(duì)成功(雖然cpu效率很低)。

               v2版本在v1的基礎(chǔ)上,使用無限循環(huán)加定時(shí)休眠的方式簡單的實(shí)現(xiàn)了同步調(diào)用時(shí)阻塞的特性。并且在put/take內(nèi)增加了synchronized塊將入隊(duì)/出隊(duì)代碼包裹起來,阻止多個(gè)線程并發(fā)的操作隊(duì)列而產(chǎn)生線程安全問題。

          v2版本入隊(duì)方法實(shí)現(xiàn): 

          @Override
              public void put(E e) throws InterruptedException {
                  while (true) {
                      synchronized (this) {
                          // 隊(duì)列未滿時(shí)執(zhí)行入隊(duì)操作
                          if (count != elements.length) {
                              // 入隊(duì),并返回
                              enqueue(e);
                              return;
                          }
                      }

                      // 隊(duì)列已滿,休眠一段時(shí)間后重試
                      Thread.sleep(100L);
                  }
              }

          v2版本出隊(duì)方法實(shí)現(xiàn):

          @Override
              public E take() throws InterruptedException {
                  while (true) {
                      synchronized (this) {
                          // 隊(duì)列非空時(shí)執(zhí)行出隊(duì)操作
                          if (count != 0) {
                              // 出隊(duì)并立即返回
                              return dequeue();
                          }
                      }

                      // 隊(duì)列為空的情況下,休眠一段時(shí)間后重試
                      Thread.sleep(100L);
                  }
              }

          v2版本完整代碼:

          /**
           * 數(shù)組作為底層結(jié)構(gòu)的阻塞隊(duì)列 v2版本
           */
          public class MyArrayBlockingQueueV2<E> implements MyBlockingQueue<E> {

              /**
               * 隊(duì)列默認(rèn)的容量大小
               * */
              private static final int DEFAULT_CAPACITY = 16;

              /**
               * 承載隊(duì)列元素的底層數(shù)組
               * */
              private final Object[] elements;

              /**
               * 當(dāng)前頭部元素的下標(biāo)
               * */
              private int head;

              /**
               * 下一個(gè)元素插入時(shí)的下標(biāo)
               * */
              private int tail;

              /**
               * 隊(duì)列中元素個(gè)數(shù)
               * */
              private int count;

              //=================================================構(gòu)造方法======================================================
              /**
               * 默認(rèn)構(gòu)造方法
               * */
              public MyArrayBlockingQueueV2() {
                  // 設(shè)置數(shù)組大小為默認(rèn)
                  this.elements = new Object[DEFAULT_CAPACITY];

                  // 初始化隊(duì)列 頭部,尾部下標(biāo)
                  this.head = 0;
                  this.tail = 0;
              }

              /**
               * 默認(rèn)構(gòu)造方法
               * */
              public MyArrayBlockingQueueV2(int initCapacity) {
                  assert initCapacity > 0;

                  // 設(shè)置數(shù)組大小為默認(rèn)
                  this.elements = new Object[initCapacity];

                  // 初始化隊(duì)列 頭部,尾部下標(biāo)
                  this.head = 0;
                  this.tail = 0;
              }

              /**
               * 下標(biāo)取模
               * */
              private int getMod(int logicIndex){
                  int innerArrayLength = this.elements.length;

                  // 由于隊(duì)列下標(biāo)邏輯上是循環(huán)的
                  if(logicIndex < 0){
                      // 當(dāng)邏輯下標(biāo)小于零時(shí)

                      // 真實(shí)下標(biāo) = 邏輯下標(biāo) + 加上當(dāng)前數(shù)組長度
                      return logicIndex + innerArrayLength;
                  } else if(logicIndex >= innerArrayLength){
                      // 當(dāng)邏輯下標(biāo)大于數(shù)組長度時(shí)

                      // 真實(shí)下標(biāo) = 邏輯下標(biāo) - 減去當(dāng)前數(shù)組長度
                      return logicIndex - innerArrayLength;
                  } else {
                      // 真實(shí)下標(biāo) = 邏輯下標(biāo)
                      return logicIndex;
                  }
              }

              /**
               * 入隊(duì)
               * */
              private void enqueue(E e){
                  // 存放新插入的元素
                  this.elements[this.tail] = e;
                  // 尾部插入新元素后 tail下標(biāo)后移一位
                  this.tail = getMod(this.tail + 1);

                  this.count++;
              }

              /**
               * 出隊(duì)
               * */
              private E dequeue(){
                  // 暫存需要被刪除的數(shù)據(jù)
                  E dataNeedRemove = (E)this.elements[this.head];
                  // 將當(dāng)前頭部元素引用釋放
                  this.elements[this.head] = null;
                  // 頭部下標(biāo) 后移一位
                  this.head = getMod(this.head + 1);

                  this.count--;

                  return dataNeedRemove;
              }

              @Override
              public void put(E e) throws InterruptedException {
                  while (true) {
                      synchronized (this) {
                          // 隊(duì)列未滿時(shí)執(zhí)行入隊(duì)操作
                          if (count != elements.length) {
                              // 入隊(duì),并返回
                              enqueue(e);
                              return;
                          }
                      }

                      // 隊(duì)列已滿,休眠一段時(shí)間后重試
                      Thread.sleep(100L);
                  }
              }

              @Override
              public E take() throws InterruptedException {
                  while (true) {
                      synchronized (this) {
                          // 隊(duì)列非空時(shí)執(zhí)行出隊(duì)操作
                          if (count != 0) {
                              // 出隊(duì)并立即返回
                              return dequeue();
                          }
                      }

                      // 隊(duì)列為空的情況下,休眠一段時(shí)間后重試
                      Thread.sleep(100L);
                  }
              }

              @Override
              public boolean isEmpty() {
                  return this.count == 0;
              }
          }

          2.3 v3版本(引入條件變量優(yōu)化無限循環(huán)輪詢)

              在有大量線程競爭的情況下,v2版本無限循環(huán)加休眠的阻塞方式存在兩個(gè)嚴(yán)重的問題。

          無限循環(huán)輪詢的缺陷

                1. 線程周期性的休眠/喚醒會(huì)造成頻繁的發(fā)生線程上下文切換,非常浪費(fèi)cpu資源

                2. 線程在嘗試操作失敗被阻塞時(shí)(嘗試入隊(duì)時(shí)隊(duì)列已滿、嘗試出隊(duì)時(shí)隊(duì)列為空),如果休眠時(shí)間設(shè)置的太短,則休眠/喚醒的次數(shù)會(huì)非常多,cpu性能低下;但如果休眠的時(shí)間設(shè)置的較長,則會(huì)導(dǎo)致被阻塞線程在隊(duì)列狀態(tài)發(fā)生變化時(shí)無法及時(shí)的響應(yīng)

                舉個(gè)例子:某一生產(chǎn)者線程在入隊(duì)時(shí)發(fā)現(xiàn)隊(duì)列已滿,當(dāng)前線程休眠1s,在0.1s之后一個(gè)消費(fèi)者線程取走了一個(gè)元素,而此時(shí)休眠的生產(chǎn)者線程還需要白白等待0.9s后才被喚醒并感知到隊(duì)列未滿而接著執(zhí)行入隊(duì)操作。綜上所述,無限循環(huán)加休眠的v2版本阻塞隊(duì)列其性能極差,需要進(jìn)一步的優(yōu)化。

          使用條件變量進(jìn)行優(yōu)化

                為了解決上述循環(huán)休眠浪費(fèi)cpu和隊(duì)列狀態(tài)發(fā)生變化時(shí)(已滿到未滿,已空到未空)被阻塞線程無法及時(shí)響應(yīng)的問題,v3版本引入條件變量對其進(jìn)行優(yōu)化。

                條件變量由底層的操作系統(tǒng)內(nèi)核實(shí)現(xiàn)的、用于線程間同步的利器。(條件變量的實(shí)現(xiàn)原理可以參考我之前的博客:https://www.cnblogs.com/xiaoxiongcanguan/p/14152830.html)

                java將不同操作系統(tǒng)內(nèi)核提供的條件變量機(jī)制抽象封裝后,作為可重入鎖ReentrantLock的附屬給程序員使用。且為了避免lost wakeup問題,在條件變量的實(shí)現(xiàn)中增加了校驗(yàn),要求調(diào)用條件變量的signal和await方法時(shí)當(dāng)前線程必須先獲得條件變量所附屬的鎖才行,更具體的解析可以參考這篇文章:https://mp.weixin.qq.com/s/ohcr6T1aB7-lVFJIfyJZjA。

                引入條件變量后,可以令未滿足某種條件的線程暫時(shí)進(jìn)入阻塞態(tài),等待在一個(gè)條件變量上;當(dāng)對應(yīng)條件滿足時(shí)由其它的線程將等待在條件變量上的線程喚醒,將其從阻塞態(tài)再切換回就緒態(tài)。

                舉個(gè)例子:當(dāng)某一生產(chǎn)者線程想要插入新元素但阻塞隊(duì)列已滿時(shí),可以令當(dāng)前生產(chǎn)者線程等待并阻塞在對應(yīng)的條件變量中;當(dāng)后續(xù)某一消費(fèi)者線程執(zhí)行出隊(duì)操作使得隊(duì)列非空后,將等待在條件變量上的生產(chǎn)者線程喚醒,被喚醒的生產(chǎn)者線程便能及時(shí)的再次嘗試進(jìn)行入隊(duì)操作。

                v3和v2版本相比,等待在條件變量進(jìn)入阻塞態(tài)的線程不再周期性的被喚醒而占用過多的cpu資源,且在特定條件滿足時(shí)也能被及時(shí)喚醒。

                引入條件變量后的v3版本阻塞隊(duì)列效率比v2高出許多。

          v3版本完整代碼:

          /**
           * 數(shù)組作為底層結(jié)構(gòu)的阻塞隊(duì)列 v3版本
           */
          public class MyArrayBlockingQueueV3<E> implements MyBlockingQueue<E> {

              /**
               * 隊(duì)列默認(rèn)的容量大小
               * */
              private static final int DEFAULT_CAPACITY = 16;

              /**
               * 承載隊(duì)列元素的底層數(shù)組
               * */
              private final Object[] elements;

              /**
               * 當(dāng)前頭部元素的下標(biāo)
               * */
              private int head;

              /**
               * 下一個(gè)元素插入時(shí)的下標(biāo)
               * */
              private int tail;

              /**
               * 隊(duì)列中元素個(gè)數(shù)
               * */
              private int count;

              private final ReentrantLock reentrantLock;

              private final Condition condition;

              //=================================================構(gòu)造方法======================================================
              /**
               * 默認(rèn)構(gòu)造方法
               * */
              public MyArrayBlockingQueueV3() {
                 this(DEFAULT_CAPACITY);
              }

              /**
               * 默認(rèn)構(gòu)造方法
               * */
              public MyArrayBlockingQueueV3(int initCapacity) {
                  assert initCapacity > 0;

                  // 設(shè)置數(shù)組大小為默認(rèn)
                  this.elements = new Object[initCapacity];

                  // 初始化隊(duì)列 頭部,尾部下標(biāo)
                  this.head = 0;
                  this.tail = 0;

                  this.reentrantLock = new ReentrantLock();
                  this.condition = this.reentrantLock.newCondition();
              }

              /**
               * 下標(biāo)取模
               * */
              private int getMod(int logicIndex){
                  int innerArrayLength = this.elements.length;

                  // 由于隊(duì)列下標(biāo)邏輯上是循環(huán)的
                  if(logicIndex < 0){
                      // 當(dāng)邏輯下標(biāo)小于零時(shí)

                      // 真實(shí)下標(biāo) = 邏輯下標(biāo) + 加上當(dāng)前數(shù)組長度
                      return logicIndex + innerArrayLength;
                  } else if(logicIndex >= innerArrayLength){
                      // 當(dāng)邏輯下標(biāo)大于數(shù)組長度時(shí)

                      // 真實(shí)下標(biāo) = 邏輯下標(biāo) - 減去當(dāng)前數(shù)組長度
                      return logicIndex - innerArrayLength;
                  } else {
                      // 真實(shí)下標(biāo) = 邏輯下標(biāo)
                      return logicIndex;
                  }
              }

              /**
               * 入隊(duì)
               * */
              private void enqueue(E e){
                  // 存放新插入的元素
                  this.elements[this.tail] = e;
                  // 尾部插入新元素后 tail下標(biāo)后移一位
                  this.tail = getMod(this.tail + 1);

                  this.count++;
              }

              /**
               * 出隊(duì)
               * */
              private E dequeue(){
                  // 暫存需要被刪除的數(shù)據(jù)
                  E dataNeedRemove = (E)this.elements[this.head];
                  // 將當(dāng)前頭部元素引用釋放
                  this.elements[this.head] = null;
                  // 頭部下標(biāo) 后移一位
                  this.head = getMod(this.head + 1);

                  this.count--;

                  return dataNeedRemove;
              }

              @Override
              public void put(E e) throws InterruptedException {
                  // 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
                  reentrantLock.lockInterruptibly();
                  try {
                      // 因?yàn)楸幌M(fèi)者喚醒后可能會(huì)被其它的生產(chǎn)者再度填滿隊(duì)列,需要循環(huán)的判斷
                      while (this.count == elements.length) {
                          // put操作時(shí),如果隊(duì)列已滿則進(jìn)入條件變量的等待隊(duì)列,并釋放條件變量對應(yīng)的鎖
                          condition.await();
                      }
                      // 走到這里,說明當(dāng)前隊(duì)列不滿,可以執(zhí)行入隊(duì)操作
                      enqueue(e);

                      // 喚醒可能等待著的消費(fèi)者線程
                      // 由于共用了一個(gè)condition,所以不能用signal,否則一旦喚醒的也是生產(chǎn)者線程就會(huì)陷入上面的while死循環(huán))
                      condition.signalAll();
                  } finally {
                      // 入隊(duì)完畢,釋放鎖
                      reentrantLock.unlock();
                  }
              }

              @Override
              public E take() throws InterruptedException {
                  // 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
                  reentrantLock.lockInterruptibly();

                  try {
                      // 因?yàn)楸簧a(chǎn)者喚醒后可能會(huì)被其它的消費(fèi)者消費(fèi)而使得隊(duì)列再次為空,需要循環(huán)的判斷
                      while(this.count == 0){
                          condition.await();
                      }

                      E headElement = dequeue();

                      // 喚醒可能等待著的生產(chǎn)者線程
                      // 由于共用了一個(gè)condition,所以不能用signal,否則一旦喚醒的也是消費(fèi)者線程就會(huì)陷入上面的while死循環(huán))
                      condition.signalAll();

                      return headElement;
                  } finally {
                      // 出隊(duì)完畢,釋放鎖
                      reentrantLock.unlock();
                  }
              }

              @Override
              public boolean isEmpty() {
                  return this.count == 0;
              }
          }

          2.4 v4版本(引入雙條件變量,優(yōu)化喚醒效率)

                v3版本通過引入條件變量解決了v2版本中循環(huán)休眠、喚醒效率低下的問題,但v3版本還是存在一定的性能問題。

          v3版本中signalAll的效率問題

                jdk的Condition條件變量提供了signal和signalAll這兩個(gè)方法用于喚醒等待在條件變量中的線程,其中signalAll會(huì)喚醒等待在條件變量上的所有線程,而signal則只會(huì)喚醒其中一個(gè)。

                舉個(gè)例子,v3版本中消費(fèi)者線程在隊(duì)列已滿時(shí)進(jìn)行出隊(duì)操作后,通過signalAll會(huì)喚醒所有等待入隊(duì)的多個(gè)生產(chǎn)者線程,但最終只會(huì)有一個(gè)線程成功競爭到互斥鎖并成功執(zhí)行入隊(duì)操作,其它的生產(chǎn)者線程在被喚醒后發(fā)現(xiàn)隊(duì)列依然是滿的,而繼續(xù)等待。v3版本中的signalAll喚醒操作造成了驚群效應(yīng),無意義的喚醒了過多的等待中的線程。

                但僅僅將v3版本中的signalAll改成signal是不行的,因?yàn)樯a(chǎn)者和消費(fèi)者線程是等待在同一個(gè)條件變量中的,如果消費(fèi)者在出隊(duì)后通過signal喚醒的不是與之對應(yīng)的生產(chǎn)者線程,而是另一個(gè)消費(fèi)者線程,則本該被喚醒的生產(chǎn)者線程可能遲遲無法被喚醒,甚至在一些場景下會(huì)永遠(yuǎn)被阻塞,無法再喚醒。

                仔細(xì)思索后可以發(fā)現(xiàn),對于生產(chǎn)者線程其在隊(duì)列已滿時(shí)阻塞等待,等待的是隊(duì)列不滿的條件(notFull);而對于消費(fèi)者線程其在隊(duì)列為空時(shí)阻塞等待,等待的是隊(duì)列不空的條件(notEmpty)。隊(duì)列不滿和隊(duì)列不空實(shí)質(zhì)上是兩個(gè)互不相關(guān)的條件。

                因此v4版本中將生產(chǎn)者線程和消費(fèi)者線程關(guān)注的條件變量拆分成兩個(gè):生產(chǎn)者線程在隊(duì)列已滿時(shí)阻塞等待在notFull條件變量上,消費(fèi)者線程出隊(duì)后通過notFull.signal嘗試著喚醒一個(gè)等待的生產(chǎn)者線程;與之相對的,消費(fèi)者線程在隊(duì)列為空時(shí)阻塞等待在notEmpty條件變量上,生產(chǎn)者線程入隊(duì)后通過notEmpty.signal嘗試著喚醒一個(gè)等待的消費(fèi)者線程。

                通過拆分出兩個(gè)互相獨(dú)立的條件變量,v4版本避免了v3版本中signalAll操作帶來的驚群效應(yīng),避免了signalAll操作無效喚醒帶來的額外開銷。

          v4版本完整代碼:

          /**
           * 數(shù)組作為底層結(jié)構(gòu)的阻塞隊(duì)列 v4版本
           */
          public class MyArrayBlockingQueueV4<E> implements MyBlockingQueue<E> {

              /**
               * 隊(duì)列默認(rèn)的容量大小
               * */
              private static final int DEFAULT_CAPACITY = 16;

              /**
               * 承載隊(duì)列元素的底層數(shù)組
               * */
              private final Object[] elements;

              /**
               * 當(dāng)前頭部元素的下標(biāo)
               * */
              private int head;

              /**
               * 下一個(gè)元素插入時(shí)的下標(biāo)
               * */
              private int tail;

              /**
               * 隊(duì)列中元素個(gè)數(shù)
               * */
              private int count;

              private final ReentrantLock reentrantLock;

              private final Condition notEmpty;

              private final Condition notFull;


              //=================================================構(gòu)造方法======================================================
              /**
               * 默認(rèn)構(gòu)造方法
               * */
              public MyArrayBlockingQueueV4() {
                 this(DEFAULT_CAPACITY);
              }

              /**
               * 默認(rèn)構(gòu)造方法
               * */
              public MyArrayBlockingQueueV4(int initCapacity) {
                  assert initCapacity > 0;

                  // 設(shè)置數(shù)組大小為默認(rèn)
                  this.elements = new Object[initCapacity];

                  // 初始化隊(duì)列 頭部,尾部下標(biāo)
                  this.head = 0;
                  this.tail = 0;

                  this.reentrantLock = new ReentrantLock();
                  this.notEmpty = this.reentrantLock.newCondition();
                  this.notFull = this.reentrantLock.newCondition();
              }

              /**
               * 下標(biāo)取模
               * */
              private int getMod(int logicIndex){
                  int innerArrayLength = this.elements.length;

                  // 由于隊(duì)列下標(biāo)邏輯上是循環(huán)的
                  if(logicIndex < 0){
                      // 當(dāng)邏輯下標(biāo)小于零時(shí)

                      // 真實(shí)下標(biāo) = 邏輯下標(biāo) + 加上當(dāng)前數(shù)組長度
                      return logicIndex + innerArrayLength;
                  } else if(logicIndex >= innerArrayLength){
                      // 當(dāng)邏輯下標(biāo)大于數(shù)組長度時(shí)

                      // 真實(shí)下標(biāo) = 邏輯下標(biāo) - 減去當(dāng)前數(shù)組長度
                      return logicIndex - innerArrayLength;
                  } else {
                      // 真實(shí)下標(biāo) = 邏輯下標(biāo)
                      return logicIndex;
                  }
              }

              /**
               * 入隊(duì)
               * */
              private void enqueue(E e){
                  // 存放新插入的元素
                  this.elements[this.tail] = e;
                  // 尾部插入新元素后 tail下標(biāo)后移一位
                  this.tail = getMod(this.tail + 1);

                  this.count++;
              }

              /**
               * 出隊(duì)
               * */
              private E dequeue(){
                  // 暫存需要被刪除的數(shù)據(jù)
                  E dataNeedRemove = (E)this.elements[this.head];
                  // 將當(dāng)前頭部元素引用釋放
                  this.elements[this.head] = null;
                  // 頭部下標(biāo) 后移一位
                  this.head = getMod(this.head + 1);

                  this.count--;

                  return dataNeedRemove;
              }

              @Override
              public void put(E e) throws InterruptedException {
                  // 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
                  reentrantLock.lockInterruptibly();
                  try {
                      // 因?yàn)楸幌M(fèi)者喚醒后可能會(huì)被其它的生產(chǎn)者再度填滿隊(duì)列,需要循環(huán)的判斷
                      while (this.count == elements.length) {
                          // put操作時(shí),如果隊(duì)列已滿則進(jìn)入notFull條件變量的等待隊(duì)列,并釋放條件變量對應(yīng)的互斥鎖
                          notFull.await();
                          // 消費(fèi)者進(jìn)行出隊(duì)操作時(shí)
                      }
                      // 走到這里,說明當(dāng)前隊(duì)列不滿,可以執(zhí)行入隊(duì)操作
                      enqueue(e);

                      // 喚醒可能等待在notEmpty中的一個(gè)消費(fèi)者線程
                      notEmpty.signal();
                  } finally {
                      // 入隊(duì)完畢,釋放鎖
                      reentrantLock.unlock();
                  }
              }

              @Override
              public E take() throws InterruptedException {
                  // 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
                  reentrantLock.lockInterruptibly();

                  try {
                      // 因?yàn)楸簧a(chǎn)者喚醒后可能會(huì)被其它的消費(fèi)者消費(fèi)而使得隊(duì)列再次為空,需要循環(huán)的判斷
                      while(this.count == 0){
                          notEmpty.await();
                      }

                      E headElement = dequeue();

                      // 喚醒可能等待在notFull中的一個(gè)生產(chǎn)者線程
                      notFull.signal();

                      return headElement;
                  } finally {
                      // 出隊(duì)完畢,釋放鎖
                      reentrantLock.unlock();
                  }
              }

              @Override
              public boolean isEmpty() {
                  return this.count == 0;
              }
          }

          2.5 v5版本(引入雙鎖令生產(chǎn)者和消費(fèi)者能并發(fā)操作阻塞隊(duì)列)

                v4版本的阻塞隊(duì)列采用雙條件變量之后,其性能已經(jīng)不錯(cuò)了,但仍存在進(jìn)一步優(yōu)化的空間。

          v4版本單鎖的性能問題

                v4版本中阻塞隊(duì)列的出隊(duì)、入隊(duì)操作是使用同一個(gè)互斥鎖進(jìn)行并發(fā)同步的,這意味著生產(chǎn)者線程和消費(fèi)者線程無法并發(fā)工作,消費(fèi)者線程必須等待生產(chǎn)者線程操作完成退出臨界區(qū)之后才能繼續(xù)執(zhí)行,反之亦然。單鎖的設(shè)計(jì)在生產(chǎn)者和消費(fèi)者都很活躍的高并發(fā)場景下會(huì)一定程度限制阻塞隊(duì)列的吞吐量。

                因此v5版本在v4版本的基礎(chǔ)上,將出隊(duì)和入隊(duì)操作使用兩把鎖分別管理,使得生產(chǎn)者線程和消費(fèi)者線程可以并發(fā)的操作阻塞隊(duì)列,達(dá)到進(jìn)一步提高吞吐量的目的。

                使用兩把鎖分別控制出隊(duì)、入隊(duì)后,還需要一些調(diào)整來解決生產(chǎn)者/消費(fèi)者并發(fā)操作隊(duì)列所帶來的問題。

          存在并發(fā)問題的雙鎖版本出隊(duì)、入隊(duì)實(shí)現(xiàn)第一版(v4基礎(chǔ)上的微調(diào)):

          /**
                  this.takeLock = new ReentrantLock();
                  this.notEmpty = this.takeLock.newCondition();

                  this.putLock = new ReentrantLock();
                  this.notFull = this.putLock.newCondition();
          */

              @Override
              public void put(E e) throws InterruptedException {
                  // 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
                  putLock.lockInterruptibly();
                  try {
                      // 因?yàn)楸幌M(fèi)者喚醒后可能會(huì)被其它的生產(chǎn)者再度填滿隊(duì)列,需要循環(huán)的判斷
                      while (this.count == elements.length) {
                          // put操作時(shí),如果隊(duì)列已滿則進(jìn)入notFull條件變量的等待隊(duì)列,并釋放條件變量對應(yīng)的互斥鎖
                          notFull.await();
                      }
                      // 走到這里,說明當(dāng)前隊(duì)列不滿,可以執(zhí)行入隊(duì)操作
                      enqueue(e);

                      // 喚醒可能等待在notEmpty中的一個(gè)消費(fèi)者線程
                      notEmpty.signal();
                  } finally {
                      // 入隊(duì)完畢,釋放鎖
                      putLock.unlock();
                  }
              }

              @Override
              public E take() throws InterruptedException {
                  // 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
                  takeLock.lockInterruptibly();

                  try {
                      // 因?yàn)楸簧a(chǎn)者喚醒后可能會(huì)被其它的消費(fèi)者消費(fèi)而使得隊(duì)列再次為空,需要循環(huán)的判斷
                      while(this.count == 0){
                          notEmpty.await();
                      }

                      E headElement = dequeue();

                      // 喚醒可能等待在notFull中的一個(gè)生產(chǎn)者線程
                      notFull.signal();

                      return headElement;
                  } finally {
                      // 出隊(duì)完畢,釋放鎖
                      takeLock.unlock();
                  }
              }

                 上面基于v4版本微調(diào)的雙鎖實(shí)現(xiàn)雖然容易理解,但由于允許消費(fèi)者和生產(chǎn)者線程并發(fā)的訪問隊(duì)列而存在幾個(gè)嚴(yán)重問題。

          1. count屬性線程不安全

                隊(duì)列長度count字段是一個(gè)用于判斷隊(duì)列是否已滿,隊(duì)列是否為空的重要屬性。在v5之前的版本count屬性一直被唯一的同步鎖保護(hù)著,任意時(shí)刻至多只有一個(gè)線程可以進(jìn)入臨界區(qū)修改count的值。而引入雙鎖令消費(fèi)者線程/生產(chǎn)者線程能并發(fā)訪問后,count變量的自增/自減操作會(huì)出現(xiàn)線程不安全的問題。 

                解決方案:將int類型的count修改為AtomicInteger來解決生產(chǎn)者/消費(fèi)者同時(shí)訪問、修改count時(shí)導(dǎo)致的并發(fā)問題。

          2. 生產(chǎn)者/消費(fèi)者線程死鎖問題

                在上述的代碼示例中,生產(chǎn)者線程首先獲得生產(chǎn)者鎖去執(zhí)行入隊(duì)操作,然后喚醒可能阻塞在notEmpty上的消費(fèi)者線程。由于使用條件變量前首先需要獲得其所屬的互斥鎖,如果生產(chǎn)者線程不先釋放生產(chǎn)者鎖就去獲取消費(fèi)者的互斥鎖,那么就存在出現(xiàn)死鎖的風(fēng)險(xiǎn)。消費(fèi)者線程和生產(chǎn)者線程可以并發(fā)的先分別獲得消費(fèi)者鎖和生產(chǎn)者鎖,并且也同時(shí)嘗試著獲取另一把鎖,這樣雙方都在等待著對方釋放鎖,互相阻塞出現(xiàn)死鎖現(xiàn)象。 

                解決方案:先釋放已獲得的鎖之后再去獲得另一個(gè)鎖執(zhí)行喚醒操作

          存在并發(fā)問題的雙鎖版本出隊(duì)、入隊(duì)實(shí)現(xiàn)第二版(在上述第一版基礎(chǔ)上進(jìn)行微調(diào)):

          /**
              private final AtomicInteger count = new AtomicInteger();

              this.takeLock = new ReentrantLock();
              this.notEmpty = this.takeLock.newCondition();

              this.putLock = new ReentrantLock();
              this.notFull = this.putLock.newCondition();
          */

              @Override
              public void put(E e) throws InterruptedException {
                  int currentCount;
                  // 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
                  putLock.lockInterruptibly();
                  try {
                      // 因?yàn)楸幌M(fèi)者喚醒后可能會(huì)被其它的生產(chǎn)者再度填滿隊(duì)列,需要循環(huán)的判斷
                      while (count.get() == elements.length) {
                          // put操作時(shí),如果隊(duì)列已滿則進(jìn)入notFull條件變量的等待隊(duì)列,并釋放條件變量對應(yīng)的互斥鎖
                          notFull.await();
                          // 消費(fèi)者進(jìn)行出隊(duì)操作時(shí)
                      }
                      // 走到這里,說明當(dāng)前隊(duì)列不滿,可以執(zhí)行入隊(duì)操作
                      enqueue(e);

                      currentCount = count.getAndIncrement();
                  } finally {
                      // 入隊(duì)完畢,釋放鎖
                      putLock.unlock();
                  }

                  // 如果插入之前隊(duì)列為空,才喚醒等待彈出元素的線程
                  if (currentCount == 0) {
                      signalNotEmpty();
                  }
              }

              @Override
              public E take() throws InterruptedException {
                  E headElement;
                  int currentCount;

                  // 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
                  takeLock.lockInterruptibly();
                  try {
                      // 因?yàn)楸簧a(chǎn)者喚醒后可能會(huì)被其它的消費(fèi)者消費(fèi)而使得隊(duì)列再次為空,需要循環(huán)的判斷
                      while(this.count.get() == 0){
                          notEmpty.await();
                      }

                      headElement = dequeue();

                      currentCount = this.count.getAndDecrement();
                  } finally {
                      // 出隊(duì)完畢,釋放鎖
                      takeLock.unlock();
                  }

                  // 只有在彈出之前隊(duì)列已滿的情況下才喚醒等待插入元素的線程
                  if (currentCount == elements.length) {
                      signalNotFull();
                  }

                  return headElement;
              }

             /**
               * 喚醒等待隊(duì)列非空條件的線程
               */
              private void signalNotEmpty() {
                  // 為了喚醒等待隊(duì)列非空條件的線程,需要先獲取對應(yīng)的takeLock
                  takeLock.lock();
                  try {
                      // 喚醒一個(gè)等待非空條件的線程
                      notEmpty.signal();
                  } finally {
                      takeLock.unlock();
                  }
              }

              /**
               * 喚醒等待隊(duì)列未滿條件的線程
               */
              private void signalNotFull() {
                  // 為了喚醒等待隊(duì)列未滿條件的線程,需要先獲取對應(yīng)的putLock
                  putLock.lock();
                  try {
                      // 喚醒一個(gè)等待隊(duì)列未滿條件的線程
                      notFull.signal();
                  } finally {
                      putLock.unlock();
                  }
              }

          3. lost wakeup問題

                在上述待改進(jìn)的雙鎖實(shí)現(xiàn)第二版中,阻塞在notFull中的生產(chǎn)者線程完全依賴相對應(yīng)的消費(fèi)者線程來將其喚醒(阻塞在notEmpty中的消費(fèi)者線程也同樣依賴對應(yīng)的生產(chǎn)者線程將其喚醒),這在生產(chǎn)者線程和消費(fèi)者線程并發(fā)時(shí)會(huì)出現(xiàn)lost wakeup的問題。

                下面構(gòu)造一個(gè)簡單而不失一般性的例子來說明,為什么上述第二版的實(shí)現(xiàn)中會(huì)出現(xiàn)問題。

           時(shí)序圖(假設(shè)阻塞隊(duì)列的長度為5(element.length=5),且一開始時(shí)隊(duì)列已滿)


          生產(chǎn)者線程P1生產(chǎn)者線程P2消費(fèi)者線程C
          1

          執(zhí)行put操作,此時(shí)隊(duì)列已滿。

          執(zhí)行while循環(huán)中的notfull.await陷入阻塞狀態(tài)

          (await會(huì)釋放putLock)



          2

          執(zhí)行take操作,隊(duì)列未滿,成功執(zhí)行完dequeue。

          此時(shí)currentCount=5,this.count=4,

          執(zhí)行takeLock.unLock釋放takeLock鎖

          3

          執(zhí)行put操作,拿到putLock鎖,由于消費(fèi)者C已經(jīng)執(zhí)行完出隊(duì)操作,

          成功執(zhí)行enqueue。

          此時(shí)currentCount=4,this.count=5,

          執(zhí)行putLock.unLock釋放putLock鎖


          4

          判斷currentCount == elements.length為真,

          執(zhí)行signalNotFull,并成功拿到putLock。

          notFull.signal喚醒等待在其上的生產(chǎn)者線程P1。

          take方法執(zhí)行完畢,return返回

          5

           被消費(fèi)者C喚醒,但此時(shí)count=5,無法跳出while循環(huán),

          繼續(xù)await阻塞在notFull條件變量中



          6

          判斷currentCount == 0為假,進(jìn)行處理。

          put方法執(zhí)行完畢 ,return返回


           

                可以看到,雖然生產(chǎn)者線程P1由于隊(duì)列已滿而先被阻塞,而消費(fèi)者線程C在出隊(duì)后也確實(shí)通知喚醒了生產(chǎn)者線程P1。但是由于生產(chǎn)者線程P2和消費(fèi)者線程C的并發(fā)執(zhí)行,導(dǎo)致了生產(chǎn)者線程P1在被喚醒后依然無法成功執(zhí)行入隊(duì)操作,只能繼續(xù)的阻塞下去。在一些情況下,P1生產(chǎn)者線程可能再也不會(huì)被喚醒而永久的阻塞在條件變量notFull上。 

                為了解決這一問題,雙鎖版本的阻塞隊(duì)列其生產(chǎn)者線程不能僅僅依靠消費(fèi)者線程來將其喚醒,而是需要在其它生產(chǎn)者線程在入隊(duì)操作完成后,發(fā)現(xiàn)隊(duì)列未滿時(shí)也嘗試著喚醒由于上述并發(fā)場景發(fā)生lost wakeup問題的生產(chǎn)者線程(消費(fèi)者線程在出隊(duì)時(shí)的優(yōu)化亦是如此)。

          最終優(yōu)化的V5版本的出隊(duì)、入隊(duì)實(shí)現(xiàn):

          /**
              private final AtomicInteger count = new AtomicInteger();

              this.takeLock = new ReentrantLock();
              this.notEmpty = this.takeLock.newCondition();

              this.putLock = new ReentrantLock();
              this.notFull = this.putLock.newCondition();
          */

              @Override
              public void put(E e) throws InterruptedException {
                  int currentCount;
                  // 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
                  putLock.lockInterruptibly();
                  try {
                      // 因?yàn)楸幌M(fèi)者喚醒后可能會(huì)被其它的生產(chǎn)者再度填滿隊(duì)列,需要循環(huán)的判斷
                      while (count.get() == elements.length) {
                          // put操作時(shí),如果隊(duì)列已滿則進(jìn)入notFull條件變量的等待隊(duì)列,并釋放條件變量對應(yīng)的互斥鎖
                          notFull.await();
                          // 消費(fèi)者進(jìn)行出隊(duì)操作時(shí)
                      }
                      // 走到這里,說明當(dāng)前隊(duì)列不滿,可以執(zhí)行入隊(duì)操作
                      enqueue(e);

                      currentCount = count.getAndIncrement();

                      // 如果在插入后隊(duì)列仍然沒滿,則喚醒其他等待插入的線程
                      if (currentCount + 1 < elements.length) {
                          notFull.signal();
                      }
                  } finally {
                      // 入隊(duì)完畢,釋放鎖
                      putLock.unlock();
                  }

                  // 如果插入之前隊(duì)列為空,才喚醒等待彈出元素的線程
                  // 為了防止死鎖,不能在釋放putLock之前獲取takeLock
                  if (currentCount == 0) {
                      signalNotEmpty();
                  }
              }

              @Override
              public E take() throws InterruptedException {
                  E headElement;
                  int currentCount;

                  // 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
                  takeLock.lockInterruptibly();
                  try {
                      // 因?yàn)楸簧a(chǎn)者喚醒后可能會(huì)被其它的消費(fèi)者消費(fèi)而使得隊(duì)列再次為空,需要循環(huán)的判斷
                      while(this.count.get() == 0){
                          notEmpty.await();
                      }

                      headElement = dequeue();

                      currentCount = this.count.getAndDecrement();

                      // 如果隊(duì)列在彈出一個(gè)元素后仍然非空,則喚醒其他等待隊(duì)列非空的線程
                      if (currentCount - 1 > 0) {
                          notEmpty.signal();
                      }
                  } finally {
                      // 出隊(duì)完畢,釋放鎖
                      takeLock.unlock();
                  }

                  // 只有在彈出之前隊(duì)列已滿的情況下才喚醒等待插入元素的線程
                  // 為了防止死鎖,不能在釋放takeLock之前獲取putLock
                  if (currentCount == elements.length) {
                      signalNotFull();
                  }

                  return headElement;
              }

             /**
               * 喚醒等待隊(duì)列非空條件的線程
               */
              private void signalNotEmpty() {
                  // 為了喚醒等待隊(duì)列非空條件的線程,需要先獲取對應(yīng)的takeLock
                  takeLock.lock();
                  try {
                      // 喚醒一個(gè)等待非空條件的線程
                      notEmpty.signal();
                  } finally {
                      takeLock.unlock();
                  }
              }

              /**
               * 喚醒等待隊(duì)列未滿條件的線程
               */
              private void signalNotFull() {
                  // 為了喚醒等待隊(duì)列未滿條件的線程,需要先獲取對應(yīng)的putLock
                  putLock.lock();
                  try {
                      // 喚醒一個(gè)等待隊(duì)列未滿條件的線程
                      notFull.signal();
                  } finally {
                      putLock.unlock();
                  }
              }

          3. 不同版本阻塞隊(duì)列的性能測試

                前面從v2版本開始,對所實(shí)現(xiàn)的阻塞隊(duì)列進(jìn)行了一系列的優(yōu)化,一直到最終的V5版本實(shí)現(xiàn)了一個(gè)基于雙鎖,雙條件變量的高性能版本。

                下面對v3-v5版本進(jìn)行一輪基礎(chǔ)的性能測試(v2無限輪詢性能太差),看看其實(shí)際性能是否真的如博客第二章中所說的那般,高版本的性能是更優(yōu)秀的。同時(shí)令jdk中的ArrayBlockingQueue和LinkedBlockingQueue也實(shí)現(xiàn)MyBlockingQueue,也加入測試。

          測試工具類BlockingQueueTestUtil:

          public class BlockingQueueTestUtil {
              public static long statisticBlockingQueueRuntime(
                      MyBlockingQueue<Integer> blockingQueue, int workerNum, int perWorkerProcessNum, int repeatTime) throws InterruptedException {
                  ExecutorService executorService = Executors.newFixedThreadPool(workerNum * 2);
                  // 第一次執(zhí)行時(shí)存在一定的初始化開銷,不進(jìn)行統(tǒng)計(jì)
                  oneTurnExecute(executorService,blockingQueue,workerNum,perWorkerProcessNum);

                  long totalTime = 0;
                  for(int i=0; i<repeatTime; i++){
                      long oneTurnTime = oneTurnExecute(executorService,blockingQueue,workerNum,perWorkerProcessNum);
                      totalTime += oneTurnTime;
                  }

                  executorService.shutdown();

                  assert blockingQueue.isEmpty();

                  return totalTime/repeatTime;
              }

              private static long oneTurnExecute(ExecutorService executorService, MyBlockingQueue<Integer> blockingQueue,
                                                 int workerNum, int perWorkerProcessNum) throws InterruptedException {
                  long startTime = System.currentTimeMillis();
                  CountDownLatch countDownLatch = new CountDownLatch(workerNum * 2);

                  // 創(chuàng)建workerNum個(gè)生產(chǎn)者/消費(fèi)者
                  for(int i=0; i<workerNum; i++){
                      executorService.execute(()->{
                          produce(blockingQueue,perWorkerProcessNum);
                          countDownLatch.countDown();
                      });

                      executorService.execute(()->{
                          consume(blockingQueue,perWorkerProcessNum);
                          countDownLatch.countDown();
                      });
                  }
                  countDownLatch.await();
                  long endTime = System.currentTimeMillis();

                  return endTime - startTime;
              }

              private static void produce(MyBlockingQueue<Integer> blockingQueue,int perWorkerProcessNum){
                  try {
                      // 每個(gè)生產(chǎn)者生產(chǎn)perWorkerProcessNum個(gè)元素
                      for(int j=0; j<perWorkerProcessNum; j++){
                          blockingQueue.put(j);
                      }
                  } catch (InterruptedException e) {
                      throw new RuntimeException(e);
                  }
              }

              private static void consume(MyBlockingQueue<Integer> blockingQueue,int perWorkerProcessNum){
                  try {
                      // 每個(gè)消費(fèi)者消費(fèi)perWorkerProcessNum個(gè)元素
                      for(int j=0; j<perWorkerProcessNum; j++){
                          blockingQueue.take();
                      }
                  } catch (InterruptedException e) {
                      throw new RuntimeException(e);
                  }
              }
          }

          jdk的ArrayBlockingQueue簡單包裝(JDKArrayBlockingQueue):

          public class JDKArrayBlockingQueue<E> implements MyBlockingQueue<E> {

              private final BlockingQueue<E> jdkBlockingQueue;

              /**
               * 指定隊(duì)列大小的構(gòu)造器
               *
               * @param capacity  隊(duì)列大小
               */
              public JDKArrayBlockingQueue(int capacity) {
                  if (capacity <= 0)
                      throw new IllegalArgumentException();
                  jdkBlockingQueue = new ArrayBlockingQueue<>(capacity);
              }

              @Override
              public void put(E e) throws InterruptedException {
                  jdkBlockingQueue.put(e);
              }

              @Override
              public E take() throws InterruptedException {
                  return jdkBlockingQueue.take();
              }

              @Override
              public boolean isEmpty() {
                  return jdkBlockingQueue.isEmpty();
              }

              @Override
              public String toString() {
                  return "JDKArrayBlockingQueue{" +
                          "jdkBlockingQueue=" + jdkBlockingQueue +
                          '}';
              }
          }

          jdk的LinkedBlockingQueue簡單包裝(JDKLinkedBlockingQueue):

          public class JDKLinkedBlockingQueue<E> implements MyBlockingQueue<E> {

              private final BlockingQueue<E> jdkBlockingQueue;

              /**
               * 指定隊(duì)列大小的構(gòu)造器
               *
               * @param capacity  隊(duì)列大小
               */
              public JDKLinkedBlockingQueue(int capacity) {
                  if (capacity <= 0)
                      throw new IllegalArgumentException();
                  jdkBlockingQueue = new LinkedBlockingQueue<>(capacity);
              }

              @Override
              public void put(E e) throws InterruptedException {
                  jdkBlockingQueue.put(e);
              }

              @Override
              public E take() throws InterruptedException {
                  return jdkBlockingQueue.take();
              }

              @Override
              public boolean isEmpty() {
                  return jdkBlockingQueue.isEmpty();
              }

              @Override
              public String toString() {
                  return "JDKLinkedBlockingQueue{" +
                          "jdkBlockingQueue=" + jdkBlockingQueue +
                          '}';
              }
          }

          測試主體代碼:

          public class BlockingQueuePerformanceTest {
              /**
               * 隊(duì)列容量
               * */
              private static final int QUEUE_CAPACITY = 3;
              /**
               * 并發(fā)線程數(shù)(消費(fèi)者 + 生產(chǎn)者 = 2 * WORKER_NUM)
               * */
              private static final int WORKER_NUM = 30;
              /**
               * 單次測試中每個(gè)線程訪問隊(duì)列的次數(shù)
               * */
              private static final int PER_WORKER_PROCESS_NUM = 3000;
              /**
               * 重復(fù)執(zhí)行的次數(shù)
               * */
              private static final int REPEAT_TIME = 5;

              public static void main(String[] args) throws InterruptedException {
                  {
                      MyBlockingQueue<Integer> myArrayBlockingQueueV3 = new MyArrayBlockingQueueV3<>(QUEUE_CAPACITY);
                      long avgCostTimeV3 = BlockingQueueTestUtil.statisticBlockingQueueRuntime(myArrayBlockingQueueV3, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME);
                      System.out.println(costTimeLog(MyArrayBlockingQueueV3.class, avgCostTimeV3));
                  }
                  {
                      MyBlockingQueue<Integer> myArrayBlockingQueueV4 = new MyArrayBlockingQueueV4<>(QUEUE_CAPACITY);
                      long avgCostTimeV4 = BlockingQueueTestUtil.statisticBlockingQueueRuntime(myArrayBlockingQueueV4, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME);
                      System.out.println(costTimeLog(MyArrayBlockingQueueV4.class, avgCostTimeV4));
                  }
                  {
                      MyBlockingQueue<Integer> myArrayBlockingQueueV5 = new MyArrayBlockingQueueV5<>(QUEUE_CAPACITY);
                      long avgCostTimeV5 = BlockingQueueTestUtil.statisticBlockingQueueRuntime(myArrayBlockingQueueV5, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME);
                      System.out.println(costTimeLog(MyArrayBlockingQueueV5.class, avgCostTimeV5));
                  }
                  {
                      MyBlockingQueue<Integer> jdkArrayBlockingQueue = new JDKArrayBlockingQueue<>(QUEUE_CAPACITY);
                      long avgCostTimeJDK = BlockingQueueTestUtil.statisticBlockingQueueRuntime(jdkArrayBlockingQueue, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME);
                      System.out.println(costTimeLog(JDKArrayBlockingQueue.class, avgCostTimeJDK));
                  }
                  {
                      MyBlockingQueue<Integer> jdkLinkedBlockingQueue = new JDKLinkedBlockingQueue<>(QUEUE_CAPACITY);
                      long avgCostTimeJDK = BlockingQueueTestUtil.statisticBlockingQueueRuntime(jdkLinkedBlockingQueue, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME);
                      System.out.println(costTimeLog(JDKLinkedBlockingQueue.class, avgCostTimeJDK));
                  }
              }

              private static String costTimeLog(Class blockQueueCLass,long costTime){
                  return blockQueueCLass.getSimpleName() + " avgCostTime=" + costTime + "ms";
              }
          }

                上述代碼指定的參數(shù)為基于最大容量為3的阻塞隊(duì)列,生產(chǎn)者、消費(fèi)者線程各30個(gè),每個(gè)線程執(zhí)行3000次出隊(duì)或入隊(duì)操作,重復(fù)執(zhí)行5次用于統(tǒng)計(jì)平均時(shí)間。

          我的機(jī)器上的運(yùn)行結(jié)果如下:

          MyArrayBlockingQueueV3 avgCostTime=843ms
          MyArrayBlockingQueueV4 avgCostTime
          =530ms
          MyArrayBlockingQueueV5 avgCostTime
          =165ms
          JDKArrayBlockingQueue avgCostTime
          =506ms
          JDKLinkedBlockingQueue avgCostTime
          =163ms

               執(zhí)行時(shí)長v3 > v4 > JDKArrayBlockingQueue > MyArrayBlockingQueueV5 > JDKLinkedBlockingQueue,且v4耗時(shí)大致等于JDKArrayBlockingQueue、v5耗時(shí)大致等于JDKLinkedBlockingQueue。

               究其原因是因?yàn)閖dk的ArrayBlockingQueue實(shí)現(xiàn)和V4版本一樣,是基于單鎖,雙條件變量的;而jdk的LinkedBlockingQueue實(shí)現(xiàn)和V5版本一樣,是基于雙鎖,雙條件變量的(V4、V5版本的實(shí)現(xiàn)就是參考的jdk源碼)。

               雖然測試的用例不是很全面,但測試結(jié)果和理論大致是吻合的,希望大家通過測試結(jié)果來加深對不同版本間性能差異的背后原理的理解。

          4. 為什么jdk中的ArrayBlockingQueue不基于性能更好的雙鎖實(shí)現(xiàn) ?    

               看到這里,不知你是否和我一樣對為什么jdk的ArrayBlockingQueue使用單鎖而不使用性能更好的雙鎖實(shí)現(xiàn)而感到疑惑。所幸網(wǎng)上也有不少小伙伴有類似的疑問,這里將相關(guān)內(nèi)容簡單梳理一下。

               1. 基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列(ABQ)是可以采用雙鎖實(shí)現(xiàn)更加高效率的出隊(duì)、入隊(duì)的。但由于jdk中阻塞隊(duì)列是屬于集合Collection的一個(gè)子類,雙鎖版本的ABQ其迭代器會(huì)比單鎖的復(fù)雜很多很多,但在性能上的改善并不那么的可觀。ABQ的實(shí)現(xiàn)在復(fù)雜度和性能上做了一個(gè)折中,選擇了容易實(shí)現(xiàn)但性能稍低的單鎖實(shí)現(xiàn)。

               http://jsr166-concurrency.10961.n7.nabble.com/ArrayBlockingQueue-concurrent-put-and-take-tc1306.html

               2. 如果對性能有更加苛刻要求的話,可以考慮使用jdk中基于雙鎖實(shí)現(xiàn)的LinkedBlockingQueue(LBQ)。需要注意的是,在高吞吐量的出隊(duì)、入隊(duì)的場景下,LBQ鏈?zhǔn)降慕Y(jié)構(gòu)在垃圾回收時(shí)性能會(huì)略低于基于數(shù)組的,緊湊結(jié)構(gòu)的ABQ。

               3. jdk提供了一個(gè)龐大而全面的集合框架,每個(gè)具體的數(shù)據(jù)結(jié)構(gòu)都需要盡可能多的實(shí)現(xiàn)高層的接口和抽象方法。這樣的設(shè)計(jì)對于使用者來說確實(shí)很友好,但也令實(shí)現(xiàn)者背上了沉重的負(fù)擔(dān),必須為實(shí)現(xiàn)一些可能極少使用的接口而花費(fèi)巨大的精力,甚至反過來影響到特定數(shù)據(jù)結(jié)構(gòu)的本身的實(shí)現(xiàn)。ABQ受制于雙鎖版本迭代器實(shí)現(xiàn)的復(fù)雜度,而被迫改為效率更低的單鎖實(shí)現(xiàn)就是一個(gè)典型的例子。

          5. 總結(jié)

                前段時(shí)間迷上了MIT6.824的數(shù)據(jù)庫課程,在理解了課程所提供的實(shí)驗(yàn)后(共6個(gè)lab)收獲很大,因此想著自己再動(dòng)手實(shí)現(xiàn)一個(gè)更加全面的版本(并發(fā)的B+樹,MVCC多版本控制、行級鎖以及sql解釋器、網(wǎng)絡(luò)協(xié)議等等)。但一段時(shí)間后發(fā)現(xiàn)上述的功能難度很大且實(shí)現(xiàn)起來細(xì)節(jié)很多,這將耗費(fèi)我過多的時(shí)間而被迫放棄了(膨脹了Orz)。在被打擊后,清醒的意識到對于現(xiàn)階段的我來說還是應(yīng)該穩(wěn)扎穩(wěn)打,著眼于更小的知識點(diǎn),通過自己動(dòng)手造輪子的方式加深對知識點(diǎn)的理解,至于擼一個(gè)完善的關(guān)系型數(shù)據(jù)庫這種宏大的目標(biāo)受制于我目前的水平還是暫時(shí)先放放吧。

            本篇博客的完整代碼在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning(blocking queue模塊)。后續(xù)應(yīng)該會(huì)陸續(xù)更新關(guān)于自己動(dòng)手實(shí)現(xiàn)線程池、抽象同步隊(duì)列AQS等的博客。

            還存在很多不足之處,請多多指教。




          鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布

          ??????

          ??長按上方微信二維碼 2 秒





          感謝點(diǎn)贊支持下哈 

          瀏覽 84
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美三级午夜理伦 | 久久日本道 | 大香蕉欧美在线观看不卡视频 | 精品久久久久久久久久久久久久久 | 日本骚片在线观看 |