您好,登錄后才能下訂單哦!
這篇文章主要介紹“怎么解析ThreadPoolExecutor”,在日常操作中,相信很多人在怎么解析ThreadPoolExecutor問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么解析ThreadPoolExecutor”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
為什么要用線程池
你有沒有這樣的疑惑,為什么要用線程池呢?可能你會說,我可以復用已經創建的線程呀;線程是個重量級對象,為了避免頻繁創建和銷毀,使用線程池來管理最好了。
沒毛病,各位都很懂哈~
不過使用線程池還有一個重要的點:可以控制并發的數量。如果并發數量太多了,導致消耗的資源增多,直接把服務器給搞趴下了,肯定也是不行的
繞不過去的幾個參數
提到 ThreadPoolExecutor 那么你的小腦袋肯定會想到那么幾個參數,咱們來瞅瞅源碼(我就直接放有 7 個參數的那個方法了):
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
咱們分別來看:
corePoolSize :
核心線程數,在線程池中有兩種線程,核心線程和非核心線程。在線程池中的核心線程,就算是它什么都不做,也會一直在線程池中,除非設置了 allowCoreThreadTimeOut 參數
maximumPoolSize:
線程池能夠創建的最大線程數。這個值 = 核心線程數 + 非核心線程數
keepAliveTime & unit :
線程池是可以撤銷線程的,那么什么時候撤銷呢?一個線程如果在一段時間內,都沒有執行任務,那說明這個線程很閑啊,那是不是就可以把它撤銷掉了?
所以呢,如果一個線程不是核心線程,而且在 keepAliveTime & unit 這段時間內,還沒有干活,那么很抱歉,只能請你走人了 核心線程就算是很閑,也不會將它從線程池中清除,沒辦法誰讓它是 core 線程呢~
workQueue :
工作隊列,這個隊列維護的是等待執行的 Runnable 任務對象
常用的幾個隊列:LinkedBlockingQueue , ArrayBlockingQueue , SynchronousQueue , DelayQueue
大廠的編碼規范,相信各位都知道,并不建議使用 Executors ,最重要的一個原因就是:Executors 提供的很多方法默認使用的都是無界的 LinkedBlockingQueue ,在高負載情況下,無界隊列很容易就導致 OOM ,而 OOM 會讓所有請求都無法處理,所以在使用時,強烈建議使用有界隊列,因為如果你使用的是有界隊列的話,當線程數量太多時,它會走拒絕策略
threadFactory :
創建線程的工廠,用來批量創建線程的。如果不指定的話,就會創建一個默認的線程工廠
handler :
拒絕處理策略。在 workQueue 那里說了,如果使用的是有界隊列,那么當線程數量大于最大線程數的時候,拒絕處理策略就起到作用了
常用的有四種處理策略:
- AbortPolicy :默認的拒絕策略,會丟棄任務并拋出 RejectedExecutionException 異常- CallerRunsPolicy :提交任務的線程,自己去執行這個任務- DiscardOldestPolicy :直接丟棄新來的任務,也沒有任何異常拋出- DiscardOldestPolicy :丟棄最老的任務,然后將新任務加入到工作隊列中
默認拒絕策略是 AbortPolicy ,會 throw RejectedExecutionException 異常,但是這是一個運行時異常,對于運行時異常編譯器不會強制 catch 它,所以就會比較容易忽略掉錯誤。
所以,如果線程池處理的任務非常重要,盡量自定義自己的拒絕策略
線程池的幾個狀態
在源碼中,能夠清楚地看到線程池有 5 種狀態:
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
同時,使用 AtomicInteger 修飾的變量 ctl 來控制線程池的狀態,而 ctl 保存了 2 個變量:一個是 rs 即 runState ,線程池的運行狀態;一個是 wc 即 workerCount ,線程池中活動線程的數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池創建之后就處于 RUNNING 狀態
調用 shutdown() 方法之后處于 SHUTDOWN 狀態,此時線程池不再接受新的任務,清除一些空閑 worker ,等待阻塞隊列的任務完成
調用 shutdownNow() 方法后處于 STOP 狀態,此時線程池不再接受新的任務,中斷所有的線程,阻塞隊列中沒有被執行的任務也會被全部丟棄
當線程池中執行的任務為空時,也就是此時 ctl 的值為 0 時,線程池會變為 TIDYING 狀態,接下來會執行 terminated() 方法
執行完 terminated() 方法之后,線程池的狀態就由 TIDYING 轉到 TERMINATED 狀態
懵了?別急,有張圖呢~
線程池處理任務
execute
做到線程復用,肯定要先 execute 起來吧
線程池處理任務的核心方法是 execute ,大概思路就是:
如果 command 為 null ,沒啥說的,直接拋出異常就完事兒了
如果當前線程數小于 corePoolSize ,會新建一個核心線程執行任務
如果當前線程數不小于 corePoolSize ,就會將任務放到隊列中等待,如果任務排隊成功,仍然需要檢查是否應該添加線程,所以需要重新檢查狀態,并且在必要時回滾排隊;如果線程池處于 running 狀態,但是此時沒有線程,就會創建線程
如果沒有辦法給任務排隊,說明這個時候,緩存隊列滿了,而且線程數達到了 maximumPoolSize 或者是線程池關閉了,系統沒辦法再響應新的請求,此時會執行拒絕策略
來瞅瞅源碼具體是如何處理的:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 當前線程數小于 corePoolSize 時,調用 addWorker 創建核心線程來執行任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 當前線程數不小于 corePoolSize ,就將任務添加到 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { // 獲取到當前線程的狀態,賦值給 recheck ,是為了重新檢查狀態 int recheck = ctl.get(); // 如果 isRunning 返回 false ,那就 remove 掉這個任務,然后執行拒絕策略,也就是回滾重新排隊 if (! isRunning(recheck) && remove(command)) reject(command); // 線程池處于 running 狀態,但是沒有線程,那就創建線程執行任務 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果放入 workQueue 失敗,嘗試通過創建非核心線程來執行任務 // 如果還是失敗,說明線程池已經關閉或者已經飽和,會拒絕執行該任務 else if (!addWorker(command, false)) reject(command); }
在上面源碼中,判斷了兩次線程池的狀態,為什么要這么做呢?
這是因為在多線程環境下,線程池的狀態是時刻發生變化的,可能剛獲取線程池狀態之后,這個狀態就立刻發生了改變.如果沒有二次檢查的話,線程池處于非 RUNNING 狀態時, command 就永遠不會執行
有點兒懵?阿粉都懂你,一張圖走起~
addWorker
從上面能夠看出來,主要是 addWorker 方法
addWorker 主要是用來創建核心線程的,它主要的實現邏輯是:
判斷線程數量有沒有超過規定的數量,如果超過了就返回 false
如果沒有超過,就會創建 worker 對象,并初始化一個 Thread 對象,然后啟動這個線程對象
接下來瞅瞅源碼:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 線程池狀態 >= SHUTDOWN 時,不再接受新的任務,直接返回 false // 如果 rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty() 同樣不接受新的任務,返回 false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // wc >= CAPACITY 說明線程數不夠,所以就返回 false // wc >= (core ? corePoolSize : maximumPoolSize) 是在做判斷 // 如果 core 為 true ,說明要創建的線程是核心線程,接下來判斷 wc 是否大于 核心線程數 ,如果大于返回 false // 如果 core 為 false ,說明要創建的線程是非核心線程,接下來判斷 wc 是否大于 最大線程數 ,如果大于返回 false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 操作增加 workerCount 的值,如果成功跳出循環 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 判斷線程池狀態有沒有變化,如果有變化,則重試 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // workerCount 增加成功之后開始走下面的代碼 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 創建一個 worker 對象 w = new Worker(firstTask); // 實例化一個 Thread 對象 final Thread t = w.thread; if (t != null) { // 接下來的操作需要加鎖進行 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 將任務線程添加到線程池中 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 啟動任務線程,開始執行任務 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 如果任務線程啟動失敗調用 addWorkerFailed // addWorkerFailed 方法里面主要做了兩件事:將該線程從線程池中移除;將 workerCount 的值減 1 addWorkerFailed(w); } return workerStarted; }
Worker 類
在 addWorker 中,主要是由 Worker 類去做一些相應處理, worker 繼承 AQS ,實現 Runnable 接口
線程池維護的是 HashSet,一個由 worker 對象組成的 HashSet
private final HashSet<Worker> workers = new HashSet<Worker>();
worker 繼承 AQS 主要是利用 AQS 獨占鎖機制,來標識線程是否空閑;另外, worker 還實現了 Runnable 接口,所以它本身就是一個線程任務,在構造方法中創建了一個線程,線程的任務就是自己 this。thread = getThreadFactory().newThread(this);
咱們瞅瞅里面的源碼:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; // 處理任務的線程 final Thread thread; // worker 傳入的任務 Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { // 將 state 設為 -1 ,避免 worker 在執行前被中斷 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 創建一個線程,來執行任務 this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
runWorker
worker 類在執行 run 方法時,實際上調用的是 runWorker 方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 允許中斷 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 判斷 task 是否為空,如果不為空直接執行 // 如果 task 為空,調用 getTask() 方法,從 workQueue 中取出新的 task 執行 while (task != null || (task = getTask()) != null) { // 加鎖,防止被其他線程中斷 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 檢查線程池的狀態,如果線程池處于 stop 狀態,則需要中斷當前線程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 執行 beforeExecute beforeExecute(wt, task); Throwable thrown = null; try { // 執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 執行 afterExecute 方法 afterExecute(task, thrown); } } finally { // 將 task 設置為 null ,循環操作 task = null; w.completedTasks++; // 釋放鎖 w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
在 runWorker 方法中,首先會去執行創建這個 worker 時就有的任務,當執行完這個任務之后, worker 并不會被銷毀,而是在 while 循環中, worker 會不斷的調用 getTask 方法從阻塞隊列中獲取任務然后調用 task。run() 來執行任務,這樣就達到了復用線程的目的。通過循環條件 while (task != null || (task = getTask()) != null) 可以看出,只要 getTask 方法返回值不為 null ,就會一直循環下去,這個線程也就會一直在執行,從而達到了線程復用的目的
getTask
咱們來看看 getTask 方法的實現:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // allowCoreThreadTimeOut 變量默認為 false ,也就是核心線程就算是空閑也不會被銷毀 // 如果為 true ,核心線程在 keepAliveTime 內是空閑的,就會被銷毀 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果運行線程數大于最大線程數,但是緩存隊列已經空了,此時遞減 worker 數量 // 如果有設置允許線程超時或者線程數量超過了核心線程數量,并且線程在規定時間內沒有 poll 到任務并且隊列為空,此時也遞減 worker 數量 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果 timed 為 true ,會調用 workQueue 的 poll 方法 // 超時時間為 keepAliveTime ,如果超過 keepAliveTime 時長的話, poll 就會返回 null // 如果返回為 null ,在 runWorker 中 // while (task != null || (task = getTask()) != null) 循環條件被打破,從而跳出循環,此時線程執行完畢 // 如果 timed 為 false ( allowCoreThreadTimeOut 為 false ,并且 wc > corePoolSize 為 false ) // 會調用 workQueue 的 take 方法阻塞到當前 // 當隊列中有任務加入時,線程被喚醒, take 方法返回任務,開始執行 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
源碼分析到這里就差不多清楚了
線程復用主要體現在 runWorker 方法中的 while 循環中,在 while 循環里面, worker 會不斷的調用 getTask 方法,而在 getTask 方法里,如果任務隊列中沒有了任務,此時如果線程是核心線程則會一直卡在 workQueue。take 方法,這個時候會被阻塞并掛起,不會占用 CPU 資源,直到拿到任務然后返回 true , 此時 runWorker 中得到這個任務來繼續執行任務,從而實現了線程復用。
到此,關于“怎么解析ThreadPoolExecutor”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。