中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

還不會Rxjava響應式編程框架設計,先從這篇文章入手

發布時間:2020-07-06 18:58:59 來源:網絡 閱讀:221 作者:wx5db6b6408bea5 欄目:移動開發

一.Rxjava是什么

Rxjava在GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)。

還不會Rxjava響應式編程框架設計,先從這篇文章入手

通俗來說,Rxjava是一個采用了觀察者模式設計處理異步的框架。鏈式調用設計讓代碼優雅易讀。

舉個例子:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
 @Override
 public void subscribe(ObservableEmitter<String> e) throws Exception {
 e.onNext("a");
 }
 });
 observable.subscribe(new Observer<String>() {
 @Override
 public void onSubscribe(Disposable d) {
 }
 @Override
 public void onNext(String s) {
 }
 @Override
 public void onError(Throwable e) {
 }
 @Override
 public void onComplete() {
 }
 });

這是Rxjava2最簡單的用法:

1.創建一個Observable,重寫subscribe方法,這里主要處理被觀察的事件。

2.訂閱這個Observable,事件會回調observer的方法,我們可以對事件做響應的處理

二.Rxjava源碼解析

2.1. 創建Observable:

創建Observable用的是Observable.create(ObservableOnSubscribe<T> source)方法。這個方法的參數是ObservableOnSubscribe:

public interface ObservableOnSubscribe<T> {
 /**
 * Called for each Observer that subscribes.
 * @param e the safe emitter instance, never null
 * @throws Exception on error
 */
 void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

ObservableOnSubscribe是一個接口,唯一的方法是subscribe,參數是ObservableEmitter<T> e。ObservableEmitter是一個繼承了Emitter的接口,接口Emitter里定義了onNext、onError、onComplete等方法,和Observer(觀察者)的方法相對應。

public interface Emitter<T> {
 /**
 * Signal a normal value.
 * @param value the value to signal, not null
 */
 void onNext(@NonNull T value);
 /**
 * Signal a Throwable exception.
 * @param error the Throwable to signal, not null
 */
 void onError(@NonNull Throwable error);
 /**
 * Signal a completion.
 */
 void onComplete();
}

ObservableEmitter對接口Emitter進行擴展,增加了setDisposable、setCancellable等方法

基本參數了解了,現在看看create方法里面做了什么,代碼如下:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
 }
調用了RxJavaPlugins的onAssembly方法。又有一個新參數ObservableCreate<T>(source),我們看看它是什么:

final class ObservableCreate<T> extends Observable<T> {
 public ObservableCreate(ObservableOnSubscribe<T> source) {
 this.source = source;
 }
}

繼承了Observable,所以也是個被觀察對象,在構造函數中我們看到我們new的ObservableOnSubscribe對象,被存在了ObservableCreate的source里面

那我們繼續看看onAssembly方法做什么:

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
 Function<? super Observable, ? extends Observable> f = onObservableAssembly;
 if (f != null) {
 return apply(f, source);
 }
 return source;
 }

一個Hook方法。onObservableAssembly是一個靜態變量,我們沒有設置,默認為空,所以直接返回source對象。也就是說,Observable的create方法其實就是把我們ObservableOnSubscribe對象,存儲在ObservableCreate對象的source里面,然后返回ObservableCreate對象。

我們知道ObservableCreate是繼承Observable的,所以創建了ObservableCreate對象,我們的Observable也就創建完了。

2.2 訂閱事件(被觀察者)

訂閱被觀察者的操作是observable.subscribe(new Observer<String>())。這個操作符其實是個“被動”,就是事件被觀察者觀察。因為subscribe方法里的參數Observer才是觀察者。我們也會在Observer里的各個會調方法里接收到事件相關的返回值。

我們看看subscribe方法的源碼:

public final void subscribe(Observer<? super T> observer) {
 try {
 subscribeActual(observer);
 } catch (NullPointerException e) { // NOPMD
 throw e;
 } catch (Throwable e) {
 RxJavaPlugins.onError(e);
 }
 }

看代碼我們知道最主要調用的方法是:subscribeActual(observer);,這個方法是Observable里的抽象方法,而此時我們的Observable是一個ObservableCreate對象(前面create方法返回的對象)。所以我們去看一下ObservableCreate里面是如何重寫這個方法的。代碼如下:

protected void subscribeActual(Observer<? super T> observer) {
 CreateEmitter<T> parent = new CreateEmitter<T>(observer);
 observer.onSubscribe(parent);
 try {
 source.subscribe(parent);
 } catch (Throwable ex) {
 Exceptions.throwIfFatal(ex);
 parent.onError(ex);
 }

我們一看到這個方法主要做了三件事:

①創建一個CreateEmitter對象parent。

②把parent傳給source的subscribe方法。上面我們知道source就是剛才存的ObservableOnSubscribe對象,subscribe也就是我們重寫的方法:

@Override
 public void subscribe(ObservableEmitter<String> e) throws Exception {
 e.onNext("a");
 }

所以我們在這個方法里就能收到一個CreateEmmiter,通過CreateEmitter可以回調相應的方法。CreateEmitter是實現ObservableEmitter接口,我們看看它內部實現,如:onNext源碼如下:

@Override
public void onNext(T t) {
 observer.onNext(t);
}

也就是說,當我們在ObservableOnSubscribe的subscribe方法里調用ObservableEmitter的onNext方法的時候,它里面會調用observer的onNext。于是通過這樣的傳遞,我們就能在observer里響應的回調方法里收到事件的相關狀態。

至此一個簡單Rxjava流式傳遞原理已經講完了,總結流程如下:

使用Observbable.create方法,產生一個ObservableCreate對象,對象里存著ObservableOnSubscribe對象source。

調用ObservableCreate.subscribe方法,實際調用的是subscribeActual方法,傳入一個Observer對象。

subscribeActual方法中創建一個CreateEmmiter對象,調用source.subscribe方法,傳入CreateEmmiter對象。

于是我們在ObservableOnSubscribe中就接收到了一個CreateEmmiter,CreateEmmiter是ObservableEmmiter的子類。我們可以在這里調用CreateEmmiter的方法進行事件回調。

調用CreateEmmiter方法,實際上會調用Observer的響應的方法。也就是CreateEmmiter把事件狀態傳遞給觀察者。

Android架構師之路很漫長,所以我們從這份學習內容和計劃開始執行吧!喜歡的話別忘記點擊關注和贊哦

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

镇安县| 大邑县| 武功县| 海盐县| 红河县| 祁阳县| 琼海市| 明星| 贡山| 尉犁县| 五指山市| 聊城市| 广平县| 淮南市| 辉县市| 通江县| 梅州市| 固始县| 沙洋县| 沁阳市| 舒城县| 双鸭山市| 黎川县| 莲花县| 铁岭市| 宁蒗| 新安县| 瑞安市| 松江区| 聂荣县| 山丹县| 如东县| 涪陵区| 东丽区| 拜城县| 无棣县| 年辖:市辖区| 墨竹工卡县| 邵阳市| 五峰| 新昌县|