中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

基于netty的websocket在channelActive觸發時發送數據異常問題分析是怎樣的

發布時間:2021-12-23 18:00:28 來源:億速云 閱讀:263 作者:柒染 欄目:大數據

本篇文章給大家分享的是有關基于netty的websocket在channelActive觸發時發送數據異常問題分析是怎樣的,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

####事情起因,用netty實現了websocket,在鏈接創建成功后發送一個消息給客戶端,我們選擇在channelActive中發送消息。 可想而知肯定是不行的了 代碼如下

EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            // 綁定線程池
            sb.group(group, bossGroup)
                    // 指定使用的channel
                    .channel(NioServerSocketChannel.class)
                    // 綁定監聽端口
                    .localAddress(this.port)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    // 綁定客戶端連接時候觸發操作
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            log.info("收到新連接");
                            ch.pipeline().addLast(new LoggingHandler("DEBUG"));
                            ch.pipeline().addLast(new IdleStateHandler(60, 0, 0));
                            //websocket協議本身是基于http協議的,所以這邊也要使用http解編碼器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //以塊的方式來寫的處理器
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/socket", null, true, 65536 * 10));
                            ch.pipeline().addLast(new MyWebSocketHandler());
                        }
                    });
            // 服務器異步創建綁定
            ChannelFuture cf = sb.bind().sync();
            log.info(NettyServer.class + " 啟動正在監聽: " + cf.channel().localAddress());
            // 關閉服務器通道
            cf.channel().closeFuture().sync();

#####排查1:因為沒有任何異常,客戶端沒有收到消息,故先采用wireshark抓包,發現網卡上沒有對應的想發送的消息。 為什么網卡沒有對應的包呢,經過debug發現如果發送的數據類型是WebSocketFrame在最終發送時候異常了具體代碼在HeadContext的write方法中,headcontext是netty的channelpipeline的頭部,最終寫出時都會從pipeline的尾部鏈接到頭部來執行(pipeline為雙向鏈表) 為什么在channelread中能寫信息而在channelActive無法寫信息呢,經過分析發現,channelActive的觸發是在socketchannel第一次注冊的時候發生的具體代碼如下:abstrcatchannel中

private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

而抓包時發現觸發此channelactive時,服務端還未返回websocket的協議握手包(websocket協議是在在http協議上衍生的,會先發一個http get請求然后服務端返回一個為websocket協議的包給客戶端)至此問題就真相大白了,在我們添加的WebSocketServerProtocolHandler這個handller中有如下代碼

    public void handlerAdded(ChannelHandlerContext ctx) {
        ChannelPipeline cp = ctx.pipeline();
        if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
            // Add the WebSocketHandshakeHandler before this one.
            ctx.pipeline().addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
                    new WebSocketServerProtocolHandshakeHandler(websocketPath, subprotocols,
                            allowExtensions, maxFramePayloadLength, allowMaskMismatch, checkStartsWith));
        }
        if (cp.get(Utf8FrameValidator.class) == null) {
            // Add the UFT8 checking before this one.
            ctx.pipeline().addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
                    new Utf8FrameValidator());
        }
    }

添加的WebSocketServerProtocolHandshakeHandler中有如下代碼

 public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        final FullHttpRequest req = (FullHttpRequest) msg;
        if (isNotWebSocketPath(req)) {
            ctx.fireChannelRead(msg);
            return;
        }

        try {
            if (!GET.equals(req.method())) {
                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
                return;
            }

            final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                    getWebSocketLocation(ctx.pipeline(), req, websocketPath), subprotocols,
                            allowExtensions, maxFramePayloadSize, allowMaskMismatch);
            final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
            if (handshaker == null) {
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            } else {
                final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
                handshakeFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            ctx.fireExceptionCaught(future.cause());
                        } else {
                            // Kept for compatibility
                            ctx.fireUserEventTriggered(
                                    WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
                            ctx.fireUserEventTriggered(
                                    new WebSocketServerProtocolHandler.HandshakeComplete(
                                            req.uri(), req.headers(), handshaker.selectedSubprotocol()));
                        }
                    }
                });
                WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);
                ctx.pipeline().replace(this, "WS403Responder",
                        WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());
            }
        } finally {
            req.release();
        }
    }

接受握手信息時候會添加兩個handler 為websocket協議信息的編碼和解碼handler

public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
                                            HttpHeaders responseHeaders, final ChannelPromise promise) {

        if (logger.isDebugEnabled()) {
            logger.debug("{} WebSocket version {} server handshake", channel, version());
        }
        FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
        ChannelPipeline p = channel.pipeline();
        if (p.get(HttpObjectAggregator.class) != null) {
            p.remove(HttpObjectAggregator.class);
        }
        if (p.get(HttpContentCompressor.class) != null) {
            p.remove(HttpContentCompressor.class);
        }
        ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
        final String encoderName;
        if (ctx == null) {
            // this means the user use a HttpServerCodec
            ctx = p.context(HttpServerCodec.class);
            if (ctx == null) {
                promise.setFailure(
                        new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
                return promise;
            }
            p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
            p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
            encoderName = ctx.name();
        } else {
            p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());

            encoderName = p.context(HttpResponseEncoder.class).name();
            p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());
        }
        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    ChannelPipeline p = future.channel().pipeline();
                    p.remove(encoderName);
                    promise.setSuccess();
                } else {
                    promise.setFailure(future.cause());
                }
            }
        });
        return promise;
    }

如果想要實現在websocket協議連接成功后發送一個消息給客戶端,我們發現在發送握手成功后觸發了fireUserEventTriggered,去實現userEventTriggered然后判斷evt類型做處理吧 基于netty的websocket在channelActive觸發時發送數據異常問題分析是怎樣的

以上就是基于netty的websocket在channelActive觸發時發送數據異常問題分析是怎樣的,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宁南县| 富平县| 屏东县| 山东| 运城市| 扎鲁特旗| 廊坊市| 新丰县| 调兵山市| 旬阳县| 黄山市| 犍为县| 抚顺县| 兖州市| 昌江| 锡林郭勒盟| 上犹县| 黎平县| 易门县| 古浪县| 黔南| 普兰店市| 通榆县| 蓬安县| 微山县| 黄平县| 冷水江市| 内江市| 横峰县| 天长市| 玛多县| 江安县| 林州市| 寿阳县| 苗栗市| 陆河县| 平度市| 纳雍县| 蓬莱市| 霍城县| 邓州市|