您好,登錄后才能下訂單哦!
zookeeper是一個強一致【不嚴格】的分布式數據庫,由多個節點共同組成一個分布式集群,掛掉任意一個節點,數據庫仍然可以正常工作,客戶端無感知故障切換。客戶端向任意一個節點寫入數據,其它節點可以立即看到最新的數據。
zookeeper的內部是一個key/value存儲引擎,key是以樹狀的形式構成了一個多級的層次結構,每一個節點既可以存儲數據,又可以作為一個目錄存放下一級子節點。
zookeeper提供了創建/修改/刪除節點的api,如果父節點沒有創建,字節點會創建失敗。如果父節點還有子節點,父節點不可以被刪除。
zookeeper和客戶端之間以socket形式進行雙向通訊,客戶端可以主動調用服務器提供的api,服務器可以主動向客戶端推送事件。有多種事件可以watch,比如節點的增刪改,子節點的增刪改,會話狀態變更等。
zookeeper的事件有傳遞機制,字節點的增刪改觸發的事件會向上層依次傳播,所有的父節點都可以收到字節點的數據變更事件,所以層次太深/子節點太多會給服務器的事件系統帶來壓力,節點分配要做好周密的規劃。
zookeeper滿足了CAP定理的分區容忍性P和強一致性C,犧牲了高性能A【可用性蘊含性能】。zookeeper的存儲能力是有限的,當節點層次太深/子節點太多/節點數據太大,都會影響數據庫的穩定性。所以zookeeper不是一個用來做高并發高性能的數據庫,zookeeper一般只用來存儲配置信息。
zookeeper的讀性能隨著節點數量的提升能不斷增加,但是寫性能會隨著節點數量的增加而降低,所以節點的數量不宜太多,一般配置成3個或者5個就可以了。
圖中可以看出當服務器節點增多時,復雜度會隨之提升。因為每個節點和其它節點之間要進行p2p的連接。3個節點可以容忍掛掉1個節點,5個節點可以容忍掛掉2個節點。
客戶端連接zookeeper時會選擇任意一個節點保持長鏈接,后續通信都是通過這個節點進行讀寫的。如果該節點掛了,客戶端會嘗試去連接其它節點。
服務器會為每個客戶端連接維持一個會話對象,會話的ID會保存在客戶端。會話對象也是分布式的,意味著當一個節點掛掉了,客戶端使用原有的會話ID去連接其它節點,服務器維持的會話對象還繼續存在,并不需要重新創建一個新的會話。
如果客戶端主動發送會話關閉消息,服務器的會話對象會立即刪除。如果客戶端不小心奔潰了,沒有發送關閉消息,服務器的會話對象還會繼續存在一段時間。這個時間是會話的過期時間,在創建會話的時候客戶端會提供這個參數,一般是10到30秒。
也許你會問連接斷開了,服務器是可以感知到的,為什么需要客戶端主動發送關閉消息呢?
因為服務器要考慮網絡抖動的情況,連接可能只是臨時斷開了。為了避免這種情況下反復創建和銷毀復雜的會話對象以及創建會話后要進行的一系列事件初始化操作,服務器會盡量延長會話的生存時間。
zookeeper的節點可以是持久化(Persistent)的,也可以是臨時(Ephermeral)的。所謂臨時的節點就是會話關閉后,會話期間創建的所有臨時節點會立即消失。一般用于服務發現系統,將服務進程的生命期和zookeeper子節點的生命期綁定在一起,起到了實時監控服務進程的存活的效果。
zookeeper還提供了順序節點。類似于mysql里面的auto_increment屬性。服務器會在順序節點名稱后自動增加自增的唯一后綴,保持節點名稱的唯一性和順序性。
還有一種節點叫著保護(Protected)節點。這個節點非常特殊,但是也非常常用。在應用服務發現的場合時,客戶端創建了一個臨時節點后,服務器節點掛了,連接斷開了,然后客戶端去重連到其它的節點。因為會話沒有關閉,之前創建的臨時節點還存在,但是這個時候客戶端卻無法識別去這個臨時節點是不是自己創建的,因為節點內部并不存儲會話ID字段。所以客戶端會在節點名稱上加上一個GUID前綴,這個前綴會保存在客戶端,這樣它就可以在重連后識別出哪個臨時節點是自己之前創建的了。
接下來我們使用Go語言實現一下服務發現的注冊和發現功能。
如圖所示,我們要提供api.user這樣的服務,這個服務有3個節點,每個節點有不一樣的服務地址,這3個節點各自將自己的服務注冊進zk,然后消費者進行讀取zk得到api.user的服務地址,任選一個節點地址進行服務調用。為了簡單化,這里就沒有提供權重參數了。在一個正式的服務發現里一般都有權重參數,用于調整服務節點之間的流量分配。
go get github.com/samuel/go-zookeeper/zk
首先我們定義一個ServiceNode結構,這個結構數據會存儲在節點的data中,表示服務發現的地址信息。
type ServiceNode struct {
Name string `json:"name"` // 服務名稱,這里是user
Host string `json:"host"`
Port int `json:"port"`
}
在定義一個服務發現的客戶端結構體SdClient。
type SdClient struct {
zkServers []string // 多個節點地址
zkRoot string // 服務根節點,這里是/api
conn *zk.Conn // zk的客戶端連接
}
編寫構造器,創建根節點
func NewClient(zkServers []string, zkRoot string, timeout int) (*SdClient, error) {
client := new(SdClient)
client.zkServers = zkServers
client.zkRoot = zkRoot
// 連接服務器
conn, _, err := zk.Connect(zkServers, time.Duration(timeout)*time.Second)
if err != nil {
return nil, err
}
client.conn = conn
// 創建服務根節點
if err := client.ensureRoot(); err != nil {
client.Close()
return nil, err
}
return client, nil}// 關閉連接,釋放臨時節點func (s *SdClient) Close() {
s.conn.Close()
}
func (s *SdClient) ensureRoot() error {
exists, _, err := s.conn.Exists(s.zkRoot)
if err != nil {
return err
}
if !exists {
_, err := s.conn.Create(s.zkRoot, []byte(""), 0, zk.WorldACL(zk.PermAll))
if err != nil && err != zk.ErrNodeExists {
return err
}
}
return nil
}
值得注意的是代碼中的Create調用可能會返回節點已存在錯誤,這是正常現象,因為會存在多進程同時創建節點的可能。如果創建根節點出錯,還需要及時關閉連接。我們不關心節點的權限控制,所以使用zk.WorldACL(zk.PermAll)表示該節點沒有權限限制。Create參數中的flag=0表示這是一個持久化的普通節點。
接下來我們編寫服務注冊方法
func (s *SdClient) Register(node *ServiceNode) error {
if err := s.ensureName(node.Name); err != nil {
return err
}
path := s.zkRoot + "/" + node.Name + "/n"
data, err := json.Marshal(node)
if err != nil {
return err
}
_, err = s.conn.CreateProtectedEphemeralSequential(path, data, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
return nil}func (s *SdClient) ensureName(name string) error {
path := s.zkRoot + "/" + name
exists, _, err := s.conn.Exists(path)
if err != nil {
return err
}
if !exists {
_, err := s.conn.Create(path, []byte(""), 0, zk.WorldACL(zk.PermAll))
if err != nil && err != zk.ErrNodeExists {
return err
}
}
return nil
}
先要創建/api/user節點作為服務列表的父節點。然后創建一個保護順序臨時(ProtectedEphemeralSequential)子節點,同時將地址信息存儲在節點中。什么叫保護順序臨時節點,首先它是一個臨時節點,會話關閉后節點自動消失。其它它是個順序節點,zookeeper自動在名稱后面增加自增后綴,確保節點名稱的唯一性。同時還是個保護性節點,節點前綴增加了GUID字段,確保斷開重連后臨時節點可以和客戶端狀態對接上。
接下來我們實現消費者獲取服務列表方法
func (s *SdClient) GetNodes(name string) ([]*ServiceNode, error) {
path := s.zkRoot + "/" + name
// 獲取字節點名稱
childs, _, err := s.conn.Children(path)
if err != nil {
if err == zk.ErrNoNode {
return []*ServiceNode{}, nil
}
return nil, err
}
nodes := []*ServiceNode{}
for _, child := range childs {
fullPath := path + "/" + child
data, _, err := s.conn.Get(fullPath)
if err != nil {
if err == zk.ErrNoNode {
continue
}
return nil, err
}
node := new(ServiceNode)
err = json.Unmarshal(data, node)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}
return nodes, nil
}
獲取服務節點列表時,我們先獲取字節點的名稱列表,然后依次讀取內容拿到服務地址。因為獲取字節點名稱和獲取字節點內容不是一個原子操作,所以在調用Get獲取內容時可能會出現節點不存在錯誤,這是正常現象。
將以上代碼湊在一起,一個簡單的服務發現包裝就實現了。
最后我們看看如果使用以上代碼,為了方便起見,我們將多個服務提供者和消費者寫在一個main方法里。
func main() {
// 服務器地址列表
servers := []string{"192.168.0.101:2118", "192.168.0.102:2118", "192.168.0.103:2118"}
client, err := NewClient(servers, "/api", 10)
if err != nil {
panic(err)
}
defer client.Close()
node1 := &ServiceNode{"user", "127.0.0.1", 4000}
node2 := &ServiceNode{"user", "127.0.0.1", 4001}
node3 := &ServiceNode{"user", "127.0.0.1", 4002}
if err := client.Register(node1); err != nil {
panic(err)
}
if err := client.Register(node2); err != nil {
panic(err)
}
if err := client.Register(node3); err != nil {
panic(err)
}
nodes, err := client.GetNodes("user")
if err != nil {
panic(err)
}
for _, node := range nodes {
fmt.Println(node.Host, node.Port)
}
}
值得注意的是使用時一定要在進程退出前調用Close方法,否則zookeeper的會話不會立即關閉,服務器創建的臨時節點也就不會立即消失,而是要等到timeout之后服務器才會清理。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。