您好,登錄后才能下訂單哦!
可根據需要配置線程模型:單線程Reactor、多線程Reactor、多層線程Reactor
無論幾個線程,都通過單一的Acceptor接收客戶端請求,可以創建更多的NioEventLoop來處理IO操作。
EventLoop和EventLoopGroup實際繼承了Java的ScheduledExecutorService,使其具備了線程池的特性,其線程數量可動態配置。例如配置單線程模型,設置線程數量為1即可。
Future即異步操作
future操作可以被close,但結果是未知的;調用get可以獲取操作結果,但是會被阻塞;isDone可判斷是否完成操作。
ChannelFuture是為了獲取異步返回結果而設計
可以通過ChannelFutureListener接口獲得回調,無需等待get方法返回。
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
future.channel().close();
}
}
};
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
};
}
連接超時和channel超時配置
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
channelFutrue.awaitUninterruptibly(10, TimeUnit.SECONDS);
注意:
1、謹慎調用await,可能導致死鎖。
2、ChannelFuture超時后如果調用了業務代碼重連,而此時IO未超時,將可能導致多條連接并存,設置IO超時時間建議小于業務代碼超時時間。
升級版的future,可寫可操作(對回調過程)。future好比古代飛鴿傳書,只能等鴿子回來或者不回來,不可控;promise就像現代快遞員,送快遞送一半可以打電話給他叫他不要送了或者中途請他幫忙買個餅。
例如:
DefaultPromise類
awaitUninterruptibly()可手動打斷回調,使進程等待。
public Promise<V> awaitUninterruptibly() {
if (this.isDone()) {
return this;
} else {
boolean interrupted = false;
synchronized(this) {
while(!this.isDone()) {
this.checkDeadLock();
this.incWaiters();
try {
this.wait();
} catch (InterruptedException var9) {
interrupted = true;
} finally {
this.decWaiters();
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
}
進行了死鎖判斷,避免已存在相同任務;并限制了最大等待數量32767
protected void checkDeadLock() {
EventExecutor e = this.executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(this.toString());
}
}
private void incWaiters() {
if (this.waiters == 32767) {
throw new IllegalStateException("too many waiters: " + this);
} else {
++this.waiters;
}
}
Channel負責對外提供操作IO的接口,而UnSafe是Channel的內部接口類,如其名一樣是不安全的操作,所以封裝在接口內部不讓外部調用,而實際的操作IO最終都是在Unsafe中執行。
//Channel調用連接為例,跟蹤實現連接請求的過程
ChannelFuture connect(SocketAddress var1);
//DefaultChannelPipeline中執行,實際是調用尾部的pipeline
public ChannelFuture connect(SocketAddress remoteAddress) {
return this.tail.connect(remoteAddress);
}
//AbstractChannelHandlerContext是Pipeline容器中的對象,
//持續尋找所有handler執行對象,直到全部被調用
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
AbstractChannelHandlerContext next = this.findContextOutbound();
next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
return promise;
}
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while(!ctx.outbound);
return ctx;
}
//而真實的執行是尋找到UnSafe的Invoker
public ChannelHandlerInvoker invoker() {
return this.invoker == null ? this.channel().unsafe().invoker() : this.invoker;
}
public void invokeConnect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
} else if (ChannelHandlerInvokerUtil.validatePromise(ctx, promise, false)) {
if (this.executor.inEventLoop()) {
ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
} else {
this.safeExecuteOutbound(new OneTimeTask() {
public void run() {
ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
}
}, promise);
}
}
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。