中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

futuretask源碼分析(推薦)

發布時間:2020-09-02 20:02:49 來源:腳本之家 閱讀:152 作者:孤客_ 欄目:編程語言

FutureTask只實現RunnableFuture接口:

該接口繼承了java.lang.Runnable和Future接口,也就是繼承了這兩個接口的特性。

1.可以不必直接繼承Thread來生成子類,只要實現run方法,且把實例傳入到Thread構造函數,Thread就可以執行該實例的run方法了( Thread(Runnable) )。

2.可以讓任務獨立執行,get獲取任務執行結果時,可以阻塞直至執行結果完成。也可以中斷執行,判斷執行狀態等。

FutureTask是一個支持取消行為的異步任務執行器。該類實現了Future接口的方法。

如: 1. 取消任務執行

2. 查詢任務是否執行完成

3. 獲取任務執行結果(”get“任務必須得執行完成才能獲取結果,否則會阻塞直至任務完成)。

注意:一旦任務執行完成,則不能執行取消任務或者重新啟動任務。(除非一開始就使用runAndReset模式運行任務)
FutureTask支持執行兩種任務, Callable 或者 Runnable的實現類。且可把FutureTask實例交由Executor執行。

源碼部分(很簡單):

public class FutureTask<V> implements RunnableFuture<V> {
  /*
   * Revision notes: This differs from previous versions of this
   * class that relied on AbstractQueuedSynchronizer, mainly to
   * avoid surprising users about retaining interrupt status during
   * cancellation races. Sync control in the current design relies
   * on a "state" field updated via CAS to track completion, along
   * with a simple Treiber stack to hold waiting threads.
   *
   * Style note: As usual, we bypass overhead of using
   * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
   */
  /**
   * The run state of this task, initially NEW. The run state
   * transitions to a terminal state only in methods set,
   * setException, and cancel. During completion, state may take on
   * transient values of COMPLETING (while outcome is being set) or
   * INTERRUPTING (only while interrupting the runner to satisfy a
   * cancel(true)). Transitions from these intermediate to final
   * states use cheaper ordered/lazy writes because values are unique
   * and cannot be further modified.
   *
   * Possible state transitions:
   * NEW -> COMPLETING -> NORMAL
   * NEW -> COMPLETING -> EXCEPTIONAL
   * NEW -> CANCELLED
   * NEW -> INTERRUPTING -> INTERRUPTED
   */
  private volatile int state;
  private static final int NEW     = 0;
  private static final int COMPLETING  = 1;
  private static final int NORMAL    = 2;
  private static final int EXCEPTIONAL = 3;
  private static final int CANCELLED  = 4;
  private static final int INTERRUPTING = 5;
  private static final int INTERRUPTED = 6;
  /** The underlying callable; nulled out after running */
  private Callable<V> callable;
  /** 用來存儲任務執行結果或者異常對象,根據任務state在get時候選擇返回執行結果還是拋出異常 */
  private Object outcome; // non-volatile, protected by state reads/writes
  /** 當前運行Run方法的線程 */
  private volatile Thread runner;
  /** Treiber stack of waiting threads */
  private volatile WaitNode waiters;
  /**
   * Returns result or throws exception for completed task.
   *
   * @param s completed state value
   */
  @SuppressWarnings("unchecked")
  private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
      return (V)x;
    if (s >= CANCELLED)
      throw new CancellationException();
    throw new ExecutionException((Throwable)x);
  }
  /**
   * Creates a {@code FutureTask} that will, upon running, execute the
   * given {@code Callable}.
   *
   * @param callable the callable task
   * @throws NullPointerException if the callable is null
   */
  public FutureTask(Callable<V> callable) {
    if (callable == null)
      throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;    // ensure visibility of callable
  }
  /**
   * Creates a {@code FutureTask} that will, upon running, execute the
   * given {@code Runnable}, and arrange that {@code get} will return the
   * given result on successful completion.
   *
   * @param runnable the runnable task
   * @param result the result to return on successful completion. If
   * you don't need a particular result, consider using
   * constructions of the form:
   * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
   * @throws NullPointerException if the runnable is null
   */
  public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;    // ensure visibility of callable
  }
  //判斷任務是否已取消(異常中斷、取消等)
  public boolean isCancelled() {
    return state >= CANCELLED;
  }
  /**
  判斷任務是否已結束(取消、異常、完成、NORMAL都等于結束)
  **
  public boolean isDone() {
    return state != NEW;
  }
  /**
  mayInterruptIfRunning用來決定任務的狀態。
          true : 任務狀態= INTERRUPTING = 5。如果任務已經運行,則強行中斷。如果任務未運行,那么則不會再運行
          false:CANCELLED  = 4。如果任務已經運行,則允許運行完成(但不能通過get獲取結果)。如果任務未運行,那么則不會再運行
  **/
  public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
      return false;
    if (mayInterruptIfRunning) {
      if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
        return false;
      Thread t = runner;
      if (t != null)
        t.interrupt();
      UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
      return false;
    finishCompletion();
    return true;
  }
  /**
   * @throws CancellationException {@inheritDoc}
   */
  public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //如果任務未徹底完成,那么則阻塞直至任務完成后喚醒該線程
    if (s <= COMPLETING)
      s = awaitDone(false, 0L);
    return report(s);
  }
  /**
   * @throws CancellationException {@inheritDoc}
   */
  public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
      throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
      (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
      throw new TimeoutException();
    return report(s);
  }
  /**
   * Protected method invoked when this task transitions to state
   * {@code isDone} (whether normally or via cancellation). The
   * default implementation does nothing. Subclasses may override
   * this method to invoke completion callbacks or perform
   * bookkeeping. Note that you can query status inside the
   * implementation of this method to determine whether this task
   * has been cancelled.
   */
  protected void done() { }
  /**
  該方法在FutureTask里只有run方法在任務完成后調用。
  主要保存任務執行結果到成員變量outcome 中,和切換任務執行狀態。
  由該方法可以得知:
  COMPLETING : 任務已執行完成(也可能是異常完成),但還未設置結果到成員變量outcome中,也意味著還不能get
  NORMAL  : 任務徹底執行完成
  **/
  protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = v;
      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
      finishCompletion();
    }
  }
  /**
   * Causes this future to report an {@link ExecutionException}
   * with the given throwable as its cause, unless this future has
   * already been set or has been cancelled.
   *
   * <p>This method is invoked internally by the {@link #run} method
   * upon failure of the computation.
   *
   * @param t the cause of failure
   */
  protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = t;
      UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
      finishCompletion();
    }
  }
  /**
  由于實現了Runnable接口的緣故,該方法可由執行線程所調用。
  **/
  public void run() {
    //只有當任務狀態=new時才被運行繼續執行
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                     null, Thread.currentThread()))
      return;
    try {
      Callable<V> c = callable;
      if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
          //調用Callable的Call方法
          result = c.call();
          ran = true;
        } catch (Throwable ex) {
          result = null;
          ran = false;
          setException(ex);
        }
        if (ran)
          set(result);
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
    }
  }
  /**
  如果該任務在執行過程中不被取消或者異常結束,那么該方法不記錄任務的執行結果,且不修改任務執行狀態。
  所以該方法可以重復執行N次。不過不能直接調用,因為是protected權限。
  **/
  protected boolean runAndReset() {
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this, runnerOffset,
                     null, Thread.currentThread()))
      return false;
    boolean ran = false;
    int s = state;
    try {
      Callable<V> c = callable;
      if (c != null && s == NEW) {
        try {
          c.call(); // don't set result
          ran = true;
        } catch (Throwable ex) {
          setException(ex);
        }
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      s = state;
      if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
  }
  /**
   * Ensures that any interrupt from a possible cancel(true) is only
   * delivered to a task while in run or runAndReset.
   */
  private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us. Let's spin-wait patiently.
    if (s == INTERRUPTING)
      while (state == INTERRUPTING)
        Thread.yield(); // wait out pending interrupt
    // assert state == INTERRUPTED;
    // We want to clear any interrupt we may have received from
    // cancel(true). However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
  }
  /**
   * Simple linked list nodes to record waiting threads in a Treiber
   * stack. See other classes such as Phaser and SynchronousQueue
   * for more detailed explanation.
   */
  static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
  }
  /**
  該方法在任務完成(包括異常完成、取消)后調用。刪除所有正在get獲取等待的節點且喚醒節點的線程。和調用done方法和置空callable.
  **/
  private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
      if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
        for (;;) {
          Thread t = q.thread;
          if (t != null) {
            q.thread = null;
            LockSupport.unpark(t);
          }
          WaitNode next = q.next;
          if (next == null)
            break;
          q.next = null; // unlink to help gc
          q = next;
        }
        break;
      }
    }
    done();
    callable = null;    // to reduce footprint
  }
  /**
  阻塞等待任務執行完成(中斷、正常完成、超時)
  **/
  private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
      /**
      這里的if else的順序也是有講究的。
      1.先判斷線程是否中斷,中斷則從隊列中移除(也可能該線程不存在于隊列中)
      2.判斷當前任務是否執行完成,執行完成則不再阻塞,直接返回。
      3.如果任務狀態=COMPLETING,證明該任務處于已執行完成,正在切換任務執行狀態,CPU讓出片刻即可
      4.q==null,則證明還未創建節點,則創建節點
      5.q節點入隊
      6和7.阻塞
      **/
      if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
      }
      int s = state;
      if (s > COMPLETING) {
        if (q != null)
          q.thread = null;
        return s;
      }
      else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
      else if (q == null)
        q = new WaitNode();
      else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                           q.next = waiters, q);
      else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
          removeWaiter(q);
          return state;
        }
        LockSupport.parkNanos(this, nanos);
      }
      else
        LockSupport.park(this);
    }
  }
  /**
   * Tries to unlink a timed-out or interrupted wait node to avoid
   * accumulating garbage. Internal nodes are simply unspliced
   * without CAS since it is harmless if they are traversed anyway
   * by releasers. To avoid effects of unsplicing from already
   * removed nodes, the list is retraversed in case of an apparent
   * race. This is slow when there are a lot of nodes, but we don't
   * expect lists to be long enough to outweigh higher-overhead
   * schemes.
   */
  private void removeWaiter(WaitNode node) {
    if (node != null) {
      node.thread = null;
      retry:
      for (;;) {     // restart on removeWaiter race
        for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
          s = q.next;
          if (q.thread != null)
            pred = q;
          else if (pred != null) {
            pred.next = s;
            if (pred.thread == null) // check for race
              continue retry;
          }
          else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                             q, s))
            continue retry;
        }
        break;
      }
    }
  }
  // Unsafe mechanics
  private static final sun.misc.Unsafe UNSAFE;
  private static final long stateOffset;
  private static final long runnerOffset;
  private static final long waitersOffset;
  static {
    try {
      UNSAFE = sun.misc.Unsafe.getUnsafe();
      Class<?> k = FutureTask.class;
      stateOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("state"));
      runnerOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("runner"));
      waitersOffset = UNSAFE.objectFieldOffset
        (k.getDeclaredField("waiters"));
    } catch (Exception e) {
      throw new Error(e);
    }
  }
}

總結

以上就是本文關于futuretask源碼分析(推薦)的全部內容,希望對大家有所幫助。感興趣的朋友可以參閱:Java利用future及時獲取多線程運行結果、淺談Java多線程處理中Future的妙用(附源碼)、futuretask用法及使用場景介紹等,有什么問題可以隨時留言,歡迎大家一起交流討論。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

十堰市| 玛沁县| 荔浦县| 永泰县| 饶平县| 宁陵县| 方正县| 永登县| 柞水县| 长沙县| 肇州县| 石泉县| 嵊泗县| 新津县| 宁波市| 礼泉县| 花莲市| 荆州市| 称多县| 永德县| 陇西县| 静乐县| 平潭县| 安溪县| 全椒县| 建瓯市| 锦州市| 漳浦县| 新干县| 潞城市| 昌平区| 沙田区| 改则县| 来凤县| 陇西县| 望城县| 怀来县| 洛隆县| 凉城县| 平武县| 凌源市|