中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Netty線程模型、Future、Channel總結和源碼分析

發布時間:2020-06-13 09:25:02 來源:網絡 閱讀:2999 作者:星恒Android 欄目:移動開發
Netty線程模型

可根據需要配置線程模型:單線程Reactor、多線程Reactor、多層線程Reactor

無論幾個線程,都通過單一的Acceptor接收客戶端請求,可以創建更多的NioEventLoop來處理IO操作。

EventLoop和EventLoopGroup實際繼承了Java的ScheduledExecutorService,使其具備了線程池的特性,其線程數量可動態配置。例如配置單線程模型,設置線程數量為1即可。

Future和Promise
Future

Future即異步操作
future操作可以被close,但結果是未知的;調用get可以獲取操作結果,但是會被阻塞;isDone可判斷是否完成操作。
ChannelFuture是為了獲取異步返回結果而設計
可以通過ChannelFutureListener接口獲得回調,無需等待get方法返回。

public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
    ChannelFutureListener CLOSE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };
    ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().close();
            }

        }
    };
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().pipeline().fireExceptionCaught(future.cause());
            }

        }
    };
}

連接超時和channel超時配置
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
channelFutrue.awaitUninterruptibly(10, TimeUnit.SECONDS);

注意:
1、謹慎調用await,可能導致死鎖。
2、ChannelFuture超時后如果調用了業務代碼重連,而此時IO未超時,將可能導致多條連接并存,設置IO超時時間建議小于業務代碼超時時間。

promise

升級版的future,可寫可操作(對回調過程)。future好比古代飛鴿傳書,只能等鴿子回來或者不回來,不可控;promise就像現代快遞員,送快遞送一半可以打電話給他叫他不要送了或者中途請他幫忙買個餅。
例如:
DefaultPromise類
awaitUninterruptibly()可手動打斷回調,使進程等待。

 public Promise<V> awaitUninterruptibly() {
        if (this.isDone()) {
            return this;
        } else {
            boolean interrupted = false;
            synchronized(this) {
                while(!this.isDone()) {
                    this.checkDeadLock();
                    this.incWaiters();

                    try {
                        this.wait();
                    } catch (InterruptedException var9) {
                        interrupted = true;
                    } finally {
                        this.decWaiters();
                    }
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }

            return this;
        }
    }

進行了死鎖判斷,避免已存在相同任務;并限制了最大等待數量32767

    protected void checkDeadLock() {
        EventExecutor e = this.executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(this.toString());
        }
    }

private void incWaiters() {
        if (this.waiters == 32767) {
            throw new IllegalStateException("too many waiters: " + this);
        } else {
            ++this.waiters;
        }
    }
Channel和UnSafe

Channel負責對外提供操作IO的接口,而UnSafe是Channel的內部接口類,如其名一樣是不安全的操作,所以封裝在接口內部不讓外部調用,而實際的操作IO最終都是在Unsafe中執行。

//Channel調用連接為例,跟蹤實現連接請求的過程
ChannelFuture connect(SocketAddress var1);

//DefaultChannelPipeline中執行,實際是調用尾部的pipeline
 public ChannelFuture connect(SocketAddress remoteAddress) {
        return this.tail.connect(remoteAddress);
    }

//AbstractChannelHandlerContext是Pipeline容器中的對象,
//持續尋找所有handler執行對象,直到全部被調用
 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        AbstractChannelHandlerContext next = this.findContextOutbound();
        next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
        return promise;
    }
   private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;

        do {
            ctx = ctx.prev;
        } while(!ctx.outbound);

        return ctx;
    }

//而真實的執行是尋找到UnSafe的Invoker
   public ChannelHandlerInvoker invoker() {
        return this.invoker == null ? this.channel().unsafe().invoker() : this.invoker;
    }

 public void invokeConnect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        } else if (ChannelHandlerInvokerUtil.validatePromise(ctx, promise, false)) {
            if (this.executor.inEventLoop()) {
                ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
            } else {
                this.safeExecuteOutbound(new OneTimeTask() {
                    public void run() {
                        ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
                    }
                }, promise);
            }

        }
    }
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

阿巴嘎旗| 闻喜县| 晋州市| 长宁县| 武陟县| 浪卡子县| 大石桥市| 莱西市| 蓬莱市| 石城县| 公安县| 呼伦贝尔市| 东丰县| 博野县| 墨玉县| 永胜县| 罗山县| 乃东县| 都昌县| 盐边县| 临汾市| 涟水县| 重庆市| 保靖县| 苗栗市| 通州区| 佳木斯市| 新宾| 社会| 汾阳市| 南阳市| 陆川县| 巴塘县| 邻水| 星子县| 宁波市| 平和县| 铁岭市| 介休市| 色达县| 休宁县|