中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行ThreadPoolExecutor 源碼解析

發布時間:2021-12-17 15:19:58 來源:億速云 閱讀:116 作者:柒染 欄目:大數據

如何進行ThreadPoolExecutor 源碼解析,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

一、線程

  1. 線程是CPU 調度的最小操作單位,線程模型分為KLT 模型和ULT 模型,JVM 使用的是KLT 模型。

  2. 線程的狀態 :NEW,RUNNABLE,BLOCKED,TERMINATED

二、線程池

1. 線程池解決的兩大核心問題:
  • 在執行大量異步運算的時候,線程池用優化系統性能,減少線程的反復創建所帶來的的系統開銷

  • 提供了一種限制和管理資源的方法

2. 7 大核心參數:
  1. corePoolSize :線程池中的核心線程數,當提交一個任務時,線程池創建一個新線程執行任務,直到當前線程數等于corePoolSize;如果當前線程數為corePoolSize,繼續提交的任務被保存到 阻塞隊列中,等待被執行;如果執行了線程池的prestartAllCoreThreads()方法,線程池會 提前創建并啟動所有核心線程。

  2. maximumPoolSize :線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小于maximumPoolSize;

  3. keepAliveTime :線程池維護線程所允許的空閑時間。當線程池中的線程數量大corePoolSize的時 候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待 的時間超過了keepAliveTime;

  4. unit: keepAliveTime的單位;

  5. workQueue: 用來保存等待被執行的任務的阻塞隊列,且任務必須實現Runable接口,在JDK中提供了如下阻塞隊列:

  • ArrayBlockingQueue:基于數組結構的有界阻塞隊列,按FIFO排序任務;

  • LinkedBlockingQuene:基于鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQuene; -

  • SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于 LinkedBlockingQuene;

  • priorityBlockingQuene:具有優先級的無界阻塞隊列;

  1. threadFactory:它是ThreadFactory類型的變量,用來創建新線程。默認使用 Executors.defaultThreadFactory() 來創建線程。使用默認ThreadFactory來創建線程 時,會使新創建的線程具有相同的NORM_PRIORITY優先級并且是非守護線程,同時也設 置了線程的名稱。

  2. handler: 線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續提交任務,必 須采取一種策略處理該任務,線程池提供了4種策略:

  • AbortPolicy:直接拋出異常,默認策略;

  • CallerRunsPolicy:用調用者所在的線程來執行任務;

  • DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執行當前任務;

  • DiscardPolicy:直接丟棄任務;

上面的4種策略都是ThreadPoolExecutor的內部類。當然也可以根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如 記錄日志或持久化存儲不能處理的任務。

3. 線程池的生命周期狀態 :
  • NEW

  • RUNNABLE

  • WATING

  • BLOCKED

  • TIMED_WATING

  • TERMINATED

4. 線程池的重要屬性 :ctl
  1. ctl 是對線程池的運行狀態和線程池中有效線程的數量進行控制的一個字段, 它包含兩 部分的信息: 線程池的運行狀態 (runState) 和線程池內有效線程的數量 (workerCount),這 里可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存 workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常 量表示workerCount的上限值,大約是5億。

  2. runState 主要提供線程池生命周期的控制,主要值包括:

  • RUNNING

(1) 狀態說明:線程池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行 處理。

(2) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處 于RUNNING狀態,并且線程池中的任務數為0!

  • SHUTDOWN

(1) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。

(2) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> -SHUTDOWN。

  • STOP

(1) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,并且會中 斷正在處理的任務。

(2) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。

  • TIDYING

(1) 狀態說明:當所有的任務已終止,ctl記錄的”任務數量”為0,線程池會變為TIDYING 狀態。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在 ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理; 可以通過重載terminated()函數來實現。

(2) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空并且線程池中執行的任務也 為空時,就會由 SHUTDOWN -> TIDYING。 當線程池在STOP狀態下,線程池中執行的 任務為空時,就會由STOP -> TIDYING。

  • TERMINATED

(1) 狀態說明:線程池徹底終止,就變成TERMINATED狀態。

(2) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING - > TERMINATED。 進入TERMINATED的條件如下: 線程池不是RUNNING狀態; 線程池狀態不是TIDYING狀態或TERMINATED狀態; 如果線程池狀態是SHUTDOWN并且workerQueue為空; workerCount為0,設置TIDYING狀態成功。

如何進行ThreadPoolExecutor 源碼解析

  1. ctl相關 API

  • runStateOf():獲取運行狀態;

  • workerCountOf():獲取活動線程數;

  • ctlOf():獲取運行狀態和活動線程數的值。

5. 線程池的行為
  • execute(Runnable command):執行Ruannable類型的任務

  • submit(task):可用來提交Callable或Runnable任務,并返回代表此任務的Future 對象

  • shutdown():在完成已提交的任務后封閉辦事,不再接管新任務,

  • shutdownNow():停止所有正在履行的任務并封閉辦事。

  • isTerminated():測試是否所有任務都履行完畢了。

  • isShutdown():測試是否該ExecutorService已被關閉。

6. 常用線程池的具體實現
ThreadPoolExecutor 默認線程池 
ScheduledThreadPoolExecutor 定時線程池
7. 線程池監控API
  • public long getTaskCount() //線程池已執行與未執行的任務總數

  • public long getCompletedTaskCount() //已完成的任務數

  • public int getPoolSize() //線程池當前的線程數

  • public int getActiveCount() //線程池中正在執行任務的線程數量

三、源碼解析

execute() 方法
//在將來的某個時間執行給定的任務。任務可以是新起一個新線程或者復用現有池線程中的線程去執行
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 執行過程還是分為 3 步:
         *
         * 1.執行任務: 
         * 如果小于核心線程數,嘗試創建一個新線程來執行給定的任務。
         * 方法 addWorker() 就是真正的創建一個新線程來執行任務的方法。
         * addWorker()方法會對 runState 和 workerCount進行原子檢查。
         * addWorker()方法會返回一個 boolean 值,通過返回 false 值來防止在不應該添加線程的情況下發出錯誤警報
         * 
         *
         * 2.添加到阻塞隊列:
         * 未能滿足條件執行完步驟 1 則添加到阻塞隊列。
         * 如果任務可以成功排隊,會再次進行檢查,檢查是否應該添加線程(因為現有線程自上次檢查后就死了),
         * 或者自進入此方法以來該池已關閉。因此,需要重新檢查狀態,并在停止的情況下在必要時回滾隊列,如果沒有,則啟動一個新線程。 
         *
         * 3.拒絕任務: 
         * 如果無法將任務添加至阻塞隊列,最大線程數也未達到最大會嘗試添加一個新的線程。如果失敗,說明線程池已關閉或處于飽和狀態,因此拒絕該任務。
         */
         
         //clt記錄著runState和workerCount
        int c = ctl.get();
        /*
         * workerCountOf方法取出低29位的值,表示當前活動的線程數;
         * 如果當前活動線程數小于corePoolSize,則新建一個線程放入線程池中;并把任務添加到該線程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
             * addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷還是maximumPoolSize來判斷
             * 如果為true,根據corePoolSize來判斷;
             * 如果為false,則根據maximumPoolSize來判斷
             */
            if (addWorker(command, true))
                return;
            //如果添加失敗,則重新獲取ctl值    
            c = ctl.get();
        }
        //執行到此處說明從核心線程里給當前任務分配線程失敗
        //如果當前線程池是運行狀態并且任務添加到隊列成功
        if (isRunning(c) && workQueue.offer(command)) {
            //重新獲取ctl值。即使添加隊列成功也要再次檢查,如果不是運行狀態,由于之前已經把任務添加到workerQueue 中了,所以要移除該任務,執行過后通過handler使用拒絕策略對該任務進行處理,整個方法返回
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
                /*
                 * 獲取線程池中的有效線程數,如果數量是0,則執行addWorker方法 這里傳入的參數表示:
                 * 第一個參數為null,表示在線程池中創建一個線程,但不去啟動;
                 * 第二個參數為false,將線程池的有限線程數量的上限設置為maximumPolSize,添加線程時根據maximumPoolSize來判斷;
                 * 如果判斷workerCount大于0,則直接返回,在workQueue中新增的comman 會在將來的某個時刻被執行。
                 */
            //因為 任務已經被添加到workQueue中了,所以worker在執行的時候,會直接從workQueue中 獲取任務。     
            else if (workerCountOf(recheck) == 0)
                //執行到這里說明任務已經添加到阻塞隊列里了,最大線程數也未飽和,則創建一個新的線程去阻塞隊列里拿任務
                //這步操作也就是創建一個線程,但并沒有傳入任務,因為任務已經被添加到workQueue中了,所以worker在執行的時候,會直接從workQueue中 獲取任務。
                //為什么要這樣做呢?是為了保證線程池在RUNNING狀態下必須要有一個線程來執行任務。
                addWorker(null, false);
        }
        /*
         * 如果執行到這里,有兩種情況:
         * 1. 線程池已經不是RUNNING狀態;
         * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize并且workQueue已滿。
         * 這時,再次調用addWorker方法,但第二個參數傳入為false,將線程池的 有限線程數量的上限設置為maximumPoolSize;
         * 如果失敗則拒絕該任務
         */
        else if (!addWorker(command, false))
            reject(command);
    }
addWorker() 方法
/**
 * 檢查是否可以根據當前線程池的狀態添加一個新的工作線程去執行任務。
 * addWorker(runnable,true)表示從核心工作線程數中分配線程執行傳進來的任務;
 * addWorker(null,false)表示從最大線程數中分配線程執行阻塞隊列中的任務。
 * 線程池如果停止或者關閉則直接返回 false,如果線程池創建新線程失敗同樣也會返回 false。
 * 如果創建線程失敗,或者線程工廠返回 null,或者執行當前 addWorker()的線程拋出異常,(注意是當前線程拋出異常,當前線程拋出異常只與當前任務有關,并不影響其他任務的執行),線程池的相關屬性會立即回滾
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外層 for 循環就是為了能給任務分配線程做準備,判斷狀態--> 原子遞增 workerCount
    // 直到線程池狀態不符合條件返回 false ,或者自增成功跳出 for 循環
    // 同樣的,getTask()從阻塞隊列中獲取任務的時候也是這么個邏輯,先對 workerCount 原子遞減,再去執行任務
    for (;;) {
        //可以看到,每一步操作都會對線程池的狀態參數做判斷
        int c = ctl.get();
        int rs = runStateOf(c);
        
        //也是對線程池狀態,隊列狀態做檢查
        /**
         * 這里的狀態判斷也很好理解:
         * 線程池狀態為SHUTDOWN,不會再接受新的任務了,返回 false
         * 想城池狀態不為SHUTDOWN,傳進來的任務為空,并且阻塞隊列里也沒任務,那還執行個錘子任務,同樣返回 false 
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //前面已經判斷過滿足為任務分配一個線程去執行任務
        //這個 for 循環就是為了創建任務做準備,先去原子性的遞增 workerCount,workerCount 遞增成功了才會去真正的為任務分配線程去執行
        for (;;) {
            //當前工作線程數
            int wc = workerCountOf(c);
            //當前工作線程數大于corePoolSize 或者 maximumPoolSize (跟誰比較就是根據傳進來的參數 core 判斷),
            //說明也沒有分配的線程可以執行任務了,返回 false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /** 
             * 執行到這里說明滿足條件了,可以分配出來線程去執行任務了
             * 嘗試增加workerCount,如果成功,則跳出第一個for循環
             * 這里是進行 CAS 自增 ctl 的 workerCount(先把數量自增,再跳出 for 循環創建新的線程去執行任務)
             * 該方法內部也是調用了原子類 AtomicInteger.compareAndSet()方法,保證原子遞增
             */    
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //如果嘗試添加新的工作線程失敗則會繼續判斷當前線程池的狀態,狀態滿足繼續嘗試為當前線程分配工作線程    
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //跳出 for 循環之后說明線程池的工作線程數 workerCount 已經調節過了,接下來要做到就是真正的分配線程,執行任務
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //Worker 對象是個內部類,其實就是用threatFactory 生成一個新的線程
        //繼承 AQS 類,實現Runable 接口,重寫 run()方法,重寫的 run()方法也很重要,后面會講
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //加鎖保證同步
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //還是先進行一通的線程池狀態檢查
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //這個 workers 是個 HashSet,線程池也是通過維護這個 workers 控制任務的執行
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        //largestPoolSize記錄著線程池中出現過的最大線程數量
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //終于,調用線程的 start() 方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //如果創建線程失敗,就要回滾線程池的狀態了
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker 類

Worker 類是用來干嘛的,存在的意義

回頭再看 Worker 類,線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組 Worker對象(HashSet)。

Worker類繼承了AQS,并實現了Runnable接口,注意其中的firstTask和thread屬 性:firstTask用它來保存傳入的任務;thread是在調用構造方法時通過ThreadFactory來創 建的線程,是用來處理任務的線程。

Worker繼承了AQS,使用AQS來實現獨占鎖的功能。為什么不使用ReentrantLock來 實現呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:

  1. lock方法一旦獲取了獨占鎖,表示當前線程正在執行任務中;

  2. 如果正在執行任務,則不應該中斷線程;

  3. 如果該線程現在不是獨占鎖的狀態,也就是空閑的狀態,說明它沒有在處理任務, 這時可以對該線程進行中斷;

  4. 線程池在執行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers 方法來中斷空閑的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程 池中的線程是否是空閑狀態;

  5. 之所以設置為不可重入,是因為我們不希望任務在調用像setCorePoolSize這樣的 線程池控制方法時重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果 在任務中調用了如setCorePoolSize這類線程池控制的方法,會中斷正在運行的線程。 所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷。 此外,在構造方法中執行了setState(-1);,把state變量設置為-1,為什么這么做呢? 是因為AQS中默認的state是0,如果剛創建了一個Worker對象,還沒有執行任務時,這時就不應該被中斷,看一下tryAquire方法:

Worker 類中以及涉及到的重要的方法

  • tryAcquire(int unused) 方法

  /**
    * 用于判斷線程是否空閑以及是否可以被中斷
    */
   protected boolean tryAcquire(int unused) {
           //cas 修改狀態,不可重入
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

tryAcquire方法是根據state是否是0來判斷的,所以,將state設置為-1是 為了禁止在執行任務前對線程進行中斷。

正因為如此,在runWorker方法中會先調用Worker對象的unlock方法將state設置為 0。

  • runWorker(Worker w)方法

/**
 * Worker 類實現 Runnable 接口,重寫的 run()方法
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //允許中斷
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //while 循環就是不斷的去執行任務,當自己的任務(firstTask)執行完之后依然會從阻塞隊列里拿任務去執行,就這樣的操作保證了線程的重用
        //task 為空則從阻塞隊列中獲取任務
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * 如果線程池正在停止,那么要保證當前線程是中斷狀態,如果不是的話,則要保證當前線程不是中斷狀態
             * 這里為什么要這么做呢?考慮在執行該if語句期間可能也執行了shutdownNow方法,shutdownNow方法會 把狀態設置為STOP,
             * 回顧一下STOP狀態:不能接受新任務,也不處理隊列中的任務,會中斷正在處理任務的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態時,
             * 調用 shutdownNow() 方法會使線程池進入到STOP狀態。
             * STOP狀態要中斷線程池中的所有線程,而這里使用Thread.interrupted()來判斷是否中斷是為了確保在 RUNNING或者SHUTDOWN狀態時線程是非中斷狀態的,
             * 因為Thread.interrupted()方法會重置中斷的狀態。
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

總結一下runWorker方法的執行過程:

  1. while循環不斷地通過getTask()方法獲取任務;

  2. getTask()方法從阻塞隊列中取任務;

  3. 如果線程池正在停止,那么要保證當前線程是中斷狀態,否則要保證當前線程不是 中斷狀態;

  4. 調用task.run()執行任務;

  5. 如果task為null則跳出循環,執行processWorkerExit()方法;

  6. runWorker方法執行完畢,也代表著Worker中的run方法執行完畢,銷毀線程。

  • getTask()方法

/**
 * 從阻塞隊列中獲取任務,返回值是 Runnable
 * 線程池狀態不滿足執行條件時直接返回 null
 */
private Runnable getTask() {
    //timeOut變量的值表示上次從阻塞隊列中取任務時是否超時
    boolean timedOut = false; // Did the last poll() time out?
    
    //這里兩個 for 循環操作和 addWorker() 方法里的兩個 for 循環操作思想一樣
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        //仍然檢查線程池狀態
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //timed變量用于判斷是否需要進行超時控制
        //allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時
        //wc > corePoolSize,表示當前線程池中的線程數量大于核心線程數量
        //對于超過核心線程數量的這些線程,需要進行超時控制
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //和 addWorker() 里流程一樣,也是先對線程池中 workerCount 進行控制,再進行后面的執行任務操作
        //滿足條件則 workerCount 數量減一
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //根據timed來判斷,如果為true,則通過阻塞隊列的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取到任務,則返回null
            //否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果 r == null,說明已經超時,timedOut設置為true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果獲取任務時當前線程發生了中斷,則設置timedOut為false并返回
            timedOut = false;
        }
    }
}
processWorkerExit() 
/**
 * getTask方法返回null時,在runWorker方法中會跳出while循環,然后會執行 processWorkerExit方法。
 * 做線程池的善后工作
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果completedAbruptly值為true,則說明線程執行時出現了異常,需要將workerCount減1
    //如果線程執行時沒有出現異常,說明在getTask()方法中已經已經對workerCount減1了,這里就不需要再減了
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //統計完成的任務數
        completedTaskCount += w.completedTasks;
        // 從workers中移除,也就表示著從線程池中移除了一個工作線程
        // workers 是前面提到的 HashSet,線程池就是通過維護這個 worker()來保證線程池運作的 
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根據線程池狀態進行判斷是否結束線程池
    tryTerminate();


    /**
     * 當線程池是RUNNING或SHUTDOWN狀態時,如果worker是異常結束,那么會直接addWorker;
     * 如果allowCoreThreadTimeOut=true,并且等待隊列有任務,至少保留一個worker
     * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,processWorkerExit執行完之后,工作線程被銷毀,以上就是整個工作線程的生 命周期,從execute方法開始,Worker使用ThreadFactory創建新的工作線程, runWorker通過getTask獲取任務,然后執行任務,如果getTask返回null,進入 processWorkerExit方法,整個線程結束

四、思考

  1. 線程池如何實現線程重用的?

就是在重寫的 run()方法里,通過 while 循環,執行完 firstTask 之后依然從阻塞隊列里獲取任務去執行。

  1. 線程超時怎么處理?

當前面任務拋出異常,后面的線程還會執行嗎? 答案是會。也是 while 循環里找答案,當前線程拋出異常只會對當前線程產生影響,對線程池里其他任務不會有影響。

  1. 什么時候會銷毀?

是runWorker方法執行完之后,也就是Worker中的run方法執行完,由JVM自動回收。

  1. 阻塞隊列選取?在JDK中提供了如下阻塞隊列:

  • ArrayBlockingQueue:基于數組結構的有界阻塞隊列,按FIFO排序任務;

  • LinkedBlockingQuene:基于鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQuene;

  • SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于 LinkedBlockingQuene;

  • PriorityBlockingQuene:具有優先級的無界阻塞隊列;

  1. 丟棄策略選取?線程池提供了4種策略:

  • AbortPolicy:直接拋出異常,默認策略;

  • CallerRunsPolicy:用調用者所在的線程來執行任務;

  • DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執行當前任務;

  • DiscardPolicy:直接丟棄任務;

  1. 線程數如何設置?

  • 一般設法是會根據我們任務的類型去設置,簡單分為: CPU 密集型 :CPU 核數 + 1 IO 密集型:2*CPU 核數 + 1

《Java并發編程實戰》中最原始的公式是這樣的: Nthreads=Ncpu?Ucpu?(1+WC)Nthreads=Ncpu?Ucpu?(1+CW);

  • Ncpu代表CPU的個數,

  • Ucpu代表CPU利用率的期望值(0<Ucpu<10<Ucpu<1),

  • WCCW仍然是等待時間與計算時間的比例。

上面提供的公式相當于目標CPU利用率為100%。 通常系統中不止一個線程池,所以實際配置線程數應該將目標CPU利用率計算進去。

關于如何進行ThreadPoolExecutor 源碼解析問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

营山县| 集安市| 扎兰屯市| 金坛市| 云林县| 中江县| 修武县| 乌兰县| 万盛区| 达拉特旗| 革吉县| 重庆市| 海林市| 清水县| 图们市| 涟源市| 岑巩县| 陕西省| 定兴县| 偏关县| 利辛县| 全椒县| 青浦区| 扶绥县| 沅江市| 阜南县| 正镶白旗| 介休市| 正安县| 青州市| 淮阳县| 安阳县| 徐州市| 彩票| 全州县| 长垣县| 西宁市| 民县| 玛曲县| 松阳县| 东兰县|