您好,登錄后才能下訂單哦!
Java并發編程中線程池工作原理的示例分析,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
ThreadPoolExecutor實現的頂層接口是Executor,在接口Executor中用戶無需關注如何創建線程,如何調度線程來執行任務,用戶只需提供Runnable對象,將任務的運行邏輯提交到執行器Executor中,由Executor框架完成線程的調配和任務的執行部分。
ExecutorService接口增加了一些能力
擴充執行任務的能力,補充可以為一個或一批異步任務生成Future的方法;
提供了管控線程池的方法,比如停止線程池的運行。
AbstractExecutorService則是上層的抽象類:
將執行任務的流程串聯了起來,保證下層的實現只需關注一個執行任務的方法即可。
ThreadPoolExecutor實現最復雜的運行部分:
可以自動創建、管理和復用指定數量的一組線程,適用方只需提交任務即可線程安全,ThreadPoolExecutor內部有狀態、核心線程數、非核心線程等屬性,廣泛使用了CAS和AQS鎖機制避免并發帶來的沖突問題
提供了核心線程、緩沖阻塞隊列、非核心線程、拋棄策略的概念,可以根據實際應用場景進行組合使用
提供了beforeExecute 和afterExecute()可以支持對線程池的功能進行擴展
提高響應速度:任務到達時,相對于手工創建一個線程,直接從線程池中拿線程,速度肯定快很多
提高線程可管理性:線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統穩定性,使用線程池可以進行同意分配、調優和監控。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize:線程池的核心線程數,一般情況下不管有沒有任務都會一直在線程池中一直存活,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)設置為true時,閑置的核心線程會存在超時機制,如果在指定時間沒有新任務來時,核心線程也會被終止,而這個時間間隔由第3個屬性keepAliveTime指定。
maximumPoolSize:線程池所能容納的最大線程數,當活動的線程數達到這個值后,后續的新任務將會被阻塞。
keepAliveTime:控制線程閑置時的超時時長,超過則終止該線程。一般情況下用于非核心線程,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)設置為true時,也作用于核心線程。
unit:用于指定keepAliveTime參數的時間單位,TimeUnit是個enum枚舉類型,常用的有:TimeUnit.HOURS(小時)、TimeUnit.MINUTES(分鐘)、TimeUnit.SECONDS(秒) 和 TimeUnit.MILLISECONDS(毫秒)等。
workQueue:線程池的任務隊列,通過線程池的execute(Runnable command)方法會將任務Runnable存儲在隊列中。
threadFactory:線程工廠,它是一個接口,用來為線程池創建新線程的。
handler:拒絕策略,所謂拒絕策略,是指將任務添加到線程池中時,線程池拒絕該任務所采取的相應策略。
/** * 任務阻塞隊列 */ private final BlockingQueue<Runnable> workQueue; /** * 非公平的互斥鎖(可重入鎖) */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 線程集合一個Worker對應一個線程,沒有核心線程的說話,只有核心線程數 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * 配合mainLock通過Condition能夠更加精細的控制多線程的休眠與喚醒 */ private final Condition termination = mainLock.newCondition(); /** * 線程池中線程數量曾經達到過的最大值。 */ private int largestPoolSize; /** * 已完成任務數量 */ private long completedTaskCount; /** * ThreadFactory對象,用于創建線程。 */ private volatile ThreadFactory threadFactory; /** * 拒絕策略的處理句柄 * 現在默認提供了CallerRunsPolicy、AbortPolicy、DiscardOldestPolicy、DiscardPolicy */ private volatile RejectedExecutionHandler handler; /** * 線程池維護線程(超過核心線程數)所允許的空閑時間 */ private volatile long keepAliveTime; /** * 允許線程池中的核心線程超時進行銷毀 */ private volatile boolean allowCoreThreadTimeOut; /** * 線程池維護線程的最小數量,哪怕是空閑的 */ private volatile int corePoolSize; /** * 線程池維護的最大線程數量,線程數超過這個數量之后新提交的任務就需要進入阻塞隊列 */ private volatile int maximumPoolSize;
核心線程數為0,總線程數量閾值為Integer.MAX_VALUE,即可以創建無限的非核心線程
newCachedThreadPool是一個可根據需要創建新線程的線程池,但是在以前構造的線程可用時將重用它們。對于執行很多短期異步任務的程序而言,這些線程池通常可提高程序性能。調用 execute() 將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長時間保持空閑的線程池不會使用任何資源。注意,可以使用 ThreadPoolExecutor 構造方法創建具有類似屬性但細節不同(例如超時參數)的線程池。
會出下面大量的線程對象,導致的OOM
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
先執行SynchronousQueue的offer方法提交任務,并查詢線程池中是否有空閑線程來執行SynchronousQueue的poll方法來移除任務。如果有,則配對成功,將任務交給這個空閑線程,否則,配對失敗,創建新的線程去處理任務
當線程池中的線程空閑時,會執行SynchronousQueue的poll方法等待執行SynchronousQueue中新提交的任務。若等待超過60s,空閑線程就會終止
執行大量短生命周期任務。因為maximumPoolSize是無界的,所以提交任務的速度 > 線程池中線程處理任務的速度就要不斷創建新線程;每次提交任務,都會立即有線程去處理,因此CachedThreadPool適用于處理大量、耗時少的任務。
它適用于需要保證順序地執行各個任務;并且在任意時間點,不會有多個線程是活動的應用場景,SingleThreadExecutor的corePoolSize和maximumPoolSize被設置為1,使用無界隊列LinkedBlockingQueue作為線程池的工作隊列
newSingleThreadExecutor 創建是一個單線程池,也就是該線程池只有一個線程在工作,所有的任務是串行執行的,如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它,此線程池保證所有任務的執行順序按照任務的提交順序執行。
當線程池中沒有線程時,會創建一個新線程來執行任務。
當前線程池中有一個線程后,將新任務加入LinkedBlockingQueue
線程執行完第一個任務后,會在一個無限循環中反復從LinkedBlockingQueue獲取任務來執行 。
**適用于串行執行任務場景**
會存在出現阻塞隊列堆積過大,導致的OOM
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
corePoolSize等于maximumPoolSize,所以線程池中只有核心線程,使用無界阻塞隊列LinkedBlockingQueue作為工作隊列
適用于處理CPU密集型的任務,確保CPU在長期被工作線程使用的情況下,盡可能的少的分配線程,即適用執行長期的任務。
newFixedThreadPool:創建固定大小的線程池,每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小,線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。當線程處于空閑狀態時,他們并不會被回收,除非線程池被關閉。當所有的線程都處于活動狀態時,新的任務都會處于等待狀態,直到有線程空閑出來。
會存在出現阻塞隊列堆積大,導致的OOM
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
newScheduledThreadPool 創建一個大小無限的線程池,此線程池支持定時以及周期性執行任務的需求。
線程總數閾值為Integer.MAX_VALUE,工作隊列使用DelayedWorkQueue,非核心線程存活時間為0,所以線程池僅僅包含固定數目的核心線程。
會存在出現阻塞隊列堆積過大,導致的OOM
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }
可以看出來上面的方法一共使用了DelayedWorkQueue、LinkedBlockingQueue 和 SynchronousQueue。這個就是線程核心之一的阻塞隊列。
scheduleAtFixedRate: 按照固定速率周期執行
scheduleWithFixedDelay:上個任務延遲固定時間后執行
它一般分為直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列;
直接提交隊列:設置為SynchronousQueue隊列,SynchronousQueue是一個特殊的BlockingQueue,它沒有容量,每執行一個插入操作就會阻塞,需要再執行一個刪除操作才會被喚醒,反之每一個刪除操作也都要等待對應的插入操作。
一個不存儲元素的阻塞隊列,每個插入操作,必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態
SynchronousQueue隊列,提交的任務不會被保存,總是會馬上提交執行。如果用于執行任務的線程數量小于maximumPoolSize,則嘗試創建新的進程,如果達到maximumPoolSize設置的最大值,則根據你設置的handler執行拒絕策略。因此這種方式你提交的任務不會被緩存起來,而是會被馬上執行,在這種情況下,你需要對你程序的并發量有個準確的評估,才能設置合適的maximumPoolSize數量,否則很容易就會執行拒絕策略;
有界的任務隊列:有界的任務隊列可以使用ArrayBlockingQueue實現,如下所示:
new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用ArrayBlockingQueue有界任務隊列,若有新的任務需要執行時,線程池會創建新的線程,直到創建的線程數量達到corePoolSize時,則會將新的任務加入到等待隊列中。若等待隊列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續創建線程,直到線程數量達到maximumPoolSize設置的最大線程數量,若大于maximumPoolSize,則執行拒絕策略。
在這種情況下,線程數量的上限與有界任務隊列的狀態有直接關系,如果有界隊列初始容量較大或者沒有達到超負荷的狀態,線程數將一直維持在corePoolSize以下,反之當任務隊列已滿時,則會以maximumPoolSize為最大線程數上限。
無界的任務隊列:無界任務隊列可以使用LinkedBlockingQueue實現,如下所示:
new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
使用無界任務隊列,線程池的任務隊列可以無限制的添加新的任務,而線程池創建的最大線程數量就是你corePoolSize設置的數量,也就是說在這種情況下maximumPoolSize這個參數是無效的,哪怕你的任務隊列中緩存了很多未執行的任務,當線程池的線程數達到corePoolSize后,就不會再增加了;若后續有新的任務加入,則直接進入隊列等待,當使用這種任務隊列模式時,一定要注意你任務提交與處理之間的協調與控制,不然會出現隊列中的任務由于無法及時處理導致一直增長,直到最后資源耗盡的問題。
優先任務隊列:優先任務隊列通過PriorityBlockingQueue實現,使用平衡二叉樹堆,實現的具有優先級的無界阻塞隊列
任務會按優先級重新排列執行,且線程池的線程數一直為corePoolSize,也就是只有一個。
PriorityBlockingQueue其實是一個特殊的無界隊列,它其中無論添加了多少個任務,線程池創建的線程數也不會超過corePoolSize的數量,只不過其他隊列一般是按照先進先出的規則處理任務,而PriorityBlockingQueue隊列可以自定義規則根據任務的優先級順序先后執行。
其實LinkedBlockingQueue也是可以設置界限的,它默認的界限是Integer.MAX_VALUE。同時也支持也支持構造的時候設置隊列大小。
無界阻塞延遲隊列,隊列中每個元素均有過期時間,當從隊列獲取元素時,只有過期元素才會出隊列。隊列頭元素是最塊要過期的元素。
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
當Executor已經關閉,即執行了executorService.shutdown()方法后,或者Executor將有限邊界用于最大線程和工作隊列容量,且已經飽和時。使用方法execute()提交的新任務將被拒絕. 在以上述情況下,execute方法將調用其RejectedExecutionHandler的rejectExecution()方法
RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
也稱為終止策略,遭到拒絕將拋出運行時RejectedExecutionException。業務方能通過捕獲異常及時得到對本次任務提交的結果反饋。
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
擁有自主反饋控制,讓提交者執行提交任務,能夠減緩新任務的提交速度。這種情況是需要讓所有的任務都執行完畢。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
拒絕任務的處理程序,靜默丟棄任務。使用此策略,我們可能無法感知系統的異常狀態。慎用~!
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
丟棄隊列中最前面的任務,然后重新提交被拒絕的任務。是否要使用此策略需要看業務是否需要新老的替換,慎用~!(LRU)
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
判斷線程池中核心線程數是否已達閾值corePoolSize,若否,則創建一個新核心線程執行任務
若核心線程數已達閾值corePoolSize,判斷阻塞隊列workQueue是否已滿,若未滿,則將新任務添加進阻塞隊列
若滿,再判斷,線程池中線程數是否達到閾值maximumPoolSize,若否,則新建一個非核心線程執行任務。若達到閾值,則執行線程池飽和策略。
AbortPolicy:直接拋出一個異常,默認策略
DiscardPolicy: 直接丟棄任務
DiscardOldestPolicy:拋棄下一個將要被執行的任務(最舊任務)
CallerRunsPolicy:主線程中執行任務
要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:
任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
任務的優先級:高,中和低。
任務的執行時間:長,中和短。
任務的依賴性:是否依賴其他系統資源,如數據庫連接。
根據任務所需要的cpu和io資源的量可以分為,
CPU密集型任務: 主要是執行計算任務,響應時間很快,cpu一直在運行,這種任務cpu的利用率很高。
IO密集型任務:主要是進行IO操作,執行IO操作的時間較長,這是cpu出于空閑狀態,導致cpu的利用率不高。
為了合理最大限度的使用系統資源同時也要保證的程序的高性能,可以給CPU密集型任務和IO密集型任務配置一些線程數。
CPU密集型:線程個數為CPU核數。這幾個線程可以并行執行,不存在線程切換到開銷,提高了cpu的利用率的同時也減少了切換線程導致的性能損耗
IO密集型:線程個數為CPU核數的兩倍。到其中的線程在IO操作的時候,其他線程可以繼續用cpu,提高了cpu的利用率。
看完上述內容,你們掌握Java并發編程中線程池工作原理的示例分析的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。