您好,登錄后才能下訂單哦!
簡介
在實現定時調度功能的時候,我們往往會借助于第三方類庫來完成,比如: quartz 、 Spring Schedule 等等。JDK從1.3版本開始,就提供了基于 Timer 的定時調度功能。在 Timer 中,任務的執行是串行的。這種特性在保證了線程安全的情況下,往往帶來了一些嚴重的副作用,比如任務間相互影響、任務執行效率低下等問題。為了解決 Timer 的這些問題,JDK從1.5版本開始,提供了基于 ScheduledExecutorService 的定時調度功能。
本節我們主要分析 Timer 的功能。對于 ScheduledExecutorService 的功能,我們將新開一篇文章來講解。
如何使用
Timer 需要和 TimerTask 配合使用,才能完成調度功能。 Timer 表示調度器, TimerTask 表示調度器執行的任務。任務的調度分為兩種:一次性調度和循環調度。下面,我們通過一些例子來了解他們是如何使用的。
1. 一次性調度
public static void main(String[] args) { Timer timer = new Timer(); TimerTask task = new TimerTask() { @Override public void run() { SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); System.out.println(format.format(scheduledExecutionTime()) + ", called"); } }; // 延遲一秒,打印一次 // 打印結果如下:10:58:24, called timer.schedule(task, 1000); }
2. 循環調度 - schedule()
public static void main(String[] args) { Timer timer = new Timer(); TimerTask task = new TimerTask() { @Override public void run() { SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); System.out.println(format.format(scheduledExecutionTime()) + ", called"); } }; // 固定時間的調度方式,延遲一秒,之后每隔一秒打印一次 // 打印結果如下: // 11:03:55, called // 11:03:56, called // 11:03:57, called // 11:03:58, called // 11:03:59, called // ... timer.schedule(task, 1000, 1000); }
3. 循環調度 - scheduleAtFixedRate()
public static void main(String[] args) { Timer timer = new Timer(); TimerTask task = new TimerTask() { @Override public void run() { SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); System.out.println(format.format(scheduledExecutionTime()) + ", called"); } }; // 固定速率的調度方式,延遲一秒,之后每隔一秒打印一次 // 打印結果如下: // 11:08:43, called // 11:08:44, called // 11:08:45, called // 11:08:46, called // 11:08:47, called // ... timer.scheduleAtFixedRate(task, 1000, 1000); }
4. schedule()和scheduleAtFixedRate()的區別
從2和3的結果來看,他們達到的效果似乎是一樣的。既然效果一樣,JDK為啥要實現為兩個方法呢?他們應該有不一樣的地方!
在正常的情況下,他們的效果是一模一樣的。而在異常的情況下 - 任務執行的時間比間隔的時間更長,他們是效果是不一樣的。
我們先來看看 schedule() 的異常效果:
public static void main(String[] args) { Timer timer = new Timer(); TimerTask task = new TimerTask() { @Override public void run() { SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(format.format(scheduledExecutionTime()) + ", called"); } }; timer.schedule(task, 1000, 2000); // 執行結果如下: // 11:18:56, called // 11:18:59, called // 11:19:02, called // 11:19:05, called // 11:19:08, called // 11:19:11, called }
接下來我們看看 scheduleAtFixedRate() 的異常效果:
public static void main(String[] args) { Timer timer = new Timer(); TimerTask task = new TimerTask() { @Override public void run() { SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(format.format(scheduledExecutionTime()) + ", called"); } }; timer.scheduleAtFixedRate(task, 1000, 2000); // 執行結果如下: // 11:20:45, called // 11:20:47, called // 11:20:49, called // 11:20:51, called // 11:20:53, called // 11:20:55, called }
樓主一直相信,實踐是檢驗真理比較好的方式,上面的例子從側面驗證了我們最初的猜想。
但是,這兒引出了另外一個問題。既然 Timer 內部是單線程實現的,在執行間隔為2秒、任務實際執行為3秒的情況下, scheduleAtFixedRate 是如何做到2秒輸出一次的呢?
【特別注意】
這兒其實是一個障眼法。需要重點關注的是,打印方法輸出的值是通過調用 scheduledExecutionTime() 來生成的,而這個方法并不一定是任務真實執行的時間,而是當前任務應該執行的時間。
源碼閱讀
樓主對于知識的理解是,除了知其然,還需要知其所以然。而閱讀源碼是打開 知其所以然 大門的一把強有力的鑰匙。在JDK中, Timer 主要由 TimerTask 、 TaskQueue 和 TimerThread 組成。
1. TimerTask
TimerTask 表示任務調度器執行的任務,繼承自 Runnable ,其內部維護著任務的狀態,一共有4種狀態
TimerTask 還有下面的成員變量
分析完大致的功能之后,我們來看看其代碼。
/** * The state of this task, chosen from the constants below. */ int state = VIRGIN; /** * This task has not yet been scheduled. */ static final int VIRGIN = 0; /** * This task is scheduled for execution. If it is a non-repeating task, * it has not yet been executed. */ static final int SCHEDULED = 1; /** * This non-repeating task has already executed (or is currently * executing) and has not been cancelled. */ static final int EXECUTED = 2; /** * This task has been cancelled (with a call to TimerTask.cancel). */ static final int CANCELLED = 3;
TimerTask 有兩個操作方法
cancel() 比較簡單,主要對當前任務加鎖,然后變更狀態為已取消。
public boolean cancel() { synchronized(lock) { boolean result = (state == SCHEDULED); state = CANCELLED; return result; } }
而在 scheduledExecutionTime() 中,任務執行時間是通過下一次執行時間減去間隔時間的方式計算出來的。
public long scheduledExecutionTime() { synchronized(lock) { return (period < 0 ? nextExecutionTime + period : nextExecutionTime - period); } }
2. TaskQueue
TaskQueue 是一個隊列,在 Timer 中用于存放任務。其內部是使用【最小堆算法】來實現的,堆頂的任務將最先被執行。由于使用了【最小堆】, TaskQueue 判斷執行時間是否已到的效率極高。我們來看看其內部是怎么實現的。
class TaskQueue { /** * Priority queue represented as a balanced binary heap: the two children * of queue[n] are queue[2*n] and queue[2*n+1]. The priority queue is * ordered on the nextExecutionTime field: The TimerTask with the lowest * nextExecutionTime is in queue[1] (assuming the queue is nonempty). For * each node n in the heap, and each descendant of n, d, * n.nextExecutionTime <= d.nextExecutionTime. * * 使用數組來存放任務 */ private TimerTask[] queue = new TimerTask[128]; /** * The number of tasks in the priority queue. (The tasks are stored in * queue[1] up to queue[size]). * * 用于表示隊列中任務的個數,需要注意的是,任務數并不等于數組長度 */ private int size = 0; /** * Returns the number of tasks currently on the queue. */ int size() { return size; } /** * Adds a new task to the priority queue. * * 往隊列添加一個任務 */ void add(TimerTask task) { // Grow backing store if necessary // 在任務數超過數組長度,則通過數組拷貝的方式進行動態擴容 if (size + 1 == queue.length) queue = Arrays.copyOf(queue, 2*queue.length); // 將當前任務項放入隊列 queue[++size] = task; // 向上調整,重新形成一個最小堆 fixUp(size); } /** * Return the "head task" of the priority queue. (The head task is an * task with the lowest nextExecutionTime.) * * 隊列的第一個元素就是最先執行的任務 */ TimerTask getMin() { return queue[1]; } /** * Return the ith task in the priority queue, where i ranges from 1 (the * head task, which is returned by getMin) to the number of tasks on the * queue, inclusive. * * 獲取隊列指定下標的元素 */ TimerTask get(int i) { return queue[i]; } /** * Remove the head task from the priority queue. * * 移除堆頂元素,移除之后需要向下調整,使之重新形成最小堆 */ void removeMin() { queue[1] = queue[size]; queue[size--] = null; // Drop extra reference to prevent memory leak fixDown(1); } /** * Removes the ith element from queue without regard for maintaining * the heap invariant. Recall that queue is one-based, so * 1 <= i <= size. * * 快速移除指定位置元素,不會重新調整堆 */ void quickRemove(int i) { assert i <= size; queue[i] = queue[size]; queue[size--] = null; // Drop extra ref to prevent memory leak } /** * Sets the nextExecutionTime associated with the head task to the * specified value, and adjusts priority queue accordingly. * * 重新調度,向下調整使之重新形成最小堆 */ void rescheduleMin(long newTime) { queue[1].nextExecutionTime = newTime; fixDown(1); } /** * Returns true if the priority queue contains no elements. * * 隊列是否為空 */ boolean isEmpty() { return size==0; } /** * Removes all elements from the priority queue. * * 清除隊列中的所有元素 */ void clear() { // Null out task references to prevent memory leak for (int i=1; i<=size; i++) queue[i] = null; size = 0; } /** * Establishes the heap invariant (described above) assuming the heap * satisfies the invariant except possibly for the leaf-node indexed by k * (which may have a nextExecutionTime less than its parent's). * * This method functions by "promoting" queue[k] up the hierarchy * (by swapping it with its parent) repeatedly until queue[k]'s * nextExecutionTime is greater than or equal to that of its parent. * * 向上調整,使之重新形成最小堆 */ private void fixUp(int k) { while (k > 1) { int j = k >> 1; if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) break; TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } /** * Establishes the heap invariant (described above) in the subtree * rooted at k, which is assumed to satisfy the heap invariant except * possibly for node k itself (which may have a nextExecutionTime greater * than its children's). * * This method functions by "demoting" queue[k] down the hierarchy * (by swapping it with its smaller child) repeatedly until queue[k]'s * nextExecutionTime is less than or equal to those of its children. * * 向下調整,使之重新形成最小堆 */ private void fixDown(int k) { int j; while ((j = k << 1) <= size && j > 0) { if (j < size && queue[j].nextExecutionTime > queue[j+1].nextExecutionTime) j++; // j indexes smallest kid if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime) break; TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } } /** * Establishes the heap invariant (described above) in the entire tree, * assuming nothing about the order of the elements prior to the call. */ void heapify() { for (int i = size/2; i >= 1; i--) fixDown(i); } }
3. TimerThread
TimerThread 作為 Timer 的成員變量,扮演著調度器的校色。我們先來看看它的構造方法,作用主要就是持有任務隊列。
TimerThread(TaskQueue queue) { this.queue = queue; }
接下來看看 run() 方法,也就是線程執行的入口。
public void run() { try { mainLoop(); } finally { // Someone killed this Thread, behave as if Timer cancelled synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // Eliminate obsolete references } } }
主邏輯全在 mainLoop() 方法。在 mainLoop 方法執行完之后,會進行資源的清理操作。我們來看看 mainLoop() 方法。
private void mainLoop() { // while死循環 while (true) { try { TimerTask task; boolean taskFired; // 對queue進行加鎖,保證一個隊列里所有的任務都是串行執行的 synchronized(queue) { // Wait for queue to become non-empty // 操作1,隊列為空,需要等待新任務被調度,這時進行wait操作 while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); // 這兒再次判斷隊列是否為空,是因為【操作1】有任務進來了,同時任務又被取消了(進行了`cancel`操作), // 這時如果隊列再次為空,那么需要退出線程,避免循環被卡死 if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; // 取出隊列中的堆頂元素(下次執行時間最小的那個任務) task = queue.getMin(); // 這兒對堆元素進行加鎖,是為了保證任務的可見性和原子性 synchronized(task.lock) { // 取消的任務將不再被執行,需要從隊列中移除 if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } // 獲取系統當前時間和任務下次執行的時間 currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; // 任務下次執行的時間 <= 系統當前時間,則執行此任務(設置狀態標記`taskFired`為true) if (taskFired = (executionTime<=currentTime)) { // `peroid`為0,表示此任務只需執行一次 if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } // period不為0,表示此任務需要重復執行 // 在這兒就體現出了`schedule()`方法和`scheduleAtFixedRate()`的區別 else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } // 任務沒有被觸發,隊列掛起(帶超時時間) if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } // 任務被觸發,執行任務。執行完后進入下一輪循環 if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } }
4. Timer
Timer 通過構造方法做了下面的事情:
/** * The timer thread. */ private final TimerThread thread = new TimerThread(queue); public Timer(String name, boolean isDaemon) { thread.setName(name); thread.setDaemon(isDaemon); thread.start(); }
在 Timer 中,真正的暴露給用戶使用的調度方法只有兩個, schedule() 和 scheduleAtFixedRate() ,我們來看看。
public void schedule(TimerTask task, long delay) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); sched(task, System.currentTimeMillis()+delay, 0); } public void schedule(TimerTask task, Date time) { sched(task, time.getTime(), 0); } public void schedule(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, -period); } public void schedule(TimerTask task, Date firstTime, long period) { if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, firstTime.getTime(), -period); } public void scheduleAtFixedRate(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, period); } public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) { if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, firstTime.getTime(), period); }
從上面的代碼我們看出下面幾點。
接下來我們看看 sched() 方法。
private void sched(TimerTask task, long time, long period) { // 1. `time`不能為負數的校驗 if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. // 2. `period`不能超過`Long.MAX_VALUE >> 1` if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { // 3. Timer被取消時,不能被調度 if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); // 4. 對任務加鎖,然后設置任務的下次執行時間、執行周期和任務狀態,保證任務調度和任務取消是線程安全的 synchronized(task.lock) { if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } // 5. 將任務添加進隊列 queue.add(task); // 6. 隊列中如果堆頂元素是當前任務,則喚醒隊列,讓`TimerThread`可以進行任務調度 if (queue.getMin() == task) queue.notify(); } }
sched() 方法經過了下述步驟:
【說明】:我們需要特別關注一下第6點。為什么堆頂元素必須是當前任務時才喚醒隊列呢?原因在于堆頂元素所代表的意義,即:堆頂元素表示離當前時間最近的待執行任務!
【例子1】:假如當前時間為1秒,隊列里有一個任務A需要在3秒執行,我們新加入的任務B需要在5秒執行。這時,因為 TimerThread 有 wait(timeout) 操作,時間到了會自己喚醒。所以為了性能考慮,不需要在 sched() 操作的時候進行喚醒。
【例子2】:假如當前時間為1秒,隊列里有一個任務A需要在3秒執行,我們新加入的任務B需要在2秒執行。這時,如果不在 sched() 中進行喚醒操作,那么任務A將在3秒時執行。而任務B因為需要在2秒執行,已經過了它應該執行的時間,從而出現問題。
任務調度方法 sched() 分析完之后,我們繼續分析其他方法。先來看一下 cancel() ,該方法用于取消 Timer 的執行。
public void cancel() { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.clear(); queue.notify(); // In case queue was already empty. } }
從上面源碼分析來看,該方法做了下面幾件事情:
有的時候,在一個 Timer 中可能會存在多個 TimerTask 。如果我們只是取消其中幾個 TimerTask ,而不是全部,除了對 TimerTask 執行 cancel() 方法調用,還需要對 Timer 進行清理操作。這兒的清理方法就是 purge() ,我們來看看其實現邏輯。
public int purge() { int result = 0; synchronized(queue) { // 1. 遍歷所有任務,如果任務為取消狀態,則將其從隊列中移除,移除數做加一操作 for (int i = queue.size(); i > 0; i--) { if (queue.get(i).state == TimerTask.CANCELLED) { queue.quickRemove(i); result++; } } // 2. 將隊列重新形成最小堆 if (result != 0) queue.heapify(); } return result; }
5. 喚醒隊列的方法
通過前面源碼的分析,我們看到隊列的喚醒存在于下面幾處:
第一點和第二點其實已經分析過了,下面我們來看看第三點。
private final Object threadReaper = new Object() { protected void finalize() throws Throwable { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.notify(); // In case queue is empty. } } };
該方法用于在GC階段對任務隊列進行喚醒,此處往往被讀者所遺忘。
那么,我們回過頭來想一下,為什么需要這段代碼呢?
我們在分析 TimerThread 的時候看到:如果 Timer 創建之后,沒有被調度的話,將一直wait,從而陷入 假死狀態 。為了避免這種情況,并發大師Doug Lea機智地想到了在 finalize() 中設置狀態標記 newTasksMayBeScheduled ,并對任務隊列進行喚醒操作(queue.notify()),將 TimerThread 從死循環中解救出來。
總結
首先,本文演示了 Timer 是如何使用的,然后分析了調度方法 schedule() 和 scheduleAtFixedRate() 的區別和聯系。
然后,為了加深我們對 Timer 的理解,我們通過閱讀源碼的方式進行了深入的分析。可以看得出,其內部實現得非常巧妙,考慮得也很完善。
但是因為 Timer 串行執行的特性,限制了其在高并發下的運用。后面我們將深入分析高并發、分布式環境下的任務調度是如何實現的,讓我們拭目以待吧~
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。