<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          擁抱 Java 8 并行流吧,讓執(zhí)行速度飛起!

          共 4370字,需瀏覽 9分鐘

           ·

          2020-12-31 05:09

          作者:后青春期的Keats

          cnblogs.com/keatsCoder/p/12934394.html

          前言

          在 Java7 之前,如果想要并行處理一個集合,我們需要以下幾步

          1. 手動分成幾部分
          2. 為每部分創(chuàng)建線程
          3. 在適當(dāng)?shù)臅r候合并。

          并且還需要關(guān)注多個線程之間共享變量的修改問題。而 Java8 為我們提供了并行流,可以一鍵開啟并行模式。是不是很酷呢?讓我們來看看吧

          并行流

          認(rèn)識和開啟并行流

          什么是并行流: 并行流就是將一個流的內(nèi)容分成多個數(shù)據(jù)塊,并用不同的線程分別處理每個不同數(shù)據(jù)塊的流。例如有這么一個需求:

          有一個 List?集合,而 list 中每個 apple 對象只有重量,我們也知道 apple 的單價是 5元/kg,現(xiàn)在需要計算出每個 apple 的單價

          傳統(tǒng)的方式是這樣:

          List?appleList?=?new?ArrayList<>();?//?假裝數(shù)據(jù)是從庫里查出來的

          for?(Apple?apple?:?appleList)?{
          ????apple.setPrice(5.0?*?apple.getWeight()?/?1000);
          }

          我們通過迭代器遍歷 list 中的 apple 對象,完成了每個 apple 價格的計算。而這個算法的時間復(fù)雜度是 O(list.size()) 隨著 list 大小的增加,耗時也會跟著線性增加。并行流

          可以大大縮短這個時間。并行流處理該集合的方法如下:

          appleList.parallelStream().forEach(apple?->?apple.setPrice(5.0?*?apple.getWeight()?/?1000));

          和普通流的區(qū)別是這里調(diào)用的?parallelStream()?方法。當(dāng)然也可以通過 stream.parallel() 將普通流轉(zhuǎn)換成并行流。并行流也能通過 sequential() 方法轉(zhuǎn)換為順序流。

          但要注意:流的并行和順序轉(zhuǎn)換不會對流本身做任何實際的變化,僅僅是打了個標(biāo)記而已。并且在一條流水線上對流進(jìn)行多次并行 / 順序的轉(zhuǎn)換,生效的是最后一次的方法調(diào)用

          并行流如此方便,它的線程從那里來呢?有多少個?怎么配置呢?

          并行流內(nèi)部使用了默認(rèn)的 ForkJoinPool 線程池。默認(rèn)的線程數(shù)量就是處理器的核心數(shù),而配置系統(tǒng)核心屬性:java.util.concurrent.ForkJoinPool.common.parallelism 可以改變線程池大小。

          不過該值是全局變量。改變他會影響所有并行流。目前還無法為每個流配置專屬的線程數(shù)。一般來說采用處理器核心數(shù)是不錯的選擇

          測試并行流的性能

          為了更容易的測試性能,我們在每次計算完蘋果價格后,讓線程睡 1s,表示在這期間執(zhí)行了其他 IO 相關(guān)的操作,并輸出程序執(zhí)行耗時,順序執(zhí)行的耗時:

          public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????List?appleList?=?initAppleList();

          ????Date?begin?=?new?Date();
          ????for?(Apple?apple?:?appleList)?{
          ????????apple.setPrice(5.0?*?apple.getWeight()?/?1000);
          ????????Thread.sleep(1000);
          ????}
          ????Date?end?=?new?Date();
          ????log.info("蘋果數(shù)量:{}個, 耗時:{}s",?appleList.size(),?(end.getTime()?-?begin.getTime())?/1000);
          }


          并行版本

          List?appleList?=?initAppleList();

          Date?begin?=?new?Date();
          appleList.parallelStream().forEach(apple?->
          ????{
          ??????apple.setPrice(5.0?*?apple.getWeight()?/?1000);
          ??????try?{
          ????????????Thread.sleep(1000);
          ??????????}?catch?(InterruptedException?e)?{
          ??????????????e.printStackTrace();
          ??????????}
          ?????}
          ?);
          Date?end?=?new?Date();
          log.info("蘋果數(shù)量:{}個, 耗時:{}s",?appleList.size(),?(end.getTime()?-?begin.getTime())?/1000);

          耗時情況


          跟我們的預(yù)測一致,我的電腦是 四核I5 處理器,開啟并行后四個處理器每人執(zhí)行一個線程,最后 1s 完成了任務(wù)!

          并行流可以隨便用嗎?

          可拆分性影響流的速度

          通過上面的測試,有的人會輕易得到一個結(jié)論:并行流很快,我們可以完全放棄 foreach/fori/iter 外部迭代,使用 Stream 提供的內(nèi)部迭代來實現(xiàn)了。

          事實真的是這樣嗎?并行流真的如此完美嗎?答案當(dāng)然是否定的。大家可以復(fù)制下面的代碼,在自己的電腦上測試。測試完后可以發(fā)現(xiàn),并行流并不總是最快的處理方式。

          1.對于 iterate 方法來處理的前 n 個數(shù)字來說,不管并行與否,它總是慢于循環(huán)的,非并行版本可以理解為流化操作沒有循環(huán)更偏向底層導(dǎo)致的慢。 可并行版本是為什么慢呢?這里有兩個需要注意的點:

          1. iterate 生成的是裝箱的對象,必須拆箱成數(shù)字才能求和

          2. 我們很難把 iterate 分成多個獨立的塊來并行執(zhí)行

          這個問題很有意思,我們必須意識到某些流操作比其他操作更容易并行化。對于 iterate 來說,每次應(yīng)用這個函數(shù)都要依賴于前一次應(yīng)用的結(jié)果。

          因此在這種情況下,我們不僅不能有效的將流劃分成小塊處理。反而還因為并行化再次增加了開支。

          2.而對于 LongStream.rangeClosed() 方法來說,就不存在 iterate 的第兩個痛點了。 它生成的是基本類型的值,不用拆裝箱操作,另外它可以直接將要生成的數(shù)字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分。因此并行狀態(tài)下的 rangeClosed() 是快于 for 循環(huán)外部迭代的

          package?lambdasinaction.chap7;

          import?java.util.stream.*;

          public?class?ParallelStreams?{

          ????public?static?long?iterativeSum(long?n)?{
          ????????long?result?=?0;
          ????????for?(long?i?=?0;?i?<=?n;?i++)?{
          ????????????result?+=?i;
          ????????}
          ????????return?result;
          ????}

          ????public?static?long?sequentialSum(long?n)?{
          ????????return?Stream.iterate(1L,?i?->?i?+?1).limit(n).reduce(Long::sum).get();
          ????}

          ????public?static?long?parallelSum(long?n)?{
          ????????return?Stream.iterate(1L,?i?->?i?+?1).limit(n).parallel().reduce(Long::sum).get();
          ????}

          ????public?static?long?rangedSum(long?n)?{
          ????????return?LongStream.rangeClosed(1,?n).reduce(Long::sum).getAsLong();
          ????}

          ????public?static?long?parallelRangedSum(long?n)?{
          ????????return?LongStream.rangeClosed(1,?n).parallel().reduce(Long::sum).getAsLong();
          ????}

          }

          ~

          package?lambdasinaction.chap7;

          import?java.util.concurrent.*;
          import?java.util.function.*;

          public?class?ParallelStreamsHarness?{

          ????public?static?final?ForkJoinPool?FORK_JOIN_POOL?=?new?ForkJoinPool();

          ????public?static?void?main(String[]?args)?{
          ????????System.out.println("Iterative?Sum?done?in:?"?+?measurePerf(ParallelStreams::iterativeSum,?10_000_000L)?+?"?msecs");
          ????????System.out.println("Sequential?Sum?done?in:?"?+?measurePerf(ParallelStreams::sequentialSum,?10_000_000L)?+?"?msecs");
          ????????System.out.println("Parallel?forkJoinSum?done?in:?"?+?measurePerf(ParallelStreams::parallelSum,?10_000_000L)?+?"?msecs"?);
          ????????System.out.println("Range?forkJoinSum?done?in:?"?+?measurePerf(ParallelStreams::rangedSum,?10_000_000L)?+?"?msecs");
          ????????System.out.println("Parallel?range?forkJoinSum?done?in:?"?+?measurePerf(ParallelStreams::parallelRangedSum,?10_000_000L)?+?"?msecs"?);
          ????}

          ????public?static??long?measurePerf(Function?f,?T?input)?{
          ????????long?fastest?=?Long.MAX_VALUE;
          ????????for?(int?i?=?0;?i?10;?i++)?{
          ????????????long?start?=?System.nanoTime();
          ????????????R?result?=?f.apply(input);
          ????????????long?duration?=?(System.nanoTime()?-?start)?/?1_000_000;
          ????????????System.out.println("Result:?"?+?result);
          ????????????if?(duration?????????}
          ????????return?fastest;
          ????}
          }

          共享變量修改的問題

          并行流雖然輕易的實現(xiàn)了多線程,但是仍未解決多線程中共享變量的修改問題。下面代碼中存在共享變量 total,分別使用順序流和并行流計算前n個自然數(shù)的和

          public?static?long?sideEffectSum(long?n)?{
          ????Accumulator?accumulator?=?new?Accumulator();
          ????LongStream.rangeClosed(1,?n).forEach(accumulator::add);
          ????return?accumulator.total;
          }

          public?static?long?sideEffectParallelSum(long?n)?{
          ????Accumulator?accumulator?=?new?Accumulator();
          ????LongStream.rangeClosed(1,?n).parallel().forEach(accumulator::add);
          ????return?accumulator.total;
          }

          public?static?class?Accumulator?{
          ????private?long?total?=?0;

          ????public?void?add(long?value)?{
          ????????total?+=?value;
          ????}
          }

          順序執(zhí)行每次輸出的結(jié)果都是:50000005000000,而并行執(zhí)行的結(jié)果卻五花八門了。這是因為每次訪問 totle 都會存在數(shù)據(jù)競爭,關(guān)于數(shù)據(jù)競爭的原因,大家可以看看關(guān)于 volatile 的博客。因此當(dāng)代碼中存在修改共享變量的操作時,是不建議使用并行流的。

          并行流的使用注意

          在并行流的使用上有下面幾點需要注意:

          1.盡量使用 LongStream / IntStream / DoubleStream 等原始數(shù)據(jù)流代替 Stream 來處理數(shù)字,以避免頻繁拆裝箱帶來的額外開銷

          2.要考慮流的操作流水線的總計算成本 ,假設(shè) N 是要操作的任務(wù)總數(shù),Q 是每次操作的時間。N * Q 就是操作的總時間,Q 值越大就意味著使用并行流帶來收益的可能性越大

          例如:前端傳來幾種類型的資源,需要存儲到數(shù)據(jù)庫。每種資源對應(yīng)不同的表。我們可以視作類型數(shù)為 N,存儲數(shù)據(jù)庫的網(wǎng)絡(luò)耗時 + 插入操作耗時為 Q。一般情況下網(wǎng)絡(luò)耗時都是比較大的。因此該操作就比較適合并行處理。當(dāng)然當(dāng)類型數(shù)目大于核心數(shù)時,該操作的性能提升就會打一定的折扣了。更好的優(yōu)化方法在日后的博客會為大家奉上

          3.對于較少的數(shù)據(jù)量,不建議使用并行流

          4.容易拆分成塊的流數(shù)據(jù),建議使用并行流

          以下是一些常見的集合框架對應(yīng)流的可拆分性能表


          可拆分性
          ArrayList極佳
          LinkedList
          IntStream.range極佳
          Stream.iterate
          HashSet
          TreeSet
          瀏覽 55
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美成年网站 | 亚洲最大视频网站 | 777777亚洲成人 | 成人影音a片 | 国产黄片视频 |