中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java中ThreadPoolExecutor線程池的使用方法

發布時間:2020-10-10 18:13:32 來源:億速云 閱讀:157 作者:小新 欄目:編程語言

這篇文章將為大家詳細講解有關Java中ThreadPoolExecutor線程池的使用方法,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

Executors

Executors 是一個Java中的工具類. 提供工廠方法來創建不同類型的線程池.

Java中ThreadPoolExecutor線程池的使用方法

從上圖中也可以看出, Executors的創建線程池的方法, 創建出來的線程池都實現了 ExecutorService接口. 常用方法有以下幾個:

newFixedThreadPool(int Threads): 創建固定數目線程的線程池, 超出的線程會在隊列中等待.

newCachedThreadPool(): 創建一個可緩存線程池, 如果線程池長度超過處理需要, 可靈活回收空閑線程(60秒), 若無可回收,則新建線程.

newSingleThreadExecutor(): 創建一個單線程化的線程池, 它只會用唯一的工作線程來執行任務, 保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行. 如果某一個任務執行出錯, 將有另一個線程來繼續執行.

newScheduledThreadPool(int corePoolSize): 創建一個支持定時及周期性的任務執行的線程池, 多數情況下可用來替代Timer類.

Executors 例子

newCachedThreadPool

線程最大數為 Integer.MAX_VALUE, 當我們往線程池添加了 n 個任務, 這 n 個任務都是一起執行的.

        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
newFixedThreadPool
        ExecutorService cachedThreadPool = Executors.newFixedThreadPool(1);
        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        cachedThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(1000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
newScheduledThreadPool

三秒執行一次, 只有執行完這一次后, 才會執行.

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        Thread.currentThread().sleep(2000);
                        System.out.println(Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, 3, TimeUnit.SECONDS);
newSingleThreadExecutor

順序執行各個任務, 第一個任務執行完, 才會執行下一個.

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName());
                        Thread.currentThread().sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

Executors存在什么問題

Java中ThreadPoolExecutor線程池的使用方法

在阿里巴巴Java開發手冊中提到,使用Executors創建線程池可能會導致OOM(OutOfMemory ,內存溢出),但是并沒有說明為什么,那么接下來我們就來看一下到底為什么不允許使用Executors?

我們先來一個簡單的例子,模擬一下使用Executors導致OOM的情況.

/**
 * @author Hollis
 */
public class ExecutorsDemo {
    private static ExecutorService executor = Executors.newFixedThreadPool(15);
    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executor.execute(new SubThread());
        }
    }
}

class SubThread implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            //do nothing
        }
    }
}

通過指定JVM參數:-Xmx8m -Xms8m 運行以上代碼,會拋出OOM:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)

以上代碼指出,ExecutorsDemo.java 的第16行,就是代碼中的 executor.execute(new SubThread());

Java中的 BlockingQueue 主要有兩種實現, 分別是 ArrayBlockingQueue 和 LinkedBlockingQueue.

ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列, 必須設置容量.

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

LinkedBlockingQueue 是一個用鏈表實現的有界阻塞隊列, 容量可以選擇進行設置, 不設置的話, 將是一個無邊界的阻塞隊列, 最大長度為 Integer.MAX_VALUE.

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

這里的問題就出在如果我們不設置 LinkedBlockingQueue 的容量的話, 其默認容量將會是 Integer.MAX_VALUE.

而 newFixedThreadPool 中創建 LinkedBlockingQueue 時, 并未指定容量. 此時, LinkedBlockingQueue 就是一個無邊界隊列, 對于一個無邊界隊列來說, 是可以不斷的向隊列中加入任務的, 這種情況下就有可能因為任務過多而導致內存溢出問題.

newCachedThreadPool 和 newScheduledThreadPool 這兩種方式創建的最大線程數可能是Integer.MAX_VALUE, 而創建這么多線程, 必然就有可能導致OOM.

ThreadPoolExecutor 創建線程池

避免使用 Executors 創建線程池, 主要是避免使用其中的默認實現, 那么我們可以自己直接調用 ThreadPoolExecutor 的構造函數來自己創建線程池. 在創建的同時, 給 BlockQueue 指定容量就可以了.

ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10));

這種情況下, 一旦提交的線程數超過當前可用線程數時, 就會拋出 java.util.concurrent.RejectedExecutionException, 這是因為當前線程池使用的隊列是有邊界隊列, 隊列已經滿了便無法繼續處理新的請求.

除了自己定義 ThreadPoolExecutor 外. 還有其他方法. 如apache和guava等.

四個構造函數

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler)
             
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

int corePoolSize => 該線程池中核心線程數最大值
線程池新建線程的時候,如果當前線程總數小于corePoolSize, 則新建的是核心線程, 如果超過corePoolSize, 則新建的是非核心線程

核心線程默認情況下會一直存活在線程池中, 即使這個核心線程啥也不干(閑置狀態).

如果指定 ThreadPoolExecutor 的 allowCoreThreadTimeOut 這個屬性為 true, 那么核心線程如果不干活(閑置狀態)的話, 超過一定時間(時長下面參數決定), 就會被銷毀掉

很好理解吧, 正常情況下你不干活我也養你, 因為我總有用到你的時候, 但有時候特殊情況(比如我自己都養不起了), 那你不干活我就要把你干掉了

int maximumPoolSize
該線程池中線程總數最大值

線程總數 = 核心線程數 + 非核心線程數.

long keepAliveTime
該線程池中非核心線程閑置超時時長

一個非核心線程, 如果不干活(閑置狀態)的時長超過這個參數所設定的時長, 就會被銷毀掉

如果設置 allowCoreThreadTimeOut = true, 則會作用于核心線程

TimeUnit unit

keepAliveTime的單位, TimeUnit是一個枚舉類型, 其包括:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒

BlockingQueue workQueue

一個阻塞隊列, 用來存儲等待執行的任務. 也就是說現在有10個任務, 核心線程 有四個, 非核心線程有六個, 那么這六個線程會被添加到 workQueue 中, 等待執行.

這個參數的選擇也很重要, 會對線程池的運行過程產生重大影響, 一般來說, 這里的阻塞隊列有以下幾種選擇:

SynchronousQueue: 這個隊列接收到任務的時候, 會直接提交給線程處理, 而不保留它, 如果所有線程都在工作怎么辦? 那就*新建一個線程來處理這個任務!所以為了保證不出現<線程數達到了maximumPoolSize而不能新建線程>的錯誤, 使用這個類型隊列的時候, maximumPoolSize 一般指定成 Integer.MAX_VALUE, 即無限大.

LinkedBlockingQueue: 這個隊列接收到任務的時候, 如果當前線程數小于核心線程數, 則核心線程處理任務; 如果當前線程數等于核心線程數, 則進入隊列等待. 由于這個隊列最大值為 Integer.MAX_VALUE , 即所有超過核心線程數的任務都將被添加到隊列中,這也就導致了 maximumPoolSize 的設定失效, 因為總線程數永遠不會超過 corePoolSize.

ArrayBlockingQueue: 可以限定隊列的長度, 接收到任務的時候, 如果沒有達到 corePoolSize 的值, 則核心線程執行任務, 如果達到了, 則入隊等候, 如果隊列已滿, 則新建線程(非核心線程)執行任務, 又如果總線程數到了maximumPoolSize, 并且隊列也滿了, 則發生錯誤.

DelayQueue: 隊列內元素必須實現 Delayed 接口, 這就意味著你傳進去的任務必須先實現Delayed接口. 這個隊列接收到任務時, 首先先入隊, 只有達到了指定的延時時間, 才會執行任務.

ThreadFactory threadFactory

它是ThreadFactory類型的變量, 用來創建新線程.

默認使用 Executors.defaultThreadFactory() 來創建線程. 使用默認的 ThreadFactory 來創建線程時, 會使新創建的線程具有相同的 NORM_PRIORITY 優先級并且是非守護線程, 同時也設置了線程的名稱.

RejectedExecutionHandler handler

表示當拒絕處理任務時的策略, 有以下四種取值:

ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常(默認).
ThreadPoolExecutor.DiscardPolicy:直接丟棄任務, 但是不拋出異常.
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務, 然后重新嘗試執行任務(重復此過程)
ThreadPoolExecutor.CallerRunsPolicy:用調用者所在的線程來執行任務.

關于Java中ThreadPoolExecutor線程池的使用方法就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

汝阳县| 靖边县| 秦皇岛市| 浦城县| 永靖县| 平乡县| 大埔县| 天津市| 浮梁县| 玉山县| 齐齐哈尔市| 郓城县| 盐源县| 福安市| 抚松县| 青龙| 大竹县| 固镇县| 故城县| 久治县| 长兴县| 慈利县| 名山县| 叙永县| 称多县| 玉屏| 阿合奇县| 静乐县| 阿瓦提县| 深水埗区| 锡林郭勒盟| 确山县| 九江县| 邮箱| 靖边县| 新田县| 巧家县| 封丘县| 洮南市| 金湖县| 金沙县|