您好,登錄后才能下訂單哦!
本篇內容主要講解“java線程池源碼分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“java線程池源碼分析”吧!
當提交一個任務時,如果當前線程數小于corePoolSize,就會創建一個線程。即使其他有可用的空閑線程。
用于保存等待執行的任務的阻塞隊列。 可以選擇以下幾個阻塞隊列:
1.ArrayBlockingQueue 是一個基于數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
2.LinkedBlockingQueue 一個基于鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
3.SynchronousQueue 一個不存儲元素的阻塞隊列。每個插入操作必須等上一個元素被移除之后,否則插入操作一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
4.PriorityBlockingQueue 一個具有優先級的無限阻塞隊列。
不同的runnableTaskQueue對線程池運行邏輯有很大影響
線程池允許創建的最大線程數。如果隊列滿了,并且已創建的線程數小于最大線程數,則線程池會再創建新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什么效果。
線程執行結束后,保持存活的時間。 當線程數大于核心時,此為終止前多余的空閑線程等待新任務的最長時間。
用于設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。
線程池隊列飽和之后的執行策略,默認是采用AbortPolicy。JDK提供四種實現方式:
AbortPolicy:直接拋出異常
CallerRunsPolicy :只用調用者所在線程來運行任務
DiscardOldestPolicy 丟棄隊列里最近的一個任務,并執行當前任務
DiscardPolicy : 不處理,丟棄掉
keepalive的時間單位,可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
我們來看看 Executors.newCachedThreadPool() 里面的構造:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
corePoolSize 為 0,意味著核心線程數是 0。
maximumPoolSize 是 Integer.MAX_VALUE ,意味這可以一直往線程池提交任務,不會執行 reject 策略。
keepAliveTime 和 unit 決定了線程的存活時間是 60s,意味著一個線程空閑60s后才會被回收。
reject 策略是默認的 AbortPolicy,當線程池超出最大限制時拋出異常。不過這里 CacheThreadPool 的沒有最大線程數限制,所以 reject 策略沒用。
runnableTaskQueue 是 SynchronousQueue。該隊列的特點是一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態。使用該隊列是實現 CacheThreadPool 的關鍵之一。
SynchronousQueue 的詳細原理參考這里
我們看看 CacheThreadPool 的注釋介紹,大意是說當有任務提交進來,會優先使用線程池里可用的空閑線程來執行任務,但是如果沒有可用的線程會直接創建線程。空閑的線程會保留 60s,之后才會被回收。這些特性決定了,當需要執行很多短時間的任務時,CacheThreadPool 的線程復用率比較高, 會顯著的提高性能。而且線程60s后會回收,意味著即使沒有任務進來,CacheThreadPool 并不會占用很多資源。
那么問題來了:
CacheThreadPool 如何實現線程保留60s。
CacheThreadPool 如何實現線程復用。
首先我們向線程池提交任務一般用 execute() 方法,我們就從這里入手:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 1.返回包含線程數以及線程狀態Integer類型的數值 int c = ctl.get(); // 如果工作線程數小于核心線程數,則創建線程并執行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 如果失敗,防止外部已經在線程池中加入新任務,重新獲取下 c = ctl.get(); } // 2.只有線程處于RUNNING狀態,才執行后半句:置入隊列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果線程池不是RUNNING狀態,則將剛加入的移除 if (! isRunning(recheck) && remove(command)) reject(command); // 如果之前的線程已經被消費完,則新建一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); // 沒有被消費完,只將任務放入隊列 } // 3.如果task不能加入到隊列,會嘗試創建一個新線程。 else if (!addWorker(command, false)) // 如果創建失敗,走reject流程 reject(command);
第一步比較簡單,如果當前運行的線程少于核心線程,調用 addWorker(),創建一個線程。但是因為 CacheThreadPool 的 corePoolSize 是0,所以會跳過這步,并不會創建核心線程。
關鍵在第二步,首先判斷了線程池是否運行狀態,緊接著調用 workQueue.offer() 往對列添加 task 。 workQueue 是一個 BlockingQueue ,我們知道 BlockingQueue.offer() 方法是向隊列插入元素,如果成功返回 true ,如果隊列沒有可用空間返回 false 。
CacheThreadPool 用的是 SynchronousQueue ,前面了解過 SynchronousQueue 的特性,添加到 SynchronousQueue 的元素必須被其他線程取出,才能塞入下一個元素。等會我們再來看看哪里是從 SynchronousQueue 取出元素。
這里當任務入隊列成功后,再次檢查了線程池狀態,還是運行狀態就繼續。然后檢查當前運行線程數量,如果當前沒有運行中的線程,調用 addWorker() ,第一個參數為 null 第二個參數是 false ,標明了非核心線程。
為什么這里 addWorker() 第一個方法要用null?帶著這個疑問,我們來看看 addWorker() 方法:
大概翻譯了下
檢查是否可以添加新 worker ,在線程池狀態和給定的邊界(核心數或最大數)。
如果可以,則計數線程數,并且創建并啟動新工作程序,以firstTask作為其運行第一項任務。
如果池已停止或有資格關閉,則此方法返回false。
如果線程工廠在詢問時無法創建線程,它也會返回false。
如果線程創建失敗,則由于線程工廠返回null,或者由于異常(通常是Thread.start()中的OutOfMemoryError),我們干凈地回滾。
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * * 使用 corePoolSize 綁定做校驗為 true,maximumPoolSize 綁定做校驗為 false, * @param core if true use corePoolSize as bound, else maximumPoolSize. * * @return true if successful */ private boolean addWorker(Runnable firstTask, boolean core) { // continue retry 快速推多層循環嵌套 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 當前線程數量+1 if (compareAndIncrementWorkerCount(c)) break retry; // 獲取當前線程數 c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 創建線程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加鎖。持有主鎖防止干擾。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); // 將任務包裝成 worker 對象,用線程安全的方式添加到當前工作 HashSet()里 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 線程 start 并執行 run方法處理 runWorker() 執行 task t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 創建失敗減去線程數 addWorkerFailed(w); } return workerStarted; }
源代碼比較長,這里省略了一部分。過程主要分成兩步, 第一步是一段 cas 代碼通過雙重循環檢查狀態并為當前線程數擴容 +1, 第二部是將任務包裝成 worker 對象,用線程安全的方式添加到當前工作 HashSet() 里,并開始執行線程。 終于讀到線程開始執行的地方了,里程碑式的勝利啊同志們!
但是我們注意到,task 為 null ,Worker 里面的 firstTask 是 null ,那么 wokrer thread 里面是怎么工作下去的呢?
繼續跟蹤代碼,Worker 類繼承 Runnable 接口,因此 worker thread start 后,走的是 worker.run()方法:
public void run() { runWorker(this); }
繼續進入 runWorker() 方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 獲取task Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { // getTask() 獲取任務 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 退出自旋,進入finally代碼塊。調用processWorkerExit方法,注銷當前Worker,實現worker的銷毀 processWorkerExit(w, completedAbruptly); } }
可以看到這里判斷了 firstTask 如果為空,就調用 getTask() 方法。getTask() 方法是從 workQueue 拉取任務。 所以到這里之前的疑問就解決了,調用 addWorker(null,false) 的目的是啟動一個線程,然后再 workQueue 拉取任務執行。
繼續跟蹤 getTask() 方法:
private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 當allowCoreThreadTimeout(運行空閑核心線程超時) // 或 wc>corePoolSize(當前線程數量大于核心線程數量) 時,timed會標識為true,表示需要進行超時判斷。 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 當wc(當前工作者數量)大于 最大線程數 或 空閑線程的空閑時間大于keepAliveTime(timed && timeout), // 以及wc>1或(workQueue)任務隊列為空時,會進入compareAndDecrementWorkerCount方法,對wc的值減1。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 當compareAndDecrementWorkerCount方法返回true時,則getTask方法會返回null,終止getTask方法的自旋。 // 這時候回到runWorker方法,就會進入到processWorkerExit方法,進行銷毀worker。 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // timed 為 true 時,進行poll處理,超時后線程就會會被回收 Runnable r = timed ? // poll(time):取走BlockingQueue里排在首位的對象, // 若不能立即取出,則可以等time參數規定的時間,取不到時返回null workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // take():取走BlockingQueue里排在首位的對象, // 若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止 workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
終于看到從 workQueue 拉取元素了。
CacheThreadPool 構造的時候 corePoolSize 是 0,allowCoreThreadTimeOut 默認是 false ,因此 timed 一直為 true ,會調用 workQueue.poll() 從隊列拉取一個任務,等待 60s, 60s后超時,線程就會會被回收。
如果 60s 內,進來一個任務,會發生什么情況?任務在 execute() 方法里,會被 offer() 進 workQueue ,因為目前隊列是空的,所以 offer 進來后,馬上會被阻塞的 worker.poll() 拉取出來,然后在 runWorker() 方法里執行,因為線程沒有新建所以達到了線程的復用。
至此,我們已經明白了線程復用的秘密,以及線程保留 60s 的實現方法。回到 execute() 方法,還有剩下一個邏輯 如果task不能加入到隊列,會嘗試創建線程。如果創建失敗,走reject流程
else if (!addWorker(command, false)) reject(command);
因為 CacheThreadPool 用的 SynchronousQueue ,所以沒有空閑線程, SynchronousQueue 有一個元素正在被阻塞,那么就不能加入到隊列里。會走到 addWorker(commond,false) 這里,這個時候因為就會新建線程來執行任務。如果 addWorker() 返回 false 才會走 reject 策略。
那么什么時候 addWorker() 什么時候會返回false呢?我們看代碼:
private boolean addWorker(Runnable firstTask, boolean core){ retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 1.線程池已經shutdown,或者提交進來task為ull且隊列也是空,返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 2.如果需要創建核心線程但是當前線程已經大于corePoolSize 返回false, // 如果是非核心線程但是已經超出maximumPoolSize,返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; //省略代碼。。。 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //省略代碼。。。 } } } //省略代碼。。。 }
addWorker() 有以下情況會返回 false :
線程池已經 shutdown,或者提交進來 task 為ull且同時任務隊列也是空,返回 false。
如果需要創建核心線程但是當前線程已經大于 corePoolSize 返回 false,
如果是非核心線程但是已經超出 maximumPoolSize ,返回 false。
創建線程后,檢查是否已經啟動。
我們逐條檢查。 第一點只有線程池被 shutDown() 才會出現。 第二點由于 CacheThreadPool 的 corePoolSize 是 0 , maximumPoolSize 是 Intger.MAX_VALUE ,所以也不會出現。 第三點是保護性錯誤,我猜因為線程允許通過外部的 ThreadFactory 創建,所以檢查了一下是否外部已經 start,如果開發者編碼規范,一般這種情況也不會出現。
綜上,在線程池沒有 shutDown 的情況下,addWorker() 不會返回 false ,不會走reject流程,所以理論上 CacheThreadPool 可以一直提交任務,符合CacheThreadPool注釋里的描述。
Executors 還提供了這么一個方法 Executors.newFixedThreadPool(4) 來創建一個有固定線程數量的線程池,我們看看創建的參數:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
參數中核心線程和最大線程一樣,線程保留時間 0 ,使用 LinkedBlockingQueue 作為任務隊列,這樣的線程池有什么樣的特性呢?我們看看注釋說明,大意是說這是一個有著固定線程數量且使用無界隊列作為線程隊列的線程池。如果有新的任務提交,但是沒有線程可用,這個任務會一直等待直到有可用的線程。如果一個線程因為異常終止了,當線程不夠用的時候會再創建一個出來。線程會一直保持,直到線程池 shutDown。
和 CacheThreadPool 相比,FixedThreadPool 注釋里描述的特性有幾個不同的地方。
因為 corePoolSize == maximumPoolSize ,所以FixedThreadPool只會創建核心線程。
在 getTask() 方法,如果隊列里沒有任務可取,線程會一直阻塞在 LinkedBlockingQueue.take() ,線程不會被回收。
由于線程不會被回收,會一直卡在阻塞,所以沒有任務的情況下, FixedThreadPool 占用資源更多。
FixedThreadPool 和 CacheThreadPool 也有相同點,都使用無界隊列,意味著可用一直向線程池提交任務,不會觸發 reject 策略。
到此,相信大家對“java線程池源碼分析”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。