中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

dubbo中ConnectionOrderedDispatcher的作用是什么

發布時間:2021-06-15 14:06:59 來源:億速云 閱讀:238 作者:Leah 欄目:大數據

本篇文章為大家展示了dubbo中ConnectionOrderedDispatcher的作用是什么,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedDispatcher.java

public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ConnectionOrderedChannelHandler(handler, url);
    }

}
  • ConnectionOrderedDispatcher實現了Dispatcher接口,其dispatch方法返回的是ConnectionOrderedChannelHandler

ConnectionOrderedChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}
  • ConnectionOrderedChannelHandler繼承了WrappedChannelHandler,其構造器創建了corePoolSize及maximumPoolSize均為1,queue為LinkedBlockingQueue的connectionExecutor

  • 其connected、disconnected方法均是使用connectionExecutor來執行新創建的ChannelEventRunnable;這兩個方法均會先執行checkQueueLength來判斷queue大小是否大于queuewarninglimit,大于的話則打印warn日志

  • 其received、caught均是通過父類的getExecutorService獲取線程池,然后執行創建的ChannelEventRunnable;received方法在捕獲到異常時RejectedExecutionException且message是Request,而且request是twoWay的時候會返回SERVER_THREADPOOL_EXHAUSTED_ERROR

ConnectChannelHandlerTest

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/ConnectChannelHandlerTest.java

public class ConnectChannelHandlerTest extends WrappedChannelHandlerTest {

    @BeforeEach
    public void setUp() throws Exception {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url);
    }

    @Test
    public void test_Connect_Blocked() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
        ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1);
        Assertions.assertEquals(1, executor.getMaximumPoolSize());

        int runs = 20;
        int taskCount = runs * 2;
        for (int i = 0; i < runs; i++) {
            handler.connected(new MockedChannel());
            handler.disconnected(new MockedChannel());
            Assertions.assertTrue(executor.getActiveCount() <= 1, executor.getActiveCount() + " must <=1");
        }
        //queue.size 
        Assertions.assertEquals(taskCount - 1, executor.getQueue().size());

        for (int i = 0; i < taskCount; i++) {
            if (executor.getCompletedTaskCount() < taskCount) {
                sleep(100);
            }
        }
        Assertions.assertEquals(taskCount, executor.getCompletedTaskCount());
    }

    @Test //biz error should not throw and affect biz thread.
    public void test_Connect_Biz_Error() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url);
        handler.connected(new MockedChannel());
    }

    @Test //biz error should not throw and affect biz thread.
    public void test_Disconnect_Biz_Error() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url);
        handler.disconnected(new MockedChannel());
    }

    @Test
    public void test_Connect_Execute_Error() throws RemotingException {
        Assertions.assertThrows(ExecutionException.class, () -> {
            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1);
            executor.shutdown();
            handler.connected(new MockedChannel());
        });
    }

    @Test
    public void test_Disconnect_Execute_Error() throws RemotingException {
        Assertions.assertThrows(ExecutionException.class, () -> {
            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1);
            executor.shutdown();
            handler.disconnected(new MockedChannel());
        });
    }

    //throw  ChannelEventRunnable.runtimeExeception(int logger) not in execute exception
    @Test//(expected = RemotingException.class)
    public void test_MessageReceived_Biz_Error() throws RemotingException {
        handler.received(new MockedChannel(), "");
    }

    //throw  ChannelEventRunnable.runtimeExeception(int logger) not in execute exception
    @Test
    public void test_Caught_Biz_Error() throws RemotingException {
        handler.caught(new MockedChannel(), new BizException());
    }

    @Test
    public void test_Received_InvokeInExecuter() throws RemotingException {
        Assertions.assertThrows(ExecutionException.class, () -> {
            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "SHARED_EXECUTOR", 1);
            executor.shutdown();
            executor = (ThreadPoolExecutor) getField(handler, "executor", 1);
            executor.shutdown();
            handler.received(new MockedChannel(), "");
        });
    }

    /**
     * Events do not pass through the thread pool and execute directly on the IO
     */
    @SuppressWarnings("deprecation")
    @Disabled("Heartbeat is processed in HeartbeatHandler not WrappedChannelHandler.")
    @Test
    public void test_Received_Event_invoke_direct() throws RemotingException {
        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);
        ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "SHARED_EXECUTOR", 1);
        executor.shutdown();
        executor = (ThreadPoolExecutor) getField(handler, "executor", 1);
        executor.shutdown();
        Request req = new Request();
        req.setHeartbeat(true);
        final AtomicInteger count = new AtomicInteger(0);
        handler.received(new MockedChannel() {
            @Override
            public void send(Object message) throws RemotingException {
                Assertions.assertTrue(((Response) message).isHeartbeat(), "response.heartbeat");
                count.incrementAndGet();
            }
        }, req);
        Assertions.assertEquals(1, count.get(), "channel.send must be invoke");
    }
}
  • ConnectChannelHandlerTest在setup時創建的是ConnectionOrderedChannelHandler,然后進行了test_Connect_Blocked、test_Connect_Biz_Error、test_Disconnect_Biz_Error、test_Connect_Execute_Error、test_Disconnect_Execute_Error、test_MessageReceived_Biz_Error、test_Caught_Biz_Error、test_Received_InvokeInExecuter、test_Received_Event_invoke_direct

小結

  • ConnectionOrderedDispatcher實現了Dispatcher接口,其dispatch方法返回的是ConnectionOrderedChannelHandler;ConnectionOrderedChannelHandler繼承了WrappedChannelHandler,其構造器創建了corePoolSize及maximumPoolSize均為1,queue為LinkedBlockingQueue的connectionExecutor

  • ConnectionOrderedChannelHandler的connected、disconnected方法均是使用connectionExecutor來執行新創建的ChannelEventRunnable;這兩個方法均會先執行checkQueueLength來判斷queue大小是否大于queuewarninglimit,大于的話則打印warn日志

  • ConnectionOrderedChannelHandler的received、caught均是通過父類的getExecutorService獲取線程池,然后執行創建的ChannelEventRunnable;received方法在捕獲到異常時RejectedExecutionException且message是Request,而且request是twoWay的時候會返回SERVER_THREADPOOL_EXHAUSTED_ERROR

上述內容就是dubbo中ConnectionOrderedDispatcher的作用是什么,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

肥乡县| 栾城县| 三台县| 甘孜县| 鹤山市| 额尔古纳市| 图们市| 任丘市| 胶州市| 利川市| 峨山| 漳州市| 洪雅县| 固阳县| 鄱阳县| 比如县| 大城县| 富阳市| 海宁市| 绥化市| 宿迁市| 临汾市| 出国| 澄江县| 华池县| 通城县| 阿巴嘎旗| 垣曲县| 台北县| 陕西省| 合肥市| 尚义县| 四会市| 彭阳县| 邵武市| 安岳县| 塔河县| 仙桃市| 叶城县| 肥乡县| 全椒县|