用Java自己動(dòng)手實(shí)現(xiàn)一個(gè)阻塞隊(duì)列
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
作者 | 小熊餐館
來源 | urlify.cn/IbIB7j
1. 阻塞隊(duì)列介紹
顧名思義,阻塞隊(duì)列是一個(gè)具備先進(jìn)先出特性的隊(duì)列結(jié)構(gòu),從隊(duì)列末尾插入數(shù)據(jù),從隊(duì)列頭部取出數(shù)據(jù)。而阻塞隊(duì)列與普通隊(duì)列的最大不同在于阻塞隊(duì)列提供了阻塞式的同步插入、取出數(shù)據(jù)的功能(阻塞入隊(duì)put/阻塞出隊(duì)take)。
使用put插入數(shù)據(jù)時(shí),如果隊(duì)列空間已滿并不直接返回,而是令當(dāng)前操作的線程陷入阻塞態(tài)(生產(chǎn)者線程),等待著阻塞隊(duì)列中的元素被其它線程(消費(fèi)者線程)取走,令隊(duì)列重新變得不滿時(shí)被喚醒再次嘗試插入數(shù)據(jù)。使用take取出數(shù)據(jù)時(shí),如果隊(duì)列空間為空并不直接返回,而是令當(dāng)前操作的線程陷入阻塞態(tài)(消費(fèi)者線程),等待其它線程(生產(chǎn)者線程)插入新元素,令隊(duì)列非空時(shí)被喚醒再次嘗試取出數(shù)據(jù)。
阻塞隊(duì)列主要用于解決并發(fā)場景下消費(fèi)者線程與生產(chǎn)者線程處理速度不一致的問題。例如jdk的線程池實(shí)現(xiàn)中,線程池核心線程(消費(fèi)者線程)處理速度一定的情況下,如果業(yè)務(wù)方線程提交的任務(wù)過多導(dǎo)致核心線程處理不過來時(shí),將任務(wù)暫時(shí)放進(jìn)阻塞隊(duì)列等待核心線程消費(fèi)(阻塞隊(duì)列未滿);由于核心線程常駐的原因,當(dāng)業(yè)務(wù)方線程提交的任務(wù)較少,核心線程消費(fèi)速度高于業(yè)務(wù)方生產(chǎn)速度時(shí),核心線程作為消費(fèi)者會(huì)阻塞在阻塞隊(duì)列的take方法中,避免無謂的浪費(fèi)cpu資源。
由于阻塞隊(duì)列在內(nèi)部實(shí)現(xiàn)了協(xié)調(diào)生產(chǎn)者/消費(fèi)者的機(jī)制而不需要外部使用者過多的考慮并發(fā)同步問題,極大的降低了生產(chǎn)者/消費(fèi)者場景下程序的復(fù)雜度。
2. 自己實(shí)現(xiàn)阻塞隊(duì)列
下面我們自己動(dòng)手一步步的實(shí)現(xiàn)幾個(gè)不同版本、效率由低到高的的阻塞隊(duì)列,來加深對阻塞隊(duì)列工作原理的理解。
阻塞隊(duì)列接口
為了降低復(fù)雜度,我們的阻塞隊(duì)列只提供最基礎(chǔ)的出隊(duì)、入隊(duì)和判空接口。
/**
* 阻塞隊(duì)列
* 1. 首先是一個(gè)先進(jìn)先出的隊(duì)列
* 2. 提供特別的api,在入隊(duì)時(shí)如果隊(duì)列已滿令當(dāng)前操作線程阻塞;在出隊(duì)時(shí)如果隊(duì)列為空令當(dāng)前操作線程阻塞
* 3. 單個(gè)元素的插入、刪除操作是線程安全的
*/
public interface MyBlockingQueue<E> {
/**
* 插入特定元素e,加入隊(duì)尾
* 隊(duì)列已滿時(shí)阻塞當(dāng)前線程,直到隊(duì)列中元素被其它線程刪除并插入成功
* */
void put(E e) throws InterruptedException;
/**
* 隊(duì)列頭部的元素出隊(duì)(返回頭部元素,將其從隊(duì)列中刪除)
* 隊(duì)列為空時(shí)阻塞當(dāng)前線程,直到隊(duì)列被其它元素插入新元素并出隊(duì)成功
* */
E take() throws InterruptedException;
/**
* 隊(duì)列是否為空
* */
boolean isEmpty();
}
2.1 v1版本(最基本的隊(duì)列實(shí)現(xiàn))
博客中所實(shí)現(xiàn)的阻塞隊(duì)列底層是使用數(shù)組承載數(shù)據(jù)的(ArrayBlockingQueue),內(nèi)部提供了私有方法enqueue和dequeue來實(shí)現(xiàn)原始的內(nèi)部入隊(duì)和出隊(duì)操作。
最初始的v1版本中,我們只實(shí)現(xiàn)最基本的FIFO隊(duì)列功能,其put和take方法只是簡單的調(diào)用了enqueue和dequeue,因此v1版本中其入隊(duì)、出隊(duì)不是阻塞的,也無法保障線程安全,十分簡陋。
后續(xù)的版本中,我們會(huì)以v1版本為基礎(chǔ),實(shí)現(xiàn)阻塞調(diào)用以及線程安全的特性,并且對所實(shí)現(xiàn)的阻塞隊(duì)列性能進(jìn)行不斷的優(yōu)化。
/**
* 數(shù)組作為底層結(jié)構(gòu)的阻塞隊(duì)列 v1版本
*/
public class MyArrayBlockingQueueV1<E> implements MyBlockingQueue<E> {
/**
* 隊(duì)列默認(rèn)的容量大小
* */
private static final int DEFAULT_CAPACITY = 16;
/**
* 承載隊(duì)列元素的底層數(shù)組
* */
private final Object[] elements;
/**
* 當(dāng)前頭部元素的下標(biāo)
* */
private int head;
/**
* 下一個(gè)元素插入時(shí)的下標(biāo)
* */
private int tail;
/**
* 隊(duì)列中元素個(gè)數(shù)
* */
private int count;
//=================================================構(gòu)造方法======================================================
public MyArrayBlockingQueueV1() {
// 設(shè)置數(shù)組大小為默認(rèn)
this.elements = new Object[DEFAULT_CAPACITY];
// 初始化隊(duì)列 頭部,尾部下標(biāo)
this.head = 0;
this.tail = 0;
}
public MyArrayBlockingQueueV1(int initCapacity) {
assert initCapacity > 0;
this.elements = new Object[initCapacity];
// 初始化隊(duì)列 頭部,尾部下標(biāo)
this.head = 0;
this.tail = 0;
}
/**
* 下標(biāo)取模
* */
private int getMod(int logicIndex){
int innerArrayLength = this.elements.length;
// 由于隊(duì)列下標(biāo)邏輯上是循環(huán)的
if(logicIndex < 0){
// 當(dāng)邏輯下標(biāo)小于零時(shí)
// 真實(shí)下標(biāo) = 邏輯下標(biāo) + 加上當(dāng)前數(shù)組長度
return logicIndex + innerArrayLength;
} else if(logicIndex >= innerArrayLength){
// 當(dāng)邏輯下標(biāo)大于數(shù)組長度時(shí)
// 真實(shí)下標(biāo) = 邏輯下標(biāo) - 減去當(dāng)前數(shù)組長度
return logicIndex - innerArrayLength;
} else {
// 真實(shí)下標(biāo) = 邏輯下標(biāo)
return logicIndex;
}
}
/**
* 入隊(duì)
* */
private void enqueue(E e){
// 存放新插入的元素
this.elements[this.tail] = e;
// 尾部插入新元素后 tail下標(biāo)后移一位
this.tail = getMod(this.tail + 1);
this.count++;
}
/**
* 出隊(duì)
* */
private E dequeue(){
// 暫存需要被刪除的數(shù)據(jù)
E dataNeedRemove = (E)this.elements[this.head];
// 將當(dāng)前頭部元素引用釋放
this.elements[this.head] = null;
// 頭部下標(biāo) 后移一位
this.head = getMod(this.head + 1);
this.count--;
return dataNeedRemove;
}
@Override
public void put(E e){
enqueue(e);
}
@Override
public E take() {
return dequeue();
}
@Override
public boolean isEmpty() {
return this.count == 0;
}
}
2.2 v2版本(實(shí)現(xiàn)同步阻塞和線程安全的特性)
前面提到阻塞調(diào)用的出隊(duì)、入隊(duì)的功能是阻塞隊(duì)列區(qū)別于普通隊(duì)列的關(guān)鍵特性。阻塞調(diào)用實(shí)現(xiàn)的方式有很多,其中最容易理解的一種方式便是無限循環(huán)的輪詢,直到出隊(duì)/入隊(duì)成功(雖然cpu效率很低)。
v2版本在v1的基礎(chǔ)上,使用無限循環(huán)加定時(shí)休眠的方式簡單的實(shí)現(xiàn)了同步調(diào)用時(shí)阻塞的特性。并且在put/take內(nèi)增加了synchronized塊將入隊(duì)/出隊(duì)代碼包裹起來,阻止多個(gè)線程并發(fā)的操作隊(duì)列而產(chǎn)生線程安全問題。
v2版本入隊(duì)方法實(shí)現(xiàn):
@Override
public void put(E e) throws InterruptedException {
while (true) {
synchronized (this) {
// 隊(duì)列未滿時(shí)執(zhí)行入隊(duì)操作
if (count != elements.length) {
// 入隊(duì),并返回
enqueue(e);
return;
}
}
// 隊(duì)列已滿,休眠一段時(shí)間后重試
Thread.sleep(100L);
}
}
v2版本出隊(duì)方法實(shí)現(xiàn):
@Override
public E take() throws InterruptedException {
while (true) {
synchronized (this) {
// 隊(duì)列非空時(shí)執(zhí)行出隊(duì)操作
if (count != 0) {
// 出隊(duì)并立即返回
return dequeue();
}
}
// 隊(duì)列為空的情況下,休眠一段時(shí)間后重試
Thread.sleep(100L);
}
}
v2版本完整代碼:
/**
* 數(shù)組作為底層結(jié)構(gòu)的阻塞隊(duì)列 v2版本
*/
public class MyArrayBlockingQueueV2<E> implements MyBlockingQueue<E> {
/**
* 隊(duì)列默認(rèn)的容量大小
* */
private static final int DEFAULT_CAPACITY = 16;
/**
* 承載隊(duì)列元素的底層數(shù)組
* */
private final Object[] elements;
/**
* 當(dāng)前頭部元素的下標(biāo)
* */
private int head;
/**
* 下一個(gè)元素插入時(shí)的下標(biāo)
* */
private int tail;
/**
* 隊(duì)列中元素個(gè)數(shù)
* */
private int count;
//=================================================構(gòu)造方法======================================================
/**
* 默認(rèn)構(gòu)造方法
* */
public MyArrayBlockingQueueV2() {
// 設(shè)置數(shù)組大小為默認(rèn)
this.elements = new Object[DEFAULT_CAPACITY];
// 初始化隊(duì)列 頭部,尾部下標(biāo)
this.head = 0;
this.tail = 0;
}
/**
* 默認(rèn)構(gòu)造方法
* */
public MyArrayBlockingQueueV2(int initCapacity) {
assert initCapacity > 0;
// 設(shè)置數(shù)組大小為默認(rèn)
this.elements = new Object[initCapacity];
// 初始化隊(duì)列 頭部,尾部下標(biāo)
this.head = 0;
this.tail = 0;
}
/**
* 下標(biāo)取模
* */
private int getMod(int logicIndex){
int innerArrayLength = this.elements.length;
// 由于隊(duì)列下標(biāo)邏輯上是循環(huán)的
if(logicIndex < 0){
// 當(dāng)邏輯下標(biāo)小于零時(shí)
// 真實(shí)下標(biāo) = 邏輯下標(biāo) + 加上當(dāng)前數(shù)組長度
return logicIndex + innerArrayLength;
} else if(logicIndex >= innerArrayLength){
// 當(dāng)邏輯下標(biāo)大于數(shù)組長度時(shí)
// 真實(shí)下標(biāo) = 邏輯下標(biāo) - 減去當(dāng)前數(shù)組長度
return logicIndex - innerArrayLength;
} else {
// 真實(shí)下標(biāo) = 邏輯下標(biāo)
return logicIndex;
}
}
/**
* 入隊(duì)
* */
private void enqueue(E e){
// 存放新插入的元素
this.elements[this.tail] = e;
// 尾部插入新元素后 tail下標(biāo)后移一位
this.tail = getMod(this.tail + 1);
this.count++;
}
/**
* 出隊(duì)
* */
private E dequeue(){
// 暫存需要被刪除的數(shù)據(jù)
E dataNeedRemove = (E)this.elements[this.head];
// 將當(dāng)前頭部元素引用釋放
this.elements[this.head] = null;
// 頭部下標(biāo) 后移一位
this.head = getMod(this.head + 1);
this.count--;
return dataNeedRemove;
}
@Override
public void put(E e) throws InterruptedException {
while (true) {
synchronized (this) {
// 隊(duì)列未滿時(shí)執(zhí)行入隊(duì)操作
if (count != elements.length) {
// 入隊(duì),并返回
enqueue(e);
return;
}
}
// 隊(duì)列已滿,休眠一段時(shí)間后重試
Thread.sleep(100L);
}
}
@Override
public E take() throws InterruptedException {
while (true) {
synchronized (this) {
// 隊(duì)列非空時(shí)執(zhí)行出隊(duì)操作
if (count != 0) {
// 出隊(duì)并立即返回
return dequeue();
}
}
// 隊(duì)列為空的情況下,休眠一段時(shí)間后重試
Thread.sleep(100L);
}
}
@Override
public boolean isEmpty() {
return this.count == 0;
}
}
2.3 v3版本(引入條件變量優(yōu)化無限循環(huán)輪詢)
在有大量線程競爭的情況下,v2版本無限循環(huán)加休眠的阻塞方式存在兩個(gè)嚴(yán)重的問題。
無限循環(huán)輪詢的缺陷
1. 線程周期性的休眠/喚醒會(huì)造成頻繁的發(fā)生線程上下文切換,非常浪費(fèi)cpu資源
2. 線程在嘗試操作失敗被阻塞時(shí)(嘗試入隊(duì)時(shí)隊(duì)列已滿、嘗試出隊(duì)時(shí)隊(duì)列為空),如果休眠時(shí)間設(shè)置的太短,則休眠/喚醒的次數(shù)會(huì)非常多,cpu性能低下;但如果休眠的時(shí)間設(shè)置的較長,則會(huì)導(dǎo)致被阻塞線程在隊(duì)列狀態(tài)發(fā)生變化時(shí)無法及時(shí)的響應(yīng)
舉個(gè)例子:某一生產(chǎn)者線程在入隊(duì)時(shí)發(fā)現(xiàn)隊(duì)列已滿,當(dāng)前線程休眠1s,在0.1s之后一個(gè)消費(fèi)者線程取走了一個(gè)元素,而此時(shí)休眠的生產(chǎn)者線程還需要白白等待0.9s后才被喚醒并感知到隊(duì)列未滿而接著執(zhí)行入隊(duì)操作。綜上所述,無限循環(huán)加休眠的v2版本阻塞隊(duì)列其性能極差,需要進(jìn)一步的優(yōu)化。
使用條件變量進(jìn)行優(yōu)化
為了解決上述循環(huán)休眠浪費(fèi)cpu和隊(duì)列狀態(tài)發(fā)生變化時(shí)(已滿到未滿,已空到未空)被阻塞線程無法及時(shí)響應(yīng)的問題,v3版本引入條件變量對其進(jìn)行優(yōu)化。
條件變量由底層的操作系統(tǒng)內(nèi)核實(shí)現(xiàn)的、用于線程間同步的利器。(條件變量的實(shí)現(xiàn)原理可以參考我之前的博客:https://www.cnblogs.com/xiaoxiongcanguan/p/14152830.html)
java將不同操作系統(tǒng)內(nèi)核提供的條件變量機(jī)制抽象封裝后,作為可重入鎖ReentrantLock的附屬給程序員使用。且為了避免lost wakeup問題,在條件變量的實(shí)現(xiàn)中增加了校驗(yàn),要求調(diào)用條件變量的signal和await方法時(shí)當(dāng)前線程必須先獲得條件變量所附屬的鎖才行,更具體的解析可以參考這篇文章:https://mp.weixin.qq.com/s/ohcr6T1aB7-lVFJIfyJZjA。
引入條件變量后,可以令未滿足某種條件的線程暫時(shí)進(jìn)入阻塞態(tài),等待在一個(gè)條件變量上;當(dāng)對應(yīng)條件滿足時(shí)由其它的線程將等待在條件變量上的線程喚醒,將其從阻塞態(tài)再切換回就緒態(tài)。
舉個(gè)例子:當(dāng)某一生產(chǎn)者線程想要插入新元素但阻塞隊(duì)列已滿時(shí),可以令當(dāng)前生產(chǎn)者線程等待并阻塞在對應(yīng)的條件變量中;當(dāng)后續(xù)某一消費(fèi)者線程執(zhí)行出隊(duì)操作使得隊(duì)列非空后,將等待在條件變量上的生產(chǎn)者線程喚醒,被喚醒的生產(chǎn)者線程便能及時(shí)的再次嘗試進(jìn)行入隊(duì)操作。
v3和v2版本相比,等待在條件變量進(jìn)入阻塞態(tài)的線程不再周期性的被喚醒而占用過多的cpu資源,且在特定條件滿足時(shí)也能被及時(shí)喚醒。
引入條件變量后的v3版本阻塞隊(duì)列效率比v2高出許多。
v3版本完整代碼:
/**
* 數(shù)組作為底層結(jié)構(gòu)的阻塞隊(duì)列 v3版本
*/
public class MyArrayBlockingQueueV3<E> implements MyBlockingQueue<E> {
/**
* 隊(duì)列默認(rèn)的容量大小
* */
private static final int DEFAULT_CAPACITY = 16;
/**
* 承載隊(duì)列元素的底層數(shù)組
* */
private final Object[] elements;
/**
* 當(dāng)前頭部元素的下標(biāo)
* */
private int head;
/**
* 下一個(gè)元素插入時(shí)的下標(biāo)
* */
private int tail;
/**
* 隊(duì)列中元素個(gè)數(shù)
* */
private int count;
private final ReentrantLock reentrantLock;
private final Condition condition;
//=================================================構(gòu)造方法======================================================
/**
* 默認(rèn)構(gòu)造方法
* */
public MyArrayBlockingQueueV3() {
this(DEFAULT_CAPACITY);
}
/**
* 默認(rèn)構(gòu)造方法
* */
public MyArrayBlockingQueueV3(int initCapacity) {
assert initCapacity > 0;
// 設(shè)置數(shù)組大小為默認(rèn)
this.elements = new Object[initCapacity];
// 初始化隊(duì)列 頭部,尾部下標(biāo)
this.head = 0;
this.tail = 0;
this.reentrantLock = new ReentrantLock();
this.condition = this.reentrantLock.newCondition();
}
/**
* 下標(biāo)取模
* */
private int getMod(int logicIndex){
int innerArrayLength = this.elements.length;
// 由于隊(duì)列下標(biāo)邏輯上是循環(huán)的
if(logicIndex < 0){
// 當(dāng)邏輯下標(biāo)小于零時(shí)
// 真實(shí)下標(biāo) = 邏輯下標(biāo) + 加上當(dāng)前數(shù)組長度
return logicIndex + innerArrayLength;
} else if(logicIndex >= innerArrayLength){
// 當(dāng)邏輯下標(biāo)大于數(shù)組長度時(shí)
// 真實(shí)下標(biāo) = 邏輯下標(biāo) - 減去當(dāng)前數(shù)組長度
return logicIndex - innerArrayLength;
} else {
// 真實(shí)下標(biāo) = 邏輯下標(biāo)
return logicIndex;
}
}
/**
* 入隊(duì)
* */
private void enqueue(E e){
// 存放新插入的元素
this.elements[this.tail] = e;
// 尾部插入新元素后 tail下標(biāo)后移一位
this.tail = getMod(this.tail + 1);
this.count++;
}
/**
* 出隊(duì)
* */
private E dequeue(){
// 暫存需要被刪除的數(shù)據(jù)
E dataNeedRemove = (E)this.elements[this.head];
// 將當(dāng)前頭部元素引用釋放
this.elements[this.head] = null;
// 頭部下標(biāo) 后移一位
this.head = getMod(this.head + 1);
this.count--;
return dataNeedRemove;
}
@Override
public void put(E e) throws InterruptedException {
// 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
reentrantLock.lockInterruptibly();
try {
// 因?yàn)楸幌M(fèi)者喚醒后可能會(huì)被其它的生產(chǎn)者再度填滿隊(duì)列,需要循環(huán)的判斷
while (this.count == elements.length) {
// put操作時(shí),如果隊(duì)列已滿則進(jìn)入條件變量的等待隊(duì)列,并釋放條件變量對應(yīng)的鎖
condition.await();
}
// 走到這里,說明當(dāng)前隊(duì)列不滿,可以執(zhí)行入隊(duì)操作
enqueue(e);
// 喚醒可能等待著的消費(fèi)者線程
// 由于共用了一個(gè)condition,所以不能用signal,否則一旦喚醒的也是生產(chǎn)者線程就會(huì)陷入上面的while死循環(huán))
condition.signalAll();
} finally {
// 入隊(duì)完畢,釋放鎖
reentrantLock.unlock();
}
}
@Override
public E take() throws InterruptedException {
// 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
reentrantLock.lockInterruptibly();
try {
// 因?yàn)楸簧a(chǎn)者喚醒后可能會(huì)被其它的消費(fèi)者消費(fèi)而使得隊(duì)列再次為空,需要循環(huán)的判斷
while(this.count == 0){
condition.await();
}
E headElement = dequeue();
// 喚醒可能等待著的生產(chǎn)者線程
// 由于共用了一個(gè)condition,所以不能用signal,否則一旦喚醒的也是消費(fèi)者線程就會(huì)陷入上面的while死循環(huán))
condition.signalAll();
return headElement;
} finally {
// 出隊(duì)完畢,釋放鎖
reentrantLock.unlock();
}
}
@Override
public boolean isEmpty() {
return this.count == 0;
}
}
2.4 v4版本(引入雙條件變量,優(yōu)化喚醒效率)
v3版本通過引入條件變量解決了v2版本中循環(huán)休眠、喚醒效率低下的問題,但v3版本還是存在一定的性能問題。
v3版本中signalAll的效率問題
jdk的Condition條件變量提供了signal和signalAll這兩個(gè)方法用于喚醒等待在條件變量中的線程,其中signalAll會(huì)喚醒等待在條件變量上的所有線程,而signal則只會(huì)喚醒其中一個(gè)。
舉個(gè)例子,v3版本中消費(fèi)者線程在隊(duì)列已滿時(shí)進(jìn)行出隊(duì)操作后,通過signalAll會(huì)喚醒所有等待入隊(duì)的多個(gè)生產(chǎn)者線程,但最終只會(huì)有一個(gè)線程成功競爭到互斥鎖并成功執(zhí)行入隊(duì)操作,其它的生產(chǎn)者線程在被喚醒后發(fā)現(xiàn)隊(duì)列依然是滿的,而繼續(xù)等待。v3版本中的signalAll喚醒操作造成了驚群效應(yīng),無意義的喚醒了過多的等待中的線程。
但僅僅將v3版本中的signalAll改成signal是不行的,因?yàn)樯a(chǎn)者和消費(fèi)者線程是等待在同一個(gè)條件變量中的,如果消費(fèi)者在出隊(duì)后通過signal喚醒的不是與之對應(yīng)的生產(chǎn)者線程,而是另一個(gè)消費(fèi)者線程,則本該被喚醒的生產(chǎn)者線程可能遲遲無法被喚醒,甚至在一些場景下會(huì)永遠(yuǎn)被阻塞,無法再喚醒。
仔細(xì)思索后可以發(fā)現(xiàn),對于生產(chǎn)者線程其在隊(duì)列已滿時(shí)阻塞等待,等待的是隊(duì)列不滿的條件(notFull);而對于消費(fèi)者線程其在隊(duì)列為空時(shí)阻塞等待,等待的是隊(duì)列不空的條件(notEmpty)。隊(duì)列不滿和隊(duì)列不空實(shí)質(zhì)上是兩個(gè)互不相關(guān)的條件。
因此v4版本中將生產(chǎn)者線程和消費(fèi)者線程關(guān)注的條件變量拆分成兩個(gè):生產(chǎn)者線程在隊(duì)列已滿時(shí)阻塞等待在notFull條件變量上,消費(fèi)者線程出隊(duì)后通過notFull.signal嘗試著喚醒一個(gè)等待的生產(chǎn)者線程;與之相對的,消費(fèi)者線程在隊(duì)列為空時(shí)阻塞等待在notEmpty條件變量上,生產(chǎn)者線程入隊(duì)后通過notEmpty.signal嘗試著喚醒一個(gè)等待的消費(fèi)者線程。
通過拆分出兩個(gè)互相獨(dú)立的條件變量,v4版本避免了v3版本中signalAll操作帶來的驚群效應(yīng),避免了signalAll操作無效喚醒帶來的額外開銷。
v4版本完整代碼:
/**
* 數(shù)組作為底層結(jié)構(gòu)的阻塞隊(duì)列 v4版本
*/
public class MyArrayBlockingQueueV4<E> implements MyBlockingQueue<E> {
/**
* 隊(duì)列默認(rèn)的容量大小
* */
private static final int DEFAULT_CAPACITY = 16;
/**
* 承載隊(duì)列元素的底層數(shù)組
* */
private final Object[] elements;
/**
* 當(dāng)前頭部元素的下標(biāo)
* */
private int head;
/**
* 下一個(gè)元素插入時(shí)的下標(biāo)
* */
private int tail;
/**
* 隊(duì)列中元素個(gè)數(shù)
* */
private int count;
private final ReentrantLock reentrantLock;
private final Condition notEmpty;
private final Condition notFull;
//=================================================構(gòu)造方法======================================================
/**
* 默認(rèn)構(gòu)造方法
* */
public MyArrayBlockingQueueV4() {
this(DEFAULT_CAPACITY);
}
/**
* 默認(rèn)構(gòu)造方法
* */
public MyArrayBlockingQueueV4(int initCapacity) {
assert initCapacity > 0;
// 設(shè)置數(shù)組大小為默認(rèn)
this.elements = new Object[initCapacity];
// 初始化隊(duì)列 頭部,尾部下標(biāo)
this.head = 0;
this.tail = 0;
this.reentrantLock = new ReentrantLock();
this.notEmpty = this.reentrantLock.newCondition();
this.notFull = this.reentrantLock.newCondition();
}
/**
* 下標(biāo)取模
* */
private int getMod(int logicIndex){
int innerArrayLength = this.elements.length;
// 由于隊(duì)列下標(biāo)邏輯上是循環(huán)的
if(logicIndex < 0){
// 當(dāng)邏輯下標(biāo)小于零時(shí)
// 真實(shí)下標(biāo) = 邏輯下標(biāo) + 加上當(dāng)前數(shù)組長度
return logicIndex + innerArrayLength;
} else if(logicIndex >= innerArrayLength){
// 當(dāng)邏輯下標(biāo)大于數(shù)組長度時(shí)
// 真實(shí)下標(biāo) = 邏輯下標(biāo) - 減去當(dāng)前數(shù)組長度
return logicIndex - innerArrayLength;
} else {
// 真實(shí)下標(biāo) = 邏輯下標(biāo)
return logicIndex;
}
}
/**
* 入隊(duì)
* */
private void enqueue(E e){
// 存放新插入的元素
this.elements[this.tail] = e;
// 尾部插入新元素后 tail下標(biāo)后移一位
this.tail = getMod(this.tail + 1);
this.count++;
}
/**
* 出隊(duì)
* */
private E dequeue(){
// 暫存需要被刪除的數(shù)據(jù)
E dataNeedRemove = (E)this.elements[this.head];
// 將當(dāng)前頭部元素引用釋放
this.elements[this.head] = null;
// 頭部下標(biāo) 后移一位
this.head = getMod(this.head + 1);
this.count--;
return dataNeedRemove;
}
@Override
public void put(E e) throws InterruptedException {
// 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
reentrantLock.lockInterruptibly();
try {
// 因?yàn)楸幌M(fèi)者喚醒后可能會(huì)被其它的生產(chǎn)者再度填滿隊(duì)列,需要循環(huán)的判斷
while (this.count == elements.length) {
// put操作時(shí),如果隊(duì)列已滿則進(jìn)入notFull條件變量的等待隊(duì)列,并釋放條件變量對應(yīng)的互斥鎖
notFull.await();
// 消費(fèi)者進(jìn)行出隊(duì)操作時(shí)
}
// 走到這里,說明當(dāng)前隊(duì)列不滿,可以執(zhí)行入隊(duì)操作
enqueue(e);
// 喚醒可能等待在notEmpty中的一個(gè)消費(fèi)者線程
notEmpty.signal();
} finally {
// 入隊(duì)完畢,釋放鎖
reentrantLock.unlock();
}
}
@Override
public E take() throws InterruptedException {
// 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
reentrantLock.lockInterruptibly();
try {
// 因?yàn)楸簧a(chǎn)者喚醒后可能會(huì)被其它的消費(fèi)者消費(fèi)而使得隊(duì)列再次為空,需要循環(huán)的判斷
while(this.count == 0){
notEmpty.await();
}
E headElement = dequeue();
// 喚醒可能等待在notFull中的一個(gè)生產(chǎn)者線程
notFull.signal();
return headElement;
} finally {
// 出隊(duì)完畢,釋放鎖
reentrantLock.unlock();
}
}
@Override
public boolean isEmpty() {
return this.count == 0;
}
}
2.5 v5版本(引入雙鎖令生產(chǎn)者和消費(fèi)者能并發(fā)操作阻塞隊(duì)列)
v4版本的阻塞隊(duì)列采用雙條件變量之后,其性能已經(jīng)不錯(cuò)了,但仍存在進(jìn)一步優(yōu)化的空間。
v4版本單鎖的性能問題
v4版本中阻塞隊(duì)列的出隊(duì)、入隊(duì)操作是使用同一個(gè)互斥鎖進(jìn)行并發(fā)同步的,這意味著生產(chǎn)者線程和消費(fèi)者線程無法并發(fā)工作,消費(fèi)者線程必須等待生產(chǎn)者線程操作完成退出臨界區(qū)之后才能繼續(xù)執(zhí)行,反之亦然。單鎖的設(shè)計(jì)在生產(chǎn)者和消費(fèi)者都很活躍的高并發(fā)場景下會(huì)一定程度限制阻塞隊(duì)列的吞吐量。
因此v5版本在v4版本的基礎(chǔ)上,將出隊(duì)和入隊(duì)操作使用兩把鎖分別管理,使得生產(chǎn)者線程和消費(fèi)者線程可以并發(fā)的操作阻塞隊(duì)列,達(dá)到進(jìn)一步提高吞吐量的目的。
使用兩把鎖分別控制出隊(duì)、入隊(duì)后,還需要一些調(diào)整來解決生產(chǎn)者/消費(fèi)者并發(fā)操作隊(duì)列所帶來的問題。
存在并發(fā)問題的雙鎖版本出隊(duì)、入隊(duì)實(shí)現(xiàn)第一版(v4基礎(chǔ)上的微調(diào)):
/**
this.takeLock = new ReentrantLock();
this.notEmpty = this.takeLock.newCondition();
this.putLock = new ReentrantLock();
this.notFull = this.putLock.newCondition();
*/
@Override
public void put(E e) throws InterruptedException {
// 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
putLock.lockInterruptibly();
try {
// 因?yàn)楸幌M(fèi)者喚醒后可能會(huì)被其它的生產(chǎn)者再度填滿隊(duì)列,需要循環(huán)的判斷
while (this.count == elements.length) {
// put操作時(shí),如果隊(duì)列已滿則進(jìn)入notFull條件變量的等待隊(duì)列,并釋放條件變量對應(yīng)的互斥鎖
notFull.await();
}
// 走到這里,說明當(dāng)前隊(duì)列不滿,可以執(zhí)行入隊(duì)操作
enqueue(e);
// 喚醒可能等待在notEmpty中的一個(gè)消費(fèi)者線程
notEmpty.signal();
} finally {
// 入隊(duì)完畢,釋放鎖
putLock.unlock();
}
}
@Override
public E take() throws InterruptedException {
// 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
takeLock.lockInterruptibly();
try {
// 因?yàn)楸簧a(chǎn)者喚醒后可能會(huì)被其它的消費(fèi)者消費(fèi)而使得隊(duì)列再次為空,需要循環(huán)的判斷
while(this.count == 0){
notEmpty.await();
}
E headElement = dequeue();
// 喚醒可能等待在notFull中的一個(gè)生產(chǎn)者線程
notFull.signal();
return headElement;
} finally {
// 出隊(duì)完畢,釋放鎖
takeLock.unlock();
}
}
上面基于v4版本微調(diào)的雙鎖實(shí)現(xiàn)雖然容易理解,但由于允許消費(fèi)者和生產(chǎn)者線程并發(fā)的訪問隊(duì)列而存在幾個(gè)嚴(yán)重問題。
1. count屬性線程不安全
隊(duì)列長度count字段是一個(gè)用于判斷隊(duì)列是否已滿,隊(duì)列是否為空的重要屬性。在v5之前的版本count屬性一直被唯一的同步鎖保護(hù)著,任意時(shí)刻至多只有一個(gè)線程可以進(jìn)入臨界區(qū)修改count的值。而引入雙鎖令消費(fèi)者線程/生產(chǎn)者線程能并發(fā)訪問后,count變量的自增/自減操作會(huì)出現(xiàn)線程不安全的問題。
解決方案:將int類型的count修改為AtomicInteger來解決生產(chǎn)者/消費(fèi)者同時(shí)訪問、修改count時(shí)導(dǎo)致的并發(fā)問題。
2. 生產(chǎn)者/消費(fèi)者線程死鎖問題
在上述的代碼示例中,生產(chǎn)者線程首先獲得生產(chǎn)者鎖去執(zhí)行入隊(duì)操作,然后喚醒可能阻塞在notEmpty上的消費(fèi)者線程。由于使用條件變量前首先需要獲得其所屬的互斥鎖,如果生產(chǎn)者線程不先釋放生產(chǎn)者鎖就去獲取消費(fèi)者的互斥鎖,那么就存在出現(xiàn)死鎖的風(fēng)險(xiǎn)。消費(fèi)者線程和生產(chǎn)者線程可以并發(fā)的先分別獲得消費(fèi)者鎖和生產(chǎn)者鎖,并且也同時(shí)嘗試著獲取另一把鎖,這樣雙方都在等待著對方釋放鎖,互相阻塞出現(xiàn)死鎖現(xiàn)象。
解決方案:先釋放已獲得的鎖之后再去獲得另一個(gè)鎖執(zhí)行喚醒操作
存在并發(fā)問題的雙鎖版本出隊(duì)、入隊(duì)實(shí)現(xiàn)第二版(在上述第一版基礎(chǔ)上進(jìn)行微調(diào)):
/**
private final AtomicInteger count = new AtomicInteger();
this.takeLock = new ReentrantLock();
this.notEmpty = this.takeLock.newCondition();
this.putLock = new ReentrantLock();
this.notFull = this.putLock.newCondition();
*/
@Override
public void put(E e) throws InterruptedException {
int currentCount;
// 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
putLock.lockInterruptibly();
try {
// 因?yàn)楸幌M(fèi)者喚醒后可能會(huì)被其它的生產(chǎn)者再度填滿隊(duì)列,需要循環(huán)的判斷
while (count.get() == elements.length) {
// put操作時(shí),如果隊(duì)列已滿則進(jìn)入notFull條件變量的等待隊(duì)列,并釋放條件變量對應(yīng)的互斥鎖
notFull.await();
// 消費(fèi)者進(jìn)行出隊(duì)操作時(shí)
}
// 走到這里,說明當(dāng)前隊(duì)列不滿,可以執(zhí)行入隊(duì)操作
enqueue(e);
currentCount = count.getAndIncrement();
} finally {
// 入隊(duì)完畢,釋放鎖
putLock.unlock();
}
// 如果插入之前隊(duì)列為空,才喚醒等待彈出元素的線程
if (currentCount == 0) {
signalNotEmpty();
}
}
@Override
public E take() throws InterruptedException {
E headElement;
int currentCount;
// 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
takeLock.lockInterruptibly();
try {
// 因?yàn)楸簧a(chǎn)者喚醒后可能會(huì)被其它的消費(fèi)者消費(fèi)而使得隊(duì)列再次為空,需要循環(huán)的判斷
while(this.count.get() == 0){
notEmpty.await();
}
headElement = dequeue();
currentCount = this.count.getAndDecrement();
} finally {
// 出隊(duì)完畢,釋放鎖
takeLock.unlock();
}
// 只有在彈出之前隊(duì)列已滿的情況下才喚醒等待插入元素的線程
if (currentCount == elements.length) {
signalNotFull();
}
return headElement;
}
/**
* 喚醒等待隊(duì)列非空條件的線程
*/
private void signalNotEmpty() {
// 為了喚醒等待隊(duì)列非空條件的線程,需要先獲取對應(yīng)的takeLock
takeLock.lock();
try {
// 喚醒一個(gè)等待非空條件的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* 喚醒等待隊(duì)列未滿條件的線程
*/
private void signalNotFull() {
// 為了喚醒等待隊(duì)列未滿條件的線程,需要先獲取對應(yīng)的putLock
putLock.lock();
try {
// 喚醒一個(gè)等待隊(duì)列未滿條件的線程
notFull.signal();
} finally {
putLock.unlock();
}
}
3. lost wakeup問題
在上述待改進(jìn)的雙鎖實(shí)現(xiàn)第二版中,阻塞在notFull中的生產(chǎn)者線程完全依賴相對應(yīng)的消費(fèi)者線程來將其喚醒(阻塞在notEmpty中的消費(fèi)者線程也同樣依賴對應(yīng)的生產(chǎn)者線程將其喚醒),這在生產(chǎn)者線程和消費(fèi)者線程并發(fā)時(shí)會(huì)出現(xiàn)lost wakeup的問題。
下面構(gòu)造一個(gè)簡單而不失一般性的例子來說明,為什么上述第二版的實(shí)現(xiàn)中會(huì)出現(xiàn)問題。
時(shí)序圖(假設(shè)阻塞隊(duì)列的長度為5(element.length=5),且一開始時(shí)隊(duì)列已滿)
| 生產(chǎn)者線程P1 | 生產(chǎn)者線程P2 | 消費(fèi)者線程C | |
| 1 | 執(zhí)行put操作,此時(shí)隊(duì)列已滿。 執(zhí)行while循環(huán)中的notfull.await陷入阻塞狀態(tài) (await會(huì)釋放putLock) | ||
| 2 | 執(zhí)行take操作,隊(duì)列未滿,成功執(zhí)行完dequeue。 此時(shí)currentCount=5,this.count=4, 執(zhí)行takeLock.unLock釋放takeLock鎖 | ||
| 3 | 執(zhí)行put操作,拿到putLock鎖,由于消費(fèi)者C已經(jīng)執(zhí)行完出隊(duì)操作, 成功執(zhí)行enqueue。 此時(shí)currentCount=4,this.count=5, 執(zhí)行putLock.unLock釋放putLock鎖 | ||
| 4 | 判斷currentCount == elements.length為真, 執(zhí)行signalNotFull,并成功拿到putLock。 notFull.signal喚醒等待在其上的生產(chǎn)者線程P1。 take方法執(zhí)行完畢,return返回 | ||
| 5 | 被消費(fèi)者C喚醒,但此時(shí)count=5,無法跳出while循環(huán), 繼續(xù)await阻塞在notFull條件變量中 | ||
| 6 | 判斷currentCount == 0為假,進(jìn)行處理。 put方法執(zhí)行完畢 ,return返回 |
可以看到,雖然生產(chǎn)者線程P1由于隊(duì)列已滿而先被阻塞,而消費(fèi)者線程C在出隊(duì)后也確實(shí)通知喚醒了生產(chǎn)者線程P1。但是由于生產(chǎn)者線程P2和消費(fèi)者線程C的并發(fā)執(zhí)行,導(dǎo)致了生產(chǎn)者線程P1在被喚醒后依然無法成功執(zhí)行入隊(duì)操作,只能繼續(xù)的阻塞下去。在一些情況下,P1生產(chǎn)者線程可能再也不會(huì)被喚醒而永久的阻塞在條件變量notFull上。
為了解決這一問題,雙鎖版本的阻塞隊(duì)列其生產(chǎn)者線程不能僅僅依靠消費(fèi)者線程來將其喚醒,而是需要在其它生產(chǎn)者線程在入隊(duì)操作完成后,發(fā)現(xiàn)隊(duì)列未滿時(shí)也嘗試著喚醒由于上述并發(fā)場景發(fā)生lost wakeup問題的生產(chǎn)者線程(消費(fèi)者線程在出隊(duì)時(shí)的優(yōu)化亦是如此)。
最終優(yōu)化的V5版本的出隊(duì)、入隊(duì)實(shí)現(xiàn):
/**
private final AtomicInteger count = new AtomicInteger();
this.takeLock = new ReentrantLock();
this.notEmpty = this.takeLock.newCondition();
this.putLock = new ReentrantLock();
this.notFull = this.putLock.newCondition();
*/
@Override
public void put(E e) throws InterruptedException {
int currentCount;
// 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
putLock.lockInterruptibly();
try {
// 因?yàn)楸幌M(fèi)者喚醒后可能會(huì)被其它的生產(chǎn)者再度填滿隊(duì)列,需要循環(huán)的判斷
while (count.get() == elements.length) {
// put操作時(shí),如果隊(duì)列已滿則進(jìn)入notFull條件變量的等待隊(duì)列,并釋放條件變量對應(yīng)的互斥鎖
notFull.await();
// 消費(fèi)者進(jìn)行出隊(duì)操作時(shí)
}
// 走到這里,說明當(dāng)前隊(duì)列不滿,可以執(zhí)行入隊(duì)操作
enqueue(e);
currentCount = count.getAndIncrement();
// 如果在插入后隊(duì)列仍然沒滿,則喚醒其他等待插入的線程
if (currentCount + 1 < elements.length) {
notFull.signal();
}
} finally {
// 入隊(duì)完畢,釋放鎖
putLock.unlock();
}
// 如果插入之前隊(duì)列為空,才喚醒等待彈出元素的線程
// 為了防止死鎖,不能在釋放putLock之前獲取takeLock
if (currentCount == 0) {
signalNotEmpty();
}
}
@Override
public E take() throws InterruptedException {
E headElement;
int currentCount;
// 先嘗試獲得互斥鎖,以進(jìn)入臨界區(qū)
takeLock.lockInterruptibly();
try {
// 因?yàn)楸簧a(chǎn)者喚醒后可能會(huì)被其它的消費(fèi)者消費(fèi)而使得隊(duì)列再次為空,需要循環(huán)的判斷
while(this.count.get() == 0){
notEmpty.await();
}
headElement = dequeue();
currentCount = this.count.getAndDecrement();
// 如果隊(duì)列在彈出一個(gè)元素后仍然非空,則喚醒其他等待隊(duì)列非空的線程
if (currentCount - 1 > 0) {
notEmpty.signal();
}
} finally {
// 出隊(duì)完畢,釋放鎖
takeLock.unlock();
}
// 只有在彈出之前隊(duì)列已滿的情況下才喚醒等待插入元素的線程
// 為了防止死鎖,不能在釋放takeLock之前獲取putLock
if (currentCount == elements.length) {
signalNotFull();
}
return headElement;
}
/**
* 喚醒等待隊(duì)列非空條件的線程
*/
private void signalNotEmpty() {
// 為了喚醒等待隊(duì)列非空條件的線程,需要先獲取對應(yīng)的takeLock
takeLock.lock();
try {
// 喚醒一個(gè)等待非空條件的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* 喚醒等待隊(duì)列未滿條件的線程
*/
private void signalNotFull() {
// 為了喚醒等待隊(duì)列未滿條件的線程,需要先獲取對應(yīng)的putLock
putLock.lock();
try {
// 喚醒一個(gè)等待隊(duì)列未滿條件的線程
notFull.signal();
} finally {
putLock.unlock();
}
}
3. 不同版本阻塞隊(duì)列的性能測試
前面從v2版本開始,對所實(shí)現(xiàn)的阻塞隊(duì)列進(jìn)行了一系列的優(yōu)化,一直到最終的V5版本實(shí)現(xiàn)了一個(gè)基于雙鎖,雙條件變量的高性能版本。
下面對v3-v5版本進(jìn)行一輪基礎(chǔ)的性能測試(v2無限輪詢性能太差),看看其實(shí)際性能是否真的如博客第二章中所說的那般,高版本的性能是更優(yōu)秀的。同時(shí)令jdk中的ArrayBlockingQueue和LinkedBlockingQueue也實(shí)現(xiàn)MyBlockingQueue,也加入測試。
測試工具類BlockingQueueTestUtil:
public class BlockingQueueTestUtil {
public static long statisticBlockingQueueRuntime(
MyBlockingQueue<Integer> blockingQueue, int workerNum, int perWorkerProcessNum, int repeatTime) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(workerNum * 2);
// 第一次執(zhí)行時(shí)存在一定的初始化開銷,不進(jìn)行統(tǒng)計(jì)
oneTurnExecute(executorService,blockingQueue,workerNum,perWorkerProcessNum);
long totalTime = 0;
for(int i=0; i<repeatTime; i++){
long oneTurnTime = oneTurnExecute(executorService,blockingQueue,workerNum,perWorkerProcessNum);
totalTime += oneTurnTime;
}
executorService.shutdown();
assert blockingQueue.isEmpty();
return totalTime/repeatTime;
}
private static long oneTurnExecute(ExecutorService executorService, MyBlockingQueue<Integer> blockingQueue,
int workerNum, int perWorkerProcessNum) throws InterruptedException {
long startTime = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(workerNum * 2);
// 創(chuàng)建workerNum個(gè)生產(chǎn)者/消費(fèi)者
for(int i=0; i<workerNum; i++){
executorService.execute(()->{
produce(blockingQueue,perWorkerProcessNum);
countDownLatch.countDown();
});
executorService.execute(()->{
consume(blockingQueue,perWorkerProcessNum);
countDownLatch.countDown();
});
}
countDownLatch.await();
long endTime = System.currentTimeMillis();
return endTime - startTime;
}
private static void produce(MyBlockingQueue<Integer> blockingQueue,int perWorkerProcessNum){
try {
// 每個(gè)生產(chǎn)者生產(chǎn)perWorkerProcessNum個(gè)元素
for(int j=0; j<perWorkerProcessNum; j++){
blockingQueue.put(j);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static void consume(MyBlockingQueue<Integer> blockingQueue,int perWorkerProcessNum){
try {
// 每個(gè)消費(fèi)者消費(fèi)perWorkerProcessNum個(gè)元素
for(int j=0; j<perWorkerProcessNum; j++){
blockingQueue.take();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
jdk的ArrayBlockingQueue簡單包裝(JDKArrayBlockingQueue):
public class JDKArrayBlockingQueue<E> implements MyBlockingQueue<E> {
private final BlockingQueue<E> jdkBlockingQueue;
/**
* 指定隊(duì)列大小的構(gòu)造器
*
* @param capacity 隊(duì)列大小
*/
public JDKArrayBlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
jdkBlockingQueue = new ArrayBlockingQueue<>(capacity);
}
@Override
public void put(E e) throws InterruptedException {
jdkBlockingQueue.put(e);
}
@Override
public E take() throws InterruptedException {
return jdkBlockingQueue.take();
}
@Override
public boolean isEmpty() {
return jdkBlockingQueue.isEmpty();
}
@Override
public String toString() {
return "JDKArrayBlockingQueue{" +
"jdkBlockingQueue=" + jdkBlockingQueue +
'}';
}
}
jdk的LinkedBlockingQueue簡單包裝(JDKLinkedBlockingQueue):
public class JDKLinkedBlockingQueue<E> implements MyBlockingQueue<E> {
private final BlockingQueue<E> jdkBlockingQueue;
/**
* 指定隊(duì)列大小的構(gòu)造器
*
* @param capacity 隊(duì)列大小
*/
public JDKLinkedBlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
jdkBlockingQueue = new LinkedBlockingQueue<>(capacity);
}
@Override
public void put(E e) throws InterruptedException {
jdkBlockingQueue.put(e);
}
@Override
public E take() throws InterruptedException {
return jdkBlockingQueue.take();
}
@Override
public boolean isEmpty() {
return jdkBlockingQueue.isEmpty();
}
@Override
public String toString() {
return "JDKLinkedBlockingQueue{" +
"jdkBlockingQueue=" + jdkBlockingQueue +
'}';
}
}
測試主體代碼:
public class BlockingQueuePerformanceTest {
/**
* 隊(duì)列容量
* */
private static final int QUEUE_CAPACITY = 3;
/**
* 并發(fā)線程數(shù)(消費(fèi)者 + 生產(chǎn)者 = 2 * WORKER_NUM)
* */
private static final int WORKER_NUM = 30;
/**
* 單次測試中每個(gè)線程訪問隊(duì)列的次數(shù)
* */
private static final int PER_WORKER_PROCESS_NUM = 3000;
/**
* 重復(fù)執(zhí)行的次數(shù)
* */
private static final int REPEAT_TIME = 5;
public static void main(String[] args) throws InterruptedException {
{
MyBlockingQueue<Integer> myArrayBlockingQueueV3 = new MyArrayBlockingQueueV3<>(QUEUE_CAPACITY);
long avgCostTimeV3 = BlockingQueueTestUtil.statisticBlockingQueueRuntime(myArrayBlockingQueueV3, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME);
System.out.println(costTimeLog(MyArrayBlockingQueueV3.class, avgCostTimeV3));
}
{
MyBlockingQueue<Integer> myArrayBlockingQueueV4 = new MyArrayBlockingQueueV4<>(QUEUE_CAPACITY);
long avgCostTimeV4 = BlockingQueueTestUtil.statisticBlockingQueueRuntime(myArrayBlockingQueueV4, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME);
System.out.println(costTimeLog(MyArrayBlockingQueueV4.class, avgCostTimeV4));
}
{
MyBlockingQueue<Integer> myArrayBlockingQueueV5 = new MyArrayBlockingQueueV5<>(QUEUE_CAPACITY);
long avgCostTimeV5 = BlockingQueueTestUtil.statisticBlockingQueueRuntime(myArrayBlockingQueueV5, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME);
System.out.println(costTimeLog(MyArrayBlockingQueueV5.class, avgCostTimeV5));
}
{
MyBlockingQueue<Integer> jdkArrayBlockingQueue = new JDKArrayBlockingQueue<>(QUEUE_CAPACITY);
long avgCostTimeJDK = BlockingQueueTestUtil.statisticBlockingQueueRuntime(jdkArrayBlockingQueue, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME);
System.out.println(costTimeLog(JDKArrayBlockingQueue.class, avgCostTimeJDK));
}
{
MyBlockingQueue<Integer> jdkLinkedBlockingQueue = new JDKLinkedBlockingQueue<>(QUEUE_CAPACITY);
long avgCostTimeJDK = BlockingQueueTestUtil.statisticBlockingQueueRuntime(jdkLinkedBlockingQueue, WORKER_NUM, PER_WORKER_PROCESS_NUM, REPEAT_TIME);
System.out.println(costTimeLog(JDKLinkedBlockingQueue.class, avgCostTimeJDK));
}
}
private static String costTimeLog(Class blockQueueCLass,long costTime){
return blockQueueCLass.getSimpleName() + " avgCostTime=" + costTime + "ms";
}
}
上述代碼指定的參數(shù)為基于最大容量為3的阻塞隊(duì)列,生產(chǎn)者、消費(fèi)者線程各30個(gè),每個(gè)線程執(zhí)行3000次出隊(duì)或入隊(duì)操作,重復(fù)執(zhí)行5次用于統(tǒng)計(jì)平均時(shí)間。
我的機(jī)器上的運(yùn)行結(jié)果如下:
MyArrayBlockingQueueV3 avgCostTime=843ms
MyArrayBlockingQueueV4 avgCostTime=530ms
MyArrayBlockingQueueV5 avgCostTime=165ms
JDKArrayBlockingQueue avgCostTime=506ms
JDKLinkedBlockingQueue avgCostTime=163ms執(zhí)行時(shí)長v3 > v4 > JDKArrayBlockingQueue > MyArrayBlockingQueueV5 > JDKLinkedBlockingQueue,且v4耗時(shí)大致等于JDKArrayBlockingQueue、v5耗時(shí)大致等于JDKLinkedBlockingQueue。
究其原因是因?yàn)閖dk的ArrayBlockingQueue實(shí)現(xiàn)和V4版本一樣,是基于單鎖,雙條件變量的;而jdk的LinkedBlockingQueue實(shí)現(xiàn)和V5版本一樣,是基于雙鎖,雙條件變量的(V4、V5版本的實(shí)現(xiàn)就是參考的jdk源碼)。
雖然測試的用例不是很全面,但測試結(jié)果和理論大致是吻合的,希望大家通過測試結(jié)果來加深對不同版本間性能差異的背后原理的理解。
4. 為什么jdk中的ArrayBlockingQueue不基于性能更好的雙鎖實(shí)現(xiàn) ?
看到這里,不知你是否和我一樣對為什么jdk的ArrayBlockingQueue使用單鎖而不使用性能更好的雙鎖實(shí)現(xiàn)而感到疑惑。所幸網(wǎng)上也有不少小伙伴有類似的疑問,這里將相關(guān)內(nèi)容簡單梳理一下。
1. 基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列(ABQ)是可以采用雙鎖實(shí)現(xiàn)更加高效率的出隊(duì)、入隊(duì)的。但由于jdk中阻塞隊(duì)列是屬于集合Collection的一個(gè)子類,雙鎖版本的ABQ其迭代器會(huì)比單鎖的復(fù)雜很多很多,但在性能上的改善并不那么的可觀。ABQ的實(shí)現(xiàn)在復(fù)雜度和性能上做了一個(gè)折中,選擇了容易實(shí)現(xiàn)但性能稍低的單鎖實(shí)現(xiàn)。
http://jsr166-concurrency.10961.n7.nabble.com/ArrayBlockingQueue-concurrent-put-and-take-tc1306.html
2. 如果對性能有更加苛刻要求的話,可以考慮使用jdk中基于雙鎖實(shí)現(xiàn)的LinkedBlockingQueue(LBQ)。需要注意的是,在高吞吐量的出隊(duì)、入隊(duì)的場景下,LBQ鏈?zhǔn)降慕Y(jié)構(gòu)在垃圾回收時(shí)性能會(huì)略低于基于數(shù)組的,緊湊結(jié)構(gòu)的ABQ。
3. jdk提供了一個(gè)龐大而全面的集合框架,每個(gè)具體的數(shù)據(jù)結(jié)構(gòu)都需要盡可能多的實(shí)現(xiàn)高層的接口和抽象方法。這樣的設(shè)計(jì)對于使用者來說確實(shí)很友好,但也令實(shí)現(xiàn)者背上了沉重的負(fù)擔(dān),必須為實(shí)現(xiàn)一些可能極少使用的接口而花費(fèi)巨大的精力,甚至反過來影響到特定數(shù)據(jù)結(jié)構(gòu)的本身的實(shí)現(xiàn)。ABQ受制于雙鎖版本迭代器實(shí)現(xiàn)的復(fù)雜度,而被迫改為效率更低的單鎖實(shí)現(xiàn)就是一個(gè)典型的例子。
5. 總結(jié)
前段時(shí)間迷上了MIT6.824的數(shù)據(jù)庫課程,在理解了課程所提供的實(shí)驗(yàn)后(共6個(gè)lab)收獲很大,因此想著自己再動(dòng)手實(shí)現(xiàn)一個(gè)更加全面的版本(并發(fā)的B+樹,MVCC多版本控制、行級鎖以及sql解釋器、網(wǎng)絡(luò)協(xié)議等等)。但一段時(shí)間后發(fā)現(xiàn)上述的功能難度很大且實(shí)現(xiàn)起來細(xì)節(jié)很多,這將耗費(fèi)我過多的時(shí)間而被迫放棄了(膨脹了Orz)。在被打擊后,清醒的意識到對于現(xiàn)階段的我來說還是應(yīng)該穩(wěn)扎穩(wěn)打,著眼于更小的知識點(diǎn),通過自己動(dòng)手造輪子的方式加深對知識點(diǎn)的理解,至于擼一個(gè)完善的關(guān)系型數(shù)據(jù)庫這種宏大的目標(biāo)受制于我目前的水平還是暫時(shí)先放放吧。
本篇博客的完整代碼在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning(blocking queue模塊)。后續(xù)應(yīng)該會(huì)陸續(xù)更新關(guān)于自己動(dòng)手實(shí)現(xiàn)線程池、抽象同步隊(duì)列AQS等的博客。
還存在很多不足之處,請多多指教。
鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布
??????
??長按上方微信二維碼 2 秒
感謝點(diǎn)贊支持下哈 
