到底什么是阻塞隊(duì)列?有啥用?
點(diǎn)擊關(guān)注公眾號(hào),Java干貨及時(shí)送達(dá)
作者:海向
地址:www.cnblogs.com/haixiang/p/12354520.html
什么是阻塞隊(duì)列
阻塞隊(duì)列是一個(gè)支持阻塞的插入和移除的隊(duì)列。
支持阻塞的插入方法:意思是當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿。 支持阻塞的移除方法:意思是隊(duì)列為空時(shí),獲取元素(同時(shí)移除元素)的線程會(huì)被阻塞,等到隊(duì)列變?yōu)榉强铡?/section>
阻塞隊(duì)列用法
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是向隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里獲取元素的線程。
當(dāng)阻塞隊(duì)列不可用時(shí),會(huì)有四種相應(yīng)的處理方式:
| 處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時(shí)退出 |
|---|---|---|---|---|
| 插入操作 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除操作 | remove() | poll() | take() | poll(time, unit) |
| 獲取操作 | element() | peek() | 不可用 | 不可用 |
返回特殊值:插入元素時(shí),會(huì)返回是否插入成功,成功返回true。如果是移除方法,則是從隊(duì)列中取出一個(gè)元素,沒有則返回null。 一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者線程往隊(duì)列里面put元素,則生產(chǎn)者線程會(huì)被阻塞,知道隊(duì)列不滿或者響應(yīng)中斷退出。當(dāng)隊(duì)列為空時(shí),如果消費(fèi)者線程從隊(duì)列里take元素。 超時(shí)退出:當(dāng)阻塞隊(duì)列滿時(shí),如果生產(chǎn)者線程往隊(duì)列里插入元素,隊(duì)列會(huì)阻塞生產(chǎn)者線程一段時(shí)間,如果超過了指定時(shí)間,生產(chǎn)者線程就會(huì)退出。
如果是無界阻塞隊(duì)列,隊(duì)列則不會(huì)出現(xiàn)滿的情況。
阻塞隊(duì)列
ArrayBlockingQueue:一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列 LinkedBlockingQueue:一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列 PriorityBlockingQueue:一個(gè)支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列 DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無界阻塞隊(duì)列 SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列 LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列 LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列
1.ArrayBlockingQueue
此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序
默認(rèn)情況下不保證線程公平地訪問隊(duì)列(所謂公平是指當(dāng)隊(duì)列可用時(shí),先被阻塞的線程先訪問隊(duì)列)
為了保證公平性通常會(huì)降低吞吐量。
公平鎖是利用了可重入鎖的公平鎖來實(shí)現(xiàn)。
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2.ArrayBlockingQueue
此隊(duì)列按照先進(jìn)先出(FIFO)的原則對(duì)元素進(jìn)行排序
默認(rèn)長(zhǎng)度為Integer.MAX_VALUE
3.PriorityBlockingQueue
默認(rèn)情況下元素采取自然順序升序排列
可以自定義Comparator或者自定義類實(shí)現(xiàn)compareTo()方法來指定排序規(guī)則
不支持同優(yōu)先級(jí)元素排序
4.DelayQueue
隊(duì)列使用PriorityQueue來實(shí)現(xiàn),隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口
只有在延時(shí)期滿才能從隊(duì)列中提取元素。Java 核心技術(shù)知識(shí)點(diǎn)我都整理成面試題和答案了,關(guān)注公眾號(hào)Java技術(shù)棧,回復(fù):面試,可以免費(fèi)獲取。
阻塞隊(duì)列原理
如果隊(duì)列是空的,消費(fèi)者會(huì)一直等待,當(dāng)生產(chǎn)者添加元素時(shí),消費(fèi)者是如何知道當(dāng)前隊(duì)列有元素的呢?
使用通知模式實(shí)現(xiàn)。所謂通知模式,就是當(dāng)生產(chǎn)者往滿的隊(duì)列添加元素時(shí)會(huì)阻塞住生產(chǎn)者,當(dāng)消費(fèi)者消費(fèi)了一個(gè)隊(duì)列中的元素后,會(huì)通知生產(chǎn)者當(dāng)前隊(duì)列可用。
以ArrayBlockingQueue為例子
/** items 存放隊(duì)列中的元素*/
final Object[] items;
/** take 操作的索引 */
int takeIndex;
/** put 操作的索引 */
int putIndex;
/** 隊(duì)列中元素個(gè)數(shù) */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** 控制生產(chǎn)者 takes 操作的 Condition */
private final Condition notEmpty;
/** 控制消費(fèi)者 put 操作的 Condition */
private final Condition notFull;
put操作
public void put(E e) throws InterruptedException {
checkNotNull(e); //判斷 e == null
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //獲取鎖,與lock不同,可以嘗試中斷阻塞
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
入隊(duì)操作,入隊(duì)之后喚醒消費(fèi)者線程。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
notFull.await();中其實(shí)調(diào)用了park方法,先使用setBlocker保存一下將要阻塞的線程,然后調(diào)用本地方法UNSAFE.park(boolean isAbsolute, long time)進(jìn)行阻塞。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}






關(guān)注Java技術(shù)棧看更多干貨


