Java8 中的 Stream 那么強大,那你知道它的原理是什么嗎?

作者:歲月安然
elsef.com/2019/09/16/Java8中Stream的原理分析
Java 8 API添加了一個新的抽象稱為流Stream,可以讓你以一種聲明的方式處理數(shù)據(jù)。 Stream 使用一種類似用 SQL 語句從數(shù)據(jù)庫查詢數(shù)據(jù)的直觀方式來提供一種對 Java 集合運算和表達的高階抽象。 Stream API可以極大提高Java程序員的生產(chǎn)力,讓程序員寫出高效率、干凈、簡潔的代碼。 本文會對Stream的實現(xiàn)原理進行剖析。
Stream的組成與特點
Stream(流)是一個來自數(shù)據(jù)源的元素隊列并支持聚合操作:元素是特定類型的對象,形成一個隊列。?
Java中的Stream并_不會_向集合那樣存儲和管理元素,而是按需計算數(shù)據(jù)源流的來源可以是集合
Collection、數(shù)組Array、I/O channel, 產(chǎn)生器generator?等聚合操作類似
SQL語句一樣的操作, 比如filter,?map,?reduce,?find,?match,?sorted等
Collection操作不同, Stream操作還有兩個基礎(chǔ)的特征:Pipelining: 中間操作都會返回流對象本身。這樣多個操作可以串聯(lián)成一個管道, 如同流式風格(fluent style)。這樣做可以對操作進行優(yōu)化, 比如延遲執(zhí)行(laziness evaluation)和短路(?short-circuiting)內(nèi)部迭代:以前對集合遍歷都是通過Iterator或者For-Each的方式, 顯式的在集合外部進行迭代, 這叫做外部迭代。?Stream提供了內(nèi)部迭代的方式, 通過訪問者模式 (Visitor)實現(xiàn)。
Stream?可以并行化操作,迭代器只能命令式地、串行化操作。顧名思義,當使用串行方式去遍歷時,每個?item?讀完后再讀下一個 item。而使用并行去遍歷時,數(shù)據(jù)會被分成多個段,其中每一個都在不同的線程中處理,然后將結(jié)果一起輸出。Stream?的并行操作依賴于?Java7?中引入的?Fork/Join?框架(JSR166y)來拆分任務(wù)和加速處理過程。Java?的并行 API 演變歷程基本如下:1.0-1.4 中的 java.lang.Thread 5.0 中的 java.util.concurrent 6.0 中的 Phasers 等 7.0 中的 Fork/Join 框架 8.0 中的 Lambda
Stream具有平行處理能力,處理的過程會分而治之,也就是將一個大任務(wù)切分成多個小任務(wù),這表示每個任務(wù)都是一個操作:List ?numbers?=?Arrays.asList(1,?2,?3,?4,?5,?6,?7,?8,?9);
numbers.parallelStream()
???????.forEach(out::println);?
forEachOrdered方法執(zhí)行終止操作:List ?numbers?=?Arrays.asList(1,?2,?3,?4,?5,?6,?7,?8,?9);
numbers.parallelStream()
???????.forEachOrdered(out::println);??
Stream接口的相關(guān)知識。BaseStream接口
Stream的父接口是BaseStream,后者是所有流實現(xiàn)的頂層接口,定義如下:public?interface?BaseStream<T,?S?extends?BaseStream<T,?S>>
????????extends?AutoCloseable?{
????Iterator?iterator() ;
????Spliterator?spliterator() ;
????boolean?isParallel();
????S?sequential();
????S?parallel();
????S?unordered();
????S?onClose(Runnable?closeHandler);
????void?close();
}
T為流中元素的類型,S為一個BaseStream的實現(xiàn)類,它里面的元素也是T并且S同樣是自己:S extends BaseStream
S的使用就知道了:如sequential()、parallel()這兩個方法,它們都返回了S實例,也就是說它們分別支持對當前流進行串行或者并行的操作,并返回「改變」后的流對象。如果是 并行一定涉及到對當前流的拆分,即將一個流拆分成多個子流,子流肯定和父流的類型是一致的。子流可以繼續(xù)拆分子流,一直拆分下去…
S是BaseStream的一個實現(xiàn)類,它同樣是一個流,比如Stream、IntStream、LongStream等。Stream接口
Stream的接口聲明:public?interface?Stream ?extends?BaseStream >?
Stream可以繼續(xù)拆分為Stream,我們可以通過它的一些方法來證實:Stream ?filter(Predicate?super?T>?predicate) ;?Stream ?map(Function?super?T,???extends?R>?mapper) ;?Stream ?flatMap(Function?super?T,???extends?Stream?extends?R>>?mapper) ;
Stream?sorted() ;
Stream?peek(Consumer?super?T>?action) ;
Stream?limit(long?maxSize) ;
Stream?skip(long?n) ;
...
中間操作,它們的返回結(jié)果必須是流對象本身。關(guān)閉流操作
AutoCloseable?接口,也就是?close()?方法會在流關(guān)閉時被調(diào)用。同時,BaseStream?中還給我們提供了onClose()方法:/**?*?Returns?an?equivalent?stream?with?an?additional?close?handler.?Close?*?handlers?are?run?when?the?{@link?#close()}?method?*?is?called?on?the?stream,?and?are?executed?in?the?order?they?were?*?added.?All?close?handlers?are?run,?even?if?earlier?close?handlers?throw?*?exceptions.?If?any?close?handler?throws?an?exception,?the?first?*?exception?thrown?will?be?relayed?to?the?caller?of?{@code?close()},?with?*?any?remaining?exceptions?added?to?that?exception?as?suppressed?exceptions?*?(unless?one?of?the?remaining?exceptions?is?the?same?exception?as?the?*?first?exception,?since?an?exception?cannot?suppress?itself.)?May?*?return?itself.?*?*? This?is?an?intermediate?*?operation.?*?*?@param?closeHandler?A?task?to?execute?when?the?stream?is?closed?*?@return?a?stream?with?a?handler?that?is?run?if?the?stream?is?closed?*/
S?onClose(Runnable?closeHandler);
AutoCloseable的close()接口被調(diào)用的時候會觸發(fā)調(diào)用流對象的onClose()方法,但有幾點需要注意:onClose()?方法會返回流對象本身,也就是說可以對改對象進行多次調(diào)用如果調(diào)用了多個
onClose()?方法,它會按照調(diào)用的順序觸發(fā),但是如果某個方法有異常則只會向上拋出第一個異常前一個?
onClose()?方法拋出了異常不會影響后續(xù)?onClose()?方法的使用如果多個?
onClose()?方法都拋出異常,只展示第一個異常的堆棧,而其他異常會被壓縮,只展示部分信息
并行流和串行流
BaseStream接口中分別提供了并行流和串行流兩個方法,這兩個方法可以任意調(diào)用若干次,也可以混合調(diào)用,但最終只會以最后一次方法調(diào)用的返回結(jié)果為準。parallel()方法的說明:Returns an equivalent stream that is parallel. May return itself, either because the stream was already parallel, or because the underlying stream state was modified to be parallel.
parallel()為準,最終是并行地計算sum:stream.parallel()
???.filter(...)
???.sequential()
???.map(...)
???.parallel()
???.sum();
ParallelStream背后的男人:ForkJoinPool
兩個500 萬的排序任務(wù)和一個針對這兩組500萬數(shù)據(jù)的合并任務(wù)。問題的關(guān)鍵在于,對于一個任務(wù)而言,只有當它所有的子任務(wù)完成之后,它才能夠被執(zhí)行,想象一下歸并排序的過程。
使用ForkJoinPool時,就能夠讓其中的線程創(chuàng)建新的任務(wù),并掛起當前的任務(wù),此時線程就能夠從隊列中選擇子任務(wù)執(zhí)行。每個工作線程都有自己的工作隊列WorkQueue; 這是一個雙端隊列dequeue,它是線程私有的; ForkJoinTask中fork的子任務(wù),將放入運行該任務(wù)的工作線程的隊頭,工作線程將以LIFO的順序來處理工作隊列中的任務(wù),即堆棧的方式; 為了最大化地利用CPU,空閑的線程將從其它線程的隊列中「竊取」任務(wù)來執(zhí)行 但是是從工作隊列的尾部竊取任務(wù),以減少和隊列所屬線程之間的競爭; 雙端隊列的操作:push()/pop()僅在其所有者工作線程中調(diào)用,poll()是由其它線程竊取任務(wù)時調(diào)用的; 當只剩下最后一個任務(wù)時,還是會存在競爭,是通過CAS來實現(xiàn)的;
用ForkJoinPool的眼光來看ParallelStream
List ?userInfoList?=
????????DaoContainers.getUserInfoDAO().queryAllByList(new?UserInfoModel());
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);
forEach方法會為每個元素的計算操作創(chuàng)建一個任務(wù),該任務(wù)會被前文中提到的ForkJoinPool中的commonPool處理。以上的并行計算邏輯當然也可以使用ThreadPoolExecutor完成,但是就代碼的可讀性和代碼量而言,使用ForkJoinPool明顯更勝一籌。ForkJoinPool通用線程池的線程數(shù)量,通常使用默認值就可以了,即運行時計算機的處理器數(shù)量。也可以通過設(shè)置系統(tǒng)屬性:-Djava.util.concurrent .ForkJoinPool.common.parallelism=N?(N為線程數(shù)量),來調(diào)整ForkJoinPool的線程數(shù)量。N+1,1就是當前的主線程。public?static?String?query(String?question)?{
??List?engines?=?new?ArrayList ();
??engines.add("http://www.google.com/?q=");
??engines.add("http://duckduckgo.com/?q=");
??engines.add("http://www.bing.com/search?q=");
???
??//?get?element?as?soon?as?it?is?available
??Optional?result?=?engines.stream().parallel().map((base)?-?{
????String?url?=?base?+?question;
????//?open?connection?and?fetch?the?result
????return?WS.url(url).get();
??}).findAny();
??return?result.get();
}
這個并行流計算操作將由主線程和JVM默認的
ForkJoinPool.commonPool()來共同執(zhí)行。map中是一個阻塞方法,需要通過訪問HTTP接口并得到它的response,所以任何一個worker線程在執(zhí)行到這里的時候都會阻塞并等待結(jié)果。所以當此時再其他地方通過并行流方式調(diào)用計算方法的時候,將會受到此處阻塞等待的方法的影響。
目前的
ForkJoinPool的實現(xiàn)并未考慮補償?shù)却切┳枞诘却律傻木€程的工作worker線程,所以最終ForkJoinPool.commonPool()中的線程將備用光并且阻塞等待。
正如我們上面那個列子的情況分析得知,lambda的執(zhí)行并不是瞬間完成的,所有使用parallel streams的程序都有可能成為阻塞程序的源頭, 并且在執(zhí)行過程中程序中的其他部分將無法訪問這些workers,這意味著任何依賴parallel streams的程序在什么別的東西占用著common ForkJoinPool時將會變得不可預(yù)知并且暗藏危機。
當需要處理遞歸分治算法時,考慮使用ForkJoinPool。
仔細設(shè)置不再進行任務(wù)劃分的閾值,這個閾值對性能有影響。
Java 8中的一些特性會使用到ForkJoinPool中的通用線程池。在某些場合下,需要調(diào)整該線程池的默認的線程數(shù)量
lambda應(yīng)該盡量避免副作用,也就是說,避免突變基于堆的狀態(tài)以及任何IO
lambda應(yīng)該互不干擾,也就是說避免修改數(shù)據(jù)源(因為這可能帶來線程安全的問題)
避免訪問在流操作生命周期內(nèi)可能會改變的狀態(tài)
并行流的性能
數(shù)據(jù)大小:數(shù)據(jù)夠大,每個管道處理時間夠長,并行才有意義;
源數(shù)據(jù)結(jié)構(gòu):每個管道操作都是基于初始數(shù)據(jù)源,通常是集合,將不同的集合數(shù)據(jù)源分割會有一定消耗;
裝箱:處理基本類型比裝箱類型要快;
核的數(shù)量:默認情況下,核數(shù)量越多,底層fork/join線程池啟動線程就越多;
單元處理開銷:花在流中每個元素身上的時間越長,并行操作帶來的性能提升越明顯;
性能好:
ArrayList、數(shù)組或IntStream.range(數(shù)據(jù)支持隨機讀取,能輕易地被任意分割)性能一般:
HashSet、TreeSet(數(shù)據(jù)不易公平地分解,大部分也是可以的)性能差:
LinkedList(需要遍歷鏈表,難以對半分解)、Stream.iterate和BufferedReader.lines(長度未知,難以分解)
NQ模型
SHA-1?哈希值要少得多。為每個元素完成的工作越多,“大到足夠利用并行性” 的閾值就越低。類似地,擁有的數(shù)據(jù)越多, 拆分的分段就越多,而不會與 “太小” 閾值發(fā)生沖突。NQ?模型,其中?N?是數(shù)據(jù)元素數(shù)量,Q?是為每個元素執(zhí)行的工作量。乘積?N*Q?越大,就越有可能獲得并行提速。對于具有很小的?Q?的問題,比如對數(shù)字求和,您通常可能希望看到?N > 10,000?以獲得提速;隨著?Q?增加,獲得提速所需的數(shù)據(jù)大小將會減小。Q?更高的操作來緩解。盡管拆分某個?LinkedList?特征的結(jié)果可能很糟糕,但只要擁有足夠大的?Q,仍然可能獲得并行提速。遇到順序
ORDERED?描述了流是否有有意義的遇到順序。JDK 集合的?spliterator?會根據(jù)集合的規(guī)范來設(shè)置此標志;一些中間操作可能注入?ORDERED?(sorted()) 或清除它 (unordered())。reduce())),遵守遇到順序不會產(chǎn)生任何實際成本。但對于其他操作(有狀態(tài)中間操作,其語義與遇到順序關(guān)聯(lián)的終止操作,比如?findFirst()?或?forEachOrdered()), 在并行執(zhí)行中遵守遇到順序的責任可能很重大。如果流有一個已定義的遇到順序,但該順序?qū)Y(jié)果沒有意義, 那么可以通過使用?unordered()?操作刪除?ORDERED?標志,加速包含順序敏感型操作的管道的順序執(zhí)行。limit(),它會在指定大小處截斷一個流。在順序執(zhí)行中實現(xiàn)?limit()?很簡單:保留一個已看到多少元素的計數(shù)器,在這之后丟棄任何元素。但是在并行執(zhí)行中,實現(xiàn)?limit()?要復(fù)雜得多;您需要保留前?N?個元素。此要求大大限制了利用并行性的能力;如果輸入劃分為多個部分,您只有在某個部分之前的所有部分都已完成后,才知道該部分的結(jié)果是否將包含在最終結(jié)果中。因此,該實現(xiàn)一般會錯誤地選擇不使用所有可用的核心,或者緩存整個試驗性結(jié)果,直到您達到目標長度。limit()?操作可以自由選擇任何?N?個元素,這讓執(zhí)行效率變得高得多。知道元素后可立即將其發(fā)往下游, 無需任何緩存,而且線程之間唯一需要執(zhí)行的協(xié)調(diào)是發(fā)送一個信號來確保未超出目標流長度。sorted()?操作會實現(xiàn)一種穩(wěn)定 排序 (相同的元素按照它們進入輸入時的相同順序出現(xiàn)在輸出中),而對于無序的流,穩(wěn)定性(具有成本)不是必需的。?distinct()?具有類似的情況:如果流有一個遇到順序,那么對于多個相同的輸入元素,distinct()?必須發(fā)出其中的第一個, 而對于無序的流,它可以發(fā)出任何元素 — 同樣可以獲得高效得多的并行實現(xiàn)。collect()?聚合時會遇到類似的情形。如果在無序流上執(zhí)行?collect(groupingBy()) 操作, 與任何鍵對應(yīng)的元素都必須按它們在輸入中出現(xiàn)的順序提供給下游收集器。此順序?qū)?yīng)用程序通常沒有什么意義,而且任何順序都沒有意義。在這些情況下,可能最好選擇一個并發(fā) 收集器(比如?groupingByConcurrent()),它可以忽略遇到順序, 并讓所有線程直接收集到一個共享的并發(fā)數(shù)據(jù)結(jié)構(gòu)中(比如?ConcurrentHashMap),而不是讓每個線程收集到它自己的中間映射中, 然后再合并中間映射(這可能產(chǎn)生很高的成本)。什么時候該使用并行流
parallelStream的使用注意事項需要格外注意,它并不是解決性能的萬金油,相反,如果使用不當會嚴重影響性能。我會在另外一篇文章里單獨談這個問題。
逆鋒起筆是一個專注于程序員圈子的技術(shù)平臺,你可以收獲最新技術(shù)動態(tài)、最新內(nèi)測資格、BAT等大廠的經(jīng)驗、精品學習資料、職業(yè)路線、副業(yè)思維,微信搜索逆鋒起筆關(guān)注!
References
http://movingon.cn/2017/05/02/jdk8-Stream-BaseStream-%E6%BA%90%E7%A0%81%E9%9A%BE%E7%82%B9%E6%B5%85%E6%9E%901/ https://www.jianshu.com/p/bd825cb89e00 https://jrebel.com/rebellabs/java-parallel-streams-are-bad-for-your-health/ https://blog.csdn.net/weixx3/article/details/81266552 https://www.ibm.com/developerworks/cn/java/j-java-streams-5-brian-goetz/index.html https://www.ibm.com/developerworks/cn/java/j-java-streams-3-brian-goetz/index.html https://juejin.im/post/5dc5a148f265da4d4f65c191 https://stackoverrun.com/cn/q/10341100
推薦好文
Java 17:和遺留 25 年的漏洞 Say Goodbye
