您好,登錄后才能下訂單哦!
本篇文章為大家展示了如何理解tcpServer 中的IOLoop方法,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
今天我們就分析一下IOLoop
這個方法
廢話不多說,直接上代碼吧(代碼位于nsq/nsqlookupd/lookup_protocol_v1.go這個文件中)
//這段代碼位于nsq/nsqlookupd/client_v1.go這個文件中 type ClientV1 struct { net.Conn //組合net.Conn接口 peerInfo *PeerInfo //client的信息也就是前面所講的product的信息 } //初始化一個ClientV1 func NewClientV1(conn net.Conn) *ClientV1 { return &ClientV1{ Conn: conn, } } //實現String接口 func (c *ClientV1) String() string { return c.RemoteAddr().String() } //定義ClientV1結束 type LookupProtocolV1 struct { ctx *Context //一直貫穿整個代碼的Context,具體可翻看前面章節 } func (p *LookupProtocolV1) IOLoop(conn net.Conn) error { var err error var line string client := NewClientV1(conn) //新建一個client版本為V1 err = nil reader := bufio.NewReader(client) //由client 創建一個 帶有buffer 的Reader 默認 buffer size 為4096,這里的NewReader參數為io.Reader 接口,為何net.Conn接口也能作為參數呢?因為net.Conn 接口其實也是實現了io.Reader接口了,具體概念大家可翻看golang的教程 for { line, err = reader.ReadString('\n') //按行讀取 if err != nil { break } line = strings.TrimSpace(line) //去掉這行兩頭的空格符 params := strings.Split(line, " ") //字符串按一個空格字符串分割,并獲取相應的Commad 以及 該command 的相應的params response, err := p.Exec(client, reader, params) //執行相應的Command if err != nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.ctx.nsqlookupd.logf("ERROR: [%s] - %s%s", client, err, ctx) _, err = protocol.SendResponse(client, []byte(err.Error())) //返回錯誤給client if err != nil { break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } if response != nil { _, err = protocol.SendResponse(client, response) //返回命令處理結果給client if err != nil { break } } } //for 循環結束了 說明程序要退出了,調用RegistrationDB 中的 RemoveProducer從producer 的注冊數據中刪除 producer信息 //這里的RegistrationDB下章再具體講解 p.ctx.nsqlookupd.logf("CLIENT(%s): closing", client) if client.peerInfo != nil { registrations := p.ctx.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id) for _, r := range registrations { if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, r.Category, r.Key, r.SubKey) } } } return err } //這個方法就是執行相應的命令動作 有 PING IDENTIFY REGISTER UNREGISTER 這四個類型 func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { switch params[0] { case "PING": //用于client的心跳處理 return p.PING(client, params) case "IDENTIFY": //用于client端的信息注冊,執行PING REGISTER UNREGISTER 命令前必須先執行此命令 return p.IDENTIFY(client, reader, params[1:]) case "REGISTER": //用于client端注冊topic以及channel的命令 return p.REGISTER(client, reader, params[1:]) case "UNREGISTER": //執行與REGISTER命令相反的邏輯 return p.UNREGISTER(client, reader, params[1:]) } return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) } //INDENTIFY命令處理邏輯 //該命令用于注冊client的producer信息,并返回nsqlookupd的TCP 以及 HTTP 端口信息給client //大致的報文如下 // V1 INDENTIFY\n 注意了這里前面提過的V1前面是兩個空格字符 //123\n 這里是后面json數據(producer 信息的json數據的字節長度) //{....}\n 一串表示producer信息的json數據,具體的可參考 nsq/nsqlookupd/registration_db.go文件中的PeerInfo struct func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { var err error if client.peerInfo != nil { //如果有該client 的PeerInfo數據則返回錯誤 return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again") } var bodyLen int32 err = binary.Read(reader, binary.BigEndian, &bodyLen) //獲取producer PeerInfo json數據的字節長度 大端二進制格式 if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") } body := make([]byte, bodyLen) //初始化一個producer PeerInfo json數據長度的bytes _, err = io.ReadFull(reader, body) //讀取所有的json數據 if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body") } // body is a json structure with producer information peerInfo := PeerInfo{id: client.RemoteAddr().String()} //實例化一個PeerInfo err = json.Unmarshal(body, &peerInfo) //解析producer PeerInfo json數據到peerInfo if err != nil { //json 數據解析失敗 返回錯誤 return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") } peerInfo.RemoteAddress = client.RemoteAddr().String() //獲取PeerInfo remote address // require all fields //校驗獲取的PeerInfo 數據 if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" { return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields") } //將當前系統時間(納秒)更新到PeerInfo 中的lastUpdate中 用于 client的心跳判斷 atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano()) p.ctx.nsqlookupd.logf("CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s", client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version) client.peerInfo = &peerInfo //注冊producer PeerInfo 信息到 RegistrationDB中 其中Registration的 Category 為client Key 和 SubKey為空 if p.ctx.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "") } // build a response //將nsqlookupd的TCP端口,HTTP端口信息,版本信息,broadcast address信息,以及host name信息 已json數據格式返回給client data := make(map[string]interface{}) data["tcp_port"] = p.ctx.nsqlookupd.RealTCPAddr().Port data["http_port"] = p.ctx.nsqlookupd.RealHTTPAddr().Port data["version"] = version.Binary hostname, err := os.Hostname() if err != nil { log.Fatalf("ERROR: unable to get hostname %s", err) } data["broadcast_address"] = p.ctx.nsqlookupd.opts.BroadcastAddress data["hostname"] = hostname response, err := json.Marshal(data) if err != nil { p.ctx.nsqlookupd.logf("ERROR: marshaling %v", data) return []byte("OK"), nil } return response, nil } //獲取params中的topic 以及 channel func getTopicChan(command string, params []string) (string, string, error) { if len(params) == 0 { return "", "", protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("%s insufficient number of params", command)) } topicName := params[0] var channelName string if len(params) >= 2 { channelName = params[1] } //校驗是否是topic if !protocol.IsValidTopicName(topicName) { return "", "", protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("%s topic name '%s' is not valid", command, topicName)) } //校驗是否是channel if channelName != "" && !protocol.IsValidChannelName(channelName) { return "", "", protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("%s channel name '%s' is not valid", command, channelName)) } return topicName, channelName, nil } //REGISTER 命令 用于注冊client的topic 以及 channel信息 //一個topic下可以有多個channel //一個消費者訂閱的是一個topic 那么 生成者給這個topic 下的channel的信息 這個消費者也能接收得到這個信息,如果消費者訂閱的不是這個channel的信息,那么這個消費者則接受不到這個信息 //nsq topic 與channel的關系,大家可以多搜索下資料,我這里感覺講的也不太清晰,請大家諒解一下 //REGISTER 命令必須在INDENTIFY 之后才能調用 //具體協議報文如下 //REGISTER topic1\n 這個只創建一個名字為topic1的topic //或如下報文 //REGISTER topic1 channel1\n 這個只創建一個名字為topic1的topic 且topic1下面創建一個名字為channel1的channel func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } //獲取REGISTER 命令時的topic 以及channel名字 topic, channel, err := getTopicChan("REGISTER", params) if err != nil { return nil, err } //如果有channel if channel != "" { //添加channel信息到RegistrationDB中其中Registration的 Category 為"channel"字符串,Key為topic,SubKey為channel key := Registration{"channel", topic, channel} if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } } //添加topic信息到RegistrationDB中其中Registration的 Category 為"topic"字符串,Key為topic,SubKey為空 key := Registration{"topic", topic, ""} if p.ctx.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { p.ctx.nsqlookupd.logf("DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } //返回OK return []byte("OK"), nil } //UNREGISTER命令用于取消注冊topic 或取消注冊某個topic下的某一個channel //報文格式如下 //UNREGISTER topic1 channel1\n 這個報文表示取消注冊名字為topic1的topic下的名字為channel1的channel,這個名字 只取消注冊這個channel1,不取消注冊topic1下的其他channel 以及這個topic1本身 //或這個格式的報文 //UNREGISTER topic1\n 這個報文表示取消注冊名字為topic1的topic,這個時候topic1以及topic1下的所有channel都取消注冊了 func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } topic, channel, err := getTopicChan("UNREGISTER", params) //獲取topic 以及channel 的名字 if err != nil { return nil, err } if channel != "" { //如果有channel 則只取消注冊這個topic下的這個channel key := Registration{"channel", topic, channel} removed, left := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id) if removed { p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } // for ephemeral channels, remove the channel as well if it has no producers if left == 0 && strings.HasSuffix(channel, "#ephemeral") { p.ctx.nsqlookupd.DB.RemoveRegistration(key) } } else { // no channel was specified so this is a topic unregistration // remove all of the channel registrations... // normally this shouldn't happen which is why we print a warning message // if anything is actually removed //如果沒有channel 這個取消注冊這個topic 以及這個topic下的所有channel registrations := p.ctx.nsqlookupd.DB.FindRegistrations("channel", topic, "*") for _, r := range registrations { if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf("WARNING: client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, r.SubKey) } } key := Registration{"topic", topic, ""} if removed, _ := p.ctx.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id); removed { p.ctx.nsqlookupd.logf("DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } } //返回OK return []byte("OK"), nil } //PING 用于client中的心跳處理命令 func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) { if client.peerInfo != nil { // we could get a PING before other commands on the same client connection //獲取上一次心跳的時間 cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate)) //獲取當前時間 now := time.Now() //這里日志輸出兩次心跳之間的間隔時間 p.ctx.nsqlookupd.logf("CLIENT(%s): pinged (last ping %s)", client.peerInfo.id, now.Sub(cur)) //更新PeerInfo中的lastUpdate時間為當前時間 atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) } //返回OK return []byte("OK"), nil }
nsqlookupd
中的tcpServer
中的主要協議處理的IOLoop
這里基本講解完了。
上述內容就是如何理解tcpServer 中的IOLoop方法,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。