深入java多線(xiàn)程與高并發(fā):JMH與Disruptor,確定能學(xué)會(huì)?
前言
今天我們講兩個(gè)內(nèi)容,第一個(gè)是JMH,第二個(gè)是Disruptor。這兩個(gè)內(nèi)容是給大家做更進(jìn)一步的這種多線(xiàn)程和高并發(fā)的一些專(zhuān)業(yè)上的處理。生產(chǎn)環(huán)境之中我們很可能不自己定義消息隊(duì)列,而是使用
Disruptor。我們生產(chǎn)環(huán)境做測(cè)試的時(shí)候也不是像我說(shuō)的那樣寫(xiě)一個(gè)start寫(xiě)一個(gè)end就測(cè)試完了。在這里給大家先介紹專(zhuān)業(yè)的JMH測(cè)試工具,在給大家介紹Disruptor號(hào)稱(chēng)最快的消息隊(duì)列。
JMH -java Microbenchmark Harness
微基準(zhǔn)測(cè)試,它是測(cè)的某一個(gè)方法的性能到底是好或者不好,換了方法的實(shí)現(xiàn)之后他的性能到底好還是不好。
這個(gè)測(cè)試的框架是2013年發(fā)出來(lái)的,由JLT的開(kāi)發(fā)人員開(kāi)發(fā),后來(lái)歸到了OpenJDK下面。
官網(wǎng):
http://openjdk.java.net/projects/code-tools/jmh/
下面我們來(lái)介紹什么是一個(gè)JMH,他是用來(lái)干什么的,我們來(lái)看到底怎么使用,給大家一個(gè)簡(jiǎn)單的介紹肯定是了解不了jmh是個(gè)什么東西,已經(jīng)把這個(gè)步驟給大家總結(jié)一篇文檔,官網(wǎng)在哪里,怎么樣去創(chuàng)建一個(gè)JMH的測(cè)試,創(chuàng)建一共大致有七個(gè)步驟,還有他的一些基本概念,什么叫預(yù)熱,什么叫Mesurement等等的,還有進(jìn)一步了解的官方地址。
JMH Java準(zhǔn)測(cè)試工具套件
什么是JMH
官網(wǎng):
http://openjdk.java.net/projects/code-tools/jmh/
創(chuàng)建JMH測(cè)試
1. 創(chuàng)建Maven項(xiàng)目,添加依賴(lài),我們需要添加兩個(gè)依賴(lài):
1.1:jmh-core (jmh的核心)
1.2:jmh-generator-annprocess(注解處理包)
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0modelVersion><properties><project.build.sourceEncoding>UTF-8project.build.sourceEncoding><encoding>UTF-8encoding><java.version>1.8java.version><maven.compiler.source>1.8maven.compiler.source><maven.compiler.target>1.8maven.compiler.target>properties><groupId>mashibing.comgroupId><artifactId>HelloJMH2artifactId><version>1.0-SNAPSHOTversion><dependencies><dependency><groupId>org.openjdk.jmhgroupId><artifactId>jmh-coreartifactId><version>1.21version>dependency><dependency><groupId>org.openjdk.jmhgroupId><artifactId>jmh-generator-annprocessartifactId><version>1.21version><scope>testscope>dependency>dependencies>project>2. idea安裝JMH插件 JMH plugin v1.0.3
JMH這個(gè)東西你要想真正的安安靜靜的去運(yùn)行,就不會(huì)去影響我們正常程序的執(zhí)行,最好的方式就是按照官網(wǎng)的說(shuō)法是命令行的方式,比方說(shuō)你要測(cè)試某一個(gè)包里面的類(lèi)的話(huà)你應(yīng)該把這個(gè)類(lèi)和其他的依賴(lài)類(lèi)打成一個(gè)jar包,然后單獨(dú)的把這個(gè)jar包放到某一個(gè)機(jī)器上,在這個(gè)機(jī)器上對(duì)這個(gè)jar包進(jìn)行微基準(zhǔn)的測(cè)試,如果對(duì)它進(jìn)行測(cè)試的比較好,那說(shuō)明最后的結(jié)果還可以,如果說(shuō)邊開(kāi)發(fā)邊進(jìn)行這種微基準(zhǔn)的測(cè)試實(shí)際上他非常的不準(zhǔn),因?yàn)槟愕拈_(kāi)發(fā)環(huán)境會(huì)對(duì)結(jié)果產(chǎn)生影響。
只不過(guò)我們自己開(kāi)發(fā)人員來(lái)說(shuō)平時(shí)你要想進(jìn)行一些微基準(zhǔn)的測(cè)試的話(huà),你要是每次打個(gè)包來(lái)進(jìn)行正規(guī)一個(gè)從頭到尾的測(cè)試 ,完了之后發(fā)現(xiàn)問(wèn)題不對(duì)再去重新改,效率太低了。所以在這里教大家的是怎么樣在IDE里面來(lái)進(jìn)行微基準(zhǔn)的測(cè)試。idea安裝JMH插件:fifile->Settings->Plugins->JMH-plugin。它運(yùn)行的時(shí)候需要這個(gè)plugin的支持,如果你用命令行是不需要這些東西的。
3. 由于用到了注解,打開(kāi)運(yùn)行程序注解配置
因?yàn)镴MH在運(yùn)行的時(shí)候他用到了注解,注解這個(gè)東西你自己得寫(xiě)一個(gè)程序得解釋他,所以你要把這個(gè)給設(shè)置上允許JMH能夠?qū)ψ⒔膺M(jìn)行處理:
compiler -> Annotation Processors -> Enable Annotation Processing
4. 定義需要測(cè)試類(lèi)PS (ParallelStream)看這里,寫(xiě)了一個(gè)類(lèi),并行處理流的一個(gè)程序,定義了一個(gè)list集合,然后往這個(gè)集合里扔了1000個(gè)數(shù)。寫(xiě)了一個(gè)方法來(lái)判斷這個(gè)數(shù)到底是不是一個(gè)質(zhì)數(shù)。寫(xiě)了兩個(gè)方法,第一個(gè)是用forEach來(lái)判斷我們這1000個(gè)數(shù)里到底有誰(shuí)是質(zhì)數(shù);第二個(gè)是使用了并行處理流,這個(gè)forEach的方法就只有單線(xiàn)程里面執(zhí)行,挨著牌從頭拿到尾,從0拿到1000,但是并行處理的時(shí)候會(huì)有多個(gè)線(xiàn)程采用ForkJoin的方式來(lái)把里面的數(shù)分成好幾份并行的進(jìn)行處理。一種是串行處理,一種是并行處理,都可以對(duì)他們進(jìn)行測(cè)試,但需要注意這個(gè)基準(zhǔn)測(cè)試并不是對(duì)比測(cè)試的,你只是測(cè)試一下你這方法寫(xiě)出這樣的情況下他的吞吐量到底是多少,這是一個(gè)非常專(zhuān)業(yè)的測(cè)試的工具。嚴(yán)格的來(lái)講這部分是測(cè)試開(kāi)發(fā)專(zhuān)業(yè)的。
package com.mashibing.jmh;import java.util.ArrayList;import java.util.List;import java.util.Random;public class PS {static List nums = new ArrayList<>();static {
Random r = new Random();for (int i = 0; i < 10000; i++) nums.add(1000000 +
r.nextInt(1000000));
}static void foreach() {
nums.forEach(v->isPrime(v));
}static void parallel() {
nums.parallelStream().forEach(PS::isPrime);
}static boolean isPrime(int num) {for(int i=2; i<=num/2; i++) {if(num % i == 0) return false;
}return true;
}
} 5. 寫(xiě)單元測(cè)試
這個(gè)測(cè)試類(lèi)一定要在test package下面
我對(duì)這個(gè)方法進(jìn)行測(cè)試testForEach,很簡(jiǎn)單我就調(diào)用PS這個(gè)類(lèi)的foreach就行了,對(duì)它測(cè)試最關(guān)鍵的是我加了這個(gè)注解@Benchmark,這個(gè)是JMH的注解,是要被JMH來(lái)解析處理的,這也是我們?yōu)槊匆涯莻€(gè)Annotation Processing給設(shè)置上的原因,非常簡(jiǎn)單,你只要加上注解就可以對(duì)這個(gè)方法進(jìn)行微基準(zhǔn)測(cè)試了,點(diǎn)擊右鍵直接run。
package com.mashibing.jmh;import org.openjdk.jmh.annotations.Benchmark;import static org.junit.jupiter.api.Assertions.*;public class PSTest {@Benchmark@Warmup(iteration=1, time=3)//在專(zhuān)業(yè)測(cè)試?yán)锩媸紫纫M(jìn)行預(yù)熱,預(yù)熱多少次,預(yù)熱多少時(shí)間@Fork(5)//意思是用多少個(gè)線(xiàn)程去執(zhí)行我們的程序@BenchmarkMode(Mode.Throughput)//是對(duì)基準(zhǔn)測(cè)試的一個(gè)模式,這個(gè)模式用的最多的是Throughput吞吐量@Measurement(iteration=1, time=3)//是整個(gè)測(cè)試要測(cè)試多少遍,調(diào)用這個(gè)方法要調(diào)用多少次
public void testForEach() {PS.foreach();
}
}6. 運(yùn)行測(cè)試類(lèi),如果遇到下面的錯(cuò)誤:
ERROR: org.openjdk.jmh.runner.RunnerException: ERROR: Exception while trying
to acquire the JMH lock (C:\WINDOWS\/jmh.lock): C:\WINDOWS\jmh.lock (拒絕訪問(wèn)。), exiting. Use -Djmh.ignoreLock=true to forcefully continue.
at org.openjdk.jmh.runner.Runner.run(Runner.java:216)
at org.openjdk.jmh.Main.main(Main.java:71)
這個(gè)錯(cuò)誤是因?yàn)镴MH運(yùn)行需要訪問(wèn)系統(tǒng)的TMP目錄,解決辦法是:
打開(kāi)RunConfifiguration -> Environment Variables -> include system environment viables
7. 閱讀測(cè)試報(bào)告
JMH中的基本概念
1. Warmup
預(yù)熱,由于JVM中對(duì)于特定代碼會(huì)存在優(yōu)化(本地化),預(yù)熱對(duì)于測(cè)試結(jié)果很重要
2. Mesurement
總共執(zhí)行多少次測(cè)試
3. Timeout
4. Threads
線(xiàn)程數(shù),由fork指定
5. Benchmark mode
基準(zhǔn)測(cè)試的模式
6. Benchmark
測(cè)試哪一段代碼
next
做個(gè)是JMH的一個(gè)入門(mén),嚴(yán)格來(lái)講這個(gè)和我們的關(guān)系其實(shí)并不大,這個(gè)是測(cè)試部門(mén)干的事兒,但是你了解一下沒(méi)有特別多的壞處,你也知道你的方法最后效率高或者底,可以通過(guò)一個(gè)簡(jiǎn)單的JMH插件來(lái)幫你完成,你不要在手動(dòng)的去寫(xiě)這件事兒了。
如果說(shuō)大家對(duì)JMH有興趣,你們?cè)诠ぷ髦锌赡軙?huì)有用的上大家去讀一下官方的例子,官方大概有好幾十個(gè)例子程序,你可以自己一個(gè)一個(gè)的去研究。
官方樣例:
http://hg.openjdk.java.net/code-tools/jmh/fifile/tip/jmh-samples/src/main/java/org/openjdk/jmh/s
amples/
Disruptor按照英文翻譯的話(huà),Disruptor應(yīng)該是分裂、瓦解。這個(gè)Disruptor是一個(gè)做金融的、做股票的這樣一個(gè)公司交易所來(lái)開(kāi)發(fā)的,為自己來(lái)開(kāi)發(fā)的這么一個(gè)底層的框架,開(kāi)發(fā)出來(lái)之后受到了很多的認(rèn)可,開(kāi)源之后,2011年獲得Duke獎(jiǎng)。如果你想把它用作MQ的話(huà),單機(jī)最快的MQ。性能非常的高,主要是它里面用的全都是cas,另外把各種各樣的性能開(kāi)發(fā)到了極致,所以他單機(jī)支持很高的一個(gè)并發(fā)。
Disruptor不是平時(shí)我們學(xué)的這個(gè)redis、不是平時(shí)我們所學(xué)的kafka,他可以跟他們一樣有類(lèi)似的用途,但他是單機(jī),redis、kafka也可以用于集群。redis他有這種序列化的機(jī)制,就是你可以把它存儲(chǔ)到硬盤(pán)上或數(shù)據(jù)庫(kù)當(dāng)中是可以的,kafka當(dāng)然也有,Disruptor沒(méi)有,Disruptor就是在內(nèi)存里,Disruptor簡(jiǎn)單理解就是內(nèi)存里用于存放元素的一個(gè)高效率的隊(duì)列。
介紹
關(guān)于Disruptor的一些資料,給大家列在這里。
主頁(yè):
http://imax-exchange.github.io/disruptor/源碼:
https://github.com/LMAX-Exchange/disruptorGettingStarted:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
api:http://imax-exchange.github.io/disruptor/docs/index.html
maven:https://mvnrepository.com/artifact/com.imax/disruptor
Disruptor叫無(wú)鎖、高并發(fā)、環(huán)形Buffffer,直接覆蓋(不用清除)舊的數(shù)據(jù),降低GC頻率,用于生產(chǎn)者消費(fèi)者模式(如果說(shuō)按照設(shè)計(jì)者角度來(lái)講他就是觀察者模式)。什么叫觀察者模式,想象一下,我們?cè)谇懊鎸W(xué)各種各樣的隊(duì)列的時(shí)候,隊(duì)列就是個(gè)容器,好多生產(chǎn)者往里頭扔?xùn)|西,好多消費(fèi)者從里頭往外拿東西。所謂的生產(chǎn)者消費(fèi)者就是這個(gè)意思,為什么我們可以叫他觀察者呢,因?yàn)檫@些消費(fèi)者正在觀察著里面有沒(méi)有新東西,如果有的話(huà)我馬上拿過(guò)來(lái)消費(fèi),所以他也是一種觀察者模式。Disruptor實(shí)現(xiàn)的就是這個(gè)容器
Disruptor核心與特點(diǎn)
Disruptor也是一個(gè)隊(duì)列,和其他隊(duì)列不一樣的是他是一個(gè)環(huán)形隊(duì)列,環(huán)形的Buffffer。一般情況下我們的容器是一個(gè)隊(duì)列,不管你是用鏈表實(shí)現(xiàn)還是用數(shù)組實(shí)現(xiàn)的,它會(huì)是一個(gè)隊(duì)列,那么這個(gè)隊(duì)列生產(chǎn)者這邊使勁往里塞,消費(fèi)者這邊使勁往外拿,但Disruptor的核心是一個(gè)環(huán)形的buffffer。

對(duì)比ConcurrentLinkedQueue:鏈表實(shí)現(xiàn)這種環(huán)形的buffffer速度就是更快,同學(xué)們可以去查一下JDK自帶的容器,你會(huì)發(fā)現(xiàn)效率比較高的有各種各樣的隊(duì)列,如果不想阻塞就可以用Concurrent相關(guān)的,ConcurrentLinkedQueue是并發(fā)的用鏈表實(shí)現(xiàn)的隊(duì)列,它里面大量的使用了cas,因此它的效率相對(duì)比較高,可是對(duì)于遍歷來(lái)講鏈表的效率一定會(huì)比數(shù)組低。
JDK中沒(méi)有ConcurrentArrayQueue
因?yàn)閿?shù)組的大小的固定的,如果想擴(kuò)展的話(huà)就要把原來(lái)的數(shù)組拷貝到新數(shù)組里,每次加都要拷貝這個(gè)效率相當(dāng)?shù)?,所以他并沒(méi)有給大家加這個(gè)叫ConcurrentArrayQueue,但是Disruptor就非常牛X,想到了這樣一個(gè)辦法,就是把數(shù)組的頭尾相連。
Disruptor是用數(shù)組實(shí)現(xiàn)的這樣的一個(gè)隊(duì)列,你可以認(rèn)為Disruptor就是用數(shù)組實(shí)現(xiàn)的ConcurrentArrayQueue,另外這個(gè)Queue是首尾相連的.
那Disruptor用數(shù)組實(shí)現(xiàn)的環(huán)形的就比上面兩個(gè)都牛嗎,牛在哪?為啥呢?如果我們用ConcurrentLinkedQueue這里面就是一個(gè)一個(gè)鏈表,這個(gè)鏈表遍歷起來(lái)肯定沒(méi)有數(shù)組快,這個(gè)是一點(diǎn)。還有第二點(diǎn)就是這個(gè)鏈表要維護(hù)一個(gè)頭指針和一個(gè)尾指針,我往頭部加的時(shí)候要加鎖,往尾部拿的時(shí)候也要加鎖。另外鏈表本身效率就偏低,還要維護(hù)兩個(gè)指針。關(guān)于環(huán)形的呢,環(huán)形本身就維護(hù)一個(gè)位置,這個(gè)位置稱(chēng)之為sequence序列,這個(gè)序列代表的是我下一個(gè)有效的元素指在什么位置上,就相當(dāng)于他只有一個(gè)指針來(lái)回轉(zhuǎn)。加在某個(gè)位置上怎么計(jì)算:直接用那個(gè)數(shù)除以我們整個(gè)的容量求余就可以了。
RingBuffffer是一個(gè)環(huán)形隊(duì)列
RingBuffffer的序號(hào),指向下一個(gè)可用的元素
采用數(shù)組實(shí)現(xiàn),沒(méi)有首尾指針
對(duì)比ConcurrentLinkedQueue,用數(shù)組實(shí)現(xiàn)的速度更快
假如長(zhǎng)度為8,當(dāng)添加到第12個(gè)元素的時(shí)候在哪個(gè)序號(hào)上呢?用12%8決定
當(dāng)Buffffer被填滿(mǎn)的時(shí)候到底是覆蓋還是等待,由Produce決定長(zhǎng)度設(shè)為2的n次冪,利于二進(jìn)制計(jì)算,例如:12%8=12&(8-1)
如果大家對(duì)于位運(yùn)算有疑問(wèn)的,在咱們網(wǎng)站上有一個(gè)菜鳥(niǎo)預(yù)習(xí),里面有一部分是二進(jìn)制,大家去翻看一下。
由于它會(huì)采用覆蓋的方式,所以他沒(méi)有必要記頭指針,沒(méi)有必要記尾指針。我只要記一個(gè)指針?lè)旁谶@就可以了。在這點(diǎn)上依然要比ConcurrentLinkedQueue要快。
那我生產(chǎn)者線(xiàn)程生產(chǎn)的特別多,消費(fèi)者沒(méi)來(lái)得及消費(fèi)那我在往后覆蓋的話(huà)怎么辦?不會(huì)那么輕易的讓你覆蓋的,我們是有策略的,我生產(chǎn)者生產(chǎn)滿(mǎn)了,要在生產(chǎn)一個(gè)的話(huà)就馬上覆蓋這個(gè)位置上的數(shù)了。這時(shí)候是不能覆蓋的,指定了一個(gè)策略叫等待策略,這里面有8種等待策略,分情況自己去用。最常見(jiàn)的是BlockingWait,滿(mǎn)了我就在這等著,什么時(shí)候你空了消費(fèi)者來(lái)喚醒一下就繼續(xù)。
Disruptor開(kāi)發(fā)步驟
開(kāi)發(fā)步驟是比較固定的一個(gè)開(kāi)發(fā)步驟。
1:定義Event-隊(duì)列中需要處理的元素。
在Disruptor他是每一個(gè)消息都認(rèn)為是一個(gè)事件,在他這個(gè)概念里就是一個(gè)事件,所以在這個(gè)環(huán)形隊(duì)列里面存的是一個(gè)一個(gè)的Event。
2:定義Event工廠,用于填充隊(duì)列
那這個(gè)Event怎么產(chǎn)生,就需要指定Event的工廠。3:定義EventHandler(消費(fèi)者),處理容器中的元素那這個(gè)Event怎么消費(fèi)呢,就需要指定Event的消費(fèi)者EventHandler。
下面我們直接看程序,先看來(lái)自官網(wǎng)的幾個(gè)輔助程序:LongEvent這個(gè)事件里面或者說(shuō)消息里面裝的什么值,我只裝了一個(gè)long值,但這里面可以裝任何值,任何類(lèi)型的都可以往里裝,這個(gè)long類(lèi)型的值我們可以指定它set,官網(wǎng)上沒(méi)有toString方法,我給大家加了一段主要是為了打印消息讓大家看的更清楚。
package com.mashibing.disruptor;public class LongEvent{private long value;public void set(long value){this.value = value;
}
@Overridepublic String toString(){return "LongEvent{" +"value=" + value +"}";
}
}然后呢,我需要一個(gè)EventFactory就是怎么產(chǎn)生這些個(gè)事件,這個(gè)Factory非常簡(jiǎn)單,
LongEventFactory去實(shí)現(xiàn)EventFactiry的接口,去重寫(xiě)它的newInstance方法直接new LongEvent。構(gòu)建這個(gè)環(huán)的時(shí)候?yàn)槭裁匆付ㄒ粋€(gè)產(chǎn)生事件的工廠,我直接new這個(gè)事件不可以嗎?但是有的事件里面的構(gòu)造方法不讓你new呢,產(chǎn)生事件工廠的話(huà)你可以靈活的指定一些 ,這里面也是牽扯到效率的。底層比較深,我給大家解釋一下:
這里牽扯效率問(wèn)題,因?yàn)镈isruptor初始化的時(shí)候會(huì)調(diào)用Event工廠,對(duì)ringBuffffer進(jìn)行內(nèi)存的提前分配,GC頻率會(huì)降低。
package com.mashibing.disruptor;import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactiry<LongEvent>{@Overridepublic LongEvent newInstance(){return new LongEvent();
}
}在看第三個(gè)叫LongEventHandler,Handler就是我拿到這個(gè)事件之后該怎么樣進(jìn)行處理,所以這里是消息的消費(fèi)者,怎么處理呢,很簡(jiǎn)單,我處理完這個(gè)消息之后呢就記一個(gè)數(shù),總共記下來(lái)我一共處理了多少消息了,處理消息的時(shí)候默認(rèn)調(diào)用的是onEvent方法,這個(gè)方法里面有三個(gè)參數(shù),第一個(gè)是你要處理的那個(gè)消息,第二個(gè)是你處理的是哪個(gè)位置上的消息,第三個(gè)是整體的消息結(jié)束沒(méi)結(jié)束,是不是處理完了。你可以判斷他如果是true的話(huà)消費(fèi)者就可以退出了,如果是false的話(huà)說(shuō)明后面還有繼續(xù)消費(fèi)。
package com.mashibing.disruptor;import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler<LongEvent>{/**
*
*@param event
*@param sequence RingBuffer的序號(hào)
*@param endOfBatch 是否為最后一個(gè)元素
*@throws Exception
**/public static long count = 0;
@Overridepublic void onEvent(LongEvent event,long sequence,boolean endOfBatch) throwsException{
count++;
System.out.println("["+Thread.currentThread().getName()+"]"+event+"序
號(hào):"+sequence);
}
}所以我們定義了這三個(gè)類(lèi),關(guān)于這三個(gè)類(lèi)在給大家解釋一下,我們現(xiàn)在有一個(gè)環(huán),然后這個(gè)環(huán)上每一個(gè)位置裝LongEvent,怎么產(chǎn)生這個(gè)LongEvent通過(guò)這個(gè)LongEventFactory的newInstance方法來(lái)產(chǎn)生,當(dāng)我拿到這個(gè)Event之后通過(guò)LongEventHandler進(jìn)行處理。
到現(xiàn)在我們把這三個(gè)輔助類(lèi)都已經(jīng)定義好了,定義好的情況下我們?cè)趺床拍鼙容^有機(jī)的結(jié)合在一起,讓他在Disruptor進(jìn)行處理呢,看第一個(gè)小例子程序,首先把EvenFactory給他初始化了new
LongEventFactory,我們這個(gè)環(huán)應(yīng)該是2的N次方1024,然后new一個(gè)Disruptor出來(lái),需要指定這么幾個(gè)參數(shù):factory產(chǎn)生消息的工廠;bufffferSize是指定這個(gè)環(huán)大小到底是多少;defaultThreadFactory線(xiàn)程工廠,指的是當(dāng)他要產(chǎn)生消費(fèi)者的時(shí)候,當(dāng)要調(diào)用這個(gè)消費(fèi)者的時(shí)候他是在一個(gè)特定的線(xiàn)程里執(zhí)行的,這個(gè)線(xiàn)程就是通過(guò)defaultThreadFactory來(lái)產(chǎn)生;
繼續(xù)往下看,當(dāng)我們拿到這個(gè)消息之后怎么進(jìn)行處理啊,我們就用這個(gè)LongEventHandler來(lái)處理。然后start,當(dāng)start之后一個(gè)環(huán)起來(lái)了,每個(gè)環(huán)上指向的這個(gè)LongEvent也得初始化好,內(nèi)存分配好了,整個(gè)就安安靜靜的等待著生產(chǎn)者的到來(lái)。
看生產(chǎn)者的代碼,long sequence = ringBuffffer.next(),通過(guò)next找到下一個(gè)可用的位置,最開(kāi)始這個(gè)環(huán)是空的,下一個(gè)可用的位置是0這個(gè)位置,拿到這個(gè)位置之后直接去ringBuffffer里面get(0)這個(gè)位置上的event。如果說(shuō)你要是追求效率的極致,你應(yīng)該是一次性全部初始化好,你get的時(shí)候就不用再去判斷,如果你想做一個(gè)延遲,很不幸的是你每次都要做判斷是不是初始化了。get的時(shí)候就是拿到一個(gè)event,這個(gè)是我們new出來(lái)的默認(rèn)的,但是我們可以改里面的event.set( 值...),填好數(shù)據(jù)之后ringBuffffer.publish發(fā)布生產(chǎn)。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main01{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,
Executors.defaultThreadFactory());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//官方例程long sequence = ringBuffer.next();//Grab the next sequencetry{
LongEvent event=ringBuffer.get(sequence);//Get the entry in theDisruptor//for the sequenceevent.set(8888L);//Fill with data}finally{
ringBuffer.publish(sequence);
}
}
} disruptor在后面提供了一些Lambda表達(dá)式的寫(xiě)法,為了支持這種寫(xiě)法對(duì)整個(gè)消息的構(gòu)建過(guò)程做了改進(jìn),讀下面02小程序使用translator,就是怎么樣構(gòu)建這個(gè)消息,原來(lái)我們都是用消息的factory,但是下面這次我們用translator對(duì)他進(jìn)行構(gòu)建,就是把某一些數(shù)據(jù)翻譯成消息。前面產(chǎn)生event工廠還是一樣,然后bufffferSize,后面再扔的是DaemonThreadFactory就是后臺(tái)線(xiàn)程了,new LongEventHandler然后start拿到他的ringBuffffer,前面都一樣。只有一個(gè)地方叫EventTranslator不一樣,我們?cè)趍ain01里面的代碼是要寫(xiě)try catch然后把里面的值給設(shè)好,相當(dāng)于把這個(gè)值轉(zhuǎn)換成event對(duì)象。相對(duì)簡(jiǎn)單的寫(xiě)法,它會(huì)把某些值轉(zhuǎn)成一個(gè)LongEvent,通過(guò)EventTranslator。new出來(lái)后實(shí)現(xiàn)了translateTo方法,EventTranslator它本身是一個(gè)接口,所以你要new的時(shí)候你又要實(shí)現(xiàn)它里面沒(méi)有實(shí)現(xiàn)的方法,translateTo的意思是你給我一個(gè)Event,我會(huì)把這個(gè)Event給你填好。
ringBuffffer.publishEvent(translator1) 你只要把translator1交給ringBuffffer就可以了。這個(gè)translator就是為了迎合Lambda表達(dá)式的寫(xiě)法(為java8的寫(xiě)法做準(zhǔn)備)
另外translator有很多種用法:
EventTranslatorOneArg只有帶一個(gè)參數(shù)的EventTranslator。我?guī)в幸粋€(gè)參數(shù),這個(gè)參數(shù)會(huì)通過(guò)我的translateTo方法轉(zhuǎn)換成一個(gè)LongEvent;既然有EventTranslatorOneArg就有EventTranslatorTwoArg、EventTranslatorThreeArg,還有EventTranslatorVararg多了去了Vararg就是有好多個(gè)值,我把里面的值全都給你加起來(lái)最后把結(jié)果set到event里面。
package com.mashibing.disruptor;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;public class Main02{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,
DaemonThreadFactory.INSTANCE);//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================EventTranslator translator1 = new EventTranslator
(){
@Overridepublic void translateTo(LongEvent event,long sequence){event.set(8888L); }
};
ringBuffer.publishEvent(translator1);//========================================================================EventTranslatorOneArg translator2 = newEventTranslatorOneArg(){
@Overridepublic void translateTo(LongEvent event,long sequence,Long l){event.set(l); }
};
ringBuffer.publishEvent(translator2,7777L);//========================================================================EventTranslatorTwoArg translator3 = new EventTranslatorTwoArg(){
@Overridepublic void translateTo(LongEvent event,long sequence,Long l1,Long l2){ event.set(l); }
};
ringBuffer.publishEvent(translator3,10000L,10000L);//========================================================================EventTranslatorThreeArg translator4 = newEventTranslatorThreeArg(){
@Overridepublic void translateTo(LongEvent event,long sequence,Long l1,Long
l2,Long l3){ event.set(l1+ l2+ l3); }
};
ringBuffer.publishEvent(translator4,10000L,10000L,10000L);//========================================================================EventTranslatorVararg translator5 = newEventTranslatorThreeArg(){
@Overridepublic void translateTo(LongEvent event,long sequence,Object...
objects){long result = 0;for(Object o : objects){long l =(Long)o;
result +=l;
}
}
};
ringBuffer.publishEvent(translator5,10000L,10000L,10000L,10000L);
}
} 有了上面Translator之后呢,下面看Lambda表達(dá)式怎么寫(xiě),這個(gè)是比較簡(jiǎn)潔的寫(xiě)法,連factory都省了,直接指定一個(gè)Lambda表達(dá)式LongEvent::new。繼續(xù)handleEventsWith把三個(gè)參數(shù)傳進(jìn)來(lái)后面寫(xiě)好Lambda表達(dá)式直接打印,然后start, 接著RingBuffffer,publishEvent原來(lái)我們還有寫(xiě)try...catch,現(xiàn)在簡(jiǎn)單了直接ringBuffffer.publishEvent(第一個(gè)是lambda表達(dá)式,表達(dá)式后是你指定的幾個(gè)參
數(shù)),所以現(xiàn)在的這種寫(xiě)法就不定義各種各樣的EventTranslator了。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main03{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new,
bufferSize,DaemonThreadFactory.INSTANCE);//Connect the handlerdisruptor.handleEventsWith((event,sequence,endOfBatch)-
>System.out.println("Event:"+event));//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ringBuffer.publishEvent((event, sequence)-> event.set(10000L));System.in.read();
}
}下面我們叫一些細(xì)節(jié),這些個(gè)細(xì)節(jié)也不難,講給大家。第一個(gè)細(xì)節(jié)是我們生產(chǎn)者的時(shí)候默認(rèn)會(huì)有好多種生產(chǎn)方式,默認(rèn)的是多線(xiàn)程生產(chǎn)者,但是假如你確定你整個(gè)程序里頭只有一個(gè)生產(chǎn)者的話(huà)那你還能提高效率,就是在你指定Disruptor生產(chǎn)者的線(xiàn)程的方式是SINGLE,生產(chǎn)者的類(lèi)型ProducerType。
ProducerType生產(chǎn)者線(xiàn)程模式
ProducerType有兩種模式ProducerMULTI和Producer.SINGLE
默認(rèn)是MULTI,表示在多線(xiàn)程模式下產(chǎn)生sequence
如果確認(rèn)是單線(xiàn)程生產(chǎn)者,那么可以指定SINGLE,效率會(huì)提升如果是多個(gè)生產(chǎn)者(多線(xiàn)程),但模式指定為SINGLE,會(huì)出什么問(wèn)題?
假如你的程序里頭只有一個(gè)生產(chǎn)者還用ProducerMULTI的話(huà),我們對(duì)序列來(lái)進(jìn)行多線(xiàn)程訪問(wèn)的時(shí)候肯定是要加鎖的,所以MULTI里面默認(rèn)是有鎖定處理的,但是假如你只有一個(gè)線(xiàn)程這個(gè)時(shí)候應(yīng)該把生產(chǎn)者指定為SINGLE,他的效率更高,因?yàn)樗锩娌患渔i。
下面這個(gè)小程序,我這里指定的是Producer.SINGLE,但是我生產(chǎn)的時(shí)候用的是一堆線(xiàn)程,當(dāng)我制定了Producer.SINGLE之后相當(dāng)于內(nèi)部對(duì)于序列的訪問(wèn)就沒(méi)有鎖了,它會(huì)把性能發(fā)揮到極致,它不會(huì)報(bào)錯(cuò),它會(huì)把你的消息靜悄悄的覆蓋了,因此你要小心一點(diǎn)。我這里這個(gè)寫(xiě)法是我有50 個(gè)線(xiàn)程然后每個(gè)線(xiàn)程生產(chǎn)100個(gè)數(shù),最后結(jié)果正常的話(huà)應(yīng)該是有5000個(gè)消費(fèi)產(chǎn)生。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main04_ProducerType{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the Disruptor//Disruptor disruptor = new Disruptor<>(factory,bufferSize, Executors.defaultThreadFactory());
Disruptor disruptor = new Disruptor<>(factory,bufferSize,
Executors.defaultThreadFactory(),ProducerType.SINGLE,newBlockingWaitStrategy());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 50;
CycliBarrier barrier=new CycliBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();for(long i=0; ifinal long threadNum = i;
service.submit(()->{
System.out.printf("Thread %s ready to start!\n",threadNum);try{
barrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}for(int j=0; j<100;j++){
ringBuffer.publishEvent((event,sequence)->{
event.set(threadNum);
System.out.println("生產(chǎn)了"+threadNum);
});
}
});
}
service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
} 我們?cè)賮?lái)聊一下等待策略WaitStrategy,有好多種方法,看下面等待策略(常用)BlockingWaitStrategy:通過(guò)線(xiàn)程堵塞的方式,等待生產(chǎn)者喚醒,被喚醒后,再循環(huán)檢查依賴(lài)的sequence是否已經(jīng)消費(fèi)。
BusySpinWaitStrategy:線(xiàn)程一直自旋等待,可能比較耗cpu
LiteBlockingWaitStrategy:線(xiàn)程阻塞等待生產(chǎn)者喚醒,與BlockingWaitStrategy相比,區(qū)別在signalNeeded.getAndSet,如果兩個(gè)線(xiàn)程同時(shí)訪問(wèn)一個(gè)訪問(wèn)waitfor,一個(gè)訪問(wèn)signalAll時(shí),可以
減少lock加鎖次數(shù)
LiteTimeoutBlockingWaitStrategy:與LiteBlockingWaitStrategy相比,設(shè)置了阻塞時(shí)間,超過(guò)時(shí)
間后拋出異常
PhasedBackoffffWaitStrategy:根據(jù)時(shí)間參數(shù)和傳入的等待策略來(lái)決定使用那種等待策略TimeoutBlockingWaitStrategy:相對(duì)于BlockingWaitStrategy來(lái)說(shuō),設(shè)置了等待時(shí)間,超過(guò)后拋出異常
(常用)YieldingWaitStrategy:嘗試100次,然后Thread.yield()讓出cpu
(常用)SleepingWaitStrategy:sleep
我們常用的BlockingWaitStrategy滿(mǎn)了就等著;SleepingWaitStrategy滿(mǎn)了就睡一覺(jué),睡醒了看看能不能繼續(xù)執(zhí)行了;YieldingWaitStrategy讓出cpu,讓你消費(fèi)者趕緊消費(fèi),消費(fèi)完了之后我又回來(lái)看看我是不是又能生產(chǎn)了;一般YieldingWaitStrategy效率是最高的,但也要看實(shí)際情況適用不適用。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main05_WaitStrategy{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,
Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 50;
CycliBarrier barrier=new CycliBarrier(threadCount);ExecutorService service = Executors.newCachedThreadPool();for(long i=0; ifinal long threadNum = i;
service.submit(()->{
System.out.printf("Thread %s ready to start!\n",threadNum);try{
barrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}for(int j=0; j<100;j++){
ringBuffer.publishEvent((event,sequence)->{
event.set(threadNum);
System.out.println("生產(chǎn)了"+threadNum);
});
}
});
}
service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
} 我們來(lái)看多個(gè)消費(fèi)者怎么指定,默認(rèn)的情況下只有一個(gè)消費(fèi)者,你想要有多個(gè)消費(fèi)者的時(shí)候也非常簡(jiǎn)單,看下面代碼我定義了兩個(gè)消費(fèi)者h(yuǎn)1、h2,
disruptor.handleEventsWith(h1,h2)這里面是一個(gè)可變參數(shù),所以你要想有多個(gè)消費(fèi)者的時(shí)候就往里裝,多個(gè)消費(fèi)者是位于多個(gè)線(xiàn)程里面的。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main06_MultiConsumer{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,
Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlersLongEventHandler h1 = new LongEventHandler();LongEventHandler h2 = new LongEventHandler();
disruptor.handleEventsWith(h1,h2);//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 10;
CycliBarrier barrier=new CycliBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();for(long i=0; ifinal long threadNum = i;
service.submit(()->{
System.out.printf("Thread %s ready to start!\n",threadNum);try{
barrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}for(int j=0; j<10;j++){
ringBuffer.publishEvent((event,sequence)->{
event.set(threadNum);
System.out.println("生產(chǎn)了"+threadNum);
});
}
});
}
service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
} 還有disruptor最后一個(gè)問(wèn)題,出了異常怎么處理
消費(fèi)者異常處理
默認(rèn):
disruptor.setDefaultExceptionHandler()
覆蓋:
disruptor.handleExceptionFor().with()
看下面代碼,這這里方法里寫(xiě)了一個(gè)EventHandler是我們的消費(fèi)者,在消費(fèi)者里打印了event之后馬上拋出了異常,當(dāng)我們消費(fèi)者出現(xiàn)異常之后你不能讓整個(gè)線(xiàn)程停下來(lái),有一個(gè)消費(fèi)者出了異常那其他的消費(fèi)者就不干活了,肯定不行。handleExceptionsFor為消費(fèi)者指定Exception處理器 (h1).with后面是我們的ExceptionHandler出了異常之后該怎么辦進(jìn)行處理,重寫(xiě)三個(gè)方法,第一個(gè)是當(dāng)產(chǎn)生異常的時(shí)候在這很簡(jiǎn)單直接打印出來(lái)了;第二個(gè)是handleOnStart如果啟動(dòng)的時(shí)候出異常;第三個(gè)handleOnShutdown你該怎么處理。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main07_ExceptionHandler{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,
Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlersEventHandler h1 = (event,sequence,end)->{
System.out.println("消費(fèi)者出異常");
};
disruptor.handleEventsWith(h1);
disruptor.handleExceptionsFor(h1).with(newExceptionHandler(){@Overridepublic void handleEventException(Throwable throwable,longl,LongEvent longEvent){
throwable.printStackTrace();
}@Overridepublic void handleOnStartException(Throwable throwable){
System.out.println("Exception Start to Handle!");
}@Overridepublic void handleOnShutdownException(Throwable throwable){
System.out.println("Exception End to Handle!");
}
});//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 1;
CycliBarrier barrier=new CycliBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();for(long i=0; ifinal long threadNum = i;
service.submit(()->{
System.out.printf("Thread %s ready to start!\n",threadNum);try{
barrier.await();
}catch(InterruptedException e){
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}for(int j=0; j<10;j++){
ringBuffer.publishEvent((event,sequence)->{
event.set(threadNum);
System.out.println("生產(chǎn)了"+threadNum);
});
}
});
}
service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
} disruptor是一個(gè)環(huán),然后這個(gè)環(huán)有多個(gè)生產(chǎn)者可以往里頭生產(chǎn),由于它是環(huán)形的設(shè)計(jì)效率會(huì)非常的高,我們寫(xiě)程序的時(shí)候是這樣寫(xiě)的,首先你自己定義好Event消息的格式,然后定義消息工廠,消息工廠是用來(lái)初始化整個(gè)環(huán)的時(shí)候相應(yīng)的一些位置上各種各樣不同的消息先把它new出來(lái),new出來(lái)之后先占好空間,我們?cè)谏a(chǎn)的時(shí)候只需要把這個(gè)位置上這個(gè)默認(rèn)的這塊空間拿出來(lái)往里頭填值,填好值之后
消費(fèi)者就可以往里頭消費(fèi)了,消費(fèi)完了生產(chǎn)者就可以繼續(xù)往里頭生產(chǎn)了,如果說(shuō)你生產(chǎn)者消費(fèi)的比較快,消費(fèi)者消費(fèi)的比較慢,滿(mǎn)了怎么辦,就是用各種各樣的等待策略,消費(fèi)者出了問(wèn)題之后可以用ExceptionHandler來(lái)進(jìn)行處理。
覺(jué)得文章內(nèi)容不錯(cuò)的話(huà),可以轉(zhuǎn)發(fā)關(guān)注一下小編~ 之后持續(xù)更新干貨好文~~
本文就是愿天堂沒(méi)有BUG給大家分享的內(nèi)容,大家有收獲的話(huà)可以分享下,想學(xué)習(xí)更多的話(huà)可以到微信公眾號(hào)里找我,我等你哦。
