Java多線程之阻塞隊(duì)列
這里對(duì)Java中的阻塞隊(duì)列及其常見實(shí)現(xiàn)進(jìn)行介紹

楔子
在多線程環(huán)境下實(shí)現(xiàn)一個(gè)線程安全的隊(duì)列,大體可分為兩種思路:基于阻塞機(jī)制的、基于非阻塞機(jī)制的。后者通過CAS算法等手段以避免發(fā)生阻塞,典型地實(shí)現(xiàn)有ConcurrentLinkedQueue、ConcurrentLinkedDeque;前者則是通過鎖的方式來保證線程安全,其會(huì)在隊(duì)列已滿、隊(duì)列為空時(shí)分別阻塞生產(chǎn)者、消費(fèi)者。具體地,Java中則是提供了一個(gè)BlockingQueue阻塞隊(duì)列接口并提供相應(yīng)的實(shí)現(xiàn)類
BlockingQueue接口
BlockingQueue接口通過繼承Queue接口,實(shí)現(xiàn)了對(duì)傳統(tǒng)隊(duì)列操作方式的補(bǔ)充、增強(qiáng)。新增了阻塞、超時(shí)兩種形式的隊(duì)列操作方式。如下表所示
| 隊(duì)列操作 | 拋異常 | 返回特殊值 | 阻塞 | 支持超時(shí) |
|---|---|---|---|---|
| 入隊(duì) | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 出隊(duì) | remove() | poll() | take() | poll(time, unit) |
| 查看隊(duì)首元素 | element() | peek() | N/A | N/A |
前兩種形式(拋異常、返回特殊值)與Queue接口一致,當(dāng)隊(duì)列已滿添加元素失敗時(shí),會(huì)分別拋出異常、返回特殊值false;當(dāng)隊(duì)列為空時(shí),進(jìn)行移除元素或查看隊(duì)首元素時(shí),則會(huì)分別拋出異常、返回特殊值null。對(duì)于阻塞形式而言,其針對(duì)入隊(duì)、出隊(duì)操作分別定義了put、take方法。當(dāng)生產(chǎn)者線程向一個(gè)已滿隊(duì)列通過put方法添加元素時(shí),則其自身將會(huì)被阻塞直到隊(duì)列不為滿;類似地,對(duì)于消費(fèi)者的task方法而言同理,此處不再贅述。對(duì)于支持超時(shí)形式而言,其重載了原有的offer、poll方法,增加了對(duì)超時(shí)參數(shù)的支持。最后對(duì)于Java阻塞隊(duì)列來說,即BlockingQueue接口的實(shí)現(xiàn)類均不支持null值元素
ArrayBlockingQueue
其是一個(gè)基于數(shù)組的阻塞隊(duì)列,底層使用數(shù)組進(jìn)行元素的存儲(chǔ)。創(chuàng)建該阻塞隊(duì)列實(shí)例需要指定隊(duì)列容量,故其是一個(gè)有界隊(duì)列。在并發(fā)控制層面,無論是入隊(duì)還是出隊(duì)操作,均使用同一個(gè)ReentrantLock可重入鎖進(jìn)行控制,換言之生產(chǎn)者線程與消費(fèi)者線程間無法同時(shí)操作
LinkedBlockingQueue
其是一個(gè)基于鏈表的阻塞隊(duì)列,底層使用鏈表進(jìn)行元素的存儲(chǔ)。該阻塞隊(duì)列容量默認(rèn)為 Integer.MAX_VALUE,即如果未顯式設(shè)置隊(duì)列容量時(shí)可以視為是一個(gè)無界隊(duì)列;反之構(gòu)建實(shí)例過程中指定隊(duì)列容量,則其就是一個(gè)有界隊(duì)列。在并發(fā)控制層面,其使用了兩個(gè)ReentrantLock可重入鎖來分別控制對(duì)入隊(duì)、出隊(duì)這兩種類型的操作。使得生產(chǎn)者線程與消費(fèi)者線程間可以同時(shí)操作提高效率。特別地對(duì)于鏈表這種結(jié)構(gòu)而言,Java還提供了一個(gè)實(shí)現(xiàn)BlockingDeque接口的LinkedBlockingDeque類——其是一個(gè)基于鏈表的雙向阻塞隊(duì)列
PriorityBlockingQueue
提到優(yōu)先級(jí)隊(duì)列,我們會(huì)想到PriorityQueue,但其由于不是線程安全的,故無法在多線程環(huán)境下使用。為此Java提供了一個(gè)線程安全版本的優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue,其是一個(gè)支持優(yōu)先級(jí)的無界阻塞隊(duì)列。底層使用數(shù)組實(shí)現(xiàn)元素的存儲(chǔ)、最小堆的表示。默認(rèn)使用元素的自然排序,即要求元素實(shí)現(xiàn)Comparable接口;或者顯式指定比較器Comparator。在并發(fā)控制層面,無論是入隊(duì)還是出隊(duì)操作,均使用同一個(gè)ReentrantLock可重入鎖進(jìn)行控制。值得一提的是,在創(chuàng)建該隊(duì)列實(shí)例時(shí)雖然可以指定容量。但這并不是隊(duì)列的最終容量,而只是該隊(duì)列實(shí)例的初始容量。一旦后續(xù)過程隊(duì)列容量不足,其會(huì)自動(dòng)進(jìn)行擴(kuò)容。值得一提的是,為了保證同時(shí)只有一個(gè)線程進(jìn)行擴(kuò)容,其內(nèi)部是通過CAS方式來實(shí)現(xiàn)的,而不是利用ReentrantLock可重入鎖來控制。故PriorityBlockingQueue是一個(gè)無界隊(duì)列。示例代碼如下所示
@Test
public?void?test1()?{
????BlockingQueue?blockingQueue?=?new?PriorityBlockingQueue<>(2);
????blockingQueue.offer(13);
????blockingQueue.offer(5);
????blockingQueue.offer(7);
????Integer?size?=?blockingQueue.size();
????System.out.println("blockingQueue:?"?+?blockingQueue?+?",?size:?"?+?size);
????Integer?e1?=?blockingQueue.poll();
????System.out.println("e1:?"?+?e1);
????Integer?e2?=?blockingQueue.poll();
????System.out.println("e2:?"?+?e2);
????Integer?e3?=?blockingQueue.poll();
????System.out.println("e3:?"?+?e3);
}
測(cè)試結(jié)果如下所示

DelayQueue
延遲隊(duì)列,一個(gè)無界的阻塞隊(duì)列。顧名思義,元素只有到了其指定的延遲時(shí)間才能出隊(duì),否則消費(fèi)者線程調(diào)用take方法會(huì)被一直阻塞。其底層使用PriorityQueue實(shí)現(xiàn)元素的存儲(chǔ),使用ReentrantLock實(shí)現(xiàn)線程同步。該隊(duì)列中的元素在實(shí)現(xiàn)Delayed接口時(shí)需要同時(shí)實(shí)現(xiàn)getDelay、compareTo方法。前者用于計(jì)算元素當(dāng)前剩余的延遲時(shí)間;后者用于實(shí)現(xiàn)延遲時(shí)間按從小到大進(jìn)行排序,以保證隊(duì)頭元素是延遲時(shí)間最小的。這里我們以緩存數(shù)據(jù)為場(chǎng)景進(jìn)行實(shí)踐,當(dāng)緩存到期后即可被從隊(duì)列中移除。示例代碼如下所示
public?class?BlockingQueueTest?{
????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss");
????@Test
????public?void?test2()?throws?Exception?{
????????BlockingQueue?blockingQueue?=?new?DelayQueue<>();
????????new?Thread(()?->?{
????????????while?(true)?{
????????????????try?{
????????????????????Cache?cache?=?blockingQueue.take();
????????????????????info("消費(fèi)者:?"?+?cache.toString());
????????????????}?catch?(Exception?e)?{
????????????????????System.out.println("Happen?Exception:?"?+?e.getMessage());
????????????????}
????????????}
????????}).start();
????????Long?timeStamp?=?System.currentTimeMillis();
????????Cache?cache1?=?new?Cache("name",?"Aaron",?timeStamp?+?15?*?1000);
????????blockingQueue.put(cache1);
????????Cache?cache2?=?new?Cache("age",?"18",?timeStamp?+?27?*?1000);
????????blockingQueue.put(cache2);
????????Cache?cache3?=?new?Cache("country",?"China",?timeStamp?+?7?*?1000);
????????blockingQueue.put(cache3);
????????Thread.sleep(120?*?1000);
????}
????/**
?????*?打印信息
?????*/
????private?static?void?info(String?msg)?{
????????String?time?=?formatter.format(LocalTime.now());
????????String?thread?=?Thread.currentThread().getName();
????????String?log?=?"["?+?time?+?"]?"?+?msg;
????????System.out.println(log);
????}
????@AllArgsConstructor
????@Data
????private?static?class?Cache?implements?Delayed?{
????????//?緩存?Key
????????private?String?key;
????????//?緩存?Value
????????private?String?value;
????????//?緩存到期時(shí)間
????????private?Long?expire;
????????/**
?????????*?計(jì)算當(dāng)前延遲時(shí)間
?????????*?@param?unit
?????????*?@return
?????????*/
????????@Override
????????public?long?getDelay(TimeUnit?unit)?{
????????????//?緩存有效的剩余毫秒數(shù)
????????????long?delta?=?expire?-?System.currentTimeMillis();
????????????return?unit.convert(delta,?TimeUnit.MILLISECONDS);
????????}
????????/**
?????????*?定義比較規(guī)則,?延遲時(shí)間按從小到大進(jìn)行排序
?????????*?@param?o
?????????*?@return
?????????*/
????????@Override
????????public?int?compareTo(Delayed?o)?{
????????????Cache?other?=?(Cache)?o;
????????????return?this.getExpire().compareTo(other.getExpire());
????????}
????????@Override
????????public?String?toString()?{
????????????Date?time?=?new?Date(expire);
????????????SimpleDateFormat?formatter?=?new?SimpleDateFormat("yyyy-MM-dd?HH:mm:ss.SSS");
????????????String?timeStr?=?formatter.format(time);
????????????return?"Cache,?key:?"?+?key?+?",?expire:?"?+?timeStr;
????????}
????}
}
測(cè)試結(jié)果如下所示

SynchronousQueue
其是一個(gè)同步隊(duì)列。特別地是由于該隊(duì)列沒有容量無法存儲(chǔ)元素,故生產(chǎn)者添加的數(shù)據(jù)會(huì)直接被消費(fèi)者獲取并且立刻消費(fèi)。所以當(dāng)生產(chǎn)者線程添加數(shù)據(jù)時(shí),如果此時(shí)恰好有一個(gè)消費(fèi)者已經(jīng)準(zhǔn)備好獲取隊(duì)頭元素了,則會(huì)添加成功;否則要么添加失敗返回false要么被阻塞。通過Executors.newCachedThreadPool()創(chuàng)建的線程池實(shí)例,其內(nèi)部任務(wù)隊(duì)列使用的就是SynchronousQueue,故offer方法添加任務(wù)到隊(duì)列失敗后則會(huì)開啟新的線程來進(jìn)行處理。關(guān)于同步隊(duì)列的這一特性,通過下面的示例可以幫助我們更好的理解
@Test
public?void?test3()?{
????BlockingQueue?blockingQueue?=?new?SynchronousQueue<>();
????Boolean?b1?=?blockingQueue.offer(237);
????info("生產(chǎn)者?b1:?"?+?b1);
????//?消費(fèi)者線程
????new?Thread(?()->{
????????try{
????????????Integer?e?=?blockingQueue.take();
????????????info("消費(fèi)者:"?+?e);
????????}?catch?(Exception?e)?{
????????????info("Happen?Exception:?"?+?e.getMessage());
????????}
????}?).start();
????//?確保消費(fèi)者線程已經(jīng)準(zhǔn)備完畢
????try?{?Thread.sleep(2000);?}?catch?(Exception?e)?{}
????Boolean?b2?=?blockingQueue.offer(996);
????info("生產(chǎn)者?b2:?"?+?b2);
????try?{?Thread.sleep(120*1000);?}?catch?(Exception?e)?{}
}
測(cè)試結(jié)果如下,符合預(yù)期。生產(chǎn)者第一次添加元素結(jié)果失敗,原因很簡單。因?yàn)橥疥?duì)列沒有存儲(chǔ)元素的能力,故如果沒有消費(fèi)者直接取走,則生產(chǎn)者即會(huì)添加失??;第二次添加時(shí),消費(fèi)者線程已經(jīng)在阻塞等待了,故添加成功

下面我們利用阻塞的put方法來添加元素,示例代碼如下所示
@Test
public?void?test4()?{
????BlockingQueue?blockingQueue?=?new?SynchronousQueue<>();
????//?生產(chǎn)者線程
????new?Thread(()?->?{
????????try?{
????????????info("生產(chǎn)者:?Start");
????????????while?(true)?{
????????????????Integer?num?=?RandomUtil.randomInt(1,?100);
????????????????info("生產(chǎn)者:?put?"?+?num);
????????????????blockingQueue.put(num);
????????????}
????????}?catch?(Exception?e)?{
????????????info("Happen?Exception:?"?+?e.getMessage());
????????}
????}).start();
????//?消費(fèi)者線程
????new?Thread(()?->?{
????????try?{
????????????info("消費(fèi)者:?Start");
????????????while?(true)?{
????????????????try?{
????????????????????Thread.sleep(5000);
????????????????}?catch?(Exception?e)?{
????????????????}
????????????????Integer?e?=?blockingQueue.take();
????????????????info("消費(fèi)者:?"?+?e);
????????????}
????????}?catch?(Exception?e)?{
????????????info("Happen?Exception:?"?+?e.getMessage());
????????}
????}).start();
????try?{?Thread.sleep(120?*?1000);?}?catch?(Exception?e)?{}
}
從測(cè)試結(jié)果中的時(shí)間戳,可以很明顯看出只有當(dāng)消費(fèi)者取出元素,生產(chǎn)者線程的put方法才會(huì)結(jié)束阻塞

參考文獻(xiàn)
Java并發(fā)編程之美 翟陸續(xù)、薛賓田著
