中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

zk中learner的作用是什么

發布時間:2021-06-28 17:39:32 來源:億速云 閱讀:203 作者:chen 欄目:大數據

這篇文章主要介紹“zk中learner的作用是什么”,在日常操作中,相信很多人在zk中learner的作用是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”zk中learner的作用是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

learner時observer,follower的父類,定義了公共屬性和方法

子類 Follower 和Observer

內部類:

PacketInFlight表示在提議中還沒有commit的消息

static class PacketInFlight { TxnHeader hdr; Record rec; }

屬性:

QuorumPeer

服務器節點

LearnerZooKeeperServer

learner的服務節點

BufferedOutputStream

輸出流

Socket

端口套接字

InetSocketAddress

地址信息

InputArchive

輸入存檔

OutputArchive

輸出存檔

leaderProtocolVersion

leader協議版本

BUFFERED_MESSAGE_SIZE

緩存信息大小

MessageTracker

順序接收和發送信息

方法

validateSession(ServerCnxn cnxn, long clientId, int timeout)

驗證session有效性

writePacket(QuorumPacket pp, boolean flush)

發送包給leader

readPacket(QuorumPacket pp)

從leader讀取message

request(Request request)

發送request給leader

findLeader

查找認為是leader的地址信息

createSocket()

創建socket對象

registerWithLeader(int pktType)

執行handshake protocal建立follower/observer連接

到服務器驗證session有效性

void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
    LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeLong(clientId);
    dos.writeInt(timeout);
    dos.close();
    QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null);
    pendingRevalidations.put(clientId, cnxn);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(
            LOG,
            ZooTrace.SESSION_TRACE_MASK,
            "To validate session 0x" + Long.toHexString(clientId));
    }
    writePacket(qp, true);
}

void writePacket(QuorumPacket pp, boolean flush) throws IOException {
    synchronized (leaderOs) {
        if (pp != null) {
            messageTracker.trackSent(pp.getType());
            leaderOs.writeRecord(pp, "packet");
        }
        if (flush) {
            bufferedOutput.flush();
        }
    }
}


void request(Request request) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId);
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) {
        request.request.rewind();
        int len = request.request.remaining();
        byte[] b = new byte[len];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
    writePacket(qp, true);
}

查找當前的leader信息
protected QuorumServer findLeader() {
    QuorumServer leaderServer = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader's correct IP address before
            // attempting to connect.
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
    if (leaderServer == null) {
        LOG.warn("Couldn't find the leader with id = " + current.getId());
    }
    return leaderServer;
}



連接套接字
sockConnect(Socket sock, InetSocketAddress addr, int timeout) 


建立和leader的連接
/**
 * Establish a connection with the LearnerMaster found by findLearnerMaster.
 * Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
 * Retries until either initLimit time has elapsed or 5 tries have happened.
 * @param addr - the address of the Peer to connect to.
 * @throws IOException - if the socket connection fails on the 5th attempt
 * if there is an authentication failure while connecting to leader
 * @throws X509Exception
 * @throws InterruptedException
 */
protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
    this.sock = createSocket();
    this.leaderAddr = addr;

    // leader connection timeout defaults to tickTime * initLimit
    int connectTimeout = self.tickTime * self.initLimit;

    // but if connectToLearnerMasterLimit is specified, use that value to calculate
    // timeout instead of using the initLimit value
    if (self.connectToLearnerMasterLimit > 0) {
        connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
    }

    int remainingTimeout;
    long startNanoTime = nanoTime();

    for (int tries = 0; tries < 5; tries++) {
        try {
            // recalculate the init limit time because retries sleep for 1000 milliseconds
            remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000);
            if (remainingTimeout <= 0) {
                LOG.error("connectToLeader exceeded on retries.");
                throw new IOException("connectToLeader exceeded on retries.");
            }

            sockConnect(sock, addr, Math.min(connectTimeout, remainingTimeout));
            if (self.isSslQuorum()) {
                //開始握手
                ((SSLSocket) sock).startHandshake();
            }
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            //出現異常
            remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000);
            //剩余超時時間
            if (remainingTimeout <= 1000) {
                //打印錯誤日志
                LOG.error("Unexpected exception, connectToLeader exceeded. tries=" + tries
                          + ", remaining init limit=" + remainingTimeout
                          + ", connecting to " + addr, e);
                throw e;

                //嘗試次數大于4
            } else if (tries >= 4) {
                //打印錯誤日志
                LOG.error("Unexpected exception, retries exceeded. tries=" + tries
                          + ", remaining init limit=" + remainingTimeout
                          + ", connecting to " + addr, e);
                throw e;
            } else {
                //發出警告
                LOG.warn("Unexpected exception, tries=" + tries
                         + ", remaining init limit=" + remainingTimeout
                         + ", connecting to " + addr, e);
                //重新嘗試建立socket連接
                this.sock = createSocket();
            }
        }
        //讀取配置延時時間,默認100ns
        Thread.sleep(leaderConnectDelayDuringRetryMs);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}

到此,關于“zk中learner的作用是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

zk
AI

科技| 丰城市| 北海市| 新巴尔虎左旗| 涪陵区| 双桥区| 辽宁省| 马山县| 当阳市| 宁蒗| 兰溪市| 桂林市| 林芝县| 察哈| 普陀区| 五莲县| 宜州市| 新化县| 南川市| 甘孜县| 沾益县| 富源县| 永善县| 卫辉市| 运城市| 定西市| 吉林省| 嘉鱼县| 沙田区| 楚雄市| 长丰县| 铁岭市| 河南省| 保定市| 汤原县| 三穗县| 汾阳市| 江门市| 蓝山县| 会宁县| 勃利县|