中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

dubbo中ExecutionDispatcher的作用是什么

發布時間:2021-06-21 14:39:49 來源:億速云 閱讀:161 作者:Leah 欄目:大數據

本篇文章為大家展示了dubbo中ExecutionDispatcher的作用是什么,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

ExecutionDispatcher

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionDispatcher.java

public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }

}
  • ExecutionDispatcher實現了Dispatcher接口,其dispatch方法返回的是ExecutionChannelHandler

ExecutionChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java

public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        if (message instanceof Request) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening, but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {
                    Request request = (Request) message;
                    if (request.isTwoWay()) {
                        String msg = "Server side(" + url.getIp() + "," + url.getPort()
                                + ") thread pool is exhausted, detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        } else {
            handler.received(channel, message);
        }
    }
}
  • ExecutionChannelHandler繼承了WrappedChannelHandler,其received方法判斷message是否是Request類型,如果是則創建ChannelEventRunnable放到線程池里頭執行,如果不是則直接執行handler.received

PerformanceServerTest

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceServerTest.java

public class PerformanceServerTest  {

    private static final Logger logger = LoggerFactory.getLogger(PerformanceServerTest.class);
    private static ExchangeServer server = null;

    private static void restartServer(int times, int alive, int sleep) throws Exception {
        if (server != null && !server.isClosed()) {
            server.close();
            Thread.sleep(100);
        }

        for (int i = 0; i < times; i++) {
            logger.info("restart times:" + i);
            server = statServer();
            if (alive > 0) Thread.sleep(alive);
            server.close();
            if (sleep > 0) Thread.sleep(sleep);
        }

        server = statServer();
    }

    private static ExchangeServer statServer() throws Exception {
        final int port = PerformanceUtils.getIntProperty("port", 9911);
        final String transporter = PerformanceUtils.getProperty(Constants.TRANSPORTER_KEY, Constants.DEFAULT_TRANSPORTER);
        final String serialization = PerformanceUtils.getProperty(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
        final String threadpool = PerformanceUtils.getProperty(THREADPOOL_KEY, DEFAULT_THREADPOOL);
        final int threads = PerformanceUtils.getIntProperty(THREADS_KEY, DEFAULT_THREADS);
        final int iothreads = PerformanceUtils.getIntProperty(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS);
        final int buffer = PerformanceUtils.getIntProperty(BUFFER_KEY, DEFAULT_BUFFER_SIZE);
        final String channelHandler = PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME);


        // Start server
        ExchangeServer server = Exchangers.bind("exchange://0.0.0.0:" + port + "?transporter="
                + transporter + "&serialization="
                + serialization + "&threadpool=" + threadpool
                + "&threads=" + threads + "&iothreads=" + iothreads + "&buffer=" + buffer + "&channel.handler=" + channelHandler, new ExchangeHandlerAdapter() {
            public String telnet(Channel channel, String message) throws RemotingException {
                return "echo: " + message + "\r\ntelnet> ";
            }

            public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException {
                if ("environment".equals(request)) {
                    return CompletableFuture.completedFuture(PerformanceUtils.getEnvironment());
                }
                if ("scene".equals(request)) {
                    List<String> scene = new ArrayList<String>();
                    scene.add("Transporter: " + transporter);
                    scene.add("Service Threads: " + threads);
                    return CompletableFuture.completedFuture(scene);
                }
                return CompletableFuture.completedFuture(request);
            }
        });

        return server;
    }

    private static ExchangeServer statTelnetServer(int port) throws Exception {
        // Start server
        ExchangeServer telnetserver = Exchangers.bind("exchange://0.0.0.0:" + port, new ExchangeHandlerAdapter() {
            public String telnet(Channel channel, String message) throws RemotingException {
                if (message.equals("help")) {
                    return "support cmd: \r\n\tstart \r\n\tstop \r\n\tshutdown \r\n\trestart times [alive] [sleep] \r\ntelnet>";
                } else if (message.equals("stop")) {
                    logger.info("server closed:" + server);
                    server.close();
                    return "stop server\r\ntelnet>";
                } else if (message.startsWith("start")) {
                    try {
                        restartServer(0, 0, 0);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return "start server\r\ntelnet>";
                } else if (message.startsWith("shutdown")) {
                    System.exit(0);
                    return "start server\r\ntelnet>";
                } else if (message.startsWith("channels")) {
                    return "server.getExchangeChannels():" + server.getExchangeChannels().size() + "\r\ntelnet>";
                } else if (message.startsWith("restart ")) { //r times [sleep] r 10 or r 10 100
                    String[] args = message.split(" ");
                    int times = Integer.parseInt(args[1]);
                    int alive = args.length > 2 ? Integer.parseInt(args[2]) : 0;
                    int sleep = args.length > 3 ? Integer.parseInt(args[3]) : 100;
                    try {
                        restartServer(times, alive, sleep);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    return "restart server,times:" + times + " stop alive time: " + alive + ",sleep time: " + sleep + " usage:r times [alive] [sleep] \r\ntelnet>";
                } else {
                    return "echo: " + message + "\r\ntelnet> ";
                }

            }
        });

        return telnetserver;
    }

    @Test
    public void testServer() throws Exception {
        // Read port from property
        if (PerformanceUtils.getProperty("port", null) == null) {
            logger.warn("Please set -Dport=9911");
            return;
        }
        final int port = PerformanceUtils.getIntProperty("port", 9911);
        final boolean telnet = PerformanceUtils.getBooleanProperty("telnet", true);
        if (telnet) statTelnetServer(port + 1);
        server = statServer();

        synchronized (PerformanceServerTest.class) {
            while (true) {
                try {
                    PerformanceServerTest.class.wait();
                } catch (InterruptedException e) {
                }
            }
        }
    }

}
  • PerformanceServerTest的statServer方法使用PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME)獲取channelHandler,找不到則使用ExecutionDispatcher.NAME

上述內容就是dubbo中ExecutionDispatcher的作用是什么,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

克什克腾旗| 普格县| 奎屯市| 义马市| 团风县| 庐江县| 嘉峪关市| 鄂伦春自治旗| 广饶县| 榆林市| 北安市| 镇巴县| 尤溪县| 通海县| 和田市| 武清区| 钟祥市| 镇宁| 盱眙县| 崇仁县| 怀化市| 鹤山市| 安乡县| 铅山县| 南漳县| 南陵县| 余姚市| 汾阳市| 曲水县| 德化县| 泗洪县| 宕昌县| 新津县| 扬中市| 澄江县| 兖州市| 亳州市| 苍溪县| 满洲里市| 达日县| 蒙山县|