多線程必考的「生產(chǎn)者 - 消費者」模型,看這篇文章就夠了
生產(chǎn)者 - 消費者模型 Producer-consumer problem 是一個非常經(jīng)典的多線程并發(fā)協(xié)作的模型,在分布式系統(tǒng)里非常常見。也是面試中無論中美大廠都非常愛考的一個問題,對應屆生問的要少一些,但是對于有工作經(jīng)驗的工程師來說,非常愛考。
這個問題有非常多的版本和解決方式,在本文我重點是和大家壹齊理清思路,由淺入深的思考問題,保證大家看完了都能有所收獲。
問題背景
簡單來說,這個模型是由兩類線程構成:
生產(chǎn)者線程:“生產(chǎn)”產(chǎn)品,并把產(chǎn)品放到一個隊列里; 消費者線程:“消費”產(chǎn)品。 隊列:數(shù)據(jù)緩存區(qū)。

有了這個隊列,生產(chǎn)者就只需要關注生產(chǎn),而不用管消費者的消費行為,更不用等待消費者線程執(zhí)行完;消費者也只管消費,不用管生產(chǎn)者是怎么生產(chǎn)的,更不用等著生產(chǎn)者生產(chǎn)。
所以該模型實現(xiàn)了生產(chǎn)者和消費者之間的解藕和異步。
什么是異步呢?
比如說你和你女朋友打電話,就得等她接了電話你們才能說話,這是同步。
但是如果你跟她發(fā)微信,并不需要等她回復,她也不需要立刻回復,而是等她有空了再回,這就是異步。
但是呢,生產(chǎn)者和消費者之間也不能完全沒有聯(lián)系的。
如果隊列里的產(chǎn)品已經(jīng)滿了,生產(chǎn)者就不能繼續(xù)生產(chǎn); 如果隊列里的產(chǎn)品從無到有,生產(chǎn)者就得通知一下消費者,告訴它可以來消費了; 如果隊列里已經(jīng)沒有產(chǎn)品了,消費者也無法繼續(xù)消費; 如果隊列里的產(chǎn)品從滿到不滿,消費者也得去通知下生產(chǎn)者,說你可以來生產(chǎn)了。
所以它們之間還需要有協(xié)作,最經(jīng)典的就是使用 Object 類里自帶的 wait() 和 notify() 或者 notifyAll() 的消息通知機制。
上述描述中的等著,其實就是用 wait() 來實現(xiàn)的;
而通知,就是 notify() 或者 notifyAll() 。
那么基于這種消息通知機制,我們還能夠平衡生產(chǎn)者和消費者之間的速度差異。
如果生產(chǎn)者的生產(chǎn)速度很慢,但是消費者消費的很快,就像是我們每月工資就發(fā)兩次,但是每天都要花錢,也就是 1:15.
那么我們就需要調(diào)整生產(chǎn)者(發(fā)工資)為 15 個線程,消費者保持 1 個線程,這樣是不是很爽~
總結下該模型的三大優(yōu)點:
解藕,異步,平衡速度差異。
wait()/notify()
接下來我們需要重點看下這個通知機制。
wait() 和 notify() 都是 Java 中的 Object 類自帶的方法,可以用來實現(xiàn)線程間的通信。
在上一節(jié)講的 11 個 APIs 里我也提到了它,我們這里再展開講一下。
wait() 方法是用來讓當前線程等待,直到有別的線程調(diào)用 notify() 將它喚醒,或者我們可以設定一個時間讓它自動蘇醒。
調(diào)用該方法之前,線程必須要獲得該對象的對象監(jiān)視器鎖,也就是只能用在加鎖的方法下。
而調(diào)用該方法之后,當前線程會釋放鎖。(提示:這里很重要,也是下文代碼中用 while 而非 if 的原因。)
notify() 方法只能通知一個線程,如果多個線程在等待,那就喚醒任意一個。
notifyAll() 方法是可以喚醒所有等待線程,然后加入同步隊列。

這里我們用到了 2 個隊列:
同步隊列:對應于我們上一節(jié)講的線程狀態(tài)中的 Runnable,也就是線程準備就緒,就等著搶資源了。等待隊列:對應于我們上一節(jié)講的線程狀態(tài)中的 Waiting,也就是等待狀態(tài)。
這里需要注意,從等待狀態(tài)線程無法直接進入 Q2,而是要先重新加入同步隊列,再次等待拿鎖,拿到了鎖才能進去 Q2;一旦出了 Q2,鎖就丟了。
在 Q2 里,其實只有一個線程,因為這里我們必須要加鎖才能進行操作。
實現(xiàn)
這里我首先建了一個簡單的 Product 類,用來表示生產(chǎn)和消費的產(chǎn)品,大家可以自行添加更多的 fields。
public?class?Product??{
????private?String?name;
????public?Product(String?name)?{
????????this.name?=?name;
????}
????public?String?getName()?{
????????return?name;
????}
????public?void?setName(String?name)?{
????????this.name?=?name;
????}
}
主函數(shù)里我設定了兩類線程,并且這里選擇用普通的 ArrayDeque 來實現(xiàn) Queue,更簡單的方式是直接用 Java 中的 BlockingQueue 來實現(xiàn)。
BlockingQueue 是阻塞隊列,它有一系列的方法可以讓線程實現(xiàn)自動阻塞,常用的 BlockingQueue 有很多,后面會單獨出一篇文章來講。
這里為了更好的理解并發(fā)協(xié)同的這個過程,我們先自己處理。
public?class?Test?{
????public?static?void?main(String[]?args)?{
????????Queue?queue?=?new?ArrayDeque<>();
????????for?(int?i?=?0;?i?100;?i++)?{
????????????new?Thread(new?Producer(queue,?100)).start();
????????????new?Thread(new?Consumer(queue,?100)).start();
????????}
????}
}
然后就是 Producer 和 Consumer 了。
public?class?Producer?implements?Runnable{
????private?Queue?queue;
????private?int?maxCapacity;
????public?Producer(Queue?queue,?int?maxCapacity)?{
????????this.queue?=?queue;
????????this.maxCapacity?=?maxCapacity;
????}
????@Override
????public?void?run()?{
????????synchronized?(queue)?{
????????????while?(queue.size()?==?maxCapacity)?{?//一定要用?while,而不是?if,下文解釋
????????????????try?{
????????????????????System.out.println("生產(chǎn)者"?+?Thread.currentThread().getName()?+?"等待中...?Queue?已達到最大容量,無法生產(chǎn)");
????????????????????wait();
????????????????????System.out.println("生產(chǎn)者"?+?Thread.currentThread().getName()?+?"退出等待");
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????????if?(queue.size()?==?0)?{?//隊列里的產(chǎn)品從無到有,需要通知在等待的消費者
????????????????queue.notifyAll();
????????????}
????????????Random?random?=?new?Random();
????????????Integer?i?=?random.nextInt();
????????????queue.offer(new?Product("產(chǎn)品"??+?i.toString()));
????????????System.out.println("生產(chǎn)者"?+?Thread.currentThread().getName()?+?"生產(chǎn)了產(chǎn)品:"?+?i.toString());
????????}
????}
}
其實它的主邏輯很簡單,我這里為了方便演示加了很多打印語句才顯得有點復雜。
我們把主要邏輯拎出來看:
?public?void?run()?{
????????synchronized?(queue)?{
????????????while?(queue.size()?==?maxCapacity)?{?//一定要用?while,而不是?if,下文解釋
????????????????try?{
????????????????????wait();
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????????if?(queue.size()?==?0)?{
????????????????queue.notifyAll();
????????????}
????????????queue.offer(new?Product("產(chǎn)品"??+?i.toString()));
????????}
????}
}
這里有 3 塊內(nèi)容,再對照這個過程來看:

生產(chǎn)者線程拿到鎖后,其實就是進入了 Q2階段。首先檢查隊列是否容量已滿,如果滿了,那就要去Q3等待;如果不滿,先檢查一下隊列原本是否為空,如果原來是空的,那就需要通知消費者; 最后生產(chǎn)產(chǎn)品。
這里有個問題,為什么只能用 while 而不是 if?
其實在這一小段,生產(chǎn)者線程經(jīng)歷了幾個過程:
如果隊列已滿,它就沒法生產(chǎn),那也不能占著位置不做事,所以要把鎖讓出來,去 Q3 - 等待隊列等著;在等待隊列里被喚醒之后,不能直接奪過鎖來,而是要先加入 Q1 - 同步隊列等待資源;一旦搶到資源,關門上鎖,才能來到 Q2繼續(xù)執(zhí)行wait()之后的活,但是,此時這個隊列有可能又滿了,所以退出wait()之后,還需要再次檢查queue.size() == maxCapacity這個條件,所以要用while。
那么為什么可能又滿了呢?
因為線程沒有一直拿著鎖,在被喚醒之后,到拿到鎖之間的這段時間里,有可能其他的生產(chǎn)者線程先拿到了鎖進行了生產(chǎn),所以隊列又經(jīng)歷了一個從不滿到滿的過程。
總結:在使用線程的等待通知機制時,一般都要在 while 循環(huán)中調(diào)用 wait() 方法。
消費者線程是完全對稱的,我們來看代碼。
public?class?Consumer?implements?Runnable{
????private?Queue?queue;
????private?int?maxCapacity;
????public?Consumer(Queue?queue,?int?maxCapacity)?{
????????this.queue?=?queue;
????????this.maxCapacity?=?maxCapacity;
????}
????@Override
????public?void?run()?{
????????synchronized?(queue)?{
????????????while?(queue.isEmpty())?{
????????????????try?{
????????????????????System.out.println("消費者"?+?Thread.currentThread().getName()?+?"等待中...?Queue?已缺貨,無法消費");
????????????????????wait();
????????????????????System.out.println("消費者"?+?Thread.currentThread().getName()?+?"退出等待");
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????????if?(queue.size()?==?maxCapacity)?{
????????????????queue.notifyAll();
????????????}
????????????Product?product?=?queue.poll();
????????????System.out.println("消費者"?+?Thread.currentThread().getName()?+?"消費了:"?+?product.getName());
????????}
????}
}
結果如下:

小結
生產(chǎn)者 - 消費者問題是面試中經(jīng)常會遇到的題目,本文首先講了該模型的三大優(yōu)點:解藕,異步,平衡速度差異,然后講解了等待/通知的消息機制以及在該模型中的應用,最后進行了代碼實現(xiàn)。
完
? ? ? ?
???覺得不錯,點個在看~

