Embracing Java 8 Parallel Streams: Execution Speed Takes Off
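Before Java 7, processing a collection in parallel took the following steps:
1. Manually split the data into several parts.
2. Create a thread for each part.
3. Merge the partial results at the appropriate time.
On top of that, you had to take care of modifications to variables shared between the threads. Java 8 gives us parallel streams, which switch on parallel mode with a single call. Pretty cool, right? Let's take a look.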
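For comparison, the three manual steps listed above can be sketched roughly as follows. This is only an illustration: the class name ManualParallelSum, the helper names, and the choice of summing a long[] are assumptions made for the example, and the lambda syntax is used for brevity (pre-Java 8 code would need anonymous Runnable classes).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example class: sums an array by hand-rolled splitting, threads and merging.
public class ManualParallelSum {

    // Sums values[from, to) on the calling thread.
    static long sumRange(long[] values, int from, int to) {
        long sum = 0;
        for (int i = from; i < to; i++) {
            sum += values[i];
        }
        return sum;
    }

    // parts is assumed to be at least 1.
    public static long parallelSum(long[] values, int parts) {
        long[] partial = new long[parts];   // one slot per thread, so no slot is shared
        List<Thread> threads = new ArrayList<>();
        int chunk = (values.length + parts - 1) / parts;

        // Steps 1 and 2: split into chunks and start one thread per chunk.
        for (int p = 0; p < parts; p++) {
            final int index = p;
            final int from = Math.min(values.length, p * chunk);
            final int to = Math.min(values.length, from + chunk);
            Thread t = new Thread(() -> partial[index] = sumRange(values, from, to));
            threads.add(t);
            t.start();
        }

        // Step 3: wait for every thread, then merge the partial results.
        long total = 0;
        for (int p = 0; p < parts; p++) {
            try {
                threads.get(p).join();      // join also makes partial[p] visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            total += partial[p];
        }
        return total;
    }

    public static void main(String[] args) {
        long[] values = new long[1000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(parallelSum(values, 4));
    }
}
```

Each thread writes only to its own slot of the partial array, which is how this sketch avoids the shared-variable problem mentioned above.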
Parallel streams
Getting to know and enabling parallel streams
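List<Apple> appleList = new ArrayList<>(); // pretend the data was loaded from a database
// Sequential version: a plain for loop
for (Apple apple : appleList) {
    apple.setPrice(5.0 * apple.getWeight() / 1000);
}
// Parallel version: the same work through a parallel stream
appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));
The second form switches on parallel processing through the parallelStream() method. An ordinary stream can also be turned into a parallel one with stream.parallel(), and a parallel stream can be turned back into a sequential one with sequential(). Note, however, that converting between parallel and sequential does not change the stream itself in any real way; it only sets a flag. If a pipeline converts between parallel and sequential several times, only the last call takes effect.
Testing parallel stream performance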
// Sequential version: one second of simulated work per apple
public static void main(String[] args) throws InterruptedException {
    List<Apple> appleList = initAppleList();
    Date begin = new Date();
    for (Apple apple : appleList) {
        apple.setPrice(5.0 * apple.getWeight() / 1000);
        Thread.sleep(1000);
    }
    Date end = new Date();
    log.info("Apples: {}, elapsed: {}s", appleList.size(), (end.getTime() - begin.getTime()) / 1000);
}
// Parallel version: the same simulated work, run through a parallel stream
List<Apple> appleList = initAppleList();
Date begin = new Date();
appleList.parallelStream().forEach(apple -> {
    apple.setPrice(5.0 * apple.getWeight() / 1000);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
Date end = new Date();
log.info("Apples: {}, elapsed: {}s", appleList.size(), (end.getTime() - begin.getTime()) / 1000);
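As a side check on the earlier remark that parallel() and sequential() only set a flag and that the last call wins, the flag can be read back with isParallel(). The class name LastCallWins and its method names are made up for this sketch.

```java
import java.util.stream.IntStream;

// Hypothetical example class: inspects the parallel flag of a pipeline.
public class LastCallWins {

    // parallel() is overridden by the later sequential() call.
    public static boolean parallelThenSequential() {
        return IntStream.range(0, 10)
                .parallel()
                .map(i -> i * 2)
                .sequential()
                .isParallel();
    }

    // sequential() is overridden by the later parallel() call.
    public static boolean sequentialThenParallel() {
        return IntStream.range(0, 10)
                .sequential()
                .map(i -> i * 2)
                .parallel()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println("parallel then sequential: " + parallelThenSequential());
        System.out.println("sequential then parallel: " + sequentialThenParallel());
    }
}
```

Because the flag belongs to the pipeline as a whole, the map step in the first method does not run in parallel just because parallel() was called before it.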
Can parallel streams be used anywhere?
Splittability affects stream speed
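1. Summing the first n numbers with iterate is always slower than the plain loop, whether or not the stream is parallel. The sequential version is slow because stream operations sit at a higher level of abstraction than a bare loop. But why is the parallel version slow as well? Two points matter here:
iterate produces boxed objects, which have to be unboxed into numbers before they can be summed
an iterate stream is hard to divide into independent chunks that can run in parallel
This is an interesting problem: we have to recognize that some stream operations parallelize more readily than others. With iterate, each application of the function depends on the result of the previous one. In this case the stream cannot be split effectively into small chunks, and turning on parallelism only adds overhead.
2. LongStream.rangeClosed() has neither of iterate's two pain points. It produces primitive values, so no boxing or unboxing is needed, and the range 1 to n can be split directly into parts, for example the four parts 1 to n/4, n/4+1 to 2n/4, ..., 3n/4+1 to n. As a result, the parallel rangeClosed() version is faster than the external iteration of the for loop.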
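The difference in splittability can also be observed directly on the spliterators behind the two sources. The sketch below is only a probe, not part of the benchmark: the class name SplittabilityProbe and its methods are invented for illustration. It checks whether each source reports a known size (the SIZED characteristic) and splits a small range once.

```java
import java.util.Spliterator;
import java.util.stream.LongStream;
import java.util.stream.Stream;

// Hypothetical example class: compares the spliterators of rangeClosed and iterate.
public class SplittabilityProbe {

    // A range knows exactly how many elements it holds.
    public static boolean rangeIsSized() {
        return LongStream.rangeClosed(1, 8).spliterator()
                .hasCharacteristics(Spliterator.SIZED);
    }

    // An iterate stream has no known size, so it cannot be cut into even halves up front.
    public static boolean iterateIsSized() {
        return Stream.iterate(1L, i -> i + 1).spliterator()
                .hasCharacteristics(Spliterator.SIZED);
    }

    // Splits the range 1..8 once and returns the sizes of the two resulting parts.
    public static long[] splitSizes() {
        Spliterator.OfLong rest = LongStream.rangeClosed(1, 8).spliterator();
        Spliterator.OfLong prefix = rest.trySplit();
        return new long[] { prefix.estimateSize(), rest.estimateSize() };
    }

    public static void main(String[] args) {
        System.out.println("rangeClosed SIZED: " + rangeIsSized());
        System.out.println("iterate SIZED:     " + iterateIsSized());
        long[] sizes = splitSizes();
        System.out.println("split sizes:       " + sizes[0] + " + " + sizes[1]);
    }
}
```

The article's own benchmark classes follow.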
package lambdasinaction.chap7;
import java.util.stream.*;
public class ParallelStreams {
    // Plain for loop (external iteration)
    public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 0; i <= n; i++) {
            result += i;
        }
        return result;
    }
    // Sequential stream built with iterate (boxed Long values)
    public static long sequentialSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
    }
    // Parallel stream built with iterate
    public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
    }
    // Sequential primitive range
    public static long rangedSum(long n) {
        return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
    }
    // Parallel primitive range
    public static long parallelRangedSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
    }
}
package lambdasinaction.chap7;
import java.util.concurrent.*;
import java.util.function.*;
public class ParallelStreamsHarness {
    public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();
    public static void main(String[] args) {
        System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");
        System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
        System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs");
        System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
        System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs");
    }
    // Runs f ten times and returns the fastest run in milliseconds
    public static <T, R> long measurePerf(Function<T, R> f, T input) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            R result = f.apply(input);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + result);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }
}
The problem of modifying shared variables
// Sequential: every add runs on the same thread
public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
}
// Parallel: several threads call add on the same Accumulator
public static long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;
}
public static class Accumulator {
    private long total = 0;
    public void add(long value) {
        total += value; // read-modify-write, not atomic
    }
}
Sequential execution prints 50000005000000 every time, whereas the parallel results differ from run to run. That is because every access to total is subject to a data race; for the causes of data races, see blog posts about volatile. So when code modifies shared variables, using parallel streams is not recommended.
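One way around the race is to drop the shared mutable field and let the stream combine partial results itself. The sketch below shows that approach, plus a variant that keeps the forEach style but swaps in java.util.concurrent.atomic.LongAdder as a thread-safe accumulator. The class name SafeParallelSum and its method names are assumptions for illustration and are not part of the original lambdasinaction code.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.LongStream;

// Hypothetical example class: two race-free ways to sum 1..n in parallel.
public class SafeParallelSum {

    // Reduction: each worker sums its own chunk and the stream merges the partial sums.
    public static long reductionSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    // Side effect kept, but on a thread-safe accumulator instead of a plain long field.
    public static long adderSum(long n) {
        LongAdder adder = new LongAdder();
        LongStream.rangeClosed(1, n).parallel().forEach(adder::add);
        return adder.sum();
    }

    public static void main(String[] args) {
        long n = 10_000_000L;
        System.out.println("reduction: " + reductionSum(n));
        System.out.println("adder:     " + adderSum(n));
    }
}
```

The reduction form is usually the simpler choice because it has no shared state at all; the LongAdder form is mainly useful when a side-effecting forEach cannot easily be rewritten.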
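Notes on using parallel streams
Keep the following points in mind when using parallel streams:
For small amounts of data, parallel streams are not recommended
For stream data that splits easily into chunks, parallel streams are recommended
[Table: splittability of the streams produced by some common collection framework sources]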


