中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Reactor中Thread和Scheduler的區別是什么

發布時間:2021-08-03 15:29:34 來源:億速云 閱讀:175 作者:Leah 欄目:編程語言

Reactor中Thread和Scheduler的區別是什么,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

Thread多線程

先看一下之前舉的Flux的創建的例子:

        Flux<String> flux = Flux.generate(
                () -> 0,
                (state, sink) -> {
                    sink.next("3 x " + state + " = " + 3*state);
                    if (state == 10) sink.complete();
                    return state + 1;
                });

        flux.subscribe(System.out::println);

可以看到,不管是Flux generator還是subscriber,他們實際上都是運行在同一個線程中的。

如果我們想讓subscribe發生在一個新的線程中,我們需要新啟動一個線程,然后在線程內部進行subscribe操作。

        Mono<String> mono = Mono.just("hello ");

        Thread t = new Thread(() -> mono
                .map(msg -> msg + "thread ")
                .subscribe(v ->
                        System.out.println(v + Thread.currentThread().getName())
                )
        );
        t.start();
        t.join();

上面的例子中,Mono在主線程中創建,而subscribe發生在新啟動的Thread中。

Schedule定時器

很多情況下,我們的publisher是需要定時去調用一些方法,來產生元素的。Reactor提供了一個新的Schedule類來負責定時任務的生成和管理。

Scheduler是一個接口:

public interface Scheduler extends Disposable

它定義了一些定時器中必須要實現的方法:

比如立即執行的:

Disposable schedule(Runnable task);

延時執行的:

default Disposable schedule(Runnable task, long delay, TimeUnit unit)

和定期執行的:

default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)

Schedule有一個工具類叫做Schedules,它提供了多個創建Scheduler的方法,它的本質就是對ExecutorService和ScheduledExecutorService進行封裝,將其做為Supplier來創建Schedule。

簡單點看Schedule就是對ExecutorService的封裝。

Schedulers工具類

Schedulers工具類提供了很多個有用的工具類,我們來詳細介紹一下:

Schedulers.immediate():

提交的Runnable將會立馬在當前線程執行。

Schedulers.single():

使用同一個線程來執行所有的任務。

Schedulers.boundedElastic():

創建一個可重用的線程池,如果線程池中的線程在長時間內都沒有被使用,那么將會被回收。boundedElastic會有一個最大的線程個數,一般來說是CPU cores x 10。 如果目前沒有可用的worker線程,提交的任務將會被放入隊列等待。

Schedulers.parallel():

創建固定個數的工作線程,個數和CPU的核數相關。

Schedulers.fromExecutorService(ExecutorService):

從一個現有的線程池創建Scheduler。

Schedulers.newXXX:

Schedulers提供了很多new開頭的方法,來創建各種各樣的Scheduler。

我們看一個Schedulers的具體應用,我們可以指定特定的Scheduler來產生元素:

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

publishOn 和 subscribeOn

publishOn和subscribeOn主要用來進行切換Scheduler的執行上下文。

先講一個結論,就是在鏈式調用中,publishOn可以切換Scheduler,但是subscribeOn并不會起作用。

這是因為真正的publish-subscribe關系只有在subscriber開始subscribe的時候才建立。

下面我們來具體看一下這兩個方法的使用情況:

publishOn

publishOn可以在鏈式調用的過程中,進行publish的切換:

    [@Test](https://my.oschina.net/azibug)
    public void usePublishOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":"+ Thread.currentThread())
                .publishOn(s)
                .map(i -> "value " + i+":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
        System.out.println(Thread.currentThread());
        Thread.sleep(5000);
    }

上面我們創建了一個名字為parallel-scheduler的scheduler。

然后創建了一個Flux,Flux先做了一個map操作,然后切換執行上下文到parallel-scheduler,最后右執行了一次map操作。

最后,我們采用一個新的線程來進行subscribe的輸出。

先看下輸出結果:

Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]

可以看到,主線程的名字是Thread。Subscriber線程的名字是ThreadA。

那么在publishOn之前,map使用的線程就是ThreadA。 而在publishOn之后,map使用的線程就切換到了parallel-scheduler線程池。

subscribeOn

subscribeOn是用來切換Subscriber的執行上下文,不管subscribeOn出現在調用鏈的哪個部分,最終都會應用到整個調用鏈上。

我們看一個例子:

    [@Test](https://my.oschina.net/azibug)
    public void useSubscribeOn() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i + ":" + Thread.currentThread())
                .subscribeOn(s)
                .map(i -> "value " + i + ":"+ Thread.currentThread());

        new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
        Thread.sleep(5000);
    }

同樣的,上面的例子中,我們使用了兩個map,然后在兩個map中使用了一個subscribeOn用來切換subscribe執行上下文。

看下輸出結果:

value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]

看完上述內容,你們掌握Reactor中Thread和Scheduler的區別是什么的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

普格县| 阿巴嘎旗| 齐河县| 通江县| 赣榆县| 伊吾县| 安达市| 赤城县| 辽源市| 饶阳县| 楚雄市| 龙游县| 镇原县| 开鲁县| 陆良县| 东台市| 达日县| 扎鲁特旗| 普格县| 陈巴尔虎旗| 永泰县| 巴塘县| 南华县| 皮山县| 永清县| 新晃| 聂拉木县| 博客| 镇赉县| 宜章县| 隆回县| 重庆市| 望城县| 南京市| 桃园县| 谷城县| 新乡市| 财经| 阳春市| 左权县| 清涧县|