中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行ZooKeeper中的客戶端創建連接過程分析

發布時間:2021-12-23 18:04:01 來源:億速云 閱讀:130 作者:柒染 欄目:云計算

這期內容當中小編將會給大家帶來有關如何進行ZooKeeper中的客戶端創建連接過程分析,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

客戶端API簡單使用

0.1 demo案例1

一個最簡單的demo如下:

public class ZookeeperConstructorSimple implements Watcher{

    private static CountDownLatch connectedSemaphone=new CountDownLatch(1);

    public static void main(String[] args) throws IOException {
        ZooKeeper zooKeeper=new ZooKeeper("127.0.0.1:2181",5000,new ZookeeperConstructorSimple());
        System.out.println(zooKeeper.getState());
        try {
            connectedSemaphone.await();
        } catch (Exception e) {}
        System.out.println("ZooKeeper session established");
        System.out.println("sessionId="+zooKeeper.getSessionId());
        System.out.println("password="+zooKeeper.getSessionPasswd());
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("my ZookeeperConstructorSimple watcher Receive watched event:"+event);
        if(KeeperState.SyncConnected==event.getState()){
            connectedSemaphone.countDown();
        }
    }

}

使用的maven依賴如下:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

對于目前來說,ZooKeeper的服務器端代碼和客戶端代碼還是混在一起的,估計日后能改吧。

使用的ZooKeeper的構造函數有三個參數構成

  • ZooKeeper集群的服務器地址列表

    該地址是可以填寫多個的,以逗號分隔。如"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",那客戶端連接的時候到底是使用哪一個呢?先隨機打亂,然后輪詢著用,后面再詳細介紹。

  • sessionTimeout

    最終會引出三個時間設置:和服務器端協商后的sessionTimeout、readTimeout、connectTimeout

    服務器端使用協商后的sessionTimeout:即超過該時間后,客戶端沒有向服務器端發送任何請求(正常情況下客戶端會每隔一段時間發送心跳請求,此時服務器端會從新計算客戶端的超時時間點的),則服務器端認為session超時,清理數據。此時客戶端的ZooKeeper對象就不再起作用了,需要再重新new一個新的對象了。

    客戶端使用connectTimeout、readTimeout分別用于檢測連接超時和讀取超時,一旦超時,則該客戶端認為該服務器不穩定,就會從新連接下一個服務器地址。

  • Watcher

    作為ZooKeeper對象一個默認的Watcher,用于接收一些事件通知。如和服務器連接成功的通知、斷開連接的通知、Session過期的通知等。

同時我們可以看到,一旦和ZooKeeper服務器連接建立成功,就會獲取服務器端分配的sessionId和password,如下:

sessionId=94249128002584594
password=[B@4de3aaf6

下面就通過源碼來詳細說明這個建立連接的過程。

1 客戶端的建立連接的過程

1.1 大體連接過程概述

首先與ZooKeeper服務器建立連接,有兩層連接要建立。

  • 客戶端與服務器端的TCP連接

  • 在TCP連接的基礎上建立session關聯

建立TCP連接之后,客戶端發送ConnectRequest請求,申請建立session關聯,此時服務器端會為該客戶端分配sessionId和密碼,同時開啟對該session是否超時的檢測。

當在sessionTimeout時間內,即還未超時,此時TCP連接斷開,服務器端仍然認為該sessionId處于存活狀態。此時,客戶端會選擇下一個ZooKeeper服務器地址進行TCP連接建立,TCP連接建立完成后,拿著之前的sessionId和密碼發送ConnectRequest請求,如果還未到該sessionId的超時時間,則表示自動重連成功,對客戶端用戶是透明的,一切都在背后默默執行,ZooKeeper對象是有效的。

如果重新建立TCP連接后,已經達到該sessionId的超時時間了(服務器端就會清理與該sessionId相關的數據),則返回給客戶端的sessionTimeout時間為0,sessionid為0,密碼為空字節數組。客戶端接收到該數據后,會判斷協商后的sessionTimeout時間是否小于等于0,如果小于等于0,則使用eventThread線程先發出一個KeeperState.Expired事件,通知相應的Watcher,然后結束EventThread線程的循環,開始走向結束。此時ZooKeeper對象就是無效的了,必須要重新new一個新的ZooKeeper對象,分配新的sessionId了。

1.2 ZooKeeper對象

它是面向用戶的,提供一些操作API。

它又兩個重要的屬性:

  • ClientCnxn cnxn:負責所有的ZooKeeper節點操作的執行

  • ZKWatchManager watchManager:負責維護某個path上注冊的Watcher

如創建某個node操作(同步方式):

ZooKeeper對象負責創建出Request,并交給ClientCnxn來執行,ZooKeeper對象再對返回結果進行處理。

如何進行ZooKeeper中的客戶端創建連接過程分析

同步方式提交一個請求后,開始循環判斷該請求包的狀態是否結束,即處于阻塞狀態,一旦結束則繼續往下走下去,返回結果。異步方式則提交一個請求后,直接返回,對結果的處理邏輯包含在回調函數中。一旦該對該請求包響應完畢,則取出回調函數執行相應的回調方法。

ZooKeeper對象主要封裝用戶的請求以及處理響應等操作。用戶請求的執行全部交給ClientCnxn來執行,那我們就詳細看下ClientCnxn的來源及大體內容。

先看看ClientCnxn是怎么來的:

如何進行ZooKeeper中的客戶端創建連接過程分析

  • 第一步:為ZKWatchManager watchManager設置一個默認的Watcher

  • 第二步:將連接字符串信息交給ConnectStringParser進行解析

    連接字符串比如: "192.168.12.1:2181,192.168.12.2:2181,192.168.12.3:2181/root"

  • 得到兩個數據String chrootPath默認的跟路徑和ArrayList<InetSocketAddress> serverAddresses即多個host和port信息。

  • 第三步:根據上述解析的host和port列表結果,創建一個HostProvider

    有了ConnectStringParser的解析結果,為什么還需要一個HostProvider再來包裝下呢?主要是為將來留下擴展的余地

    來看下HostProvider的詳細接口介紹:

    如何進行ZooKeeper中的客戶端創建連接過程分析

    HostProvider主要負責不斷的對外提供可用的ZooKeeper服務器地址,這些服務器地址可以是從一個url中加載得來或者其他途徑得來。同時對于不同的ZooKeeper客戶端,給出就近的ZooKeeper服務器地址等。

  • 有三個屬性,一個就是服務器地址列表(經過如下方式隨機打亂了):

    Collections.shuffle(this.serverAddresses)

    另外兩個屬性用于標記,下面來具體看下,StaticHostProvider是如何實現不斷的對外提供ZooKeeper服務器地址的:

    如何進行ZooKeeper中的客戶端創建連接過程分析

    代碼也很簡單,就是在打亂的服務器地址列表中,不斷地遍歷,到頭之后,在從0開始。

    上面的spinDelay是個什么情況呢?

    正常情況下,currentIndex先加1,然后返回currentIndex+1的地址,當該地址連接成功后會執行onConnected方法,即lastIndex = currentIndex了。然而當返回的currentIndex+1的地址連接不成功,繼續嘗試下一個,仍不成功,仍繼續下一個,就會遇到currentIndex=lastIndex的情況,此時即輪詢了一遍,仍然沒有一個地址能夠連接上,此時的策略就是先暫停休息休息,然后再繼續。

  • 第四步:為創建ClientCnxn準備參數并創建ClientCnxn。

    首先是通過getClientCnxnSocket()獲取一個ClientCnxnSocket。來看下ClientCnxnSocket是主要做什么工作的:

    A ClientCnxnSocket does the lower level communication with a socket implementation. This code has been moved out of ClientCnxn so that a Netty implementation can be provided as an alternative to the NIO socket code.

    專門用于負責socket通信的,把一些公共部分抽象出來,其他的留給不同的實現者來實現。如可以選擇默認的ClientCnxnSocketNIO,也可以使用netty等。


  • 首先獲取系統參數"zookeeper.clientCnxnSocket",如果沒有的話,使用默認的ClientCnxnSocketNIO,所以我們可以通過指定該參數來替換默認的實現。

    參數準備好了,ClientCnxn是如何來創建的呢?

    如何進行ZooKeeper中的客戶端創建連接過程分析

    首先就是保存一些對象參數,此時的sessionId和sessionPasswd都還沒有。然后就是兩個timeout參數:connectTimeout和readTimeout。在ClientCnxn的發送和接收數據的線程中,會不斷的檢測連接超時和讀取超時,一旦出現超時,就認為服務不穩定,需要更換服務器,就會從HostProvider中獲取下一個服務器地址進行連接。

    最后就是兩個線程,一個事件線程即EventThread,一個發送和接收socket數據的線程即SendThread。

    事件線程EventThread呢就是從一個事件隊列中不斷取出事件并進行處理

  • 看下具體的處理過程,主要分成兩種情況,一種就是我們注冊的watch事件,另一種就是處理異步回調函數:

    如何進行ZooKeeper中的客戶端創建連接過程分析

    可以看到這里就是觸發我們注冊Watch的,還有觸發上文提到的異步回調的情況的。

    明白了EventThread是如何來處理事件的,需要知道這些事件是如何來的:

  • 對外提供了三個方法來添加不同類型的事件,如SendThread線程就會調用這三個方法來添加事件。其中對于事件通知,會首先根據ZKWatchManager watchManager來獲取關心該事件的所有Watcher,然后觸發他們。

    再來看看SendThread的工作內容:

    sendThread = new SendThread(clientCnxnSocket); 把傳遞給ClientCnxn的clientCnxnSocket,再傳遞給SendThread,讓它服務于SendThread。

    在SendThread的run方法中,有一個while循環,不斷的做著以下幾件事:

    • 任務1:不斷檢測clientCnxnSocket是否和服務器處于連接狀態,如果是未連接狀態,則從hostProvider中取出一個服務器地址,使用clientCnxnSocket進行連接。

      如何進行ZooKeeper中的客戶端創建連接過程分析

      和服務器建立連接成功后,開始發送ConnectRequest請求,把該請求放到outgoingQueue請求隊列中,等待被發送給服務器


    • 任務2:檢測是否超時:當處于連接狀態時,檢測是否讀超時,當處于未連接狀態時,檢測是否連接超時

      如何進行ZooKeeper中的客戶端創建連接過程分析

      一旦超時,則拋出SessionTimeoutException,然后看下是如何處理呢?


    • 可以看到一旦發生超時異常或者其他異常,都會進行清理,并設置連接狀態為未連接,然后發送Disconnected事件。至此又會進入任務1的流程

    • 任務3:不斷的發送ping通知,服務器端每接收到ping請求,就會從當前時間重新計算session過期時間,所以當客戶端按照一定時間間隔不斷的發送ping請求,就能保證客戶端的session不會過期。發送時間間隔如下:

      如何進行ZooKeeper中的客戶端創建連接過程分析

      clientCnxnSocket.getIdleSend():是最后一次發送數據包的時間與當前時間的間隔。當readTimeout的時間已經過去一半多了,都沒有發送數據包的話,則執行一次Ping發送。或者過去MAX_SEND_PING_INTERVAL(10s)都還沒有發送數據包的話,則執行一次Ping發送。

    • ping發送的內容只有請求頭OpCode.ping的標示,其他都為空。發送ping請求,也是把該請求放到outgoingQueue發送隊列中,等待被執行。

    • 任務4:執行IO操作,即發送請求隊列中的請求和讀取服務器端的響應數據。

      如何進行ZooKeeper中的客戶端創建連接過程分析

      首先從outgoingQueue請求隊列中取出第一個請求,然后進行序列化,然后使用socket進行發送。

      讀取服務器端數據

    • 分為兩種:一種是讀取針對ConnectRequest請求的響應,另一種就是其他響應,先暫時不說。

      先來看看針對ConnectRequest請求的響應:

      如何進行ZooKeeper中的客戶端創建連接過程分析

      首先進行反序列化,得到ConnectResponse對象,我們就可以獲取到服務器端給我們客戶端分配的sessionId和passwd,以及協商后的sessionTimeOut時間。

    • 首選要根據協商后的sessionTimeout時間,重新計算readTimeout和connectTimeout值。然后保留和記錄sessionId和passwd。最后通過EventThread發送一個SyncConnected連接成功事件。至此,TCP連接和session初始化請求都完成了,客戶端的ZooKeeper對象可以正常使用了。

      至此,我們便了解客戶端與服務器端建立連接的過程。

4 服務器端處理連接的過程

服務器端情況分很多種,先暫時說最簡單的單機版。同時也不再給出服務器端的啟動過程(后面的文章再來詳細說明)。

首先介紹下服務器端的大體概況:

  • 首先是服務器端的配置文件,有tickTime、minSessionTimeout、maxSessionTimeout相關屬性。默認情況下,tickTime是3000ms,minSessionTimeout是2倍的tickTime,maxSessionTimeout是20倍的tickTime。

  • 服務器端默認采用NIOServerCnxnFactory來負責socket的處理。每來一個客戶端socket請求,為該客戶端創建一個NIOServerCnxn。之后與該客戶端的交互,就交給了NIOServerCnxn來處理。對于客戶端的ConnectRequest請求,處理如下:

    首先反序列化出ConnectRequest

    如何進行ZooKeeper中的客戶端創建連接過程分析

    然后開始協商sessionTimeout時間

即判斷用戶傳遞過來的sessionTimeout時間是否在minSessionTimeout、maxSessionTimeout之間。協商完成之后,根據用戶傳遞過來的sessionId是否是0進行不同的處理。客戶端第一次請求,sessionId為0。當客戶端已經連接過一個服務器地址,分配了sessionId,然后如果發生超時等異常,客戶端會去拿著已經分配的sessionId去連接下一個服務器地址,此時的sessionId不為0。

sessionId為0,則代表著要創建session。sessionId不為0,則需要對該sessionId進行合法性檢查,以及是否已經過期了的檢查。

我們先來看看sessionId為0的情況:

![創建session](https://static.oschina.net/uploads/img/201508/01065436_4nHs.png "創建session")

大體上分三大步:1、使用sessionTracker根據sessionTimeout時間創建一個新的session 2、根據sessionId創建出密碼
3、提交這個創建session的請求到請求處理器鏈,最終將sessionId和密碼返回給客戶端

下面來分別詳細的說明這三個過程:

4.1 使用sessionTracker創建session

SessionTracker是用來創建刪除session,執行session的過期檢查的。

直接看下默認使用的SessionTrackerImpl:

如何進行ZooKeeper中的客戶端創建連接過程分析

先看下session有哪些屬性:

  • final long sessionId:session的唯一標示

  • final int timeout:這個session的timeout時間(即上文中客戶端和服務器端商定下來的timeout時間)

  • long tickTime:這個session的下一次超時時間點(隨著客戶端不斷的發送PING請求,就會不斷的刷新該時間,不斷的往后變化)

  • boolean isClosing:session的標示符,用于標示session是否還被正常使用

  • Object owner:創建該session的owner。會在客戶端更換所連接的服務器的時候用到(之后詳細說明)

然后再來看看SessionTracker的幾個數據:

  • HashMap<Long, SessionImpl> sessionsById:很簡單,以sessionId存儲session

  • ConcurrentHashMap<Long, Integer> sessionsWithTimeout:以sessionId存儲每個session的timeout時間

  • HashMap<Long, SessionSet> sessionSets:某個時間點上的session集合(用于session過期檢查)

  • long nextSessionId:初始的sessionId,之后創建的sessionId就在此基礎上自增

  • nextExpirationTime:下一次過期時間點,每當到該時間點就會進行一次session的過期檢查

  • expirationInterval:session過期檢查的周期

要搞清楚的內容有:1 創建session的過程 2 session過期檢查的過程

先來看看創建session的過程:代碼很簡單,就是創建一個SessionImpl對象,然后存儲到SessionTracker中,同時開始計算session的超時時間。這里有一個內容就是sessionId的來歷,我們可以看到就是根據nextSessionId來的,并且是不斷自增的。

sessionId是一個客戶端的重要標示,是全局唯一的,先來看看單機版的nextSessionId初始化:

如何進行ZooKeeper中的客戶端創建連接過程分析

如何進行ZooKeeper中的客戶端創建連接過程分析

單機版的服務器使用1通過計算來初始化nextSessionId。而集群版對應的id則分別是每個機器指定的sid。

  • 第一步:就是取當前時間,為 10100111011100110110010101110100111100011 為41為二進制

  • 第二步:long有64位,左移24位,其實是除掉了前面的1,后面補了24位的0。

  • 第三步:第二步的結果可能是正數也可能是負數,目前是正數,之后可能就是負數了,你可以算一下需要多少年,哈哈。為了保證右移的時候,進行補0操作,需要使用無符號右移,即>>>。這里使用了無符號右移8位

  • 第四步:將傳過來的id這里即1左移56位。然后再與第三步的正數結果進行或操作,得到最終的基準nextSessionId,所以當這里的id值不是很大的話,一般幾臺機器而已,也保證了sessionId是一個正數,同時前八位就是機器的sid號。所以每臺機器的的前八位是不同的,保證了每臺機器中不會配置相同的sessionId,每臺機器的sessionId又是自增操作,所以單臺機器內sessionId也是不會重復的。

綜上所示保證了sessionId是唯一的,不會出現重復分配的情況。

搞清楚了sessionId的分配,接下來就要弄清楚如何進行session的過期檢查問題:

我們先看下,session激活過程是怎么處理的:

如何進行ZooKeeper中的客戶端創建連接過程分析

  • 首先獲取這個session數據,然后計算它的超期時間

    long expireTime = roundToInterval(System.currentTimeMillis() + timeout);

    private long roundToInterval(long time) {

       // We give a one interval grace period
       return (time / expirationInterval + 1) * expirationInterval;


    }

    即是拿當前時間加上這個session的timeout時間,然后對其進行取expirationInterval的整,即始終保持是expirationInterval的正數倍,即每個session的過期時間點最終都會落在expirationInterval的整數倍上。

  • 如果原本該session的超期時間就大于你所計算出的超期時間,則不做任何處理,否則設置該session的超期時間為上述計算結果的超期時間。

  • 取出原本該session所在的超期時間,從集合里面刪除

  • 重新獲取現在超期時間所在的集合,添加進去

綜上所述,session的激活其實就是重新計算下超時時間,最終取expirationInterval的正數倍,然后從之前時間點的集合中移除,然后再添加到新的時間點的集合中去。

至此,session的檢查就方便多了,只需要在expirationInterval整數時間點上取出集合,然后一個個標記為過期即可。而那些不斷被激活的session,則不斷的從一個時間點的集合中換到下一個時間點的集合中。

SessionTrackerImpl也是一個線程,該線程執行內容就是session的過期檢查。

4.2 根據sessionId創建出密碼

回到創建session的三大步驟:

如何進行ZooKeeper中的客戶端創建連接過程分析

來看下密碼是如何來產生的:

Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);

其中superSecret為常量

static final private long superSecret = 0XB3415C00L;

使用Random的方式來隨機生成字節數組。但是該字節數組,只要參數即sessionId相同,字節數組的內容就相同。即當我們知道了sessionId,就可以利用上述方式算出對應的密碼,我感覺密碼基本上沒什么用。

再看下當客戶端帶著sessionId和密碼進行連接的時候,這時會進行密碼的檢查

看了上面的代碼,就再次驗證了密碼沒什么鳥用,知道了sessionId,就完全知道了密碼。所以這一塊有待改進吧,應該不能由sessionId完全決定吧,如再加上當前時間等等,讓客戶端造不出來密碼,同時服務器端存儲加密后的密碼。

4.2 提交這個創建session的請求到請求處理器鏈

本文內容已太多,這里就先簡單描述下,之后再詳細的講解

如何進行ZooKeeper中的客戶端創建連接過程分析

如果是成功創建session,則把sessionTimeout、sessionId、passwd傳遞給客戶端。如果沒有成功創建,上述三者的值分別是0,0,new byte[16]

之后客戶端處理該響應的過程,上面已經說了,可以回頭再看下。

上述就是小編為大家分享的如何進行ZooKeeper中的客戶端創建連接過程分析了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

卢龙县| 德江县| 报价| 太仓市| 平度市| 日土县| 株洲市| 曲阜市| 临邑县| 恭城| 黄梅县| 新竹市| 赤水市| 襄垣县| 弥渡县| 泗洪县| 东平县| 清新县| 石林| 渭源县| 通道| 肥乡县| 新和县| 盐城市| 七台河市| 乾安县| 绥宁县| 长汀县| 托克托县| 武夷山市| 阳城县| 墨玉县| 莱州市| 平和县| 左权县| 亚东县| 建湖县| 平陆县| 盐源县| 乌拉特后旗| 永寿县|