中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Go如何創建Grpc鏈接池

發布時間:2023-03-07 17:43:28 來源:億速云 閱讀:110 作者:iii 欄目:開發技術

這篇“Go如何創建Grpc鏈接池”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Go如何創建Grpc鏈接池”文章吧。

常規用法

gRPC 四種基本使用

  • 請求響應模式

  • 客戶端數據流模式

  • 服務端數據流模式

  • 雙向流模式

常見的gRPC調用寫法

func main(){
	//... some code
	// 鏈接grpc服務
	conn , err := grpc.Dial(":8000",grpc.WithInsecure)
	if err != nil {
		//...log
	}
	defer conn.Close()
	//...some code

存在的問題:面臨高并發的情況,性能問題很容易就會出現,例如我們在做性能測試的時候,就會發現,打一會性能測試,客戶端請求服務端的時候就會報錯:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused

實際去查看問題的時候,很明顯,這是 gRPC 的連接數被打滿了,很多連接都還未完全釋放。[#本文來源:janrs.com#]

gRPC 的通信本質上也是 TCP 的連接,那么一次連接就需要三次握手,和四次揮手,每一次建立連接和釋放連接的時候,都需要走這么一個過程,如果我們頻繁的建立和釋放連接,這對于資源和性能其實都是一個大大的浪費。

在服務端,gRPC 服務端的鏈接管理不用我們操心,但是 gRPC 客戶端的鏈接管理非常有必要關心,要實現復用客戶端的連接。

創建鏈接池

創建鏈接池需要考慮的問題:

  • 連接池是否支持擴縮容

  • 空閑的連接是否支持超時自行關閉,是否支持保活

  • 池子滿的時候,處理的策略是什么樣的

創建鏈接池接口

type Pool interface {
	// 獲取一個新的連接 , 當關閉連接的時候,會將該連接放入到池子中
   Get() (Conn, error)
	// 關閉連接池,自然連接池子中的連接也不再可用
   Close() error
	//[#本文來源:janrs.com#]
   Status() string
}

實現鏈接池接口

創建鏈接池代碼

func New(address string, option Options) (Pool, error) {
   if address == "" {
      return nil, errors.New("invalid address settings")
   }
   if option.Dial == nil {
      return nil, errors.New("invalid dial settings")
   }
   if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive {
      return nil, errors.New("invalid maximum settings")
   }
   if option.MaxConcurrentStreams <= 0 {
      return nil, errors.New("invalid maximun settings")
   }
   p := &pool{
      index:   0,
      current: int32(option.MaxIdle),
      ref:     0,
      opt:     option,
      conns:   make([]*conn, option.MaxActive),
      address: address,
      closed:  0,
   }
   for i := 0; i < p.opt.MaxIdle; i++ {
      c, err := p.opt.Dial(address)
      if err != nil {
         p.Close()
         return nil, fmt.Errorf("dial is not able to fill the pool: %s", err)
      }
      p.conns[i] = p.wrapConn(c, false)
   }
   log.Printf("new pool success: %v\n", p.Status())
   return p, nil
}

關于以上的代碼,需要特別注意每一個連接的建立也是在 New 里面完成的,[#本文來源:janrs.com#]只要有 1 個連接未建立成功,那么咱們的連接池就算是建立失敗,咱們會調用 p.Close() 將之前建立好的連接全部釋放掉。

關閉鏈接池代碼

// 關閉連接池
func (p *pool) Close() error {
   atomic.StoreInt32(&p.closed, 1)
   atomic.StoreUint32(&p.index, 0)
   atomic.StoreInt32(&p.current, 0)
   atomic.StoreInt32(&p.ref, 0)
   p.deleteFrom(0)
   log.Printf("[janrs.com]close pool success: %v\n", p.Status())
   return nil
}

從具體位置刪除鏈接池代碼

// 清除從 指定位置開始到 MaxActive 之間的連接
func (p *pool) deleteFrom(begin int) {
   for i := begin; i < p.opt.MaxActive; i++ {
      p.reset(i)
   }
}

銷毀具體的鏈接代碼

// 清除具體的連接
func (p *pool) reset(index int) {
   conn := p.conns[index]
   if conn == nil {
      return
   }
   conn.reset()
   p.conns[index] = nil
}

關閉鏈接

代碼

func (c *conn) reset() error {
   cc := c.cc
   c.cc = nil
   c.once = false
   // 本文博客來源:janrs.com
   if cc != nil {
      return cc.Close()
   }
   return nil
}
func (c *conn) Close() error {
   c.pool.decrRef()
   if c.once {
      return c.reset()
   }
   return nil
}

在使用連接池通過 pool.Get() 拿到具體的連接句柄 conn 之后,會使用 conn.Close()關閉連接,實際上也是會走到上述的 Close() 實現的位置,但是并未指定當然也沒有權限顯示的指定將 once 置位為 false ,也就是對于調用者來說,是關閉了連接,對于連接池來說,實際上是將連接歸還到連接池中。

擴縮容

關鍵代碼

func (p *pool) Get() (Conn, error) {
   // the first selected from the created connections
   nextRef := p.incrRef()
   p.RLock()
   current := atomic.LoadInt32(&p.current)
   p.RUnlock()
   if current == 0 {
      return nil, ErrClosed
   }
   if nextRef <= current*int32(p.opt.MaxConcurrentStreams) {
      next := atomic.AddUint32(&p.index, 1) % uint32(current)
      return p.conns[next], nil
   }
   // 本文博客來源:janrs.com
   // the number connection of pool is reach to max active
   if current == int32(p.opt.MaxActive) {
      // the second if reuse is true, select from pool's connections
      if p.opt.Reuse {
         next := atomic.AddUint32(&p.index, 1) % uint32(current)
         return p.conns[next], nil
      }
      // the third create one-time connection
      c, err := p.opt.Dial(p.address)
      return p.wrapConn(c, true), err
   }
   // the fourth create new connections given back to pool
   p.Lock()
   current = atomic.LoadInt32(&p.current)
   if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) {
      // 2 times the incremental or the remain incremental  ##janrs.com
      increment := current
      if current+increment > int32(p.opt.MaxActive) {
         increment = int32(p.opt.MaxActive) - current
      }
      var i int32
      var err error
      for i = 0; i < increment; i++ {
         c, er := p.opt.Dial(p.address)
         if er != nil {
            err = er
            break
         }
         p.reset(int(current + i))
         p.conns[current+i] = p.wrapConn(c, false)
      }
	  // 本文博客來源:janrs.com
      current += i
      log.Printf("#janrs.com#grow pool: %d ---> %d, increment: %d, maxActive: %d\n",
         p.current, current, increment, p.opt.MaxActive)
      atomic.StoreInt32(&p.current, current)
      if err != nil {
         p.Unlock()
         return nil, err
      }
   }
   p.Unlock()
   next := atomic.AddUint32(&p.index, 1) % uint32(current)
   return p.conns[next], nil
}

Get 代碼邏輯

  • 先增加連接的引用計數,如果在設定 current*int32(p.opt.MaxConcurrentStreams) 范圍內,那么直接取連接進行使用即可。

  • 若當前的連接數達到了最大活躍的連接數,那么就看我們新建池子的時候傳遞的 option 中的 reuse 參數是否是 true,若是復用,則隨機取出連接池中的任意連接提供使用,如果不復用,則新建一個連接。

  • 其余的情況,就需要我們進行 2 倍或者 1 倍的數量對連接池進行擴容了。

也可以在 Get 的實現上進行縮容,具體的縮容策略可以根據實際情況來定,例如當引用計數 nextRef 只有當前活躍連接數的 10% 的時候(這只是一個例子),就可以考慮縮容了。

以上就是關于“Go如何創建Grpc鏈接池”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

邯郸市| 莆田市| 临朐县| 竹北市| 即墨市| 陇川县| 南投市| 台湾省| 青神县| 吴江市| 沁水县| 紫阳县| 襄樊市| 嘉义县| 石阡县| 陵水| 巴塘县| 双辽市| 宾阳县| 社会| 汾阳市| 慈利县| 揭东县| 左贡县| 新民市| 九江市| 遂川县| 靖宇县| 临朐县| 京山县| 怀远县| 巴彦县| 凤城市| 灵武市| 台山市| 封开县| 孙吴县| 鄂尔多斯市| 卫辉市| 和平区| 建水县|