您好,登錄后才能下訂單哦!
這篇文章主要介紹了Reactive反應式編程是什么及如何使用的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Reactive反應式編程是什么及如何使用文章都會有所收獲,下面我們一起來看看吧。
Reactor是Reactive Programming范例的一個實現,可以概括為:
反應式編程是一種涉及數據流和變化傳播的異步編程范例。這意味著可以通過所采用的編程語言輕松地表達靜態(例如陣列)或動態(例如事件發射器)數據流。
作為反應式編程方向的第一步,Microsoft在.NET生態系統中創建了Reactive Extensions(Rx)庫。然后RxJava在JVM上實現了響應式編程。隨著時間的推移,通過Reactive Streams工作出現了Java的標準化 ,這一規范定義了JVM上的反應庫的一組接口和交互規則。它的接口已經集成到父Flow類下的Java 9中。
反應式編程范例通常以面向對象的語言呈現,作為Observer設計模式的擴展。人們還可以將主要的反應流模式與熟悉的迭代器設計模式進行比較,因為在所有這些庫中對Iterable- Iterator對存在雙重性 。一個主要的區別是,雖然迭代器是基于拉的,但是反應流是基于推的。
使用迭代器是一種命令式編程模式,即使訪問值的方法完全由其負責Iterable。實際上,開發人員可以選擇何時訪問next()序列中的項目。在反應流中,相當于上述對Publisher-Subscriber。但是, 當它們出現時,Publisher它會通知訂閱者新的可用值,而這一推動方面是被動反應的關鍵。此外,應用于推送值的操作以聲明方式而非命令方式表示:程序員表達計算的邏輯而不是描述其精確的控制流。
除了推送值之外,還以明確定義的方式涵蓋錯誤處理和完成方面。A Publisher可以將新值推送到Subscriber(通過調用onNext),但也可以發出錯誤(通過調用onError)或完成(通過調用onComplete)。錯誤和完成都會終止序列。這可以概括為:
onNext x 0..N [onError | onComplete]
這種方法非常靈活。該模式支持沒有值,一個值或n值的用例(包括無限的值序列,例如時鐘的連續滴答)。
但是我們首先考慮一下,為什么我們首先需要這樣的異步反應庫?
現代應用程序可以覆蓋大量并發用戶,即使現代硬件的功能不斷提高,現代軟件的性能仍然是一個關鍵問題。
人們可以通過兩種方式來提高計劃的績效:
并行化:使用更多線程和更多硬件資源。
在現有資源的使用方式上尋求更高的效率。
通常,Java開發人員使用阻塞代碼編寫程序。這種做法很好,直到出現性能瓶頸,此時需要引入額外的線程,運行類似的阻塞代碼。但是,資源利用率的這種擴展會很快引入爭用和并發問題。
更糟糕的是,阻止浪費資源。如果仔細觀察,一旦程序涉及一些延遲(特別是I / O,例如數據庫請求或網絡調用),資源就會被浪費,因為線程(或許多線程)現在處于空閑狀態,等待數據。
所以并行化方法不是靈丹妙藥。為了獲得硬件的全部功能是必要的,但是理由也很復雜并且易受資源浪費的影響。
第二種方法(前面提到過),尋求更高的效率,可以解決資源浪費問題。通過編寫異步,非阻塞代碼,您可以使用相同的底層資源將執行切換到另一個活動任務,然后在異步處理完成后返回到當前進程。
但是如何在JVM上生成異步代碼?Java提供了兩種異步編程模型:
回調:異步方法沒有返回值,但需要額外的 callback參數(lambda或匿名類),在結果可用時調用它們。一個眾所周知的例子是Swing的EventListener層次結構。
期貨:異步方法Future立即返回。異步進程計算一個T值,但該Future對象包含對它的訪問。該值不會立即可用,并且可以輪詢對象,直到該值可用。例如,ExecutorService運行Callable任務使用Future對象。
這些技術是否足夠好?不適用于所有用例,兩種方法都有局限性。
回調難以組合在一起,很快導致難以閱讀和維護的代碼(稱為“Callback Hell”)。
考慮一個示例:在用戶界面上顯示用戶的前五個收藏夾,或者如果她沒有收藏夾則提出建議。這通過三個服務(一個提供喜歡的ID,第二個提取喜歡的詳細信息,第三個提供詳細建議):
userService.getFavorites(userId, new Callback() { public void onSuccess(Listlist) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback() { public void onSuccess(Listlist) { UiUtils.submitOnUiThread(() -> { list.stream() .limit(5) .forEach(uiList::show); }); } public void onError(Throwable error) { UiUtils.errorPopup(error); } }); } else { list.stream() .limit(5) .forEach(favId -> favoriteService.getDetails(favId, new Callback() { public void onSuccess(Favorite details) { UiUtils.submitOnUiThread(() -> uiList.show(details)); } public void onError(Throwable error) { UiUtils.errorPopup(error); } } )); } } public void onError(Throwable error) { UiUtils.errorPopup(error); } });
我們有基于回調的服務:一個Callback接口,其中包含在異步過程成功時調用的方法,以及在發生錯誤時調用的方法。
第一個服務使用喜歡的ID列表調用其回調。
如果列表為空,我們必須去suggestionService。
在suggestionService給出了一個List到第二個回調。
由于我們處理UI,我們需要確保我們的消費代碼將在UI線程中運行。
我們使用Java 8 Stream將處理的建議數限制為五個,并在UI中的圖形列表中顯示它們。
在每個級別,我們以相同的方式處理錯誤:在彈出窗口中顯示它們。
回到最喜歡的ID級別。如果服務返回完整列表,那么我們需要轉到favoriteService獲取詳細Favorite對象。由于我們只需要五個,我們首先流式傳輸ID列表,將其限制為五個。
再一次,一個回調。這次我們得到一個完全成熟的Favorite對象,我們將其推送到UI線程內的UI。
這是很多代碼,它有點難以遵循并且具有重復的部分。考慮它在Reactor中的等價物:
userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
我們從最喜歡的ID流開始。
我們將它們異步轉換為詳細的Favorite對象(flatMap)。我們現在有一個流動Favorite。
如果流量Favorite是空的,我們會切換到后退 suggestionService。
我們最多只對最終流程中的五個元素感興趣。
最后,我們想要處理UI線程中的每個數據。
我們通過描述如何處理數據的最終形式(在UI列表中顯示)以及在出現錯誤(顯示彈出窗口)時該怎么做來觸發流程。
如果您想確保在不到800毫秒內檢索到喜歡的ID,或者如果需要更長時間從緩存中獲取它們,該怎么辦?在基于回調的代碼中,這是一項復雜的任務。在Reactor中,它變得像timeout在鏈中添加運算符一樣簡單:
userService.getFavorites(userId) .timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup);
如果上面的部分發出的時間超過800毫秒,則傳播錯誤。
如果出現錯誤,請回復cacheService。
鏈的其余部分與前面的示例類似。
盡管Java 8中帶來了改進,但期貨比回調要好一些,但它們在構圖方面仍然表現不佳CompletableFuture。一起編排多個未來是可行但不容易的。此外,Future還有其他問題:Future通過調用get() 方法很容易結束對象的另一個阻塞情況,它們不支持延遲計算,并且它們不支持多個值和高級錯誤處理。
考慮另一個例子:我們得到一個ID列表,我們要從中獲取一個名稱和一個統計信息,然后將它們成對地組合在一起,所有這些都是異步的。
CompletableFutureids = ifhIds(); CompletableFutureresult = ids.thenComposeAsync(l -> { Streamzip = l.stream().map(i -> { CompletableFuturenameTask = ifhName(i); CompletableFuturestatTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); ListcombinationList = zip.collect(Collectors.toList()); CompletableFuture[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]); CompletableFutureallDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }); Listresults = result.join(); assertThat(results).contains( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121");
我們從一個未來開始,它為我們提供了一個id要處理的值列表。
一旦得到列表,我們想要開始一些更深入的異步處理。
對于列表中的每個元素:
異步獲取關聯的名稱。
異步獲取相關任務。
結合兩個結果。
我們現在有一個代表所有組合任務的期貨清單。為了執行這些任務,我們需要將列表轉換為數組。
將數組傳遞給CompletableFuture.allOf,輸出Future完成所有任務后完成的數組。
棘手的一點是allOf返回CompletableFuture,所以我們重申了期貨清單,通過收集結果join() (這里沒有阻止,因為allOf確保期貨全部完成)。
一旦觸發了整個異步管道,我們就等待它被處理并返回我們可以斷言的結果列表。
由于Reactor具有更多開箱即用的組合運算符,因此可以簡化此過程:
Fluxids = ifhrIds(); Fluxcombinations = ids.flatMap(id -> { MononameTask = ifhrName(id); MonostatTask = ifhrStat(id); return nameTask.zipWith(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); Monoresult = combinations.collectList(); Listresults = result.block(); assertThat(results).containsExactly( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" );
這一次,我們從異步提供的ids(a Flux)序列開始。
對于序列中的每個元素,我們異步處理它(在body函數內部flatMap)兩次。
獲取相關名稱。
獲取相關統計信息。
異步組合2個值。
在將值List變為可用時將值聚合為a 。
在生產中,我們將繼續Flux通過進一步組合或訂閱它來異步處理。最有可能的是,我們會回歸result Mono。由于我們在測試中,我們阻塞,等待處理完成,然后直接返回聚合的值列表。
斷言結果。
Callback和Future的這些風險是相似的,并且是反應式編程與該Publisher-Subscriber對的關系。
諸如Reactor之類的反應庫旨在解決JVM上“經典”異步方法的這些缺點,同時還關注一些其他方面:
可組合性和可讀性
數據作為一個用豐富的運算符詞匯表操縱的流程
在您訂閱之前沒有任何事情發生
背壓或消費者向生產者發出信號表明排放率過高的能力
高級但高價值的抽象,與并發無關
通過可組合性,我們指的是編排多個異步任務的能力,使用先前任務的結果將輸入提供給后續任務或以fork-join方式執行多個任務,以及將異步任務重用為更高級別系統中的分立組件。
編排任務的能力與代碼的可讀性和可維護性緊密相關。隨著異步過程層數量和復雜性的增加,能夠編寫和讀取代碼變得越來越困難。正如我們所看到的,回調模型很簡單,但其主要缺點之一是,對于復雜的進程,您需要從回調執行回調,本身嵌套在另一個回調中,依此類推。那個混亂被稱為Callback Hell。正如你可以猜到的(或者從經驗中得知),這樣的代碼很難回歸并推理。
Reactor提供了豐富的組合選項,其中代碼反映了抽象過程的組織,并且所有內容通常都保持在同一級別(嵌套最小化)。
您可以將響應式應用程序處理的數據視為在裝配線中移動。反應器既是傳送帶又是工作站。原材料從原料(原始Publisher)中倒出,最終成為成品,準備推送給消費者(或Subscriber)。
原材料可以經歷各種轉換和其他中間步驟,或者是將中間件聚集在一起的較大裝配線的一部分。如果在某一點出現毛刺或堵塞(也許裝箱產品需要不成比例的長時間),受影響的工作站可向上游發出信號以限制原材料的流動。
在Reactor中,運算符是我們的匯編類比中的工作站。每個操作符都將行為添加到a Publisher并將上一步驟包裝Publisher到新實例中。因此,整個鏈被鏈接,使得數據源自第一Publisher鏈并且向下移動鏈,由每個鏈轉換。最終,Subscriber完成了整個過程。請記住,在Subscriber訂閱a 之前沒有任何事情發生Publisher,下面就會提到。
了解操作員創建新實例可以幫助您避免一個常見錯誤,該錯誤會導致您認為您的鏈中使用的操作員未被應用。看到這個項目的常見問題。
雖然Reactive Streams規范根本沒有指定運算符,但Reactor等反應庫的最佳附加值之一是它們提供的豐富的運算符。這些涉及很多方面,從簡單的轉換和過濾到復雜的編排和錯誤處理。
在Reactor中,當您編寫Publisher鏈時,默認情況下數據不會啟動。相反,您可以創建異步過程的抽象描述(這可以幫助重用和組合)。
通過訂閱行為,您將Publishera 綁定到a Subscriber,從而觸發整個鏈中的數據流。這是通過上游傳播的單個request 信號在內部實現的Subscriber,一直傳回源 Publisher。
上游傳播信號也用于實現背壓,我們在裝配線中將其描述為當工作站比上游工作站處理速度慢時向線路發送的反饋信號。
Reactive Streams規范定義的真實機制非常接近于類比:訂閱者可以在無限制模式下工作,讓源以最快的速度推送所有數據,或者可以使用該request機制向源發送信號表明它已準備就緒處理最多的n元素。
中間操作員也可以在途中更改請求。想象一個buffer 運算符,它將元素分組為10個。如果訂閱者請求1個緩沖區,則源可以生成10個元素。一些操作員還實施 預取策略,這避免了request(1)往返,并且如果在請求之前生成元素并不太昂貴,則是有益的。
這將推模型轉換為推拉式混合動力,如果它們隨時可用,下游可以從上游拉出n個元素。但是如果元素沒有準備好,它們就會在生成時被上游推動。
在反應庫的Rx家族中,人們可以區分兩大類反應序列:熱和冷。這種區別主要與反應流如何對訂閱的用戶做出反應有關:
冷序列的含義是不論訂閱者在何時訂閱該序列,總是能收到序列中產生的全部消息。
而與之對應的熱序列,則是在持續不斷地產生消息,訂閱者只能獲取到在其訂閱之后產生的消息。
關于“Reactive反應式編程是什么及如何使用”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Reactive反應式編程是什么及如何使用”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。