Choosing Between LongAdder and AtomicLong in Different Concurrency Scenarios
Source: https://juejin.cn/post/6921595303460241415
| Preface
| volatile
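The volatile keyword can be thought of as a lightweight synchronized: using it does not trigger thread context switches or scheduling, so it costs less than synchronized. However, volatile guarantees visibility, not atomicity. Visibility means that when one thread modifies a volatile variable, the new value is immediately visible to all other threads. volatile is therefore unsuitable for computations such as i++, where the result depends on the variable's current value. Consider the example VolatileTest.java: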
public class VolatileTest {
    private static final int THREAD_COUNT = 20;
    private static volatile int race = 0;

    public static void increase() {
        race++; // not atomic: read, add, write back
    }

    public static void main(String[] args) {
        Thread[] threads = new Thread[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 1000; i++) {
                        increase();
                    }
                }
            });
            threads[i].start();
        }
        // Wait until all worker threads have finished
        // (assumes no extra threads, e.g. started by an IDE, are alive)
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }
        System.out.println("race: " + race);
    }
}
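Running this usually prints a value smaller than the expected 20000. The cause lies in the increase method: although it is a single line of source, decompiling it shows that it compiles to four bytecode instructions (getstatic, iconst_1, iadd, putstatic), so another thread can change race between the read and the write-back.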
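To make the lost update concrete, the sketch below replays by hand one unlucky interleaving of two threads executing race++. The class `LostUpdateDemo` and its variable names are hypothetical, and no real threads or bytecode are involved; the three steps (read, add, write back) only mirror the getstatic / iadd / putstatic sequence so that the outcome is deterministic.

```java
// Hypothetical, single-threaded replay of one bad interleaving of race++.
// No real threads are used, so the outcome is deterministic.
class LostUpdateDemo {
    static int race = 0;

    // Replays the order: A reads, B reads, A writes, B writes.
    static int replayBadInterleaving() {
        race = 0;
        int seenByA = race;      // thread A: getstatic (reads 0)
        int seenByB = race;      // thread B: getstatic (also reads 0)
        race = seenByA + 1;      // thread A: iadd + putstatic (writes 1)
        race = seenByB + 1;      // thread B: iadd + putstatic (writes 1 again)
        return race;             // two increments ran, but only one survived
    }

    public static void main(String[] args) {
        System.out.println("race after two increments: " + replayBadInterleaving());
    }
}
```

Both simulated threads read 0 before either writes, so the final value is 1 rather than 2. volatile does not prevent this, because each individual read and write is visible but the three steps together are not atomic.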
| AtomicLong
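Locking the increase method would guarantee a correct result, but synchronized and ReentrantLock are both mutual-exclusion locks: only one thread may execute at a time while the others wait, so throughput suffers badly. Fortunately, the JDK provides atomic classes for exactly this kind of computation. Changing the volatile int race variable above to an AtomicLong gives AtomicLongTest.java: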
import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongTest {
    private static final int THREAD_COUNT = 20;
    // The reference never changes, so final is sufficient here
    private static final AtomicLong race = new AtomicLong(0);

    public static void increase() {
        race.getAndIncrement(); // atomic read-modify-write
    }

    public static void main(String[] args) {
        Thread[] threads = new Thread[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 1000; i++) {
                        increase();
                    }
                }
            });
            threads[i].start();
        }
        // Wait until all worker threads have finished
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }
        System.out.println("race: " + race);
    }
}
This run produces the expected result: 20000.
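For comparison, the lock-based alternative mentioned in this section might look like the following minimal sketch. `LockedCounterDemo` and `runWorkers` are hypothetical names, not part of the original article; the sketch uses ReentrantLock and waits with join() instead of polling Thread.activeCount().

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical lock-based counter: correct, but every increment is serialized.
class LockedCounterDemo {
    private static final ReentrantLock LOCK = new ReentrantLock();
    private static long race = 0;

    static void increase() {
        LOCK.lock();
        try {
            race++;              // safe: only the lock holder runs this
        } finally {
            LOCK.unlock();       // always release, even if the body throws
        }
    }

    // Starts threadCount threads that each call increase() perThread times.
    static long runWorkers(int threadCount, int perThread) {
        race = 0;
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    increase();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();        // wait for each worker to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return race;             // join() makes the workers' writes visible here
    }

    public static void main(String[] args) {
        System.out.println("race: " + runWorkers(20, 1000));
    }
}
```

The result is correct, but all 20 threads queue on the same lock for every increment, which is the cost the atomic classes are meant to avoid.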
| LongAdder
LongAdder is used in much the same way as AtomicLong. Replacing AtomicLong with LongAdder in the code above gives the following test code:
import java.util.concurrent.atomic.LongAdder;

public class LongAdderTest {
    private static final int THREAD_COUNT = 20;
    // A new LongAdder starts with a sum of 0
    private static final LongAdder race = new LongAdder();

    public static void increase() {
        race.increment();
    }

    public static void main(String[] args) {
        Thread[] threads = new Thread[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 1000; i++) {
                        increase();
                    }
                }
            });
            threads[i].start();
        }
        // Wait until all worker threads have finished
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }
        System.out.println("race: " + race);
    }
}
The result is again the expected 20000.
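Beyond a single global counter, LongAdder is often used as the value type of a ConcurrentHashMap to keep one counter per key. The sketch below illustrates that pattern; `HitCounter`, `record`, and `count` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical per-key counter built from ConcurrentHashMap + LongAdder.
class HitCounter {
    private final ConcurrentHashMap<String, LongAdder> hits = new ConcurrentHashMap<>();

    // Creates the adder for a key on first use, then increments it.
    void record(String key) {
        hits.computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    // Returns the current count for a key, or 0 if the key was never recorded.
    long count(String key) {
        LongAdder adder = hits.get(key);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        HitCounter counter = new HitCounter();
        counter.record("/home");
        counter.record("/home");
        counter.record("/about");
        System.out.println("/home: " + counter.count("/home"));
        System.out.println("/about: " + counter.count("/about"));
    }
}
```

computeIfAbsent guarantees that only one LongAdder is created per key, and each increment then touches only that key's adder.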
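| AtomicLong vs. LongAdder Performance
Having covered the volatile keyword, AtomicLong, and LongAdder, let's compare the performance of AtomicLong and LongAdder. The two offer much the same functionality, so the choice between them should be backed by data.
The benchmark uses JMH; the test code is as follows: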
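import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class PerformaceTest {
    // Shared by all benchmark threads so that they contend on the same counter
    private static AtomicLong atomicLong = new AtomicLong();
    private static LongAdder longAdder = new LongAdder();

    @Benchmark
    @Threads(10)
    public void atomicLongAdd() {
        atomicLong.getAndIncrement();
    }

    @Benchmark
    @Threads(10)
    public void longAdderAdd() {
        longAdder.increment();
    }

    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder().include(PerformaceTest.class.getSimpleName()).build();
        new Runner(options).run();
    }
}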
Notes:
@BenchmarkMode(Mode.Throughput) => measure throughput
@OutputTimeUnit(TimeUnit.MILLISECONDS) => time unit used in the output
@Threads(10) => number of test threads in each process
With 1 thread:
Benchmark                      Mode  Cnt       Score     Error   Units
PerformaceTest.atomicLongAdd  thrpt  200  153824.699 ± 137.947  ops/ms
PerformaceTest.longAdderAdd   thrpt  200  124087.220 ±  81.015  ops/ms
With 5 threads:
Benchmark                      Mode  Cnt       Score      Error   Units
PerformaceTest.atomicLongAdd  thrpt  200   56392.136 ± 1165.361  ops/ms
PerformaceTest.longAdderAdd   thrpt  200  605501.870 ± 4140.190  ops/ms
With 10 threads:
Benchmark                      Mode  Cnt       Score      Error   Units
PerformaceTest.atomicLongAdd  thrpt  200   53286.334 ±  957.765  ops/ms
PerformaceTest.longAdderAdd   thrpt  200  713884.602 ± 3950.884  ops/ms
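The results show that with a single thread AtomicLong is slightly faster (about 153,825 vs. 124,087 ops/ms). With 5 threads, LongAdder already outperforms AtomicLong by a factor of roughly 10.7, and with 10 threads by a factor of roughly 13.4.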
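For readers who want a quick feel for the trend without setting up JMH, the following is a crude timing sketch in plain Java. It is not a substitute for the JMH benchmark above: it has no warm-up, no forking, and no statistical treatment, so its numbers are only indicative. `CrudeCounterTiming` and `time` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, deliberately crude timing harness (no warm-up, no statistics).
class CrudeCounterTiming {
    // Runs task perThread times on each of `threads` threads; returns elapsed nanoseconds.
    static long time(int threads, int perThread, Runnable task) {
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();               // release all workers together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int j = 0; j < perThread; j++) {
                    task.run();
                }
            });
            workers[i].start();
        }
        long begin = System.nanoTime();
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return System.nanoTime() - begin;
    }

    public static void main(String[] args) {
        for (int threads : new int[] {1, 5, 10}) {
            AtomicLong atomic = new AtomicLong();
            LongAdder adder = new LongAdder();
            long atomicNanos = time(threads, 1_000_000, atomic::getAndIncrement);
            long adderNanos = time(threads, 1_000_000, adder::increment);
            System.out.printf("threads=%d AtomicLong=%d ms LongAdder=%d ms%n",
                    threads, atomicNanos / 1_000_000, adderNanos / 1_000_000);
        }
    }
}
```

Absolute timings will vary by machine and JVM; only the relative trend as the thread count grows is worth looking at, and any serious comparison should go through JMH as in the article.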
| Why the Performance Differs
Analysis of AtomicLong#getAndIncrement
// AtomicLong#getAndIncrement
public final long getAndIncrement() {
    return unsafe.getAndAddLong(this, valueOffset, 1L);
}

// Unsafe#getAndAddLong
public final long getAndAddLong(Object var1, long var2, long var4) {
    long var6;
    do {
        var6 = this.getLongVolatile(var1, var2);
    } while (!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
    return var6;
}
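Under the hood this is the CAS algorithm; CAS operations in the JVM are implemented with the CMPXCHG instruction provided by the processor. The basic idea of a spinning CAS is to retry the CAS in a loop until it succeeds, and this is exactly what causes performance problems under high concurrency: if a spinning CAS keeps failing for a long time, it imposes a heavy execution cost on the processor. In a highly concurrent environment, N threads spin at the same time, so many CAS attempts fail and have to be retried. That is why, in the benchmark above, LongAdder outperforms AtomicLong once the number of test threads grows.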
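The retry behaviour can be observed directly by writing the same loop against AtomicLong's public API and counting how often the CAS fails. This is a minimal sketch, not JDK code: `CasRetryDemo`, `getAndAdd`, and `run` are hypothetical names, and the failure counter is itself a LongAdder so that counting does not add another contended CAS.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo: a hand-written CAS retry loop that counts failed attempts.
class CasRetryDemo {
    // Same shape as the do/while loop above, using compareAndSet.
    static long getAndAdd(AtomicLong target, long delta, LongAdder failedAttempts) {
        for (;;) {
            long current = target.get();                     // volatile read
            if (target.compareAndSet(current, current + delta)) {
                return current;                              // CAS won: return the old value
            }
            failedAttempts.increment();                      // CAS lost: retry
        }
    }

    // Runs the loop from several threads; returns {final value, failed CAS attempts}.
    static long[] run(int threadCount, int perThread) {
        AtomicLong target = new AtomicLong();
        LongAdder failed = new LongAdder();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    getAndAdd(target, 1L, failed);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new long[] {target.get(), failed.sum()};
    }

    public static void main(String[] args) {
        long[] result = run(8, 100_000);
        System.out.println("value=" + result[0] + ", failed CAS attempts=" + result[1]);
    }
}
```

The final value is always threadCount * perThread, while the number of failed attempts depends on the machine and on how much the threads actually overlap; every failed attempt is work that produced no progress.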
Analysis of LongAdder#increment
// LongAdder#increment
public void increment() {
    add(1L);
}

// LongAdder#add
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
// Striped64#longAccumulate
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
The code is long and is easier to follow alongside the flow diagram in the original post.

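LongAdder is faster because it uses a Cell array, trading space for speed to avoid contention on a single shared variable. Internally, LongAdder keeps the long value in a base field. When there is no thread contention, it updates base with CAS. When there is contention, a thread whose CAS on base failed instead applies its update with CAS to an element of the Cell array selected by the thread's probe value; a newly created cell starts at the added value, so for increment() it starts at 1. When the count is read, the result is base plus the sum of all cell values. The sum code is as follows: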
// LongAdder#sum
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
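The base-plus-cells idea can be illustrated with a much smaller class. The sketch below is a simplified illustration under stated assumptions, not the JDK implementation: `SimpleStripedCounter` is a hypothetical name, the cell array has a fixed power-of-two size instead of growing on collisions, cells are picked by the thread's hash code rather than a rehashable probe, and there is no cache-line padding.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical, simplified striped counter (fixed cell count, no padding, no resizing).
class SimpleStripedCounter {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells;

    SimpleStripedCounter(int cellCount) {
        if (cellCount <= 0 || (cellCount & (cellCount - 1)) != 0) {
            throw new IllegalArgumentException("cellCount must be a power of two");
        }
        cells = new AtomicLongArray(cellCount);
    }

    void add(long x) {
        long b = base.get();
        if (base.compareAndSet(b, b + x)) {
            return;                                  // fast path: no contention on base
        }
        // Contended: spread the update over a cell chosen per thread.
        int index = Thread.currentThread().hashCode() & (cells.length() - 1);
        cells.addAndGet(index, x);
    }

    long sum() {
        long sum = base.get();
        for (int i = 0; i < cells.length(); i++) {
            sum += cells.get(i);                     // base + all cells, as in LongAdder#sum
        }
        return sum;
    }

    // Increments a fresh counter from several threads and returns the final sum.
    static long runWorkers(int threadCount, int perThread) {
        SimpleStripedCounter counter = new SimpleStripedCounter(8);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.add(1L);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println("sum: " + runWorkers(20, 1000));
    }
}
```

As with LongAdder, sum() called while other threads are still adding is not an atomic snapshot; it is only exact once the writers have finished, as they have after the join() calls here.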

| Choosing Between AtomicLong and LongAdder
Choose LongAdder under high concurrency and AtomicLong otherwise.