您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關如何進行ZooKeeper中的客戶端創建連接過程分析,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
一個最簡單的demo如下:
public class ZookeeperConstructorSimple implements Watcher{ private static CountDownLatch connectedSemaphone=new CountDownLatch(1); public static void main(String[] args) throws IOException { ZooKeeper zooKeeper=new ZooKeeper("127.0.0.1:2181",5000,new ZookeeperConstructorSimple()); System.out.println(zooKeeper.getState()); try { connectedSemaphone.await(); } catch (Exception e) {} System.out.println("ZooKeeper session established"); System.out.println("sessionId="+zooKeeper.getSessionId()); System.out.println("password="+zooKeeper.getSessionPasswd()); } @Override public void process(WatchedEvent event) { System.out.println("my ZookeeperConstructorSimple watcher Receive watched event:"+event); if(KeeperState.SyncConnected==event.getState()){ connectedSemaphone.countDown(); } } }
使用的maven依賴如下:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
對于目前來說,ZooKeeper的服務器端代碼和客戶端代碼還是混在一起的,估計日后能改吧。
使用的ZooKeeper的構造函數有三個參數構成
ZooKeeper集群的服務器地址列表
該地址是可以填寫多個的,以逗號分隔。如"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183",那客戶端連接的時候到底是使用哪一個呢?先隨機打亂,然后輪詢著用,后面再詳細介紹。
sessionTimeout
最終會引出三個時間設置:和服務器端協商后的sessionTimeout、readTimeout、connectTimeout
服務器端使用協商后的sessionTimeout:即超過該時間后,客戶端沒有向服務器端發送任何請求(正常情況下客戶端會每隔一段時間發送心跳請求,此時服務器端會從新計算客戶端的超時時間點的),則服務器端認為session超時,清理數據。此時客戶端的ZooKeeper對象就不再起作用了,需要再重新new一個新的對象了。
客戶端使用connectTimeout、readTimeout分別用于檢測連接超時和讀取超時,一旦超時,則該客戶端認為該服務器不穩定,就會從新連接下一個服務器地址。
Watcher
作為ZooKeeper對象一個默認的Watcher,用于接收一些事件通知。如和服務器連接成功的通知、斷開連接的通知、Session過期的通知等。
同時我們可以看到,一旦和ZooKeeper服務器連接建立成功,就會獲取服務器端分配的sessionId和password,如下:
sessionId=94249128002584594 password=[B@4de3aaf6
下面就通過源碼來詳細說明這個建立連接的過程。
首先與ZooKeeper服務器建立連接,有兩層連接要建立。
客戶端與服務器端的TCP連接
在TCP連接的基礎上建立session關聯
建立TCP連接之后,客戶端發送ConnectRequest請求,申請建立session關聯,此時服務器端會為該客戶端分配sessionId和密碼,同時開啟對該session是否超時的檢測。
當在sessionTimeout時間內,即還未超時,此時TCP連接斷開,服務器端仍然認為該sessionId處于存活狀態。此時,客戶端會選擇下一個ZooKeeper服務器地址進行TCP連接建立,TCP連接建立完成后,拿著之前的sessionId和密碼發送ConnectRequest請求,如果還未到該sessionId的超時時間,則表示自動重連成功,對客戶端用戶是透明的,一切都在背后默默執行,ZooKeeper對象是有效的。
如果重新建立TCP連接后,已經達到該sessionId的超時時間了(服務器端就會清理與該sessionId相關的數據),則返回給客戶端的sessionTimeout時間為0,sessionid為0,密碼為空字節數組。客戶端接收到該數據后,會判斷協商后的sessionTimeout時間是否小于等于0,如果小于等于0,則使用eventThread線程先發出一個KeeperState.Expired事件,通知相應的Watcher,然后結束EventThread線程的循環,開始走向結束。此時ZooKeeper對象就是無效的了,必須要重新new一個新的ZooKeeper對象,分配新的sessionId了。
它是面向用戶的,提供一些操作API。
它又兩個重要的屬性:
ClientCnxn cnxn:負責所有的ZooKeeper節點操作的執行
ZKWatchManager watchManager:負責維護某個path上注冊的Watcher
如創建某個node操作(同步方式):
ZooKeeper對象負責創建出Request,并交給ClientCnxn來執行,ZooKeeper對象再對返回結果進行處理。
同步方式提交一個請求后,開始循環判斷該請求包的狀態是否結束,即處于阻塞狀態,一旦結束則繼續往下走下去,返回結果。異步方式則提交一個請求后,直接返回,對結果的處理邏輯包含在回調函數中。一旦該對該請求包響應完畢,則取出回調函數執行相應的回調方法。
ZooKeeper對象主要封裝用戶的請求以及處理響應等操作。用戶請求的執行全部交給ClientCnxn來執行,那我們就詳細看下ClientCnxn的來源及大體內容。
先看看ClientCnxn是怎么來的:
第一步:為ZKWatchManager watchManager設置一個默認的Watcher
第二步:將連接字符串信息交給ConnectStringParser進行解析
連接字符串比如: "192.168.12.1:2181,192.168.12.2:2181,192.168.12.3:2181/root"
得到兩個數據String chrootPath默認的跟路徑和ArrayList<InetSocketAddress> serverAddresses即多個host和port信息。
第三步:根據上述解析的host和port列表結果,創建一個HostProvider
有了ConnectStringParser的解析結果,為什么還需要一個HostProvider再來包裝下呢?主要是為將來留下擴展的余地
來看下HostProvider的詳細接口介紹:
HostProvider主要負責不斷的對外提供可用的ZooKeeper服務器地址,這些服務器地址可以是從一個url中加載得來或者其他途徑得來。同時對于不同的ZooKeeper客戶端,給出就近的ZooKeeper服務器地址等。
有三個屬性,一個就是服務器地址列表(經過如下方式隨機打亂了):
Collections.shuffle(this.serverAddresses)
另外兩個屬性用于標記,下面來具體看下,StaticHostProvider是如何實現不斷的對外提供ZooKeeper服務器地址的:
代碼也很簡單,就是在打亂的服務器地址列表中,不斷地遍歷,到頭之后,在從0開始。
上面的spinDelay是個什么情況呢?
正常情況下,currentIndex先加1,然后返回currentIndex+1的地址,當該地址連接成功后會執行onConnected方法,即lastIndex = currentIndex了。然而當返回的currentIndex+1的地址連接不成功,繼續嘗試下一個,仍不成功,仍繼續下一個,就會遇到currentIndex=lastIndex的情況,此時即輪詢了一遍,仍然沒有一個地址能夠連接上,此時的策略就是先暫停休息休息,然后再繼續。
第四步:為創建ClientCnxn準備參數并創建ClientCnxn。
首先是通過getClientCnxnSocket()獲取一個ClientCnxnSocket。來看下ClientCnxnSocket是主要做什么工作的:
A ClientCnxnSocket does the lower level communication with a socket implementation. This code has been moved out of ClientCnxn so that a Netty implementation can be provided as an alternative to the NIO socket code.
專門用于負責socket通信的,把一些公共部分抽象出來,其他的留給不同的實現者來實現。如可以選擇默認的ClientCnxnSocketNIO,也可以使用netty等。
首先獲取系統參數"zookeeper.clientCnxnSocket",如果沒有的話,使用默認的ClientCnxnSocketNIO,所以我們可以通過指定該參數來替換默認的實現。
參數準備好了,ClientCnxn是如何來創建的呢?
首先就是保存一些對象參數,此時的sessionId和sessionPasswd都還沒有。然后就是兩個timeout參數:connectTimeout和readTimeout。在ClientCnxn的發送和接收數據的線程中,會不斷的檢測連接超時和讀取超時,一旦出現超時,就認為服務不穩定,需要更換服務器,就會從HostProvider中獲取下一個服務器地址進行連接。
最后就是兩個線程,一個事件線程即EventThread,一個發送和接收socket數據的線程即SendThread。
事件線程EventThread呢就是從一個事件隊列中不斷取出事件并進行處理
看下具體的處理過程,主要分成兩種情況,一種就是我們注冊的watch事件,另一種就是處理異步回調函數:
可以看到這里就是觸發我們注冊Watch的,還有觸發上文提到的異步回調的情況的。
明白了EventThread是如何來處理事件的,需要知道這些事件是如何來的:
對外提供了三個方法來添加不同類型的事件,如SendThread線程就會調用這三個方法來添加事件。其中對于事件通知,會首先根據ZKWatchManager watchManager來獲取關心該事件的所有Watcher,然后觸發他們。
再來看看SendThread的工作內容:
sendThread = new SendThread(clientCnxnSocket); 把傳遞給ClientCnxn的clientCnxnSocket,再傳遞給SendThread,讓它服務于SendThread。
在SendThread的run方法中,有一個while循環,不斷的做著以下幾件事:
任務1:不斷檢測clientCnxnSocket是否和服務器處于連接狀態,如果是未連接狀態,則從hostProvider中取出一個服務器地址,使用clientCnxnSocket進行連接。
和服務器建立連接成功后,開始發送ConnectRequest請求,把該請求放到outgoingQueue請求隊列中,等待被發送給服務器
任務2:檢測是否超時:當處于連接狀態時,檢測是否讀超時,當處于未連接狀態時,檢測是否連接超時
一旦超時,則拋出SessionTimeoutException,然后看下是如何處理呢?
可以看到一旦發生超時異常或者其他異常,都會進行清理,并設置連接狀態為未連接,然后發送Disconnected事件。至此又會進入任務1的流程
任務3:不斷的發送ping通知,服務器端每接收到ping請求,就會從當前時間重新計算session過期時間,所以當客戶端按照一定時間間隔不斷的發送ping請求,就能保證客戶端的session不會過期。發送時間間隔如下:
clientCnxnSocket.getIdleSend():是最后一次發送數據包的時間與當前時間的間隔。當readTimeout的時間已經過去一半多了,都沒有發送數據包的話,則執行一次Ping發送。或者過去MAX_SEND_PING_INTERVAL(10s)都還沒有發送數據包的話,則執行一次Ping發送。
ping發送的內容只有請求頭OpCode.ping的標示,其他都為空。發送ping請求,也是把該請求放到outgoingQueue發送隊列中,等待被執行。
任務4:執行IO操作,即發送請求隊列中的請求和讀取服務器端的響應數據。
首先從outgoingQueue請求隊列中取出第一個請求,然后進行序列化,然后使用socket進行發送。
讀取服務器端數據
分為兩種:一種是讀取針對ConnectRequest請求的響應,另一種就是其他響應,先暫時不說。
先來看看針對ConnectRequest請求的響應:
首先進行反序列化,得到ConnectResponse對象,我們就可以獲取到服務器端給我們客戶端分配的sessionId和passwd,以及協商后的sessionTimeOut時間。
首選要根據協商后的sessionTimeout時間,重新計算readTimeout和connectTimeout值。然后保留和記錄sessionId和passwd。最后通過EventThread發送一個SyncConnected連接成功事件。至此,TCP連接和session初始化請求都完成了,客戶端的ZooKeeper對象可以正常使用了。
至此,我們便了解客戶端與服務器端建立連接的過程。
服務器端情況分很多種,先暫時說最簡單的單機版。同時也不再給出服務器端的啟動過程(后面的文章再來詳細說明)。
首先介紹下服務器端的大體概況:
首先是服務器端的配置文件,有tickTime、minSessionTimeout、maxSessionTimeout相關屬性。默認情況下,tickTime是3000ms,minSessionTimeout是2倍的tickTime,maxSessionTimeout是20倍的tickTime。
服務器端默認采用NIOServerCnxnFactory來負責socket的處理。每來一個客戶端socket請求,為該客戶端創建一個NIOServerCnxn。之后與該客戶端的交互,就交給了NIOServerCnxn來處理。對于客戶端的ConnectRequest請求,處理如下:
首先反序列化出ConnectRequest
然后開始協商sessionTimeout時間
即判斷用戶傳遞過來的sessionTimeout時間是否在minSessionTimeout、maxSessionTimeout之間。協商完成之后,根據用戶傳遞過來的sessionId是否是0進行不同的處理。客戶端第一次請求,sessionId為0。當客戶端已經連接過一個服務器地址,分配了sessionId,然后如果發生超時等異常,客戶端會去拿著已經分配的sessionId去連接下一個服務器地址,此時的sessionId不為0。 sessionId為0,則代表著要創建session。sessionId不為0,則需要對該sessionId進行合法性檢查,以及是否已經過期了的檢查。 我們先來看看sessionId為0的情況: ![創建session](https://static.oschina.net/uploads/img/201508/01065436_4nHs.png "創建session") 大體上分三大步:1、使用sessionTracker根據sessionTimeout時間創建一個新的session 2、根據sessionId創建出密碼 3、提交這個創建session的請求到請求處理器鏈,最終將sessionId和密碼返回給客戶端
下面來分別詳細的說明這三個過程:
SessionTracker是用來創建刪除session,執行session的過期檢查的。
直接看下默認使用的SessionTrackerImpl:
先看下session有哪些屬性:
final long sessionId:session的唯一標示
final int timeout:這個session的timeout時間(即上文中客戶端和服務器端商定下來的timeout時間)
long tickTime:這個session的下一次超時時間點(隨著客戶端不斷的發送PING請求,就會不斷的刷新該時間,不斷的往后變化)
boolean isClosing:session的標示符,用于標示session是否還被正常使用
Object owner:創建該session的owner。會在客戶端更換所連接的服務器的時候用到(之后詳細說明)
然后再來看看SessionTracker的幾個數據:
HashMap<Long, SessionImpl> sessionsById:很簡單,以sessionId存儲session
ConcurrentHashMap<Long, Integer> sessionsWithTimeout:以sessionId存儲每個session的timeout時間
HashMap<Long, SessionSet> sessionSets:某個時間點上的session集合(用于session過期檢查)
long nextSessionId:初始的sessionId,之后創建的sessionId就在此基礎上自增
nextExpirationTime:下一次過期時間點,每當到該時間點就會進行一次session的過期檢查
expirationInterval:session過期檢查的周期
要搞清楚的內容有:1 創建session的過程 2 session過期檢查的過程
先來看看創建session的過程:代碼很簡單,就是創建一個SessionImpl對象,然后存儲到SessionTracker中,同時開始計算session的超時時間。這里有一個內容就是sessionId的來歷,我們可以看到就是根據nextSessionId來的,并且是不斷自增的。
sessionId是一個客戶端的重要標示,是全局唯一的,先來看看單機版的nextSessionId初始化:
單機版的服務器使用1通過計算來初始化nextSessionId。而集群版對應的id則分別是每個機器指定的sid。
第一步:就是取當前時間,為 10100111011100110110010101110100111100011 為41為二進制
第二步:long有64位,左移24位,其實是除掉了前面的1,后面補了24位的0。
第三步:第二步的結果可能是正數也可能是負數,目前是正數,之后可能就是負數了,你可以算一下需要多少年,哈哈。為了保證右移的時候,進行補0操作,需要使用無符號右移,即>>>。這里使用了無符號右移8位
第四步:將傳過來的id這里即1左移56位。然后再與第三步的正數結果進行或操作,得到最終的基準nextSessionId,所以當這里的id值不是很大的話,一般幾臺機器而已,也保證了sessionId是一個正數,同時前八位就是機器的sid號。所以每臺機器的的前八位是不同的,保證了每臺機器中不會配置相同的sessionId,每臺機器的sessionId又是自增操作,所以單臺機器內sessionId也是不會重復的。
綜上所示保證了sessionId是唯一的,不會出現重復分配的情況。
搞清楚了sessionId的分配,接下來就要弄清楚如何進行session的過期檢查問題:
我們先看下,session激活過程是怎么處理的:
首先獲取這個session數據,然后計算它的超期時間
long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
private long roundToInterval(long time) {
// We give a one interval grace period return (time / expirationInterval + 1) * expirationInterval;
}
即是拿當前時間加上這個session的timeout時間,然后對其進行取expirationInterval的整,即始終保持是expirationInterval的正數倍,即每個session的過期時間點最終都會落在expirationInterval的整數倍上。
如果原本該session的超期時間就大于你所計算出的超期時間,則不做任何處理,否則設置該session的超期時間為上述計算結果的超期時間。
取出原本該session所在的超期時間,從集合里面刪除
重新獲取現在超期時間所在的集合,添加進去
綜上所述,session的激活其實就是重新計算下超時時間,最終取expirationInterval的正數倍,然后從之前時間點的集合中移除,然后再添加到新的時間點的集合中去。
至此,session的檢查就方便多了,只需要在expirationInterval整數時間點上取出集合,然后一個個標記為過期即可。而那些不斷被激活的session,則不斷的從一個時間點的集合中換到下一個時間點的集合中。
SessionTrackerImpl也是一個線程,該線程執行內容就是session的過期檢查。
回到創建session的三大步驟:
來看下密碼是如何來產生的:
Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd);
其中superSecret為常量
static final private long superSecret = 0XB3415C00L;
使用Random的方式來隨機生成字節數組。但是該字節數組,只要參數即sessionId相同,字節數組的內容就相同。即當我們知道了sessionId,就可以利用上述方式算出對應的密碼,我感覺密碼基本上沒什么用。
再看下當客戶端帶著sessionId和密碼進行連接的時候,這時會進行密碼的檢查
看了上面的代碼,就再次驗證了密碼沒什么鳥用,知道了sessionId,就完全知道了密碼。所以這一塊有待改進吧,應該不能由sessionId完全決定吧,如再加上當前時間等等,讓客戶端造不出來密碼,同時服務器端存儲加密后的密碼。
本文內容已太多,這里就先簡單描述下,之后再詳細的講解
如果是成功創建session,則把sessionTimeout、sessionId、passwd傳遞給客戶端。如果沒有成功創建,上述三者的值分別是0,0,new byte[16]
之后客戶端處理該響應的過程,上面已經說了,可以回頭再看下。
上述就是小編為大家分享的如何進行ZooKeeper中的客戶端創建連接過程分析了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。