您好,登錄后才能下訂單哦!
本篇內容主要講解“怎么處理java異步事件的阻塞和非阻塞”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“怎么處理java異步事件的阻塞和非阻塞”吧!
前言
由于多核系統普遍存在,并發性編程的應用無疑比以往任何時候都要廣泛。但并發性很難正確實現,用戶需要借助新工具來使用它。很多基于 JVM 的語言都屬于這類開發工具,Scala 在這一領域尤為活躍。本系列文章將介紹一些針對 Java 和 Scala 語言的較新的并發性編程方法。
在任何并發性應用程序中,異步事件處理都至關重要。事件來源可能是不同的計算任務、I/O 操作或與外部系統的交互。無論來源是什么,應用程序代碼都必須跟蹤事件,協調為響應事件而采取的操作。
Java 應用程序可采用兩種基本的異步事件處理方法:該應用程序有一個協調線程等待事件,然后采取操作,或者事件可在完成時直接執行某項操作(通常采取執行應用程序所提供的代碼的方式)。讓線程等待事件的方法被稱為阻塞 方法。讓事件執行操作、線程無需顯式等待事件的方法被稱為非阻塞 方法。
在計算中,根據具體上下文,阻塞 和非阻塞 這兩個詞的使用通常會有所不同。舉例而言,共享數據結構的非阻塞算法不需要線程等待訪問數據結構。在非阻塞 I/O 中,應用程序線程可以啟動一個 I/O 操作,然后離開執行其他事情,同時該操作會異步地執行。在本文中,非阻塞 指的是在無需等待線程的情況下完成某個執行操作的事件。這些用法中的一個共同概念是,阻塞操作需要一個線程來等待某個結果,而非阻塞操作不需要。
合成事件
等待事件的完成很簡單:您有一個線程等待該事件,線程恢復運行時,您就可以知道該事件已經完成。如果您的線程在此期間有其他事要做,它會做完這些事再等待。該線程甚至可以使用輪詢方法,通過該方法中斷它的其他活動,從而檢查事件是否已完成。但基本原理是相同的:需要事件的結果時,您會讓線程停靠 (park),以便等待事件完成。
阻塞很容易完成且相對簡單,只要您有一個等待事件完成的單一主線程。使用多個因為彼此等待而阻塞的線程時,可能遇到一些問題,比如:
死鎖:兩個或更多線程分別控制其他線程繼續執行所需的資源。 饑餓 (Starvation):一些線程可能無法繼續執行,因為其他線程貪婪地消耗著共享資源。 活鎖:線程嘗試針對彼此而調整,但最終沒有進展。
非阻塞方法為創造力留出的空間要多得多。回調是非阻塞事件處理的一種常見技術。回調是靈活性的象征,因為您可以在發生事件時執行任何想要的代碼。回調的缺點是,在使用回調處理許多事件時,您的代碼會變得凌亂。而且回調特別難調試,因為控制流與應用程序中的代碼順序不匹配。
Java 8 CompletableFuture 同時支持阻塞和非阻塞的事件處理方法,包括常規回調。CompletableFuture 也提供了多種合成和組合事件的方式,實現了回調的靈活性以及干凈、簡單、可讀的代碼。在本節中,您將看到處理由 CompletableFuture 表示的事件的阻塞和非阻塞方法的示例。
任務和排序
應用程序在一個特定操作中通常必須執行多個處理步驟。例如,在向用戶返回結果之前,Web 應用程序可能需要:
1.在一個數據庫中查找用戶的信息2.使用查找到的信息來執行 Web 服務調用,并執行另一次數據庫查詢。3.基于來自上一步的結果而執行數據庫更新。
圖 1 演示了這種結構類型。
圖 1. 應用程序任務流
圖 1 將處理過程分解為 4 個不同的任務,它們通過表示順序依賴關系的箭頭相連接。任務 1 可直接執行,任務 2 和任務 3 都在任務 1 完成后執行,任務 4 在任務 2 和任務 3 都完成后執行。這是我在本文中用于演示異步事件處理的任務結構。真實應用程序(尤其是具有多個移動部分的服務器應用程序)可能要復雜得多,但這個簡單的示例僅用于演示所涉及的原理。
建模異步事件
在真實系統中,異步事件的來源一般是并行計算或某種形式的 I/O 操作。但是,使用簡單的時間延遲來建模這種系統會更容易,這也是本文所采用的方法。清單 1 顯示了我用于生成事件的基本的賦時事件 (timed-event) 代碼,這些事件采用了 CompletableFuture 格式。
清單 1. 賦時事件代碼
import java.util.Timer;import java.util.TimerTask;import java.util.concurrent.CompletableFuture;public class TimedEventSupport {private static final Timer timer = new Timer();/*** Build a future to return the value after a delay.** @param delay* @param value* @return future*/public static <T> CompletableFuture<T> delayedSuccess(int delay, T value) {CompletableFuture<T> future = new CompletableFuture<T>();TimerTask task = new TimerTask() {public void run() {future.complete(value);}};timer.schedule(task, delay * 1000);return future;}/*** Build a future to return a throwable after a delay.* * @param delay* @param t* @return future*/public static <T> CompletableFuture<T> delayedFailure(int delay, Throwable t) {CompletableFuture<T> future = new CompletableFuture<T>();TimerTask task = new TimerTask() {public void run() {future.completeExceptionally(t);}};timer.schedule(task, delay * 1000);return future;}}
為什么不采用 lambda?
清單 1 中的 TimerTask 被實現為一個匿名內部類,僅包含一個 run() 方法。您可能認為這里可以使用 lambda 代替內部類。但是,lambda 僅能用作接口的實例,而 TimerTask 被定義為一種抽象類。除非 lambda 特性的 future 擴展添加了對抽象類的支持(有可能,但由于設計問題,未必行得通),或者為 TimerTask 等情形定義了并行接口,否則您必須繼續使用 Java 內部類創建單一方法實現。
清單 1 的代碼使用一個 java.util.Timer 來計劃 java.util.TimerTask 在一定的延遲后執行。每個 TimerTask 在運行時完成一個有關聯的 future。delayedSuccess() 計劃一個任務來成功完成一個 CompletableFuture<T> 并將 future 返回調用方。delayedFailure() 計劃了一個任務來完成一個 CompletableFuture<T> 并拋出異常,然后將 future 返回給調用方。
清單 2 展示了如何使用 清單 1 中的代碼創建 CompletableFuture<Integer> 形式的事件,這些事件與 圖 1 中的 4 個任務相匹配。(此代碼來自示例代碼中的 EventComposition 類。)
清單 2. 示例任務的事件
// task definitionsprivate static CompletableFuture<Integer> task1(int input) {return TimedEventSupport.delayedSuccess(1, input + 1);}private static CompletableFuture<Integer> task2(int input) {return TimedEventSupport.delayedSuccess(2, input + 2);}private static CompletableFuture<Integer> task3(int input) {return TimedEventSupport.delayedSuccess(3, input + 3);}private static CompletableFuture<Integer> task4(int input) {return TimedEventSupport.delayedSuccess(1, input + 4);}
清單 2 中 4 個任務方法中的每一個都為該任務的完成時刻使用了特定的延遲值:task1 為 1 秒,task2 為 2 秒,task3 為 3 秒,task4 重新變為 1 秒。每個任務還接受一個輸入值,是該輸入加上任務編號作為 future 的(最終)結果值。這些方法都使用了 future 的成功形式;稍后我們將會查看一些使用失敗形式的例子。
這些任務要求您按 圖 1 中所示的順序運行它們,向每個任務傳遞上一個任務返回的結果值(或者對于 task4,傳遞前兩個任務結果的和)。如果中間兩個任務是同時執行的,那么總執行時間大約為 5 秒(1 秒 + (2 秒、3 秒中的最大值)+ 1 秒。
如果 task1 的輸入為 1,那么結果為 2。如果該結果傳遞給 task2 和 task3,結果將為 4 和 5。如果這兩個結果的和 (9) 作為輸入傳遞給 task4,最終結果將為 13。
阻塞等待
在設置了執行環境后,是時候設置一些操作了。協調 4 個任務的執行的最簡單方式是使用阻塞等待:主要線程等待每個任務完成。清單 3(同樣來自示例代碼中的 EventComposition 類)給出了此方法。
清單 3. 阻塞等待任務執行
private static CompletableFuture<Integer> runBlocking() {Integer i1 = task1(1).join();CompletableFuture<Integer> future2 = task2(i1);CompletableFuture<Integer> future3 = task3(i1);Integer result = task4(future2.join() + future3.join()).join();return CompletableFuture.completedFuture(result);}
清單 3 使用 CompletableFuture 的 join() 方法來完成阻塞等待。join() 等待任務完成,然后,如果成功完成任務,則返回結果值,或者如果失敗或被取消,則拋出一個未經檢查的異常。該代碼首先等待 task1 的結果,然后同時啟動 task2 和 task3,并等待兩個任務依次返回 future,最后等待 task4 的結果。runBlocking() 返回一個 CompletableFuture,以便與我接下來將展示的非阻塞形式保持一致,但在本例中,future 實際上將在該方法返回之前完成。
合成和組合 future
清單 4(同樣來自示例代碼中的 EventComposition 類)展示了如何將 future 連接在一起,以便按正確順序并使用正確的依賴關系執行任務,而不使用阻塞。
清單 4. 非阻塞的合成和組合
private static CompletableFuture<Integer> runNonblocking() {return task1(1).thenCompose(i1 -> ((CompletableFuture<Integer>)task2(i1).thenCombine(task3(i1), (i2,i3) -> i2+i3))).thenCompose(i4 -> task4(i4));}
清單 4 中的代碼基本上構造了一個執行計劃,指定不同的任務如何執行和它們彼此有何關聯。此代碼精美而簡潔,但是,如果您不熟悉 CompletableFuture 方法,或許難以理解該代碼。清單 5 通過將 task2 和 task3 部分分離到一個新方法 runTask2and3 中,將同樣的代碼重構為更容易理解的形式。
清單 5. 重構后的非阻塞的合成和組合
private static CompletableFuture<Integer> runTask2and3(Integer i1) {CompletableFuture<Integer> task2 = task2(i1);CompletableFuture<Integer> task3 = task3(i1);BiFunction<Integer, Integer, Integer> sum = (a, b) -> a + b;return task2.thenCombine(task3, sum);}private static CompletableFuture<Integer> runNonblockingAlt() {CompletableFuture<Integer> task1 = task1(1);CompletableFuture<Integer> comp123 = task1.thenCompose(EventComposition::runTask2and3);return comp123.thenCompose(EventComposition::task4); }
在 清單 5 中,runTask2and3() 方法表示任務流的中間部分,其中 task2 和 task3 同時執行,然后將它們的結果值組合在一起。此順序是使用一個 future 上的 thenCombine() 方法來編碼的,該方法接受另一個 future 作為它的第一個參數,接受一個二進制函數實例(其輸入類型與 future 的結果類型匹配)作為其第二個參數。thenCombine() 返回了第三個 future,表示應用到最初的兩個 future 的結果上的函數的值。在本例中,兩個 future 是 task2 和 task3,該函數將結果值求和。
runNonblockingAlt() 方法使用在一個 future 上調用了 thenCompose() 方法兩次。thenCompose() 的參數是一個函數實例,它接收原始 future 的值類型作為輸入,返回另一個 future 作為輸出。thenCompose() 的結果是第三個 future,具有與該函數相同的結果類型。這個 future 用作在原始 future 完成后,該函數最終將返回的 future 的占位符。
對 task1.thenCompose() 的調用將會返回一個 future,表示對 task1 的結果應用 runTask2and3() 函數的結果,該結果被保存為 comp123。對 comp123.thenCompose() 的調用返回一個 future,表示對第一個 henCompose() 的結果應用 task4() 函數的結果,這是執行所有任務的總體結果。
試用示例
示例代碼包含一個 main() 方法,以便依次運行事件代碼的每個版本,并顯示完成事件(約 5 秒)和結果 (13) 是正確的。清單 6 顯示了從一個控制臺運行這個 main() 方法的結果。
清單 6. 運行 main() 方法
dennis@linux-guk3:~/devworks/scala3/code/bin> java com.sosnoski.concur.article3.EventCompositionStarting runBlockingrunBlocking returned 13 in 5008 ms.Starting runNonblockingrunNonblocking returned 13 in 5002 ms.Starting runNonblockingAltrunNonblockingAlt returned 13 in 5001 ms.
不順利的道路
目前為止,您看到了以 future 形式協調事件的代碼,這些代碼總是能夠成功完成。在真實應用程序中,不能寄希望于事情總是這么順利。處理任務過程中將發生問題,而且在 Java 術語中,這些問題通常表示為 Throwable。
更改 清單 2 中的任務定義很容易,只需使用 delayedFailure() 代替 delayedSuccess() 方法即可,如這里的 task4 所示:
private static CompletableFuture<Integer> task4(int input) {return TimedEventSupport.delayedFailure(1, new IllegalArgumentException("This won't work!"));}
如果運行 清單 3 并且僅將 task4 修改為完成時拋出異常,那么您會得到 task4 上的 join() 調用所拋出的預期的 IllegalArgumentException。如果在 runBlocking() 方法中沒有捕獲該問題,該異常會在調用鏈中一直傳遞,最終如果仍未捕獲問題,則會終止執行線程。幸運的是,修改該代碼很容易,因此,如果在任何任務完成時拋出異常,該異常會通過返回的 future 傳遞給調用方來處理。清單 7 展示了這一更改。
清單 7. 具有異常的阻塞等待
private static CompletableFuture<Integer> runBlocking() {try {Integer i1 = task1(1).join();CompletableFuture<Integer> future2 = task2(i1);CompletableFuture<Integer> future3 = task3(i1);Integer result = task4(future2.join() + future3.join()).join();return CompletableFuture.completedFuture(result);} catch (CompletionException e) {CompletableFuture<Integer> result = new CompletableFuture<Integer>();result.completeExceptionally(e.getCause());return result;}}
清單 7 非常淺顯易懂。最初的代碼包裝在一個 try/catch 中,catch 在返回的 future 完成時傳回異常。此方法稍微增加了一點復雜性,但任何 Java 開發人員應該仍然很容易理解它。
清單 4 中的非阻塞代碼甚至不需要添加 try/catch。CompletableFuture 合成和組合操作負責自動為您傳遞異常,以便依賴的 future 也會在完成時拋出異常。
阻塞還是不阻塞
您已經查看了由 CompletableFuture 表示的事件的阻塞和非阻塞處理方法。至少對于本文中建模的基本的任務流,兩種方法都非常簡單。對于更復雜的任務流,該代碼也會更加復雜。
在阻塞情況下,增加的復雜性不是大問題,您仍然只需要等待事件完成。如果要在線程之間執行其他類型的同步,則會遇到線程饑餓問題,甚至是死鎖問題。
在非阻塞情況下,事件的完成所觸發的代碼執行很難調試。在執行許多類型的事件且事件之間存在許多交互時,跟蹤哪個事件觸發了哪次執行就會變得很難。這種情形基本上就是回調的噩夢,無論是使用傳統的回調還是 CompletableFuture 組合和合成操作。
總而言之,阻塞代碼通常具有簡單性優勢。那么為什么有人希望使用非阻塞方法?本節將給出一些重要的理由。
切換的成本
一個線程阻塞時,以前執行該線程的處理器核心會轉而執行另一個線程。以前執行的線程的執行狀態必須保存到內存中,并加載新線程的狀態。這種將核心從運行一個線程切換到運行另一個線程的操作稱為上下文切換。
除了直接的上下文切換性能成本,新線程一般會使用來自前一個線程的不同數據。內存訪問比處理器時鐘慢得多,所以現代系統會在處理器核心與主要內存之間使用多層緩存。盡管比主要內存快得多,但緩存的容量也小得多(一般而言,緩存越快,容量越小),所以任何時刻只能在緩存中保存總內存的小部分。
發生線程切換且一個核心開始執行一個新線程時,新線程需要的內存數據可能不在緩存中,所以該核心必須等待該數據從主要內存加載。
組合的上下文切換和內存訪問延遲,會導致直接的顯著性能成本。圖 2 顯示了我使用 Oracle 的 Java 8 for 64-bit Linux 的四核 AMD 系統上的線程切換開銷。
此測試使用了可變數量的線程,數量從 1 到 4,096 按 2 的冪次變化,每個線程的內存塊大小也是可變的,介于 0 到 64KB 之間。線程依次執行,使用 CompletableFuture 來觸發線程執行。每次一個線程執行時,它首先使用針對線程的數據返回一個簡單計算結果,以顯示將此數據加載到緩存中的開銷,然后增加一個共享的靜態變量。
最后創建一個新 CompletableFuture 實例來觸發它的下一次執行,然后通過完成該線程等待的 CompletableFuture 來啟動序列中的下一個線程。最后,如果需要再次執行它,那么該線程會等待新創建的 CompletableFuture 完成。
圖 2. 線程切換成本
可以在圖 2 的圖表中看到線程數量和每個線程的內存的影響。線程數量為 4 個時影響最大,只要特定于線程的數據足夠小,兩個線程的運行速度幾乎與單個線程一樣快。線程數量超過 4 個后,對性能的影響相對較小。每個線程的內存量越大,兩層緩存就會越快溢出,導致切換成本增高。
圖 2 中所示的時間值來自我的有點過時的主要系統。您系統上相應的時間將不同,可能會小得多。但是,曲線的形狀應大體相同。
圖 2 顯示了一次線程切換的微秒級開銷,所以即使線程切換的成本達到數萬個處理器時鐘,但絕對數字并不大。對于中等數量的線程,16KB 數據具有 12.5 微秒的切換時間(圖表中的黃線),系統每秒可執行 80,000 次線程切換。與您在任何精心編寫的單用戶應用程序以及甚至許多服務器應用程序中看到的結果相比,這一線程切換次數可能多得多。但對于每秒處理數千個事件的高性能服務器應用程序,阻塞的開銷可能成為影響性能的主要因素。對于這種應用程序,盡可能使用非阻塞代碼來最大限度減少線程切換非常重要。
認識到這些時間數據來自最理想的場景也很重要。運行線程切換程序時,會運行足夠的 CPU 活動來讓所有核心全速運行(至少在我的系統上是這樣)。在真實應用程序中,處理負載可能具有更大的突發性。在活動量低的時間,現代處理器將一些核心過渡到休眠狀態,以減少總功耗和產生的熱量。這個降低功耗的過程的惟一問題是,在需求增多時,它需要時間來將核心從休眠狀態喚醒。從深度休眠狀態過渡到全速運行所需的時間可能達到微秒級別,而不是在這個線程切換時間示例中看到的毫秒級。
反應式應用程序
對于許多應用程序,不在特定線程上阻塞的另一個原因是,這些線程用于處理需要及時響應的事件。經典的例子就是 UI 線程。如果在 UI 線程中執行會阻塞來等待異步事件完成的代碼,那么您會延遲用戶輸入事件的處理。沒有人喜歡等待應用程序響應他們的鍵入、單擊或觸控操作,所以 UI 線程中的阻塞可能很快在來自用戶的錯誤報告中反映出來。
UI 線程概念以一種更一般性的原則作為支撐。許多類型的應用程序,甚至非 GUI 應用程序,也必須迅速響應事件,而且在許多情況下,保持較短的響應事件至關重要。對于這些類型的應用程序,阻塞等待不是可接受的選擇。
反應式編程 這個名稱表示為響應靈敏且可擴展的應用程序采用的編程風格。反應式編程的核心原則是應用程序應能夠:
對事件做出反應:應用程序應是事件驅動的,在由異步通信所鏈接的每個級別上具有松散耦合的組件。 對負載做出反應:應用程序應該是可擴展的,以便可以輕松地升級應用程序來處理增加的需求。 對故障做出反應:應用程序應具有恢復能力,能將故障的影響局部化并迅速更正。 對用戶做出反應:應用程序應能迅速響應用戶,甚至在具有負載和存在故障的情況下。
使用阻塞式事件處理方法的應用程序無法滿足這些原則。線程是有限的資源,所以在阻塞等待中占用它們會限制可伸縮性,還會增加延遲(應用程序響應時間),因為阻塞的線程無法立即響應事件。非阻塞應用程序可更快地響應事件,降低成本,同時減少線程切換開銷并改善吞吐量。
反應式編程比非阻塞代碼的復雜得多。反應式編程涉及到關注您應用程序中的數據流并將這些數據流實現為異步交互,而不會讓接收方負擔過重或讓發送方積滯。這種對數據流的關注,有助于避免傳統的并發編程的許多復雜性。
到此,相信大家對“怎么處理java異步事件的阻塞和非阻塞”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。