【并發(fā)編程】聊聊阻塞隊列那些事(一)
在線程池的手動創(chuàng)建一文中 ,我們介紹了通過 ThreadPoolExecutor創(chuàng)建線程池,其中有一個參數(shù)是BlockingQueue< Runnable> workQueue,這個參數(shù)的數(shù)據(jù)類型就是我們今天要介紹的阻塞隊列, 其作用是當線程池的核心線程都在執(zhí)行任務時,此時再有任務提交時用來存放任務。
阻塞隊列簡介
隊列
在介紹阻塞隊列之前,我們先來簡單介紹下隊列這種數(shù)據(jù)結構。
隊列是一種先進先出的線性表,它只允許在表的前端(front)進行刪除操作,而在表的后端(rear)進行插入操作,進行插入操作的端稱為隊尾,進行刪除操作的端稱為隊頭

阻塞隊列
阻塞隊列(BlockingQueue)是一種特殊的隊列,其特殊之處在于:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强铡.旉犃袧M時,存儲元素的線程會等待隊列可用。Java中,阻塞隊列它是一個繼承了 Queue 接口的接口,也可以證明阻塞隊列就是一種特殊隊列。
public interface BlockingQueue<E> extends Queue<E>{
...
}阻塞隊列常用于生產(chǎn)者和消費者的場景,我們先簡單了解下生產(chǎn)者-消費者模型:兩個線程操作一個隊列,一個線程往隊列中插入數(shù)據(jù)(生產(chǎn)線程生產(chǎn)數(shù)據(jù));一個線程往隊列中取出數(shù)據(jù)(消費線程消費數(shù)據(jù))。
生產(chǎn)者的生產(chǎn)速度和消費者的消費之間的速度可能不匹配,就可以通過阻塞隊列讓速度快的暫時阻塞, 如生產(chǎn)者每秒生產(chǎn)兩個數(shù)據(jù),而消費者每秒消費一個數(shù)據(jù),當隊列已滿時,生產(chǎn)者就會阻塞(掛起),等待消費者消費后,再進行喚醒。

阻塞隊列特點
阻塞隊列線程安全
阻塞隊列是線程安全的,我們在程序中使用阻塞隊列不需要自己去考慮更多的線程安全問題。降低了我們開發(fā)的難度和工作量。
比如在生產(chǎn)者-消費者模式使用阻塞隊列的時候,因為阻塞隊列是線程安全的,所以生產(chǎn)者和消費者即便 是多線程環(huán)境,自己不需要去考慮更多的線程安全問題,也不會發(fā)生線程安全問題。如下圖, 左側有兩個生產(chǎn)者線程,它們會將生產(chǎn)出來的結果放到中間的阻塞隊列中,而右側的兩個消費者只需直接從阻塞隊列中取出結果進行處理即可。

阻塞隊列種類
下圖展示了Queue 最主要的實現(xiàn)類,可以看出阻塞隊列主要 有6種實現(xiàn)類,分別是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue 和 LinkedTransferQueue,將在下一篇文章再介紹它們各自的特點 。

阻塞隊列的容量
阻塞隊列分為有界和無界兩種。LinkedBlockingQueue 的上限是 Integer.MAX_VALUE,是非常大的一個數(shù),可以近似認為是無限容量的 隊列,F(xiàn)ixedThreadPool類型線程池的實現(xiàn)就是用一個LinkedBlockingQueue來存放任務的。
有界阻塞隊列典型的代表是ArrayBlockingQueue,使用這種阻塞隊列 ,一旦隊列容量滿了,就無法再往隊列里放數(shù)據(jù)了。
阻塞隊列常用方法
阻塞隊列中有三組和添加、刪除相關的方法,這三組方法比較相似,我們需要對這些方法進行梳理。這三組方法是:
? 1.拋出異常:add、remove、element
? 2.返回結果但不拋出異常:offer、poll、peek
? 3.阻塞:put、take
add、remove、element
? 1.add:往隊列里添加一個元素,如果隊列滿了,就會拋出異常來提示隊列已滿;
? 2.remove:刪除元素,如果我們刪除的隊列是空的,remove 方法就會拋出異常;
? 3.element:返回隊列的頭部節(jié)點,但是并不刪除,如果隊列為空,element就會拋出異常。
三個方法示例代碼如下:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//add方法示例
public class AddExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(2);
blockingQueue.add("1");
blockingQueue.add("2");
System.out.println("blockingQueue size:" + blockingQueue.size());
blockingQueue.add("3");
}
}
******************【運行結果】******************
blockingQueue size:2
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.base/java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:326)
at thread.blockingqueue.AddExample.main(AddExample.java:13)
在這段程序中,我們創(chuàng)建了一個容量為 2 的 BlockingQueue,通過add方法放入元素, 前2個能正常放入 ,但是在添加第3個元素的時候就拋出了IllegalStateException:Queue full異常。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//remove方法示例
public class RemoveExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(1);
blockingQueue.add("1");
System.out.println("remove element:" + blockingQueue.remove());
blockingQueue.remove();
}
}
******************【運行結果】******************
remove element:1
Exception in thread "main" java.util.NoSuchElementException
at java.base/java.util.AbstractQueue.remove(AbstractQueue.java:117)
at thread.blockingqueue.RemoveExample.main(RemoveExample.java:14)
這段程序中,我們創(chuàng)建了一個容量為1的BlockingQueue并往中間放入一個元素,然后通過remove方法執(zhí)行刪除操作,刪除第一個元素正常,此時BlockingQueue為空,再執(zhí)行刪除操作,拋出NoSuchElementException異常。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//element示例
public class ElementExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(1);
blockingQueue.element();
}
}
******************【運行結果】******************
Exception in thread "main" java.util.NoSuchElementException
at java.base/java.util.AbstractQueue.element(AbstractQueue.java:136)
at thread.blockingqueue.ElementExample.main(ElementExample.java:11)
這段程序中,我們新建了一個容量為 1 的 ArrayBlockingQueue,但是并沒有往里面添加元素,也就是說ArrayBlockingQueue為空,我們調(diào)用 element 方法,也得到NoSuchElementException異常。
offer、poll、peek
這一組方法當發(fā)現(xiàn)隊列滿了無法添加,或者隊列為空無法刪除的時候,是通過返回值來提示我們,而不是像第一組方法那樣拋出異常。
? 1.offer:用來插入一個元素,用返回值來提示插入是否成功。添加成功返回 true,隊列已經(jīng)滿,調(diào)用 offer 方法返回false。
? 2.poll:移除并返回隊列的頭節(jié)點,如果當隊列里面是空的返回 null 作為提示。
? 3.peek:返回隊列的頭元素但并不刪除。如果隊列里面是空的,它便會返回 null 作為提示。
三個方法示例代碼如下:
//offer示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class OfferExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(2);
boolean res1 = blockingQueue.offer("1");
boolean res2 = blockingQueue.offer("2");
boolean res3 = blockingQueue.offer("3");
System.out.println("res1:" + res1);
System.out.println("res2:" + res2);
System.out.println("res3:" + res3);
System.out.println("blockingQueue size:" + blockingQueue.size());
}
}
******************【運行結果】******************
res1:true
res2:true
res3:false
blockingQueue size:2
在這段程序中,我們創(chuàng)建了一個容量為 2 的 BlockingQueue,通過offer方法放入元素,前面兩次添加成功了,返回true,但是第三次添加的時候,已經(jīng)超過了隊列的最大容量,所以會返回 false。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//poll示例
public class PollExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(1);
blockingQueue.offer("1");
//不允許放入null,否則會拋出NullPointerException異常
//blockingQueue.offer(null);
System.out.println("poll element:" + blockingQueue.poll());
System.out.println("poll element:" + blockingQueue.poll());
}
}
******************【運行結果】******************
poll element:1
poll element:null
這段程序中,我們創(chuàng)建了一個容量為1的BlockingQueue并往中間放入一個元素,然后執(zhí)行poll方法,正常執(zhí)行將頭結點返回并刪除,此時BlockingQueue為空,再執(zhí)行poll操作,返回null。注意,在使用這組方法時,隊列中不允許插入null值,否則會拋出NullPointerException異常。
//peek示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class PeekExample {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(1);
System.out.println(blockingQueue.peek());
}
}
******************【運行結果】******************null
我們新建了一個容量為 1 的 ArrayBlockingQueue,但是并沒有往里面添加元素,然后直接調(diào)用 peek,返回結果 null。
put、take
? 1.put:插入元素。隊列沒滿的時候是正常的插入,如果隊列已滿,插入的線程陷入阻塞狀態(tài),直到隊列里有了空閑空間,此時隊列就會讓之前的線程解除阻塞狀態(tài),并把剛才那個元素添加進去。
? 2.take:獲取并移除隊列的頭結點。隊列里有數(shù)據(jù)的時候會正常取出數(shù)據(jù)并刪除;隊列里無數(shù)據(jù),則阻塞,直到隊列里有數(shù)據(jù);一旦隊列里有數(shù)據(jù)了,就會立刻解除阻塞狀態(tài),并且取到數(shù)據(jù)。
兩個個方法示例代碼如下:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//put示例
public class PutExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(2);
blockingQueue.put("1");
blockingQueue.put("2");
System.out.println("【0】blockingQueue size:" + blockingQueue.size());
blockingQueue.put("3");
System.out.println("【1】blockingQueue size:" + blockingQueue.size());
}
}
******************【運行結果】******************
【0】blockingQueue size:2
//阻塞中
在這段程序中,我們創(chuàng)建了一個容量為 2 的 BlockingQueue,通過put方法添加完前2個元素,【0】處打印隊列的元素個數(shù)是2,說明前面兩個元素正常放入到隊列了。當執(zhí)行第三次put操作,則由于隊列已滿使得線程阻塞,后面的【1】處打印得不到執(zhí)行 。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
//take示例
public class TakeExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue =
new ArrayBlockingQueue<String>(1);
blockingQueue.take();
System.out.println("測試能否執(zhí)行到這一句");
}
}
******************【運行結果】******************//阻塞中
我們新建了一個容量為 1 的 ArrayBlockingQueue,但是并沒有往里面添加元素,然后直接調(diào)用take,線程阻塞,打印語句得不到執(zhí)行。
本文源碼地址:
https://github.com/qinlizhong1/javaStudy/tree/master/javaExample/src/main/java/thread/blockingqueue
本文示例代碼環(huán)境:
操作系統(tǒng):macOs 12.1
JDK版本:12.0.1
maven版本: 3.8.4
— 完 —
歡迎關注↓↓↓ 如有幫助,辛苦點贊和在看
