成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專(zhuān)欄INFORMATION COLUMN

ThreadPoolExecutor源碼分析

周?chē)?guó)輝 / 2562人閱讀

摘要:源碼分析文章轉(zhuǎn)自源碼分析前段時(shí)間學(xué)習(xí)的源碼,學(xué)習(xí)線程池這一塊的時(shí)候發(fā)現(xiàn)了一篇不錯(cuò)的文章,就記錄下來(lái)。這個(gè)方法在任何可能導(dǎo)致線程池終止的動(dòng)作后執(zhí)行比如減少或狀態(tài)下從隊(duì)列中移除任務(wù)。

threadpoolexecutor源碼分析

文章轉(zhuǎn)自:threadpoolexecutor源碼分析
前段時(shí)間學(xué)習(xí)java.util.concurrent的源碼,學(xué)習(xí)線程池這一塊的時(shí)候發(fā)現(xiàn)了一篇不錯(cuò)的文章,就記錄下來(lái)。同時(shí),文章之中加入了自己的一些見(jiàn)解。廢話不多說(shuō),直接開(kāi)始。

ThreadPoolExecutor作為Java.util.concurrent包中核心的類(lèi),先看下類(lèi)型的結(jié)構(gòu):

核心的接口其實(shí)是Executor,它只有一個(gè)execute方法抽象為對(duì)任務(wù)(Runnable接口)的執(zhí)行, ExecutorService接口在Executor的基礎(chǔ)上提供了對(duì)任務(wù)執(zhí)行的生命周期的管理,主要是submit和shutdown方法, AbstractExecutorService對(duì)ExecutorService一些方法做了默認(rèn)的實(shí)現(xiàn),主要是submit和invoke方法,而真正的任務(wù)執(zhí)行 的Executor接口execute方法是由子類(lèi)實(shí)現(xiàn),就是ThreadPoolExecutor,它實(shí)現(xiàn)了基于線程池的任務(wù)執(zhí)行框架,所以要了解 JDK的線程池,那么就得先看這個(gè)類(lèi)。

再看execute方法之前需要先介幾個(gè)變量或類(lèi)。

ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

這個(gè)變量是整個(gè)類(lèi)的核心,AtomicInteger保證了對(duì)這個(gè)變量的操作是原子的,通過(guò)巧妙的操作,ThreadPoolExecutor用這一個(gè)變量保存了兩個(gè)內(nèi)容:

所有有效線程的數(shù)量

各個(gè)線程的狀態(tài)(runState)

低29位存線程數(shù),高3位存runState,這樣runState有5個(gè)值:

RUNNING:-536870912

SHUTDOWN:0

STOP:536870912

TIDYING:1073741824

TERMINATED:1610612736

線程池中各個(gè)狀態(tài)間的轉(zhuǎn)換比較復(fù)雜,主要記住下面內(nèi)容就可以了:

RUNNING狀態(tài):線程池正常運(yùn)行,可以接受新的任務(wù)并處理隊(duì)列中的任務(wù);

SHUTDOWN狀態(tài):不再接受新的任務(wù),但是會(huì)執(zhí)行隊(duì)列中的任務(wù);

STOP狀態(tài):不再接受新任務(wù),不處理隊(duì)列中的任務(wù)

圍繞rtc有一些操作和變量:

/**
 * 這個(gè)方法用于取出runState的值 因?yàn)镃APACITY值為:00011111111111111111111111111111
 * ~為按位取反操作,則~CAPACITY值為:11100000000000000000000000000000
 * 再同參數(shù)做&操作,就將低29位置0了,而高3位還是保持原先的值,也就是runState的值
 * 
 * @param c
 *            該參數(shù)為存儲(chǔ)runState和workerCount的int值
 * @return runState的值
 */
private static int runStateOf(int c) {
    return c & ~CAPACITY;
}


/**
 * 這個(gè)方法用于取出workerCount的值
 * 因?yàn)镃APACITY值為:00011111111111111111111111111111,所以&操作將參數(shù)的高3位置0了
 * 保留參數(shù)的低29位,也就是workerCount的值
 * 
 * @param c
 *            ctl, 存儲(chǔ)runState和workerCount的int值
 * @return workerCount的值
 */
private static int workerCountOf(int c) {
    return c & CAPACITY;
}

/**
 * 將runState和workerCount存到同一個(gè)int中
 * “|”運(yùn)算的意思是,假設(shè)rs的值是101000,wc的值是000111,則他們位或運(yùn)算的值為101111
 * 
 * @param rs
 *            runState移位過(guò)后的值,負(fù)責(zé)填充返回值的高3位
 * @param wc
 *            workerCount移位過(guò)后的值,負(fù)責(zé)填充返回值的低29位
 * @return 兩者或運(yùn)算過(guò)后的值
 */
private static int ctlOf(int rs, int wc) {
    return rs | wc;
}

// 只有RUNNING狀態(tài)會(huì)小于0
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
corePoolSize

核心線程池大小,活動(dòng)線程小于corePoolSize則直接創(chuàng)建,大于等于則先加到workQueue中,隊(duì)列滿了才創(chuàng)建新的線程。

keepAliveTime

線程從隊(duì)列中獲取任務(wù)的超時(shí)時(shí)間,也就是說(shuō)如果線程空閑超過(guò)這個(gè)時(shí)間就會(huì)終止。

Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable ...

內(nèi)部類(lèi)Worker是對(duì)任務(wù)的封裝,所有submit的Runnable都被封裝成了Worker,它本身也是一個(gè)Runnable, 然后利用AQS框架(關(guān)于AQS可以看我這篇文章)實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的非重入的互斥鎖, 實(shí)現(xiàn)互斥鎖主要目的是為了中斷的時(shí)候判斷線程是在空閑還是運(yùn)行,可以看后面shutdown和shutdownNow方法的分析。

// state只有0和1,互斥
protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;// 成功獲得鎖
    }
    // 線程進(jìn)入等待隊(duì)列
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

之所以不用ReentrantLock是為了避免任務(wù)執(zhí)行的代碼中修改線程池的變量,如setCorePoolSize,因?yàn)镽eentrantLock是可重入的。

execute

execute方法主要三個(gè)步驟:

活動(dòng)線程小于corePoolSize的時(shí)候創(chuàng)建新的線程;

活動(dòng)線程大于corePoolSize時(shí)都是先加入到任務(wù)隊(duì)列當(dāng)中;

任務(wù)隊(duì)列滿了再去啟動(dòng)新的線程,如果線程數(shù)達(dá)到最大值就拒絕任務(wù)。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 活動(dòng)線程數(shù) < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 直接啟動(dòng)新的線程。第二個(gè)參數(shù)true:addWorker中會(huì)重新檢查workerCount是否小于corePoolSize
        if (addWorker(command, true))
            // 添加成功返回
            return;
        c = ctl.get();
    }
    // 活動(dòng)線程數(shù) >= corePoolSize
    // runState為RUNNING && 隊(duì)列未滿
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // double check
        // 非RUNNING狀態(tài) 則從workQueue中移除任務(wù)并拒絕
        if (!isRunning(recheck) && remove(command))
            reject(command);// 采用線程池指定的策略拒絕任務(wù)
        // 線程池處于RUNNING狀態(tài) || 線程池處于非RUNNING狀態(tài)但是任務(wù)移除失敗
        else if (workerCountOf(recheck) == 0)
            // 這行代碼是為了SHUTDOWN狀態(tài)下沒(méi)有活動(dòng)線程了,但是隊(duì)列里還有任務(wù)沒(méi)執(zhí)行這種特殊情況。
            // 添加一個(gè)null任務(wù)是因?yàn)镾HUTDOWN狀態(tài)下,線程池不再接受新任務(wù)
            addWorker(null, false);

        // 兩種情況:
        // 1.非RUNNING狀態(tài)拒絕新的任務(wù)
        // 2.隊(duì)列滿了啟動(dòng)新的線程失?。╳orkCount > maximumPoolSize)
    } else if (!addWorker(command, false))
        reject(command);
}

其中比較難理解的應(yīng)該是addWorker(null, false);這一行,這要結(jié)合addWorker一起來(lái)看。 主要目的是防止HUTDOWN狀態(tài)下沒(méi)有活動(dòng)線程了,但是隊(duì)列里還有任務(wù)沒(méi)執(zhí)行這種特殊情況。

addWorker
/**
* @param firstTask:新增一個(gè)線程并執(zhí)行這個(gè)任務(wù),可空,增加的線程從隊(duì)列獲取任務(wù);
* 
* @param core:是否使用corePoolSize作為上限,否則使用maxmunPoolSize
**/
private boolean addWorker(Runnable firstTask, boolean core) {
        retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);// 當(dāng)前線程池狀態(tài)

            // Check if queue empty only if necessary.
            // 這條語(yǔ)句等價(jià):rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
            // workQueue.isEmpty())
            // 滿足下列調(diào)價(jià)則直接返回false,線程創(chuàng)建失敗:
            // rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此時(shí)不再接受新的任務(wù),且所有任務(wù)執(zhí)行結(jié)束
            // rs = SHUTDOWN:firtTask != null 此時(shí)不再接受任務(wù),但是仍然會(huì)執(zhí)行隊(duì)列中的任務(wù)
            // rs = SHUTDOWN:firtTask == null見(jiàn)execute方法的addWorker(null,
            // false),任務(wù)為null && 隊(duì)列為空
            // 最后一種情況也就是說(shuō)SHUTDONW狀態(tài)下,如果隊(duì)列不為空還得接著往下執(zhí)行,為什么?add一個(gè)null任務(wù)目的到底是什么?
            // 看execute方法只有workCount==0的時(shí)候firstTask才會(huì)為null結(jié)合這里的條件就是線程池SHUTDOWN了不再接受新任務(wù)
            // 但是此時(shí)隊(duì)列不為空,那么還得創(chuàng)建線程把任務(wù)給執(zhí)行完才行。
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                return false;

            // 走到這的情形:
            // 1.線程池狀態(tài)為RUNNING
            // 2.SHUTDOWN狀態(tài),但隊(duì)列中還有任務(wù)需要執(zhí)行
            for (;;) {
                int wc = workerCountOf(c);
                //判斷條件有點(diǎn)難理解,其實(shí)是非運(yùn)行狀態(tài)下(>=SHUTDOWN)或者SHUTDOWN狀態(tài)下任務(wù)非空(新提交任務(wù))、任務(wù)隊(duì)列為空,
                //就不可以再新增線程了(return false),即SHUTDOWN狀態(tài)是可以新增線程去執(zhí)行隊(duì)列中的任務(wù);  
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))// 原子操作遞增workCount
                    break retry;// 操作成功跳出的重試的循環(huán)
                c = ctl.get(); // Re-read ctl
                if (runStateOf(c) != rs)// 如果線程池的狀態(tài)發(fā)生變化則重試
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // wokerCount遞增成功

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 并發(fā)的訪問(wèn)線程池workers對(duì)象必須加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    // RUNNING狀態(tài) || SHUTDONW狀態(tài)下清理隊(duì)列中剩余的任務(wù)
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 將新啟動(dòng)的線程添加到線程池中
                        workers.add(w);
                        // 更新largestPoolSize
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 啟動(dòng)新添加的線程,這個(gè)線程首先執(zhí)行firstTask,然后不停的從隊(duì)列中取任務(wù)執(zhí)行
                // 當(dāng)?shù)却齥eepAlieTime還沒(méi)有任務(wù)執(zhí)行則該線程結(jié)束。見(jiàn)runWoker和getTask方法的代碼。
                if (workerAdded) {
                    t.start();// 最終執(zhí)行的是ThreadPoolExecutor的runWoker方法
                    workerStarted = true;
                }
            }
        } finally {
            // 線程啟動(dòng)失敗,則從wokers中移除w并遞減wokerCount
            if (!workerStarted)
                // 遞減wokerCount會(huì)觸發(fā)tryTerminate方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }
runWorker

任務(wù)添加成功后實(shí)際執(zhí)行的是runWorker這個(gè)方法,這個(gè)方法非常重要,簡(jiǎn)單來(lái)說(shuō)它做的就是:

第一次啟動(dòng)會(huì)執(zhí)行初始化傳進(jìn)來(lái)的任務(wù)firstTask;

然后會(huì)從workQueue中取任務(wù)執(zhí)行,如果隊(duì)列為空則等待keepAliveTime這么長(zhǎng)時(shí)間。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // Worker的構(gòu)造函數(shù)中抑制了線程中斷setState(-1),所以這里需要unlock從而允許中斷
        w.unlock();
        // 用于標(biāo)識(shí)是否異常終止,finally中processWorkerExit的方法會(huì)有不同邏輯
        // 為true的情況:1.執(zhí)行任務(wù)拋出異常;2.被中斷。
        boolean completedAbruptly = true;
        try {
            // 如果getTask返回null那么getTask中會(huì)將workerCount遞減,如果異常了這個(gè)遞減操作會(huì)在processWorkerExit中處理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 任務(wù)執(zhí)行前可以插入一些處理,子類(lèi)重載該方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();// 執(zhí)行用戶任務(wù)
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        // 和beforeExecute一樣,留給子類(lèi)去重載
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }

            completedAbruptly = false;
        } finally {
            // 結(jié)束線程的一些清理工作
            processWorkerExit(w, completedAbruptly);
        }
    }
getTask
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry: for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 1.rs > SHUTDOWN 所以rs至少等于STOP,這時(shí)不再處理隊(duì)列中的任務(wù)
            // 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,這時(shí)還需要處理隊(duì)列中的任務(wù)除非隊(duì)列為空
            // 這兩種情況都會(huì)返回null讓runWoker退出while循環(huán)也就是當(dāng)前線程結(jié)束了,所以必須要decrement
            // wokerCount
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 遞減workerCount值
                decrementWorkerCount();
                return null;
            }

            // 標(biāo)記從隊(duì)列中取任務(wù)時(shí)是否設(shè)置超時(shí)時(shí)間
            boolean timed; // Are workers subject to culling?

            // 1.RUNING狀態(tài)
            // 2.SHUTDOWN狀態(tài),但隊(duì)列中還有任務(wù)需要執(zhí)行
            for (;;) {
                int wc = workerCountOf(c);

                // 1.core thread允許被超時(shí),那么超過(guò)corePoolSize的的線程必定有超時(shí)
                // 2.allowCoreThreadTimeOut == false && wc >
                // corePoolSize時(shí),一般都是這種情況,core thread即使空閑也不會(huì)被回收,只要超過(guò)的線程才會(huì)
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                // 從addWorker可以看到一般wc不會(huì)大于maximumPoolSize,所以更關(guān)心后面半句的情形:
                // 1. timedOut == false 第一次執(zhí)行循環(huán), 從隊(duì)列中取出任務(wù)不為null方法返回 或者
                // poll出異常了重試
                // 2.timeOut == true && timed ==
                // false:看后面的代碼workerQueue.poll超時(shí)時(shí)timeOut才為true,
                // 并且timed要為false,這兩個(gè)條件相悖不可能同時(shí)成立(既然有超時(shí)那么timed肯定為true)
                // 所以超時(shí)不會(huì)繼續(xù)執(zhí)行而是return null結(jié)束線程。(重點(diǎn):線程是如何超時(shí)的???)
                if (wc <= maximumPoolSize && !(timedOut && timed))
                    break;

                // workerCount遞減,結(jié)束當(dāng)前thread
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get(); // Re-read ctl
                // 需要重新檢查線程池狀態(tài),因?yàn)樯鲜霾僮鬟^(guò)程中線程池可能被SHUTDOWN
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                // 1.以指定的超時(shí)時(shí)間從隊(duì)列中取任務(wù)
                // 2.core thread沒(méi)有超時(shí)
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 超時(shí)
            } catch (InterruptedException retry) {
                timedOut = false;// 線程被中斷重試
            }
        }
    }
processWorkerExit

線程退出會(huì)執(zhí)行這個(gè)方法做一些清理工作。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 正常的話再runWorker的getTask方法workerCount已經(jīng)被減一了
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加線程的completedTasks
            completedTaskCount += w.completedTasks;
            // 從線程池中移除超時(shí)或者出現(xiàn)異常的線程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        // 嘗試停止線程池
        tryTerminate();

        int c = ctl.get();
        // runState為RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            // 線程不是異常結(jié)束
            if (!completedAbruptly) {
                // 線程池最小空閑數(shù),允許core thread超時(shí)就是0,否則就是corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min == 0但是隊(duì)列不為空要保證有1個(gè)線程來(lái)執(zhí)行隊(duì)列中的任務(wù)
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                // 線程池還不為空那就不用擔(dān)心了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 1.線程異常退出
            // 2.線程池為空,但是隊(duì)列中還有任務(wù)沒(méi)執(zhí)行,看addWoker方法對(duì)這種情況的處理
            addWorker(null, false);
        }
    }
tryTerminate

processWorkerExit方法中會(huì)嘗試調(diào)用tryTerminate來(lái)終止線程池。這個(gè)方法在任何可能導(dǎo)致線程池終止的動(dòng)作后執(zhí)行:比如減少wokerCount或SHUTDOWN狀態(tài)下從隊(duì)列中移除任務(wù)。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 以下?tīng)顟B(tài)直接返回:
            // 1.線程池還處于RUNNING狀態(tài)
            // 2.SHUTDOWN狀態(tài)但是任務(wù)隊(duì)列非空
            // 3.runState >= TIDYING 線程池已經(jīng)停止了或在停止了
            if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
                return;

            // 只能是以下情形會(huì)繼續(xù)下面的邏輯:結(jié)束線程池。
            // 1.SHUTDOWN狀態(tài),這時(shí)不再接受新任務(wù)而且任務(wù)隊(duì)列也空了
            // 2.STOP狀態(tài),當(dāng)調(diào)用了shutdownNow方法

            // workerCount不為0則還不能停止線程池,而且這時(shí)線程都處于空閑等待的狀態(tài)
            // 需要中斷讓線程“醒”過(guò)來(lái),醒過(guò)來(lái)的線程才能繼續(xù)處理shutdown的信號(hào)。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // runWoker方法中w.unlock就是為了可以被中斷,getTask方法也處理了中斷。
                // ONLY_ONE:這里只需要中斷1個(gè)線程去處理shutdown信號(hào)就可以了。
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 進(jìn)入TIDYING狀態(tài)
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 子類(lèi)重載:一些資源清理工作
                        terminated();
                    } finally {
                        // TERMINATED狀態(tài)
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 繼續(xù)awaitTermination
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
shutdown和shutdownNow

shutdown這個(gè)方法會(huì)將runState置為SHUTDOWN,會(huì)終止所有空閑的線程。

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 線程池狀態(tài)設(shè)為SHUTDOWN,如果已經(jīng)至少是這個(gè)狀態(tài)那么則直接返回
            advanceRunState(SHUTDOWN);
            // 注意這里是中斷所有空閑的線程:runWorker中等待的線程被中斷 → 進(jìn)入processWorkerExit →
            // tryTerminate方法中會(huì)保證隊(duì)列中剩余的任務(wù)得到執(zhí)行。
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

shutdownNow方法將runState置為STOP。和shutdown方法的區(qū)別,這個(gè)方法會(huì)終止所有的線程。

public List shutdownNow() {
    List tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // STOP狀態(tài):不再接受新任務(wù)且不再執(zhí)行隊(duì)列中的任務(wù)。
        advanceRunState(STOP);
        // 中斷所有線程
        interruptWorkers();
        // 返回隊(duì)列中還沒(méi)有被執(zhí)行的任務(wù)。
        tasks = drainQueue();
    }
    finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

主要區(qū)別在于shutdown調(diào)用的是interruptIdleWorkers這個(gè)方法,而shutdownNow實(shí)際調(diào)用的是Worker類(lèi)的interruptIfStarted方法:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // w.tryLock能獲取到鎖,說(shuō)明該線程沒(méi)有在運(yùn)行,因?yàn)閞unWorker中執(zhí)行任務(wù)會(huì)先lock,
            // 因此保證了中斷的肯定是空閑的線程。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    }
    finally {
        mainLock.unlock();
    }
}
void interruptIfStarted() {
    Thread t;
    // 初始化時(shí)state == -1
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

這就是前面提到的Woker類(lèi)實(shí)現(xiàn)AQS的主要作用。

注意:shutdown方法可能會(huì)在finalize被隱式的調(diào)用。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/67477.html

相關(guān)文章

  • 使用 Executors,ThreadPoolExecutor,創(chuàng)建線程池,源碼分析理解

    摘要:源碼分析創(chuàng)建可緩沖的線程池。源碼分析使用創(chuàng)建線程池源碼分析的構(gòu)造函數(shù)構(gòu)造函數(shù)參數(shù)核心線程數(shù)大小,當(dāng)線程數(shù),會(huì)創(chuàng)建線程執(zhí)行最大線程數(shù),當(dāng)線程數(shù)的時(shí)候,會(huì)把放入中保持存活時(shí)間,當(dāng)線程數(shù)大于的空閑線程能保持的最大時(shí)間。 之前創(chuàng)建線程的時(shí)候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...

    Chiclaim 評(píng)論0 收藏0
  • 后端ing

    摘要:當(dāng)活動(dòng)線程核心線程非核心線程達(dá)到這個(gè)數(shù)值后,后續(xù)任務(wù)將會(huì)根據(jù)來(lái)進(jìn)行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請(qǐng)求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)默默丟棄不能執(zhí)行的新加任務(wù),不報(bào)任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點(diǎn)記錄以及采用的解決方案 深入分析 java 線程池的實(shí)現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...

    roadtogeek 評(píng)論0 收藏0
  • 線程池源碼分析

    摘要:線程池的作用線程池能有效的處理多個(gè)線程的并發(fā)問(wèn)題,避免大量的線程因?yàn)榛ハ鄰?qiáng)占系統(tǒng)資源導(dǎo)致阻塞現(xiàn)象,能夠有效的降低頻繁創(chuàng)建和銷(xiāo)毀線程對(duì)性能所帶來(lái)的開(kāi)銷(xiāo)。固定的線程數(shù)由系統(tǒng)資源設(shè)置。線程池的排隊(duì)策略與有關(guān)。線程池的狀態(tài)值分別是。 線程池的作用 線程池能有效的處理多個(gè)線程的并發(fā)問(wèn)題,避免大量的線程因?yàn)榛ハ鄰?qiáng)占系統(tǒng)資源導(dǎo)致阻塞現(xiàn)象,能夠有效的降低頻繁創(chuàng)建和銷(xiāo)毀線程對(duì)性能所帶來(lái)的開(kāi)銷(xiāo)。 線程池的...

    enda 評(píng)論0 收藏0
  • 一看就懂的Java線程池分析詳解

    摘要:任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開(kāi)處理。線程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。線程池的線程數(shù)量。獲取活動(dòng)的線程數(shù)。通過(guò)擴(kuò)展線程池進(jìn)行監(jiān)控??蚣馨ň€程池,,,,,,等。 Java線程池 [toc] 什么是線程池 線程池就是有N個(gè)子線程共同在運(yùn)行的線程組合。 舉個(gè)容易理解的例子:有個(gè)線程組合(即線程池,咱可以比喻為一個(gè)公司),里面有3...

    Yangder 評(píng)論0 收藏0
  • Java調(diào)度線程池ScheduledThreadPoolExecutor源碼分析

    摘要:當(dāng)面試官問(wèn)線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開(kāi)始提交任務(wù)了。 最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類(lèi)去執(zhí)行一些定時(shí)任務(wù),之前一直沒(méi)有機(jī)會(huì)研究這個(gè)類(lèi)的源碼,這次趁著機(jī)會(huì)好好研讀一下。 原文地址:http://www.jianshu....

    kohoh_ 評(píng)論0 收藏0
  • Java調(diào)度線程池ScheduledThreadPoolExecutor源碼分析

    摘要:當(dāng)面試官問(wèn)線程池時(shí),你應(yīng)該知道些什么一執(zhí)行流程與不同,向中提交任務(wù)的時(shí)候,任務(wù)被包裝成對(duì)象加入延遲隊(duì)列并啟動(dòng)一個(gè)線程。當(dāng)我們創(chuàng)建出一個(gè)調(diào)度線程池以后,就可以開(kāi)始提交任務(wù)了。 最近新接手的項(xiàng)目里大量使用了ScheduledThreadPoolExecutor類(lèi)去執(zhí)行一些定時(shí)任務(wù),之前一直沒(méi)有機(jī)會(huì)研究這個(gè)類(lèi)的源碼,這次趁著機(jī)會(huì)好好研讀一下。 原文地址:http://www.jianshu....

    cheukyin 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

周?chē)?guó)輝

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<