系統(tǒng)啟動(dòng)一個(gè)線程的成本是比較高,使用線程池可以很好地提高性能,尤其是當(dāng)程序中需要?jiǎng)?chuàng)建大量生存期很短暫的線程時(shí)
線程池在系統(tǒng)啟動(dòng)時(shí)即創(chuàng)建大量空閑線程,將一個(gè)Runnable、Callable對(duì)象—–>傳給線程池—–>線程池啟動(dòng)里面的一個(gè)線程來(lái)執(zhí)行它們的run()或者call()方法———->當(dāng)線程執(zhí)行體執(zhí)行完成后,線程并不會(huì)死亡,而是再次返回線程池成為空閑狀態(tài),等待下一個(gè)Runnable、Callable對(duì)象的run()或者call()方法
Java8改進(jìn)的線程池 Executors工廠類Java5開(kāi)始,Java內(nèi)建支持線程池。Executors工廠類來(lái)產(chǎn)生線程池,該工廠類包含如下幾個(gè)靜態(tài)工廠方法來(lái)創(chuàng)建線程池:
ExecutorService newCachedThreadPool():創(chuàng)建一個(gè)具有緩存功能的線程池,系統(tǒng)根據(jù)需要?jiǎng)?chuàng)建線程,這些線程將會(huì)被緩存在線程池中
ExecutorService newFixedThreadPool(int nThreads):創(chuàng)建一個(gè)可重用的、具有nThread個(gè)固定的線程的線程池
ExecutorService newSingleThreadExecutor():創(chuàng)建包含一個(gè)只有單線程的線程池,相當(dāng)于調(diào)用newFixedThreadPool(1)
ScheduledExecutorService newScheduledThreadPool(int corePoolSize):創(chuàng)建具有指定線程數(shù)量的線程池,可以在指定延遲后執(zhí)行線程任務(wù)。corePoolSize指池中所保存的線程數(shù),即使線程是空閑的也被保存在線程池內(nèi)
ScheduledExecutorService newSingleThreadScheduledExecutor():創(chuàng)建只有一個(gè)線程的線程池,可以指定延遲后執(zhí)行線程任務(wù)
ExecutorService newWorkStealingPool(int parallelism):創(chuàng)建持有足夠的線程的線程池來(lái)支持給定的并行級(jí)別,該方法還會(huì)使用多個(gè)隊(duì)列來(lái)減少競(jìng)爭(zhēng)
ExecutorService newWorkStealingPool():該方法可以看做是前一個(gè)方法的簡(jiǎn)本,并行級(jí)別不需要用戶手工指定,是根據(jù)計(jì)算機(jī)CPU個(gè)數(shù)自動(dòng)生成的,如果當(dāng)前機(jī)器有6個(gè)CPU,則調(diào)用該方法時(shí)并行級(jí)別被設(shè)為6
前三個(gè)方法返回一個(gè)ExecutorService對(duì)象,代表一個(gè)線程池,可以執(zhí)行Runnable對(duì)象和Callable對(duì)象所代表的線程;中間兩個(gè)方法返回一個(gè)ScheduledExecutorService對(duì)象,它是ExecutorService的子類,可以在指定延遲后執(zhí)行線程任務(wù);最后兩個(gè)方法生成的work stealing池,相當(dāng)于后臺(tái)線程池,如果所有的前臺(tái)線程都死亡了,work stealig池中的線程也會(huì)自動(dòng)死亡
ExecutorServiceExecutorService代表盡快執(zhí)行線程的線程池(只要線程中有空閑線程就立即執(zhí)行線程任務(wù)),程序只要將一個(gè)Runnable對(duì)象或Callable對(duì)象(代表線程任務(wù))提交給該線程池,該線程池就會(huì)盡快執(zhí)行該任務(wù)
ExecutorService里提供了如下3個(gè)方法:
Future> submit(Runnable task):將一個(gè)Runnable對(duì)象提交給指定的線程池,線程池將在有空閑線程時(shí)執(zhí)行Runnable對(duì)象代表的任務(wù)。其中Future對(duì)象代表Runnable任務(wù)的返回值——但是run()方法沒(méi)有返回值,所以Future對(duì)象將在run()方法執(zhí)行結(jié)束后返回null。但可以調(diào)用Future的isDone()、isCancelled()方法來(lái)獲得Runnable對(duì)象的執(zhí)行狀態(tài)
ScheduledExecutorService代表可在指定延遲后或周期性地執(zhí)行線程任務(wù)的線程池,這提供了如下4個(gè)方法:
ScheduledFuture
ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit):指定Command任務(wù)將在delay延遲后執(zhí)行
ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):指定command任務(wù)將在delay延遲后執(zhí)行,而且以設(shè)定頻率重復(fù)執(zhí)行。即initialDelay后開(kāi)始執(zhí)行,依次在initialDelay + period、initialDelay + 2*period...處重復(fù)執(zhí)行
ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay,long delay,TimeUnit unit):創(chuàng)建并執(zhí)行一個(gè)在給定初始延遲后首次啟動(dòng)的定期操作,隨后在每一次執(zhí)行終止和下一次執(zhí)行開(kāi)始之間都存在給定的延遲。如果任務(wù)在任一次執(zhí)行時(shí)遇到異常,就會(huì)取消后續(xù)執(zhí)行,否則,只能通過(guò)程序來(lái)顯式取消或終止該任務(wù)
使用線程池來(lái)執(zhí)行線程任務(wù)當(dāng)用完一個(gè)線程池后,應(yīng)該調(diào)用該線程池的shutdown()方法,該方法將啟動(dòng)線程池的關(guān)閉序列,調(diào)用shutdown()方法后的線程池不再接收新任務(wù),但會(huì)將以前所有已提交的任務(wù)執(zhí)行完成。當(dāng)線程池中的所有任務(wù)都執(zhí)行完成后,池中的所有線程都會(huì)死亡;另外也可以調(diào)用線程池的shutdownNow()方法來(lái)關(guān)閉線程池,該方法試圖停止所有正在執(zhí)行的活動(dòng)任務(wù),暫停處理正在等待的任務(wù),并返回等待的任務(wù)列表
使用線程池執(zhí)行線程任務(wù)的步驟如下:
調(diào)用Executor類的靜態(tài)工廠方法創(chuàng)建一個(gè)ExecutorService對(duì)象,該對(duì)象代表一個(gè)線程池
創(chuàng)建Runnable或Callable接口實(shí)現(xiàn)類的實(shí)例,作為線程執(zhí)行任務(wù)
調(diào)用ExcutorService的submit方法來(lái)提交Runnable或Callable實(shí)例
當(dāng)不想提交任何任務(wù)時(shí)調(diào)用ExcutorService的shutdown()方法來(lái)關(guān)閉線程池
import java.util.concurrent.*; public class ThreadPoolTest { public static void main(String[] args) throws Exception { // 創(chuàng)建足夠的線程來(lái)支持4個(gè)CPU并行的線程池 // 創(chuàng)建一個(gè)具有固定線程數(shù)(6)的線程池 ExecutorService pool = Executors.newFixedThreadPool(6); // 使用Lambda表達(dá)式創(chuàng)建Runnable對(duì)象 Runnable target = () -> { for (int i = 0; i < 100 ; i++ ) { System.out.println(Thread.currentThread().getName() + "的i值為:" + i); } }; // 向線程池中提交兩個(gè)線程 pool.submit(target); pool.submit(target); // 關(guān)閉線程池 pool.shutdown(); } }Java8增強(qiáng)的ForkJoinPool
Java7提供了ForkJoinPool支持將一個(gè)任務(wù)拆分成多個(gè)“小任務(wù)”并行計(jì)算,再把多個(gè)“小任務(wù)”的結(jié)果合并成總的計(jì)算結(jié)果。ForkJoinPool是ExecutorService的實(shí)現(xiàn)類,是一種特殊的線程池
ForkJoinService提供了如下兩個(gè)常用的構(gòu)造器:
ForkJoinPool(int parallelism):創(chuàng)建一個(gè)包含parallelism個(gè)并行線程的ForkJoinPool
ForkJoinPool():以Runtime.availableProcessors()方法的返回值作為parallelism參數(shù)來(lái)創(chuàng)建ForkJoinPool
Jav增加通用池功能,由如下兩個(gè)靜態(tài)方法提供通用池功能:
ForkJoinPoll commonPool():該方法返回一個(gè)通用池,通用池的運(yùn)行狀態(tài)不受shutDown()或shutdownNow()方法的影響。但如果程序直接調(diào)用System.exit(0);來(lái)終止虛擬機(jī),通用池以及通用池中正在執(zhí)行的任務(wù)都會(huì)被自動(dòng)終止
int getCommonPollParallelism():該方法返回通用池的并行級(jí)別
創(chuàng)建ForkJoinPool實(shí)例可調(diào)用ForkJoinPool的submit(ForkJoinTask task)或invoke(ForkJoinTask task)方法來(lái)執(zhí)行指定任務(wù)了。其中ForkJoinTask代表一個(gè)可以并行、合并的任務(wù)
ForkJoinTask是一個(gè)抽象類,它有兩個(gè)抽象子類:RecursiveAction和RecursiveTask。其中RecursiveTask代表有返回值的任務(wù),RecursiveAction代表沒(méi)有返回值的任務(wù)
線程池工具類的類圖:
簡(jiǎn)單打印0~500的數(shù)值:
import java.util.concurrent.*; // 繼承RecursiveAction來(lái)實(shí)現(xiàn)"可分解"的任務(wù) class PrintTask extends RecursiveAction { // 每個(gè)“小任務(wù)”只最多只打印50個(gè)數(shù) private static final int THRESHOLD = 50; private int start; private int end; // 打印從start到end的任務(wù) public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { // 當(dāng)end與start之間的差小于THRESHOLD時(shí),開(kāi)始打印 if(end - start < THRESHOLD) { for (int i = start ; i < end ; i++ ) { System.out.println(Thread.currentThread().getName() + "的i值:" + i); } } else { // 如果當(dāng)end與start之間的差大于THRESHOLD時(shí),即要打印的數(shù)超過(guò)50個(gè) // 將大任務(wù)分解成兩個(gè)小任務(wù)。 int middle = (start + end) / 2; PrintTask left = new PrintTask(start, middle); PrintTask right = new PrintTask(middle, end); // 并行執(zhí)行兩個(gè)“小任務(wù)” left.fork(); right.fork(); } } } public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool pool = new ForkJoinPool(); // 提交可分解的PrintTask任務(wù) pool.submit(new PrintTask(0 , 500)); pool.awaitTermination(2, TimeUnit.SECONDS); // 關(guān)閉線程池 pool.shutdown(); } }
程序?qū)崿F(xiàn)了對(duì)指定打印任務(wù)的分解,分解后的任務(wù)分別調(diào)用fork()方法開(kāi)始并行執(zhí)行。ForkJoinPool啟動(dòng)了4個(gè)線程來(lái)執(zhí)行打印任務(wù)
對(duì)一個(gè)長(zhǎng)度為100的數(shù)值的元素值進(jìn)行累加:
import java.util.concurrent.*; import java.util.*; // 繼承RecursiveTask來(lái)實(shí)現(xiàn)"可分解"的任務(wù) class CalTask extends RecursiveTask{ // 每個(gè)“小任務(wù)”只最多只累加20個(gè)數(shù) private static final int THRESHOLD = 20; private int arr[]; private int start; private int end; // 累加從start到end的數(shù)組元素 public CalTask(int[] arr, int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; // 當(dāng)end與start之間的差小于THRESHOLD時(shí),開(kāi)始進(jìn)行實(shí)際累加 if(end - start < THRESHOLD) { for (int i = start ; i < end ; i++ ) { sum += arr[i]; } return sum; } else { // 如果當(dāng)end與start之間的差大于THRESHOLD時(shí),即要累加的數(shù)超過(guò)20個(gè)時(shí) // 將大任務(wù)分解成兩個(gè)小任務(wù)。 int middle = (start + end) / 2; CalTask left = new CalTask(arr, start, middle); CalTask right = new CalTask(arr, middle, end); // 并行執(zhí)行兩個(gè)“小任務(wù)” left.fork(); right.fork(); // 把兩個(gè)“小任務(wù)”累加的結(jié)果合并起來(lái) return left.join() + right.join(); // ① } } } public class Sum { public static void main(String[] args) throws Exception { int[] arr = new int[100]; Random rand = new Random(); int total = 0; // 初始化100個(gè)數(shù)字元素 for (int i = 0, len = arr.length; i < len ; i++ ) { int tmp = rand.nextInt(20); // 對(duì)數(shù)組元素賦值,并將數(shù)組元素的值添加到sum總和中。 total += (arr[i] = tmp); } System.out.println(total); // 創(chuàng)建一個(gè)通用池 ForkJoinPool pool = ForkJoinPool.commonPool(); // 提交可分解的CalTask任務(wù) Future future = pool.submit(new CalTask(arr, 0, arr.length)); System.out.println(future.get()); // 關(guān)閉線程池 pool.shutdown(); } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/66740.html
摘要:中的線程池是運(yùn)用場(chǎng)景最多的并發(fā)框架。才是真正的線程池。存放任務(wù)的隊(duì)列存放需要被線程池執(zhí)行的線程隊(duì)列。所以線程池的所有任務(wù)完成后,它最終會(huì)收縮到的大小。飽和策略一般情況下,線程池采用的是,表示無(wú)法處理新任務(wù)時(shí)拋出異常。 Java線程池 1. 簡(jiǎn)介 系統(tǒng)啟動(dòng)一個(gè)新線程的成本是比較高的,因?yàn)樗婕芭c操作系統(tǒng)的交互,這個(gè)時(shí)候使用線程池可以提升性能,尤其是需要?jiǎng)?chuàng)建大量聲明周期很短暫的線程時(shí)。Ja...
摘要:當(dāng)活動(dòng)線程核心線程非核心線程達(dá)到這個(gè)數(shù)值后,后續(xù)任務(wù)將會(huì)根據(jù)來(lái)進(jìn)行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請(qǐng)求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)默默丟棄不能執(zhí)行的新加任務(wù),不報(bào)任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點(diǎn)記錄以及采用的解決方案 深入分析 java 線程池的實(shí)現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...
摘要:四種線程池的使用介紹的弊端及四種線程池的使用,線程池的作用線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。相比,提供的四種線程池的好處在于重用存在的線程,減少對(duì)象創(chuàng)建消亡的開(kāi)銷(xiāo),性能佳。延遲執(zhí)行描述創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。 java 四種線程池的使用 介紹new Thread的弊端及Java四種線程池的使用 1,線程池的作用 線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。 ...
摘要:高并發(fā)系列第篇文章。簡(jiǎn)單的說(shuō),在使用了線程池之后,創(chuàng)建線程變成了從線程池中獲取一個(gè)空閑的線程,然后使用,關(guān)閉線程變成了將線程歸還到線程池。如果調(diào)用了線程池的方法,線程池會(huì)提前把核心線程都創(chuàng)造好,并啟動(dòng)線程池允許創(chuàng)建的最大線程數(shù)。 java高并發(fā)系列第18篇文章。 本文主要內(nèi)容 什么是線程池 線程池實(shí)現(xiàn)原理 線程池中常見(jiàn)的各種隊(duì)列 自定義線程創(chuàng)建的工廠 常見(jiàn)的飽和策略 自定義飽和策略 ...
摘要:中的線程池運(yùn)用場(chǎng)景非常廣泛,幾乎所有的一步或者并發(fā)執(zhí)行程序都可以使用。代碼中如果執(zhí)行了方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。線程池最大數(shù)量線程池允許創(chuàng)建的線程最大數(shù)量。被稱為是可重用固定線程數(shù)的線程池。 Java中的線程池運(yùn)用場(chǎng)景非常廣泛,幾乎所有的一步或者并發(fā)執(zhí)行程序都可以使用。那么線程池有什么好處呢,以及他的實(shí)現(xiàn)原理是怎么樣的呢? 使用線程池的好處 在開(kāi)發(fā)過(guò)程中,合理的使用線程...
閱讀 3921·2021-10-12 10:11
閱讀 3720·2021-09-13 10:27
閱讀 2605·2019-08-30 15:53
閱讀 2044·2019-08-29 18:33
閱讀 2260·2019-08-29 14:03
閱讀 1061·2019-08-29 13:27
閱讀 3379·2019-08-28 18:07
閱讀 847·2019-08-26 13:23