中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何對FastLeaderElection進行源碼解析

發布時間:2021-09-10 16:57:56 來源:億速云 閱讀:137 作者:柒染 欄目:編程語言

這篇文章將為大家詳細講解有關如何對FastLeaderElection進行源碼解析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

zookeeper作為常用的分布式協調器、注冊中心,在初始化啟動、leader宕機、或者與leader心跳超時等情況下,為保證集群可用性會進行重新選主。選主的默認算法是FastLeaderElection,本文會對FastLeaderElection進行解析。

選主入口

 在QuorumPeer.run()方法中,進行主循環while(running)。如果當前狀態為LOOKING,則進入重新選主的方法lookForLeader。

//主循環
while (running){
    ...

    switch (getPeerState()) {
        case LOOKING:
            ...
            //調用makeLEStrategy獲取選主策略,默認為FastLeaderElection
            //進入lookForLeader方法,開啟重新選主。
            //把lookForLeader返回的結果調用setCurrentVote緩存起來。
            setCurrentVote(makeLEStrategy().lookForLeader());
            ...
            break;
        case OBSERVING:
            ...
            break;
        case FOLLOWING:
            ...
            break;
        case LEADING:
            break;
    }
}

 選主流程

進入FastLeaderElection#lookForLeader后,代碼大致邏輯如下

1.leader選舉周期版本號+1,發起第一次投票,默認選自己。

2.開啟循環,接收其他zk節點發來的投票通知

    2.1接收不到其他節點的投票時,指數級延長接收等待時間并重新執行步驟2的循環

    2.2接收到其他節點投票后,判斷投票通知來源節點的狀態

        2.2.1投票來源節點為LOOKING狀態,比較投票通知和本地投票信息緩存。比較投票周期版本號、決議zxid、zk節點的序號,判斷哪個更有效。

            2.2.1.2比較后重新計算本機當前的投票信息,包括推選的leader id、推選leader的決議zxid、leader選舉周期版本號,并發送給集群其他節點

            2.2.1.3判斷本機最新的投票信息,在本機接收到的所有投票通知緩存中的數量,判斷是否超過集群總數的一半

                2.2.1.3.1未超過集群總數的一半,重新執行步驟2的循環

                2.2.1.3.2超過集群總數的一半,嘗試循環拉取投票隊列中剩余的其他票。

                    2.2.1.3.2.1如果拉取到了其他投票,過濾掉無效的票,重新執行步驟2的循環。

                    2.2.1.3.2.2如果沒有拉取到其他選票,設置當前節點的狀態,LEADING、 FOLLOWING或者OBSERVING,清除緩存,返回最終leader的結果。

        2.2.2投票來源節點為OBSERVING,此狀態的節點不能投票,丟棄此票,繼續執行步驟2的循環。

        2.2.3投票來源節點為FOLLOWING,說明在集群內部已經選出主節點了,此時放棄自己的投票,查詢outofelection中的其他節點的投票緩存,直接確定leader。設置當前節點狀態為FOLLOWING或者OBSERVING,返回選票結果。

        2.2.4投票來源節點為LEADING,說明在集群內部已經選出主節點了,此時放棄自己的投票,查詢outofelection中的其他節點的投票緩存,直接確定leader。設置當前節點狀態為FOLLOWING或者OBSERVING,返回選票結果。

    /**
     * 開始新一輪leader選舉。每當我們的服務節點狀態變更為LOOKING時,調用此方法,然后向所有其他對等方發送通知
     */
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }

        self.start_fle = Time.currentElapsedTime();
        try {
            /**
             * 當前領導人選舉的選票存儲在recvset這個map中。換句話說,選票Vote在recvset中
             * 必須滿足條件v.electionEpoch == logicalclock(判定這個選票,是否屬于這次選舉周期)。
             * 當前選舉參與節點使用recvset來推斷是否大多選舉參與節點投了贊成票。
             */
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();


            /**
             * 歷次leader選舉的票數以及本次leader選舉的票數均會存儲在outofelection中。
             * 注意,處于LOOKING狀態的通知不會存儲在outofelection中。
             *
             * 如果當前節點參與選舉時,集群其他節點已經選舉出了leader,則根據這個map中的數據直接跟隨集群leader
             */
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                //選舉的邏輯周期+1,表示發起新一屆選舉
                logicalclock.incrementAndGet();
                //更新投票信息,第一個參數表示選誰(默認選自己),第二個參數表示當前本地日志中最新的決議zxid,第三個參數表示新的leader周期的epoch編號
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
            //發送投票信息,廣播給其他節點
            sendNotifications();

            SyncedLearnerTracker voteSet = null;

            /*
             * 在循環中,我們和其他zk節點交換通知,直到找到一個leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {

                //從接收隊列中,拉取一個其他節點的投票通知,拉取超時時間為200ms
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);


                //200ms內沒有拉取到新的投票通知,則發送更多投票通知。
                //拉取到了則否則處理新投票通知。
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * 指數級延長等待時間
                     */
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval);

                    if (self.getQuorumVerifier() instanceof QuorumOracleMaj && voteSet != null && voteSet.hasAllQuorums() && notTimeout != minNotificationInterval) {
                        setPeerState(proposedLeader, voteSet);
                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }

                    LOG.info("Notification time out: {} ms", notTimeout);

                } else if (validVoter(n.sid) && validVoter(n.leader)) {

                    //判斷這個投票的來源節點的狀態
                    switch (n.state) {
                        //處于選舉中的的狀態
                    case LOOKING:

                        //當前日志中最新的zxid為-1,屬于異常情況,直接退出
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }

                        //投票通知中的zxid為-1,也屬于異常情況,直接退出
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        //判斷投票通知中的leader選舉周期版本號,是否大于本地選舉周期中的版本號
                        if (n.electionEpoch > logicalclock.get()) {
                            //本地leader選舉周期版本號不是最新的,更新成更新的leader選舉周期版本號
                            logicalclock.set(n.electionEpoch);
                            //清除空舊的選票map
                            recvset.clear();
                            //調用核心方法totalOrderPredicate
                            // 投票通知中的信息和本地初始化信息對比,判斷投票是否有效
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                //把接收到的投票通知中的信息,更新到本地投票信息中
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                //還是和之前一樣,使用本地的投票信息
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            //廣播發送新的投票通知
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            //接收的投票通知中的leader選舉周期版本號比本地的舊,則丟棄這個投票通知
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                    Long.toHexString(n.electionEpoch),
                                    Long.toHexString(logicalclock.get()));
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            //還是調用totalOrderPredicate方法
                            // 用投票通知中的信息,和本地當前緩存中最新的投票信息進行比對,如果投票中的比較新,則進入這里
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        LOG.debug(
                            "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                            n.sid,
                            n.leader,
                            Long.toHexString(n.zxid),
                            Long.toHexString(n.electionEpoch));

                        //在選票map中存儲選票
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        //proposedLeader、proposedZxid、proposedEpoch三個參數,表示當前zk服務根據已經接收到的投票,確定出的本機投選的leader信息
                        //調用getVoteTracker方法,對本機投選的leader的選票進行統計
                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                        //本機投選的leader是否獲取了半數以上的票
                        if (voteSet.hasAllQuorums()) {

                            // 拉取投票接收隊列中的剩余選票
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                //判斷選票是否更有效
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    //如果有更有效的選票,則放回隊列
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /**
                             * 沒有拉取到新的投票通知了,那么表示所有服務都不改票了
                             */
                            if (n == null) {
                                //根據投票結果,設置當前節點的狀態,LEADING、 FOLLOWING或者OBSERVING
                                setPeerState(proposedLeader, voteSet);
                                //創建最終選舉結果的選票對象
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                //清空接收隊列
                                leaveInstance(endVote);
                                //返回最終選舉結果
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;

                       
                    case FOLLOWING:
                        //收到的投票通知中,投票來源的機器是FOLLOWING,說明在集群內部已經選出主節點了
                        //此時放棄自己的投票,查詢outofelection中的投票,直接確定leader
                        Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                        if (resultFN == null) {
                            break;
                        } else {
                            return resultFN;
                        }
                    case LEADING:
                        //收到的投票通知中,投票來源的機器是LEADING,說明在集群內部已經選出主節點了
                        //此時放棄自己的投票,查詢outofelection中的投票,直接確定leader
                        Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                        if (resultLN == null) {
                            break;
                        } else {
                            return resultLN;
                        }
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }

核心投票有效性抉擇邏輯

 選LEADER主流程中,多次調用FastLeaderElection#totalOrderPredicate進行了投票有效性的比較,該邏輯是選主有效性抉擇的核心邏輯。優先比較leader選舉周期版本號epoch,相等時比較決議zxid,都相等時直接使用leader機器id大的。

 /**
     * 檢查接收到的投票通知,是否為有效投票
     * @param newId 新投票中的leader節點id
     * @param newZxid 新投票中的決議id
     * @param newEpoch 新投票中的leader選舉周期版本號
     * @param curId 當前本機投票中的leader節點id
     * @param curZxid 當前本機投票中的決議id
     * @param curEpoch 當前本機投票中的leader選舉周期版本號
     * @return 新投票更有效時返回true,否則false
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug(
            "id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
            newId,
            curId,
            Long.toHexString(newZxid),
            Long.toHexString(curZxid));

        if (self.getQuorumVerifier().getWeight(newId) == 0) {
            return false;
        }

        /*
         *如果以下三種情況之一成立,則返回true:
         *1-投票中的leader選舉周期版本號大于本地的
         *2-leader選舉版本號相同,但投票中的決議號zxid更高
         *3-leader選舉版本號和決議號zxid都相同,則比較zk機器本身的機器編號,判斷投票中的機器編號是否大于當前機器編號
         */
        return ((newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid)
                            && (newId > curId)))));
    }

關于如何對FastLeaderElection進行源碼解析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

东光县| 定边县| 罗江县| 鲁山县| 巧家县| 临夏县| 汤原县| 社旗县| 武功县| 玛曲县| 文化| 石首市| 巨鹿县| 昌宁县| 磴口县| 龙门县| 宝坻区| 聊城市| 大关县| 沙雅县| 芜湖县| 灵璧县| 宜昌市| 柳河县| 永泰县| 东辽县| 福建省| 土默特左旗| 宿州市| 富阳市| 浙江省| 调兵山市| 米林县| 防城港市| 舟山市| 根河市| 台江县| 石家庄市| 儋州市| 资讯| 玛多县|