中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

多線程(十七、深入了解線程池-ThreadPoolExecutor)

發布時間:2020-07-03 12:49:43 來源:網絡 閱讀:705 作者:shayang88 欄目:編程語言

ThreadPoolExecutor原理

ThreadPoolExecutor構造函數參數

/**
 * 使用給定的參數創建ThreadPoolExecutor.
 *
 * @param corePoolSize    核心線程池中的最大線程數
 * @param maximumPoolSize 總線程池中的最大線程數
 * @param keepAliveTime   空閑線程的存活時間
 * @param unit            keepAliveTime的單位
 * @param workQueue       任務隊列, 保存已經提交但尚未被執行的任務
 * @param threadFactory   線程工廠(用于指定如果創建一個線程)
 * @param handler         拒絕策略 (當任務太多導致工作隊列滿時的處理策略)
 */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);   // 使用納秒保存存活時間
    this.threadFactory = threadFactory;
    this.handler = handler;
}

1、ThreadPoolExecutor在邏輯上將自身管理的線程池劃分為兩部分:核心線程池(大小對應為corePoolSize)、非核心線程池(大小對應為maximumPoolSize-corePoolSize)。
2、ThreadPoolExecutor中只有一種類型的線程,名叫Worker,它是ThreadPoolExecutor內部類,封裝著Runnable任務和執行該任務的Thread對象,我們稱它為【工作線程】,它也是ThreadPoolExecutor唯一需要進行維護的線程;
3、【核心線程池】【非核心線程池】都是邏輯上的概念,ThreadPoolExecutor在任務調度過程中會根據corePoolSize和maximumPoolSize的大小,判斷如何執行任務。

線程池狀態和管理

1、ThreadPoolExecutor內部定義了一個AtomicInteger變量—ctl,通過按位劃分的方式,在一個變量中記錄線程池狀態和工作線程數量

1、低29位保存線程數
2、高3位保存線程池狀態

//保存線程池狀態和工作線程數:低29位: 工作線程數,高3位 : 線程池狀態
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大線程數: 2^29-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 線程池狀態
    private static final int RUNNING    = -1 << COUNT_BITS; //接受新任務, 且處理已經進入阻塞隊列的任務
    private static final int SHUTDOWN   =  0 << COUNT_BITS; //不接受新任務, 但處理已經進入阻塞隊列的任務
    private static final int STOP       =  1 << COUNT_BITS; //不接受新任務, 且不處理已經進入阻塞隊列的任務, 同時中斷正在運行的任務
    private static final int TIDYING    =  2 << COUNT_BITS; //所有任務都已終止, 工作線程數為0, 線程轉化為TIDYING狀態并準備調用terminated方法
    private static final int TERMINATED =  3 << COUNT_BITS; //terminated方法已經執行完成

工作線程

工作線程(Worker),Worker內部類,實現了AQS框架,ThreadPoolExecutor通過一個HashSet來保存工作線程:

多線程(十七、深入了解線程池-ThreadPoolExecutor)

Worker定義

/**
 * Worker表示線程池中的一個工作線程, 可以與任務相關聯.
 * 由于實現了AQS框架, 其同步狀態值的定義如下:
 * -1: 初始狀態
 * 0:  無鎖狀態
 * 1:  加鎖狀態
 */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    /**
     * 與該Worker關聯的線程.
     */
    final Thread thread;
    /**
     *初始化任務,可以為空,為空的時候則去任務隊列workQueue里獲取
     */
    Runnable firstTask;
    /**
     * 當前工作線程處理完成的任務數
     */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // 初始的同步狀態值
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /**
     * 執行任務
     */
    public void run() {
        runWorker(this);
    }

    /**
     * 是否加鎖
     */
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    /**
     * 嘗試獲取鎖
     */
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    /**
     * 嘗試釋放鎖
     */
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock() {
        acquire(1);
    }

    public boolean tryLock() {
        return tryAcquire(1);
    }

    public void unlock() {
        release(1);
    }

    public boolean isLocked() {
        return isHeldExclusively();
    }

    /**
     * 中斷線程(僅任務非初始狀態)
     */
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

線程池執行execute

execute代碼邏輯

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //獲取線程池狀態和工作線程數量
        int c = ctl.get();
        //如果工作線程數 < 核心線程數,則創建工作線程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) //addWorker創建工作線程
                return;
            c = ctl.get();
        }
        //工作線程創建失敗,或者,工作線程 >= 核心線程數,任務插入任務隊列workQueue
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次檢查線程池狀態,如果不是運行狀態,移除任務,并拒絕
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果工作線程為0,則創建一個不帶任務的線程,線程自動去任務隊列獲取任務執行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //任務插入隊列失敗,創建非核心工作線程,如果失敗,則說明工作線程 > 總線程數量,則執行拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

執行流程圖

多線程(十七、深入了解線程池-ThreadPoolExecutor)

說明:
1、如果工作線程數小于核心線程池上限(CorePoolSize),則直接新建一個工作線程并執行任務;
2、如果工作線程數大于等于CorePoolSize,則嘗試將任務加入到隊列等待以后執行。如果隊列已滿,則在總線程池未滿的情況下(CorePoolSize ≤ 工作線程數 < maximumPoolSize)新建一個工作線程立即執行任務,否則執行拒絕策略。

創建工作線程addWorker

/**
     * 添加工作線程并執行任務
     *
     * @param firstTask 如果指定了該參數, 表示將立即執行該firstTask任務; 否則從工作隊列中獲取任務并執行
     * @param core      執行任務的工作線程歸屬于哪個線程池:  true-核心線程池  false-非核心線程池
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            /**
             * 這個if主要是判斷哪些情況下, 線程池不再接受新任務執行, 而是直接返回.總結下, 有以下幾種情況:
             * 1. 線程池狀態為 STOP 或 TIDYING 或 TERMINATED: 線程池狀態為上述任一一種時, 都不會再接受任務,所以直接返回
             * 2. 線程池狀態≥ SHUTDOWN 且 firstTask != null: 因為當線程池狀態≥ SHUTDOWN時, 不再接受新任務的提交,所以直接返回
             * 3. 線程池狀態≥ SHUTDOWN 且 隊列為空: 隊列中已經沒有任務了, 所以也就不需要執行任何任務了,可以直接返回
             */
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //獲取工作線程數
                int wc = workerCountOf(c);
                /**
                 * 這個if主要是判斷工作線程數是否超限, 以下任一情況屬于超限, 直接返回:
                 * 1. 工作線程數超過最大工作線程數(2^29-1)
                 * 2. 工作線程數超過核心線程池上限(入參core為true, 表示歸屬核心線程池)
                 * 3. 工作線程數超過總線程池上限(入參core為false, 表示歸屬非核心線程池)
                 */
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加工作線程數
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                //CAS失敗,自旋重新操作
                if (runStateOf(c) != rs)
                    continue retry;

            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //將任務包裝成工作線程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //加鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //重新檢查線程池狀態
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //工作線程加入集合
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //啟動線程,這個很關鍵,因為t在new的時候是用Work作為任務的,
                    // Work實現了Runnale接口,所以t.start就是執行Work的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

特別注意:
啟動線程,這個很關鍵,因為thread在new的時候是用Work(this)作為任務的, Work實現了Runnale接口,所以t.start就是執行Work的run方法。
多線程(十七、深入了解線程池-ThreadPoolExecutor)

工作線程的執行runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 任務, 如果是null則從隊列取任務
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 允許執行線程被中斷
        boolean completedAbruptly = true;
        try {
            // 當task==null時會通過getTask從隊列取任務
            while (task != null || (task = getTask()) != null) {
                w.lock();

                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 鉤子方法,由子類自定義實現
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 執行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 鉤子方法,由子類自定義實現
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;  // 完成任務數+1
                    w.unlock();
                }
            }
            //說明該工作線程自身既沒有攜帶任務, 也沒從任務隊列中獲取到任務
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

獲取任務方法getTask

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            /**
             * 以下IF用于判斷哪些情況下不允許再從隊列獲取任務:
             * 1. 線程池進入停止狀態(STOP/TIDYING/TERMINATED), 此時即使隊列中還有任務未執行, 也不再執行
             * 2. 線程池非RUNNING狀態, 且隊列為空
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            /**
             * timed變量用于判斷是否需要進行超時控制:
             * 對于核心線程池中的工作線程, 除非設置了allowCoreThreadTimeOut==true, 否則不會超時回收;
             * 對于非核心線程池中的工作線程, 都需要超時控制
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 這里主要是當外部通過setMaximumPoolSize方法重新設置了最大線程數時
            // 需要回收多出的工作線程
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //區分超時操作還是非超時獲取
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

拒絕策略

所謂拒絕策略,就是在構造ThreadPoolExecutor時,傳入的RejectedExecutionHandler對象,一共4種
多線程(十七、深入了解線程池-ThreadPoolExecutor)

1.AbortPolicy(默認),拋出一個RejectedExecutionException異常
多線程(十七、深入了解線程池-ThreadPoolExecutor)
2、DiscardPolicy,什么都不做,等任務自己被回收
多線程(十七、深入了解線程池-ThreadPoolExecutor)
3、DiscardOldestPolicy,丟棄任務隊列中的最近一個任務,并執行當前任務
多線程(十七、深入了解線程池-ThreadPoolExecutor)
4、CallerRunsPolicy,以自身線程來執行任務,減緩新任務提交的速度
多線程(十七、深入了解線程池-ThreadPoolExecutor)

線程池關閉

1、shutdown方法將線程池切換到SHUTDOWN狀態(如果已經停止,則不用切換),并調用interruptIdleWorkers方法中斷所有空閑的工作線程,最后調用tryTerminate嘗試結束線程池

2、shutdownNow方法的主要不同之處就是,它會將線程池的狀態至少置為STOP,同時中斷所有工作線程(無論該線程是空閑還是運行中),同時返回任務隊列中的所有任務

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

兰考县| 平湖市| 珲春市| 京山县| 汾西县| 德格县| 桃江县| 靖宇县| 东安县| 延边| 儋州市| 澳门| 衡水市| 章丘市| 四川省| 威宁| 连云港市| 英超| 科技| 凤城市| 屯留县| 厦门市| 如皋市| 夏邑县| 菏泽市| 长泰县| 容城县| 华宁县| 垦利县| 南京市| 图们市| 平陆县| 双流县| 兴隆县| 永福县| 扎赉特旗| 桃园市| 陆丰市| 河北区| 内乡县| 基隆市|