中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RxJava線程切換過程是怎樣的

發布時間:2021-12-28 16:33:54 來源:億速云 閱讀:132 作者:柒染 欄目:云計算

今天就跟大家聊聊有關RxJava線程切換過程是怎樣的,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

線程切換過程

下面我們就來看看它的又一利器,調度器Scheduler:就像我們所知道的,Scheduler是給Observable數據流添加多線程功能所準備的,一般我們會通過使用subscribeOn()observeOn()方法傳入對應的Scheduler去指定數據流的每部分操作應該以何種方式運行在何種線程。對于我們而言,最常見的莫過于在非主線程獲取并處理數據之后在主線程更新UI這樣的場景了:

這是我們十分常見的調用方法,一氣呵成就把不同線程之間的處理都搞定了,因為是鏈式所以結構也很清晰,我們現在來看看這其中的線程切換流程。

  • subscribeOn()

    當我們調用subscribeOn()的時候:

    可以看到這里也是調用了create()去生成一個Observable,而OperatorSubscribeOn則是實現了OnSubscribe接口,同時將原始的Observable和我們需要的scheduler傳入:


    可以看出來,這里對subscriber的處理與前文中OperatorMapcall()subscriber的處理很相似。在這里我們同樣會根據傳入的subscriber構造出新的Subscribers,不過這一系列的過程大部分都是由worker通過schedule()去執行的,從后面setProducer()中對于線程的判斷,再結合subscribeOn()方法的目的我們能大概推測出,這個worker在一定程度上就相當于一個新線程的代理執行者,schedule()所實現的與Thread類中run()應該十分類似。我們現在來看看這個worker的執行過程。
    首先從Schedulers.io()進入:

    這個通過hook拿到scheduler的過程我們先不管,直接進CachedThreadScheduler,看它的createWorker()方法:

    這里的pool是一個原子變量引用AtomicReference,所持有的則是CachedWorkerPool,因而這個pool顧名思義就是用來保存worker的緩存池啦,我們從緩存池里拿到需要的worker并作了一層封裝成為EventLoopWorker

    在這里我們終于發現目標ThreadWorker,它繼承自NewThreadWorker,之前的schedule()方法最終都會到這個scheduleActual()方法里:

    這里我們看到了executor線程池,我們用Schedulers.io()最終實現的線程切換的本質就在這里了。現在再結合之前的過程我們從頭梳理一下:

    subscribeOn()時,我們會新生成一個Observable,它的成員onSubscribe會在目標Subscriber訂閱時使用傳入的Scheduler的worker作為線程調度執行者,在對應的線程中通知原始Observable發送消息給這個過程中臨時生成的Subscriber,這個Subscriber又會通知到目標Subscriber,這樣就完成了subscribeOn()的過程。

  • observeOn()

    下面我們接著來看看observeOn()

    我們直接看最終調用的部分,可以看到這里又是一個lift(),在這里傳入了OperatorObserveOn,它與OperatorSubscribeOn不同,是一個OperatorOperator的功能我們上文中已經講過就不贅述了),它構造出了新的觀察者ObserveOnSubscriber并實現了Action0接口:

    可以看出來,這里ObserveOnSubscriber所有的發送給目標Subscriber child的消息都被切換到了recursiveScheduler的線程作處理,也就達到了將線程切回的目的。

總結observeOn()整體流程如下:

對比subscribeOn()observeOn()這兩個過程,我們不難發現兩者的區別:subscribeOn()將初始Observable的訂閱事件整體都切換到了另一個線程;而observeOn()則是將初始Observable發送的消息切換到另一個線程通知到目標Subscriber。前者把 “訂閱 + 發送” 的切換了一個線程,后者把 “發送” 切換了一個線程。所以,我們的代碼中所實現的功能其實是:

這樣就能很容易實現耗時任務在子線程操作,在主線程作更新操作等這些常見場景的功能啦。

4.其他角色

Subject
Subject在Rx系列是一個比較特殊的角色,它繼承了Observable的同時也實現了Observer接口,也就是說它既可作為觀察者,也可作為被觀察者,他一般被用來作為連接多個不同Observable、Observer之間的紐帶。可能你會奇怪,我們不是已經有了像map()flatMap()這類的操作符去變化 Observable數據流了嗎,為什么還要引入Subject這個東西呢?這是因為Subject所承擔的工作并非是針對Observable數據流內容的轉換連接,而是數據流本身在Observable、Observer之間的調度。光這么說可能還是很模糊,我們舉個《RxJava Essentials》中的例子:

我們通過create()創建了一個PublishSubject,觀察者成功訂閱了這個subject,然而這個subject卻沒有任何數據要發送,我們只是知道他未來會發送的會是String值而已。之后,當我們調用subject.onNext()時,消息才被發送,Observer的onNext()被觸發調用,輸出了"Hello World"。

這里我們注意到,當訂閱事件發生時,我們的subject是沒有產生數據流的,直到它發射了"Hello World",數據流才開始運轉,試想我們如果將訂閱過程和subject.onNext()調換一下位置,那么Observer就一定不會接受到"Hello World"了(這不是廢話嗎- -|||),因而這也在根本上反映了Observable的冷熱區別。

一般而言,我們的Observable都屬于Cold Observables,就像看視頻,每次點開新視頻我們都要從頭開始播放;而Subject則默認屬于Hot Observables,就像看直播,視頻數據永遠都是新的。
基于這種屬性,Subject自然擁有了對接收到的數據流進行選擇調度等的能力了,因此,我們對于Subject的使用也就通常基于如下的思路:

在前面的例子里我們用到的是PublishSubject,它只會把在訂閱發生的時間點之后來自原始Observable的數據發射給觀察者。等一下,這功能聽起來是不是有些似曾相識呢?

沒錯,就是EventBus和Otto。(RxJava的出現慢慢讓Otto退出了舞臺,現在Otto的Repo已經是Deprecated狀態了,而EventBus依舊堅挺)基于RxJava的觀察訂閱取消的能力和PublishSubject的功能,我們十分容易就能寫出實現了最基本功能的簡易事件總線框架:

當然Subject還有其他如BehaviorSubjectReplaySubjectAsyncSubject等類型,大家可以去看官方文檔,寫得十分詳細,這里就不介紹了。

三.后記

前面相信最近這段日子里,提到RxJava,大家就會想到Google最近剛剛開源的Agera。Agera作為專門為Android打造的Reactive Programming框架,難免會被拿來與RxJava做對比。本文前面RxJava的主體流程分析已近尾聲,現在我們再來看看Agera這東東又是怎么一回事。

首先先上結論:

Agera最初是為了Google Play Movies而開發的一個內部框架,現在開源出來了,它雖然是在RxJava之后才出現,但是完全獨立于RxJava,與它沒有任何關系(只不過開源的時間十分微妙罷了233333)。 與RxJava比起來,Agera更加專注于Android的生命周期,而RxJava則更加純粹地面向Java平臺而非Android。

也許你可能會問:“那么RxAndroid呢,不是還有它嗎?”事實上,RxAndroid早在1.0版本的時候就進行了很大的重構,很多模塊被拆分到其他的項目中去了,同時也刪除了部分代碼,僅存下來的部分多是和Android線程相關的部分,比如AndroidSchedulersMainThreadSubscription等。鑒于這種情況,我們暫且不去關注RxAndroid,先把目光放在Agera上。

同樣也是基于觀察者模式,Agera和RxJava的角色分類大致相似,在Agera中,主要角色有兩個:Observable(被觀察者)、Updatable(觀察者)。



是的,相較于RxJava中的Observable,Agera中的Observable只是一個簡單的接口,也沒有范性的存在,Updatable亦是如此,這樣我們要如何做到消息的傳遞呢?這就需要另外一個接口了:

終于看到了泛型T,我們的消息的傳遞能力就是依賴于此接口了。所以我們將這個接口和基礎的Observable結合一下:

這里的Repository<T>在一定程度上就是我們想要的RxJava中的Observable<T>啦。類似地,Repository也有兩種類型的實現:

  • Direct - 所包含的數據總是可用的或者是可被同步計算出來的;一個Direct的Repository總是處于活躍(active)狀態下

  • Deferred - 所包含的數據是異步計算或拉去所得;一個Deffered的Repository直到有Updatable被添加進來之前都會是非活躍(inactive)狀態下
    是不是感到似曾相識呢?沒錯,Repository也是有冷熱區分的,不過我們現在暫且不去關注這一點。回到上面接著看,既然現在發數據的角色有了,那么我們要如何接收數據呢?答案就是Receiver

相信看到這里,大家應該也隱約感覺到了:在Agera的世界里,數據的傳輸與事件的傳遞是相互隔離開的,這是目前Agera與Rx系列的最大本質區別。Agera所使用的是一種push event, pull data的模型,這意味著event并不會攜帶任何data,Updatable在需要更新時,它自己會承擔起從數據源拉取數據的任務。這樣,提供數據的責任就從Observable中拆分了出來交給了Repository,讓其自身能夠專注于發送一些簡單的事件如按鈕點擊、一次下拉刷新的觸發等等。

那么,這樣的實現有什么好處呢?

當這兩種處理分發邏輯分離開時,Updatable就不必觀察到來自Repository的完整數據變化的歷史,畢竟在大多數場景下,尤其是更新UI的場景下,最新的數據往往才是有用的數據。

但是我就是需要看到變化的歷史數據,怎么辦?

不用擔心,這里我們再請出一個角色Reservoir

顧名思義,Reservoir就是我們用來存儲變化中的數據的地方,它繼承了ReceiverRepository,也就相當于同時具有了接收數據,發送數據的能力。通過查看其具體實現我們可以知道它的本質操作都是使用內部的Queue實現的:通過accept()接收到數據后入列,通過get()拿到數據后出列。若一個Updatable觀察了此Reservoir,其隊列中發生調度變化后即將出列的下一個數據如果是可用的(非空),就會通知該Updatable,進一步拉取這個數據發送給Receiver

現在,我們已經大概了解了這幾個角色的功能屬性了,接下來我們來看一段官方示例代碼:


是不是有些云里霧里的感覺呢?多虧有注釋,我們大概能夠猜出到底上面都做了什么:使用需要的圖片規格作為參數拼接到url中,拉取對應的圖片并用ImageView顯示出來。我們結合API來看看整個過程:

  • Repositories.repositoryWithInitialValue(Result.absent())
    創建一個可運行(抑或說執行)的repository。
    初始化傳入值是Result,它用來概括一些諸如apply()merge()的操作的結果的不可變對象,并且存在兩種狀態succeeded()failed()
    返回REventSource

  • observe()
    用于添加新的Observable作為更新我們的圖片的Event source,本例中不需要。
    返回RFrequency

  • onUpdatesPerLoop()
    在每一個Looper Thread loop中若有來自多個Event Source的update()處理時,只需開啟一個數據處理流。
    返回RFlow

  • getFrom(new Supplier(…))
    忽略輸入值,使用來自給定Supplier的新獲取的數據作為輸出值。
    返回RFlow

  • goTo(executor)
    切換到給定的executor繼續數據處理流。

  • attemptTransform(function())
    使用給定的function()變換輸入值,若變換失敗,則終止數據流;若成功,則取新的變換后的值作為當前流指令的輸出。
    返回RTermination

  • orSkip()
    若前面的操作檢查為失敗,就跳過剩下的數據處理流,并且不會通知所有已添加的Updatable。

  • thenTransform(function())
    與attemptTransform(function())相似,區別在于當必要時會發出通知。
    返回RConfig

  • onDeactivation(SEND_INTERRUPT)
    用于明確repository不再active時的行為。
    返回RConfig

  • compile()
    執行這個repository。
    返回Repository

整體流程乍看起來并沒有什么特別的地方,但是真正的玄機其實藏在執行每一步的返回值里:
初始的REventSource<T, T>代表著事件源的開端,它從傳入值接收了T initialValue,這里的中,第一個T是當前repository的數據的類型,第二個T則是數據處理流開端的時候的數據的類型。

之后,當observe()調用后,我們傳入事件源給REventSource,相當于設定好了需要的事件源和對應的開端,這里返回的是RFrequency<T, T>,它繼承自REventSource,為其添加了事件源的發送頻率的屬性。

之后,我們來到了onUpdatesPerLoop(),這里明確了所開啟的數據流的個數(也就是前面所講的頻率)后,返回了RFlow,這里也就意味著我們的數據流正式生成了。同時,這里也是流式調用的起點。

拿到我們的RFlow之后,我們就可以為其提供數據源了,也就是前面說的Supplier,于是調用getFrom(),這樣我們的數據流也就真正意義擁有了數據“干貨”。

有了數據之后我們就可以按具體需要進行數據轉換了,這里我們可以直接使用transform(),返回RFlow,以便進一步進行流式調用;也可以調用attemptTransform()來對可能出現的異常進行處理,比如orSkip()、orEnd()之后繼續進行流式調用。

經過一系列的流式調用之后,我們終于對數據處理完成啦,現在我們可以選擇先對成型的數據在做一次最后的包裝thenTransform(),或是與另一個Supplier合并thenMergeIn()等。這些處理之后,我們的返回值也就轉為了RConfig,進入了最終配置和repository聲明結束的狀態。
在最終的這個配置過程中,我們調用了onDeactivation(),為這個repository明確了最終進入非活躍狀態時的行為,如果不需要其他多余的配置的話,我們就可以進入最終的compile()方法了。當我們調用compile()時,就會按照前面所走過的所有流程與配置去執行并生成這個repository。到此,我們的repository才真正被創建了出來。

以上就是repository從無到有的全過程。當repository誕生后,我們也就可以傳輸需要的數據啦。再回到上面的示例代碼:

我們在onResume()onPause()這兩個生命周期下分別添加、移除了Updatable。相較于RxJava中通過Subscription去取消訂閱的做法,Agera的這種寫法顯然更為清晰也更為整潔。我們的Activity實現了Updatable和Receiver接口,直接看其實現方法:

可以看到這里repository將數據發送給了receiver,也就是自己,在對應的accept()方法中接收到我們想要的bitmap后,這張圖片也就顯示出來了,示例代碼中的完整流程也就結束了。

總結一下上述過程:

  • 首先Repositories.repositoryWithInitialValue()生成原點REventSource。

  • 配置完Observable之后進入RFrequency狀態,接著配置數據流的流數。

  • 前面配置完成后,數據流RFlow生成,之后通過getFrom()mergeIn()、transform()等方法可進一步進行流式調用;也可以使用attemptXXX()方法代替原方法,后面接著調用orSkip()orEnd()進行error handling處理。當使用attemptXXX()方法時,數據流狀態會變為RTermination,它代表此時的狀態已具有終結數據流的能力,是否終結數據流要根據failed check觸發,結合后面跟著調用的orSkip()orEnd(),我們的數據流會從RTermination再次切換為RFlow,以便進行后面的流式調用。

  • 經過前面一系列的流式處理,我們需要結束數據流時,可以選擇調用thenXXX()方法,對數據流進行最終的處理,處理之后,數據流狀態會變為 RConfig;也可以為此行為添加error handling處理,選擇thenAttemptXXX()方法,后面同樣接上orSkip()orEnd()即可,最終數據流也會轉為Rconfig狀態。

  • 此時,我們可以在結束前按需要選擇對數據流進行最后的配置,例如:調用onDeactivation()配置從“訂閱”到“取消訂閱”的過程是否需要繼續執行數據流等等。

  • 一切都部署完畢后,我們compile()這個RConfig,得到最終的成型的Repository,它具有添加Updatable、發送數據通知Receiver的能力。

  • 我們根據需要添加Updatablerepository在數據流處理完成后會通過update()發送event通知Updatable

  • Updatable收到通知后則會拉取repository的成果數據,并將數據通過accept()發送給Receiver。完成 Push event, pull data 的流程。

看完上述內容,你們對RxJava線程切換過程是怎樣的有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

衡东县| 元谋县| 墨竹工卡县| 房产| 贵州省| 南安市| 塘沽区| 德昌县| 富锦市| 东乡县| 廊坊市| 来安县| 咸宁市| 钟山县| 循化| 青冈县| 兖州市| 庆阳市| 大荔县| 永善县| 定结县| 邹平县| 赣榆县| 城步| 桃源县| 松溪县| 葵青区| 韩城市| 郴州市| 屯留县| 宁安市| 揭西县| 天长市| 城固县| 寿宁县| 阿城市| 张家界市| 江门市| 花莲市| 南安市| 沈丘县|