<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一種極致性能的緩沖隊列

          共 9147字,需瀏覽 19分鐘

           ·

          2021-03-03 12:24

          背景

          在多線程下的生產者-消費者模型中,需求滿足如下情況:

          • 對生產者生產投遞數(shù)據(jù)的性能要求非常高
          • 多個生產者,單個(多個也可以,本文只介紹單個的情況)消費者
          • 當消費者跟不上生產者速度時,可容忍少部分數(shù)據(jù)丟失
          • 生產者是單條單條地生產數(shù)據(jù)

          舉個日志采集的例子,日志在不同的線程上生產,在日志生產速度遠超消費者速度時,可以丟棄部分數(shù)據(jù),要求打日志的性能損耗最小,這種情況下可采用本文提供的極致性能的緩沖隊列。

          實現(xiàn)細節(jié)

          多個生產者向一個緩沖隊列提交消息,說到底是線程安全問題,如果不考慮線程安全,性能必然是最高的,但出現(xiàn)的問題是,數(shù)據(jù)經常被覆蓋。雖然可以容忍少部分數(shù)據(jù)丟失,但也是在消費者跟不上生產者速度時。緩沖區(qū)必然有界,無界可能導致內存泄露,如果緩沖區(qū)滿,再生產新數(shù)據(jù),可選的策略一般有如下幾種:

          • 阻塞直到被消費
          • 覆蓋舊數(shù)據(jù)
          • 丟棄新數(shù)據(jù)

          在要求對生產者性能損耗最小的情況下一般不選1,通常采取覆蓋策略。

          環(huán)形隊列

          有一種環(huán)形隊列的數(shù)據(jù)結構(ring buffer)可以很好的解決解決上面提到的生產者-消費者模型、緩沖區(qū)有界、覆蓋策略。通常用數(shù)組來實現(xiàn)ring buffer,只要保證生產者獲取下標是線程安全的即可解決線程安全問題。而且數(shù)組內存預先分配加上連續(xù)內存索引更加快速的特點也保證了強悍的性能。

          AtomicInteger

          在環(huán)形隊列上如何保證線程安全地獲取數(shù)組下標?線程安全地自增我們想到了AtomicInteger,很容易寫出如下代碼

          public class AtomicRangeInteger extends Number {

              private final AtomicInteger value;

              private final int startValue;
              private final int endValue;

              public AtomicRangeInteger(int startValue, int endValue) {
                  this.startValue = startValue;
                  this.endValue = endValue;
                  this.value = new AtomicInteger(startValue);
              }

              public final int incrementAndGet() {
                  int next;
                  do {
                      next = value.incrementAndGet();
                      if (next > endValue && value.compareAndSet(next, startValue)) {
                          return startValue;
                      }
                  } while (next > endValue);

                  return next;
              }

              public final int get() {
                  return value.intValue();
              }

              @Override
              public int intValue() {
                  return value.intValue();
              }

              @Override
              public long longValue() {
                  return value.intValue();
              }

              @Override
              public float floatValue() {
                  return value.intValue();
              }

              @Override
              public double doubleValue() {
                  return value.intValue();
              }
          }
          public final class RingBuffer<T{

              private int bufferSize;
              private AtomicRangeInteger index;
              private final T[] buffer;

              @SuppressWarnings("unchecked")
              public RingBuffer(int bufferSize) {
                  this.bufferSize = bufferSize;
                  this.index = new AtomicRangeInteger(0, bufferSize);
                  this.buffer = (T[]) new Object[bufferSize];
              }

              public final void offer(final T data) {
                  buffer[index.incrementAndGet()] = data;
              }

              public final T poll(int index) {
                  T tmp = buffer[index];
                  buffer[index] = null;
                  return tmp;
              }

              public int getBufferSize() {
                  return bufferSize;
              }
          }

          核心代碼其實就是這一段

          public final int incrementAndGet() {
              int next;
              do {
                  next = value.incrementAndGet();
                  if (next > endValue && value.compareAndSet(next, startValue)) {
                      return startValue;
                  }
              } while (next > endValue);

              return next;
          }
          • 首選生產者獲取下一個可用的index,直接在當前基礎上調用incrementAndGet進行加1操作,該操作是原子的,故拿到的一定是沒有被其他線程占用的index
          • 獲取上一步的返回值,該返回值有可能超過ring buffer的最大下標值,如果超過則將其置為startValue,這一步使用compareAndSet,可能會失敗,如果失敗說明有其他線程做了該操作,故可以再調用一次incrementAndGet獲取下一個下標

          為何是極致的性能

          目前有一款開源ring buffer的實現(xiàn)——disruptor,關于它的介紹網上可以找到很多,這里簡單介紹一下。它也是使用的數(shù)組來充當環(huán)形隊列,但與上面的實現(xiàn)有一點差別,它可以批量插入,所以它使用的是compareAndSet,它在緩沖區(qū)填滿以后的策略是阻塞,它的下標是不回溯,永遠往后加,采用取模來映射到對應的index,為了性能使用位運算(&),所以它的容量只能是2的N次方。

          主要的不同就在compareAndSet(v1)與incrementAndGet(v0),我們可以將原先的incrementAndGet實現(xiàn)改為compareAndSet測試一下性能差異(緩沖區(qū)大小為1000):

          Benchmark                    Mode  Cnt         Score   Error  Units
          RingBufferBenchmark.testV0  thrpt    2  39969002.156          ops/s
          RingBufferBenchmark.testV1  thrpt    2  15533576.961          ops/s

          為什么會有三倍性能的差距?看一下incrementAndGet的實現(xiàn):

          /**
           * Atomically increments by one the current value.
           *
           * @return the updated value
           */

          public final int incrementAndGet() {
              return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
          }
          public final int getAndAddInt(Object var1, long var2, int var4) {
              int var5;
              do {
                  var5 = this.getIntVolatile(var1, var2);
              } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

              return var5;
          }

          發(fā)現(xiàn)incrementAndGet底層也是CAS的實現(xiàn)。其實在JDK8之后對incrementAndGet做了優(yōu)化:

          ?

          Unsafe是經過特殊處理的,不能理解成常規(guī)的java代碼,在調用getAndAddInt的時候,如果系統(tǒng)底層支持fetch-and-add,那么它執(zhí)行的就是native方法,使用的是fetch-and-add;如果不支持,就按照上面的所看到的getAndAddInt方法體那樣,以java代碼的方式去執(zhí)行,使用的是compare-and-swap

          ?

          CAS使用的是jdk層面的自旋鎖,fetch-and-add是cpu指令,性能上fetch-and-add要強很多。所以如果拿上面的代碼用jdk1.7來測試,會發(fā)現(xiàn)性能沒有差別。

          再優(yōu)化空間

          • 緩存行填充

          AtomicRangeInteger對象中存在三個屬性,value,startValue,endValue,value是經常變化的,startValue,endValue是不會變化,所以當value經常變化會導致讀取startValue,endValue時不會命中cpu緩存,性能有所下降,我們使用jdk8的注解@Contended來填充value所在行的緩存。

          public class AtomicRangeIntegerV2 extends Number {

              @Contended
              protected final AtomicInteger value;

              protected final int startValue;
              protected final int endValue;
              ...
          }

          做一下benchmark(v2為加@Contended注解):

          Benchmark                    Mode  Cnt         Score   Error  Units
          RingBufferBenchmark.testV2  thrpt    2  72095754.040          ops/s
          RingBufferBenchmark.testV0  thrpt    2  44360926.943          ops/s
          • 多個ring buffer分擔沖突

          這個優(yōu)化就不做過多說明了,多線程中一般都采取分段的思想來降低沖突,ring buffer也可以,當一個ring buffer存在性能瓶頸時,可以利用多個ring buffer來分擔,最佳狀態(tài)是每個線程分配一個ring buffer,具體怎么分配,在之前的文章《實現(xiàn)一個比LongAdder更高性能的計數(shù)器有多難?》中有一個比較巧妙的辦法,可以參考下。

          彩蛋

          這個ring buffer本來是借鑒skywalking中ring buffer的實現(xiàn),但當時skywalking的實現(xiàn)也是使用CAS,性能不是很滿意,于是我就做了這些優(yōu)化,具體github的issue可以參考如下鏈接:

          ?

          https://github.com/apache/skywalking/pull/2874 https://github.com/apache/skywalking/pull/2930

          ?

          現(xiàn)在skywalking中已經是我寫的這個版本了,算得上是一個極致性能的緩沖隊列了。

          后臺回復 學習資料 領取學習視頻


          如有收獲,點個在看,誠摯感謝


          瀏覽 51
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄色视频久 | 访问页面亚洲日韩 | 北条麻妃亚洲无码 | 538色视频一区二区三区 | 亚洲在线成人 |