中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

elasticsearch集群zendiscovery的Ping機制是什么

發布時間:2022-04-22 09:08:29 來源:億速云 閱讀:235 作者:iii 欄目:開發技術

這篇文章主要介紹“elasticsearch集群zendiscovery的Ping機制是什么”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“elasticsearch集群zendiscovery的Ping機制是什么”文章能幫助大家解決問題。

     zenDiscovery實現機制

    ping是集群發現的基本手段,通過在網絡上廣播或者指定ping某些節點獲取集群信息,從而可以找到集群的master加入集群。zenDiscovery實現了兩種ping機制:廣播與單播。本篇將詳細分析一些這MulticastZenPing機制的實現為后面的集群發現和master選舉做好鋪墊。

    廣播的過程

    首先看一下廣播(MulticastZenPing),廣播的原理很簡單,節點啟動后向網絡發送廣播信息,任何收到的節點只要集群名字相同都應該對此廣播信息作出回應。這樣該節點就獲取了集群的相關信息。它定義了一個action:"internal:discovery/zen/multicast"和廣播的信息頭:INTERNAL_HEADER 。之前說過NettyTransport是cluster通信的基礎,但是廣播卻沒有使它。它使用了java的MulticastSocket。這里簡單的介紹一下MulticastSocket的使用。它是一個UDP 機制的socket,用來進行多個數據包的廣播。它可以幫到一個ip形成一個group,任何MulticastSocket都可以join進來,組內的socket發送的信息會被訂閱了改組的所有機器接收到。elasticsearch對其進行了封裝形成了MulticastChannel,有興趣可以參考相關源碼。 

    首先看一下MulticastZenPing的幾個輔助內部類:

    elasticsearch集群zendiscovery的Ping機制是什么

    它總共定義了4個內部類,這些內部類和它一起完成廣播功能。FinalizingPingCollection是一pingresponse的容器,所有的響應都用它來存儲。MulticastPingResponseRequestHandler它是response處理類,類似于之前所說的nettytransportHandler,它雖然使用的不是netty,但是它也定義了一個messageReceived的方法,當收到請求時直接返回一個response。

    MulticastPingResponse就不用細說了,它就是一個響應類。最后要著重說一下Receiver類,因為廣播并不是使用NettyTransport,因此對于消息處理邏輯都在Receiver中。在初始化MulticastZenPing時會將receiver注冊進去。

    protected void doStart() throws ElasticsearchException {
            try {
                ....
                multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
                        new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
                        new Receiver());//將receiver注冊到channel中
            } catch (Throwable t) {
              ....
            }
        }

    Receiver類基礎了Listener,實現了3個方法,消息經過onMessage方法區分,如果是內部ping則使用handleNodePingRequest方法處理,否則使用handleExternalPingRequest處理,區分方法很簡單,就是讀取信息都看它是否符合所定義的INTERNAL_HEADER 信息頭。

    nodeping處理代碼

    private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
               ....
                final DiscoveryNodes discoveryNodes = contextProvider.nodes();
                final DiscoveryNode requestingNode = requestingNodeX;
                if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
                    // 自身發出的ping,忽略
                    return;
                }
            //只接受本集群ping
                if (!requestClusterName.equals(clusterName)) {
                ...return;
                }
                // 兩個client間不需要ping
                if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
                }
            //新建一個response
                final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
                multicastPingResponse.id = id;
                multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
            //無法連接的情況
                if (!transportService.nodeConnected(requestingNode)) {
                    // do the connect and send on a thread pool
                    threadPool.generic().execute(new Runnable() {
                        @Override
                        public void run() {
                            // connect to the node if possible
                            try {
                                transportService.connectToNode(requestingNode);
                                transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                                    @Override
                                    public void handleException(TransportException exp) {
                                        logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                                    }
                                });
                            } catch (Exception e) {
                                if (lifecycle.started()) {
                                    logger.warn("failed to connect to requesting node {}", e, requestingNode);
                                }
                            }
                        }
                    });
                } else {
                    transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                        @Override
                        public void handleException(TransportException exp) {
                            if (lifecycle.started()) {
                                logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                            }
                        }
                    });
                }
            }
        }

    另外的一個方法是處理外部ping信息,處理過程是返回cluster的信息(這種外部ping的具體作用沒有研究不是太清楚)。以上是響應MulticastZenPing的過程,收到其它節點的響應信息后它會把本節點及集群的master節點相關信息返回給廣播節點。這樣廣播節點就獲知了集群的相關信息。在MulticastZenPing類中還有一個類 MulticastPingResponseRequestHandler,它的作用是廣播節點對其它節點對廣播信息響應的回應,廣播節點的第二次發送信息的過程。它跟其它TransportRequestHandler一樣它有messageReceived方法,在啟動時注冊到transportserver中,只處理一類action:"internal:discovery/zen/multicast"。

    ping請求的發送策略

    代碼如下:

    public void ping(final PingListener listener, final TimeValue timeout) {
           ....
        //產生一個id
            final int id = pingIdGenerator.incrementAndGet();
            try {
                receivedResponses.put(id, new PingCollection());
                sendPingRequest(id);//第一次發送ping請求
                // 等待時間的1/2后再次發送一個請求
                threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                    @Override
                    public void onFailure(Throwable t) {
                        logger.warn("[{}] failed to send second ping request", t, id);
                        finalizePingCycle(id, listener);
                    }
                    @Override
                    public void doRun() {
                        sendPingRequest(id);
                //再過1/2時間再次發送一個請求
                        threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                            @Override
                            public void onFailure(Throwable t) {
                                logger.warn("[{}] failed to send third ping request", t, id);
                                finalizePingCycle(id, listener);
                            }
                            @Override
                            public void doRun() {
                                // make one last ping, but finalize as soon as all nodes have responded or a timeout has past
                                PingCollection collection = receivedResponses.get(id);
                                FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
                                receivedResponses.put(id, finalizingPingCollection);
                                logger.trace("[{}] sending last pings", id);
                                sendPingRequest(id);
                    //最后一次發送請求,超時的1/4后
                                threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                                    @Override
                                    public void onFailure(Throwable t) {
                                        logger.warn("[{}] failed to finalize ping", t, id);
                                    }
                                    @Override
                                    protected void doRun() throws Exception {
                                        finalizePingCycle(id, listener);
                                    }
                                });
                            }
                        });
                    }
                });
            } catch (Exception e) {
                logger.warn("failed to ping", e);
                finalizePingCycle(id, listener);
            }
        }

    發送過程主要是調用sendPingRequest(id)方法,在該方法中會將id,信息頭,版本,本地節點信息一起寫入到BytesStreamOutput中然后將其進行廣播,這個廣播信息會被其它機器上的Receiver接收并處理,并且響應該ping請求。另外一個需要關注的是以上加說明的部分,它通過鏈時的定期發送請求,在等待時間內可能會發出4次請求,這種發送方式會造成大量的ping請求重復,幸好ping的資源消耗小,但是好處是可以盡可能保證在timeout這個時間段內集群的新增節點都能收到這個ping信息。在單播中也采用了該策略。

    關于“elasticsearch集群zendiscovery的Ping機制是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    庆阳市| 红原县| 兴国县| 横山县| 建水县| 孟州市| 崇明县| 惠水县| 汤原县| 额敏县| 新沂市| 红河县| 石嘴山市| 西畴县| 武胜县| 乌兰察布市| 桦甸市| 南靖县| 思南县| 大竹县| 靖安县| 沿河| 佛冈县| 乌恰县| 宁安市| 德江县| 全南县| 彭阳县| 枣阳市| 股票| 青冈县| 舞钢市| 兴安盟| 康乐县| 休宁县| 瑞昌市| 田林县| 奈曼旗| 德庆县| 同仁县| 陆良县|