您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關使用CompletableFuture怎么實現并發編程,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
實例化 首先,不管我們要做什么,我們第一步是需要構造出 CompletableFuture 實例。
最簡單的,我們可以通過構造函數來進行實例化:
CompletableFuture<String> cf = new CompletableFuture<String>(); 這個實例此時還沒有什么用,因為它沒有實際的任務,我們選擇結束這個任務:
// 可以選擇在當前線程結束,也可以在其他線程結束 cf.complete("coding..."); 因為 CompletableFuture 是一個 Future,我們用 String result = cf.get() 就能獲取到結果了。
CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一樣的,都是阻塞獲取值,它們的區別在于 join() 拋出的是 unchecked Exception。
上面的代碼確實沒什么用,下面介紹幾個 static 方法,它們使用任務來實例化一個 CompletableFuture 實例。
CompletableFuture.runAsync(Runnable runnable); CompletableFuture.runAsync(Runnable runnable, Executor executor);
CompletableFuture.supplyAsync(Supplier<U> supplier); CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) runAsync 方法接收的是 Runnable 的實例,意味著它沒有返回值 supplyAsync 方法對應的是有返回值的情況 這兩個方法的帶 executor 的變種,表示讓任務在指定的線程池中執行,不指定的話,通常任務是在 ForkJoinPool.commonPool() 線程池中執行的。 好的,現在我們已經有了第一個 CompletableFuture 實例了,我們來看接下來的內容。
任務之間的順序執行 我們先來看執行兩個任務的情況,首先執行任務 A,然后將任務 A 的結果傳遞給任務 B。
其實這里有很多種情況,任務 A 是否有返回值,任務 B 是否需要任務 A 的返回值,任務 B 是否有返回值,等等。有個明確的就是,肯定是任務 A 執行完后再執行任務 B。
我們用下面的 6 行代碼來說:
CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB"); 前面 3 行代碼演示的是,任務 A 無返回值,所以對應的,第 2 行和第 3 行代碼中,resultA 其實是 null。
第 4 行用的是 thenRun(Runnable runnable),任務 A 執行完執行 B,并且 B 不需要 A 的結果。
第 5 行用的是 thenAccept(Consumer action),任務 A 執行完執行 B,B 需要 A 的結果,但是任務 B 不返回值。
第 6 行用的是 thenApply(Function fn),任務 A 執行完執行 B,B 需要 A 的結果,同時任務 B 有返回值。
這一小節說完了,如果任務 B 后面還有任務 C,往下繼續調用 .thenXxx() 即可。
異常處理 說到這里,我們順便來說下 CompletableFuture 的異常處理。這里我們要介紹兩個方法:
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn); public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); 看下面的代碼:
CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD"); 上面的代碼中,任務 A、B、C、D 依次執行,如果任務 A 拋出異常(當然上面的代碼不會拋出異常),那么后面的任務都得不到執行。如果任務 C 拋出異常,那么任務 D 得不到執行。
那么我們怎么處理異常呢?看下面的代碼,我們在任務 A 中拋出異常,并對其進行處理:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); }) .exceptionally(ex -> "errorResultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD");
System.out.println(future.join()); 上面的代碼中,任務 A 拋出異常,然后通過 .exceptionally() 方法處理了異常,并返回新的結果,這個新的結果將傳遞給任務 B。所以最終的輸出結果是:
errorResultA resultB resultC resultD 再看下面的代碼,我們來看下另一種處理方式,使用 handle(BiFunction fn) 來處理異常:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") // 任務 C 拋出異常 .thenApply(resultB -> {throw new RuntimeException();}) // 處理任務 C 的返回值或異常 .handle(new BiFunction<Object, Throwable, Object>() { @Override public Object apply(Object re, Throwable throwable) { if (throwable != null) { return "errorResultC"; } return re; } }) .thenApply(resultC -> resultC + " resultD");
System.out.println(future.join()); 上面的代碼使用了 handle 方法來處理任務 C 的執行結果,上面的代碼中,re 和 throwable 必然有一個是 null,它們分別代表正常的執行結果和異常的情況。
當然,它們也可以都為 null,因為如果它作用的那個 CompletableFuture 實例沒有返回值的時候,re 就是 null。
取兩個任務的結果 上面一節,我們說的是,任務 A 執行完 -> 任務 B 執行完 -> 執行任務 C,它們之間有先后執行關系,因為后面的任務依賴于前面的任務的結果。
這節我們來看怎么讓任務 A 和任務 B 同時執行,然后取它們的結果進行后續操作。這里強調的是任務之間的并行工作,沒有先后執行順序。
如果使用 Future 的話,我們通常是這么寫的:
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> futureA = executorService.submit(() -> "resultA"); Future<String> futureB = executorService.submit(() -> "resultB");
String resultA = futureA.get(); String resultB = futureB.get(); 接下來,我們看看 CompletableFuture 中是怎么寫的,看下面的幾行代碼:
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {}); cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B"); cfA.runAfterBoth(cfB, () -> {}); 第 3 行代碼和第 4 行代碼演示了怎么使用兩個任務的結果 resultA 和 resultB,它們的區別在于,thenAcceptBoth 表示后續的處理不需要返回值,而 thenCombine 表示需要返回值。
如果你不需要 resultA 和 resultB,那么還可以使用第 5 行描述的 runAfterBoth 方法。
注意,上面的寫法和下面的寫法是沒有區別的:
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
cfA.thenAcceptBoth(CompletableFuture.supplyAsync(() -> "resultB"), (resultA, resultB) -> {}); 千萬不要以為這種寫法任務 A 執行完了以后再執行任務 B。
取多個任務的結果 接下來,我們將介紹兩個非常簡單的靜態方法:allOf() 和 anyOf() 方法。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...} public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {...} 這兩個方法都非常簡單,簡單介紹一下。
CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123); CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");
CompletableFuture<Void> future = CompletableFuture.allOf(cfA, cfB, cfC); // 所以這里的 join() 將阻塞,直到所有的任務執行結束 future.join(); 由于 allOf 聚合了多個 CompletableFuture 實例,所以它是沒有返回值的。這也是它的一個缺點。
anyOf 也非常容易理解,就是只要有任意一個 CompletableFuture 實例執行完成就可以了,看下面的例子:
CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123); CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");
CompletableFuture<Object> future = CompletableFuture.anyOf(cfA, cfB, cfC); Object result = future.join(); 最后一行的 join() 方法會返回最先完成的任務的結果,所以它的泛型用的是 Object,因為每個任務可能返回的類型不同。
either 方法 如果你的 anyOf(...) 只需要處理兩個 CompletableFuture 實例,那么也可以使用 xxxEither() 來處理,
cfA.acceptEither(cfB, result -> {}); cfA.acceptEitherAsync(cfB, result -> {}); cfA.acceptEitherAsync(cfB, result -> {}, executorService);
cfA.applyToEither(cfB, result -> {return result;}); cfA.applyToEitherAsync(cfB, result -> {return result;}); cfA.applyToEitherAsync(cfB, result -> {return result;}, executorService);
cfA.runAfterEither(cfA, () -> {}); cfA.runAfterEitherAsync(cfB, () -> {}); cfA.runAfterEitherAsync(cfB, () -> {}, executorService); 上面的各個帶 either 的方法,表達的都是一個意思,指的是兩個任務中的其中一個執行完成,就執行指定的操作。它們幾組的區別也很明顯,分別用于表達是否需要任務 A 和任務 B 的執行結果,是否需要返回值。
大家可能會對這里的幾個變種有盲區,這里順便說幾句。
1、cfA.acceptEither(cfB, result -> {}); 和 cfB.acceptEither(cfA, result -> {}); 是一個意思;
2、第二個變種,加了 Async 后綴的方法,代表將需要執行的任務放到 ForkJoinPool.commonPool() 中執行(非完全嚴謹);第三個變種很好理解,將任務放到指定線程池中執行;
3、難道第一個變種是同步的?不是的,而是說,它由任務 A 或任務 B 所在的執行線程來執行,取決于哪個任務先結束。
compose update on 2019-07-26
這里我們簡單來說說 CompletableFuture 的最后一塊拼圖,compose 方法。
前面我們介紹了 thenAcceptBoth 和 thenCombine 用于聚合兩個任務,其實 compose 也是一樣的功能,它們本質上都是為了讓多個 CompletableFuture 實例形成一個鏈。
我們還是用代碼來說吧:
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> { System.out.println("processing a..."); return "hello"; });
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> { System.out.println("processing b..."); return " world"; });
CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> { System.out.println("processing c..."); return ", I'm robot!"; }); 我們示例三個實例的情況,這邊不介紹 thenAcceptBoth 了,我們來看下 thenCombine:
cfA.thenCombine(cfB, (resultA, resultB) -> { System.out.println(resultA + resultB); // hello world return resultA + resultB; }).thenCombine(cfC, (resultAB, resultC) -> { System.out.println(resultAB + resultC); // hello world, I'm robot! return resultAB + resultC; }); 我們先有 cfA,然后和 cfB 組成一個鏈:cfA -> cfB,然后又組合了 cfC,形成鏈:cfA -> cfB -> cfC。
這里有個隱藏的點:cfA、cfB、cfC 它們完全沒有數據依賴關系,我們只不過是聚合了它們的結果。
這下看 compose 就清楚了:
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> { // 第一個實例的結果 return "hello"; }).thenCompose(resultA -> CompletableFuture.supplyAsync(() -> { // 把上一個實例的結果傳遞到這里 return resultA + " world"; })).thenCompose(resultAB -> CompletableFuture.supplyAsync(() -> { // 到這里大家應該很清楚了 return resultAB + ", I'm robot"; }));
System.out.println(result.join()); // hello world, I'm robot 前面一個 CompletableFuture 實例的結果可以傳遞到下一個實例中,這就是 compose 和 combine 的主要區別。
combine 是把結果進行聚合,但是 compose 更像是把多個已有的 cf 實例組合成一個整體的實例。
thenCompose 和 thenApply 的區別 評論區有同學關注到了 thenApply 和 thenCompose,這里簡單說說。
我們來看看它們的方法貼到一起對比一下:
public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null, fn); } 使用示例:
CompletableFuture<String> future1 = CompletableFuture .supplyAsync(() -> "hello") .thenApply(cfA -> cfA + " world");
CompletableFuture<String> future2 = CompletableFuture .supplyAsync(() -> "hello") .thenCompose(cfA -> CompletableFuture.supplyAsync(() -> cfA + " world")); 它們都需要接收一個 Function,這個函數的主要的區別在于 thenApply 中返回一個具體的值,而 thenCompose 返回一個新的 cf 實例。
thenApply 類似于 map 操作,把 cf 實例的結果加工成另一個值,像 Stream 里面的 map() 方法。它還有一個很重要的特征,這里是同步的操作。
如果你希望執行一個異步的 map 操作,那么就應該使用 thenCompose 了,比如上面的第二個例子。
我們來繼續較真一下,我們可以讓 thenApply 的 Function 也返回 CompletableFuture 實例,不就實現了異步的需求:
CompletableFuture<CompletableFuture<String>> future = CompletableFuture .supplyAsync(() -> "hello") .thenApply(cfA -> CompletableFuture.supplyAsync(() -> cfA + " world")); 可是,返回值我們可就不太喜歡了。說到這里,大家可能會想到 Stream 里面的 flatMap() 了
看完上述內容,你們對使用CompletableFuture怎么實現并發編程有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。