<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java并發(fā)隊(duì)列原理剖析

          共 20414字,需瀏覽 41分鐘

           ·

          2021-05-25 21:19

          來(lái)源:https://github.com/afkbrb/java-concurrency-note

          LinkedBlockingQueue和ArrayBlockingQueue比較簡(jiǎn)單,不進(jìn)行講解了。下面只介紹PriorityBlockingQueue和DelayQueue。

          PriorityBlockingQueue

          PriorityBlockingQueue是帶優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列,每次出隊(duì)都返回優(yōu)先級(jí)最高或最低的元素。內(nèi)部使用二叉堆實(shí)現(xiàn)。

          類圖結(jié)構(gòu)

          PriorityBlockingQueue內(nèi)部有一個(gè)數(shù)組queue,用來(lái)存放隊(duì)列元素。allocationSpinLock是個(gè)自旋鎖,通過(guò)CAS操作來(lái)保證同時(shí)只有一個(gè)線程可以擴(kuò)容隊(duì)列,狀態(tài)為0或1。

          由于這是一個(gè)優(yōu)先隊(duì)列,所以有一個(gè)comparator用來(lái)比較元素大小。

          下面為構(gòu)造函數(shù):

          private static final int DEFAULT_INITIAL_CAPACITY = 11;

          public PriorityBlockingQueue() {
              this(DEFAULT_INITIAL_CAPACITY, null);
          }

          public PriorityBlockingQueue(int initialCapacity) {
              this(initialCapacity, null);
          }

          可知默認(rèn)隊(duì)列容量為11,默認(rèn)比較器為null,也就是使用元素的compareTo方法進(jìn)行比較來(lái)確定元素的優(yōu)先級(jí),這意味著隊(duì)列元素必須實(shí)現(xiàn)Comparable接口。

          原理講解

          boolean offer()
          public boolean offer(E e) {
              if (e == null)
                  throw new NullPointerException();
              // 獲取獨(dú)占鎖
              final ReentrantLock lock = this.lock;
              lock.lock();
              int n, cap;
              Object[] array;
              // 擴(kuò)容
              while ((n = size) >= (cap = (array = queue).length))
                  tryGrow(array, cap);
              try {
                  Comparator<? super E> cmp = comparator;
                  if (cmp == null)
                      // 通過(guò)對(duì)二叉堆的上浮操作保證最大或最小的元素總在根節(jié)點(diǎn)
                      siftUpComparable(n, e, array);
                  else
                      // 使用了自定義比較器
                      siftUpUsingComparator(n, e, array, cmp);
                  size = n + 1;
                  // 激活因調(diào)用take()方法被阻塞的線程
                  notEmpty.signal();
              } finally {
                  // 釋放鎖
                  lock.unlock();
              }
              return true;
          }

          流程比較簡(jiǎn)單,下面主要看擴(kuò)容和建堆操作。

          先看擴(kuò)容。

          private void tryGrow(Object[] array, int oldCap) {
              // 由前面的代碼可知,調(diào)用tryGrow函數(shù)前先獲取了獨(dú)占鎖,
              // 由于擴(kuò)容比較費(fèi)時(shí),此處先釋放鎖,
              // 讓其他線程可以繼續(xù)操作(如果滿足可操作的條件的話),
              // 以提升并發(fā)性能
              lock.unlock();
              Object[] newArray = null;
              // 通過(guò)allocationSpinLock保證同時(shí)最多只有一個(gè)線程進(jìn)行擴(kuò)容操作。
              if (allocationSpinLock == 0 &&
                  UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,01)) {
                  try {
                      // 當(dāng)容量比較小時(shí),一次只增加2容量
                      // 比較大時(shí)增加一倍
                      int newCap = oldCap + ((oldCap < 64) ?(oldCap + 2) : (oldCap >> 1));
                      // 溢出檢測(cè)
                      if (newCap - MAX_ARRAY_SIZE > 0) {
                          int minCap = oldCap + 1;
                          if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                              throw new OutOfMemoryError();
                          newCap = MAX_ARRAY_SIZE;
                      }
                      if (newCap > oldCap && queue == array)
                          newArray = new Object[newCap];
                  } finally {
                      // 釋放鎖,沒(méi)用CAS是因?yàn)橥瑫r(shí)最多有一個(gè)線程操作allocationSpinLock
                      allocationSpinLock = 0;
                  }
              }
              // 如果當(dāng)前線程發(fā)現(xiàn)有其他線程正在對(duì)隊(duì)列進(jìn)行擴(kuò)容,
              // 則調(diào)用yield方法嘗試讓出CPU資源促使擴(kuò)容操作盡快完成
              if (newArray == null)
                  Thread.yield();
              lock.lock();
              if (newArray != null && queue == array) {
                  queue = newArray;
                  System.arraycopy(array, 0, newArray, 0, oldCap);
              }
          }

          下面來(lái)看建堆算法

          private static <T> void siftUpComparable(int k, T x, Object[] array) {
              Comparable<? super T> key = (Comparable<? super T>) x;
              while (k > 0) {
                  // 獲取父節(jié)點(diǎn),設(shè)子節(jié)點(diǎn)索引為k,
                  // 則由二叉堆的性質(zhì)可知,父節(jié)點(diǎn)的索引總為(k - 1) >>> 1
                  int parent = (k - 1) >>> 1;
                  // 獲取父節(jié)點(diǎn)對(duì)應(yīng)的值
                  Object e = array[parent];
                  // 只有子節(jié)點(diǎn)的值小于父節(jié)點(diǎn)的值時(shí)才上浮
                  if (key.compareTo((T) e) >= 0)
                      break;
                  array[k] = e;
                  k = parent;
              }
              array[k] = key;
          }

          如果了解二叉堆的話,此處代碼是十分容易理解的。關(guān)于二叉堆,可參看《數(shù)據(jù)結(jié)構(gòu)之二叉堆》。

          E poll()
          public E poll() {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  // 出隊(duì)
                  return dequeue();
              } finally {
                  lock.unlock();
              }
          }

          private E dequeue() {
              int n = size - 1;
              if (n < 0)
                  return null;
              else {
                  Object[] array = queue;
                  E result = (E) array[0];
                  // 獲取尾節(jié)點(diǎn),在實(shí)現(xiàn)對(duì)二叉堆的下沉操作時(shí)要用到
                  E x = (E) array[n];
                  array[n] = null;
                  Comparator<? super E> cmp = comparator;
                  if (cmp == null)
                      // 下沉操作,保證取走最小的節(jié)點(diǎn)(根節(jié)點(diǎn))后,新的根節(jié)點(diǎn)仍時(shí)最小的,二叉堆的性質(zhì)依然滿足
                      siftDownComparable(0, x, array, n);
                  else
                      // 使用自定義比較器
                      siftDownUsingComparator(0, x, array, n, cmp);
                  size = n;
                  return result;
              }
          }

          poll方法通過(guò)調(diào)用dequeue方法使最大或最小的節(jié)點(diǎn)出隊(duì)并將其返回。

          下面來(lái)看二叉堆的下沉操作。

          private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
              if (n > 0) {
                  Comparable<? super T> key = (Comparable<? super T>)x;
                  int half = n >>> 1;
                  while (k < half) {
                      // child為兩個(gè)子節(jié)點(diǎn)(如果有的話)中較小的那個(gè)對(duì)應(yīng)的索引
                      int child = (k << 1) + 1;
                      Object c = array[child];
                      int right = child + 1;
                      // 通過(guò)比較保證child對(duì)應(yīng)的為較小值的索引
                      if (right < n &&
                          ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                          c = array[child = right];
                      if (key.compareTo((T) c) <= 0)
                          break;
                      // 下沉,將較小的子節(jié)點(diǎn)換到父節(jié)點(diǎn)位置
                      array[k] = c;
                      k = child;
                  }
                  array[k] = key;
              }
          }

          同上,對(duì)下沉操作有疑問(wèn)的話可參考上述文章。

          void put(E e)

          調(diào)用了offer

          public void put(E e){
              offer(e);
          }
          E take()

          take操作的作用是獲取二叉堆的根節(jié)點(diǎn)元素,如果隊(duì)列為空則阻塞。

          public E take() throws InterruptedException {
              final ReentrantLock lock = this.lock;
              // 阻塞可被中斷
              lock.lockInterruptibly();
              E result;
              try {
                  // 隊(duì)列為空就將當(dāng)前線程放入notEmpty條件隊(duì)列
                  // 使用while循環(huán)判斷是為了避免虛假喚醒
                  while ( (result = dequeue()) == null)
                      notEmpty.await();
              } finally {
                  lock.unlock();
              }
              return result;
          }

          DelayQueue

          DelayQueue并發(fā)隊(duì)列是一個(gè)無(wú)界阻塞延遲隊(duì)列,隊(duì)列中的每一個(gè)元素都有一個(gè)過(guò)期時(shí)間,當(dāng)從隊(duì)列中獲取元素時(shí)只有過(guò)期元素才會(huì)出列。隊(duì)列頭元素是最快要過(guò)期的元素。

          類圖結(jié)構(gòu)

          DelayQueue內(nèi)部使用PriorityQueue存放數(shù)據(jù),使用ReentrantLock實(shí)現(xiàn)線程同步。
          隊(duì)列里的元素要實(shí)現(xiàn)Delayed接口(Delayed接口繼承了Comparable接口),用以得到過(guò)期時(shí)間并進(jìn)行過(guò)期時(shí)間的比較。

          public interface Delayed extends Comparable<Delayed{
              long getDelay(TimeUnit unit);
          }

          available是由lock生成的條件變量,用以實(shí)現(xiàn)線程間的同步。

          leader是leader-follower模式的變體,用于減少不必要的線程等待。當(dāng)一個(gè)線程調(diào)用隊(duì)列的take方法變?yōu)閘eader線程后,它會(huì)調(diào)用條件變量available.waitNanos(delay)等待delay時(shí)間,但是其他線程(follower)則會(huì)調(diào)用available.await()進(jìn)行無(wú)限等待。leader線程延遲時(shí)間過(guò)期后,會(huì)退出take方法,并通過(guò)調(diào)用available.signal()方法喚醒一個(gè)follower線程,被喚醒的線程會(huì)被選舉為新的leader線程。

          原理講解

          boolean offer(E e)
          public boolean offer(E e) {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  // 添加新元素
                  q.offer(e);
                  // 查看新添加的元素是否為最先過(guò)期的
                  if (q.peek() == e) {
                      leader = null;
                      available.signal();
                  }
                  return true;
              } finally {
                  lock.unlock();
              }
          }

          上述代碼首先獲取獨(dú)占鎖,然后添加元素到優(yōu)先級(jí)隊(duì)列,由于q是優(yōu)先級(jí)隊(duì)列,所以添加元素后,調(diào)用q.peek()方法返回的并不一定是當(dāng)前添加的元素。當(dāng)如果q.peek() == e,說(shuō)明當(dāng)前元素是最先要過(guò)期的,那么重置leader線程為null并激活available條件隊(duì)列里的一個(gè)線程,告訴它隊(duì)列里面有元素了。

          E take()

          獲取并移除隊(duì)列里面過(guò)期的元素,如果隊(duì)列里面沒(méi)有過(guò)期元素則等待。

          public E take() throws InterruptedException {
              final ReentrantLock lock = this.lock;
              // 可中斷
              lock.lockInterruptibly();
              try {
                  for (;;) {
                      E first = q.peek();
                      // 為空則等待
                      if (first == null)
                          available.await();
                      else {
                          long delay = first.getDelay(NANOSECONDS);
                          // 過(guò)期則成功獲取
                          if (delay <= 0)
                              return q.poll();
                          // 執(zhí)行到此處,說(shuō)明頭元素未過(guò)期    
                          first = null// don't retain ref while waiting
                          // follower無(wú)限等待,直到被喚醒
                          if (leader != null)
                              available.await();
                          else {
                              Thread thisThread = Thread.currentThread();
                              leader = thisThread;
                              try {
                                  // leader等待lelay時(shí)間,則頭元素必定已經(jīng)過(guò)期
                                  available.awaitNanos(delay);
                              } finally {
                                  // 重置leader,給follower稱為leader的機(jī)會(huì)
                                  if (leader == thisThread)
                                      leader = null;
                              }
                          }
                      }
                  }
              } finally {
                  if (leader == null && q.peek() != null)
                      // 喚醒一個(gè)follower線程
                      available.signal();
                  lock.unlock();
              }
          }

          一個(gè)線程調(diào)用take方法時(shí),會(huì)首先查看頭元素是否為空,為空則直接等待,否則判斷是否過(guò)期。
          若頭元素已經(jīng)過(guò)期,則直接通過(guò)poll獲取并移除,否則判斷是否有l(wèi)eader線程。
          若有l(wèi)eader線程則一直等待,否則自己成為leader并等待頭元素過(guò)期。

          E poll()

          獲取并移除頭過(guò)期元素,如果沒(méi)有過(guò)期元素則返回null。

          public E poll() {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  E first = q.peek();
                  // 若隊(duì)列為空或沒(méi)有元素過(guò)期則直接返回null
                  if (first == null || first.getDelay(NANOSECONDS) > 0)
                      return null;
                  else
                      return q.poll();
              } finally {
                  lock.unlock();
              }
          }
          int size()

          計(jì)算隊(duì)列元素個(gè)數(shù),包含過(guò)期的和未過(guò)期的。

          public int size() {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  return q.size();
              } finally {
                  lock.unlock();
              }
          }

          最近給大家找了  百萬(wàn)級(jí)電商


          資源,怎么領(lǐng)取?


          掃二維碼,加我微信,回復(fù):百萬(wàn)級(jí)電商

           注意,不要亂回復(fù) 

          沒(méi)錯(cuò),不是機(jī)器人
          記得一定要等待,等待才有好東西


          瀏覽 36
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  日韩成人无码AV | 大香蕉伊人综合在线观看 | 91精品国产综合久久久蜜臀酒店 | 欧美一级特黄一区二区 | 国产精品77777 |