中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java8?CompletableFuture異步多線程怎么實現

發布時間:2023-04-08 16:48:58 來源:億速云 閱讀:189 作者:iii 欄目:開發技術

這篇文章主要介紹了Java8 CompletableFuture異步多線程怎么實現的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Java8 CompletableFuture異步多線程怎么實現文章都會有所收獲,下面我們一起來看看吧。

1、一個示例回顧Future

一些業務場景我們需要使用多線程異步執行任務,加快任務執行速度。

JDK5新增了Future接口,用于描述一個異步計算的結果。

雖然 Future 以及相關使用方法提供了異步執行任務的能力,但是對于結果的獲取卻是很不方便,我們必須使用Future.get()的方式阻塞調用線程,或者使用輪詢方式判斷 Future.isDone 任務是否結束,再獲取結果。

這兩種處理方式都不是很優雅,相關代碼如下:

    @Test
    public void testFuture() throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future<String> future = executorService.submit(() -> {
            Thread.sleep(2000);
            return "hello";
        });
        System.out.println(future.get());
        System.out.println("end");
    }

與此同時,Future無法解決多個異步任務需要相互依賴的場景,簡單點說就是,主線程需要等待子線程任務執行完畢之后在進行執行,這個時候你可能想到了「CountDownLatch」,沒錯確實可以解決,代碼如下。

這里定義兩個Future,第一個通過用戶id獲取用戶信息,第二個通過商品id獲取商品信息。

    @Test
    public void testCountDownLatch() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CountDownLatch downLatch = new CountDownLatch(2);
        long startTime = System.currentTimeMillis();
        Future<String> userFuture = executorService.submit(() -> {
            //模擬查詢商品耗時500毫秒
            Thread.sleep(500);
            downLatch.countDown();
            return "用戶A";
        });
 
        Future<String> goodsFuture = executorService.submit(() -> {
            //模擬查詢商品耗時500毫秒
            Thread.sleep(400);
            downLatch.countDown();
            return "商品A";
        });
 
        downLatch.await();
        //模擬主程序耗時時間
        Thread.sleep(600);
        System.out.println("獲取用戶信息:" + userFuture.get());
        System.out.println("獲取商品信息:" + goodsFuture.get());
        System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms");
 
    }

「運行結果」

獲取用戶信息:用戶A
獲取商品信息:商品A
總共用時1110ms

從運行結果可以看出結果都已經獲取,而且如果我們不用異步操作,執行時間應該是:500+400+600 = 1500,用異步操作后實際只用1110。

但是Java8以后我不在認為這是一種優雅的解決方式,接下來來了解下CompletableFuture的使用。

2、通過CompletableFuture實現上面示例

    @Test
    public void testCompletableInfo() throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
 
        //調用用戶服務獲取用戶基本信息
        CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
                //模擬查詢商品耗時500毫秒
        {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "用戶A";
        });
 
        //調用商品服務獲取商品基本信息
        CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
                //模擬查詢商品耗時500毫秒
        {
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品A";
        });
 
        System.out.println("獲取用戶信息:" + userFuture.get());
        System.out.println("獲取商品信息:" + goodsFuture.get());
 
        //模擬主程序耗時時間
        Thread.sleep(600);
        System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms");
    }

運行結果

獲取用戶信息:用戶A
獲取商品信息:商品A
總共用時1112ms

通過CompletableFuture可以很輕松的實現CountDownLatch的功能,你以為這就結束了,遠遠不止,CompletableFuture比這要強多了。

比如可以實現:任務1執行完了再執行任務2,甚至任務1執行的結果,作為任務2的入參數等等強大功能,下面就來學學CompletableFuture的API。

3、CompletableFuture創建方式

3.1、常用的4種創建方式

CompletableFuture源碼中有四個靜態方法用來執行異步任務

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}

一般我們用上面的靜態方法來創建CompletableFuture,這里也解釋下他們的區別:

  • 「supplyAsync」執行任務,支持返回值。

  • 「runAsync」執行任務,沒有返回值。

3.1.1、「supplyAsync方法」

//使用默認內置線程池ForkJoinPool.commonPool(),根據supplier構建執行任務
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//自定義線程,根據supplier構建執行任務
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

3.1.2、「runAsync方法」

//使用默認內置線程池ForkJoinPool.commonPool(),根據runnable構建執行任務
public static CompletableFuture<Void> runAsync(Runnable runnable) 
//自定義線程,根據runnable構建執行任務
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

3.2、結果獲取的4種方式

對于結果的獲取CompltableFuture類提供了四種方式

//方式一
public T get()
//方式二
public T get(long timeout, TimeUnit unit)
//方式三
public T getNow(T valueIfAbsent)
//方式四
public T join()

說明:

  • 「get()和get(long timeout, TimeUnit unit)」 => 在Future中就已經提供了,后者提供超時處理,如果在指定時間內未獲取結果將拋出超時異常

  • 「getNow」 => 立即獲取結果不阻塞,結果計算已完成將返回結果或計算過程中的異常,如果未計算完成將返回設定的valueIfAbsent值

  • 「join」 => 方法里不會拋出異常

示例

    @Test
    public void testCompletableGet() throws InterruptedException, ExecutionException {
 
        CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "商品A";
        });
 
        // getNow方法測試 
        System.out.println(cp1.getNow("商品B"));
 
        //join方法測試 
        CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((() -> 1 / 0));
        System.out.println(cp2.join());
        System.out.println("-----------------------------------------------------");
        //get方法測試
        CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((() -> 1 / 0));
        System.out.println(cp3.get());
    }

「運行結果」:

  • 第一個執行結果為 「商品B」,因為要先睡上1秒結果不能立即獲取

  • join方法獲取結果方法里不會拋異常,但是執行結果會拋異常,拋出的異常為CompletionException

  • get方法獲取結果方法里將拋出異常,執行結果拋出的異常為ExecutionException

4、異步回調方法

Java8?CompletableFuture異步多線程怎么實現

4.1、thenRun/thenRunAsync

通俗點講就是,「做完第一個任務后,再做第二個任務,第二個任務也沒有返回值」

示例

    @Test
    public void testCompletableThenRunAsync() throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
 
        CompletableFuture<Void> cp1 = CompletableFuture.runAsync(() -> {
            try {
                //執行任務A
                Thread.sleep(600);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
 
        });
 
        CompletableFuture<Void> cp2 = cp1.thenRun(() -> {
            try {
                //執行任務B
                Thread.sleep(400);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
 
        // get方法測試
        System.out.println(cp2.get());
 
        //模擬主程序耗時時間
        Thread.sleep(600);
        System.out.println("總共用時" + (System.currentTimeMillis() - startTime) + "ms");
    }
 
    //運行結果
    /**
     *  null
     *  總共用時1610ms
     */

「thenRun 和thenRunAsync有什么區別呢?」

如果你執行第一個任務的時候,傳入了一個自定義線程池:

  • 調用thenRun方法執行第二個任務時,則第二個任務和第一個任務是共用同一個線程池。

  • 調用thenRunAsync執行第二個任務時,則第一個任務使用的是你自己傳入的線程池,第二個任務使用的是ForkJoin線程池。

說明: 后面介紹的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它們之間的區別也是這個。

4.2、thenAccept/thenAcceptAsync

第一個任務執行完成后,執行第二個回調方法任務,會將該任務的執行結果,作為入參,傳遞到回調方法中,但是回調方法是沒有返回值的。

示例

    @Test
    public void testCompletableThenAccept() throws ExecutionException, InterruptedException {
        long startTime = System.currentTimeMillis();
        CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
            return "dev";
 
        });
        CompletableFuture<Void> cp2 = cp1.thenAccept((a) -> {
            System.out.println("上一個任務的返回結果為: " + a);
        });
 
        cp2.get();
    }

4.3、 thenApply/thenApplyAsync

表示第一個任務執行完成后,執行第二個回調方法任務,會將該任務的執行結果,作為入參,傳遞到回調方法中,并且回調方法是有返回值的。

示例

    @Test
    public void testCompletableThenApply() throws ExecutionException, InterruptedException {
        CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
            return "dev";
 
        }).thenApply((a) -> {
            if (Objects.equals(a, "dev")) {
                return "dev";
            }
            return "prod";
        });
 
        System.out.println("當前環境為:" + cp1.get());
 
        //輸出: 當前環境為:dev
    }

5、異常回調

當CompletableFuture的任務不論是正常完成還是出現異常它都會調用「whenComplete」這回調函數。

  • 「正常完成」:whenComplete返回結果和上級任務一致,異常為null;

  • 「出現異常」:whenComplete返回結果為null,異常為上級任務的異常;

即調用get()時,正常完成時就獲取到結果,出現異常時就會拋出異常,需要你處理該異常。

下面來看看示例

5.1、只用whenComplete

    @Test
    public void testCompletableWhenComplete() throws ExecutionException, InterruptedException {
        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
 
            if (Math.random() < 0.5) {
                throw new RuntimeException("出錯了");
            }
            System.out.println("正常結束");
            return 0.11;
 
        }).whenComplete((aDouble, throwable) -> {
            if (aDouble == null) {
                System.out.println("whenComplete aDouble is null");
            } else {
                System.out.println("whenComplete aDouble is " + aDouble);
            }
            if (throwable == null) {
                System.out.println("whenComplete throwable is null");
            } else {
                System.out.println("whenComplete throwable is " + throwable.getMessage());
            }
        });
        System.out.println("最終返回的結果 = " + future.get());
    }

正常完成,沒有異常時:

正常結束
whenComplete aDouble is 0.11
whenComplete throwable is null
最終返回的結果 = 0.11

出現異常時:get()會拋出異常

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出錯了
 
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 出錯了
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

5.2、whenComplete + exceptionally示例

    @Test
    public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {
        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("出錯了");
            }
            System.out.println("正常結束");
            return 0.11;
 
        }).whenComplete((aDouble, throwable) -> {
            if (aDouble == null) {
                System.out.println("whenComplete aDouble is null");
            } else {
                System.out.println("whenComplete aDouble is " + aDouble);
            }
            if (throwable == null) {
                System.out.println("whenComplete throwable is null");
            } else {
                System.out.println("whenComplete throwable is " + throwable.getMessage());
            }
        }).exceptionally((throwable) -> {
            System.out.println("exceptionally中異常:" + throwable.getMessage());
            return 0.0;
        });
 
        System.out.println("最終返回的結果 = " + future.get());
    }

當出現異常時,exceptionally中會捕獲該異常,給出默認返回值0.0。

whenComplete aDouble is null
whenComplete throwable is java.lang.RuntimeException: 出錯了
exceptionally中異常:java.lang.RuntimeException: 出錯了
最終返回的結果 = 0.0

6、多任務組合回調

Java8?CompletableFuture異步多線程怎么實現

6.1、AND組合關系

thenCombine / thenAcceptBoth / runAfterBoth都表示:「當任務一和任務二都完成再執行任務三」

區別在于:

  • 「runAfterBoth」 不會把執行結果當做方法入參,且沒有返回值

  • 「thenAcceptBoth」: 會將兩個任務的執行結果作為方法入參,傳遞到指定方法中,且無返回值

  • 「thenCombine」:會將兩個任務的執行結果作為方法入參,傳遞到指定方法中,且有返回值

示例

    @Test
    public void testCompletableThenCombine() throws ExecutionException, InterruptedException {
        //創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //開啟異步任務1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("異步任務1,當前線程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("異步任務1結束");
            return result;
        }, executorService);
 
        //開啟異步任務2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("異步任務2,當前線程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("異步任務2結束");
            return result;
        }, executorService);
 
        //任務組合
        CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> {
            System.out.println("執行任務3,當前線程是:" + Thread.currentThread().getId());
            System.out.println("任務1返回值:" + f1);
            System.out.println("任務2返回值:" + f2);
            return f1 + f2;
        }, executorService);
 
        Integer res = task3.get();
        System.out.println("最終結果:" + res);
    }

「運行結果」

異步任務1,當前線程是:17
異步任務1結束
異步任務2,當前線程是:18
異步任務2結束
執行任務3,當前線程是:19
任務1返回值:2
任務2返回值:2
最終結果:4

6.2、OR組合關系

applyToEither / acceptEither / runAfterEither 都表示:「兩個任務,只要有一個任務完成,就執行任務三」

區別在于:

  • 「runAfterEither」:不會把執行結果當做方法入參,且沒有返回值

  • 「acceptEither」: 會將已經執行完成的任務,作為方法入參,傳遞到指定方法中,且無返回值

  • 「applyToEither」:會將已經執行完成的任務,作為方法入參,傳遞到指定方法中,且有返回值

示例

    @Test
    public void testCompletableEitherAsync() {
        //創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //開啟異步任務1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("異步任務1,當前線程是:" + Thread.currentThread().getId());
 
            int result = 1 + 1;
            System.out.println("異步任務1結束");
            return result;
        }, executorService);
 
        //開啟異步任務2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("異步任務2,當前線程是:" + Thread.currentThread().getId());
            int result = 1 + 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("異步任務2結束");
            return result;
        }, executorService);
 
        //任務組合
        task.acceptEitherAsync(task2, (res) -> {
            System.out.println("執行任務3,當前線程是:" + Thread.currentThread().getId());
            System.out.println("上一個任務的結果為:" + res);
        }, executorService);
    }

運行結果

//通過結果可以看出,異步任務2都沒有執行結束,任務3獲取的也是1的執行結果
異步任務1,當前線程是:17
異步任務1結束
異步任務2,當前線程是:18
執行任務3,當前線程是:19
上一個任務的結果為:2

注意

如果把上面的核心線程數改為1也就是

 ExecutorService executorService = Executors.newFixedThreadPool(1);

運行結果就是下面的了,會發現根本沒有執行任務3,顯然是任務3直接被丟棄了。

異步任務1,當前線程是:17
異步任務1結束
異步任務2,當前線程是:17

6.3、多任務組合

  • 「allOf」:等待所有任務完成

  • 「anyOf」:只要有一個任務完成

示例

allOf:等待所有任務完成

    @Test
    public void testCompletableAallOf() throws ExecutionException, InterruptedException {
        //創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //開啟異步任務1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("異步任務1,當前線程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("異步任務1結束");
            return result;
        }, executorService);
 
        //開啟異步任務2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("異步任務2,當前線程是:" + Thread.currentThread().getId());
            int result = 1 + 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("異步任務2結束");
            return result;
        }, executorService);
 
        //開啟異步任務3
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("異步任務3,當前線程是:" + Thread.currentThread().getId());
            int result = 1 + 3;
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("異步任務3結束");
            return result;
        }, executorService);
 
        //任務組合
        CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3);
 
        //等待所有任務完成
        allOf.get();
        //獲取任務的返回結果
        System.out.println("task結果為:" + task.get());
        System.out.println("task2結果為:" + task2.get());
        System.out.println("task3結果為:" + task3.get());
    }

anyOf: 只要有一個任務完成

    @Test
    public void testCompletableAnyOf() throws ExecutionException, InterruptedException {
        //創建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //開啟異步任務1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 1;
            return result;
        }, executorService);
 
        //開啟異步任務2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 2;
            return result;
        }, executorService);
 
        //開啟異步任務3
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            int result = 1 + 3;
            return result;
        }, executorService);
 
        //任務組合
        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(task, task2, task3);
        //只要有一個有任務完成
        Object o = anyOf.get();
        System.out.println("完成的任務的結果:" + o);
    }

7、CompletableFuture使用有哪些注意點

Java8?CompletableFuture異步多線程怎么實現

 CompletableFuture 使我們的異步編程更加便利的、代碼更加優雅的同時,我們也要關注下它,使用的一些注意點。

7.1、Future需要獲取返回值,才能獲取異常信息

    @Test
    public void testWhenCompleteExceptionally() {
        CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
            if (1 == 1) {
                throw new RuntimeException("出錯了");
            }
            return 0.11;
        });
 
        //如果不加 get()方法這一行,看不到異常信息
        //future.get();
    }

Future需要獲取返回值,才能獲取到異常信息。如果不加 get()/join()方法,看不到異常信息。

小伙伴們使用的時候,注意一下哈,考慮是否加try...catch...或者使用exceptionally方法。

7.2、CompletableFuture的get()方法是阻塞的

CompletableFuture的get()方法是阻塞的,如果使用它來獲取異步調用的返回值,需要添加超時時間。

//反例
 CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);

7.3、不建議使用默認線程池

CompletableFuture代碼中又使用了默認的「ForkJoin線程池」,處理的線程個數是電腦「CPU核數-1」。在大量請求過來的時候,處理邏輯復雜的話,響應會很慢。一般建議使用自定義線程池,優化線程池配置參數。

7.4、自定義線程池時,注意飽和策略

CompletableFuture的get()方法是阻塞的,我們一般建議使用future.get(5, TimeUnit.SECONDS)。并且一般建議使用自定義線程池。

但是如果線程池拒絕策略是DiscardPolicy或者DiscardOldestPolicy,當線程池飽和時,會直接丟棄任務,不會拋棄異常。因此建議,CompletableFuture線程池策略最好使用AbortPolicy,然后耗時的異步線程,做好線程池隔離哈。

關于“Java8 CompletableFuture異步多線程怎么實現”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Java8 CompletableFuture異步多線程怎么實現”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

日喀则市| 广宗县| 内黄县| 巴林左旗| 嘉兴市| 万荣县| 曲周县| 莫力| 内黄县| 加查县| 文化| 石首市| 石河子市| 黄梅县| 新津县| 贺兰县| 农安县| 泰顺县| 江西省| 建阳市| 广丰县| 鹤壁市| 芦溪县| 克东县| 嘉兴市| 鸡泽县| 大姚县| 五华县| 福鼎市| 麦盖提县| 云安县| 淮北市| 永靖县| 建阳市| 陇南市| 余姚市| 易门县| 扎赉特旗| 桃园市| 永嘉县| 福建省|