摘要:最后,我們會(huì)通過(guò)對(duì)源代碼的剖析深入了解線程池的運(yùn)行過(guò)程和具體設(shè)計(jì),真正達(dá)到知其然而知其所以然的水平。創(chuàng)建線程池既然線程池是一個(gè)類,那么最直接的使用方法一定是一個(gè)類的對(duì)象,例如。單線程線程池單線程線程
我們一般不會(huì)選擇直接使用線程類Thread進(jìn)行多線程編程,而是使用更方便的線程池來(lái)進(jìn)行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時(shí)候去獲取就可以了。甚至可以說(shuō)線程池更棒,我們只需要把任務(wù)提交給它,它就會(huì)在合適的時(shí)候運(yùn)行了。但是如果直接使用Thread類,我們就需要在每次執(zhí)行任務(wù)時(shí)自己創(chuàng)建、運(yùn)行、等待線程了,而且很難對(duì)線程進(jìn)行整體的管理,這可不是一件輕松的事情。既然我們已經(jīng)有了線程池,那還是把這些麻煩事交給線程池來(lái)處理吧。
這篇文章將會(huì)從線程池的概念與一般使用入手,首先讓大家可以了解線程池的基本使用方法,之后會(huì)介紹實(shí)踐中最常用的四種線程池。最后,我們會(huì)通過(guò)對(duì)JDK源代碼的剖析深入了解線程池的運(yùn)行過(guò)程和具體設(shè)計(jì),真正達(dá)到知其然而知其所以然的水平。雖然只要了解了API就可以滿足一般的日常使用了,但是只有當(dāng)我們真正厘清了多線程相關(guān)的知識(shí)點(diǎn),才能在面對(duì)多線程的實(shí)踐與面試問(wèn)題時(shí)做到游刃有余、成竹在胸。
本文是一系列多線程文章中的第三篇,主要講解了線程池相關(guān)的知識(shí),這個(gè)系列總共有十篇文章,前五篇暫定結(jié)構(gòu)如下,感興趣的讀者可以關(guān)注一下:
并發(fā)基本概念——當(dāng)我們?cè)谡f(shuō)“并發(fā)、多線程”,說(shuō)的是什么?
多線程入門——這一次,讓我們完全掌握J(rèn)ava多線程(2/10)
線程池使用與原理剖析——本文
線程同步機(jī)制
并發(fā)常見(jiàn)問(wèn)題
線程池的使用方法一般我們最常用的線程池實(shí)現(xiàn)類是ThreadPoolExecutor,我們接下來(lái)會(huì)介紹這個(gè)類的基本使用方法。JDK已經(jīng)對(duì)線程池做了比較好的封裝,相信這個(gè)過(guò)程會(huì)非常輕松。
創(chuàng)建線程池既然線程池是一個(gè)Java類,那么最直接的使用方法一定是new一個(gè)ThreadPoolExecutor類的對(duì)象,例如ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue
下面就是這個(gè)構(gòu)造器的方法簽名:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue)
各個(gè)參數(shù)分別表示下面的含義:
corePoolSize,核心線程池大小,一般線程池會(huì)至少保持這么多的線程數(shù)量;
maximumPoolSize,最大線程池大小,也就是線程池最大的線程數(shù)量;
keepAliveTime和unit共同組成了一個(gè)超時(shí)間,keepAliveTime是時(shí)間數(shù)量,unit是時(shí)間單位,單位加數(shù)量組成了最終的超時(shí)時(shí)間。這個(gè)超時(shí)時(shí)間表示如果線程池中包含了超過(guò)corePoolSize數(shù)量的線程,則在有線程空閑的時(shí)間超過(guò)了超時(shí)時(shí)間時(shí)該線程就會(huì)被銷毀;
workQueue是任務(wù)的阻塞隊(duì)列,在沒(méi)有線程池中沒(méi)有足夠的線程可用的情況下會(huì)將任務(wù)先放入到這個(gè)阻塞隊(duì)列中等待執(zhí)行。這里傳入的隊(duì)列類型就決定了線程池在處理這些任務(wù)時(shí)的策略。
線程池中的阻塞隊(duì)列專門用于存放待執(zhí)行的任務(wù),在ThreadPoolExecutor中一個(gè)任務(wù)可以通過(guò)兩種方式被執(zhí)行:第一種是直接在創(chuàng)建一個(gè)新的Worker時(shí)被作為第一個(gè)任務(wù)傳入,由這個(gè)新創(chuàng)建的線程來(lái)執(zhí)行;第二種就是把任務(wù)放入一個(gè)阻塞隊(duì)列,等待線程池中的工作線程撈取任務(wù)進(jìn)行執(zhí)行。
上面提到的阻塞隊(duì)列是這樣的一種數(shù)據(jù)結(jié)構(gòu),它是一個(gè)隊(duì)列(類似于一個(gè)List),可以存放0到N個(gè)元素。我們可以對(duì)這個(gè)隊(duì)列進(jìn)行插入和彈出元素的操作,彈出操作可以理解為是一個(gè)獲取并從隊(duì)列中刪除一個(gè)元素的操作。當(dāng)隊(duì)列中沒(méi)有元素時(shí),對(duì)這個(gè)隊(duì)列的獲取操作將會(huì)被阻塞,直到有元素被插入時(shí)才會(huì)被喚醒;當(dāng)隊(duì)列已滿時(shí),對(duì)這個(gè)隊(duì)列的插入操作將會(huì)被阻塞,直到有元素被彈出后才會(huì)被喚醒。這樣的一種數(shù)據(jù)結(jié)構(gòu)非常適合于線程池的場(chǎng)景,當(dāng)一個(gè)工作線程沒(méi)有任務(wù)可處理時(shí)就會(huì)進(jìn)入阻塞狀態(tài),直到有新任務(wù)提交后才被喚醒。
提交任務(wù)當(dāng)創(chuàng)建了一個(gè)線程池之后我們就可以將任務(wù)提交到線程池中執(zhí)行了。提交任務(wù)到線程池中相當(dāng)簡(jiǎn)單,我們只要把原來(lái)傳入Thread類構(gòu)造器的Runnable對(duì)象傳入線程池的execute方法或者submit方法就可以了。execute方法和submit方法基本沒(méi)有區(qū)別,兩者的區(qū)別只是submit方法會(huì)返回一個(gè)Future對(duì)象,用于檢查異步任務(wù)的執(zhí)行情況和獲取執(zhí)行結(jié)果(異步任務(wù)完成后)。
我們可以先試試如何使用比較簡(jiǎn)單的execute方法,代碼例子如下:
public class ThreadPoolTest { private static int count = 0; public static void main(String[] args) throws Exception { Runnable task = new Runnable() { public void run() { for (int i = 0; i < 1000000; ++i) { synchronized (ThreadPoolTest.class) { count += 1; } } } }; // 重要:創(chuàng)建線程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue關(guān)閉線程池()); // 重要:向線程池提交兩個(gè)任務(wù) threadPool.execute(task); threadPool.execute(task); // 等待線程池中的所有任務(wù)完成 threadPool.shutdown(); while (!threadPool.awaitTermination(1L, TimeUnit.MINUTES)) { System.out.println("Not yet. Still waiting for termination"); } System.out.println("count = " + count); } }
上面的代碼中為了等待線程池中的所有任務(wù)執(zhí)行完已經(jīng)使用了shutdown()方法,關(guān)閉線程池的方法主要有兩個(gè):
shutdown(),有序關(guān)閉線程池,調(diào)用后線程池會(huì)讓已經(jīng)提交的任務(wù)完成執(zhí)行,但是不會(huì)再接受新任務(wù)。
shutdownNow(),直接關(guān)閉線程池,線程池中正在運(yùn)行的任務(wù)會(huì)被中斷,正在等待執(zhí)行的任務(wù)不會(huì)再被執(zhí)行,但是這些還在阻塞隊(duì)列中等待的任務(wù)會(huì)被作為返回值返回。
監(jiān)控線程池運(yùn)行狀態(tài)我們可以通過(guò)調(diào)用線程池對(duì)象上的一些方法來(lái)獲取線程池當(dāng)前的運(yùn)行信息,常用的方法有:
getTaskCount,線程池中已完成、執(zhí)行中、等待執(zhí)行的任務(wù)總數(shù)估計(jì)值。因?yàn)樵诮y(tǒng)計(jì)過(guò)程中任務(wù)會(huì)發(fā)生動(dòng)態(tài)變化,所以最后的結(jié)果并不是一個(gè)準(zhǔn)確值;
getCompletedTaskCount,線程池中已完成的任務(wù)總數(shù),這同樣是一個(gè)估計(jì)值;
getLargestPoolSize,線程池曾經(jīng)創(chuàng)建過(guò)的最大線程數(shù)量。通過(guò)這個(gè)數(shù)據(jù)可以知道線程池是否充滿過(guò),也就是達(dá)到過(guò)maximumPoolSize;
getPoolSize,線程池當(dāng)前的線程數(shù)量;
getActiveCount,當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)量估計(jì)值。
四種常用線程池很多情況下我們也不會(huì)直接創(chuàng)建ThreadPoolExecutor類的對(duì)象,而是根據(jù)需要通過(guò)Executors的幾個(gè)靜態(tài)方法來(lái)創(chuàng)建特定用途的線程池。目前常用的線程池有四種:
可緩存線程池,使用Executors.newCachedThreadPool方法創(chuàng)建
定長(zhǎng)線程池,使用Executors.newFixedThreadPool方法創(chuàng)建
延時(shí)任務(wù)線程池,使用Executors.newScheduledThreadPool方法創(chuàng)建
單線程線程池,使用Executors.newSingleThreadExecutor方法創(chuàng)建
下面通過(guò)這些靜態(tài)方法的源碼來(lái)具體了解一下不同類型線程池的特性與適用場(chǎng)景。
可緩存線程池JDK中的源碼我們通過(guò)在IDE中進(jìn)行跳轉(zhuǎn)可以很方便地進(jìn)行查看,下面就是Executors.newCachedThreadPool方法中的源代碼。從代碼中我們可以看到,可緩存線程池其實(shí)也是通過(guò)直接創(chuàng)建ThreadPoolExecutor類的構(gòu)造器創(chuàng)建的,只是其中的參數(shù)都已經(jīng)被設(shè)置好了,我們可以不用做具體的設(shè)置。所以我們要觀察的重點(diǎn)就是在這個(gè)方法中具體產(chǎn)生了一個(gè)怎樣配置的ThreadPoolExecutor對(duì)象,以及這樣的線程池適用于怎樣的場(chǎng)景。
從下面的代碼中,我們可以看到,傳入ThreadPoolExecutor構(gòu)造器的值有:
- corePoolSize核心線程數(shù)為0,代表線程池中的線程數(shù)可以為0 - maximumPoolSize最大線程數(shù)為Integer.MAX_VALUE,代表線程池中最多可以有無(wú)限多個(gè)線程 - 超時(shí)時(shí)間設(shè)置為60秒,表示線程池中的線程在空閑60秒后會(huì)被回收 - 最后傳入的是一個(gè)`SynchronousQueue`類型的阻塞隊(duì)列,代表每一個(gè)新添加的任務(wù)都要馬上有一個(gè)工作線程進(jìn)行處理
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
所以可緩存線程池在添加任務(wù)時(shí)會(huì)優(yōu)先使用空閑的線程,如果沒(méi)有就創(chuàng)建一個(gè)新線程,線程數(shù)沒(méi)有上限,所以每一個(gè)任務(wù)都會(huì)馬上被分配到一個(gè)工作線程進(jìn)行執(zhí)行,不需要在阻塞隊(duì)列中等待;如果線程池長(zhǎng)期閑置,那么其中的所有線程都會(huì)被銷毀,節(jié)約系統(tǒng)資源。
優(yōu)點(diǎn)
任務(wù)在添加后可以馬上執(zhí)行,不需要進(jìn)入阻塞隊(duì)列等待
在閑置時(shí)不會(huì)保留線程,可以節(jié)約系統(tǒng)資源
缺點(diǎn)
對(duì)線程數(shù)沒(méi)有限制,可能會(huì)過(guò)量消耗系統(tǒng)資源
適用場(chǎng)景
適用于大量短耗時(shí)任務(wù)和對(duì)響應(yīng)時(shí)間要求較高的場(chǎng)景
定長(zhǎng)線程池傳入ThreadPoolExecutor構(gòu)造器的值有:
corePoolSize核心線程數(shù)和maximumPoolSize最大線程數(shù)都為固定值nThreads,即線程池中的線程數(shù)量會(huì)保持在nThreads,所以被稱為“定長(zhǎng)線程池”
超時(shí)時(shí)間被設(shè)置為0毫秒,因?yàn)榫€程池中只有核心線程,所以不需要考慮超時(shí)釋放
最后一個(gè)參數(shù)使用了無(wú)界隊(duì)列,所以在所有線程都在處理任務(wù)的情況下,可以無(wú)限添加任務(wù)到阻塞隊(duì)列中等待執(zhí)行
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
定長(zhǎng)線程池中的線程數(shù)會(huì)逐步增長(zhǎng)到nThreads個(gè),并且在之后空閑線程不會(huì)被釋放,線程數(shù)會(huì)一直保持在nThreads個(gè)。如果添加任務(wù)時(shí)所有線程都處于忙碌狀態(tài),那么就會(huì)把任務(wù)添加到阻塞隊(duì)列中等待執(zhí)行,阻塞隊(duì)列中任務(wù)的總數(shù)沒(méi)有上限。
優(yōu)點(diǎn)
線程數(shù)固定,對(duì)系統(tǒng)資源的消耗可控
缺點(diǎn)
在任務(wù)量暴增的情況下線程池不會(huì)彈性增長(zhǎng),會(huì)導(dǎo)致任務(wù)完成時(shí)間延遲
使用了無(wú)界隊(duì)列,在線程數(shù)設(shè)置過(guò)小的情況下可能會(huì)導(dǎo)致過(guò)多的任務(wù)積壓,引起任務(wù)完成時(shí)間過(guò)晚和資源被過(guò)度消耗的問(wèn)題
適用場(chǎng)景
任務(wù)量峰值不會(huì)過(guò)高,且任務(wù)對(duì)響應(yīng)時(shí)間要求不高的場(chǎng)景
延時(shí)任務(wù)線程池與之前的兩個(gè)方法不同,Executors.newScheduledThreadPool返回的是ScheduledExecutorService接口對(duì)象,可以提供延時(shí)執(zhí)行、定時(shí)執(zhí)行等功能。在線程池配置上有如下特點(diǎn):
maximumPoolSize最大線程數(shù)為無(wú)限,在任務(wù)量較大時(shí)可以創(chuàng)建大量新線程執(zhí)行任務(wù)
超時(shí)時(shí)間為0,線程空閑后會(huì)被立即銷毀
使用了延時(shí)工作隊(duì)列,延時(shí)工作隊(duì)列中的元素都有對(duì)應(yīng)的過(guò)期時(shí)間,只有過(guò)期的元素才會(huì)被彈出
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
延時(shí)任務(wù)線程池實(shí)現(xiàn)了ScheduledExecutorService接口,主要用于需要延時(shí)執(zhí)行和定時(shí)執(zhí)行的情況。
單線程線程池單線程線程池中只有一個(gè)工作線程,可以保證添加的任務(wù)都以指定順序執(zhí)行(先進(jìn)先出、后進(jìn)先出、優(yōu)先級(jí))。但是如果線程池里只有一個(gè)線程,為什么我們還要用線程池而不直接用Thread呢?這種情況下主要有兩種優(yōu)點(diǎn):一是我們可以通過(guò)共享的線程池很方便地提交任務(wù)進(jìn)行異步執(zhí)行,而不用自己管理線程的生命周期;二是我們可以使用任務(wù)隊(duì)列并指定任務(wù)的執(zhí)行順序,很容易做到任務(wù)管理的功能。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue線程池的內(nèi)部實(shí)現(xiàn)())); }
通過(guò)前面的內(nèi)容我們其實(shí)已經(jīng)可以在代碼中使用線程池了,但是我們?yōu)槭裁催€要去深究線程池的內(nèi)部實(shí)現(xiàn)呢?首先,可能有一個(gè)很功利性的目的就是為了面試,在面試時(shí)如果能準(zhǔn)確地說(shuō)出一些底層的運(yùn)行機(jī)制與原理那一定可以成為過(guò)程中一個(gè)重要的亮點(diǎn)。
但是我認(rèn)為學(xué)習(xí)探究線程池的內(nèi)部實(shí)現(xiàn)的作用絕對(duì)不僅是如此,只有深入了解并厘清了線程池的具體實(shí)現(xiàn),我們才能解決實(shí)踐中需要考慮的各種邊界條件。因?yàn)槎嗑€程編程所代表的并發(fā)編程并不是一個(gè)固定的知識(shí)點(diǎn),而是實(shí)踐中不斷在發(fā)展和完善的一個(gè)知識(shí)門類。我們也許會(huì)需要同時(shí)考慮多個(gè)維度,最后得到一個(gè)特定于應(yīng)用場(chǎng)景的解決方案,這就要求我們具備從細(xì)節(jié)著手構(gòu)建出解決方案并做好各個(gè)考慮維度之間的取舍的能力。
而且我相信只要在某一個(gè)點(diǎn)上能突破到相當(dāng)?shù)纳疃?,那么以后從這個(gè)點(diǎn)上向外擴(kuò)展就會(huì)容易得多。也許在剛開(kāi)始我們的探究會(huì)碰到非常大的阻力,但是我們要相信,最后我們可以得到的將不止是一個(gè)知識(shí)點(diǎn)而是一整個(gè)知識(shí)面。
查看JDK源碼的方式在IDE中,例如IDEA里,我們可以點(diǎn)擊我們樣例代碼里的ThreadPoolExecutor類跳轉(zhuǎn)到JDK中ThreadPoolExecutor類的源代碼。在源代碼中我們可以看到很多java.util.concurrent包的締造者大牛“Doug Lea”所留下的各種注釋,下面的圖片就是該類源代碼的一個(gè)截圖。
這些注釋的內(nèi)容非常有參考價(jià)值,建議有能力的讀者朋友可以自己閱讀一遍。下面,我們就一步步地抽絲剝繭,來(lái)揭開(kāi)線程池類ThreadPoolExecutor源代碼的神秘面紗。
控制變量與線程池生命周期在ThreadPoolExecutor類定義的開(kāi)頭,我們可以看到如下的幾行代碼:
// 控制變量,前3位表示狀態(tài),剩下的數(shù)據(jù)位表示有效的線程數(shù) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // Integer的位數(shù)減去3位狀態(tài)位就是線程數(shù)的位數(shù) private static final int COUNT_BITS = Integer.SIZE - 3; // CAPACITY就是線程數(shù)的上限(含),即2^COUNT_BITS - 1個(gè) private static final int CAPACITY = (1 << COUNT_BITS) - 1;
第一行是一個(gè)用來(lái)作為控制變量的整型值,即一個(gè)Integer。之所以要用AtomicInteger類是因?yàn)橐WC多線程安全,在本系列之后的文章中會(huì)對(duì)AtomicInteger進(jìn)行具體介紹。一個(gè)整型一般是32位,但是這里的代碼為了保險(xiǎn)起見(jiàn),還是使用了Integer.SIZE來(lái)表示整型的總位數(shù)。這里的“位”指的是數(shù)據(jù)位(bit),在計(jì)算機(jī)中,8bit = 1字節(jié),1024字節(jié) = 1KB,1024KB = 1MB。每一位都是一個(gè)0或1的數(shù)字,我們?nèi)绻颜拖胂蟪梢粋€(gè)二進(jìn)制(0或1)的數(shù)組,那么一個(gè)Integer就是32個(gè)數(shù)字的數(shù)組。其中,前三個(gè)被用來(lái)表示狀態(tài),那么我們就可以表示2^3 = 8個(gè)不同的狀態(tài)了。剩下的29位二進(jìn)制數(shù)字都會(huì)被用于表示當(dāng)前線程池中有效線程的數(shù)量,上限就是(2^29 - 1)個(gè),即常量CAPACITY。
之后的部分列出了線程池的所有狀態(tài):
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
在這里可以忽略數(shù)字后面的<< COUNT_BITS,可以把狀態(tài)簡(jiǎn)單地理解為前面的數(shù)字部分,這樣的簡(jiǎn)化基本不影響結(jié)論。
各個(gè)狀態(tài)的解釋如下:
RUNNING,正常運(yùn)行狀態(tài),可以接受新的任務(wù)和處理隊(duì)列中的任務(wù)
SHUTDOWN,關(guān)閉中狀態(tài),不能接受新任務(wù),但是可以處理隊(duì)列中的任務(wù)
STOP,停止中狀態(tài),不能接受新任務(wù),也不處理隊(duì)列中的任務(wù),會(huì)中斷進(jìn)行中的任務(wù)
TIDYING,待結(jié)束狀態(tài),所有任務(wù)已經(jīng)結(jié)束,線程數(shù)歸0,進(jìn)入TIDYING狀態(tài)后將會(huì)運(yùn)行terminated()方法
TERMINATED,結(jié)束狀態(tài),terminated()方法調(diào)用完成后進(jìn)入
這幾個(gè)狀態(tài)所對(duì)應(yīng)的數(shù)字值是按照順序排列的,也就是說(shuō)線程池的狀態(tài)只能從小到大變化,這也方便了通過(guò)數(shù)字比較來(lái)判斷狀態(tài)所在的階段,這種通過(guò)數(shù)字大小來(lái)比較狀態(tài)值的方法在ThreadPoolExecutor的源碼中會(huì)有大量的使用。
下圖是這五個(gè)狀態(tài)之間的變化過(guò)程:
當(dāng)線程池被創(chuàng)建時(shí)會(huì)處于RUNNING狀態(tài),正常接受和處理任務(wù);
當(dāng)shutdown()方法被直接調(diào)用,或者在線程池對(duì)象被GC回收時(shí)通過(guò)finalize()方法隱式調(diào)用了shutdown()方法時(shí),線程池會(huì)進(jìn)入SHUTDOWN狀態(tài)。該狀態(tài)下線程池仍然會(huì)繼續(xù)執(zhí)行完阻塞隊(duì)列中的任務(wù),只是不再接受新的任務(wù)了。當(dāng)隊(duì)列中的任務(wù)被執(zhí)行完后,線程池中的線程也會(huì)被回收。當(dāng)隊(duì)列和線程都被清空后,線程池將進(jìn)入TIDYING狀態(tài);
在線程池處于RUNNING或者SHUTDOWN狀態(tài)時(shí),如果有代碼調(diào)用了shutdownNow()方法,則線程池會(huì)進(jìn)入STOP狀態(tài)。在STOP狀態(tài)下,線程池會(huì)直接清空阻塞隊(duì)列中待執(zhí)行的任務(wù),然后中斷所有正在進(jìn)行中的任務(wù)并回收線程。當(dāng)線程都被清空以后,線程池就會(huì)進(jìn)入TIDYING狀態(tài);
當(dāng)線程池進(jìn)入TIDYING狀態(tài)時(shí),將會(huì)運(yùn)行terminated()方法,該方法執(zhí)行完后,線程池就會(huì)進(jìn)入最終的TERMINATED狀態(tài),徹底結(jié)束。
到這里我們就已經(jīng)清楚地了解了線程從剛被創(chuàng)建時(shí)的RUNNING狀態(tài)一直到最終的TERMINATED狀態(tài)的整個(gè)生命周期了。那么當(dāng)我們要向一個(gè)RUNNING狀態(tài)的線程池提交任務(wù)時(shí)會(huì)發(fā)生些什么呢?
execute方法的實(shí)現(xiàn)我們一般會(huì)使用execute方法提交我們的任務(wù),那么線程池在這個(gè)過(guò)程中做了什么呢?在ThreadPoolExecutor類的execute()方法的源代碼中,我們主要做了四件事:
如果當(dāng)前線程池中的線程數(shù)小于核心線程數(shù)corePoolSize,則創(chuàng)建一個(gè)新的Worker代表一個(gè)線程,并把入?yún)⒅械娜蝿?wù)作為第一個(gè)任務(wù)傳入Worker。addWorker方法中的第一個(gè)參數(shù)是該線程的第一個(gè)任務(wù),而第二個(gè)參數(shù)就是代表是否創(chuàng)建的是核心線程,在execute方法中addWorker總共被調(diào)用了三次,其中第一次傳入的是true,后兩次傳入的都是false;
如果當(dāng)前線程池中的線程數(shù)已經(jīng)滿足了核心線程數(shù)corePoolSize,那么就會(huì)通過(guò)workQueue.offer()方法將任務(wù)添加到阻塞隊(duì)列中等待執(zhí)行;
如果線程數(shù)已經(jīng)達(dá)到了corePoolSize且阻塞隊(duì)列中無(wú)法插入該任務(wù)(比如已滿),那么線程池就會(huì)再增加一個(gè)線程來(lái)執(zhí)行該任務(wù),除非線程數(shù)已經(jīng)達(dá)到了最大線程數(shù)maximumPoolSize;
如果確實(shí)已經(jīng)達(dá)到了最大線程數(shù),那么就拒絕這個(gè)任務(wù)。
總體上的執(zhí)行流程如下,下方的黑色同心圓代表流程結(jié)束:
這里再重復(fù)一次阻塞隊(duì)列的定義,方便大家閱讀:
線程池中的阻塞隊(duì)列專門用于存放待執(zhí)行的任務(wù),在ThreadPoolExecutor中一個(gè)任務(wù)可以通過(guò)兩種方式被執(zhí)行:第一種是直接在創(chuàng)建一個(gè)新的Worker時(shí)被作為第一個(gè)任務(wù)傳入,由這個(gè)新創(chuàng)建的線程來(lái)執(zhí)行;第二種就是把任務(wù)放入一個(gè)阻塞隊(duì)列,等待線程池中的工作線程撈取任務(wù)進(jìn)行執(zhí)行。上面提到的阻塞隊(duì)列是這樣的一種數(shù)據(jù)結(jié)構(gòu),它是一個(gè)隊(duì)列(類似于一個(gè)List),可以存放0到N個(gè)元素。我們可以對(duì)這個(gè)隊(duì)列進(jìn)行插入和彈出元素的操作,彈出操作可以理解為是一個(gè)獲取并從隊(duì)列中刪除一個(gè)元素的操作。當(dāng)隊(duì)列中沒(méi)有元素時(shí),對(duì)這個(gè)隊(duì)列的獲取操作將會(huì)被阻塞,直到有元素被插入時(shí)才會(huì)被喚醒;當(dāng)隊(duì)列已滿時(shí),對(duì)這個(gè)隊(duì)列的插入操作將會(huì)被阻塞,直到有元素被彈出后才會(huì)被喚醒。這樣的一種數(shù)據(jù)結(jié)構(gòu)非常適合于線程池的場(chǎng)景,當(dāng)一個(gè)工作線程沒(méi)有任務(wù)可處理時(shí)就會(huì)進(jìn)入阻塞狀態(tài),直到有新任務(wù)提交后才被喚醒。
下面是帶有注釋的源代碼,大家可以和上面的流程對(duì)照起來(lái)參考一下:
public void execute(Runnable command) { // 檢查提交的任務(wù)是否為空 if (command == null) throw new NullPointerException(); // 獲取控制變量值 int c = ctl.get(); // 檢查當(dāng)前線程數(shù)是否達(dá)到了核心線程數(shù) if (workerCountOf(c) < corePoolSize) { // 未達(dá)到核心線程數(shù),則創(chuàng)建新線程 // 并將傳入的任務(wù)作為該線程的第一個(gè)任務(wù) if (addWorker(command, true)) // 添加線程成功則直接返回,否則繼續(xù)執(zhí)行 return; // 因?yàn)榍懊嬲{(diào)用了耗時(shí)操作addWorker方法 // 所以線程池狀態(tài)有可能發(fā)生了改變,重新獲取狀態(tài)值 c = ctl.get(); } // 判斷線程池當(dāng)前狀態(tài)是否是運(yùn)行中 // 如果是則調(diào)用workQueue.offer方法將任務(wù)放入阻塞隊(duì)列 if (isRunning(c) && workQueue.offer(command)) { // 因?yàn)閳?zhí)行了耗時(shí)操作“放入阻塞隊(duì)列”,所以重新獲取狀態(tài)值 int recheck = ctl.get(); // 如果當(dāng)前狀態(tài)不是運(yùn)行中,則將剛才放入阻塞隊(duì)列的任務(wù)拿出,如果拿出成功,則直接拒絕這個(gè)任務(wù) if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) // 如果線程池中沒(méi)有線程了,那就創(chuàng)建一個(gè) addWorker(null, false); } // 如果放入阻塞隊(duì)列失?。ㄈ珀?duì)列已滿),則添加一個(gè)線程 else if (!addWorker(command, false)) // 如果添加線程失?。ㄈ缫呀?jīng)達(dá)到了最大線程數(shù)),則拒絕任務(wù) reject(command); }addWorker方法
在前面execute方法的代碼中我們可以看到線程池是通過(guò)addWorker方法來(lái)向線程池中添加新線程的,那么新的線程又是如何運(yùn)行起來(lái)的呢?
這里我們暫時(shí)跳過(guò)addWorker方法的詳細(xì)源代碼,因?yàn)殡m然這個(gè)方法的代碼行數(shù)較多,但是功能相對(duì)比較直接,只是創(chuàng)建一個(gè)代表線程的Worker類對(duì)象,并調(diào)用這個(gè)對(duì)象所對(duì)應(yīng)線程對(duì)象的start()方法。我們知道一旦調(diào)用了Thread類的start()方法,則這個(gè)線程就會(huì)開(kāi)始調(diào)用創(chuàng)建線程時(shí)傳入的Runnable對(duì)象。從下面的Worker類構(gòu)造器源代碼可以看出,Worker類正是把自己(this指針)傳入了線程的構(gòu)造器當(dāng)中,那么這個(gè)線程就會(huì)運(yùn)行Worker類的run()方法了,這個(gè)run()方法只執(zhí)行了一行很簡(jiǎn)單的代碼runWorker(this);。
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }runWorker方法的實(shí)現(xiàn)
我們看到線程池中的線程在啟動(dòng)時(shí)會(huì)調(diào)用對(duì)應(yīng)的Worker類的runWorker方法,而這里就是整個(gè)線程池任務(wù)執(zhí)行的核心所在了。runWorker方法中包含有一個(gè)類似無(wú)限循環(huán)的while語(yǔ)句,讓worker對(duì)象可以不斷執(zhí)行提交到線程池中的新任務(wù)。
大家可以配合代碼上帶有的注釋來(lái)理解該方法的具體實(shí)現(xiàn):
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 將worker的狀態(tài)重置為正常狀態(tài),因?yàn)閟tate狀態(tài)值在構(gòu)造器中被初始化為-1 w.unlock(); // 通過(guò)completedAbruptly變量的值判斷任務(wù)是否正常執(zhí)行完成 boolean completedAbruptly = true; try { // 如果task為null就通過(guò)getTask方法獲取阻塞隊(duì)列中的下一個(gè)任務(wù) // getTask方法一般不會(huì)返回null,所以這個(gè)while類似于一個(gè)無(wú)限循環(huán) // worker對(duì)象就通過(guò)這個(gè)方法的持續(xù)運(yùn)行來(lái)不斷處理新的任務(wù) while (task != null || (task = getTask()) != null) { // 每一次任務(wù)的執(zhí)行都必須獲取鎖來(lái)保證下方臨界區(qū)代碼的線程安全 w.lock(); // 如果狀態(tài)值大于等于STOP(狀態(tài)值是有序的,即STOP、TIDYING、TERMINATED) // 且當(dāng)前線程還沒(méi)有被中斷,則主動(dòng)中斷線程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); // 開(kāi)始 try { // 執(zhí)行任務(wù)前處理操作,默認(rèn)是一個(gè)空實(shí)現(xiàn) // 在子類中可以通過(guò)重寫來(lái)改變?nèi)蝿?wù)執(zhí)行前的處理行為 beforeExecute(wt, task); // 通過(guò)thrown變量保存任務(wù)執(zhí)行過(guò)程中拋出的異常 // 提供給下面finally塊中的afterExecute方法使用 Throwable thrown = null; try { // *** 重要:實(shí)際執(zhí)行任務(wù)的代碼 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { // 因?yàn)镽unnable接口的run方法中不能拋出Throwable對(duì)象 // 所以要包裝成Error對(duì)象拋出 thrown = x; throw new Error(x); } finally { // 執(zhí)行任務(wù)后處理操作,默認(rèn)是一個(gè)空實(shí)現(xiàn) // 在子類中可以通過(guò)重寫來(lái)改變?nèi)蝿?wù)執(zhí)行后的處理行為 afterExecute(task, thrown); } } finally { // 將循環(huán)變量task設(shè)置為null,表示已處理完成 task = null; // 累加當(dāng)前worker已經(jīng)完成的任務(wù)數(shù) w.completedTasks++; // 釋放while體中第一行獲取的鎖 w.unlock(); } } // 將completedAbruptly變量設(shè)置為false,表示任務(wù)正常處理完成 completedAbruptly = false; } finally { // 銷毀當(dāng)前的worker對(duì)象,并完成一些諸如完成任務(wù)數(shù)量統(tǒng)計(jì)之類的輔助性工作 // 在線程池當(dāng)前狀態(tài)小于STOP的情況下會(huì)創(chuàng)建一個(gè)新的worker來(lái)替換被銷毀的worker processWorkerExit(w, completedAbruptly); } }
在runWorker方法的源代碼中有兩個(gè)比較重要的方法調(diào)用,一個(gè)是while條件中對(duì)getTask方法的調(diào)用,一個(gè)是在方法的最后對(duì)processWorkerExit方法的調(diào)用。下面是對(duì)這兩個(gè)方法更詳細(xì)的解釋。
getTask方法在阻塞隊(duì)列中有待執(zhí)行的任務(wù)時(shí)會(huì)從隊(duì)列中彈出一個(gè)任務(wù)并返回,如果阻塞隊(duì)列為空,那么就會(huì)阻塞等待新的任務(wù)提交到隊(duì)列中直到超時(shí)(在一些配置下會(huì)一直等待而不超時(shí)),如果在超時(shí)之前獲取到了新的任務(wù),那么就會(huì)將這個(gè)任務(wù)作為返回值返回。
當(dāng)getTask方法返回null時(shí)會(huì)導(dǎo)致當(dāng)前Worker退出,當(dāng)前線程被銷毀。在以下情況下getTask方法才會(huì)返回null:
當(dāng)前線程池中的線程數(shù)超過(guò)了最大線程數(shù)。這是因?yàn)檫\(yùn)行時(shí)通過(guò)調(diào)用setMaximumPoolSize修改了最大線程數(shù)而導(dǎo)致的結(jié)果;
線程池處于STOP狀態(tài)。這種情況下所有線程都應(yīng)該被立即回收銷毀;
線程池處于SHUTDOWN狀態(tài),且阻塞隊(duì)列為空。這種情況下已經(jīng)不會(huì)有新的任務(wù)被提交到阻塞隊(duì)列中了,所以線程應(yīng)該被銷毀;
線程可以被超時(shí)回收的情況下等待新任務(wù)超時(shí)。線程被超時(shí)回收一般有以下兩種情況:
超出核心線程數(shù)部分的線程等待任務(wù)超時(shí)
允許核心線程超時(shí)(線程池配置)的情況下線程等待任務(wù)超時(shí)
processWorkerExit方法會(huì)銷毀當(dāng)前線程對(duì)應(yīng)的Worker對(duì)象,并執(zhí)行一些累加總處理任務(wù)數(shù)等輔助操作。但在線程池當(dāng)前狀態(tài)小于STOP的情況下會(huì)創(chuàng)建一個(gè)新的Worker來(lái)替換被銷毀的Worker,有興趣的讀者可以自行參考processWorkerExit方法源代碼。
總結(jié)到這里我們的線程池源代碼之旅就結(jié)束了,希望大家在看完這篇文章之后能對(duì)線程池的使用和運(yùn)行都有一個(gè)大概的印象。為什么說(shuō)只是有了一個(gè)大概的印象呢?因?yàn)槲矣X(jué)得很多沒(méi)有相關(guān)基礎(chǔ)的讀者讀到這里可能還只是對(duì)線程池有了一個(gè)自己的認(rèn)識(shí),對(duì)其中的一些細(xì)節(jié)可能還沒(méi)有完全捕捉到。所以我建議大家在看完下面的總結(jié)之后不妨再返回到文章的開(kāi)頭多讀幾遍,相信第二遍的閱讀能給大家?guī)?lái)不一樣的體驗(yàn),因?yàn)槲易约阂彩窃诘谌巫xThreadPoolExecutor類的源代碼時(shí)才真正打通了其中的一些重要關(guān)節(jié)的。
在這篇文章中我們從線程池的概念和基本使用方法說(shuō)起,然后介紹了ThreadPoolExecutor的構(gòu)造器參數(shù)和常用的四種具體配置。最后的一大半篇幅我們一起在TheadPoolExecutor類的源代碼中暢游了一番,了解了從線程池的創(chuàng)建到任務(wù)執(zhí)行的完整執(zhí)行模型。
引子在瀏覽ThreadPoolExexutor源碼的過(guò)程中,有幾個(gè)點(diǎn)我們其實(shí)并沒(méi)有完全說(shuō)清楚,比如對(duì)鎖的加鎖操作、對(duì)控制變量的多次獲取、控制變量的AtomicInteger類型。在下一篇文章中,我將會(huì)介紹這些以鎖、volatile變量、CAS操作、AQS抽象類為代表的一系列線程同步方法,歡迎感興趣的讀者繼續(xù)關(guān)注我后續(xù)發(fā)布的文章~
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/73762.html
摘要:那么線程池到底是怎么利用類來(lái)實(shí)現(xiàn)持續(xù)不斷地接收提交的任務(wù)并執(zhí)行的呢接下來(lái),我們通過(guò)的源代碼來(lái)一步一步抽絲剝繭,揭開(kāi)線程池運(yùn)行模型的神秘面紗。 在上一篇文章《從0到1玩轉(zhuǎn)線程池》中,我們了解了線程池的使用方法,以及向線程池中提交任務(wù)的完整流程和ThreadPoolExecutor.execute方法的源代碼。在這篇文章中,我們將會(huì)從頭閱讀線程池ThreadPoolExecutor類的源代...
摘要:另外,為了線程切換后能恢復(fù)到正確的執(zhí)行位置,每條線程都需要有一個(gè)獨(dú)立的程序計(jì)數(shù)器,各線程之間計(jì)數(shù)器互不影響,獨(dú)立存儲(chǔ),我們稱這類內(nèi)存區(qū)域?yàn)榫€程私有的內(nèi)存。運(yùn)行時(shí)常量池運(yùn)行時(shí)常量池是方法區(qū)的一部分。 寫在前面(常見(jiàn)面試題) 基本問(wèn)題: 介紹下 Java 內(nèi)存區(qū)域(運(yùn)行時(shí)數(shù)據(jù)區(qū)) Java 對(duì)象的創(chuàng)建過(guò)程(五步,建議能默寫出來(lái)并且要知道每一步虛擬機(jī)做了什么) 對(duì)象的訪問(wèn)定位的兩種方式(句...
摘要:新生代收集器,復(fù)制算法,并行收集,面向吞吐量要求吞吐量?jī)?yōu)先收集器。吞吐量用戶代碼運(yùn)行時(shí)間用戶代碼運(yùn)行時(shí)間垃圾回收時(shí)間控制最大垃圾收集停頓時(shí)間,大于零的毫秒數(shù)。吞吐量大小,到的整數(shù),垃圾收集時(shí)間占總時(shí)間的比例,計(jì)算時(shí)間占用比例。 基礎(chǔ)背景 運(yùn)行時(shí)數(shù)據(jù)區(qū)域 虛擬機(jī)結(jié)構(gòu)圖 showImg(https://segmentfault.com/img/bVbpRUI?w=378&h=309); 程...
摘要:不同的是它還多了內(nèi)部類和內(nèi)部類,以及讀寫對(duì)應(yīng)的成員變量和方法。另外是給和內(nèi)部類使用的。內(nèi)部類前面說(shuō)到的操作是分配到里面執(zhí)行的。他們都是接口的實(shí)現(xiàn),所以其實(shí)最像應(yīng)該是這個(gè)兩個(gè)內(nèi)部類。而且大體上也沒(méi)什么差異,也是用的內(nèi)部類。 之前講了《AQS源碼閱讀》和《ReentrantLock源碼閱讀》,本次將延續(xù)閱讀下ReentrantReadWriteLock,建議沒(méi)看過(guò)之前兩篇文章的,先大概了解...
閱讀 1791·2023-04-26 01:02
閱讀 4985·2021-11-24 09:39
閱讀 1866·2019-08-30 15:44
閱讀 3069·2019-08-30 11:10
閱讀 1844·2019-08-30 10:49
閱讀 1105·2019-08-29 17:06
閱讀 657·2019-08-29 16:15
閱讀 958·2019-08-29 15:17