您好,登錄后才能下訂單哦!
/**
* 使用給定的參數創建ThreadPoolExecutor.
*
* @param corePoolSize 核心線程池中的最大線程數
* @param maximumPoolSize 總線程池中的最大線程數
* @param keepAliveTime 空閑線程的存活時間
* @param unit keepAliveTime的單位
* @param workQueue 任務隊列, 保存已經提交但尚未被執行的任務
* @param threadFactory 線程工廠(用于指定如果創建一個線程)
* @param handler 拒絕策略 (當任務太多導致工作隊列滿時的處理策略)
*/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime); // 使用納秒保存存活時間
this.threadFactory = threadFactory;
this.handler = handler;
}
1、ThreadPoolExecutor在邏輯上將自身管理的線程池劃分為兩部分:核心線程池(大小對應為corePoolSize)、非核心線程池(大小對應為maximumPoolSize-corePoolSize)。
2、ThreadPoolExecutor中只有一種類型的線程,名叫Worker,它是ThreadPoolExecutor內部類,封裝著Runnable任務和執行該任務的Thread對象,我們稱它為【工作線程】,它也是ThreadPoolExecutor唯一需要進行維護的線程;
3、【核心線程池】【非核心線程池】都是邏輯上的概念,ThreadPoolExecutor在任務調度過程中會根據corePoolSize和maximumPoolSize的大小,判斷如何執行任務。
1、低29位保存線程數
2、高3位保存線程池狀態
//保存線程池狀態和工作線程數:低29位: 工作線程數,高3位 : 線程池狀態
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大線程數: 2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 線程池狀態
private static final int RUNNING = -1 << COUNT_BITS; //接受新任務, 且處理已經進入阻塞隊列的任務
private static final int SHUTDOWN = 0 << COUNT_BITS; //不接受新任務, 但處理已經進入阻塞隊列的任務
private static final int STOP = 1 << COUNT_BITS; //不接受新任務, 且不處理已經進入阻塞隊列的任務, 同時中斷正在運行的任務
private static final int TIDYING = 2 << COUNT_BITS; //所有任務都已終止, 工作線程數為0, 線程轉化為TIDYING狀態并準備調用terminated方法
private static final int TERMINATED = 3 << COUNT_BITS; //terminated方法已經執行完成
/**
* Worker表示線程池中的一個工作線程, 可以與任務相關聯.
* 由于實現了AQS框架, 其同步狀態值的定義如下:
* -1: 初始狀態
* 0: 無鎖狀態
* 1: 加鎖狀態
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* 與該Worker關聯的線程.
*/
final Thread thread;
/**
*初始化任務,可以為空,為空的時候則去任務隊列workQueue里獲取
*/
Runnable firstTask;
/**
* 當前工作線程處理完成的任務數
*/
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 初始的同步狀態值
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/**
* 執行任務
*/
public void run() {
runWorker(this);
}
/**
* 是否加鎖
*/
protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* 嘗試獲取鎖
*/
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 嘗試釋放鎖
*/
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
public boolean isLocked() {
return isHeldExclusively();
}
/**
* 中斷線程(僅任務非初始狀態)
*/
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//獲取線程池狀態和工作線程數量
int c = ctl.get();
//如果工作線程數 < 核心線程數,則創建工作線程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //addWorker創建工作線程
return;
c = ctl.get();
}
//工作線程創建失敗,或者,工作線程 >= 核心線程數,任務插入任務隊列workQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次檢查線程池狀態,如果不是運行狀態,移除任務,并拒絕
if (! isRunning(recheck) && remove(command))
reject(command);
//如果工作線程為0,則創建一個不帶任務的線程,線程自動去任務隊列獲取任務執行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//任務插入隊列失敗,創建非核心工作線程,如果失敗,則說明工作線程 > 總線程數量,則執行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
說明:
1、如果工作線程數小于核心線程池上限(CorePoolSize),則直接新建一個工作線程并執行任務;
2、如果工作線程數大于等于CorePoolSize,則嘗試將任務加入到隊列等待以后執行。如果隊列已滿,則在總線程池未滿的情況下(CorePoolSize ≤ 工作線程數 < maximumPoolSize)新建一個工作線程立即執行任務,否則執行拒絕策略。
/**
* 添加工作線程并執行任務
*
* @param firstTask 如果指定了該參數, 表示將立即執行該firstTask任務; 否則從工作隊列中獲取任務并執行
* @param core 執行任務的工作線程歸屬于哪個線程池: true-核心線程池 false-非核心線程池
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 這個if主要是判斷哪些情況下, 線程池不再接受新任務執行, 而是直接返回.總結下, 有以下幾種情況:
* 1. 線程池狀態為 STOP 或 TIDYING 或 TERMINATED: 線程池狀態為上述任一一種時, 都不會再接受任務,所以直接返回
* 2. 線程池狀態≥ SHUTDOWN 且 firstTask != null: 因為當線程池狀態≥ SHUTDOWN時, 不再接受新任務的提交,所以直接返回
* 3. 線程池狀態≥ SHUTDOWN 且 隊列為空: 隊列中已經沒有任務了, 所以也就不需要執行任何任務了,可以直接返回
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取工作線程數
int wc = workerCountOf(c);
/**
* 這個if主要是判斷工作線程數是否超限, 以下任一情況屬于超限, 直接返回:
* 1. 工作線程數超過最大工作線程數(2^29-1)
* 2. 工作線程數超過核心線程池上限(入參core為true, 表示歸屬核心線程池)
* 3. 工作線程數超過總線程池上限(入參core為false, 表示歸屬非核心線程池)
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//增加工作線程數
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
//CAS失敗,自旋重新操作
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//將任務包裝成工作線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//重新檢查線程池狀態
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//工作線程加入集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動線程,這個很關鍵,因為t在new的時候是用Work作為任務的,
// Work實現了Runnale接口,所以t.start就是執行Work的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
特別注意:
啟動線程,這個很關鍵,因為thread在new的時候是用Work(this)作為任務的, Work實現了Runnale接口,所以t.start就是執行Work的run方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 任務, 如果是null則從隊列取任務
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允許執行線程被中斷
boolean completedAbruptly = true;
try {
// 當task==null時會通過getTask從隊列取任務
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 鉤子方法,由子類自定義實現
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 鉤子方法,由子類自定義實現
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; // 完成任務數+1
w.unlock();
}
}
//說明該工作線程自身既沒有攜帶任務, 也沒從任務隊列中獲取到任務
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
獲取任務方法getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 以下IF用于判斷哪些情況下不允許再從隊列獲取任務:
* 1. 線程池進入停止狀態(STOP/TIDYING/TERMINATED), 此時即使隊列中還有任務未執行, 也不再執行
* 2. 線程池非RUNNING狀態, 且隊列為空
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
* timed變量用于判斷是否需要進行超時控制:
* 對于核心線程池中的工作線程, 除非設置了allowCoreThreadTimeOut==true, 否則不會超時回收;
* 對于非核心線程池中的工作線程, 都需要超時控制
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 這里主要是當外部通過setMaximumPoolSize方法重新設置了最大線程數時
// 需要回收多出的工作線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//區分超時操作還是非超時獲取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
所謂拒絕策略,就是在構造ThreadPoolExecutor時,傳入的RejectedExecutionHandler對象,一共4種
1.AbortPolicy(默認),拋出一個RejectedExecutionException異常
2、DiscardPolicy,什么都不做,等任務自己被回收
3、DiscardOldestPolicy,丟棄任務隊列中的最近一個任務,并執行當前任務
4、CallerRunsPolicy,以自身線程來執行任務,減緩新任務提交的速度
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。