您好,登錄后才能下訂單哦!
本篇內容介紹了“dubbo的WrappedChannelHandler有什么作用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
本文主要研究一下dubbo的WrappedChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java
public class WrappedChannelHandler implements ChannelHandlerDelegate { protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class); protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true)); protected final ExecutorService executor; protected final ChannelHandler handler; protected final URL url; public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { componentKey = CONSUMER_SIDE; } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); } public void close() { try { if (executor != null) { executor.shutdown(); } } catch (Throwable t) { logger.warn("fail to destroy thread pool of server: " + t.getMessage(), t); } } @Override public void connected(Channel channel) throws RemotingException { handler.connected(channel); } @Override public void disconnected(Channel channel) throws RemotingException { handler.disconnected(channel); } @Override public void sent(Channel channel, Object message) throws RemotingException { handler.sent(channel, message); } @Override public void received(Channel channel, Object message) throws RemotingException { handler.received(channel, message); } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { handler.caught(channel, exception); } public ExecutorService getExecutor() { return executor; } @Override public ChannelHandler getHandler() { if (handler instanceof ChannelHandlerDelegate) { return ((ChannelHandlerDelegate) handler).getHandler(); } else { return handler; } } public URL getUrl() { return url; } public ExecutorService getExecutorService() { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } return cexecutor; } }
WrappedChannelHandler的構造根據ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)獲取ExecutorService,然后放到dataStore中
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
public class ExecutionChannelHandler extends WrappedChannelHandler { public ExecutionChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getExecutorService(); if (message instanceof Request) { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly, // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent // this scenario from happening, but a better solution should be considered later. if (t instanceof RejectedExecutionException) { Request request = (Request) message; if (request.isTwoWay()) { String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") thread pool is exhausted, detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event.", t); } } else { handler.received(channel, message); } } }
ExecutionChannelHandler繼承了WrappedChannelHandler,其received會創建ChannelEventRunnable,然后放到executor去執行
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelEventRunnable.java
public class ChannelEventRunnable implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class); private final ChannelHandler handler; private final Channel channel; private final ChannelState state; private final Throwable exception; private final Object message; public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) { this(channel, handler, state, null); } public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) { this(channel, handler, state, message, null); } public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) { this(channel, handler, state, null, t); } public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) { this.channel = channel; this.handler = handler; this.state = state; this.message = message; this.exception = exception; } @Override public void run() { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } break; case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } } /** * ChannelState * * */ public enum ChannelState { /** * CONNECTED */ CONNECTED, /** * DISCONNECTED */ DISCONNECTED, /** * SENT */ SENT, /** * RECEIVED */ RECEIVED, /** * CAUGHT */ CAUGHT } }
ChannelEventRunnable實現了Runnable接口,其run方法根據不同的ChannelState做不同處理
WrappedChannelHandler的構造根據ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)獲取ExecutorService,然后放到dataStore中
ExecutionChannelHandler繼承了WrappedChannelHandler,其received會創建ChannelEventRunnable,然后放到executor去執行
ChannelEventRunnable實現了Runnable接口,其run方法根據不同的ChannelState做不同處理
“dubbo的WrappedChannelHandler有什么作用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。