中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

netty?pipeline中的inbound和outbound事件怎么傳播

發布時間:2023-04-25 17:13:08 來源:億速云 閱讀:130 作者:iii 欄目:開發技術

這篇文章主要介紹了netty pipeline中的inbound和outbound事件怎么傳播的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇netty pipeline中的inbound和outbound事件怎么傳播文章都會有所收獲,下面我們一起來看看吧。

    傳播inbound事件

    有關于inbound事件, 在概述中做過簡單的介紹, 就是以自己為基準, 流向自己的事件, 比如最常見的channelRead事件, 就是對方發來數據流的所觸發的事件, 己方要對這些數據進行處理, 這一小節, 以激活channelRead為例講解有關inbound事件的處理流程。

    在業務代碼中, 我們自己的handler往往會通過重寫channelRead方法來處理對方發來的數據, 那么對方發來的數據是如何走到channelRead方法中了呢, 也是我們這一小節要剖析的內容。

    在業務代碼中, 傳遞channelRead事件方式是通過fireChannelRead方法進行傳播的。

    兩種寫法

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //寫法1:
        ctx.fireChannelRead(msg);
        //寫法2
        ctx.pipeline().fireChannelRead(msg);
    }

    這里重寫了channelRead方法, 并且方法體內繼續通過fireChannelRead方法進行傳播channelRead事件, 那么這兩種寫法有什么異同?

    我們先以寫法2為例, 將這種寫法進行剖析。

    這里首先獲取當前contextpipeline對象, 然后通過pipeline對象調用自身的fireChannelRead方法進行傳播, 因為默認創建的DefaultChannelpipeline

    DefaultChannelPipeline.fireChannelRead(msg)

    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

    這里首先調用的是AbstractChannelHandlerContext類的靜態方法invokeChannelRead, 參數傳入head節點和事件的消息

    AbstractChannelHandlerContext.invokeChannelRead(head, msg)

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    這里的m通常就是我們傳入的msg, 而next, 目前是head節點, 然后再判斷是否為當前eventLoop線程, 如果不是則將方法包裝成task交給eventLoop線程處理

    AbstractChannelHandlerContext.invokeChannelRead(m)
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

    首先通過invokeHandler()判斷當前handler是否已添加, 如果添加, 則執行當前handlerchanelRead方法, 其實這里就明白了, 通過fireChannelRead方法傳遞事件的過程中, 其實就是找到相關handler執行其channelRead方法, 由于我們在這里的handler就是head節點, 所以我們跟到HeadContextchannelRead方法中

    HeadContext的channelRead方法
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //向下傳遞channelRead事件
        ctx.fireChannelRead(msg);
    }

    在這里我們看到, 這里通過fireChannelRead方法繼續往下傳遞channelRead事件, 而這種調用方式, 就是我們剛才分析用戶代碼的第一種調用方式

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //寫法1:
        ctx.fireChannelRead(msg);
        //寫法2
        ctx.pipeline().fireChannelRead(msg);
    }

    這里直接通過context對象調用fireChannelRead方法, 那么和使用pipeline調用有什么區別的, 我會回到HeadConetxchannelRead方法, 我們來剖析ctx.fireChannelRead(msg)這句, 大家就會對這個問題有答案了, 跟到ctxfireChannelRead方法中, 這里會走到AbstractChannelHandlerContext類中的fireChannelRead方法中

    AbstractChannelHandlerContext.fireChannelRead(msg)

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    這里我們看到, invokeChannelRead方法中傳入了一個findContextInbound()參數, 而這findContextInbound方法其實就是找到當前Context的下一個節點

    AbstractChannelHandlerContext.findContextInbound()

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

    這里的邏輯也比較簡單, 是通過一個doWhile循環, 找到當前handlerContext的下一個節點, 這里要注意循環的終止條件, while (!ctx.inbound)表示下一個context標志的事件不是inbound的事件, 則循環繼續往下找, 言外之意就是要找到下一個標注inbound事件的節點

    有關事件的標注, 之前已經進行了分析, 如果是用戶定義的handler, 是通過handler繼承的接口而定的, 如果tail或者head, 那么是在初始化的時候就已經定義好, 這里不再贅述

    回到AbstractChannelHandlerContext.fireChannelRead(msg)

    AbstractChannelHandlerContext.fireChannelRead(msg)

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    找到下一個節點后, 繼續調用invokeChannelRead方法, 傳入下一個和消息對象

    AbstractChannelHandlerContext.invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    這里的邏輯我們又不陌生了, 因為我們傳入的是當前context的下一個節點, 所以這里會調用下一個節點invokeChannelRead方法, 因我們剛才剖析的是head節點, 所以下一個節點有可能是用戶添加的handler的包裝類HandlerConext的對象

    AbstractChannelHandlerContext.invokeChannelRead(Object msg)
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try { 
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                //發生異常的時候在這里捕獲異常
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

    又是我們熟悉的邏輯, 調用了自身handlerchannelRead方法, 如果是用戶自定義的handler, 則會走到用戶定義的channelRead()方法中去, 所以這里就解釋了為什么通過傳遞channelRead事件, 最終會走到用戶重寫的channelRead方法中去

    同樣, 也解釋了該小節最初提到過的兩種寫法的區別

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //寫法1:
        ctx.fireChannelRead(msg);
        //寫法2
        ctx.pipeline().fireChannelRead(msg);
    }
    • 寫法1是通過當前節點往下傳播事件

    • 寫法2是通過頭節點往下傳遞事件

    • 所以, 在handler中如果要在channelRead方法中傳遞channelRead事件, 一定要采用寫法1的方式向下傳遞, 或者交給其父類處理, 如果采用2的寫法則每次事件傳輸到這里都會繼續從head節點傳輸, 從而陷入死循環或者發生異常

    • 還有一點需要注意, 如果用戶代碼中channelRead方法, 如果沒有顯示的調用ctx.fireChannelRead(msg)那么事件則不會再往下傳播, 則事件會在這里終止, 所以如果我們寫業務代碼的時候要考慮有關資源釋放的相關操作

    如果ctx.fireChannelRead(msg)則事件會繼續往下傳播, 如果每一個handler都向下傳播事件, 當然, 根據我們之前的分析channelRead事件只會在標識為inbound事件的HandlerConetext中傳播, 傳播到最后, 則最終會調用到tail節點的channelRead方法

    tailConext的channelRead方法

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

    onUnhandledInboundMessage(msg)

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            //釋放資源
            ReferenceCountUtil.release(msg);
        }
    }

    這里做了釋放資源的相關的操作

    到這里,對于inbound事件的傳輸流程以及channelRead方法的執行流程已經分析完畢。

    傳播outBound事件

    有關于outBound事件, 和inbound正好相反,以自己為基準, 流向對方的事件, 比如最常見的wirte事件

    在業務代碼中, , 有可能使用wirte方法往寫數據

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().write("test data");
    }

    當然, 直接調用write方法是不能往對方channel中寫入數據的, 因為這種方式只能寫入到緩沖區, 還要調用flush方法才能將緩沖區數據刷到channel中, 或者直接調用writeAndFlush方法, 有關邏輯, 我們會在后面章節中詳細講解, 這里只是以wirte方法為例為了演示outbound事件的傳播的流程

    兩種寫法

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //寫法1
        ctx.channel().write("test data");
        //寫法2
        ctx.write("test data");
    }

    這兩種寫法有什么區別, 首先分析第一種寫法

    //這里獲取ctx所綁定的channel
    ctx.channel().write("test data");

    AbstractChannel.write(Object msg)

    public ChannelFuture write(Object msg) {
    	//這里pipeline是DefaultChannelPipeline
        return pipeline.write(msg);
    }

    繼續跟蹤DefaultChannelPipeline.write(msg)

    DefaultChannelPipeline.write(msg)

    public final ChannelFuture write(Object msg) {
        //從tail節點開始(從最后的節點往前寫)
        return tail.write(msg);
    }

    這里調用tail節點write方法, 這里我們應該能分析到, outbound事件, 是通過tail節點開始往上傳播的。

    其實tail節點并沒有重寫write方法, 最終會調用其父類AbstractChannelHandlerContext.write方法

    AbstractChannelHandlerContext.write(Object msg)
    public ChannelFuture write(Object msg) { 
        return write(msg, newPromise());
    }

    這里有個newPromise()這個方法, 這里是創建一個Promise對象, 有關Promise的相關知識會在以后章節進行分析,繼續分析write

    AbstractChannelHandlerContext.write(final Object msg, final ChannelPromise promise)
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        /**
         * 省略
         * */
        write(msg, false, promise);
        return promise;
    }
    AbstractChannelHandlerContext.write(Object msg, boolean flush, ChannelPromise promise)
    private void write(Object msg, boolean flush, ChannelPromise promise) { 
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                //沒有調flush
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    這里跟我們之前分析過channelRead方法有點類似, 但是事件傳輸的方向有所不同, 這里findContextOutbound()是獲取上一個標注outbound事件的HandlerContext

    AbstractChannelHandlerContext.findContextOutbound()
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    這里的邏輯跟之前的findContextInbound()方法有點像, 只是過程是反過來的

    在這里, 會找到當前context的上一個節點, 如果標注的事件不是outbound事件, 則繼續往上找, 意思就是找到上一個標注outbound事件的節點

    回到AbstractChannelHandlerContext.write方法

    AbstractChannelHandlerContext next = findContextOutbound();

    這里將找到節點賦值到next屬性中,因為我們之前分析的write事件是從tail節點傳播的, 所以上一個節點就有可能是用戶自定的handler所屬的context

    然后判斷是否為當前eventLoop線程, 如果是不是, 則封裝成task異步執行, 如果不是, 則繼續判斷是否調用了flush方法, 因為我們這里沒有調用, 所以會執行到next.invokeWrite(m, promise)

    AbstractChannelHandlerContext.invokeWrite(Object msg, ChannelPromise promise)
    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

    這里會判斷當前handler的狀態是否是添加狀態, 這里返回的是true, 將會走到invokeWrite0(msg, promise)這一步

    AbstractChannelHandlerContext.invokeWrite0(Object msg, ChannelPromise promise)
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            //調用當前handler的wirte()方法
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

    這里的邏輯也似曾相識, 調用了當前節點包裝的handlerwrite方法, 如果用戶沒有重寫write方法, 則會交給其父類處理

    ChannelOutboundHandlerAdapter.write
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    這里調用了當前ctxwrite方法, 這種寫法和我們小節開始的寫法是相同的, 我們回顧一下

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //寫法1
        ctx.channel().write("test data");
        //寫法2
        ctx.write("test data");
    }

    我們跟到其write方法中, 這里走到的是AbstractChannelHandlerContext類的write方法

    AbstractChannelHandlerContext.write(Object msg, boolean flush, ChannelPromise promise)
    private void write(Object msg, boolean flush, ChannelPromise promise) { 
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                //沒有調flush
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    又是我們所熟悉邏輯, 找到當前節點的上一個標注事件為outbound事件的節點, 繼續執行invokeWrite方法, 根據之前的剖析, 我們知道最終會執行到上一個handlerwrite方法中。

    走到這里已經不難理解, ctx.channel().write("test data")其實是從tail節點開始傳播寫事件, 而ctx.write("test data")是從自身開始傳播寫事件。

    所以, 在handler中如果重寫了write方法要傳遞write事件, 一定采用ctx.write("test data")這種方式或者交給其父類處理處理, 而不能采用ctx.channel().write("test data")這種方式, 因為會造成每次事件傳輸到這里都會從tail節點重新傳輸, 導致不可預知的錯誤。

    如果用代碼中沒有重寫handlerwrite方法, 則事件會一直往上傳輸, 當傳輸完所有的outbound節點之后, 最后會走到head節點的wirte方法中。

    HeadContext.write
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    我們看到write事件最終會流向這里, 通過unsafe對象進行最終的寫操作

    inbound事件和outbound事件的傳輸流程圖

    netty?pipeline中的inbound和outbound事件怎么傳播

    關于“netty pipeline中的inbound和outbound事件怎么傳播”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“netty pipeline中的inbound和outbound事件怎么傳播”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    昌都县| 邳州市| 武陟县| 平塘县| 巴塘县| 南澳县| 枣阳市| 诏安县| 车险| 长汀县| 潞城市| 玉门市| 洛阳市| 桑日县| 唐海县| 平和县| 邵阳市| 花莲县| 澳门| 上饶县| 凭祥市| 和田县| 怀来县| 南皮县| 紫云| 福泉市| 汝城县| 株洲县| 罗山县| 阜阳市| 通化县| 浑源县| 大邑县| 芜湖市| 高台县| 济源市| 唐山市| 万州区| 沁水县| 习水县| 鹿泉市|