<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          擁抱 Java 8 并行流:執(zhí)行速度飛起 !

          共 4723字,需瀏覽 10分鐘

           ·

          2020-12-26 16:57

          作者:后青春期的Keats

          地址:https://www.cnblogs.com/keatsCoder/p/12934394.html

          前言

          在 Java7 之前,如果想要并行處理一個(gè)集合,我們需要以下幾步 1. 手動(dòng)分成幾部分 2. 為每部分創(chuàng)建線程 3. 在適當(dāng)?shù)臅r(shí)候合并。并且還需要關(guān)注多個(gè)線程之間共享變量的修改問(wèn)題。而 Java8 為我們提供了并行流,可以一鍵開啟并行模式。是不是很酷呢?讓我們來(lái)看看吧

          并行流

          認(rèn)識(shí)和開啟并行流

          什么是并行流: 并行流就是將一個(gè)流的內(nèi)容分成多個(gè)數(shù)據(jù)塊,并用不同的線程分別處理每個(gè)不同數(shù)據(jù)塊的流。例如有這么一個(gè)需求:

          有一個(gè) List 集合,而 list 中每個(gè) apple 對(duì)象只有重量,我們也知道 apple 的單價(jià)是 5元/kg,現(xiàn)在需要計(jì)算出每個(gè) apple 的單價(jià),傳統(tǒng)的方式是這樣:

          List?appleList?=?new?ArrayList<>();?//?假裝數(shù)據(jù)是從庫(kù)里查出來(lái)的

          for?(Apple?apple?:?appleList)?{
          ????apple.setPrice(5.0?*?apple.getWeight()?/?1000);
          }

          我們通過(guò)迭代器遍歷 list 中的 apple 對(duì)象,完成了每個(gè) apple 價(jià)格的計(jì)算。而這個(gè)算法的時(shí)間復(fù)雜度是 O(list.size()) 隨著 list 大小的增加,耗時(shí)也會(huì)跟著線性增加。并行流

          可以大大縮短這個(gè)時(shí)間。并行流處理該集合的方法如下:

          appleList.parallelStream().forEach(apple?->?apple.setPrice(5.0?*?apple.getWeight()?/?1000));

          和普通流的區(qū)別是這里調(diào)用的 parallelStream() 方法。當(dāng)然也可以通過(guò) stream.parallel() 將普通流轉(zhuǎn)換成并行流。并行流也能通過(guò) sequential() 方法轉(zhuǎn)換為順序流,但要注意:流的并行和順序轉(zhuǎn)換不會(huì)對(duì)流本身做任何實(shí)際的變化,僅僅是打了個(gè)標(biāo)記而已。并且在一條流水線上對(duì)流進(jìn)行多次并行 / 順序的轉(zhuǎn)換,生效的是最后一次的方法調(diào)用

          并行流如此方便,它的線程從那里來(lái)呢?有多少個(gè)?怎么配置呢?

          并行流內(nèi)部使用了默認(rèn)的 ForkJoinPool 線程池。默認(rèn)的線程數(shù)量就是處理器的核心數(shù),而配置系統(tǒng)核心屬性:java.util.concurrent.ForkJoinPool.common.parallelism 可以改變線程池大小。不過(guò)該值是全局變量。改變他會(huì)影響所有并行流。目前還無(wú)法為每個(gè)流配置專屬的線程數(shù)。一般來(lái)說(shuō)采用處理器核心數(shù)是不錯(cuò)的選擇

          測(cè)試并行流的性能

          為了更容易的測(cè)試性能,我們?cè)诿看斡?jì)算完蘋果價(jià)格后,讓線程睡 1s,表示在這期間執(zhí)行了其他 IO 相關(guān)的操作,并輸出程序執(zhí)行耗時(shí),順序執(zhí)行的耗時(shí):

          public?static?void?main(String[]?args)?throws?InterruptedException?{
          ????List?appleList?=?initAppleList();

          ????Date?begin?=?new?Date();
          ????for?(Apple?apple?:?appleList)?{
          ????????apple.setPrice(5.0?*?apple.getWeight()?/?1000);
          ????????Thread.sleep(1000);
          ????}
          ????Date?end?=?new?Date();
          ????log.info("蘋果數(shù)量:{}個(gè), 耗時(shí):{}s",?appleList.size(),?(end.getTime()?-?begin.getTime())?/1000);
          }
          Snipaste_2020-05-21_21-49-44

          并行版本

          List?appleList?=?initAppleList();

          Date?begin?=?new?Date();
          appleList.parallelStream().forEach(apple?->
          ???????????????????????????????????{
          ???????????????????????????????????????apple.setPrice(5.0?*?apple.getWeight()?/?1000);
          ???????????????????????????????????????try?{
          ???????????????????????????????????????????Thread.sleep(1000);
          ???????????????????????????????????????}?catch?(InterruptedException?e)?{
          ???????????????????????????????????????????e.printStackTrace();
          ???????????????????????????????????????}
          ???????????????????????????????????}
          ??????????????????????????????????);
          Date?end?=?new?Date();
          log.info("蘋果數(shù)量:{}個(gè), 耗時(shí):{}s",?appleList.size(),?(end.getTime()?-?begin.getTime())?/1000);

          耗時(shí)情況

          Snipaste_2020-05-21_22-16-08

          跟我們的預(yù)測(cè)一致,我的電腦是 四核I5 處理器,開啟并行后四個(gè)處理器每人執(zhí)行一個(gè)線程,最后 1s 完成了任務(wù)!

          并行流可以隨便用嗎?

          可拆分性影響流的速度

          通過(guò)上面的測(cè)試,有的人會(huì)輕易得到一個(gè)結(jié)論:并行流很快,我們可以完全放棄 foreach/fori/iter 外部迭代,使用 Stream 提供的內(nèi)部迭代來(lái)實(shí)現(xiàn)了。事實(shí)真的是這樣嗎?并行流真的如此完美嗎?答案當(dāng)然是否定的。大家可以復(fù)制下面的代碼,在自己的電腦上測(cè)試。測(cè)試完后可以發(fā)現(xiàn),并行流并不總是最快的處理方式。

          1. 對(duì)于 iterate 方法來(lái)處理的前 n 個(gè)數(shù)字來(lái)說(shuō),不管并行與否,它總是慢于循環(huán)的,非并行版本可以理解為流化操作沒有循環(huán)更偏向底層導(dǎo)致的慢??刹⑿邪姹臼菫槭裁绰??這里有兩個(gè)需要注意的點(diǎn):

            1. iterate 生成的是裝箱的對(duì)象,必須拆箱成數(shù)字才能求和

            2. 我們很難把 iterate 分成多個(gè)獨(dú)立的塊來(lái)并行執(zhí)行

              這個(gè)問(wèn)題很有意思,我們必須意識(shí)到某些流操作比其他操作更容易并行化。對(duì)于 iterate 來(lái)說(shuō),每次應(yīng)用這個(gè)函數(shù)都要依賴于前一次應(yīng)用的結(jié)果。因此在這種情況下,我們不僅不能有效的將流劃分成小塊處理。反而還因?yàn)椴⑿谢俅卧黾恿碎_支。

          2. 而對(duì)于 LongStream.rangeClosed() 方法來(lái)說(shuō),就不存在 iterate 的第兩個(gè)痛點(diǎn)了。它生成的是基本類型的值,不用拆裝箱操作,另外它可以直接將要生成的數(shù)字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分。因此并行狀態(tài)下的 rangeClosed() 是快于 for 循環(huán)外部迭代的

          package?lambdasinaction.chap7;

          import?java.util.stream.*;

          public?class?ParallelStreams?{

          ????public?static?long?iterativeSum(long?n)?{
          ????????long?result?=?0;
          ????????for?(long?i?=?0;?i?<=?n;?i++)?{
          ????????????result?+=?i;
          ????????}
          ????????return?result;
          ????}

          ????public?static?long?sequentialSum(long?n)?{
          ????????return?Stream.iterate(1L,?i?->?i?+?1).limit(n).reduce(Long::sum).get();
          ????}

          ????public?static?long?parallelSum(long?n)?{
          ????????return?Stream.iterate(1L,?i?->?i?+?1).limit(n).parallel().reduce(Long::sum).get();
          ????}

          ????public?static?long?rangedSum(long?n)?{
          ????????return?LongStream.rangeClosed(1,?n).reduce(Long::sum).getAsLong();
          ????}

          ????public?static?long?parallelRangedSum(long?n)?{
          ????????return?LongStream.rangeClosed(1,?n).parallel().reduce(Long::sum).getAsLong();
          ????}

          }
          package?lambdasinaction.chap7;

          import?java.util.concurrent.*;
          import?java.util.function.*;

          public?class?ParallelStreamsHarness?{

          ????public?static?final?ForkJoinPool?FORK_JOIN_POOL?=?new?ForkJoinPool();

          ????public?static?void?main(String[]?args)?{
          ????????System.out.println("Iterative?Sum?done?in:?"?+?measurePerf(ParallelStreams::iterativeSum,?10_000_000L)?+?"?msecs");
          ????????System.out.println("Sequential?Sum?done?in:?"?+?measurePerf(ParallelStreams::sequentialSum,?10_000_000L)?+?"?msecs");
          ????????System.out.println("Parallel?forkJoinSum?done?in:?"?+?measurePerf(ParallelStreams::parallelSum,?10_000_000L)?+?"?msecs"?);
          ????????System.out.println("Range?forkJoinSum?done?in:?"?+?measurePerf(ParallelStreams::rangedSum,?10_000_000L)?+?"?msecs");
          ????????System.out.println("Parallel?range?forkJoinSum?done?in:?"?+?measurePerf(ParallelStreams::parallelRangedSum,?10_000_000L)?+?"?msecs"?);
          ????}

          ????public?static??long?measurePerf(Function?f,?T?input)?{
          ????????long?fastest?=?Long.MAX_VALUE;
          ????????for?(int?i?=?0;?i?10;?i++)?{
          ????????????long?start?=?System.nanoTime();
          ????????????R?result?=?f.apply(input);
          ????????????long?duration?=?(System.nanoTime()?-?start)?/?1_000_000;
          ????????????System.out.println("Result:?"?+?result);
          ????????????if?(duration?????????}
          ????????return?fastest;
          ????}
          }

          共享變量修改的問(wèn)題

          并行流雖然輕易的實(shí)現(xiàn)了多線程,但是仍未解決多線程中共享變量的修改問(wèn)題。下面代碼中存在共享變量 total,分別使用順序流和并行流計(jì)算前n個(gè)自然數(shù)的和

          public?static?long?sideEffectSum(long?n)?{
          ????Accumulator?accumulator?=?new?Accumulator();
          ????LongStream.rangeClosed(1,?n).forEach(accumulator::add);
          ????return?accumulator.total;
          }

          public?static?long?sideEffectParallelSum(long?n)?{
          ????Accumulator?accumulator?=?new?Accumulator();
          ????LongStream.rangeClosed(1,?n).parallel().forEach(accumulator::add);
          ????return?accumulator.total;
          }

          public?static?class?Accumulator?{
          ????private?long?total?=?0;

          ????public?void?add(long?value)?{
          ????????total?+=?value;
          ????}
          }

          順序執(zhí)行每次輸出的結(jié)果都是:50000005000000,而并行執(zhí)行的結(jié)果卻五花八門了。這是因?yàn)槊看卧L問(wèn) totle 都會(huì)存在數(shù)據(jù)競(jìng)爭(zhēng),關(guān)于數(shù)據(jù)競(jìng)爭(zhēng)的原因,大家可以看看關(guān)于 volatile 的博客。因此當(dāng)代碼中存在修改共享變量的操作時(shí),是不建議使用并行流的。

          并行流的使用注意

          在并行流的使用上有下面幾點(diǎn)需要注意:

          • 盡量使用 LongStream / IntStream / DoubleStream 等原始數(shù)據(jù)流代替 Stream 來(lái)處理數(shù)字,以避免頻繁拆裝箱帶來(lái)的額外開銷

          • 要考慮流的操作流水線的總計(jì)算成本,假設(shè) N 是要操作的任務(wù)總數(shù),Q 是每次操作的時(shí)間。N * Q 就是操作的總時(shí)間,Q 值越大就意味著使用并行流帶來(lái)收益的可能性越大

            例如:前端傳來(lái)幾種類型的資源,需要存儲(chǔ)到數(shù)據(jù)庫(kù)。每種資源對(duì)應(yīng)不同的表。我們可以視作類型數(shù)為 N,存儲(chǔ)數(shù)據(jù)庫(kù)的網(wǎng)絡(luò)耗時(shí) + 插入操作耗時(shí)為 Q。一般情況下網(wǎng)絡(luò)耗時(shí)都是比較大的。因此該操作就比較適合并行處理。當(dāng)然當(dāng)類型數(shù)目大于核心數(shù)時(shí),該操作的性能提升就會(huì)打一定的折扣了。更好的優(yōu)化方法在日后的博客會(huì)為大家奉上

          • 對(duì)于較少的數(shù)據(jù)量,不建議使用并行流

          • 容易拆分成塊的流數(shù)據(jù),建議使用并行流

          以下是一些常見的集合框架對(duì)應(yīng)流的可拆分性能表

          可拆分性
          ArrayList極佳
          LinkedList
          IntStream.range極佳
          Stream.iterate
          HashSet
          TreeSet

          END


          有熱門推薦?

          1.?從Nginx、Apache工作原理看為什么Nginx比Apache高效!

          2.?用Java實(shí)現(xiàn)天天酷跑(附源碼),這個(gè)真的有點(diǎn)強(qiáng)了!

          3.?網(wǎng)易游戲基于 Flink 的流式 ETL 建設(shè)

          4.?用了3年CAT,這次我想選擇SkyWalking,老板反手就是一個(gè)贊!

          最近面試BAT,整理一份面試資料Java面試BATJ通關(guān)手冊(cè),覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫(kù)、數(shù)據(jù)結(jié)構(gòu)等等。

          獲取方式:點(diǎn)“在看”,關(guān)注公眾號(hào)并回復(fù)?Java?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

          文章有幫助的話,在看,轉(zhuǎn)發(fā)吧。

          謝謝支持喲 (*^__^*)

          瀏覽 49
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美精品一一色哟哟 | 无码A区| 逼特逼视频在线 | 日韩免费中文字幕 | 日韩一区二区三区黄片 |