摘要:類似的你可以用將并行流變?yōu)轫樞蛄?。中的使用順序求和并行求和將流轉(zhuǎn)為并行流配置并行流線程池并行流內(nèi)部使用了默認的,默認的線程數(shù)量就是處理器的數(shù)量包括虛擬內(nèi)核通過得到。
【概念
并行流就是一個把內(nèi)容分成多個數(shù)據(jù)塊,并用不同的線程分別處理每一個數(shù)據(jù)塊的流。在java7之前,并行處理數(shù)據(jù)很麻煩,第一,需要明確的把包含數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)分成若干子部分。第二,給每一個子部分分配一個獨立的線程。第三,適當?shù)臅r候進行同步,避免出現(xiàn)數(shù)據(jù)競爭帶來的問題,最后將每一個子部分的結(jié)果合并。在java7中引入了forkjoin框架來完成這些步驟,而java8中的stream接口可以讓你不費吹灰之力就對數(shù)據(jù)執(zhí)行并行處理,而stream接口幕后正是使用的forkjoin框架。不過,對順序流調(diào)用parallel()并不意味著流本身有任何的變化。它在內(nèi)部實際上就是設(shè)了一個boolean標志,表示你想讓parallel()之后的操作都并行執(zhí)行。類似的你可以用sequential()將并行流變?yōu)轫樞蛄?。這兩個方法可以讓我們更細化的控制流。
eg.java8中stream的使用:
//順序求和 public static long sum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) .reduce(0l,Long::sum); } //并行求和 public static long parallelSum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) //將流轉(zhuǎn)為并行流 .parallel() .reduce(0l,Long::sum); }【配置并行流線程池
并行流內(nèi)部使用了默認的forkjoinPool,默認的線程數(shù)量就是處理器的數(shù)量(包括虛擬內(nèi)核),
通過:Runtime.getRuntime().availableProcessors() 得到。
通過:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")來改變線程池大小。
我們不應(yīng)該理所當然的任認為多線程比順序執(zhí)行的效率更高,來看下面的例子:
public class Exercise { public static void main(String[] args) { long num = 1000_000_0; long st = System.currentTimeMillis(); System.out.println("iterate順序" + sum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("iterate并行" + parallelSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("迭代" + forSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("LongStream并行" + longStreamParallelSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("LongStream順序" + longStreamSum(num) + ":" +(System.currentTimeMillis() - st)); } //順序求和 public static long sum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) .reduce(0l,Long::sum); } //并行求和 public static long parallelSum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) //將流轉(zhuǎn)為并行流 .parallel() .reduce(0l,Long::sum); } //迭代求和 public static long forSum(long n){ long result = 0; for(long i = 0 ;i <= n ; i++){ result += i; } return result; } //longStream并行 public static long longStreamParallelSum(long n){ return LongStream.rangeClosed(1,n) .parallel() .reduce(0l,Long::sum); } //longStream順序執(zhí)行 public static long longStreamSum(long n){ return LongStream.rangeClosed(1,n) .reduce(0l,Long::sum); } }
并行流執(zhí)行的時間比順序流和迭代執(zhí)行的要長很多,兩個原因:
iterate()生成的是裝箱對象,必須要拆箱才能求和;
iterate()很難分成多個獨立的塊并行運行,因為每次應(yīng)用這個函數(shù)都要依賴前一次的應(yīng)用的結(jié)果。數(shù)字列表在歸納的過程開始時沒有準備好,因而無法有效的把流劃分成小塊來并行處理。但是我們又標記流為并行執(zhí)行,這就給順序執(zhí)行增加了開銷,每一次的求和操作都新開啟了一個線程。
【使用更有針對性的的方法LongStream.rangeClosed():
1. 直接產(chǎn)生long類型數(shù)據(jù),沒有開箱操作 2. 生成數(shù)字范圍,容易拆分成獨立的小塊
由此可見,選擇適當?shù)臄?shù)據(jù)結(jié)構(gòu)往往比并行化算法更重要。并行是有代價的。并行過程需要對流做遞歸劃分,把每個子流的操作分配到不同的線程,然后把這些操作的結(jié)果合并成一個值。但是多核之間移動數(shù)據(jù)的代價比我們想象的要大,所以很重要的一點是保證再內(nèi)核中并行執(zhí)行的工作時間比內(nèi)核之間傳輸數(shù)據(jù)的時間要長。
【正確的使用并行流錯誤使用并行流的首要原因就是使用的算法改變了共享變量的狀態(tài),因為修改共享變量意味著同步,而使用同步方法就會使的并行毫無意義。以下是一些建議:
1. 測試,并行還是順序執(zhí)行最重要的基準就是不停的測試性能。 2. 留意裝箱,自動裝箱,拆箱會大大降低性能,java8提供了LongStream,IntStream,DoubleStream來避免這兩個操作。 3. 有些操作本身就是順序執(zhí)行要率高,例如:limit,findFirst等依賴元素順序的操作。 4. 當執(zhí)行單個任務(wù)的成本高時使用并行,如果單個操作的成本很低,并行執(zhí)行反而會因為開啟線程,標記狀態(tài)等操作使得效率下降。 5. 小量數(shù)據(jù)不適用并行。 6. 考慮流中背后的數(shù)據(jù)結(jié)構(gòu)是否易于分解。ArrayList的拆分效率比LinkedList高得多,因為前者用不著便利就可以平均拆分。另外,range工廠方法的原始類型數(shù)據(jù)流也可以快速分解。以下時流數(shù)據(jù)源的可分解性: - ArrayList:極佳 - LinkedList:差 - IntStream等:極佳 - Stream.iterate:差 - HashSet:好 - TreeSet:好 7. 中間操作改變流的方法,涉及到排序就不適用并行。 8. 終端操作合并流的代價,涉及到排序就不適用并行。【正確的使用并行
高并發(fā)、任務(wù)執(zhí)行時間短的業(yè)務(wù),線程池線程數(shù)可以設(shè)置為CPU核數(shù)+1,減少線程上下文的切換
并發(fā)不高、任務(wù)執(zhí)行時間長的業(yè)務(wù)要區(qū)分開看:
假如是業(yè)務(wù)時間長集中在IO操作上,也就是IO密集型的任務(wù),因為IO操作并不占用CPU,所以不要讓所有的CPU閑下來,可以加大線程池中的線程數(shù)目,讓CPU處理更多的業(yè)務(wù)
假如是業(yè)務(wù)時間長集中在計算操作上,也就是計算密集型任務(wù),這個就沒辦法了,和(1)一樣吧,線程池中的線程數(shù)設(shè)置得少一些,減少線程上下文的切換
并發(fā)高、業(yè)務(wù)執(zhí)行時間長,解決這種類型任務(wù)的關(guān)鍵不在于線程池而在于整體架構(gòu)的設(shè)計,看看這些業(yè)務(wù)里面某些數(shù)據(jù)是否能做緩存是第一步,增加服務(wù)器是第二步,至于線程池的設(shè)置,設(shè)置參考(2)。最后,業(yè)務(wù)執(zhí)行時間長的問題,也可能需要分析一下,看看能不能使用中間件對任務(wù)進行拆分和解耦。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/68095.html
摘要:并行流與目前,我們對集合進行計算有兩種方式并行流而更加的靈活,我們可以配置線程池的大小確保整體的計算不會因為等待而發(fā)生阻塞。 【回顧Future接口 Future接口時java5引入的,設(shè)計初衷是對將來某個時刻會發(fā)生的結(jié)果建模。它建模了一種異步計算,返回了一個執(zhí)行預(yù)算結(jié)果的引用。比如,你去干洗店洗衣服,店員會告訴你什么時候可以來取衣服,而不是讓你一直在干洗店等待。要使用Future只需...
摘要:進程線程與協(xié)程它們都是并行機制的解決方案。選擇是任意性的,并在對實現(xiàn)做出決定時發(fā)生。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執(zhí)行異常而結(jié)束,那么線程池會補充一個新線程。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。 并發(fā)與并行的概念 并發(fā)(Concurrency): 問題域中的概念—— 程序需要被設(shè)計成能夠處理多個同時(或者幾乎同時)發(fā)生的事件 并行(Parallel...
摘要:關(guān)于三者的一些概括總結(jié)離線分析框架,適合離線的復(fù)雜的大數(shù)據(jù)處理內(nèi)存計算框架,適合在線離線快速的大數(shù)據(jù)處理流式計算框架,適合在線的實時的大數(shù)據(jù)處理我是一個以架構(gòu)師為年之內(nèi)目標的小小白。 整理自《架構(gòu)解密從分布式到微服務(wù)》第七章——聊聊分布式計算.做了相應(yīng)補充和修改。 [TOC] 前言 不管是網(wǎng)絡(luò)、內(nèi)存、還是存儲的分布式,它們最終目的都是為了實現(xiàn)計算的分布式:數(shù)據(jù)在各個計算機節(jié)點上流動,同...
摘要:限制編寫并行流,存在一些與非并行流不一樣的約定。底層框架并行流在底層沿用的框架,遞歸式的分解問題,然后每段并行執(zhí)行,最終由合并結(jié)果,返回最后的值。 本書第六章的讀書筆記,也是我這個系列的最后一篇讀書筆記。后面7、8、9章分別講的測試、調(diào)試與重構(gòu)、設(shè)計和架構(gòu)的原則以及使用Lambda表達式編寫并發(fā)程序,因為筆記不好整理,就不寫了,感興趣的同學(xué)自己買書來看吧。 并行化流操作 關(guān)于并行與并發(fā)...
摘要:但有一個限制它們不能修改定義的方法的局部變量的內(nèi)容。如前所述,這種限制存在的原因在于局部變量保存在棧上,并且隱式表示它們僅限于其所在線程。 2014年,Oracle發(fā)布了Java8新版本。對于Java來說,這顯然是一個具有里程碑意義的版本。尤其是那函數(shù)式編程的功能,避開了Java那煩瑣的語法所帶來的麻煩。 這可以算是一篇Java8的學(xué)習(xí)筆記。將Java8一些常見的一些特性作了一個概要的...
閱讀 3700·2021-09-27 13:35
閱讀 3644·2019-08-29 17:09
閱讀 2538·2019-08-26 11:30
閱讀 781·2019-08-26 10:32
閱讀 629·2019-08-26 10:23
閱讀 1286·2019-08-26 10:20
閱讀 3244·2019-08-23 15:26
閱讀 3818·2019-08-23 14:33