中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java并發編程中線程池工作原理的示例分析

發布時間:2021-09-18 11:20:31 來源:億速云 閱讀:127 作者:柒染 欄目:編程語言

Java并發編程中線程池工作原理的示例分析,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

線程池繼承關系

Java并發編程中線程池工作原理的示例分析

ThreadPoolExecutor實現的頂層接口是Executor,在接口Executor中用戶無需關注如何創建線程,如何調度線程來執行任務,用戶只需提供Runnable對象,將任務的運行邏輯提交到執行器Executor中,由Executor框架完成線程的調配和任務的執行部分。

ExecutorService接口增加了一些能力

  • 擴充執行任務的能力,補充可以為一個或一批異步任務生成Future的方法

  • 提供了管控線程池的方法,比如停止線程池的運行

AbstractExecutorService則是上層的抽象類

將執行任務的流程串聯了起來,保證下層的實現只需關注一個執行任務的方法即可

ThreadPoolExecutor實現最復雜的運行部分:

可以自動創建、管理和復用指定數量的一組線程,適用方只需提交任務即可線程安全,ThreadPoolExecutor內部有狀態、核心線程數、非核心線程等屬性,廣泛使用了CAS和AQS鎖機制避免并發帶來的沖突問題

提供了核心線程、緩沖阻塞隊列、非核心線程、拋棄策略的概念,可以根據實際應用場景進行組合使用

提供了beforeExecute 和afterExecute()可以支持對線程池的功能進行擴展

線程池的優點

降低線程創建和銷毀線程造成的開銷

  • 提高響應速度:任務到達時,相對于手工創建一個線程,直接從線程池中拿線程,速度肯定快很多

  • 提高線程可管理性:線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統穩定性,使用線程池可以進行同意分配、調優和監控。

構造函數

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

  • corePoolSize:線程池的核心線程數,一般情況下不管有沒有任務都會一直在線程池中一直存活,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)設置為true時,閑置的核心線程會存在超時機制,如果在指定時間沒有新任務來時,核心線程也會被終止,而這個時間間隔由第3個屬性keepAliveTime指定。

  • maximumPoolSize:線程池所能容納的最大線程數,當活動的線程數達到這個值后,后續的新任務將會被阻塞。

  • keepAliveTime:控制線程閑置時的超時時長,超過則終止該線程。一般情況下用于非核心線程,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)設置為true時,也作用于核心線程。

  • unit:用于指定keepAliveTime參數的時間單位,TimeUnit是個enum枚舉類型,常用的有:TimeUnit.HOURS(小時)、TimeUnit.MINUTES(分鐘)、TimeUnit.SECONDS(秒) 和 TimeUnit.MILLISECONDS(毫秒)等。

  • workQueue:線程池的任務隊列,通過線程池的execute(Runnable command)方法會將任務Runnable存儲在隊列中。

  • threadFactory:線程工廠,它是一個接口,用來為線程池創建新線程的。

  • handler:拒絕策略,所謂拒絕策略,是指將任務添加到線程池中時,線程池拒絕該任務所采取的相應策略。


成員變量

/**
 * 任務阻塞隊列 
 */
private final BlockingQueue<Runnable> workQueue; 
/**
 * 非公平的互斥鎖(可重入鎖)
 */
private final ReentrantLock mainLock = new ReentrantLock();
/**
 * 線程集合一個Worker對應一個線程,沒有核心線程的說話,只有核心線程數
 */
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
 * 配合mainLock通過Condition能夠更加精細的控制多線程的休眠與喚醒
 */
private final Condition termination = mainLock.newCondition();
/**
 * 線程池中線程數量曾經達到過的最大值。
 */
private int largestPoolSize;  
/**
 * 已完成任務數量
 */
private long completedTaskCount;
/**
 * ThreadFactory對象,用于創建線程。
 */
private volatile ThreadFactory threadFactory;  
/**
 * 拒絕策略的處理句柄
 * 現在默認提供了CallerRunsPolicy、AbortPolicy、DiscardOldestPolicy、DiscardPolicy
 */
private volatile RejectedExecutionHandler handler;
/**
 * 線程池維護線程(超過核心線程數)所允許的空閑時間
 */
private volatile long keepAliveTime;
/**
 * 允許線程池中的核心線程超時進行銷毀
 */
private volatile boolean allowCoreThreadTimeOut;  
/**
 * 線程池維護線程的最小數量,哪怕是空閑的  
 */
private volatile int corePoolSize;
/**
 * 線程池維護的最大線程數量,線程數超過這個數量之后新提交的任務就需要進入阻塞隊列
 */
private volatile int maximumPoolSize;

創建線程池

緩存程線程池(不會存放隊列,一直創建線程)

核心線程數為0,總線程數量閾值為Integer.MAX_VALUE,即可以創建無限的非核心線程

newCachedThreadPool是一個可根據需要創建新線程的線程池,但是在以前構造的線程可用時將重用它們。對于執行很多短期異步任務的程序而言,這些線程池通常可提高程序性能。調用 execute() 將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長時間保持空閑的線程池不會使用任何資源。注意,可以使用 ThreadPoolExecutor 構造方法創建具有類似屬性但細節不同(例如超時參數)的線程池。

會出下面大量的線程對象,導致的OOM

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                  60L, TimeUnit.SECONDS,
                  new SynchronousQueue<Runnable>());
}

執行流程

  • 先執行SynchronousQueue的offer方法提交任務,并查詢線程池中是否有空閑線程來執行SynchronousQueue的poll方法來移除任務。如果有,則配對成功,將任務交給這個空閑線程,否則,配對失敗,創建新的線程去處理任務

  • 當線程池中的線程空閑時,會執行SynchronousQueue的poll方法等待執行SynchronousQueue中新提交的任務。若等待超過60s,空閑線程就會終止

Java并發編程中線程池工作原理的示例分析


Java并發編程中線程池工作原理的示例分析

使用場景

執行大量短生命周期任務因為maximumPoolSize是無界的,所以提交任務的速度 > 線程池中線程處理任務的速度就要不斷創建新線程;每次提交任務都會立即有線程去處理,因此CachedThreadPool適用于處理大量、耗時少的任務


單線程線程池(只會運行一個線程,否則一直會堆積到阻塞隊列)

它適用于需要保證順序地執行各個任務;并且在任意時間點,不會有多個線程是活動的應用場景,SingleThreadExecutor的corePoolSize和maximumPoolSize被設置為1,使用無界隊列LinkedBlockingQueue作為線程池的工作隊列

newSingleThreadExecutor 創建是一個單線程池,也就是該線程池只有一個線程在工作,所有的任務是串行執行的,如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它此線程池保證所有任務的執行順序按照任務的提交順序執行

  • 當線程池中沒有線程時會創建一個新線程來執行任務

  • 當前線程池中有一個線程后將新任務加入LinkedBlockingQueue

  • 線程執行完第一個任務后會在一個無限循環中反復從LinkedBlockingQueue獲取任務來執行 。

使用場景:

**適用于串行執行任務場景**

會存在出現阻塞隊列堆積過大,導致的OOM

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}

固定大小線程池(會運行指定數量的線程,否則一直會堆積到阻塞隊列)

 corePoolSize等于maximumPoolSize,所以線程池中只有核心線程,使用無界阻塞隊列LinkedBlockingQueue作為工作隊列

使用場景

適用于處理CPU密集型的任務,確保CPU在長期被工作線程使用的情況下,盡可能的少的分配線程,即適用執行長期的任務


newFixedThreadPool創建固定大小的線程池每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小,線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。當線程處于空閑狀態時,他們并不會被回收,除非線程池被關閉。當所有的線程都處于活動狀態時,新的任務都會處于等待狀態,直到有線程空閑出來。

會存在出現阻塞隊列堆積大,導致的OOM

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                  0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>(),
                  threadFactory);
}

定時任務線程池(任務隊列與最大值均為無限大小,一直堆積到阻塞隊列)

newScheduledThreadPool 創建一個大小無限的線程池,此線程池支持定時以及周期性執行任務的需求。

線程總數閾值為Integer.MAX_VALUE,工作隊列使用DelayedWorkQueue,非核心線程存活時間為0,所以線程池僅僅包含固定數目的核心線程。

會存在出現阻塞隊列堆積過大,導致的OOM

public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
                   ThreadFactory threadFactory) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
      new DelayedWorkQueue(), threadFactory);
}

可以看出來上面的方法一共使用了DelayedWorkQueueLinkedBlockingQueueSynchronousQueue。這個就是線程核心之一的阻塞隊列。

兩種方式提交任務:

  • scheduleAtFixedRate: 按照固定速率周期執行

  • scheduleWithFixedDelay:上個任務延遲固定時間后執行

任務阻塞隊列

它一般分為直接提交隊列、有界任務隊列、無界任務隊列、優先任務隊列;

SynchronousQueue

直接提交隊列:設置為SynchronousQueue隊列,SynchronousQueue是一個特殊的BlockingQueue,它沒有容量,每執行一個插入操作就會阻塞,需要再執行一個刪除操作才會被喚醒,反之每一個刪除操作也都要等待對應的插入操作。

一個不存儲元素的阻塞隊列,每個插入操作,必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態


SynchronousQueue隊列,提交的任務不會被保存,總是會馬上提交執行。如果用于執行任務的線程數量小于maximumPoolSize,則嘗試創建新的進程,如果達到maximumPoolSize設置的最大值,則根據你設置的handler執行拒絕策略。因此這種方式你提交的任務不會被緩存起來,而是會被馬上執行,在這種情況下,你需要對你程序的并發量有個準確的評估,才能設置合適的maximumPoolSize數量,否則很容易就會執行拒絕策略;


ArrayBlockingQueue

有界的任務隊列:有界的任務隊列可以使用ArrayBlockingQueue實現,如下所示:
new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用ArrayBlockingQueue有界任務隊列,若有新的任務需要執行時,線程池會創建新的線程,直到創建的線程數量達到corePoolSize時,則會將新的任務加入到等待隊列中。若等待隊列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續創建線程,直到線程數量達到maximumPoolSize設置的最大線程數量,若大于maximumPoolSize,則執行拒絕策略。


在這種情況下,線程數量的上限與有界任務隊列的狀態有直接關系,如果有界隊列初始容量較大或者沒有達到超負荷的狀態,線程數將一直維持在corePoolSize以下,反之當任務隊列已滿時,則會以maximumPoolSize為最大線程數上限。

LinkedBlockingQueue

無界的任務隊列:無界任務隊列可以使用LinkedBlockingQueue實現,如下所示:
 new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用無界任務隊列,線程池的任務隊列可以無限制的添加新的任務,而線程池創建的最大線程數量就是你corePoolSize設置的數量,也就是說在這種情況下maximumPoolSize這個參數是無效的,哪怕你的任務隊列中緩存了很多未執行的任務,當線程池的線程數達到corePoolSize后,就不會再增加了;若后續有新的任務加入,則直接進入隊列等待,當使用這種任務隊列模式時,一定要注意你任務提交與處理之間的協調與控制,不然會出現隊列中的任務由于無法及時處理導致一直增長,直到最后資源耗盡的問題。 Java并發編程中線程池工作原理的示例分析


Java并發編程中線程池工作原理的示例分析

PriorityBlockingQueue

優先任務隊列:優先任務隊列通過PriorityBlockingQueue實現,使用平衡二叉樹堆,實現的具有優先級的無界阻塞隊列

任務會按優先級重新排列執行,且線程池的線程數一直為corePoolSize,也就是只有一個。

PriorityBlockingQueue其實是一個特殊的無界隊列,它其中無論添加了多少個任務,線程池創建的線程數也不會超過corePoolSize的數量,只不過其他隊列一般是按照先進先出的規則處理任務,而PriorityBlockingQueue隊列可以自定義規則根據任務的優先級順序先后執行

其實LinkedBlockingQueue也是可以設置界限的,它默認的界限是Integer.MAX_VALUE。同時也支持也支持構造的時候設置隊列大小

DelayQueue

無界阻塞延遲隊列,隊列中每個元素均有過期時間,當從隊列獲取元素時,只有過期元素才會出隊列。隊列頭元素是最塊要過期的元素。

拒絕策略

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

當Executor已經關閉,即執行了executorService.shutdown()方法后,或者Executor將有限邊界用于最大線程和工作隊列容量,且已經飽和時。使用方法execute()提交的新任務將被拒絕. 在以上述情況下,execute方法將調用其RejectedExecutionHandler的rejectExecution()方法

RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)

AbortPolicy(默認的拒絕策略)

也稱為終止策略,遭到拒絕將拋出運行時RejectedExecutionException。業務方能通過捕獲異常及時得到對本次任務提交的結果反饋。

public static class AbortPolicy implements RejectedExecutionHandler {
  public AbortPolicy() { }
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
  }
}

CallerRunsPolicy

擁有自主反饋控制,讓提交者執行提交任務,能夠減緩新任務的提交速度。這種情況是需要讓所有的任務都執行完畢。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

DiscardPolicy

拒絕任務的處理程序,靜默丟棄任務。使用此策略,我們可能無法感知系統的異常狀態。慎用~!

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

DiscardOldestPolicy

丟棄隊列中最前面的任務,然后重新提交被拒絕的任務。是否要使用此策略需要看業務是否需要新老的替換,慎用~!(LRU)

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

運作執行流程

  1. 判斷線程池中核心線程數是否已達閾值corePoolSize,若否,則創建一個新核心線程執行任務

  2. 若核心線程數已達閾值corePoolSize,判斷阻塞隊列workQueue是否已滿,若未滿,則將新任務添加進阻塞隊列

  3. 若滿,再判斷,線程池中線程數是否達到閾值maximumPoolSize,若否,則新建一個非核心線程執行任務。若達到閾值,則執行線程池飽和策略。

線程池飽和策略分為一下幾種:

  • AbortPolicy:直接拋出一個異常,默認策略

  • DiscardPolicy: 直接丟棄任務

  • DiscardOldestPolicy:拋棄下一個將要被執行的任務(最舊任務)

  • CallerRunsPolicy:主線程中執行任務


Java并發編程中線程池工作原理的示例分析

合理配置線程池大小

 要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

  • 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。

  • 任務的優先級:高,中和低。

  • 任務的執行時間:長,中和短。

  • 任務的依賴性:是否依賴其他系統資源,如數據庫連接。


根據任務所需要的cpu和io資源的量可以分為,
  • CPU密集型任務:  主要是執行計算任務,響應時間很快,cpu一直在運行,這種任務cpu的利用率很高。

  • IO密集型任務:主要是進行IO操作,執行IO操作的時間較長,這是cpu出于空閑狀態,導致cpu的利用率不高。


為了合理最大限度的使用系統資源同時也要保證的程序的高性能,可以給CPU密集型任務和IO密集型任務配置一些線程數。
  • CPU密集型:線程個數為CPU核數。這幾個線程可以并行執行,不存在線程切換到開銷,提高了cpu的利用率的同時也減少了切換線程導致的性能損耗

  • IO密集型:線程個數為CPU核數的兩倍。到其中的線程在IO操作的時候,其他線程可以繼續用cpu,提高了cpu的利用率。

看完上述內容,你們掌握Java并發編程中線程池工作原理的示例分析的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

石台县| 瓮安县| 故城县| 鄂托克旗| 江油市| 乌审旗| 大洼县| 沁阳市| 寿光市| 腾冲县| 佛冈县| 林口县| 炉霍县| 石嘴山市| 会泽县| 册亨县| 武宁县| 恩施市| 华池县| 清苑县| 阿巴嘎旗| 潮安县| 隆德县| 寻乌县| 余姚市| 方城县| 永丰县| 盐山县| 河间市| 嵩明县| 阆中市| 彭水| 邓州市| 昌平区| 峨眉山市| 乐都县| 武宁县| 仁怀市| 乌拉特前旗| 彰化市| 永康市|