中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java中的ForkJoin是什么及怎么調用

發布時間:2022-04-28 10:22:25 來源:億速云 閱讀:110 作者:iii 欄目:開發技術

本篇內容主要講解“Java中的ForkJoin是什么及怎么調用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java中的ForkJoin是什么及怎么調用”吧!

    什么是ForkJoin?

    ForkJoin 從字面上看Fork是分岔的意思,Join是結合的意思,我們可以理解為將大任務拆分成小任務進行計算求解,最后將小任務的結果進行結合求出大任務的解,這些裂變出來的小任務,我們就可以交給不同的線程去進行計算,這也就是分布式計算的一種思想。這與大數據中的分布式離線計算MapReduce類似,對ForkJoin最經典的一個應用就是Java8中的Stream,我們知道Stream分為串行流和并行流,其中并行流parallelStream就是依賴于ForkJoin來實現并行處理的。

    下面我們一起來看一下最為核心的ForkJoinTaskForkJoinPool

    ForkJoinTask 任務

    ForkJoinTask本身的依賴關系并不復雜,它與異步任務計算FutureTask一樣均實現了Future接口

    Java中的ForkJoin是什么及怎么調用

    下面我們就ForkJoinTask的核心源碼來研究一下,該任務是如何通過分治法進行計算。

    ForkJoinTask最核心的莫過于fork()和join()方法了。

    fork()

    • 判斷當前線程是不是ForkJoinWorkerThread線程

      • 是 直接將當前線程push到工作隊列中

      • 否 調用ForkJoinPool 的externalPush方法

    ForkJoinPool構建了一個靜態的common對象,這里調用的就是commonexternalPush()

    join()

    • 調用doJoin()方法,等待線程執行完成

        public final ForkJoinTask<V> fork() {
            Thread t;
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
                ((ForkJoinWorkerThread)t).workQueue.push(this);
            else
                ForkJoinPool.common.externalPush(this);
            return this;
        }
    
        public final V join() {
            int s;
            if ((s = doJoin() & DONE_MASK) != NORMAL)
                reportException(s);
            return getRawResult();
        }
    
        private int doJoin() {
            int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
            return (s = status) < 0 ? s :
                ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                tryUnpush(this) && (s = doExec()) < 0 ? s :
                wt.pool.awaitJoin(w, this, 0L) :
                externalAwaitDone();
        }
    
    	// 獲取結果的方法由子類實現
    	public abstract V getRawResult();	

    RecursiveTask 是ForkJoinTask的一個子類主要對獲取結果的方法進行了實現,通過泛型約束結果。我們如果需要自己創建任務,仍需要實現RecursiveTask,并去編寫最為核心的計算方法compute()。

    public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
        private static final long serialVersionUID = 5232453952276485270L;
    
        V result;
    
        protected abstract V compute();
    
        public final V getRawResult() {
            return result;
        }
    
        protected final void setRawResult(V value) {
            result = value;
        }
        protected final boolean exec() {
            result = compute();
            return true;
        }
    
    }

    ForkJoinPool 線程池

    ForkJoinTask 中許多功能都依賴于ForkJoinPool線程池,所以說ForkJoinTask運行離不開ForkJoinPool,ForkJoinPool與ThreadPoolExecutor有許多相似之處,他是專門用來執行ForkJoinTask任務的線程池,我之前也有文章對線程池技術進行了介紹,感興趣的可以進行閱讀&mdash;&mdash;從java源碼分析線程池(池化技術)的實現原理

    ForkJoinPool與ThreadPoolExecutor的繼承關系幾乎是相同的,他們相當于兄弟關系。

    Java中的ForkJoin是什么及怎么調用

    工作竊取算法

    ForkJoinPool中采取工作竊取算法,如果每次fork子任務如果都去創建新線程去處理的話,對系統資源的開銷是巨大的,所以必須采取線程池。一般的線程池只有一個任務隊列,但是對于ForkJoinPool來說,由于同一個任務Fork出的各個子任務是平行關系,為了提高效率,減少線程的競爭,需要將這些平行的任務放到不同的隊列中,由于線程處理不同任務的速度不同,這樣就可能存在某個線程先執行完了自己隊列中的任務,這時為了提升效率,就可以讓該線程去“竊取”其它任務隊列中的任務,這就是所謂的“工作竊取算法”。

    對于一般的隊列來說,入隊元素都是在隊尾,出隊元素在隊首,要滿足“工作竊取”的需求,任務隊列應該支持從“隊尾”出隊元素,這樣可以減少與其它工作線程的沖突(因為其它工作線程會從隊首獲取自己任務隊列中的任務),這時就需要使用雙端阻塞隊列來解決。

    構造方法

    首先我們來看ForkJoinPool線程池的構造方法,他為我們提供了三種形式的構造,其中最為復雜的是四個入參的構造,下面我們看一下它四個入參都代表什么?

    • int parallelism 可并行級別(不代表最多存在的線程數量)

    • ForkJoinWorkerThreadFactory factory 線程創建工廠

    • UncaughtExceptionHandler handler 異常捕獲處理器

    • boolean asyncMode 先進先出的工作模式 或者 后進先出的工作模式

        public ForkJoinPool() {
            this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
                 defaultForkJoinWorkerThreadFactory, null, false);
        }
    
    	public ForkJoinPool(int parallelism) {
            this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
        }
    
    	public ForkJoinPool(int parallelism,
                            ForkJoinWorkerThreadFactory factory,
                            UncaughtExceptionHandler handler,
                            boolean asyncMode) {
            this(checkParallelism(parallelism),
                 checkFactory(factory),
                 handler,
                 asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
                 "ForkJoinPool-" + nextPoolId() + "-worker-");
            checkPermission();
        }

    提交方法

    下面我們看一下提交任務的方法:

    externalPush這個方法我們很眼熟,它正是在fork的時候如果當前線程不是ForkJoinWorkerThread,新提交任務也是會通過這個方法去執行任務。由此可見,fork就是新建一個子任務進行提交。

    externalSubmit是最為核心的一個方法,它可以首次向池提交第一個任務,并執行二次初始化。它還可以檢測外部線程的首次提交,并創建一個新的共享隊列。

    signalWork(ws, q)是發送工作信號,讓工作隊列進行運轉。

        public ForkJoinTask<?> submit(Runnable task) {
            if (task == null)
                throw new NullPointerException();
            ForkJoinTask<?> job;
            if (task instanceof ForkJoinTask<?>) // avoid re-wrap
                job = (ForkJoinTask<?>) task;
            else
                job = new ForkJoinTask.AdaptedRunnableAction(task);
            externalPush(job);
            return job;
        }
    
        final void externalPush(ForkJoinTask<?> task) {
            WorkQueue[] ws; WorkQueue q; int m;
            int r = ThreadLocalRandom.getProbe();
            int rs = runState;
            if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                ForkJoinTask<?>[] a; int am, n, s;
                if ((a = q.array) != null &&
                    (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                    int j = ((am & s) << ASHIFT) + ABASE;
                    U.putOrderedObject(a, j, task);
                    U.putOrderedInt(q, QTOP, s + 1);
                    U.putOrderedInt(q, QLOCK, 0);
                    if (n <= 1)
                        signalWork(ws, q);
                    return;
                }
                U.compareAndSwapInt(q, QLOCK, 1, 0);
            }
            externalSubmit(task);
        }
    
        private void externalSubmit(ForkJoinTask<?> task) {
            int r;                                    // initialize caller's probe
            if ((r = ThreadLocalRandom.getProbe()) == 0) {
                ThreadLocalRandom.localInit();
                r = ThreadLocalRandom.getProbe();
            }
            for (;;) {
                WorkQueue[] ws; WorkQueue q; int rs, m, k;
                boolean move = false;
                if ((rs = runState) < 0) {
                    tryTerminate(false, false);     // help terminate
                    throw new RejectedExecutionException();
                }
                else if ((rs & STARTED) == 0 ||     // initialize
                         ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                    int ns = 0;
                    rs = lockRunState();
                    try {
                        if ((rs & STARTED) == 0) {
                            U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                   new AtomicLong());
                            // create workQueues array with size a power of two
                            int p = config & SMASK; // ensure at least 2 slots
                            int n = (p > 1) ? p - 1 : 1;
                            n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                            n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                            workQueues = new WorkQueue[n];
                            ns = STARTED;
                        }
                    } finally {
                        unlockRunState(rs, (rs & ~RSLOCK) | ns);
                    }
                }
                else if ((q = ws[k = r & m & SQMASK]) != null) {
                    if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                        ForkJoinTask<?>[] a = q.array;
                        int s = q.top;
                        boolean submitted = false; // initial submission or resizing
                        try {                      // locked version of push
                            if ((a != null && a.length > s + 1 - q.base) ||
                                (a = q.growArray()) != null) {
                                int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                                U.putOrderedObject(a, j, task);
                                U.putOrderedInt(q, QTOP, s + 1);
                                submitted = true;
                            }
                        } finally {
                            U.compareAndSwapInt(q, QLOCK, 1, 0);
                        }
                        if (submitted) {
                            signalWork(ws, q);
                            return;
                        }
                    }
                    move = true;                   // move on failure
                }
                else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                    q = new WorkQueue(this, null);
                    q.hint = r;
                    q.config = k | SHARED_QUEUE;
                    q.scanState = INACTIVE;
                    rs = lockRunState();           // publish index
                    if (rs > 0 &&  (ws = workQueues) != null &&
                        k < ws.length && ws[k] == null)
                        ws[k] = q;                 // else terminated
                    unlockRunState(rs, rs & ~RSLOCK);
                }
                else
                    move = true;                   // move if busy
                if (move)
                    r = ThreadLocalRandom.advanceProbe(r);
            }
        }

    創建工人(線程)

    提交任務后,通過signalWork(ws, q)方法,發送工作信號,當符合沒有執行完畢,且沒有出現異常的條件下,循環執行任務,根據控制變量嘗試添加工人(線程),通過線程工廠,生成線程,并且啟動線程,也控制著工人(線程)的下崗。

        final void signalWork(WorkQueue[] ws, WorkQueue q) {
            long c; int sp, i; WorkQueue v; Thread p;
            while ((c = ctl) < 0L) {                       // too few active
                if ((sp = (int)c) == 0) {                  // no idle workers
                    if ((c & ADD_WORKER) != 0L)            // too few workers
                        tryAddWorker(c);
                    break;
                }
                if (ws == null)                            // unstarted/terminated
                    break;
                if (ws.length <= (i = sp & SMASK))         // terminated
                    break;
                if ((v = ws[i]) == null)                   // terminating
                    break;
                int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
                int d = sp - v.scanState;                  // screen CAS
                long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
                if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                    v.scanState = vs;                      // activate v
                    if ((p = v.parker) != null)
                        U.unpark(p);
                    break;
                }
                if (q != null && q.base == q.top)          // no more work
                    break;
            }
        }
    
        private void tryAddWorker(long c) {
            boolean add = false;
            do {
                long nc = ((AC_MASK & (c + AC_UNIT)) |
                           (TC_MASK & (c + TC_UNIT)));
                if (ctl == c) {
                    int rs, stop;                 // check if terminating
                    if ((stop = (rs = lockRunState()) & STOP) == 0)
                        add = U.compareAndSwapLong(this, CTL, c, nc);
                    unlockRunState(rs, rs & ~RSLOCK);
                    if (stop != 0)
                        break;
                    if (add) {
                        createWorker();
                        break;
                    }
                }
            } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
        }
    
        private boolean createWorker() {
            ForkJoinWorkerThreadFactory fac = factory;
            Throwable ex = null;
            ForkJoinWorkerThread wt = null;
            try {
                if (fac != null && (wt = fac.newThread(this)) != null) {
                    wt.start();
                    return true;
                }
            } catch (Throwable rex) {
                ex = rex;
            }
            deregisterWorker(wt, ex);
            return false;
        }
    
       final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
            WorkQueue w = null;
            if (wt != null && (w = wt.workQueue) != null) {
                WorkQueue[] ws;                           // remove index from array
                int idx = w.config & SMASK;
                int rs = lockRunState();
                if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
                    ws[idx] = null;
                unlockRunState(rs, rs & ~RSLOCK);
            }
            long c;                                       // decrement counts
            do {} while (!U.compareAndSwapLong
                         (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                               (TC_MASK & (c - TC_UNIT)) |
                                               (SP_MASK & c))));
            if (w != null) {
                w.qlock = -1;                             // ensure set
                w.transferStealCount(this);
                w.cancelAll();                            // cancel remaining tasks
            }
            for (;;) {                                    // possibly replace
                WorkQueue[] ws; int m, sp;
                if (tryTerminate(false, false) || w == null || w.array == null ||
                    (runState & STOP) != 0 || (ws = workQueues) == null ||
                    (m = ws.length - 1) < 0)              // already terminating
                    break;
                if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
                    if (tryRelease(c, ws[sp & m], AC_UNIT))
                        break;
                }
                else if (ex != null && (c & ADD_WORKER) != 0L) {
                    tryAddWorker(c);                      // create replacement
                    break;
                }
                else                                      // don't need replacement
                    break;
            }
            if (ex == null)                               // help clean on way out
                ForkJoinTask.helpExpungeStaleExceptions();
            else                                          // rethrow
                ForkJoinTask.rethrow(ex);
        }
    
        public static interface ForkJoinWorkerThreadFactory {
            public ForkJoinWorkerThread newThread(ForkJoinPool pool);
        }
        static final class DefaultForkJoinWorkerThreadFactory
            implements ForkJoinWorkerThreadFactory {
            public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
                return new ForkJoinWorkerThread(pool);
            }
        }
        protected ForkJoinWorkerThread(ForkJoinPool pool) {
            // Use a placeholder until a useful name can be set in registerWorker
            super("aForkJoinWorkerThread");
            this.pool = pool;
            this.workQueue = pool.registerWorker(this);
        }
    
        final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
            UncaughtExceptionHandler handler;
            wt.setDaemon(true);                           // configure thread
            if ((handler = ueh) != null)
                wt.setUncaughtExceptionHandler(handler);
            WorkQueue w = new WorkQueue(this, wt);
            int i = 0;                                    // assign a pool index
            int mode = config & MODE_MASK;
            int rs = lockRunState();
            try {
                WorkQueue[] ws; int n;                    // skip if no array
                if ((ws = workQueues) != null && (n = ws.length) > 0) {
                    int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                    int m = n - 1;
                    i = ((s << 1) | 1) & m;               // odd-numbered indices
                    if (ws[i] != null) {                  // collision
                        int probes = 0;                   // step by approx half n
                        int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                        while (ws[i = (i + step) & m] != null) {
                            if (++probes >= n) {
                                workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                                m = n - 1;
                                probes = 0;
                            }
                        }
                    }
                    w.hint = s;                           // use as random seed
                    w.config = i | mode;
                    w.scanState = i;                      // publication fence
                    ws[i] = w;
                }
            } finally {
                unlockRunState(rs, rs & ~RSLOCK);
            }
            wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
            return w;
        }

    例:ForkJoinTask實現歸并排序

    這里我們就用經典的歸并排序為例,構建一個我們自己的ForkJoinTask,按照歸并排序的思路,重寫其核心的compute()方法,通過ForkJoinPool.submit(task)提交任務,通過get()同步獲取任務執行結果。

    package com.zhj.interview;
    
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;
    
    public class Test16 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            int[] bigArr = new int[10000000];
            for (int i = 0; i < 10000000; i++) {
                bigArr[i] = (int) (Math.random() * 10000000);
            }
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            MyForkJoinTask task = new MyForkJoinTask(bigArr);
            long start = System.currentTimeMillis();
            forkJoinPool.submit(task).get();
            long end = System.currentTimeMillis();
            System.out.println("耗時:" + (end-start));
    	}
    
    }
    class MyForkJoinTask extends RecursiveTask<int[]> {
    
        private int source[];
    
        public MyForkJoinTask(int source[]) {
            if (source == null) {
                throw new RuntimeException("參數有誤!!!");
            }
            this.source = source;
        }
    
        @Override
        protected int[] compute() {
            int l = source.length;
            if (l < 2) {
                return Arrays.copyOf(source, l);
            }
            if (l == 2) {
                if (source[0] > source[1]) {
                    int[] tar = new int[2];
                    tar[0] = source[1];
                    tar[1] = source[0];
                    return tar;
                } else {
                    return Arrays.copyOf(source, l);
                }
            }
            if (l > 2) {
                int mid = l / 2;
                MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid));
                task1.fork();
                MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l));
                task2.fork();
                int[] res1 = task1.join();
                int[] res2 = task2.join();
                int tar[] = merge(res1, res2);
                return tar;
            }
            return null;
        }
    	// 合并數組
        private int[] merge(int[] res1, int[] res2) {
            int l1 = res1.length;
            int l2 = res2.length;
            int l = l1 + l2;
            int tar[] = new int[l];
            for (int i = 0, i1 = 0, i2 = 0; i < l; i++) {
                int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1];
                int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2];
                // 如果條件成立,說明應該取數組array1中的值
                if(v1 < v2) {
                    tar[i] = v1;
                    i1++;
                } else {
                    tar[i] = v2;
                    i2++;
                }
            }
            return tar;
        }
    }

    ForkJoin計算流程

    通過ForkJoinPool提交任務,獲取結果流程如下,拆分子任務不一定是二分的形式,可參照MapReduce的模式,也可以按照具體需求進行靈活的設計。

    Java中的ForkJoin是什么及怎么調用

    到此,相信大家對“Java中的ForkJoin是什么及怎么調用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    屯留县| 西平县| 哈尔滨市| 咸丰县| 巴东县| 安阳县| 余江县| 滨海县| 天峻县| 团风县| 新疆| 商河县| 定兴县| 萨嘎县| 原平市| 宝鸡市| 繁昌县| 岳阳市| 乌恰县| 疏勒县| 武清区| 绥化市| 永嘉县| 阳山县| 扎囊县| 巴青县| 资中县| 庆城县| 双桥区| 咸丰县| 临城县| 嘉峪关市| 加查县| 沂南县| 托克托县| 彭阳县| 沈丘县| 临洮县| 平邑县| 兴文县| 大厂|