中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

zookeeper(14)源碼分析-服務器(1)

發布時間:2020-07-22 00:07:07 來源:網絡 閱讀:281 作者:shayang88 欄目:編程語言

ZooKeeperServer,為所有服務器的父類。
QuorumZooKeeperServer,其是所有參與選舉的服務器的父類,是抽象類,其繼承了ZooKeeperServer類。
LeaderZooKeeperServer,Leader服務器,繼承了QuorumZooKeeperServer類,也會繼承ZooKeeperServer中的很多方法。
LearnerZooKeeper,其是Learner服務器的父類,為抽象類,也繼承了QuorumZooKeeperServer類。
FollowerZooKeeperServer,Follower服務器,繼承了LearnerZooKeeper。
ObserverZooKeeperServer,Observer服務器,繼承了LearnerZooKeeper。
ReadOnlyZooKeeperServer,只讀服務器,不提供寫服務,繼承QuorumZooKeeperServer。

ZooKeeperServer

1、類的繼承關系

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {}

ZooKeeperServer是ZooKeeper中所有服務器的父類,其實現了Session.Expirer和ServerStats.Provider接口,SessionExpirer中定義了expire方法(表示會話過期)和getServerId方法(表示獲取服務器ID),而Provider則主要定義了獲取服務器某些數據的方法。

2、類屬性

protected static final Logger LOG;

    static {
        LOG = LoggerFactory.getLogger(ZooKeeperServer.class);

        Environment.logEnv("Server environment:", LOG);
    }
    //jmx服務
    protected ZooKeeperServerBean jmxServerBean;
    protected DataTreeBean jmxDataTreeBean;
    // 默認心跳頻率
    public static final int DEFAULT_TICK_TIME = 3000;
    protected int tickTime = DEFAULT_TICK_TIME;
    // 最小會話過期時間
    /** value of -1 indicates unset, use default */
    protected int minSessionTimeout = -1;
    // 最大會話過期時間
    /** value of -1 indicates unset, use default */
    protected int maxSessionTimeout = -1;
    protected SessionTracker sessionTracker;
    // 事務日志快照
    private FileTxnSnapLog txnLogFactory = null;
    // Zookeeper內存數據庫
    private ZKDatabase zkDb;
    private final AtomicLong hzxid = new AtomicLong(0);
    public final static Exception ok = new Exception("No prob");
    // 請求處理器
    protected RequestProcessor firstProcessor;
    protected volatile State state = State.INITIAL;

    protected enum State {
        INITIAL, RUNNING, SHUTDOWN, ERROR
    }

    /**
     * This is the secret that we use to generate passwords. For the moment,
     * it's more of a checksum that's used in reconnection, which carries no
     * security weight, and is treated internally as if it carries no
     * security weight.
     */
    static final private long superSecret = 0XB3415C00L;

    private final AtomicInteger requestsInProcess = new AtomicInteger(0);
    // 未處理的ChangeRecord
    final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
    // this data structure must be accessed under the outstandingChanges lock
    // 記錄path對應的ChangeRecord
    final HashMap<String, ChangeRecord> outstandingChangesForPath =
        new HashMap<String, ChangeRecord>();

    protected ServerCnxnFactory serverCnxnFactory;
    protected ServerCnxnFactory secureServerCnxnFactory;
    // 服務器統計數據
    private final ServerStats serverStats;
    private final ZooKeeperServerListener listener;
    private ZooKeeperServerShutdownHandler zkShutdownHandler;
    private volatile int createSessionTrackerServerId = 1;

3、核心函數

3.1 loadData

該函數用于加載數據,其首先會判斷內存庫是否已經加載設置zxid,之后會調用killSession函數刪除過期的會話

if(zkDb.isInitialized()){ // 內存數據庫已被初始化
            // 設置為最后處理的Zxid
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        }
        else {
            // 未被初始化,則加載數據庫
            setZxid(zkDb.loadDataBase());
        }

        // Clean up dead sessions
        LinkedList<Long> deadSessions = new LinkedList<Long>();
        for (Long session : zkDb.getSessions()) {// 遍歷所有的會話
            if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                deadSessions.add(session);
            }
        }

        for (long session : deadSessions) { // 刪除過期的會話
            // XXX: Is lastProcessedZxid really the best thing to use?
            killSession(session, zkDb.getDataTreeLastProcessedZxid());
        }

        // Make a clean snapshot
        //初始化一個快照
        takeSnapshot();

3.2、submitRequest

提交請求,處理器進行處理

public void submitRequest(Request si) {
        if (firstProcessor == null) {// 第一個處理器為空
            synchronized (this) {
                try {
                    // Since all requests are passed to the request
                    // processor it should wait for setting up the request
                    // processor chain. The state will be updated to RUNNING
                    // after the setup.
                    //服務器調用鏈還未初始化完成
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            // 是否為合法的請求
            boolean validpacket = Request.isValid(si.type);
            if (validpacket) {
                //調用鏈第一處理器開始處理
                firstProcessor.proce***equest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().proce***equest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

LeaderZooKeeperServer介紹

1、類屬性

// 提交請求處理器
CommitProcessor commitProcessor;
//處理鏈請求第一個處理處理器
PrepRequestProcessor prepRequestProcessor;

2、構造方法

LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
        super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self);
    }

直接調用父類QuorumZooKeeperServer的構造函數,然后再調用ZooKeeperServer的構造函數,逐級構造。

3、核心函數

3.1、setupRequestProcessors

 @Override
    protected void setupRequestProcessors() {
        //創建FinalRequestProcessor,處理鏈最后一個處理器
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        //創建ToBeAppliedRequestProcessor
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        // 創建CommitProcessor,提交處理器
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        // 啟動CommitProcessor
        commitProcessor.start();
        // 創建ProposalRequestProcessor
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        // 初始化ProposalProcessor
        proposalProcessor.initialize();
        //創建PrepRequestProcessor,作為以第一個處理鏈處理器
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        // firstProcessor為PrepRequestProcessor
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

        setupContainerManager();
    }

setupRequestProcessors函數表示創建處理鏈,可以看到其處理鏈的順序為PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

荥经县| 石狮市| 嵊泗县| 万安县| 东宁县| 丰原市| 佛山市| 鄂州市| 昌吉市| 无为县| 木里| 新密市| 龙岩市| 深泽县| 即墨市| 定南县| 易门县| 宜都市| 前郭尔| 扎兰屯市| 玛多县| 乌兰察布市| 沙河市| 罗平县| 高密市| 大足县| 云林县| 靖边县| 永丰县| 得荣县| 屏东县| 嘉定区| 定西市| 秦皇岛市| 肇源县| 拉孜县| 阆中市| 普兰县| 全南县| 柳州市| 达尔|