您好,登錄后才能下訂單哦!
本篇內容主要講解“怎么深入理解線程池”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“怎么深入理解線程池”吧!
本文將會從以下幾個方面來介紹線程池的原理。
為什么要用線程池
線程池是如何工作的
線程池提交任務的兩種方式
ThreadPoolExecutor 源碼剖析
解答開篇的問題
線程池的最佳實踐
總結
相信大家看完對線程池的理解會更進一步,肝文不易,看完別完了三連哦。
為什么要用線程池
在上文也提到過,創建線程有三大開銷,如下:
1、其實 Java 中的線程模型是基于操作系統原生線程模型實現的,也就是說 Java 中的線程其實是基于內核線程實現的,線程的創建,析構與同步都需要進行系統調用,而系統調用需要在用戶態與內核中來回切換,代價相對較高,線程的生命周期包括「線程創建時間」,「線程執行任務時間」,「線程銷毀時間」,創建和銷毀都需要導致系統調用。2、每個 Thread 都需要有一個內核線程的支持,也就意味著每個 Thread 都需要消耗一定的內核資源(如內核線程的棧空間),因此能創建的 Thread 是有限的,默認一個線程的線程棧大小是 1 M,有圖有真相
圖中所示,在 Java 8 下,創建 19 個線程(thread #19)需要創建 19535 KB,即 1 M 左右,reserved 代表如果創建 19 個線程,操作系統保證會為其分配這么多空間(實際上并不一定分配),committed 則表示實際已分配的空間大小。
畫外音:注意,這是在 Java 8 下的線程占用空間情況,但在 Java 11 中,對線程作了很大的優化,創建一個線程大概只需要 40 KB,空間消耗大大減少
3、線程多了,導致不可忽視的上下文切換開銷。
由此可見,線程的創建是昂貴的,所以必須以線程池的形式來管理這些線程,在線程池中合理設置線程大小和管理線程,以達到以合理的創建線程大小以達到最大化收益,最小化風險的目的,對于開發人員來說,要完成任務不用關心線程如何創建,如何銷毀,如何協作,只需要關心提交的任務何時完成即可,對線程的調優,監控等這些細枝末節的工作通通交給線程池來實現,所以也讓開發人員得到極大的解脫!
類似線程池的這種池化思想應用在很多地方,比如數據庫連接池,Http 連接池等,避免了昂貴資源的創建,提升了性能,也解放了開發人員。
ThreadPoolExecutor 設計架構圖
首先我們來看看 Executor 框架的設計圖
Executor: 最頂層的 Executor 接口只提供了一個 execute 接口,實現了提交任務與執行任務的解藕,這個方法是最核心的,也是我們源碼剖析的重點,此方法最終是由 ThreadPoolExecutor 實現的,
ExecutorService 擴展了 Executor 接口,實現了終止執行器,單個/批量提交任務等方法
AbstractExecutorService 實現了 ExecutorService 接口,實現了除 execute 以外的所有方法,只將一個最重要的 execute 方法交給 ThreadPoolExecutor 實現。
這樣的分層設計雖然層次看起來挺多,但每一層每司其職,邏輯清晰,值得借鑒。
線程池是如何工作的
首先我們來看下如何創建一個線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20, 600L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(4096), new NamedThreadFactory("common-work-thread")); // 設置拒絕策略,默認為 AbortPolicy threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
看下其構造方法簽名如下
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 省略代碼若干 }
要理解這些參數具體代表的意義,必須清楚線程池提交任務與執行任務流程,如下
圖片來自美團技術團隊
步驟如下
1、corePoolSize:如果提交任務后線程還在運行,當線程數小于 corePoolSize 值時,無論線程池中的線程是否忙碌,都會創建線程,并把任務交給此新創建的線程進行處理,如果線程數少于等于 corePoolSize,那么這些線程不會回收,除非將 allowCoreThreadTimeOut 設置為 true,但一般不這么干,因為頻繁地創建銷毀線程會極大地增加系統調用的開銷。
2、workQueue:如果線程數大于核心數(corePoolSize)且小于最大線程數(maximumPoolSize),則會將任務先丟到阻塞隊列里,然后線程自己去阻塞隊列中拉取任務執行。
3、maximumPoolSize: 線程池中最大可創建的線程數,如果提交任務時隊列滿了且線程數未到達這個設定值,則會創建線程并執行此次提交的任務,如果提交任務時隊列滿了但線池數已經到達了這個值,此時說明已經超出了線池程的負載能力,就會執行拒絕策略,這也好理解,總不能讓源源不斷地任務進來把線程池給壓垮了吧,我們首先要保證線程池能正常工作。
4、RejectedExecutionHandler:一共有以下四種拒絕策略
AbortPolicy:丟棄任務并拋出異常,這也是默認策略;
CallerRunsPolicy:用調用者所在的線程來執行任務,所以開頭的問題「線程把任務丟給線程池后肯定就馬上返回了?」我們可以回答了,如果用的是 CallerRunsPolicy 策略,提交任務的線程(比如主線程)提交任務后并不能保證馬上就返回,當觸發了這個 reject 策略不得不親自來處理這個任務。
DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執行當前任務。
DiscardPolicy:直接丟棄任務,不拋出任何異常,這種策略只適用于不重要的任務。
5、keepAliveTime: 線程存活時間,如果在此時間內超出 corePoolSize 大小的線程處于 idle 狀態,這些線程會被回收
6、threadFactory:可以用此參數設置線程池的命名,指定 defaultUncaughtExceptionHandler(有啥用,后文闡述),甚至可以設定線程為守護線程。
現在問題來了,該如何合理設置這些參數呢。
首先來看線程大小設置
<<Java 并發編程實戰>>告訴我們應該分兩種情況
1. 針對 CPU 密集型的任務,在有 Ncpu個處理器的系統上,當線程池的大小為 Ncpu + 1 時,通常能實現最優的利用率,+1 是因為當計算密集型線程偶爾由于缺頁故障或其他原因而暫停工作時,這個"額外"的線程也能確保 CPU 的時鐘周期不會被浪費,所謂 CPU 密集,就是線程一直在忙碌,這樣將線程池的大小設置為 Ncpu + 1 避免了線程的上下文切換,讓線程時刻處于忙碌狀態,將 CPU 的利用率最大化。
2. 針對 IO 密集型的任務,它也給出了如下計算公式
這些公式看看就好,實際的業務場景中基本用不上,這些公式太過理論化了,脫離業務場景,僅可作個理論參考,舉個例子,你說 CPU 密集型任務設置線程池大小為 N + 1個,但實際上在業務中往往不只設置一個線程池,這種情況套用的公式就懵逼了
再來看 workQueue 的大小設置
由上文可知,如果最大線程大于核心線程數,當且僅當核心線程滿了且 workQueue 也滿的情況下,才會新增新的線程,也就是說如果 workQueue 是無界隊列,那么當線程數增加到 corePoolSize 后,永遠不會再新增新的線程了,也就是說此時 maximumPoolSize 的設置就無效了,也無法觸發 RejectedExecutionHandler 拒絕策略,任務只會源源不斷地填充到 workQueue,直到 OOM。
所以 workQueue 應該為有界隊列,至少保證在任務過載的情況下線程池還能正常工作,那么哪些是有有界隊列,哪些是無界隊列呢。
有界隊列我們常用的以下兩個
LinkedBlockingQueue: 鏈表構成的有界隊列,按先進先出(FIFO)的順序對元素進行排列,但注意在創建時需指定其大小,否則其大小默認為 Integer.MAX_VALUE,相當于無界隊列了
ArrayBlockingQueue: 數組實現的有界隊列,按先進先出(FIFO)的順序對元素進行排列。
無界隊列我們常用 PriorityBlockingQueue 這個優先級隊列,任務插入的時候可以指定其權重以讓這些任務優先執行,但這個隊列很少用,原因很簡單,線程池里的任務執行順序一般是平等的,如果真有必須某些類型的任務需要優先執行,大不了再開個線程池好了,將不同的任務類型用不同的線程池隔離開來,也是合理利用線程池的一種實踐。
說到這我相信大家應該能回答開頭的問題「阿里 Java 代碼規范為什么不允許使用 Executors 快速創建線程池?」,最常見的是以下兩種創建方式
image-20201109002227476
newCachedThreadPool 方法的最大線程數設置成了 Integer.MAX_VALUE,而 newSingleThreadExecutor 方法創建 workQueue 時 LinkedBlockingQueue 未聲明大小,相當于創建了無界隊列,一不小心就會導致 OOM。
threadFactory 如何設置
一般業務中會有多個線程池,如果某個線程池出現了問題,定位是哪一個線程出問題很重要,所以為每個線程池取一個名字就很有必要了,我司用的 dubbo 的 NamedThreadFactory 來生成 threadFactory,創建很簡單
new NamedThreadFactory("demo-work")
它的實現還是很巧妙的,有興趣地可以看看它的源碼,每調用一次,底層有個計數器會加一,會依次命名為 「demo-work-thread-1」, 「demo-work-thread-2」, 「demo-work-thread-3」這樣遞增的字符串。
在實際的業務場景中,一般很難確定 corePoolSize, workQueue,maximumPoolSize 的大小,如果出問題了,一般來說只能重新設置一下這些參數再發布,這樣往往需要耗費一些時間,美團的這篇文章給出了讓人眼前一亮的解決方案,當發現問題(線程池監控告警)時,動態調整這些參數,可以讓這些參數實時生效,能在發現問題時及時解決,確實是個很好的思路。
線程池提交任務的兩種方式
線程池創建好了,該怎么給它提交任務,有兩種方式,調用 execute 和 submit 方法,來看下這兩個方法的方法簽名
// 方式一:execute 方法 public void execute(Runnable command) { } // 方式二:ExecutorService 中 submit 的三個方法 <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task);
區別在于調用 execute 無返回值,而調用 submit 可以返回 Future,那么這個 Future 能到底能干啥呢,看它的接口
public interface Future<V> { /** * 取消正在執行的任務,如果任務已執行或已被取消,或者由于某些原因不能取消則返回 false * 如果任務未開始或者任務已開始但可以中斷(mayInterruptIfRunning 為 true),則 * 可以取消/中斷此任務 */ boolean cancel(boolean mayInterruptIfRunning); /** * 任務在完成前是否已被取消 */ boolean isCancelled(); /** * 正常的執行完流程流程,或拋出異常,或取消導致的任務完成都會返回 true */ boolean isDone(); /** * 阻塞等待任務的執行結果 */ V get() throws InterruptedException, ExecutionException; /** * 阻塞等待任務的執行結果,不過這里指定了時間,如果在 timeout 時間內任務還未執行完成, * 則拋出 TimeoutException 異常 */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
可以用 Future 取消任務,判斷任務是否已取消/完成,甚至可以阻塞等待結果。
submit 為啥能提交任務(Runnable)的同時也能返回任務(Future)的執行結果呢
原來在最后執行 execute 前用 newTaskFor 將 task 封裝成了 RunnableFuture,newTaskFor 返回了 FutureTask 這個類,結構圖如下
可以看到 FutureTask 這個接口既實現了 Runnable 接口,也實現 Future 接口,所以在提交任務的同時也能利用 Future 接口來執行任務的取消,獲取任務的狀態,等待執行結果這些操作。
execute 與 submit 除了是否能返回執行結果這一區別外,還有一個重要區別,那就是使用 execute 執行如果發生了異常,是捕獲不到的,默認會執行 ThreadGroup 的 uncaughtException 方法(下圖數字 2 對應的邏輯)
所以如果你想監控執行 execute 方法時發生的異常,需要通過 threadFactory 來指定一個 UncaughtExceptionHandler,這樣就會執行上圖中的 1,進而執行 UncaughtExceptionHandler 中的邏輯,如下所示:
//1.實現一個自己的線程池工廠 ThreadFactory factory = (Runnable r) -> { //創建一個線程 Thread t = new Thread(r); //給創建的線程設置UncaughtExceptionHandler對象 里面實現異常的默認邏輯 t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> { // 在此設置統計監控邏輯 System.out.println("線程工廠設置的exceptionHandler" + e.getMessage()); }); return t; }; // 2.創建一個自己定義的線程池,使用自己定義的線程工廠 ExecutorService service = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10),factory); //3.提交任務 service.execute(()->{ int i=1/0; });
執行以上邏輯最終會輸出「線程工廠設置的exceptionHandler/ by zero」,通過這樣的方式就能通過設定的 defaultUncaughtExceptionHandler 來執行我們的監控邏輯了。
如果用 submit ,如何捕獲異常呢,當我們調用 future.get 就可以捕獲
Callable testCallable = xxx; Future future = executor.submit(myCallable); try { future1.get(3)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
那么 future 為啥在 get 的時候才捕獲異步呢,因為在執行 submit 時拋出異常后此異常被保存了起來,而在 get 的時候才被拋出
關于 execute 和 submit 的執行流程 why 神的這篇文章寫得非常透徹,我就不拾人牙慧了,建議大家好好品品,收獲會很大!
ThreadPoolExecutor 源碼剖析
前面鋪墊了這么多,終于到了最核心的源碼剖析環節了。
對于線程池來說,我們最關心的是它的「狀態」和「可運行的線程數量」,一般來說我們可以選擇用兩個變量來記錄,不過 Doug Lea 只用了一個變量(ctl)就達成目的了,我們知道變量越多,代碼的可維護性就越差,也越容易出 bug, 所以只用一個變量就達成了兩個變量的效果,這讓代碼的可維護性大大提高,那么他是怎么設計的呢
// ThreadPoolExecutor.java public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 結果:111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; // 結果: 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 結果: 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; // 結果: 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; // 結果: 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // 獲取線程池的狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取線程數量 private static int workerCountOf(int c) { return c & CAPACITY; } }
可以看到,ctl 是一個 原子類的 Integer 變量,有 32 位,低 29 位表示線程數量, 29 位最大可以表示 (2^29)-1 (大概 5 億多),足夠記錄線程大小了,如果未來還是不夠,可以把 ctl 聲明為 AtomicLong,高 3 位用來表示線程池的狀態,3 位可以表示 8 個線程池的狀態,由于線程池總共只有五個狀態,所以 3 位也是足夠了,線程池的五個狀態如下
RUNNING: 接收新的任務,并能繼續處理 workQueue 中的任務
SHUTDOWN: 不再接收新的任務,不過能繼續處理 workQueue 中的任務
STOP: 不再接收新的任務,也不再處理 workQueue 中的任務,并且會中斷正在處理任務的線程
TIDYING: 所有的任務都完結了,并且線程數量(workCount)為 0 時即為此狀態,進入此狀態后會調用 terminated() 這個鉤子方法進入 TERMINATED 狀態
TERMINATED: 調用 terminated() 方法后即為此狀態
線程池的狀態流轉及觸發條件如下
有了這些基礎,我們來分析下 execute 的源碼
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 如果當前線程數少于核心線程數(corePoolSize),無論核心線程是否忙碌,都創建線程,直到達到 corePoolSize 為止 if (workerCountOf(c) < corePoolSize) { // 創建線程并將此任務交給 worker 處理(此時此任務即 worker 中的 firstTask) if (addWorker(command, true)) return; c = ctl.get(); } // 如果線程池處于 RUNNING 狀態,并且線程數大于 corePoolSize 或者 // 線程數少于 corePoolSize 但創建線程失敗了,則將任務丟進 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 這里需要再次檢查線程池是否處于 RUNNING 狀態,因為在任務入隊后可能線程池狀態會發生變化,(比如調用了 shutdown 方法等),如果線程狀態發生變化了,則移除此任務,執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果線程池在 RUNNING 狀態下,線程數為 0,則新建線程加速處理 workQueue 中的任務 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 這段邏輯說明線程數大于 corePoolSize 且任務入隊失敗了,此時會以最大線程數(maximumPoolSize)為界來創建線程,如果失敗,說明線程數超過了 maximumPoolSize,則執行拒絕策略 else if (!addWorker(command, false)) reject(command); }
從這段代碼中可以看到,創建線程是調用 addWorker 實現的,在分析 addWorker 之前,有必要簡單提一下 Worker,線程池把每一個執行任務的線程都封裝為 Worker 的形式,取名為 Worker 很形象,線程池的本質是生產者-消費者模型,生產者不斷地往 workQueue 中丟 task, workQueue 就像流水線一樣不斷地輸送著任務,而 worker(工人) 不斷地取任務來執行
那么問題來了,為啥要把線程封裝到 worker 中呢,線程池拿到 task 后直接丟給線程處理或者讓線程自己去 workQueue 中處理不就完了?
將線程封裝為 worker 主要是為了更好地管理線程的中斷
來看下 Worker 的定義
// 此處可以看出 worker 既是一個 Runnable 任務,也實現了 AQS(實際上是用 AQS 實現了一個獨占鎖,這樣由于 worker 運行時會上鎖,執行 shutdown,setCorePoolSize,setMaximumPoolSize等方法時會試著中斷線程(interruptIdleWorkers) ,在這個方法中斷方法中會先嘗試獲取 worker 的鎖,如果不成功,說明 worker 在運行中,此時會先讓 worker 執行完任務再關閉 worker 的線程,實現優雅關閉線程的目的) private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; // 實際執行任務的線程 final Thread thread; // 上文提到,如果當前線程數少于核心線程數,創建線程并將提交的任務交給 worker 處理處理,此時 firstTask 即為此提交的任務,如果 worker 從 workQueue 中獲取任務,則 firstTask 為空 Runnable firstTask; // 統計完成的任務數 volatile long completedTasks; Worker(Runnable firstTask) { // 初始化為 -1,這樣在線程運行前(調用runWorker)禁止中斷,在 interruptIfStarted() 方法中會判斷 getState()>=0 setState(-1); this.firstTask = firstTask; // 根據線程池的 threadFactory 創建一個線程,將 worker 本身傳給線程(因為 worker 實現了 Runnable 接口) this.thread = getThreadFactory().newThread(this); } public void run() { // thread 啟動后會調用此方法 runWorker(this); } // 1 代表被鎖住了,0 代表未鎖 protected boolean isHeldExclusively() { return getState() != 0; } // 嘗試獲取鎖 protected boolean tryAcquire(int unused) { // 從這里可以看出它是一個獨占鎖,因為當獲取鎖后,cas 設置 state 不可能成功,這里我們也能明白上文中將 state 設置為 -1 的作用,這種情況下永遠不可能獲取得鎖,而 worker 要被中斷首先必須獲取鎖 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 嘗試釋放鎖 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } // 中斷線程,這個方法會被 shutdowNow 調用,從中可以看出 shutdownNow 要中斷線程不需要獲取鎖,也就是說如果線程正在運行,照樣會給你中斷掉,所以一般來說我們不用 shutdowNow 來中斷線程,太粗暴了,中斷時線程很可能在執行任務,影響任務執行 void interruptIfStarted() { Thread t; // 中斷也是有條件的,必須是 state >= 0 且 t != null 且線程未被中斷 // 如果 state == -1 ,不執行中斷,再次明白了為啥上文中 setState(-1) 的意義 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
通過上文對 Worker 類的分析,相信大家不難理解 將線程封裝為 worker 主要是為了更好地管理線程的中斷 這句話。
理解了 Worker 的意義,我們再來看 addWorker 的方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 獲取線程池的狀態 int rs = runStateOf(c); // 如果線程池的狀態 >= SHUTDOWN,即為 SHUTDOWN,STOP,TIDYING,TERMINATED 這四個狀態,只有一種情況有可能創建線程,即線程狀態為 SHUTDOWN, 且隊列非空時,firstTask == null 代表創建一個不接收新任務的線程(此線程會從 workQueue 中獲取任務再執行),這種情況下創建線程是為了加速處理完 workQueue 中的任務 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 獲取線程數 int wc = workerCountOf(c); // 如果超過了線程池的最大 CAPACITY(5 億多,基本不可能) // 或者 超過了 corePoolSize(core 為 true) 或者 maximumPoolSize(core 為 false) 時 // 則返回 false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 否則 CAS 增加線程的數量,如果成功跳出雙重循環 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 如果線程運行狀態發生變化,跳到外層循環繼續執行 if (runStateOf(c) != rs) continue retry; // 說明是因為 CAS 增加線程數量失敗所致,繼續執行 retry 的內層循環 } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 能執行到這里,說明滿足增加 worker 的條件了,所以創建 worker,準備添加進線程池中執行任務 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加鎖,是因為下文要把 w 添加進 workers 中, workers 是 HashSet,不是線程安全的,所以需要加鎖予以保證 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次 check 線程池的狀態以防執行到此步時發生中斷等 int rs = runStateOf(ctl.get()); // 如果線程池狀態小于 SHUTDOWN(即為 RUNNING), // 或者狀態為 SHUTDOWN 但 firstTask == null(代表不接收任務,只是創建線程處理 workQueue 中的任務),則滿足添加 worker 的條件 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果線程已啟動,顯然有問題(因為創建 worker 后,還沒啟動線程呢),拋出異常 if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); // 記錄最大的線程池大小以作監控之用 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 說明往 workers 中添加 worker 成功,此時啟動線程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 添加線程失敗,執行 addWorkerFailed 方法,主要做了將 worker 從 workers 中移除,減少線程數,并嘗試著關閉線程池這樣的操作 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
從這段代碼我們可以看到多線程下情況的不可預料性,我們發現在滿足條件情況下,又對線程狀態重新進行了 check,以防期間出現中斷等線程池狀態發生變更的操作,這也給我們以啟發:多線程環境下的各種臨界條件一定要考慮到位。
執行 addWorker 創建 worker 成功后,線程開始執行了(t.start()),由于在創建 Worker 時,將 Worker 自己傳給了此線程,所以啟動線程后,會調用 Worker 的 run 方法
public void run() { runWorker(this); }
可以看到最終會調用 runWorker 方法,接下來我們來分析下 runWorker 方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // unlock 會調用 tryRelease 方法將 state 設置成 0,代表允許中斷,允許中斷的條件上文我們在 interruptIfStarted() 中有提過,即 state >= 0 w.unlock(); boolean completedAbruptly = true; try { // 如果在提交任務時創建了線程,并把任務丟給此線程,則會先執行此 task // 否則從任務隊列中獲取 task 來執行(即 getTask() 方法) while (task != null || (task = getTask()) != null) { w.lock(); // 如果線程池狀態為 >= STOP(即 STOP,TIDYING,TERMINATED )時,則線程應該中斷 // 如果線程池狀態 < STOP, 線程不應該中斷,如果中斷了(Thread.interrupted() 返回 true,并清除標志位),再次判斷線程池狀態(防止在清除標志位時執行了 shutdownNow() 這樣的方法),如果此時線程池為 STOP,執行線程中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 執行任務前,子類可實現此鉤子方法作為統計之用 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 執行任務后,子類可實現此鉤子方法作為統計之用 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 如果執行到這只有兩種可能,一種是執行過程中異常中斷了,一種是隊列里沒有任務了,從這里可以看出線程沒有核心線程與非核心線程之分,哪個任務異常了或者正常退出了都會執行此方法,此方法會根據情況將線程數-1 processWorkerExit(w, completedAbruptly); } }
來看看 processWorkerExit 方法是咋樣的
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果異常退出,cas 執行線程池減 1 操作 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 加鎖確保線程安全地移除 worker workers.remove(w); } finally { mainLock.unlock(); } // woker 既然異常退出,可能線程池狀態變了(如執行 shutdown 等),嘗試著關閉線程池 tryTerminate(); int c = ctl.get(); // 如果線程池處于 STOP 狀態,則如果 woker 是異常退出的,重新新增一個 woker,如果是正常退出的,在 wokerQueue 為非空的條件下,確保至少有一個線程在運行以執行 wokerQueue 中的任務 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
接下來我們分析 woker 從 workQueue 中取任務的方法 getTask
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果線程池狀態至少為 STOP 或者 // 線程池狀態 == SHUTDOWN 并且任務隊列是空的 // 則減少線程數量,返回 null,這種情況下上文分析的 runWorker 會執行 processWorkerExit 從而讓獲取此 Task 的 woker 退出 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 如果 allowCoreThreadTimeOut 為 true,代表任何線程在 keepAliveTime 時間內處于 idle 狀態都會被回收,如果線程數大于 corePoolSize,本身在 keepAliveTime 時間內處于 idle 狀態就會被回收 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // worker 應該被回收的幾個條件,這個比較簡單,就此略過 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 阻塞獲取 task,如果在 keepAliveTime 時間內未獲取任務,說明超時了,此時 timedOut 為 true Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
經過以上源碼剖析,相信我們對線程池的工作原理了解得八九不離十了,再來簡單過一下其他一些比較有用的方法,開頭我們提到線程池的監控問題,我們看一下可以監控哪些指標
int getCorePoolSize():獲取核心線程數。
int getLargestPoolSize():歷史峰值線程數。
int getMaximumPoolSize():最大線程數(線程池線程容量)。
int getActiveCount():當前活躍線程數
int getPoolSize():當前線程池中的線程總數
BlockingQueuegetQueue() 當前線程池的任務隊列,據此可以獲取積壓任務的總數,getQueue.size()
監控思路也很簡單,開啟一個定時線程 ScheduledThreadPoolExecutor,定期對這些線程池指標進行采集,一般會采用一些開源工具如 Grafana + Prometheus + MicroMeter 來實現。
如何實現核心線程池的預熱
使用 prestartAllCoreThreads() 方法,這個方法會一次性創建 corePoolSize 個線程,無需等到提交任務時才創建,提交創建好線程的話,一有任務提交過來,這些線程就可以立即處理。
如何實現動態調整線程池參數
setCorePoolSize(int corePoolSize) 調整核心線程池大小
setMaximumPoolSize(int maximumPoolSize)
setKeepAliveTime() 設置線程的存活時間
解答開篇的問題
其它問題基本都在源碼剖析環節回答了,這里簡單說下其他問題
1、Tomcat 的線程池和 JDK 的線程池實現有啥區別, Dubbo 中有類似 Tomcat 的線程池實現嗎? Dubbo 中一個叫 EagerThreadPool 的東西,可以看看它的使用說明
從注釋里可以看出,如果核心線程都處于 busy 狀態,如果有新的請求進來,EagerThreadPool 會選擇先創建線程,而不是將其放入任務隊列中,這樣可以更快地響應這些請求。
Tomcat 實現也是與此類似的,只不過稍微有所不同,當 Tomcat 啟動時,會先創建 minSpareThreads 個線程,如果經過一段時間收到請求時這些線程都處于忙碌狀態,每次都會以 minSpareThreads 的步長創建線程,本質上也是為了更快地響應處理請求。具體的源碼可以看它的 ThreadPool 實現,這里就不展開了。
2、我司網關 dubbo 調用線程池曾經出現過這樣的一個問題:壓測時接口可以正常返回,但接口 RT 很高,假設設置的核心線程大小為 500,最大線程為 800,緩沖隊列為 5000,你能從這個設置中發現出一些問題并對這些參數進行調優嗎?這個參數明顯能看出問題來,首先任務隊列設置過大,任務達到核心線程后,如果再有請求進來會先進入任務隊列,隊列滿了之后才創建線程,創建線程也是需要不少開銷的,所以我們后來把核心線程設置成了與最大線程一樣,并且調用 prestartAllCoreThreads() 來預熱核心線程,就不用等請求來時再創建線程了。
線程池的幾個最佳實踐
1、線程池執行的任務應該是互相獨立的,如果互相依賴的話,可能導致死鎖,比如下面這樣的代碼
ExecutorService pool = Executors .newSingleThreadExecutor(); pool.submit(() -> { try { String qq=pool.submit(()->"QQ").get(); System.out.println(qq); } catch (Exception e) { } });
2、核心任務與非核心任務最好能用多個線程池隔離開來
曾經我們業務上就出現這樣的一個故障:突然很多用戶反饋短信收不到了,排查才發現發短信是在一個線程池里,而另外的定時腳本也是用的這個線程池來執行任務,這個腳本一分鐘可能產生幾百上千條任務,導致發短信的方法在線程池里基本沒機會執行,后來我們用了兩個線程池把發短信和執行腳本隔離開來解決了問題。
3、添加線程池監控,動態設置線程池
如前文所述,線程池的各個參數很難一次性確定,既然難以確定,又要保證發現問題后及時解決,我們就需要為線程池增加監控,監控隊列大小,線程數量等,我們可以設置 3 分鐘內比如隊列任務一直都是滿了的話,就觸發告警,這樣可以提前預警,如果線上因為線程池參數設置不合理而觸發了降級等操作,可以通過動態設置線程池的方式來實時修改核心線程數,最大線程數等,將問題及時修復。
到此,相信大家對“怎么深入理解線程池”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。