<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          深入java多線(xiàn)程與高并發(fā):JMH與Disruptor,確定能學(xué)會(huì)?

          共 22149字,需瀏覽 45分鐘

           ·

          2022-06-08 15:44

          前言

          今天我們講兩個(gè)內(nèi)容,第一個(gè)是JMH,第二個(gè)是Disruptor。這兩個(gè)內(nèi)容是給大家做更進(jìn)一步的這種多線(xiàn)程和高并發(fā)的一些專(zhuān)業(yè)上的處理。生產(chǎn)環(huán)境之中我們很可能不自己定義消息隊(duì)列,而是使用

          Disruptor。我們生產(chǎn)環(huán)境做測(cè)試的時(shí)候也不是像我說(shuō)的那樣寫(xiě)一個(gè)start寫(xiě)一個(gè)end就測(cè)試完了。在這里給大家先介紹專(zhuān)業(yè)的JMH測(cè)試工具,在給大家介紹Disruptor號(hào)稱(chēng)最快的消息隊(duì)列。

          JMH -java Microbenchmark Harness

          微基準(zhǔn)測(cè)試,它是測(cè)的某一個(gè)方法的性能到底是好或者不好,換了方法的實(shí)現(xiàn)之后他的性能到底好還是不好。

          這個(gè)測(cè)試的框架是2013年發(fā)出來(lái)的,由JLT的開(kāi)發(fā)人員開(kāi)發(fā),后來(lái)歸到了OpenJDK下面。

          官網(wǎng):
          http://openjdk.java.net/projects/code-tools/jmh/

          下面我們來(lái)介紹什么是一個(gè)JMH,他是用來(lái)干什么的,我們來(lái)看到底怎么使用,給大家一個(gè)簡(jiǎn)單的介紹肯定是了解不了jmh是個(gè)什么東西,已經(jīng)把這個(gè)步驟給大家總結(jié)一篇文檔,官網(wǎng)在哪里,怎么樣去創(chuàng)建一個(gè)JMH的測(cè)試,創(chuàng)建一共大致有七個(gè)步驟,還有他的一些基本概念,什么叫預(yù)熱,什么叫Mesurement等等的,還有進(jìn)一步了解的官方地址。

          JMH Java準(zhǔn)測(cè)試工具套件

          什么是JMH

          官網(wǎng):
          http://openjdk.java.net/projects/code-tools/jmh/

          創(chuàng)建JMH測(cè)試

          1. 創(chuàng)建Maven項(xiàng)目,添加依賴(lài),我們需要添加兩個(gè)依賴(lài):

          1.1:jmh-core (jmh的核心)

          1.2:jmh-generator-annprocess(注解處理包)

          <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
          http://maven.apache.org/xsd/maven-4.0.0.xsd"
          >
          <modelVersion>4.0.0modelVersion><properties><project.build.sourceEncoding>UTF-8project.build.sourceEncoding><encoding>UTF-8encoding><java.version>1.8java.version><maven.compiler.source>1.8maven.compiler.source><maven.compiler.target>1.8maven.compiler.target>properties><groupId>mashibing.comgroupId><artifactId>HelloJMH2artifactId><version>1.0-SNAPSHOTversion><dependencies><dependency><groupId>org.openjdk.jmhgroupId><artifactId>jmh-coreartifactId><version>1.21version>dependency><dependency><groupId>org.openjdk.jmhgroupId><artifactId>jmh-generator-annprocessartifactId><version>1.21version><scope>testscope>dependency>dependencies>project>

          2. idea安裝JMH插件 JMH plugin v1.0.3

          JMH這個(gè)東西你要想真正的安安靜靜的去運(yùn)行,就不會(huì)去影響我們正常程序的執(zhí)行,最好的方式就是按照官網(wǎng)的說(shuō)法是命令行的方式,比方說(shuō)你要測(cè)試某一個(gè)包里面的類(lèi)的話(huà)你應(yīng)該把這個(gè)類(lèi)和其他的依賴(lài)類(lèi)打成一個(gè)jar包,然后單獨(dú)的把這個(gè)jar包放到某一個(gè)機(jī)器上,在這個(gè)機(jī)器上對(duì)這個(gè)jar包進(jìn)行微基準(zhǔn)的測(cè)試,如果對(duì)它進(jìn)行測(cè)試的比較好,那說(shuō)明最后的結(jié)果還可以,如果說(shuō)邊開(kāi)發(fā)邊進(jìn)行這種微基準(zhǔn)的測(cè)試實(shí)際上他非常的不準(zhǔn),因?yàn)槟愕拈_(kāi)發(fā)環(huán)境會(huì)對(duì)結(jié)果產(chǎn)生影響。

          只不過(guò)我們自己開(kāi)發(fā)人員來(lái)說(shuō)平時(shí)你要想進(jìn)行一些微基準(zhǔn)的測(cè)試的話(huà),你要是每次打個(gè)包來(lái)進(jìn)行正規(guī)一個(gè)從頭到尾的測(cè)試 ,完了之后發(fā)現(xiàn)問(wèn)題不對(duì)再去重新改,效率太低了。所以在這里教大家的是怎么樣在IDE里面來(lái)進(jìn)行微基準(zhǔn)的測(cè)試。idea安裝JMH插件:fifile->Settings->Plugins->JMH-plugin。它運(yùn)行的時(shí)候需要這個(gè)plugin的支持,如果你用命令行是不需要這些東西的。

          3. 由于用到了注解,打開(kāi)運(yùn)行程序注解配置

          因?yàn)镴MH在運(yùn)行的時(shí)候他用到了注解,注解這個(gè)東西你自己得寫(xiě)一個(gè)程序得解釋他,所以你要把這個(gè)給設(shè)置上允許JMH能夠?qū)ψ⒔膺M(jìn)行處理:

          compiler -> Annotation Processors -> Enable Annotation Processing

          4. 定義需要測(cè)試類(lèi)PS (ParallelStream)看這里,寫(xiě)了一個(gè)類(lèi),并行處理流的一個(gè)程序,定義了一個(gè)list集合,然后往這個(gè)集合里扔了1000個(gè)數(shù)。寫(xiě)了一個(gè)方法來(lái)判斷這個(gè)數(shù)到底是不是一個(gè)質(zhì)數(shù)。寫(xiě)了兩個(gè)方法,第一個(gè)是用forEach來(lái)判斷我們這1000個(gè)數(shù)里到底有誰(shuí)是質(zhì)數(shù);第二個(gè)是使用了并行處理流,這個(gè)forEach的方法就只有單線(xiàn)程里面執(zhí)行,挨著牌從頭拿到尾,從0拿到1000,但是并行處理的時(shí)候會(huì)有多個(gè)線(xiàn)程采用ForkJoin的方式來(lái)把里面的數(shù)分成好幾份并行的進(jìn)行處理。一種是串行處理,一種是并行處理,都可以對(duì)他們進(jìn)行測(cè)試,但需要注意這個(gè)基準(zhǔn)測(cè)試并不是對(duì)比測(cè)試的,你只是測(cè)試一下你這方法寫(xiě)出這樣的情況下他的吞吐量到底是多少,這是一個(gè)非常專(zhuān)業(yè)的測(cè)試的工具。嚴(yán)格的來(lái)講這部分是測(cè)試開(kāi)發(fā)專(zhuān)業(yè)的。

          package com.mashibing.jmh;import java.util.ArrayList;import java.util.List;import java.util.Random;public class PS {static List nums = new ArrayList<>();static {
          Random r = new Random();for (int i = 0; i < 10000; i++) nums.add(1000000 +
          r.nextInt(1000000));
          }static void foreach() {
          nums.forEach(v->isPrime(v));
          }static void parallel() {
          nums.parallelStream().forEach(PS::isPrime);
          }static boolean isPrime(int num) {for(int i=2; i<=num/2; i++) {if(num % i == 0) return false;
          }return true;
          }
          }

          5. 寫(xiě)單元測(cè)試

          這個(gè)測(cè)試類(lèi)一定要在test package下面

          我對(duì)這個(gè)方法進(jìn)行測(cè)試testForEach,很簡(jiǎn)單我就調(diào)用PS這個(gè)類(lèi)的foreach就行了,對(duì)它測(cè)試最關(guān)鍵的是我加了這個(gè)注解@Benchmark,這個(gè)是JMH的注解,是要被JMH來(lái)解析處理的,這也是我們?yōu)槊匆涯莻€(gè)Annotation Processing給設(shè)置上的原因,非常簡(jiǎn)單,你只要加上注解就可以對(duì)這個(gè)方法進(jìn)行微基準(zhǔn)測(cè)試了,點(diǎn)擊右鍵直接run。

          package com.mashibing.jmh;import org.openjdk.jmh.annotations.Benchmark;import static org.junit.jupiter.api.Assertions.*;public class PSTest {@Benchmark@Warmup(iteration=1, time=3//在專(zhuān)業(yè)測(cè)試?yán)锩媸紫纫M(jìn)行預(yù)熱,預(yù)熱多少次,預(yù)熱多少時(shí)間@Fork(5)//意思是用多少個(gè)線(xiàn)程去執(zhí)行我們的程序@BenchmarkMode(Mode.Throughput)//是對(duì)基準(zhǔn)測(cè)試的一個(gè)模式,這個(gè)模式用的最多的是Throughput吞吐量@Measurement(iteration=1, time=3)//是整個(gè)測(cè)試要測(cè)試多少遍,調(diào)用這個(gè)方法要調(diào)用多少次
          public void testForEach() {PS.foreach();
          }
          }

          6. 運(yùn)行測(cè)試類(lèi),如果遇到下面的錯(cuò)誤:

          ERROR: org.openjdk.jmh.runner.RunnerException: ERROR: Exception while trying

          to acquire the JMH lock (C:\WINDOWS\/jmh.lock): C:\WINDOWS\jmh.lock (拒絕訪問(wèn)。), exiting. Use -Djmh.ignoreLock=true to forcefully continue.

          at org.openjdk.jmh.runner.Runner.run(Runner.java:216)

          at org.openjdk.jmh.Main.main(Main.java:71)

          這個(gè)錯(cuò)誤是因?yàn)镴MH運(yùn)行需要訪問(wèn)系統(tǒng)的TMP目錄,解決辦法是:

          打開(kāi)RunConfifiguration -> Environment Variables -> include system environment viables

          7. 閱讀測(cè)試報(bào)告

          JMH中的基本概念

          1. Warmup

          預(yù)熱,由于JVM中對(duì)于特定代碼會(huì)存在優(yōu)化(本地化),預(yù)熱對(duì)于測(cè)試結(jié)果很重要

          2. Mesurement

          總共執(zhí)行多少次測(cè)試

          3. Timeout

          4. Threads

          線(xiàn)程數(shù),由fork指定

          5. Benchmark mode

          基準(zhǔn)測(cè)試的模式

          6. Benchmark

          測(cè)試哪一段代碼

          next

          做個(gè)是JMH的一個(gè)入門(mén),嚴(yán)格來(lái)講這個(gè)和我們的關(guān)系其實(shí)并不大,這個(gè)是測(cè)試部門(mén)干的事兒,但是你了解一下沒(méi)有特別多的壞處,你也知道你的方法最后效率高或者底,可以通過(guò)一個(gè)簡(jiǎn)單的JMH插件來(lái)幫你完成,你不要在手動(dòng)的去寫(xiě)這件事兒了。

          如果說(shuō)大家對(duì)JMH有興趣,你們?cè)诠ぷ髦锌赡軙?huì)有用的上大家去讀一下官方的例子,官方大概有好幾十個(gè)例子程序,你可以自己一個(gè)一個(gè)的去研究。

          官方樣例:

          http://hg.openjdk.java.net/code-tools/jmh/fifile/tip/jmh-samples/src/main/java/org/openjdk/jmh/s

          amples/

          Disruptor按照英文翻譯的話(huà),Disruptor應(yīng)該是分裂、瓦解。這個(gè)Disruptor是一個(gè)做金融的、做股票的這樣一個(gè)公司交易所來(lái)開(kāi)發(fā)的,為自己來(lái)開(kāi)發(fā)的這么一個(gè)底層的框架,開(kāi)發(fā)出來(lái)之后受到了很多的認(rèn)可,開(kāi)源之后,2011年獲得Duke獎(jiǎng)。如果你想把它用作MQ的話(huà),單機(jī)最快的MQ。性能非常的高,主要是它里面用的全都是cas,另外把各種各樣的性能開(kāi)發(fā)到了極致,所以他單機(jī)支持很高的一個(gè)并發(fā)。

          Disruptor不是平時(shí)我們學(xué)的這個(gè)redis、不是平時(shí)我們所學(xué)的kafka,他可以跟他們一樣有類(lèi)似的用途,但他是單機(jī),redis、kafka也可以用于集群。redis他有這種序列化的機(jī)制,就是你可以把它存儲(chǔ)到硬盤(pán)上或數(shù)據(jù)庫(kù)當(dāng)中是可以的,kafka當(dāng)然也有,Disruptor沒(méi)有,Disruptor就是在內(nèi)存里,Disruptor簡(jiǎn)單理解就是內(nèi)存里用于存放元素的一個(gè)高效率的隊(duì)列。

          介紹

          關(guān)于Disruptor的一些資料,給大家列在這里。

          主頁(yè):
          http://imax-exchange.github.io/disruptor/

          源碼:
          https://github.com/LMAX-Exchange/disruptor

          GettingStarted:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started

          api:http://imax-exchange.github.io/disruptor/docs/index.html

          maven:https://mvnrepository.com/artifact/com.imax/disruptor

          Disruptor叫無(wú)鎖、高并發(fā)、環(huán)形Buffffer,直接覆蓋(不用清除)舊的數(shù)據(jù),降低GC頻率,用于生產(chǎn)者消費(fèi)者模式(如果說(shuō)按照設(shè)計(jì)者角度來(lái)講他就是觀察者模式)。什么叫觀察者模式,想象一下,我們?cè)谇懊鎸W(xué)各種各樣的隊(duì)列的時(shí)候,隊(duì)列就是個(gè)容器,好多生產(chǎn)者往里頭扔?xùn)|西,好多消費(fèi)者從里頭往外拿東西。所謂的生產(chǎn)者消費(fèi)者就是這個(gè)意思,為什么我們可以叫他觀察者呢,因?yàn)檫@些消費(fèi)者正在觀察著里面有沒(méi)有新東西,如果有的話(huà)我馬上拿過(guò)來(lái)消費(fèi),所以他也是一種觀察者模式。Disruptor實(shí)現(xiàn)的就是這個(gè)容器

          Disruptor核心與特點(diǎn)

          Disruptor也是一個(gè)隊(duì)列,和其他隊(duì)列不一樣的是他是一個(gè)環(huán)形隊(duì)列,環(huán)形的Buffffer。一般情況下我們的容器是一個(gè)隊(duì)列,不管你是用鏈表實(shí)現(xiàn)還是用數(shù)組實(shí)現(xiàn)的,它會(huì)是一個(gè)隊(duì)列,那么這個(gè)隊(duì)列生產(chǎn)者這邊使勁往里塞,消費(fèi)者這邊使勁往外拿,但Disruptor的核心是一個(gè)環(huán)形的buffffer。


          對(duì)比ConcurrentLinkedQueue:鏈表實(shí)現(xiàn)這種環(huán)形的buffffer速度就是更快,同學(xué)們可以去查一下JDK自帶的容器,你會(huì)發(fā)現(xiàn)效率比較高的有各種各樣的隊(duì)列,如果不想阻塞就可以用Concurrent相關(guān)的,ConcurrentLinkedQueue是并發(fā)的用鏈表實(shí)現(xiàn)的隊(duì)列,它里面大量的使用了cas,因此它的效率相對(duì)比較高,可是對(duì)于遍歷來(lái)講鏈表的效率一定會(huì)比數(shù)組低。

          JDK中沒(méi)有ConcurrentArrayQueue

          因?yàn)閿?shù)組的大小的固定的,如果想擴(kuò)展的話(huà)就要把原來(lái)的數(shù)組拷貝到新數(shù)組里,每次加都要拷貝這個(gè)效率相當(dāng)?shù)?,所以他并沒(méi)有給大家加這個(gè)叫ConcurrentArrayQueue,但是Disruptor就非常牛X,想到了這樣一個(gè)辦法,就是把數(shù)組的頭尾相連。

          Disruptor是用數(shù)組實(shí)現(xiàn)的這樣的一個(gè)隊(duì)列,你可以認(rèn)為Disruptor就是用數(shù)組實(shí)現(xiàn)的ConcurrentArrayQueue,另外這個(gè)Queue是首尾相連的.

          那Disruptor用數(shù)組實(shí)現(xiàn)的環(huán)形的就比上面兩個(gè)都牛嗎,牛在哪?為啥呢?如果我們用ConcurrentLinkedQueue這里面就是一個(gè)一個(gè)鏈表,這個(gè)鏈表遍歷起來(lái)肯定沒(méi)有數(shù)組快,這個(gè)是一點(diǎn)。還有第二點(diǎn)就是這個(gè)鏈表要維護(hù)一個(gè)頭指針和一個(gè)尾指針,我往頭部加的時(shí)候要加鎖,往尾部拿的時(shí)候也要加鎖。另外鏈表本身效率就偏低,還要維護(hù)兩個(gè)指針。關(guān)于環(huán)形的呢,環(huán)形本身就維護(hù)一個(gè)位置,這個(gè)位置稱(chēng)之為sequence序列,這個(gè)序列代表的是我下一個(gè)有效的元素指在什么位置上,就相當(dāng)于他只有一個(gè)指針來(lái)回轉(zhuǎn)。加在某個(gè)位置上怎么計(jì)算:直接用那個(gè)數(shù)除以我們整個(gè)的容量求余就可以了。

          RingBuffffer是一個(gè)環(huán)形隊(duì)列

          RingBuffffer的序號(hào),指向下一個(gè)可用的元素

          采用數(shù)組實(shí)現(xiàn),沒(méi)有首尾指針

          對(duì)比ConcurrentLinkedQueue,用數(shù)組實(shí)現(xiàn)的速度更快

          假如長(zhǎng)度為8,當(dāng)添加到第12個(gè)元素的時(shí)候在哪個(gè)序號(hào)上呢?用12%8決定

          當(dāng)Buffffer被填滿(mǎn)的時(shí)候到底是覆蓋還是等待,由Produce決定長(zhǎng)度設(shè)為2的n次冪,利于二進(jìn)制計(jì)算,例如:12%8=12&(8-1)

          如果大家對(duì)于位運(yùn)算有疑問(wèn)的,在咱們網(wǎng)站上有一個(gè)菜鳥(niǎo)預(yù)習(xí),里面有一部分是二進(jìn)制,大家去翻看一下。

          由于它會(huì)采用覆蓋的方式,所以他沒(méi)有必要記頭指針,沒(méi)有必要記尾指針。我只要記一個(gè)指針?lè)旁谶@就可以了。在這點(diǎn)上依然要比ConcurrentLinkedQueue要快。

          那我生產(chǎn)者線(xiàn)程生產(chǎn)的特別多,消費(fèi)者沒(méi)來(lái)得及消費(fèi)那我在往后覆蓋的話(huà)怎么辦?不會(huì)那么輕易的讓你覆蓋的,我們是有策略的,我生產(chǎn)者生產(chǎn)滿(mǎn)了,要在生產(chǎn)一個(gè)的話(huà)就馬上覆蓋這個(gè)位置上的數(shù)了。這時(shí)候是不能覆蓋的,指定了一個(gè)策略叫等待策略,這里面有8種等待策略,分情況自己去用。最常見(jiàn)的是BlockingWait,滿(mǎn)了我就在這等著,什么時(shí)候你空了消費(fèi)者來(lái)喚醒一下就繼續(xù)。

          Disruptor開(kāi)發(fā)步驟

          開(kāi)發(fā)步驟是比較固定的一個(gè)開(kāi)發(fā)步驟。

          1:定義Event-隊(duì)列中需要處理的元素。

          在Disruptor他是每一個(gè)消息都認(rèn)為是一個(gè)事件,在他這個(gè)概念里就是一個(gè)事件,所以在這個(gè)環(huán)形隊(duì)列里面存的是一個(gè)一個(gè)的Event。

          2:定義Event工廠,用于填充隊(duì)列

          那這個(gè)Event怎么產(chǎn)生,就需要指定Event的工廠。3:定義EventHandler(消費(fèi)者),處理容器中的元素那這個(gè)Event怎么消費(fèi)呢,就需要指定Event的消費(fèi)者EventHandler。

          下面我們直接看程序,先看來(lái)自官網(wǎng)的幾個(gè)輔助程序:LongEvent這個(gè)事件里面或者說(shuō)消息里面裝的什么值,我只裝了一個(gè)long值,但這里面可以裝任何值,任何類(lèi)型的都可以往里裝,這個(gè)long類(lèi)型的值我們可以指定它set,官網(wǎng)上沒(méi)有toString方法,我給大家加了一段主要是為了打印消息讓大家看的更清楚。

          package com.mashibing.disruptor;public class LongEvent{private long value;public void set(long value){this.value = value;
          }
          @Overridepublic String toString(){return "LongEvent{" +"value=" + value +"}";
          }
          }

          然后呢,我需要一個(gè)EventFactory就是怎么產(chǎn)生這些個(gè)事件,這個(gè)Factory非常簡(jiǎn)單,

          LongEventFactory去實(shí)現(xiàn)EventFactiry的接口,去重寫(xiě)它的newInstance方法直接new LongEvent。構(gòu)建這個(gè)環(huán)的時(shí)候?yàn)槭裁匆付ㄒ粋€(gè)產(chǎn)生事件的工廠,我直接new這個(gè)事件不可以嗎?但是有的事件里面的構(gòu)造方法不讓你new呢,產(chǎn)生事件工廠的話(huà)你可以靈活的指定一些 ,這里面也是牽扯到效率的。底層比較深,我給大家解釋一下:

          這里牽扯效率問(wèn)題,因?yàn)镈isruptor初始化的時(shí)候會(huì)調(diào)用Event工廠,對(duì)ringBuffffer進(jìn)行內(nèi)存的提前分配,GC頻率會(huì)降低。

          package com.mashibing.disruptor;import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactiry<LongEvent>{@Overridepublic LongEvent newInstance(){return new LongEvent();
          }
          }

          在看第三個(gè)叫LongEventHandler,Handler就是我拿到這個(gè)事件之后該怎么樣進(jìn)行處理,所以這里是消息的消費(fèi)者,怎么處理呢,很簡(jiǎn)單,我處理完這個(gè)消息之后呢就記一個(gè)數(shù),總共記下來(lái)我一共處理了多少消息了,處理消息的時(shí)候默認(rèn)調(diào)用的是onEvent方法,這個(gè)方法里面有三個(gè)參數(shù),第一個(gè)是你要處理的那個(gè)消息,第二個(gè)是你處理的是哪個(gè)位置上的消息,第三個(gè)是整體的消息結(jié)束沒(méi)結(jié)束,是不是處理完了。你可以判斷他如果是true的話(huà)消費(fèi)者就可以退出了,如果是false的話(huà)說(shuō)明后面還有繼續(xù)消費(fèi)。

          package com.mashibing.disruptor;import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler<LongEvent>{/**
          *
          *@param event
          *@param sequence RingBuffer的序號(hào)
          *@param endOfBatch 是否為最后一個(gè)元素
          *@throws Exception
          **/
          public static long count = 0;
          @Overridepublic void onEvent(LongEvent event,long sequence,boolean endOfBatch) throwsException{
          count++;
          System.out.println("["+Thread.currentThread().getName()+"]"+event+"序
          號(hào):"
          +sequence);
          }
          }

          所以我們定義了這三個(gè)類(lèi),關(guān)于這三個(gè)類(lèi)在給大家解釋一下,我們現(xiàn)在有一個(gè)環(huán),然后這個(gè)環(huán)上每一個(gè)位置裝LongEvent,怎么產(chǎn)生這個(gè)LongEvent通過(guò)這個(gè)LongEventFactory的newInstance方法來(lái)產(chǎn)生,當(dāng)我拿到這個(gè)Event之后通過(guò)LongEventHandler進(jìn)行處理。

          到現(xiàn)在我們把這三個(gè)輔助類(lèi)都已經(jīng)定義好了,定義好的情況下我們?cè)趺床拍鼙容^有機(jī)的結(jié)合在一起,讓他在Disruptor進(jìn)行處理呢,看第一個(gè)小例子程序,首先把EvenFactory給他初始化了new

          LongEventFactory,我們這個(gè)環(huán)應(yīng)該是2的N次方1024,然后new一個(gè)Disruptor出來(lái),需要指定這么幾個(gè)參數(shù):factory產(chǎn)生消息的工廠;bufffferSize是指定這個(gè)環(huán)大小到底是多少;defaultThreadFactory線(xiàn)程工廠,指的是當(dāng)他要產(chǎn)生消費(fèi)者的時(shí)候,當(dāng)要調(diào)用這個(gè)消費(fèi)者的時(shí)候他是在一個(gè)特定的線(xiàn)程里執(zhí)行的,這個(gè)線(xiàn)程就是通過(guò)defaultThreadFactory來(lái)產(chǎn)生;

          繼續(xù)往下看,當(dāng)我們拿到這個(gè)消息之后怎么進(jìn)行處理啊,我們就用這個(gè)LongEventHandler來(lái)處理。然后start,當(dāng)start之后一個(gè)環(huán)起來(lái)了,每個(gè)環(huán)上指向的這個(gè)LongEvent也得初始化好,內(nèi)存分配好了,整個(gè)就安安靜靜的等待著生產(chǎn)者的到來(lái)。

          看生產(chǎn)者的代碼,long sequence = ringBuffffer.next(),通過(guò)next找到下一個(gè)可用的位置,最開(kāi)始這個(gè)環(huán)是空的,下一個(gè)可用的位置是0這個(gè)位置,拿到這個(gè)位置之后直接去ringBuffffer里面get(0)這個(gè)位置上的event。如果說(shuō)你要是追求效率的極致,你應(yīng)該是一次性全部初始化好,你get的時(shí)候就不用再去判斷,如果你想做一個(gè)延遲,很不幸的是你每次都要做判斷是不是初始化了。get的時(shí)候就是拿到一個(gè)event,這個(gè)是我們new出來(lái)的默認(rèn)的,但是我們可以改里面的event.set( 值...),填好數(shù)據(jù)之后ringBuffffer.publish發(fā)布生產(chǎn)。

          package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main01{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,
          Executors.defaultThreadFactory());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//官方例程long sequence = ringBuffer.next();//Grab the next sequencetry{
          LongEvent event=ringBuffer.get(sequence);//Get the entry in theDisruptor//for the sequenceevent.set(8888L);//Fill with data}finally{
          ringBuffer.publish(sequence);
          }
          }
          }

          disruptor在后面提供了一些Lambda表達(dá)式的寫(xiě)法,為了支持這種寫(xiě)法對(duì)整個(gè)消息的構(gòu)建過(guò)程做了改進(jìn),讀下面02小程序使用translator,就是怎么樣構(gòu)建這個(gè)消息,原來(lái)我們都是用消息的factory,但是下面這次我們用translator對(duì)他進(jìn)行構(gòu)建,就是把某一些數(shù)據(jù)翻譯成消息。前面產(chǎn)生event工廠還是一樣,然后bufffferSize,后面再扔的是DaemonThreadFactory就是后臺(tái)線(xiàn)程了,new LongEventHandler然后start拿到他的ringBuffffer,前面都一樣。只有一個(gè)地方叫EventTranslator不一樣,我們?cè)趍ain01里面的代碼是要寫(xiě)try catch然后把里面的值給設(shè)好,相當(dāng)于把這個(gè)值轉(zhuǎn)換成event對(duì)象。相對(duì)簡(jiǎn)單的寫(xiě)法,它會(huì)把某些值轉(zhuǎn)成一個(gè)LongEvent,通過(guò)EventTranslator。new出來(lái)后實(shí)現(xiàn)了translateTo方法,EventTranslator它本身是一個(gè)接口,所以你要new的時(shí)候你又要實(shí)現(xiàn)它里面沒(méi)有實(shí)現(xiàn)的方法,translateTo的意思是你給我一個(gè)Event,我會(huì)把這個(gè)Event給你填好。

          ringBuffffer.publishEvent(translator1) 你只要把translator1交給ringBuffffer就可以了。這個(gè)translator就是為了迎合Lambda表達(dá)式的寫(xiě)法(為java8的寫(xiě)法做準(zhǔn)備)

          另外translator有很多種用法:

          EventTranslatorOneArg只有帶一個(gè)參數(shù)的EventTranslator。我?guī)в幸粋€(gè)參數(shù),這個(gè)參數(shù)會(huì)通過(guò)我的translateTo方法轉(zhuǎn)換成一個(gè)LongEvent;既然有EventTranslatorOneArg就有EventTranslatorTwoArg、EventTranslatorThreeArg,還有EventTranslatorVararg多了去了Vararg就是有好多個(gè)值,我把里面的值全都給你加起來(lái)最后把結(jié)果set到event里面。

          package com.mashibing.disruptor;
          import java.util.concurrent.Executor;
          import java.util.concurrent.Executors;
          import com.lmax.disruptor.dsl.Disruptor;
          import com.lmax.disruptor.RingBuffer;
          import com.lmax.disruptor.util.DaemonThreadFactory;
          import java.nio.ByteBuffer;public class Main02{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,
          DaemonThreadFactory.INSTANCE);//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================EventTranslator translator1 = new EventTranslator
          (){
          @Overridepublic void translateTo(LongEvent event,long sequence){event.set(8888L); }
          };
          ringBuffer.publishEvent(translator1);//========================================================================EventTranslatorOneArg translator2 = newEventTranslatorOneArg(){
          @Overridepublic void translateTo(LongEvent event,long sequence,Long l){event.set(l); }
          };
          ringBuffer.publishEvent(translator2,7777L);//========================================================================EventTranslatorTwoArg translator3 = newEventTranslatorTwoArg(){
          @Overridepublic void translateTo(LongEvent event,long sequence,Long l1,Long l2){ event.set(l); }
          };
          ringBuffer.publishEvent(translator3,10000L,10000L);//========================================================================EventTranslatorThreeArg translator4 = newEventTranslatorThreeArg(){
          @Overridepublic void translateTo(LongEvent event,long sequence,Long l1,Long
          l2,Long l3
          )
          { event.set(l1+ l2+ l3); }
          };
          ringBuffer.publishEvent(translator4,10000L,10000L,10000L);//========================================================================EventTranslatorVararg translator5 = newEventTranslatorThreeArg(){
          @Overridepublic void translateTo(LongEvent event,long sequence,Object...
          objects
          )
          {long result = 0;for(Object o : objects){long l =(Long)o;
          result +=l;
          }
          }
          };
          ringBuffer.publishEvent(translator5,10000L,10000L,10000L,10000L);
          }
          }

          有了上面Translator之后呢,下面看Lambda表達(dá)式怎么寫(xiě),這個(gè)是比較簡(jiǎn)潔的寫(xiě)法,連factory都省了,直接指定一個(gè)Lambda表達(dá)式LongEvent::new。繼續(xù)handleEventsWith把三個(gè)參數(shù)傳進(jìn)來(lái)后面寫(xiě)好Lambda表達(dá)式直接打印,然后start, 接著RingBuffffer,publishEvent原來(lái)我們還有寫(xiě)try...catch,現(xiàn)在簡(jiǎn)單了直接ringBuffffer.publishEvent(第一個(gè)是lambda表達(dá)式,表達(dá)式后是你指定的幾個(gè)參

          數(shù)),所以現(xiàn)在的這種寫(xiě)法就不定義各種各樣的EventTranslator了。

          package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main03{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new,
          bufferSize,DaemonThreadFactory.INSTANCE);//Connect the handlerdisruptor.handleEventsWith((event,sequence,endOfBatch)-
          >System.out.println("Event:"+event));//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
          ringBuffer.publishEvent((event, sequence)-> event.set(10000L));System.in.read();
          }
          }

          下面我們叫一些細(xì)節(jié),這些個(gè)細(xì)節(jié)也不難,講給大家。第一個(gè)細(xì)節(jié)是我們生產(chǎn)者的時(shí)候默認(rèn)會(huì)有好多種生產(chǎn)方式,默認(rèn)的是多線(xiàn)程生產(chǎn)者,但是假如你確定你整個(gè)程序里頭只有一個(gè)生產(chǎn)者的話(huà)那你還能提高效率,就是在你指定Disruptor生產(chǎn)者的線(xiàn)程的方式是SINGLE,生產(chǎn)者的類(lèi)型ProducerType。

          ProducerType生產(chǎn)者線(xiàn)程模式

          ProducerType有兩種模式ProducerMULTI和Producer.SINGLE

          默認(rèn)是MULTI,表示在多線(xiàn)程模式下產(chǎn)生sequence

          如果確認(rèn)是單線(xiàn)程生產(chǎn)者,那么可以指定SINGLE,效率會(huì)提升如果是多個(gè)生產(chǎn)者(多線(xiàn)程),但模式指定為SINGLE,會(huì)出什么問(wèn)題?

          假如你的程序里頭只有一個(gè)生產(chǎn)者還用ProducerMULTI的話(huà),我們對(duì)序列來(lái)進(jìn)行多線(xiàn)程訪問(wèn)的時(shí)候肯定是要加鎖的,所以MULTI里面默認(rèn)是有鎖定處理的,但是假如你只有一個(gè)線(xiàn)程這個(gè)時(shí)候應(yīng)該把生產(chǎn)者指定為SINGLE,他的效率更高,因?yàn)樗锩娌患渔i。

          下面這個(gè)小程序,我這里指定的是Producer.SINGLE,但是我生產(chǎn)的時(shí)候用的是一堆線(xiàn)程,當(dāng)我制定了Producer.SINGLE之后相當(dāng)于內(nèi)部對(duì)于序列的訪問(wèn)就沒(méi)有鎖了,它會(huì)把性能發(fā)揮到極致,它不會(huì)報(bào)錯(cuò),它會(huì)把你的消息靜悄悄的覆蓋了,因此你要小心一點(diǎn)。我這里這個(gè)寫(xiě)法是我有50 個(gè)線(xiàn)程然后每個(gè)線(xiàn)程生產(chǎn)100個(gè)數(shù),最后結(jié)果正常的話(huà)應(yīng)該是有5000個(gè)消費(fèi)產(chǎn)生。

          package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main04_ProducerType{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the Disruptor//Disruptor disruptor = new Disruptor<>(factory,bufferSize,Executors.defaultThreadFactory());
          Disruptor disruptor = new Disruptor<>(factory,bufferSize,
          Executors.defaultThreadFactory(),ProducerType.SINGLE,newBlockingWaitStrategy());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 50;
          CycliBarrier barrier=new CycliBarrier(threadCount);
          ExecutorService service = Executors.newCachedThreadPool();for(long i=0; ifinal long threadNum = i;
          service.submit(()->{
          System.out.printf("Thread %s ready to start!\n",threadNum);try{
          barrier.await();
          }catch(InterruptedException e){
          e.printStackTrace();
          }catch(BrokenBarrierException e){
          e.printStackTrace();
          }for(int j=0; j<100;j++){
          ringBuffer.publishEvent((event,sequence)->{
          event.set(threadNum);
          System.out.println("生產(chǎn)了"+threadNum);
          });
          }
          });
          }
          service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);
          System.out.println(LongEventHandler.count);
          }
          }

          我們?cè)賮?lái)聊一下等待策略WaitStrategy,有好多種方法,看下面等待策略(常用)BlockingWaitStrategy:通過(guò)線(xiàn)程堵塞的方式,等待生產(chǎn)者喚醒,被喚醒后,再循環(huán)檢查依賴(lài)的sequence是否已經(jīng)消費(fèi)。

          BusySpinWaitStrategy:線(xiàn)程一直自旋等待,可能比較耗cpu

          LiteBlockingWaitStrategy:線(xiàn)程阻塞等待生產(chǎn)者喚醒,與BlockingWaitStrategy相比,區(qū)別在signalNeeded.getAndSet,如果兩個(gè)線(xiàn)程同時(shí)訪問(wèn)一個(gè)訪問(wèn)waitfor,一個(gè)訪問(wèn)signalAll時(shí),可以

          減少lock加鎖次數(shù)
          LiteTimeoutBlockingWaitStrategy:與LiteBlockingWaitStrategy相比,設(shè)置了阻塞時(shí)間,超過(guò)時(shí)

          間后拋出異常
          PhasedBackoffffWaitStrategy:根據(jù)時(shí)間參數(shù)和傳入的等待策略來(lái)決定使用那種等待策略TimeoutBlockingWaitStrategy:相對(duì)于BlockingWaitStrategy來(lái)說(shuō),設(shè)置了等待時(shí)間,超過(guò)后拋出異常

          (常用)YieldingWaitStrategy:嘗試100次,然后Thread.yield()讓出cpu

          (常用)SleepingWaitStrategy:sleep

          我們常用的BlockingWaitStrategy滿(mǎn)了就等著;SleepingWaitStrategy滿(mǎn)了就睡一覺(jué),睡醒了看看能不能繼續(xù)執(zhí)行了;YieldingWaitStrategy讓出cpu,讓你消費(fèi)者趕緊消費(fèi),消費(fèi)完了之后我又回來(lái)看看我是不是又能生產(chǎn)了;一般YieldingWaitStrategy效率是最高的,但也要看實(shí)際情況適用不適用。

          package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main05_WaitStrategy{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,
          Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 50;
          CycliBarrier barrier=new CycliBarrier(threadCount);ExecutorService service = Executors.newCachedThreadPool();for(long i=0; ifinal long threadNum = i;
          service.submit(()->{
          System.out.printf("Thread %s ready to start!\n",threadNum);try{
          barrier.await();
          }catch(InterruptedException e){
          e.printStackTrace();
          }catch(BrokenBarrierException e){
          e.printStackTrace();
          }for(int j=0; j<100;j++){
          ringBuffer.publishEvent((event,sequence)->{
          event.set(threadNum);
          System.out.println("生產(chǎn)了"+threadNum);
          });
          }
          });
          }
          service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);
          System.out.println(LongEventHandler.count);
          }
          }

          我們來(lái)看多個(gè)消費(fèi)者怎么指定,默認(rèn)的情況下只有一個(gè)消費(fèi)者,你想要有多個(gè)消費(fèi)者的時(shí)候也非常簡(jiǎn)單,看下面代碼我定義了兩個(gè)消費(fèi)者h(yuǎn)1、h2,
          disruptor.handleEventsWith(h1,h2)這里面是一個(gè)可變參數(shù),所以你要想有多個(gè)消費(fèi)者的時(shí)候就往里裝,多個(gè)消費(fèi)者是位于多個(gè)線(xiàn)程里面的。

          package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main06_MultiConsumer{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,
          Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlersLongEventHandler h1 = new LongEventHandler();LongEventHandler h2 = new LongEventHandler();
          disruptor.handleEventsWith(h1,h2);//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 10;
          CycliBarrier barrier=new CycliBarrier(threadCount);
          ExecutorService service = Executors.newCachedThreadPool();for(long i=0; ifinal long threadNum = i;
          service.submit(()->{
          System.out.printf("Thread %s ready to start!\n",threadNum);try{
          barrier.await();
          }catch(InterruptedException e){
          e.printStackTrace();
          }catch(BrokenBarrierException e){
          e.printStackTrace();
          }for(int j=0; j<10;j++){
          ringBuffer.publishEvent((event,sequence)->{
          event.set(threadNum);
          System.out.println("生產(chǎn)了"+threadNum);
          });
          }
          });
          }
          service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);
          System.out.println(LongEventHandler.count);
          }
          }

          還有disruptor最后一個(gè)問(wèn)題,出了異常怎么處理

          消費(fèi)者異常處理

          默認(rèn):
          disruptor.setDefaultExceptionHandler()

          覆蓋:
          disruptor.handleExceptionFor().with()

          看下面代碼,這這里方法里寫(xiě)了一個(gè)EventHandler是我們的消費(fèi)者,在消費(fèi)者里打印了event之后馬上拋出了異常,當(dāng)我們消費(fèi)者出現(xiàn)異常之后你不能讓整個(gè)線(xiàn)程停下來(lái),有一個(gè)消費(fèi)者出了異常那其他的消費(fèi)者就不干活了,肯定不行。handleExceptionsFor為消費(fèi)者指定Exception處理器 (h1).with后面是我們的ExceptionHandler出了異常之后該怎么辦進(jìn)行處理,重寫(xiě)三個(gè)方法,第一個(gè)是當(dāng)產(chǎn)生異常的時(shí)候在這很簡(jiǎn)單直接打印出來(lái)了;第二個(gè)是handleOnStart如果啟動(dòng)的時(shí)候出異常;第三個(gè)handleOnShutdown你該怎么處理。

          package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main07_ExceptionHandler{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,
          Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlersEventHandler h1 = (event,sequence,end)->{
          System.out.println("消費(fèi)者出異常");
          };
          disruptor.handleEventsWith(h1);
          disruptor.handleExceptionsFor(h1).with(newExceptionHandler(){@Overridepublic void handleEventException(Throwable throwable,longl,LongEvent longEvent){
          throwable.printStackTrace();
          }@Overridepublic void handleOnStartException(Throwable throwable){
          System.out.println("Exception Start to Handle!");
          }@Overridepublic void handleOnShutdownException(Throwable throwable){
          System.out.println("Exception End to Handle!");
          }
          });//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 1;
          CycliBarrier barrier=new CycliBarrier(threadCount);
          ExecutorService service = Executors.newCachedThreadPool();for(long i=0; ifinal long threadNum = i;
          service.submit(()->{
          System.out.printf("Thread %s ready to start!\n",threadNum);try{
          barrier.await();
          }catch(InterruptedException e){
          e.printStackTrace();
          }catch(BrokenBarrierException e){
          e.printStackTrace();
          }for(int j=0; j<10;j++){
          ringBuffer.publishEvent((event,sequence)->{
          event.set(threadNum);
          System.out.println("生產(chǎn)了"+threadNum);
          });
          }
          });
          }
          service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);
          System.out.println(LongEventHandler.count);
          }
          }

          disruptor是一個(gè)環(huán),然后這個(gè)環(huán)有多個(gè)生產(chǎn)者可以往里頭生產(chǎn),由于它是環(huán)形的設(shè)計(jì)效率會(huì)非常的高,我們寫(xiě)程序的時(shí)候是這樣寫(xiě)的,首先你自己定義好Event消息的格式,然后定義消息工廠,消息工廠是用來(lái)初始化整個(gè)環(huán)的時(shí)候相應(yīng)的一些位置上各種各樣不同的消息先把它new出來(lái),new出來(lái)之后先占好空間,我們?cè)谏a(chǎn)的時(shí)候只需要把這個(gè)位置上這個(gè)默認(rèn)的這塊空間拿出來(lái)往里頭填值,填好值之后

          消費(fèi)者就可以往里頭消費(fèi)了,消費(fèi)完了生產(chǎn)者就可以繼續(xù)往里頭生產(chǎn)了,如果說(shuō)你生產(chǎn)者消費(fèi)的比較快,消費(fèi)者消費(fèi)的比較慢,滿(mǎn)了怎么辦,就是用各種各樣的等待策略,消費(fèi)者出了問(wèn)題之后可以用ExceptionHandler來(lái)進(jìn)行處理。

          覺(jué)得文章內(nèi)容不錯(cuò)的話(huà),可以轉(zhuǎn)發(fā)關(guān)注一下小編~ 之后持續(xù)更新干貨好文~~


          本文就是愿天堂沒(méi)有BUG給大家分享的內(nèi)容,大家有收獲的話(huà)可以分享下,想學(xué)習(xí)更多的話(huà)可以到微信公眾號(hào)里找我,我等你哦。

          瀏覽 61
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  不卡不卡不卡不卡不卡国产精品视频 | 老黄色网址 | 日韩香蕉视频 | 色综合色综合 | 蜜乳视频在线观看 |