您好,登錄后才能下訂單哦!
如何掌握響應式編程并入門Reactor,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
同步阻塞
“你知道什么是同步阻塞嗎”,當然知道了。“那你怎么看它呢”,這個。。。
在同步阻塞的世界里,代碼執行到哪里,數據就跟到哪里。如果數據很慢跟不上來,代碼就停在那里等待數據的到來,然后再帶著數據一起往下執行。
可以說是,代碼執行和數據是結伴而行,不離不棄。執子之手與子偕老。讓人老感動了。
如果還不太理解的話,可以認為代碼執行其實就是一些行為動作,這些行為動作的目的就是為了獲取/操作數據。
例如加法,這里的行為動作就是執行相加,數據就是加數和被加數。操作結果就是得到了另一個數據,即兩個數的和。
只是在這個加法里,數據跑的特別快,(CPU的寄存器,能不快嗎),我們幾乎覺察不到執行動作在等數據的過程。怎么辦呢,那就看一個能把它們拉開的例子。
那自然非數據庫查詢莫屬了,既有網絡I/O,又有磁盤I/O,肯定會慢一些。
假設我的業務是這樣的,代碼先去數據庫查詢一個用戶,接著修改用戶的密碼,然后再更新回數據庫,最后代碼返回成功。
如果網速和數據庫都很慢的話,可能是這樣的。代碼執行一個查詢數據庫動作,然后等啊等啊等,等的花都謝了,終于數據庫把用戶返回過來了,接著,代碼飛快的修改了密碼,并執行一個更新數據庫的動作,然后又是等啊等啊等,等的花又開了,數據庫終于回話了,更新成功。然后代碼返回成功,全部執行完了。
所以同步阻塞代碼的最大特點就是,帶著數據上路,數據不到位就阻塞住。
最后來個小小的升華:
所謂同步就是快的等慢的,然后一起往前走,表示的是目的。
所謂阻塞就是想辦法讓快的停滯不前,等待慢的到來,表示的是手段。
一言以蔽之,同步是目的,阻塞是手段。
異步非阻塞
“你知道什么是異步非阻塞嗎”,當然知道了,不過我不知道該怎么看它。“哦,恭喜你都會搶答了。。。”。
我們生活在異步的世界,卻是最不懂異步的人。
你去飯店吃飯,服務員把你的菜單寫好,交給廚房后就去服務別人了。
廚房把飯做好后,通過按鈴通知服務員,服務員再把飯送到你的位置上。
服務員是主(或I/O)線程,把任務交給廚房這個工作線程去執行,廚房接到任務的同時還要記住送來該任務的服務員,然后廚房去執行任務,服務員也去忙別的了。
廚房執行完任務后,對當時的那個服務員進行通知,服務員接到通知后,再去執行接下來的內容,如把飯送到客人餐桌。
這是一個非常常見的異步場景,由于其中一方不愿意等待(或時時刻刻關注)另一方,但又不知道對方什么時候能做完,所以只能寄希望于對方做完的時候告訴自己一聲,然后自己再進行后續的工作。
這就是我們常說的異步回調(或通知)。
早上項目經理開完會,給大家分好任務,并把測試用例代碼也給了大家,說誰做完了跑一邊測試用例,通了就可以了。然后就散會,各自忙去了。
下午5點你做完了,開始跑測試用例,很幸運,一次性全部通過。你的任務就算完成了,接下來就可以干自己想干的事情,比如看“編程新說”公眾號。
項目經理是主(或I/O)線程,把任務交給各個開發人員這些工作線程,并給每個人一段邏輯代碼,告訴他們在自己的任務完成后再執行這一段邏輯代碼。
開發人員完成任務后,接著執行邏輯代碼,執行完邏輯代碼后,就算已經結束了。不再需要告知項目經理一聲。
這也是一個常見的異步場景,一方給另一方安排好任務后,再給它一段邏輯代碼,接著彼此就分道揚鑣。之后的日子里,你走你的陽關道,我過我的獨木橋,井水不犯河水,老死不相往來。
這段邏輯代碼通常是由一個Runnable接口傳入,且是在任務完成時執行,就暫且稱它為的“完成執行”吧。
所以異步非阻塞代碼的最大特點就是,我給你分配任務,你完事給我回復,咱倆互相不耽誤。
最后來個小小的升華:
所謂異步就是你走你的,我走我的,大家各自往前走,表示的是一種事實形態。
所謂非阻塞就是快的快走,慢的慢走,一刻都不為你停留,表示的是一種直觀現象。
一言以蔽之,異步是形態,非阻塞是現象。
異步非阻塞它本身并沒有什么明顯的可圈可點的特征,注意我說的是它“本身”。因為我們整個世界都是按照異步非阻塞模式在運行。
上廁所的時候玩手機,等車的時候玩手機,上班的時候玩手機,等飯的時候玩手機,回家以后玩手機,睡覺做夢玩手機。第二天還是這樣的。哈哈。一個人就沒有被阻塞住的時候。
不可否認,我們生活的社會又很復雜,主要是因為人和人之間的溝通、交流和協調有時并非一件容易之事。
同理,異步非阻塞“本身”并不難,難就難在怎么實現它。畢竟讓一群聽不懂人話的二貨線程們互相溝通協調更非一件易事。
響應式
所謂響應式就是外界發生了變化,你要做出反應。所以響應式編程就是圍繞著變化來構建的。
如何收集到原始變化,如何把這個變化告知相關處理者,處理者如何做出反應,做出反應的過程其實就是引發了新的變化,這個新的變化又該如何被收集,又該如何告知下一個處理者,如此往復,直至全部結束。
可以說整個自然界都是響應式的,因為它們都會對外界的變化或自身的變化產生反應。
先說人類,冷的時候加衣,餓的時候吃飯,病的時候去醫院。看到綠色放松,看到藍色鎮定,看到紅色易激動。
再說動植物,向日葵圍繞太陽轉叫趨光性,植物的根系朝水多的地方生長叫趨水性,鴿子可以磁場辨別方向,鯨魚、海歸都可以利用磁場記住自己走過的路。
所以響應式“本身”是一個很簡單的模型,你給我一個變化,我做出一個反應。
動植物都有一套完善的感覺器官,能夠感受到外界變化。同時他們又有超高的智商或完善的一套生物系統能夠對這種變化作出反應。這是數萬年甚至數千萬年進化的結果,是基因決定的,所以看起來很自然。
再來看看編程界的響應式,也是這兩個問題,一是如何知道外界的變化,二是如何對這種變化作出反應。
代碼可是沒有生命的,那就只能簡單粗暴了。如何知道變化,那就讓別人告訴你唄。如何做出反應,那就執行一段邏輯代碼唄。
別人告訴你就等于異步回調/通知,執行的這段邏輯代碼,可以是外界傳入的,也可以是自己本身的一個方法。
現在明白了吧,異步非阻塞就是響應式。
最后來個小小升華:
所謂響應式就是一個概念,或是一種編程模式,它并不是一個知識,也不是一個技術。但它需要用到一個技術,那就是實現異步非阻塞的技術。
Reactor
在傳統的編碼中,會將邏輯處理代碼寫成方法,需要的數據由方法參數傳入,處理過的數據由方法的返回值返回。
執行時以main方法為入口點啟動,按照一定的順序執行這些方法,數據依次流入流出每個方法,當所有的方法執行完時,數據也處理完了,就結束了。
整個過程是以邏輯代碼的執行為主線,數據只是一個必須的參與者而已,因為代碼要處理數據,如果數據不到位,代碼就停下來不執行,等待數據的到來。
這就是典型的同步阻塞式的執行過程,非常簡單,易于理解,而且代碼也很好寫。
到目前為止,我們提到的都是響應式的理論,那應該怎樣去實現它呢,一時間還真沒有頭緒。
響應式是異步非阻塞,和同步阻塞應該是相對的。那我們不妨就拿響應式往同步阻塞上套一下,看看能得到什么有價值的發現。
響應式關注兩點,變化和反應,而且是變化在前,反應在后。同步阻塞也關注兩點,執行邏輯和數據,而且是執行邏輯在前,數據在后。
那就開始建立對應關系。因為“反應”是一系列行為動作,所以應該和“執行邏輯”對應。那“變化”只能和“數據”對應,其實這是對的,“數據”由不可用到可用,本身就是發生了一個“變化”。
這個對應關系建立的很完美,但是邏輯順序卻完全沖突。響應式是由變化主導反應,這很好理解,我都沒有變化,你無須做出反應。同步阻塞是由執行邏輯主導數據,這也很好理解,我代碼都沒執行呢,根本不需要數據。
可見,它們的對應關系非常完美,但主導順序完全相反,這就是一個非常非常有價值的發現。
因為我們只需把同步阻塞倒過來,就是實現響應式的大致方向。這樣的推理貌似是對的,但實際當中是這樣的嗎?嗯,是這樣的。
現在請大家和我一起扭轉思維。原來以邏輯代碼執行作為主線,數據作為參與者。現在以數據作為主線,邏輯代碼執行作為參與者。說的再白一些,原來是數據傳遞到邏輯代碼里,現在是邏輯代碼傳遞到數據里。
有人也許會問,邏輯代碼怎么傳遞?哈哈,Lambda表達式呀,函數式編程呀。
想象一下,有一個長長的管子,里面的水一直在流。
如果你想讓水變成橙色的,只需在管子上開個口,加裝一個可以持續投放橙色染料的裝置,結果流經它的水都變成橙色的了。
如果你想讓橙色的水變甜的話,只需在后面的管子上開個口,加裝一個可以持續投放白糖的裝置,結果流經它的水都變成甜的了。
同理,可以在后面繼續加裝投放檸檬酸的裝置,讓水變酸,在后面繼續加裝壓入二氧化碳的裝置,讓水帶氣泡。
最后發現,自來水經過多道工序處理后變成了芬達。
如果把水流看作是數據流,把投放裝置看作是邏輯代碼,就變成了,數據先流入第一個邏輯代碼,處理后再流入第二個邏輯代碼,依次流下去直至結束。
這就是以數據作為主線,邏輯代碼只是參與者,同時它也是Reactor實現響應式編程的原理,Spring官方使用的響應式類庫就是Reactor。
其中,“以數據為主線”和“在變化時通知處理者”這兩個功能Reactor庫都已經實現了,我們需要做的就是“對變化做出反應”,即插入邏輯代碼。
Reactor入門
在Reactor中,有兩個非常重要的類,就是Mono和Flux,它們都是數據源,在它們內部都已經實現了“以數據為主線”和“在變化時通知處理者”這兩個功能,而且還提供了方法讓我們來插入邏輯代碼用于“對變化做出反應”。
Mono表示0個或1個數據,Flux表示0到多個數據。先從簡單的Mono開始。
設計一個簡單的示例,首先創建一個數據源,只包含一個數據10,第一個處理就是加1,第二個處理就是奇偶性過濾,第三個處理就是把這個數據消費掉,然后就結束了。
為了清楚地看出來主線程執行的是哪些代碼,工作線程執行的是哪些代碼,特意打印了很多信息。
public static void main(String[] args) { displayCurrTime(1); displayCurrThreadId(1); //創建一個數據源 Mono.just(10) //延遲5秒再發射數據 .delayElement(Duration.ofSeconds(5)) //在數據上執行一個轉換 .map(n -> { displayCurrTime(2); displayCurrThreadId(2); displayValue(n); delaySeconds(2); return n + 1; }) //在數據上執行一個過濾 .filter(n -> { displayCurrTime(3); displayCurrThreadId(3); displayValue(n); delaySeconds(3); return n % 2 == 0; }) //如果數據沒了就用默認值 .defaultIfEmpty(9) //訂閱一個消費者把數據消費了 .subscribe(n -> { displayCurrTime(4); displayCurrThreadId(4); displayValue(n); delaySeconds(2); System.out.println(n + " consumed, worker Thread over, exit."); }); displayCurrTime(5); displayCurrThreadId(5); pause(); } //顯示當前時間 static void displayCurrTime(int point) { System.out.println(point + " : " + LocalTime.now()); } //顯示當前線程Id static void displayCurrThreadId(int point) { System.out.println(point + " : " + Thread.currentThread().getId()); } //顯示當前的數值 static void displayValue(int n) { System.out.println("input : " + n); } //延遲若干秒 static void delaySeconds(int seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { e.printStackTrace(); } } //主線程暫停 static void pause() { try { System.out.println("main Thread over, paused."); System.in.read(); } catch (IOException e) { e.printStackTrace(); } }
以下是輸出結果:
1 : 15:00:39.809 1 : 1 5 : 15:00:40.158 5 : 1 main Thread over, paused. 2 : 15:00:45.158 2 : 9 input : 10 3 : 15:00:47.160 3 : 9 input : 11 4 : 15:00:50.162 4 : 9 input : 9 9 consumed, worker Thread over, exit.
可以看到不到1秒鐘時間主線程就執行完了。然后5秒后數據從數據源發射出來進入第一步處理,2秒后進入第二步處理,3秒后進入第三步處理,數據被消費掉,就結束了。其中主線程Id是1,工作線程Id是9。
這段代碼其實是建立了一個數據通道,在通道的指定位置上插入處理邏輯,等待數據到來。
主線程執行的是建立通道的代碼,主線程很快執行完,通道就建好了。此時只是一個空的通道,根本就沒有數據。
在數據到來時,由工作線程執行每個節點的邏輯代碼來處理數據,然后把數據傳入下一個節點,如此反復直至結束。
所以,在寫響應式代碼的時候,心里一定要默念著,我所做的事情就是建立一條數據通道,在通道上指定的位置插入適合的邏輯處理代碼。同時還要切記,主線程執行完時,只是建立了通道,并沒有數據。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。