How to Reasonably Estimate the Size of a Thread Pool?

Java技术栈
www.javastack.cn
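This question looks small, but it is not that easy to answer.
If you know a better method, I'd welcome your advice. Let's start with a naive estimation method: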
Suppose a system must sustain at least 20 TPS (Transactions Per Second, or Tasks Per Second), that each transaction is completed by a single thread, and that a thread takes 4 s on average to process one transaction.
The question then becomes: how large must the thread pool be so that 20 transactions can be completed every second?
The calculation is simple: each thread handles 0.25 TPS, so reaching 20 TPS clearly requires 20 / 0.25 = 80 threads.
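The arithmetic above is just "target throughput divided by per-thread throughput", i.e. throughput times time per task (the same relationship as Little's law). A minimal Java sketch of it; `NaiveEstimate` and `naiveThreadCount` are illustrative names, not from any library:

```java
public class NaiveEstimate {
    // Threads needed so that the pool reaches targetTps when every task
    // occupies one thread for taskSeconds (per-thread throughput = 1 / taskSeconds).
    static int naiveThreadCount(double targetTps, double taskSeconds) {
        double perThreadTps = 1.0 / taskSeconds;
        return (int) Math.ceil(targetTps / perThreadTps);
    }

    public static void main(String[] args) {
        // The article's example: 20 TPS, 4 s per transaction
        System.out.println(naiveThreadCount(20, 4)); // 80
    }
}
```

Note that nothing in this calculation looks at the number of CPUs.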
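This estimate is obviously naive, because it ignores the number of CPUs. A typical server has 16 or 32 CPU cores; running 80 threads on it would likely incur a lot of unnecessary thread context-switching overhead.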
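Next, a second simple method, though I'm not sure it holds up (N is the total number of CPU cores):
For a CPU-bound application, set the thread pool size to N+1.
For an IO-bound application, set the thread pool size to 2N+1.
If a server runs only this one application and the application has only this one thread pool, this estimate may be reasonable, but you still need to test and verify it yourself.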
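A minimal sketch of these two rules of thumb in Java. It assumes `Runtime.getRuntime().availableProcessors()` is an acceptable stand-in for N. Note that this call reports the logical processors available to the JVM (hyperthreads and container CPU limits included), which may differ from the number of physical cores. The class and method names are illustrative:

```java
public class RuleOfThumb {
    // Rule of thumb for CPU-bound work: N + 1 threads
    static int cpuBoundPoolSize(int cores) {
        return cores + 1;
    }

    // Rule of thumb for IO-bound work: 2N + 1 threads
    static int ioBoundPoolSize(int cores) {
        return 2 * cores + 1;
    }

    public static void main(String[] args) {
        int n = Runtime.getRuntime().availableProcessors(); // logical CPUs visible to the JVM
        System.out.println("CPU-bound pool size: " + cpuBoundPoolSize(n));
        System.out.println("IO-bound pool size:  " + ioBoundPoolSize(n));
    }
}
```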
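Next, in the document 服务器性能IO优化 ("Server Performance IO Optimization"), I found an estimation formula:
Optimal thread count = ((thread wait time + thread CPU time) / thread CPU time) * number of CPUs
For example, if each thread's average CPU time is 0.5 s, its wait time (time not running on a CPU, e.g. IO) is 1.5 s, and there are 8 CPU cores, the formula gives ((0.5 + 1.5) / 0.5) * 8 = 32. The formula can be further rewritten as:
Optimal thread count = (thread wait time / thread CPU time + 1) * number of CPUs
From this we can conclude: the larger the share of time a thread spends waiting, the more threads are needed; the larger the share of time it spends on the CPU, the fewer threads are needed.
The previous estimation method is consistent with this conclusion.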
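To make the formula concrete, here it is in Java (names are illustrative), evaluated for the example above and for two extreme cases. With no waiting it yields one thread per core, and with wait time equal to CPU time it yields 2N, which is roughly where the N+1 and 2N+1 rules of thumb sit:

```java
public class WaitCpuFormula {
    // Optimal threads = ((wait + cpu) / cpu) * cpus, i.e. (wait / cpu + 1) * cpus
    static double optimalThreads(double waitSeconds, double cpuSeconds, int cpus) {
        return (waitSeconds / cpuSeconds + 1) * cpus;
    }

    public static void main(String[] args) {
        // The article's example: 0.5 s CPU, 1.5 s waiting, 8 cores
        System.out.println(optimalThreads(1.5, 0.5, 8)); // 32.0
        // Pure CPU work (no waiting): one thread per core
        System.out.println(optimalThreads(0.0, 0.5, 8)); // 8.0
        // Waiting as long as computing: twice the number of cores
        System.out.println(optimalThreads(0.5, 0.5, 8)); // 16.0
    }
}
```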
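The fastest part of a system is the CPU, so it is the CPU that sets the upper limit on system throughput, and more CPU processing power raises that limit. But by the weakest-link ("short plank") effect, actual throughput cannot be calculated from the CPU alone. To raise system throughput, you have to start from the system's weak points (such as network latency and IO):
Increase the degree of parallelism of the bottleneck operations, e.g. multi-threaded downloading.
Strengthen the bottleneck itself, e.g. replace blocking IO with NIO.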
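As a hedged illustration of the first point, the sketch below fakes a slow IO call with `Thread.sleep` (the hypothetical `fetchChunk` stands in for downloading one piece of a file) and runs several of them on a fixed-size pool. Because the calls mostly wait rather than compute, more threads shorten the wall-clock time even on a single core:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ParallelIoSketch {
    // Hypothetical stand-in for one slow IO call (e.g. downloading one chunk
    // of a file): it only sleeps, so it spends ~200 ms waiting and almost no CPU.
    static String fetchChunk(int id) throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(200);
        return "chunk-" + id;
    }

    // Fetches 'chunks' chunks on a fixed pool of 'threads' threads and returns
    // the elapsed wall-clock time in milliseconds.
    static long fetchAll(int chunks, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        try {
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < chunks; i++) {
                final int id = i;
                results.add(pool.submit(() -> fetchChunk(id)));
            }
            for (Future<String> f : results) {
                f.get(); // block until this chunk is done
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  " + fetchAll(4, 1) + " ms");
        System.out.println("4 threads: " + fetchAll(4, 4) + " ms");
    }
}
```

Exact timings vary by machine. The point is only that the 4-thread run takes roughly the time of one call, while the 1-thread run takes about four times as long.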
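The first point relates to Amdahl's law, which defines the speedup obtained by parallelizing a serial system:
Speedup = system time before optimization / system time after optimization
The larger the speedup, the better the parallelization works. Amdahl's law also relates the degree of parallelism, the number of CPUs and the speedup. Let Speedup be the speedup, F the serial fraction of the system (the proportion of code that must run serially), and N the number of CPUs:
Speedup <= 1 / (F + (1 - F) / N)
When N is large enough, the smaller the serial fraction F, the larger the Speedup; as N grows without bound, Speedup approaches 1 / F.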
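Amdahl's bound can be tabulated with a few lines of Java (names are illustrative). For example, with F = 0.1 the speedup stays below 1 / F = 10 no matter how many CPUs are added:

```java
public class AmdahlSketch {
    // Upper bound on speedup for serial fraction f (0..1) on n CPUs
    static double speedup(double f, int n) {
        return 1.0 / (f + (1.0 - f) / n);
    }

    public static void main(String[] args) {
        int[] cpus = {1, 8, 32, 1024};
        for (double f : new double[] {0.0, 0.1, 0.5}) {
            for (int n : cpus) {
                System.out.printf("F=%.1f N=%4d speedup <= %.2f%n", f, n, speedup(f, n));
            }
        }
    }
}
```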
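While writing this, a question suddenly occurred to me: is using a thread pool (multiple threads) always more efficient than using a single thread?
The answer is no. Redis, for example, is single-threaded yet very efficient: its basic operations reach on the order of 100,000 per second. From a threading perspective, part of the reason is:
Multiple threads bring thread context-switching overhead, which a single thread avoids.
Multiple threads need locks to protect shared data; a single thread needs none.
Of course, the more fundamental reason "Redis is fast" is that Redis operates almost entirely in memory, and in that case a single thread can use the CPU very efficiently. Multithreading generally pays off when a substantial share of the work consists of IO and network operations.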
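So the simple estimation methods above may look reasonable but are not necessarily so in practice. You have to take into account the system's real characteristics (IO-bound, CPU-bound, or pure in-memory work) and the hardware environment (CPU, memory, disk read/write speed, network conditions, etc.), and keep experimenting until you arrive at a realistic estimate.
Finally, here is a "Dark Magic" estimation method (so called because I haven't yet figured out how it works). It uses the following class: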
package threadpool;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;

/**
 * A class that calculates the optimal thread pool boundaries. It takes the
 * desired target utilization and the desired work queue memory consumption as
 * input and returns thread count and work queue capacity.
 *
 * @author Niklas Schlimm
 */
public abstract class PoolSizeCalculator {

    /**
     * The sample queue size to calculate the size of a single {@link Runnable}
     * element.
     */
    private final int SAMPLE_QUEUE_SIZE = 1000;

    /**
     * Accuracy of test run. It must finish within 20ms of the testTime
     * otherwise we retry the test. This could be configurable.
     */
    private final int EPSYLON = 20;

    /**
     * Control variable for the CPU time investigation.
     */
    private volatile boolean expired;

    /**
     * Time (millis) of the test run in the CPU time calculation.
     */
    private final long testtime = 3000;

    /**
     * Calculates the boundaries of a thread pool for a given {@link Runnable}.
     *
     * @param targetUtilization
     *            the desired utilization of the CPUs (0 <= targetUtilization <= 1)
     * @param targetQueueSizeBytes
     *            the desired maximum work queue size of the thread pool (bytes)
     */
    protected void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) {
        calculateOptimalCapacity(targetQueueSizeBytes);
        Runnable task = creatTask();
        start(task);
        start(task); // warm up phase
        long cputime = getCurrentThreadCPUTime();
        start(task); // test interval
        cputime = getCurrentThreadCPUTime() - cputime;
        // Wall-clock length of the test run (nanos) minus CPU time = time spent waiting
        long waittime = (testtime * 1000000) - cputime;
        calculateOptimalThreadCount(cputime, waittime, targetUtilization);
    }

    private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {
        long mem = calculateMemoryUsage();
        // Number of queued tasks that fit into the byte budget
        BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem),
                RoundingMode.HALF_UP);
        System.out.println("Target queue memory usage (bytes): "
                + targetQueueSizeBytes);
        System.out.println("createTask() produced " + creatTask().getClass().getName() + " which took " + mem + " bytes in a queue");
        System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);
        System.out.println("* Recommended queue capacity (bytes): " + queueCapacity);
    }

    /**
     * Brian Goetz' optimal thread count formula, see 'Java Concurrency in
     * Practice' (chapter 8.2):
     * threads = CPUs * targetUtilization * (1 + wait / cpu)
     *
     * @param cpu
     *            cpu time consumed by considered task
     * @param wait
     *            wait time of considered task
     * @param targetUtilization
     *            target utilization of the system
     */
    private void calculateOptimalThreadCount(long cpu, long wait,
                                             BigDecimal targetUtilization) {
        BigDecimal waitTime = new BigDecimal(wait);
        BigDecimal computeTime = new BigDecimal(cpu);
        BigDecimal numberOfCPU = new BigDecimal(Runtime.getRuntime()
                .availableProcessors());
        // The division keeps the scale of waitTime (0), so wait / cpu is rounded to an integer
        BigDecimal optimalthreadcount = numberOfCPU.multiply(targetUtilization)
                .multiply(new BigDecimal(1).add(waitTime.divide(computeTime,
                        RoundingMode.HALF_UP)));
        System.out.println("Number of CPU: " + numberOfCPU);
        System.out.println("Target utilization: " + targetUtilization);
        System.out.println("Elapsed time (nanos): " + (testtime * 1000000));
        System.out.println("Compute time (nanos): " + cpu);
        System.out.println("Wait time (nanos): " + wait);
        System.out.println("Formula: " + numberOfCPU + " * "
                + targetUtilization + " * (1 + " + waitTime + " / "
                + computeTime + ")");
        System.out.println("* Optimal thread count: " + optimalthreadcount);
    }

    /**
     * Runs the {@link Runnable} over a period defined in {@link #testtime}.
     * Based on Heinz Kabutz' ideas
     * (http://www.javaspecialists.eu/archive/Issue124.html).
     *
     * @param task
     *            the runnable under investigation
     */
    public void start(Runnable task) {
        long start = 0;
        int runs = 0;
        do {
            if (++runs > 5) {
                throw new IllegalStateException("Test not accurate");
            }
            expired = false;
            start = System.currentTimeMillis();
            Timer timer = new Timer();
            // Set the flag after testtime millis to end the loop below
            timer.schedule(new TimerTask() {
                public void run() {
                    expired = true;
                }
            }, testtime);
            while (!expired) {
                task.run();
            }
            start = System.currentTimeMillis() - start;
            timer.cancel();
        } while (Math.abs(start - testtime) > EPSYLON); // retry if the run missed testtime by more than EPSYLON ms
        collectGarbage(3);
    }

    private void collectGarbage(int times) {
        for (int i = 0; i < times; i++) {
            System.gc();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Calculates the memory usage of a single element in a work queue. Based on
     * Heinz Kabutz' ideas
     * (http://www.javaspecialists.eu/archive/Issue029.html).
     *
     * @return memory usage of a single {@link Runnable} element in the thread
     *         pools work queue
     */
    public long calculateMemoryUsage() {
        BlockingQueue<Runnable> queue = createWorkQueue();
        for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
            queue.add(creatTask());
        }
        // Warm-up readings; both are overwritten below
        long mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        queue = null;
        collectGarbage(15);
        // Heap in use before the sample queue is filled
        mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        queue = createWorkQueue();
        for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
            queue.add(creatTask());
        }
        collectGarbage(15);
        // Heap in use with the filled queue
        mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
        return (mem1 - mem0) / SAMPLE_QUEUE_SIZE;
    }

    /**
     * Create your runnable task here.
     *
     * @return an instance of your runnable task under investigation
     */
    protected abstract Runnable creatTask();

    /**
     * Return an instance of the queue used in the thread pool.
     *
     * @return queue instance
     */
    protected abstract BlockingQueue<Runnable> createWorkQueue();

    /**
     * Calculate current cpu time. Various frameworks may be used here,
     * depending on the operating system in use. (e.g.
     * http://www.hyperic.com/products/sigar). The more accurate the CPU time
     * measurement, the more accurate the results for thread count boundaries.
     *
     * @return current cpu time of current thread
     */
    protected abstract long getCurrentThreadCPUTime();
}
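Then subclass this abstract class and implement its three abstract methods. Below is an example I wrote (the task requests data over the network), in which I specify a desired CPU utilization of 1.0 (i.e. 100%) and a total task-queue size of no more than 100,000 bytes: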
package threadpool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.math.BigDecimal;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SimplePoolSizeCaculatorImpl extends PoolSizeCalculator {

    @Override
    protected Runnable creatTask() {
        return new AsyncIOTask();
    }

    @Override
    protected BlockingQueue<Runnable> createWorkQueue() {
        // Capacity must be at least SAMPLE_QUEUE_SIZE (1000), or add() fails during the memory measurement
        return new LinkedBlockingQueue<Runnable>(1000);
    }

    @Override
    protected long getCurrentThreadCPUTime() {
        // CPU time (nanos) of the current thread, from the JDK's ThreadMXBean
        return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
    }

    public static void main(String[] args) {
        PoolSizeCalculator poolSizeCalculator = new SimplePoolSizeCaculatorImpl();
        // Target CPU utilization 1.0 (100%), work-queue budget 100,000 bytes
        poolSizeCalculator.calculateBoundaries(new BigDecimal(1.0), new BigDecimal(100000));
    }
}

/**
 * Custom IO-bound task: each run() performs a blocking HTTP GET and discards the response.
 *
 * @author Will
 */
class AsyncIOTask implements Runnable {

    public void run() {
        HttpURLConnection connection = null;
        BufferedReader reader = null;
        try {
            String getURL = "http://baidu.com";
            URL getUrl = new URL(getURL);
            connection = (HttpURLConnection) getUrl.openConnection();
            connection.connect();
            reader = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            String line;
            while ((line = reader.readLine()) != null) {
                // empty loop: just drain the response body
            }
        } catch (IOException e) {
            // ignored: only the time the request takes matters here
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e) {
                    // ignore
                }
            }
            if (connection != null) {
                connection.disconnect();
            }
        }
    }
}
This produced the following output:
Target queue memory usage (bytes): 100000
createTask() produced threadpool.AsyncIOTask which took 40 bytes in a queue
Formula: 100000 / 40
* Recommended queue capacity (bytes): 2500
Number of CPU: 8
Target utilization: 1
Elapsed time (nanos): 3000000000
Compute time (nanos): 280801800
Wait time (nanos): 2719198200
Formula: 8 * 1 * (1 + 2719198200 / 280801800)
* Optimal thread count: 88
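Where the 88 comes from: `calculateOptimalThreadCount` divides W by C with `BigDecimal.divide(divisor, RoundingMode.HALF_UP)`, which keeps the dividend's scale (0 here), so W/C ≈ 9.68 is rounded to 10 and 8 * 1 * (1 + 10) = 88. Unrounded, the same formula gives about 85.5. The sketch below reproduces that step with the numbers from the output; the class and method names are illustrative:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class ReproduceFormula {
    // Same arithmetic as calculateOptimalThreadCount:
    // CPUs * utilization * (1 + wait / cpu), with wait / cpu rounded HALF_UP
    // to the scale of 'wait' (0 for a BigDecimal built from a long).
    static BigDecimal goetzThreadCount(int cpus, BigDecimal utilization,
                                       long waitNanos, long cpuNanos) {
        BigDecimal ratio = new BigDecimal(waitNanos)
                .divide(new BigDecimal(cpuNanos), RoundingMode.HALF_UP);
        return new BigDecimal(cpus).multiply(utilization)
                .multiply(BigDecimal.ONE.add(ratio));
    }

    public static void main(String[] args) {
        // Numbers copied from the sample output
        System.out.println(goetzThreadCount(8, BigDecimal.ONE, 2719198200L, 280801800L)); // 88
        // The same formula without the rounding step
        double unrounded = 8 * 1.0 * (1 + 2719198200.0 / 280801800.0);
        System.out.println(unrounded);
    }
}
```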
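The recommended task-queue capacity is 2500 (a number of tasks, 100000 bytes / 40 bytes per task, despite the "(bytes)" label in the output) and the recommended thread count is 88. Using these as a basis, we can build a thread pool like this:
ThreadPoolExecutor pool = new ThreadPoolExecutor(88, 88, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2500));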
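A runnable version of that construction with the imports it needs, as a sketch: the values 88 and 2500 come from the sample run above and should be re-measured for your own task and hardware. `DarkMagicPool` and `newPool` are illustrative names:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DarkMagicPool {
    // Values from the sample run; re-measure for your own task and hardware.
    static final int THREADS = 88;
    static final int QUEUE_CAPACITY = 2500;

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                THREADS, THREADS,                                   // core == max: a fixed-size pool
                0L, TimeUnit.MILLISECONDS,                          // keep-alive only affects threads above the core size
                new LinkedBlockingQueue<Runnable>(QUEUE_CAPACITY)); // bounded work queue
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        for (int i = 0; i < 10; i++) {
            final int id = i;
            pool.execute(() -> System.out.println(
                    "task " + id + " ran on " + Thread.currentThread().getName()));
        }
        pool.shutdown();                             // stop accepting new tasks
        pool.awaitTermination(10, TimeUnit.SECONDS); // wait for queued tasks to finish
    }
}
```

Because the queue is bounded, once all 88 threads are busy and 2500 tasks are queued, further submissions are rejected with `RejectedExecutionException` (the default `AbortPolicy`); pass a different `RejectedExecutionHandler` to the constructor if you need another behavior.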
You can package these classes into an executable jar and copy it to the test or production environment to run it there, using a pom.xml like the following:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>threadpool</groupId>
    <artifactId>dark-magic</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>dark_magic</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- assumption: compile for Java 8; adjust to the JDK you use -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
    </dependencies>
    <build>
        <finalName>dark-magic</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- class whose main() is run by java -jar -->
                            <mainClass>threadpool.SimplePoolSizeCaculatorImpl</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <!-- "single" is the assembly goal supported by current plugin versions -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Source:
www.cnblogs.com/cjsblog/p/9068886.html
References:
http://ifeve.com/how-to-calculate-threadpool-size/
http://www.importnew.com/17384.html
https://www.cnblogs.com/cherish010/p/8334952.html








