中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

多線程(二十二、異步執行-Futrue模式)

發布時間:2020-06-19 04:44:21 來源:網絡 閱讀:532 作者:shayang88 欄目:編程語言

Future簡介

如果一個任務需要返回執行結果,一般我們會實現一個Callable任務,并創建一個線程來執行任務。對于執行時間比較長的任務,顯然我們同步的等待結果再去執行后續的業務是不現實的,那么,Future模式是怎樣解決這個問題的呢?

Future模式,可以讓調用方立即返回,然后它自己會在后面慢慢處理,此時調用者拿到的僅僅是一個憑證,調用者可以先去處理其它任務,在真正需要用到調用結果的場合,再使用憑證去獲取調用結果。這個憑證就是這里的Future。

Future接口的定義:

public interface Future<V> {
      // 取消任務
    boolean cancel(boolean mayInterruptIfRunning);
      // 任務是否取消
    boolean isCancelled();
      // 標記任務是否執行完成
    boolean isDone();
      // 阻塞獲取任務結果
    V get() throws InterruptedException, ExecutionException;
      // 超時獲取任務結果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

Future模式中,最重要的就是FutureTask類

多線程(二十二、異步執行-Futrue模式)

FutureTask一共給任務定義了7種狀態

1、NEW:表示任務的初始化狀態;
2、COMPLETING:表示任務已執行完成(正常完成或異常完成),但任務結果或異常原因還未設置完成,屬于中間狀態;
3、NORMAL:表示任務已經執行完成(正常完成),且任務結果已設置完成,屬于最終狀態;
4、EXCEPTIONAL:表示任務已經執行完成(異常完成),且任務異常已設置完成,屬于最終狀態;
5、CANCELLED:表示任務還沒開始執行就被取消(非中斷方式),屬于最終狀態;
6、INTERRUPTING:表示任務還沒開始執行就被取消(中斷方式),正式被中斷前的過渡狀態,屬于中間狀態;
7、INTERRUPTED:表示任務還沒開始執行就被取消(中斷方式),且已被中斷,屬于最終狀態。

各個狀態之間的流轉:

多線程(二十二、異步執行-Futrue模式)

FutureTask構造

FutureTask在構造時可以接受Runnable或Callable任務,如果是Runnable,則最終包裝成Callable:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
                // 包裝Runnable成為Callable
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

FutureTask成員

private volatile int state;//任務狀態
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

private Callable<V> callable;       // 真正的任務
private volatile Thread runner;     // 保存正在執行任務的線程

/**
 * 記錄結果或異常
 */
private Object outcome;

/**
 * 無鎖棧(Treiber stack)
 * 保存等待線程
 */
private volatile WaitNode waiters;

當調用FutureTask的get方法時,如果任務沒有完成,則調用線程會被阻塞,其實就是將線程包裝成WaitNode結點保存到waiters指向的棧中。

static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

任務執行run

public void run() {
    // 僅當任務為NEW狀態時, 才能執行任務
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                            //執行任務
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                                //設置異常
                setException(ex);
            }
            if (ran)
                            //設置任務執行結果outcome
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

set方法:

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;//存儲結果值
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

任務取消

public boolean cancel(boolean mayInterruptIfRunning) {
    // 僅NEW狀態下可以取消任務
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;

    try {   
        if (mayInterruptIfRunning) {    // 中斷任務
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
            //釋放所有在棧上等待的線程
        finishCompletion();
    }
    return true;
}

任務取消后,最終調用finishCompletion方法,釋放所有在棧上等待的線程

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) { //自旋釋放所有等待線程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);//喚醒線程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

獲取結果

FutureTask可以通過get方法獲取任務結果,如果需要限時等待,可以調用get(long timeout, TimeUnit unit)

public V get() throws InterruptedException, ExecutionException {
    int s = state;
        //當前任務的狀態是NEW或COMPLETING,會調用awaitDone阻塞線程
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);   // 任務執行結果
}
/**
 * 返回執行結果.
 */
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V) x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable) x);
}

ScheduledFutureTask

1、ScheduledFutureTask在普通FutureTask的基礎上增加了周期執行/延遲執行的功能
2、ScheduledFutureTask是ScheduledThreadPoolExecutor這個線程池的默認調度任務類,通過繼承FutureTask和Delayed接口來實現周期/延遲功能的。
多線程(二十二、異步執行-Futrue模式)

ScheduledFutureTask的源碼非常簡單,基本都是委托FutureTask來實現的

任務運行
public void run() {
    // 是否是周期任務
    boolean periodic = isPeriodic();  
        //// 能否運行任務
    if (!canRunInCurrentRunState(periodic))      
        cancel(false);
    else if (!periodic)  // 非周期任務:調用FutureTask的run方法運行
        ScheduledFutureTask.super.run();
                // 周期任務:調用FutureTask的runAndReset方法運行
    else if (ScheduledFutureTask.super.runAndReset()) { 
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

FutureTask的runAndReset方法與run方法的區別就是當任務正常執行完成后,不會設置任務的最終狀態(即保持NEW狀態),以便任務重復執行:

protected boolean runAndReset() {
    // 僅NEW狀態的任務可以執行
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
        return false;

    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); //不設置執行結果
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        runner = null;
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;//重新設置任務狀態為NEW,繼續重復執行
}
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

蒙阴县| 丁青县| 赤壁市| 隆德县| 湘潭县| 邹平县| 安徽省| 景德镇市| 新余市| 图们市| 靖江市| 嘉义市| 临沧市| 九台市| 澎湖县| 翁源县| 喀喇| 许昌县| 綦江县| 积石山| 咸阳市| 长治市| 舒城县| 樟树市| 西宁市| 扎鲁特旗| 利津县| 滕州市| 光泽县| 图们市| 青州市| 瑞昌市| 上栗县| 秦皇岛市| 额尔古纳市| 明光市| 酒泉市| 新乡县| 灌南县| 东乌| 衢州市|