中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Reactor模型如何實現

發布時間:2023-03-16 16:50:03 來源:億速云 閱讀:161 作者:iii 欄目:開發技術

這篇文章主要介紹了Reactor模型如何實現的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Reactor模型如何實現文章都會有所收獲,下面我們一起來看看吧。

    一. Reactor設計模式

    Reactor翻譯過來的意思是:反應堆,所以Reactor設計模式本質是基于事件驅動的。在Reactor設計模式中,存在如下幾個角色。

    • Handle(事件)。Reactor整體是基于Handle進行驅動,這里的Handle叫做事件,可以類比為BIO中的Socket,NIO中的Socket管道。比如當Socket管道有連接建立,或者有數據可讀,那么此時就稱作事件發生;

    • EventHandler(事件處理器)。有事件發生,就需要有相應的組件來處理事件,那么這里的組件就叫做事件處理器。EventHandler是一個抽象概念,其會有不同的具體實現,因為事件會有不同的類型,那么不同類型的事件,肯定都需要有相應的具體處理邏輯,這里的具體處理邏輯,就由EventHandler的具體實現來承載;

    • Concrete Event Handler(具體事件處理器)。是EventHandler的具體實現,用于處理不同類型的事件;

    • Synchronous Event Demultiplexer(事件多路分解器)。(這里將Synchronous Event Demultiplexer簡稱為Demultiplexer)Demultiplexer用于監聽事件并得到所有發生事件的集合,在監聽的狀態下是阻塞的,直到有事件發生為止。Demultiplexer有一個很好的類比,就是NIO中的多路復用器Selector,當調用Selector的select() 方法后,會進入監聽狀態,當從select() 方法返回時,會得到SelectionKey的一個集合,而每一個SelectionKey中就保存著有事件發生的Socket管道;

    • Initiation Dispatcher(事件分發器)。現在已經有Concrete Event Handler(具體事件處理器)來處理不同的事件,也能通過Synchronous Event Demultiplexer(事件多路分解器)拿到發生的事件,那么最后需要做的事情,肯定就是將事件分發到正確的事件處理器上進行處理,而Initiation Dispatcher就是完成這個分發的事情。

    Reactor設計模式的一個簡單類圖,如下所示。

    Reactor模型如何實現

    通常,Reactor設計模式中的Reactor,可以理解為上述圖中的Synchronous Event Demultiplexer + Initiation Dispatcher。

    二. 單Reactor單線程模型

    單Reactor單線程模型中,只有一個Reactor在監聽事件和分發事件,并且監聽事件,分發事件和處理事件都在一個線程中完成。示意圖如下所示。

    Reactor模型如何實現

    上述示意圖中,一次完整的處理流程可以概括如下。

    • Reactor監聽到ACCEPT事件發生,表示此時有客戶端建立連接;

    • Reactor將ACCEPT事件分發給Acceptor處理;

    • Acceptor會在服務端創建與客戶端通信的client-socket管道,然后注冊到IO多路復用器selector上,并監聽READ事件;

    • Reactor監聽到READ事件發生,表示此時客戶端數據可讀;

    • Reactor將ACCEPT事件分發給Handler處理,Handler處理READ事件就會基于client-socket管道完成客戶端數據的讀取。

    下面將基于Java語言,實現一個簡單的單Reactor單線程模型的服務端,整體代碼實現完全符合上述示意圖,大家可以進行參照閱讀。

    首先實現Reactor,如下所示。

    public class Reactor implements Runnable {
        private final Selector selector;
        public Reactor(int port) throws IOException {
            // 開啟多路復用
            selector = Selector.open();
            // 服務端創建listen-socket管道
            ServerSocketChannel listenSocketChannel = ServerSocketChannel.open();
            // 綁定端口
            listenSocketChannel.socket().bind(new InetSocketAddress(port));
            // 設置為非阻塞模式
            listenSocketChannel.configureBlocking(false);
            // ACCEPT事件的附加器是Acceptor
            listenSocketChannel.register(selector, SelectionKey.OP_ACCEPT,
                    new Acceptor(selector, listenSocketChannel));
        }
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    // 獲取發生的事件
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterable = selectionKeys.iterator();
                    while (iterable.hasNext()) {
                        // 對事件進行分發
                        dispatch(iterable.next());
                        iterable.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                LockSupport.parkNanos(1000 * 1000 * 1000);
            }
        }
        private void dispatch(SelectionKey selectionKey) {
            // 獲取事件的附加器
            // ACCEPT事件的附加器是Acceptor,故由Acceptor來處理ACCEPT事件
            // READ事件的附加器是Handler,故由Handler來處理READ事件
            Runnable attachment = (Runnable) selectionKey.attachment();
            if (attachment != null) {
                attachment.run();
            }
        }
    }

    已知Reactor會監聽客戶端連接的ACCEPT事件,還已知ACCEPT事件由Acceptor處理,所以在向多路復用器注冊服務端用于監聽客戶端連接的listen-socket管道時,添加了一個Acceptor作為附加器,那么當發生ACCEPT事件時,就能夠獲取到作為ACCEPT事件附加器的Acceptor來處理ACCEPT事件。

    下面看一下Acceptor的實現,如下所示。

    public class Acceptor implements Runnable {
        private final Selector selector;
        private final ServerSocketChannel listenSocketChannel;
        public Acceptor(Selector selector, ServerSocketChannel listenSocketChannel) {
            this.selector = selector;
            this.listenSocketChannel = listenSocketChannel;
        }
        @Override
        public void run() {
            try {
                // 為連接的客戶端創建client-socket管道
                SocketChannel clientSocketChannel = listenSocketChannel.accept();
                // 設置為非阻塞
                clientSocketChannel.configureBlocking(false);
                // READ事件的附加器是Handler
                clientSocketChannel.register(selector, SelectionKey.OP_READ,
                        new Handler(clientSocketChannel));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    在Acceptor中就是在服務端創建與客戶端通信的client-socket管道,然后注冊到多路復用器上并指定監聽READ事件,同時又因為READ事件由Handler處理,所以還添加了一個Handler作為附加器,當READ事件發生時可以獲取到作為READ事件附加器的Handler來處理READ事件。

    下面看一下Handler的實現,如下所示。

    public class Handler implements Runnable {
        private final SocketChannel clientSocketChannel;
        public Handler(SocketChannel clientSocketChannel) {
            this.clientSocketChannel = clientSocketChannel;
        }
        @Override
        public void run() {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            try {
                // 讀取數據
                int read = clientSocketChannel.read(byteBuffer);
                if (read <= 0) {
                    clientSocketChannel.close();
                } else {
                    System.out.println(new String(byteBuffer.array()));
                }
            } catch (IOException e1) {
                try {
                    clientSocketChannel.close();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
                e1.printStackTrace();
            }
        }
    }

    在Handler中就是簡單的讀取數據并打印,當讀取數據為空或者發生異常時,需要及時將管道關閉。

    最后編寫一個主程序將Reactor運行起來,如下所示。

    public class MainServer {
        public static void main(String[] args) throws IOException {
            Thread reactorThread = new Thread(new Reactor(8080));
            reactorThread.start();
        }
    }

    現在來思考一下,單Reactor單線程模型有什么優點和缺點。優點其實就是模型簡單,實現方便。缺點有兩點,如下所示。

    • 一個Reactor同時負責監聽ACCEPT事件和READ事件;

    • 只有一個線程在工作,處理效率低,無法利用多核CPU的優勢。

    但是盡管單Reactor單線程模型有上述的缺點,但是著名的緩存中間件Redis的服務端,就是使用的單Reactor單線程模型,示意圖如下。

    Reactor模型如何實現

    那為什么以性能著稱的Redis會采取單Reactor單線程模型呢,其實就是因為Redis的操作都在內存中,讀寫都非常快速,所以單Reactor單線程模型也能運行得很流暢,同時還避免了多線程下的各種并發問題。

    三. 單Reactor多線程模型

    在理解了單Reactor單線程模型后,那么肯定就能想到,假如在Handler中處理READ事件的這個事情能夠使用一個線程池來完成,從而就可以實現READ事件的處理不會阻塞主線程。而這樣的一個模型,其實就是單Reactor多線程模型,示意圖如下所示。

    Reactor模型如何實現

    和單Reactor單線程模型唯一的不同,就是在Handler中多了一個線程池。

    單Reactor多線程模型的代碼實現,除了Handler以外,其余和單Reactor單線程模型一摸一樣,所以下面就看一下單Reactor多線程模型中的Handler實現,如下所示。

    public class Handler implements Runnable {
        private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(16, 32,
                60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
        private final SocketChannel clientSocketChannel;
        public Handler(SocketChannel clientSocketChannel) {
            this.clientSocketChannel = clientSocketChannel;
        }
        @Override
        public void run() {
            threadPool.execute(() -> {
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                try {
                    // 讀取數據
                    int read = clientSocketChannel.read(byteBuffer);
                    if (read <= 0) {
                        clientSocketChannel.close();
                    } else {
                        System.out.println(new String(byteBuffer.array()));
                    }
                    // 睡眠10S,演示任務執行耗時長也不會阻塞處理其它客戶端請求
                    LockSupport.parkNanos(1000 * 1000 * 1000 * 10L);
                } catch (IOException e1) {
                    try {
                        clientSocketChannel.close();
                    } catch (IOException e2) {
                        e2.printStackTrace();
                    }
                    e1.printStackTrace();
                }
            });
        }
    }

    其實就是每一個READ事件的處理會作為一個任務被扔到線程池中去處理。

    單Reactor多線程模型雖然解決了只有一個線程的問題,但是可以發現,仍舊是只有一個Reactor在同時監聽ACCEPT事件和READ事件。

    那么現在思考一下,為什么一個Reactor同時監聽ACCEPT事件和READ事件是不好的。其實就是因為通常客戶端連接的建立是不頻繁的,但是連接建立后數據的收發是頻繁的,所以如果能夠將監聽READ事件這個動作拆分出來,讓多個子Reactor來監聽READ事件,而原來的主Reactor只監聽ACCEPT事件,那么整體的效率,會進一步提升,而這,就是主從Reactor多線程模型。

    四. 主從Reactor多線程模型

    主從Reactor模型中,有一個主Reactor,專門監聽ACCEPT事件,然后有多個從Reactor,專門監聽READ事件,示意圖如下所示。

    Reactor模型如何實現

    上述示意圖中,一次完整的處理流程可以概括如下。

    • 主Reactor監聽到ACCEPT事件發生,表示此時有客戶端建立連接;

    • 主Reactor將ACCEPT事件分發給Acceptor處理;

    • Acceptor會在服務端創建與客戶端通信的client-socket管道,然后注冊到從Reactor的IO多路復用器selector上,并監聽READ事件;

    • 從Reactor監聽到READ事件發生,表示此時客戶端數據可讀;

    • 從Reactor將ACCEPT事件分發給Handler處理,Handler處理READ事件就會基于client-socket管道完成客戶端數據的讀取。

    下面將基于Java語言,實現一個簡單的主從Reactor多線程模型的服務端,整體代碼實現完全符合上述示意圖,大家可以進行參照閱讀。

    首先是主Reactor的實現,如下所示。

    public class MainReactor implements Runnable {
        private final Selector selector;
        public MainReactor(int port) throws IOException {
            // 開多路復用器
            selector = Selector.open();
            // 服務端創建listen-socket管道
            ServerSocketChannel listenSocketChannel = ServerSocketChannel.open();
            // 設置為非阻塞
            listenSocketChannel.configureBlocking(false);
            // 綁定監聽端口
            listenSocketChannel.socket().bind(new InetSocketAddress(port));
            // 將listen-socket管道綁定到主Reactor的多路復用器上
            // 并且主Reactor上只會注冊listen-socket管道,用于監聽ACCEPT事件
            listenSocketChannel.register(selector, SelectionKey.OP_ACCEPT,
                    new Acceptor(listenSocketChannel));
        }
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterable = selectionKeys.iterator();
                    while (iterable.hasNext()) {
                        // 對事件進行分發
                        dispatch(iterable.next());
                        iterable.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                LockSupport.parkNanos(1000 * 1000 * 1000);
            }
        }
        private void dispatch(SelectionKey selectionKey) {
            // 獲取事件附加器,只會是Acceptor
            Runnable attachment = (Runnable) selectionKey.attachment();
            if (attachment != null) {
                attachment.run();
            }
        }
    }

    主Reactor的實現中,還是先創建服務端監聽客戶端連接的listen-socket管道,然后注冊到主Reactor的IO多路復用器上,并監聽ACCEPT事件,同時我們現在知道,主Reactor的IO多路復用器上只會注冊listen-socket管道且只會監聽ACCEPT事件。同樣,也添加了一個Acceptor作為附加器,那么當發生ACCEPT事件時,就能夠獲取到作為ACCEPT事件附加器的Acceptor來處理ACCEPT事件。

    下面是Acceptor的實現,如下所示。

    public class Acceptor implements Runnable {
        // 指定從Reactor一共有16個
        private static final int TOTAL_SUBREACTOR_NUM = 16;
        // 服務端的listen-socket管道
        private final ServerSocketChannel listenSocketChannel;
        // 用于運行從Reactor
        private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                TOTAL_SUBREACTOR_NUM, TOTAL_SUBREACTOR_NUM * 2,
                60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
        // 從Reactor集合
        private final List<SubReactor> subReactors = new ArrayList<>(TOTAL_SUBREACTOR_NUM);
        public Acceptor(ServerSocketChannel listenSocketChannel) throws IOException {
            this.listenSocketChannel = listenSocketChannel;
            // 將從Reactor初始化出來并運行
            for (int i = 0; i < TOTAL_SUBREACTOR_NUM; i++) {
                SubReactor subReactor = new SubReactor(Selector.open());
                subReactors.add(subReactor);
                threadPool.execute(subReactor);
            }
        }
        @Override
        public void run() {
            try {
                // 為連接的客戶端創建client-socket管道
                SocketChannel clientSocketChannel = listenSocketChannel.accept();
                // 設置為非阻塞
                clientSocketChannel.configureBlocking(false);
                // 任意選擇一個從Reactor,讓其監聽連接的客戶端的READ事件
                Optional<SubReactor> anySubReactor = subReactors.stream().findAny();
                if (anySubReactor.isPresent()) {
                    SubReactor subReactor = anySubReactor.get();
                    // 從Reactor的多路復用器會阻塞在select()方法上
                    // 這里需要先喚醒多路復用器,立即從select()方法返回
                    subReactor.getSelector().wakeup();
                    // 讓從Reactor負責處理客戶端的READ事件
                    clientSocketChannel.register(subReactor.getSelector(), SelectionKey.OP_READ,
                            new Handler(clientSocketChannel));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    首先在Acceptor的構造函數中,會將所有從Reactor初始化出來,并且每一個從Reactor都會持有一個IO多路復用器。當一個從Reactor創建出來后就會立即運行,此時從Reactor的IO多路復用器就會開始監聽,即阻塞在select() 方法上。

    然后在Acceptor的主體邏輯中,會為連接的客戶端創建client-socket管道,然后從所有從Reactor中基于某種策略(隨機)選擇一個從Reactor,并將client-socket管道注冊在選擇的從Reactor的IO多路復用器上,有一點需要注意,此時從Reactor的IO多路復用器可能會阻塞在select() 方法上,所以注冊前需要先通過wakeup() 方法進行喚醒。

    接下來繼續看從Reactor的實現,如下所示。

    public class SubReactor implements Runnable {
        private final Selector selector;
        public SubReactor(Selector selector) {
            this.selector = selector;
        }
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        // 對事件進行分發
                        dispatch(iterator.next());
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                LockSupport.parkNanos(1000 * 1000 * 1000);
            }
        }
        private void dispatch(SelectionKey selectionKey) {
            // 獲取事件附加器,只會是Handler
            Runnable runnable = (Runnable) selectionKey.attachment();
            if (runnable != null) {
                runnable.run();
            }
        }
        public Selector getSelector() {
            return selector;
        }
    }

    從Reactor的實現中,會監聽服務端為連接的客戶端創建的client-socket管道上的READ事件,一旦有READ事件發生,就會使用作為附加器的Handler來處理READ事件。同樣,從Reactor的IO多路復用器上只會注冊client-socket管道且只會監聽READ事件。

    然后是Handler,因為是多線程模型,所以其實現和第三節中的Handler完全一樣,下面再貼一下代碼。

    public class Handler implements Runnable {
        private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(16, 32,
                60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
        private final SocketChannel clientSocketChannel;
        public Handler(SocketChannel clientSocketChannel) {
            this.clientSocketChannel = clientSocketChannel;
        }
        @Override
        public void run() {
            threadPool.execute(() -> {
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                try {
                    // 讀取數據
                    int read = clientSocketChannel.read(byteBuffer);
                    if (read <= 0) {
                        clientSocketChannel.close();
                    } else {
                        System.out.println(new String(byteBuffer.array()));
                    }
                    // 睡眠10S,演示任務執行耗時長也不會阻塞處理其它客戶端請求
                    LockSupport.parkNanos(1000 * 1000 * 1000 * 10L);
                } catch (IOException e1) {
                    try {
                        clientSocketChannel.close();
                    } catch (IOException e2) {
                        e2.printStackTrace();
                    }
                    e1.printStackTrace();
                }
            });
        }
    }

    最后編寫一個主程序將主Reactor運行起來,如下所示。

    public class MainServer {
        public static void main(String[] args) throws IOException {
            Thread mainReactorThread = new Thread(new MainReactor(8080));
            mainReactorThread.start();
        }
    }

    關于“Reactor模型如何實現”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Reactor模型如何實現”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    永春县| 枣阳市| 榆社县| 石阡县| 通辽市| 小金县| 鄂尔多斯市| 冀州市| 青岛市| 申扎县| 洪洞县| 政和县| 宣汉县| 屏东县| 尤溪县| 安远县| 汉源县| 邵阳市| 得荣县| 晴隆县| 广元市| 大冶市| 邹平县| 上杭县| 额尔古纳市| 孙吴县| 禄劝| 普安县| 万安县| 佛教| 广水市| 高淳县| 花莲县| 铅山县| 龙陵县| 通江县| 藁城市| 泾阳县| 九龙城区| 信丰县| 黄梅县|