成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

一起學(xué)并發(fā)編程 - 簡(jiǎn)易線程池實(shí)現(xiàn)

Harriet666 / 1130人閱讀

摘要:并且,線程池在某些情況下還能動(dòng)態(tài)調(diào)整工作線程的數(shù)量,以平衡資源消耗和工作效率。同時(shí)線程池還提供了對(duì)池中工作線程進(jìn)行統(tǒng)一的管理的相關(guān)方法。

開(kāi)發(fā)中經(jīng)常會(huì)遇到各種池(如:連接池,線程池),它們的作用就是為了提高性能及減少開(kāi)銷(xiāo),在JDK1.5以后的java.util.concurrent包中內(nèi)置了很多不同使用場(chǎng)景的線程池,為了更好的理解它們,自己手寫(xiě)一個(gè)線程池,加深印象。

概述

1.什么是池

它的基本思想是一種對(duì)象池,程序初始化的時(shí)候開(kāi)辟一塊內(nèi)存空間,里面存放若干個(gè)線程對(duì)象,池中線程執(zhí)行調(diào)度由池管理器來(lái)處理。當(dāng)有線程任務(wù)時(shí),從池中取一個(gè),執(zhí)行完成后線程對(duì)象歸池,這樣可以避免反復(fù)創(chuàng)建線程對(duì)象所帶來(lái)的性能開(kāi)銷(xiāo),節(jié)省系統(tǒng)的資源。

2.使用線程池的好處

合理的使用線程池可以重復(fù)利用已創(chuàng)建的線程,這樣就可以減少在創(chuàng)建線程和銷(xiāo)毀線程上花費(fèi)的時(shí)間和資源。并且,線程池在某些情況下還能動(dòng)態(tài)調(diào)整工作線程的數(shù)量,以平衡資源消耗和工作效率。同時(shí)線程池還提供了對(duì)池中工作線程進(jìn)行統(tǒng)一的管理的相關(guān)方法。這樣就相當(dāng)于我們一次創(chuàng)建,就可以多次使用,大量的節(jié)省了系統(tǒng)頻繁的創(chuàng)建和銷(xiāo)毀線程所需要的資源。

簡(jiǎn)易版實(shí)現(xiàn)

包含功能:

1.創(chuàng)建線程池,銷(xiāo)毀線程池,添加新任務(wù)

2.沒(méi)有任務(wù)進(jìn)入等待,有任務(wù)則處理掉

3.動(dòng)態(tài)伸縮,擴(kuò)容

4.拒絕策略

介紹了線程池的原理以及主要組件之后,就讓我們來(lái)手動(dòng)實(shí)現(xiàn)一個(gè)自己的線程池,以加深理解和深入學(xué)習(xí)。因?yàn)樽约簩?shí)現(xiàn)的簡(jiǎn)易版本所以不建議生產(chǎn)中使用,生產(chǎn)中使用java.util.concurrent會(huì)更加健壯和優(yōu)雅(后續(xù)文章會(huì)介紹)

代碼

以下線程池相關(guān)代碼均在SimpleThreadPoolExecutor.java中,由于為了便于解讀因此以代碼塊的形式呈現(xiàn)

維護(hù)一個(gè)內(nèi)部枚舉類,用來(lái)標(biāo)記當(dāng)前任務(wù)線程狀態(tài),在Thread中其實(shí)也有.

private enum TaskState {
    FREE, RUNNABLE, BLOCKED, TERMINATED;
}

定義拒絕策略接口,以及默認(rèn)實(shí)現(xiàn)

static class DiscardException extends RuntimeException {
    private static final long serialVersionUID = 8827362380544575914L;

    DiscardException(String message) {
        super(message);
    }
}

interface DiscardPolicy {//拒絕策略接口
    void discard() throws DiscardException;
}

任務(wù)線程具體實(shí)現(xiàn)

1.繼承Thread,重寫(xiě)run方法。

2.this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty() 如果當(dāng)前線程處于空閑狀態(tài)且沒(méi)有任何任務(wù)了就將它wait住,讓出CPU執(zhí)行權(quán)

3.如果有任務(wù)就去執(zhí)行FIFO(先進(jìn)先出)策略

4.定義close方法,關(guān)閉線程,當(dāng)然這里不能暴力關(guān)閉,所以這里有需要借助interrupt

public static class WorkerTask extends Thread {
    // 線程狀態(tài)
    private TaskState taskState;
    // 線程編號(hào)
    private static int threadInitNumber;
    /**
     * 生成線程名,參考Thread.nextThreadNum();
     *
     * @return
     */
    private static synchronized String nextThreadName() {
        return THREAD_NAME_PREFIX + (++threadInitNumber);
    }

    WorkerTask() {
        super(THREAD_GROUP, nextThreadName());
    }

    @Override
    public void run() {
        Runnable target;
        //說(shuō)明該線程處于空閑狀態(tài)
        OUTER:
        while (this.taskState != TaskState.TERMINATED) {
            synchronized (TASK_QUEUE) {
                while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) {
                    try {
                        this.taskState = TaskState.BLOCKED;//此處標(biāo)記
                        //沒(méi)有任務(wù)就wait住,讓出CPU執(zhí)行權(quán)
                        TASK_QUEUE.wait();
                        //如果被打斷說(shuō)明當(dāng)前線程執(zhí)行了 shutdown() 方法  線程狀態(tài)為 TERMINATED 直接跳到 while 便于退出
                    } catch (InterruptedException e) {
                        break OUTER;
                    }
                }
                target = TASK_QUEUE.removeFirst();//遵循FIFO策略
            }
            if (target != null) {
                this.taskState = TaskState.RUNNABLE;
                target.run();//開(kāi)始任務(wù)了
                this.taskState = TaskState.FREE;
            }
        }
    }

    void close() {//優(yōu)雅關(guān)閉線程
        this.taskState = TaskState.TERMINATED;
        this.interrupt();
    }
}

簡(jiǎn)易版線程池,主要就是維護(hù)了一個(gè)任務(wù)隊(duì)列線程集,為了動(dòng)態(tài)擴(kuò)容,自己也繼承了Thread去做監(jiān)聽(tīng)操作,對(duì)外提供submit()提交執(zhí)行任務(wù)、shutdown()等待所有任務(wù)工作完畢,關(guān)閉線程池

public class SimpleThreadPoolExecutor extends Thread {

    // 線程池大小
    private int threadPoolSize;
    // 最大接收任務(wù)
    private int queueSize;
    // 拒絕策略
    private DiscardPolicy discardPolicy;
    // 是否被銷(xiāo)毀
    private volatile boolean destroy = false;

    private final static int DEFAULT_MIN_THREAD_SIZE = 2;// 默認(rèn)最小線程數(shù)
    private final static int DEFAULT_ACTIVE_THREAD_SIZE = 5;// 活躍線程
    private final static int DEFAULT_MAX_THREAD_SIZE = 10;// 最大線程
    private final static int DEFAULT_WORKER_QUEUE_SIZE = 100;// 最多執(zhí)行多少任務(wù)
    private final static String THREAD_NAME_PREFIX = "MY-THREAD-NAME-";//線程名前綴
    private final static String THREAD_POOL_NAME = "SIMPLE-POOL";//線程組的名稱
    private final static ThreadGroup THREAD_GROUP = new ThreadGroup(THREAD_POOL_NAME);//線程組
    private final static List WORKER_TASKS = new ArrayList<>();// 線程容器
    // 任務(wù)隊(duì)列容器,也可以用Queue 遵循 FIFO 規(guī)則
    private final static LinkedList TASK_QUEUE = new LinkedList<>();
    // 拒絕策略
    private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
        throw new DiscardException("[拒絕執(zhí)行] - [任務(wù)隊(duì)列溢出...]");
    };

    private int minSize;//最小線程
    private int maxSize;//最大線程
    private int activeSize;//活躍線程

    SimpleThreadPoolExecutor() {
        this(DEFAULT_MIN_THREAD_SIZE, DEFAULT_ACTIVE_THREAD_SIZE, DEFAULT_MAX_THREAD_SIZE, DEFAULT_WORKER_QUEUE_SIZE, DEFAULT_DISCARD_POLICY);
    }

    SimpleThreadPoolExecutor(int minSize, int activeSize, int maxSize, int queueSize, DiscardPolicy discardPolicy){
        this.minSize = minSize;
        this.activeSize = activeSize;
        this.maxSize = maxSize;
        this.queueSize = queueSize;
        this.discardPolicy = discardPolicy;
        initPool();
    }

    void submit(Runnable runnable) {
        if (destroy) {
            throw new IllegalStateException("線程池已銷(xiāo)毀...");
        }
        synchronized (TASK_QUEUE) {
            if (TASK_QUEUE.size() > queueSize) {//如果當(dāng)前任務(wù)隊(duì)超出隊(duì)列限制,后續(xù)任務(wù)拒絕執(zhí)行
                discardPolicy.discard();
            }
            // 1.將任務(wù)添加到隊(duì)列
            TASK_QUEUE.addLast(runnable);
            // 2.喚醒等待的線程去執(zhí)行任務(wù)
            TASK_QUEUE.notifyAll();
        }
    }

    void shutdown() throws InterruptedException {
        int activeCount = THREAD_GROUP.activeCount();
        while (!TASK_QUEUE.isEmpty() && activeCount > 0) {
            // 如果還有任務(wù),那就休息一會(huì)
            Thread.sleep(100);
        }
        int intVal = WORKER_TASKS.size();//如果線程池中沒(méi)有線程,那就不用關(guān)了
        while (intVal > 0) {
            for (WorkerTask task : WORKER_TASKS) {
                //當(dāng)任務(wù)隊(duì)列為空的時(shí)候,線程狀態(tài)才會(huì)為 BLOCKED ,所以可以打斷掉,相反等任務(wù)執(zhí)行完在關(guān)閉
                if (task.taskState == TaskState.BLOCKED) {
                    task.close();
                    intVal--;
                } else {
                    Thread.sleep(50);
                }
            }
        }
        this.destroy = true;
        //資源回收
        TASK_QUEUE.clear();
        WORKER_TASKS.clear();
        this.interrupt();
        System.out.println("線程關(guān)閉");
    }

    private void createWorkerTask() {
        WorkerTask task = new WorkerTask();
        //剛創(chuàng)建出來(lái)的線程應(yīng)該是未使用的
        task.taskState = TaskState.FREE;
        WORKER_TASKS.add(task);
        task.start();
    }

    /**
     * 初始化操作
     */
    private void initPool() {
        for (int i = 0; i < this.minSize; i++) {
            this.createWorkerTask();
        }
        this.threadPoolSize = minSize;
        this.start();//自己?jiǎn)?dòng)自己
    }

    @Override
    public void run() {
        while (!destroy) {
            try {
                Thread.sleep(5_000L);
                if (TASK_QUEUE.size() > activeSize && threadPoolSize < activeSize) { // 第一次擴(kuò)容到 activeSize 大小
                    for (int i = threadPoolSize; i < activeSize; i++) {
                        createWorkerTask();
                    }
                    this.threadPoolSize = activeSize;
                    System.out.println("[初次擴(kuò)充] - [" + toString() + "]");
                } else if (TASK_QUEUE.size() > maxSize && threadPoolSize < maxSize) {// 第二次擴(kuò)容到最大線程
                    System.out.println();
                    for (int i = threadPoolSize; i < maxSize; i++) {
                        createWorkerTask();
                    }
                    this.threadPoolSize = maxSize;
                    System.out.println("[再次擴(kuò)充] - [" + toString() + "]");
                } else {
                    //防止線程在submit的時(shí)候,其他線程獲取到鎖干壞事
                    synchronized (WORKER_TASKS) {
                        int releaseSize = threadPoolSize - activeSize;
                        Iterator iterator = WORKER_TASKS.iterator();// List不允許在for中刪除集合元素,所以這里需要使用迭代器
                        while (iterator.hasNext()) {
                            if (releaseSize <= 0) {
                                break;
                            }
                            WorkerTask task = iterator.next();
                            //不能回收正在運(yùn)行的線程,只回收空閑線程
                            if (task.taskState == TaskState.FREE) {
                                task.close();
                                iterator.remove();
                                releaseSize--;
                            }
                        }
                        System.out.println("[資源回收] - [" + toString() + "]");
                    }
                    threadPoolSize = activeSize;
                }
            } catch (InterruptedException e) {
                System.out.println("資源釋放");
            }
        }
    }

    @Override
    public String toString() {
        return "SimpleThreadPoolExecutor{" +
                "threadPoolSize=" + threadPoolSize +
                ", taskQueueSize=" + TASK_QUEUE.size() +
                ", minSize=" + minSize +
                ", maxSize=" + maxSize +
                ", activeSize=" + activeSize +
                "}";
    }
}
測(cè)試一把

創(chuàng)建一個(gè)測(cè)試類

public class SimpleExecutorTest {

    public static void main(String[] args) throws InterruptedException {

        SimpleThreadPoolExecutor executor = new SimpleThreadPoolExecutor();
        IntStream.range(0, 30).forEach(i ->
                executor.submit(() -> {
                    System.out.printf("[線程] - [%s] 開(kāi)始工作...
", Thread.currentThread().getName());
                    try {
                        Thread.sleep(2_000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("[線程] - [%s] 工作完畢...
", Thread.currentThread().getName());
                })
        );
        //executor.shutdown();如果放開(kāi)注釋即會(huì)執(zhí)行完所有任務(wù)關(guān)閉線程池
    }
}

日志分析: 從日志中可以看到,初始化的時(shí)候是2個(gè)線程在工作,執(zhí)行速度較為緩慢,當(dāng)經(jīng)過(guò)第一次擴(kuò)容后,會(huì)觀察到線程池里線程個(gè)數(shù)增加了,執(zhí)行任務(wù)的速度就越來(lái)越快了,本文一共擴(kuò)容了2次,第一次是擴(kuò)容到activeSize的大小,第二次是擴(kuò)容到maxSize,在執(zhí)行任務(wù)的過(guò)程中,當(dāng)線程數(shù)過(guò)多的時(shí)候就會(huì)觸發(fā)回收機(jī)制...

[線程] - [MY-THREAD-NAME-1] 開(kāi)始工作...
[線程] - [MY-THREAD-NAME-2] 開(kāi)始工作...
[線程] - [MY-THREAD-NAME-1] 工作完畢...
[線程] - [MY-THREAD-NAME-1] 開(kāi)始工作...
[線程] - [MY-THREAD-NAME-2] 工作完畢...
[線程] - [MY-THREAD-NAME-2] 開(kāi)始工作...
[線程] - [MY-THREAD-NAME-1] 工作完畢...
[線程] - [MY-THREAD-NAME-1] 開(kāi)始工作...
[線程] - [MY-THREAD-NAME-2] 工作完畢...
[線程] - [MY-THREAD-NAME-2] 開(kāi)始工作...
[初次擴(kuò)充] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=44, minSize=2, maxSize=10, activeSize=5}]
[線程] - [MY-THREAD-NAME-3] 開(kāi)始工作...
...
[線程] - [MY-THREAD-NAME-6] 開(kāi)始工作...
[線程] - [MY-THREAD-NAME-7] 開(kāi)始工作...
[再次擴(kuò)充] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=30, minSize=2, maxSize=10, activeSize=5}]
[線程] - [MY-THREAD-NAME-10] 開(kāi)始工作...
...
[線程] - [MY-THREAD-NAME-5] 開(kāi)始工作...
[資源回收] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=4, minSize=2, maxSize=10, activeSize=5}]
[線程] - [MY-THREAD-NAME-4] 工作完畢...
...
[線程] - [MY-THREAD-NAME-7] 工作完畢...
[資源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]
[資源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]
總結(jié)

通過(guò)本文,大致可以了解線程池的工作原理和實(shí)現(xiàn)方式,學(xué)習(xí)的過(guò)程中,就是要知其然知其所以然。這樣才能更好地駕馭它,更好地去理解和使用,也能更好地幫助我們觸類旁通,后面的文章中會(huì)詳細(xì)介紹java.util.concurrent中的線程池

- 說(shuō)點(diǎn)什么

全文代碼:https://gitee.com/battcn/battcn-concurent/tree/master/Chapter1-1/battcn-thread/src/main/java/com/battcn/chapter12

個(gè)人QQ:1837307557

battcn開(kāi)源群(適合新手):391619659

微信公眾號(hào):battcn(歡迎調(diào)戲)

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/67758.html

相關(guān)文章

  • 一起學(xué)并發(fā)編程 - 處理異常中止的線程

    摘要:在之前,不能為線程單獨(dú)設(shè)置或指定一個(gè)默認(rèn)的,為了設(shè)置,需要繼承并覆寫(xiě)方法。幸運(yùn)的是后線程提供了一個(gè)方法,用來(lái)捕獲并處理因線程中拋出的未知異常,以避免程序終止。 在單線程的開(kāi)發(fā)過(guò)程中,通常采用try-catch的方式進(jìn)行異常捕獲,但是這種方式在多線程環(huán)境中會(huì)顯得無(wú)能為力,而且還有可能導(dǎo)致一些問(wèn)題的出現(xiàn),比如發(fā)生異常的時(shí)候不能及時(shí)回收系統(tǒng)資源,或者無(wú)法及時(shí)關(guān)閉當(dāng)前的連接... 概述 Ja...

    zacklee 評(píng)論0 收藏0
  • 一起學(xué)并發(fā)編程 - 等待與通知

    摘要:如果有其它線程調(diào)用了相同對(duì)象的方法,那么處于該對(duì)象的等待池中的線程就會(huì)全部進(jìn)入該對(duì)象的鎖池中,從新?tīng)?zhēng)奪鎖的擁有權(quán)。 wait,notify 和 notifyAll,這些在多線程中被經(jīng)常用到的保留關(guān)鍵字,在實(shí)際開(kāi)發(fā)的時(shí)候很多時(shí)候卻并沒(méi)有被大家重視,而本文則是對(duì)這些關(guān)鍵字的使用進(jìn)行描述。 存在即合理 在java中,每個(gè)對(duì)象都有兩個(gè)池,鎖池(monitor)和等待池(waitset),每個(gè)...

    Meathill 評(píng)論0 收藏0
  • java并發(fā)編程學(xué)習(xí)之線程-ThreadPoolExecutor(三)

    摘要:是所有線程池實(shí)現(xiàn)的父類,我們先看看構(gòu)造函數(shù)構(gòu)造參數(shù)線程核心數(shù)最大線程數(shù)線程空閑后,存活的時(shí)間,只有線程數(shù)大于的時(shí)候生效存活時(shí)間的單位任務(wù)的阻塞隊(duì)列創(chuàng)建線程的工程,給線程起名字當(dāng)線程池滿了,選擇新加入的任務(wù)應(yīng)該使用什么策略,比如拋異常丟棄當(dāng)前 ThreadPoolExecutor ThreadPoolExecutor是所有線程池實(shí)現(xiàn)的父類,我們先看看構(gòu)造函數(shù) 構(gòu)造參數(shù) corePool...

    阿羅 評(píng)論0 收藏0
  • Python

    摘要:最近看前端都展開(kāi)了幾場(chǎng)而我大知乎最熱語(yǔ)言還沒(méi)有相關(guān)。有關(guān)書(shū)籍的介紹,大部分截取自是官方介紹。但從開(kāi)始,標(biāo)準(zhǔn)庫(kù)為我們提供了模塊,它提供了和兩個(gè)類,實(shí)現(xiàn)了對(duì)和的進(jìn)一步抽象,對(duì)編寫(xiě)線程池進(jìn)程池提供了直接的支持。 《流暢的python》閱讀筆記 《流暢的python》是一本適合python進(jìn)階的書(shū), 里面介紹的基本都是高級(jí)的python用法. 對(duì)于初學(xué)python的人來(lái)說(shuō), 基礎(chǔ)大概也就夠用了...

    dailybird 評(píng)論0 收藏0
  • 一起學(xué)并發(fā)編程 - 守護(hù)線程

    摘要:的作用是為其他線程的運(yùn)行提供服務(wù),比如說(shuō)線程。在某些平臺(tái)上,指定一個(gè)較高的參數(shù)值可能使線程在拋出之前達(dá)到較大的遞歸深度。參數(shù)的值與最大遞歸深度和并發(fā)程度之間的關(guān)系細(xì)節(jié)與平臺(tái)有關(guān)。 今天研究了下Java線程基礎(chǔ)知識(shí),發(fā)現(xiàn)以前太多知識(shí)知識(shí)略略帶過(guò)了,比較說(shuō)Java的線程機(jī)制,在Java中有兩類線程:User Thread(用戶線程)、Daemon Thread(守護(hù)線程),以及構(gòu)造器中的s...

    junnplus 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<