一種極致性能的緩沖隊列
背景
在多線程下的生產者-消費者模型中,需求滿足如下情況:
對生產者生產投遞數(shù)據(jù)的性能要求非常高 多個生產者,單個(多個也可以,本文只介紹單個的情況)消費者 當消費者跟不上生產者速度時,可容忍少部分數(shù)據(jù)丟失 生產者是單條單條地生產數(shù)據(jù)
舉個日志采集的例子,日志在不同的線程上生產,在日志生產速度遠超消費者速度時,可以丟棄部分數(shù)據(jù),要求打日志的性能損耗最小,這種情況下可采用本文提供的極致性能的緩沖隊列。
實現(xiàn)細節(jié)
多個生產者向一個緩沖隊列提交消息,說到底是線程安全問題,如果不考慮線程安全,性能必然是最高的,但出現(xiàn)的問題是,數(shù)據(jù)經常被覆蓋。雖然可以容忍少部分數(shù)據(jù)丟失,但也是在消費者跟不上生產者速度時。緩沖區(qū)必然有界,無界可能導致內存泄露,如果緩沖區(qū)滿,再生產新數(shù)據(jù),可選的策略一般有如下幾種:
阻塞直到被消費 覆蓋舊數(shù)據(jù) 丟棄新數(shù)據(jù)
在要求對生產者性能損耗最小的情況下一般不選1,通常采取覆蓋策略。
環(huán)形隊列
有一種環(huán)形隊列的數(shù)據(jù)結構(ring buffer)可以很好的解決解決上面提到的生產者-消費者模型、緩沖區(qū)有界、覆蓋策略。通常用數(shù)組來實現(xiàn)ring buffer,只要保證生產者獲取下標是線程安全的即可解決線程安全問題。而且數(shù)組內存預先分配加上連續(xù)內存索引更加快速的特點也保證了強悍的性能。

AtomicInteger
在環(huán)形隊列上如何保證線程安全地獲取數(shù)組下標?線程安全地自增我們想到了AtomicInteger,很容易寫出如下代碼
public class AtomicRangeInteger extends Number {
private final AtomicInteger value;
private final int startValue;
private final int endValue;
public AtomicRangeInteger(int startValue, int endValue) {
this.startValue = startValue;
this.endValue = endValue;
this.value = new AtomicInteger(startValue);
}
public final int incrementAndGet() {
int next;
do {
next = value.incrementAndGet();
if (next > endValue && value.compareAndSet(next, startValue)) {
return startValue;
}
} while (next > endValue);
return next;
}
public final int get() {
return value.intValue();
}
@Override
public int intValue() {
return value.intValue();
}
@Override
public long longValue() {
return value.intValue();
}
@Override
public float floatValue() {
return value.intValue();
}
@Override
public double doubleValue() {
return value.intValue();
}
}
public final class RingBuffer<T> {
private int bufferSize;
private AtomicRangeInteger index;
private final T[] buffer;
@SuppressWarnings("unchecked")
public RingBuffer(int bufferSize) {
this.bufferSize = bufferSize;
this.index = new AtomicRangeInteger(0, bufferSize);
this.buffer = (T[]) new Object[bufferSize];
}
public final void offer(final T data) {
buffer[index.incrementAndGet()] = data;
}
public final T poll(int index) {
T tmp = buffer[index];
buffer[index] = null;
return tmp;
}
public int getBufferSize() {
return bufferSize;
}
}
核心代碼其實就是這一段
public final int incrementAndGet() {
int next;
do {
next = value.incrementAndGet();
if (next > endValue && value.compareAndSet(next, startValue)) {
return startValue;
}
} while (next > endValue);
return next;
}
首選生產者獲取下一個可用的index,直接在當前基礎上調用incrementAndGet進行加1操作,該操作是原子的,故拿到的一定是沒有被其他線程占用的index 獲取上一步的返回值,該返回值有可能超過ring buffer的最大下標值,如果超過則將其置為startValue,這一步使用compareAndSet,可能會失敗,如果失敗說明有其他線程做了該操作,故可以再調用一次incrementAndGet獲取下一個下標
為何是極致的性能
目前有一款開源ring buffer的實現(xiàn)——disruptor,關于它的介紹網上可以找到很多,這里簡單介紹一下。它也是使用的數(shù)組來充當環(huán)形隊列,但與上面的實現(xiàn)有一點差別,它可以批量插入,所以它使用的是compareAndSet,它在緩沖區(qū)填滿以后的策略是阻塞,它的下標是不回溯,永遠往后加,采用取模來映射到對應的index,為了性能使用位運算(&),所以它的容量只能是2的N次方。
主要的不同就在compareAndSet(v1)與incrementAndGet(v0),我們可以將原先的incrementAndGet實現(xiàn)改為compareAndSet測試一下性能差異(緩沖區(qū)大小為1000):
Benchmark Mode Cnt Score Error Units
RingBufferBenchmark.testV0 thrpt 2 39969002.156 ops/s
RingBufferBenchmark.testV1 thrpt 2 15533576.961 ops/s
為什么會有三倍性能的差距?看一下incrementAndGet的實現(xiàn):
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
發(fā)現(xiàn)incrementAndGet底層也是CAS的實現(xiàn)。其實在JDK8之后對incrementAndGet做了優(yōu)化:
?Unsafe是經過特殊處理的,不能理解成常規(guī)的java代碼,在調用getAndAddInt的時候,如果系統(tǒng)底層支持fetch-and-add,那么它執(zhí)行的就是native方法,使用的是fetch-and-add;如果不支持,就按照上面的所看到的getAndAddInt方法體那樣,以java代碼的方式去執(zhí)行,使用的是compare-and-swap
?
CAS使用的是jdk層面的自旋鎖,fetch-and-add是cpu指令,性能上fetch-and-add要強很多。所以如果拿上面的代碼用jdk1.7來測試,會發(fā)現(xiàn)性能沒有差別。
再優(yōu)化空間
緩存行填充
AtomicRangeInteger對象中存在三個屬性,value,startValue,endValue,value是經常變化的,startValue,endValue是不會變化,所以當value經常變化會導致讀取startValue,endValue時不會命中cpu緩存,性能有所下降,我們使用jdk8的注解@Contended來填充value所在行的緩存。
public class AtomicRangeIntegerV2 extends Number {
@Contended
protected final AtomicInteger value;
protected final int startValue;
protected final int endValue;
...
}
做一下benchmark(v2為加@Contended注解):
Benchmark Mode Cnt Score Error Units
RingBufferBenchmark.testV2 thrpt 2 72095754.040 ops/s
RingBufferBenchmark.testV0 thrpt 2 44360926.943 ops/s
多個ring buffer分擔沖突
這個優(yōu)化就不做過多說明了,多線程中一般都采取分段的思想來降低沖突,ring buffer也可以,當一個ring buffer存在性能瓶頸時,可以利用多個ring buffer來分擔,最佳狀態(tài)是每個線程分配一個ring buffer,具體怎么分配,在之前的文章《實現(xiàn)一個比LongAdder更高性能的計數(shù)器有多難?》中有一個比較巧妙的辦法,可以參考下。
彩蛋
這個ring buffer本來是借鑒skywalking中ring buffer的實現(xiàn),但當時skywalking的實現(xiàn)也是使用CAS,性能不是很滿意,于是我就做了這些優(yōu)化,具體github的issue可以參考如下鏈接:
?https://github.com/apache/skywalking/pull/2874 https://github.com/apache/skywalking/pull/2930
?
現(xiàn)在skywalking中已經是我寫的這個版本了,算得上是一個極致性能的緩沖隊列了。
后臺回復 學習資料 領取學習視頻
如有收獲,點個在看,誠摯感謝
