中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么在Java中使用reactive stream協議

發布時間:2021-06-02 16:12:34 來源:億速云 閱讀:320 作者:Leah 欄目:開發技術

這篇文章將為大家詳細講解有關怎么在Java中使用reactive stream協議,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

什么是reactive stream

上面我們講到了reactive stream的作用,大家應該對reactive stream有了一個基本的了解。這里我們再給reactive stream做一個定義:

reactive stream就是一個異步stream處理的標準,它的特點就是非阻塞的back pressure。

reactive stream只是一個標準,它定義了實現非阻塞的back pressure的最小區間的接口,方法和協議。

所以reactive stream其實有很多種實現的,不僅僅是java可以使用reactive stream,其他的編程語言也可以。

reactive stream只是定義了最基本的功能,各大實現在實現了基本功能的同時可以自由擴展。

目前reactive stream最新的java版本是1.0.3,是在2019年8月23發布的。它包含了java API,協議定義文件,測試工具集合和具體的實現例子。

深入了解java版本的reactive stream

在介紹java版本的reactive stream之前,我們先回顧一下reactive stream需要做哪些事情:

1.能夠處理無效數量的消息

2.消息處理是有順序的

3.可以異步的在組件之間傳遞消息

4.一定是非阻塞和backpressure的

為了實現這4個功能,reactive stream定義了4個接口,Publisher,Subscriber,Subscription,Processor。這四個接口實際上是一個觀察者模式的實現。接下來我們詳細來分析一下各個接口的作用和約定。

Publisher

先看下Publisher的定義:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Publisher就是用來生成消息的。它定義了一個subscribe方法,傳入一個Subscriber。這個方法用來將Publisher和Subscriber進行連接。

一個Publisher可以連接多個Subscriber。

每次調用subscribe建立連接,都會創建一個新的Subscription,Subscription和subscriber是一一對應的。

一個Subscriber只能夠subscribe一次Publisher。

如果subscribe失敗或者被拒絕,則會出發Subscriber.onError(Throwable)方法。

Subscriber

先看下Subscriber的定義:

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscriber就是消息的接收者。

在Publisher和Subscriber建立連接的時候會觸發onSubscribe(Subscription s)方法。

當調用Subscription.request(long)方法時,onNext(T t)會被觸發,根據request請求參數的大小,onNext會被觸發一次或者多次。

在發生異常或者結束時會觸發onError(Throwable t)或者onComplete()方法。

Subscription

先看下Subscription的定義:

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Subscription代表著一對一的Subscriber和Publisher之間的Subscribe關系。

request(long n)意思是向publisher請求多少個events,這會觸發Subscriber.onNext方法。

cancel()則是請求Publisher停止發送信息,并清除資源。

Processor

先看下Processor的定義:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Processor即是Subscriber又是Publisher,它代表著一種處理狀態。

JDK中reactive stream的實現

在JDK中java.util.concurrent.Flow就是reactive stream語義的一種實現。

Flow從JDK9就開始有了。我們看下它的結構:

怎么在Java中使用reactive stream協議

關于怎么在Java中使用reactive stream協議就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

铜山县| 昌江| 新竹县| 宣恩县| 珠海市| 恩施市| 静安区| 库伦旗| 花莲县| 新昌县| 陇川县| 衡水市| 唐河县| 涿鹿县| 铁力市| 荆门市| 红桥区| 栾城县| 吉水县| 惠东县| 疏勒县| 泽库县| 康马县| 宕昌县| 扶绥县| 英德市| 宝应县| 金门县| 襄汾县| 柯坪县| 浠水县| 舟山市| 台中市| 登封市| 漳平市| 印江| 仁化县| 金湖县| 云林县| 三原县| 海林市|