<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          多線程必考的「生產(chǎn)者 - 消費(fèi)者」模型,看齊姐這篇文章就夠了

          共 2171字,需瀏覽 5分鐘

           ·

          2020-10-01 19:55

          這里是《壹齊學(xué)多線程》系列的第 3 篇

          生產(chǎn)者 - 消費(fèi)者模型 Producer-consumer problem 是一個(gè)非常經(jīng)典的多線程并發(fā)協(xié)作的模型,在分布式系統(tǒng)里非常常見。也是面試中無論中美大廠都非常愛考的一個(gè)問題,對(duì)應(yīng)屆生問的要少一些,但是對(duì)于有工作經(jīng)驗(yàn)的工程師來說,非常愛考。

          這個(gè)問題有非常多的版本和解決方式,在本文我重點(diǎn)是和大家壹齊理清思路,由淺入深的思考問題,保證大家看完了都能有所收獲。

          問題背景

          簡(jiǎn)單來說,這個(gè)模型是由兩類線程構(gòu)成:

          • 生產(chǎn)者線程:“生產(chǎn)”產(chǎn)品,并把產(chǎn)品放到一個(gè)隊(duì)列里;
          • 消費(fèi)者線程:“消費(fèi)”產(chǎn)品。
          • 隊(duì)列:數(shù)據(jù)緩存區(qū)。

          有了這個(gè)隊(duì)列,生產(chǎn)者就只需要關(guān)注生產(chǎn),而不用管消費(fèi)者的消費(fèi)行為,更不用等待消費(fèi)者線程執(zhí)行完;消費(fèi)者也只管消費(fèi),不用管生產(chǎn)者是怎么生產(chǎn)的,更不用等著生產(chǎn)者生產(chǎn)。

          所以該模型實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者之間的解藕異步

          什么是異步呢?

          比如說你和你女朋友打電話,就得等她接了電話你們才能說話,這是同步。

          但是如果你跟她發(fā)微信,并不需要等她回復(fù),她也不需要立刻回復(fù),而是等她有空了再回,這就是異步。

          但是呢,生產(chǎn)者和消費(fèi)者之間也不能完全沒有聯(lián)系的。

          • 如果隊(duì)列里的產(chǎn)品已經(jīng)滿了,生產(chǎn)者就不能繼續(xù)生產(chǎn);
          • 如果隊(duì)列里的產(chǎn)品從無到有,生產(chǎn)者就得通知一下消費(fèi)者,告訴它可以來消費(fèi)了;
          • 如果隊(duì)列里已經(jīng)沒有產(chǎn)品了,消費(fèi)者也無法繼續(xù)消費(fèi);
          • 如果隊(duì)列里的產(chǎn)品從滿到不滿,消費(fèi)者也得去通知下生產(chǎn)者,說你可以來生產(chǎn)了。

          所以它們之間還需要有協(xié)作,最經(jīng)典的就是使用 Object 類里自帶的 wait()notify() 或者 notifyAll() 的消息通知機(jī)制。

          上述描述中的等著,其實(shí)就是用 wait() 來實(shí)現(xiàn)的;

          通知,就是 notify() 或者 notifyAll()

          那么基于這種消息通知機(jī)制,我們還能夠平衡生產(chǎn)者和消費(fèi)者之間的速度差異

          如果生產(chǎn)者的生產(chǎn)速度很慢,但是消費(fèi)者消費(fèi)的很快,就像是我們每月工資就發(fā)兩次,但是每天都要花錢,也就是 1:15.

          那么我們就需要調(diào)整生產(chǎn)者(發(fā)工資)為 15 個(gè)線程,消費(fèi)者保持 1 個(gè)線程,這樣是不是很爽~

          總結(jié)下該模型的三大優(yōu)點(diǎn):
          解藕,異步,平衡速度差異。

          wait()/notify()

          接下來我們需要重點(diǎn)看下這個(gè)通知機(jī)制。

          wait()notify() 都是 Java 中的 Object 類自帶的方法,可以用來實(shí)現(xiàn)線程間的通信。

          上一節(jié)講的 11 個(gè) APIs 里我也提到了它,我們這里再展開講一下。

          wait() 方法是用來讓當(dāng)前線程等待,直到有別的線程調(diào)用 notify() 將它喚醒,或者我們可以設(shè)定一個(gè)時(shí)間讓它自動(dòng)蘇醒。

          調(diào)用該方法之前,線程必須要獲得該對(duì)象的對(duì)象監(jiān)視器鎖,也就是只能用在加鎖的方法下。

          而調(diào)用該方法之后,當(dāng)前線程會(huì)釋放鎖。(提示:這里很重要,也是下文代碼中用 while 而非 if 的原因。)

          notify() 方法只能通知一個(gè)線程,如果多個(gè)線程在等待,那就喚醒任意一個(gè)。

          notifyAll() 方法是可以喚醒所有等待線程,然后加入同步隊(duì)列。

          這里我們用到了 2 個(gè)隊(duì)列:

          • 同步隊(duì)列:對(duì)應(yīng)于我們上一節(jié)講的線程狀態(tài)中的 Runnable,也就是線程準(zhǔn)備就緒,就等著搶資源了。
          • 等待隊(duì)列:對(duì)應(yīng)于我們上一節(jié)講的線程狀態(tài)中的 Waiting,也就是等待狀態(tài)。

          這里需要注意,從等待狀態(tài)線程無法直接進(jìn)入 Q2,而是要先重新加入同步隊(duì)列,再次等待拿鎖,拿到了鎖才能進(jìn)去 Q2;一旦出了 Q2,鎖就丟了。

          Q2 里,其實(shí)只有一個(gè)線程,因?yàn)檫@里我們必須要加鎖才能進(jìn)行操作。

          實(shí)現(xiàn)

          這里我首先建了一個(gè)簡(jiǎn)單的 Product 類,用來表示生產(chǎn)和消費(fèi)的產(chǎn)品,大家可以自行添加更多的 fields

          public?class?Product??{
          ????private?String?name;

          ????public?Product(String?name)?{
          ????????this.name?=?name;
          ????}

          ????public?String?getName()?{
          ????????return?name;
          ????}

          ????public?void?setName(String?name)?{
          ????????this.name?=?name;
          ????}
          }

          主函數(shù)里我設(shè)定了兩類線程,并且這里選擇用普通的 ArrayDeque 來實(shí)現(xiàn) Queue,更簡(jiǎn)單的方式是直接用 Java 中的 BlockingQueue 來實(shí)現(xiàn)。

          BlockingQueue 是阻塞隊(duì)列,它有一系列的方法可以讓線程實(shí)現(xiàn)自動(dòng)阻塞,常用的 BlockingQueue 有很多,后面會(huì)單獨(dú)出一篇文章來講。

          這里為了更好的理解并發(fā)協(xié)同的這個(gè)過程,我們先自己處理。

          public?class?Test?{
          ????public?static?void?main(String[]?args)?{
          ????????Queue?queue?=?new?ArrayDeque<>();

          ????????for?(int?i?=?0;?i?100;?i++)?{
          ????????????new?Thread(new?Producer(queue,?100)).start();
          ????????????new?Thread(new?Consumer(queue,?100)).start();
          ????????}
          ????}
          }

          然后就是 ProducerConsumer 了。

          public?class?Producer?implements?Runnable{
          ????private?Queue?queue;
          ????private?int?maxCapacity;

          ????public?Producer(Queue?queue,?int?maxCapacity)?{
          ????????this.queue?=?queue;
          ????????this.maxCapacity?=?maxCapacity;
          ????}

          ????@Override
          ????public?void?run()?{
          ????????synchronized?(queue)?{
          ????????????while?(queue.size()?==?maxCapacity)?{?//一定要用?while,而不是?if,下文解釋
          ????????????????try?{
          ????????????????????System.out.println("生產(chǎn)者"?+?Thread.currentThread().getName()?+?"等待中...?Queue?已達(dá)到最大容量,無法生產(chǎn)");
          ????????????????????wait();
          ????????????????????System.out.println("生產(chǎn)者"?+?Thread.currentThread().getName()?+?"退出等待");
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????????if?(queue.size()?==?0)?{?//隊(duì)列里的產(chǎn)品從無到有,需要通知在等待的消費(fèi)者
          ????????????????queue.notifyAll();
          ????????????}
          ????????????Random?random?=?new?Random();
          ????????????Integer?i?=?random.nextInt();
          ????????????queue.offer(new?Product("產(chǎn)品"??+?i.toString()));
          ????????????System.out.println("生產(chǎn)者"?+?Thread.currentThread().getName()?+?"生產(chǎn)了產(chǎn)品:"?+?i.toString());
          ????????}
          ????}
          }

          其實(shí)它的主邏輯很簡(jiǎn)單,我這里為了方便演示加了很多打印語句才顯得有點(diǎn)復(fù)雜。

          我們把主要邏輯拎出來看:

          ?public?void?run()?{
          ????????synchronized?(queue)?{
          ????????????while?(queue.size()?==?maxCapacity)?{?//一定要用?while,而不是?if,下文解釋
          ????????????????try?{
          ????????????????????wait();
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????????if?(queue.size()?==?0)?{
          ????????????????queue.notifyAll();
          ????????????}
          ????????????queue.offer(new?Product("產(chǎn)品"??+?i.toString()));
          ????????}
          ????}
          }

          這里有 3 塊內(nèi)容,再對(duì)照這個(gè)過程來看:

          1. 生產(chǎn)者線程拿到鎖后,其實(shí)就是進(jìn)入了 Q2 階段。首先檢查隊(duì)列是否容量已滿,如果滿了,那就要去 Q3 等待;
          2. 如果不滿,先檢查一下隊(duì)列原本是否為空,如果原來是空的,那就需要通知消費(fèi)者;
          3. 最后生產(chǎn)產(chǎn)品。

          這里有個(gè)問題,為什么只能用 while 而不是 if

          其實(shí)在這一小段,生產(chǎn)者線程經(jīng)歷了幾個(gè)過程:

          1. 如果隊(duì)列已滿,它就沒法生產(chǎn),那也不能占著位置不做事,所以要把鎖讓出來,去 Q3 - 等待隊(duì)列 等著;
          2. 在等待隊(duì)列里被喚醒之后,不能直接奪過鎖來,而是要先加入 Q1 - 同步隊(duì)列 等待資源;
          3. 一旦搶到資源,關(guān)門上鎖,才能來到 Q2 繼續(xù)執(zhí)行 wait() 之后的活,但是,此時(shí)這個(gè)隊(duì)列有可能又滿了,所以退出 wait() 之后,還需要再次檢查 queue.size() == maxCapacity 這個(gè)條件,所以要用 while

          那么為什么可能又滿了呢?

          因?yàn)榫€程沒有一直拿著鎖,在被喚醒之后,到拿到鎖之間的這段時(shí)間里,有可能其他的生產(chǎn)者線程先拿到了鎖進(jìn)行了生產(chǎn),所以隊(duì)列又經(jīng)歷了一個(gè)從不滿到滿的過程。

          總結(jié):在使用線程的等待通知機(jī)制時(shí),一般都要在 while 循環(huán)中調(diào)用 wait() 方法。

          消費(fèi)者線程是完全對(duì)稱的,我們來看代碼。

          public?class?Consumer?implements?Runnable{
          ????private?Queue?queue;
          ????private?int?maxCapacity;

          ????public?Consumer(Queue?queue,?int?maxCapacity)?{
          ????????this.queue?=?queue;
          ????????this.maxCapacity?=?maxCapacity;
          ????}

          ????@Override
          ????public?void?run()?{
          ????????synchronized?(queue)?{
          ????????????while?(queue.isEmpty())?{
          ????????????????try?{
          ????????????????????System.out.println("消費(fèi)者"?+?Thread.currentThread().getName()?+?"等待中...?Queue?已缺貨,無法消費(fèi)");
          ????????????????????wait();
          ????????????????????System.out.println("消費(fèi)者"?+?Thread.currentThread().getName()?+?"退出等待");
          ????????????????}?catch?(InterruptedException?e)?{
          ????????????????????e.printStackTrace();
          ????????????????}
          ????????????}
          ????????????if?(queue.size()?==?maxCapacity)?{
          ????????????????queue.notifyAll();
          ????????????}

          ????????????Product?product?=?queue.poll();
          ????????????System.out.println("消費(fèi)者"?+?Thread.currentThread().getName()?+?"消費(fèi)了:"?+?product.getName());
          ????????}
          ????}
          }

          結(jié)果如下:

          小結(jié)

          生產(chǎn)者 - 消費(fèi)者問題是面試中經(jīng)常會(huì)遇到的題目,本文首先講了該模型的三大優(yōu)點(diǎn):解藕,異步,平衡速度差異,然后講解了等待/通知的消息機(jī)制以及在該模型中的應(yīng)用,最后進(jìn)行了代碼實(shí)現(xiàn)。


          瀏覽 43
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美日韩乱伦视频 | 色老板美国在线视频网页 | 国产av午夜福利 国产操逼免费视频 | 久久久久久欧美二区电影网 | 99欧美视频免费在线观看 |