您好,登錄后才能下訂單哦!
這篇“Go連接池設計與實現的方法是什么”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Go連接池設計與實現的方法是什么”文章吧。
如果不用連接池,而是每次請求都創建一個連接是比較昂貴的,因此需要完成3次tcp握手
同時在高并發場景下,由于沒有連接池的最大連接數限制,可以創建無數個連接,耗盡文件描述符
連接池就是為了復用這些創建好的連接
基本上連接池都會設計以下幾個參數:
初始連接數:在初始化連接池時就會預先創建好的連接數量,如果設置得:
過大:可能造成浪費
過小:請求到來時需要新建連接
最大空閑連接數maxIdle:池中最大緩存的連接個數,如果設置得:
過大:造成浪費,自己不用還把持著連接。因為數據庫整體的連接數是有限的,當前進程占用多了,其他進程能獲取的就少了
過小:無法應對突發流量
最大連接數maxCap:
如果已經用了maxCap個連接,要申請第maxCap+1個連接時,一般會阻塞在那里,直到超時或者別人歸還一個連接
最大空閑時間idleTimeout:當發現某連接空閑超過這個時間時,會將其關閉,重新去獲取連接
避免連接長時間沒用,自動失效的問題
連接池對外提供兩個方法,Get
:獲取一個連接,Put
:歸還一個連接
大部分連接池的實現大同小異,基本流程如下:
需要注意:
當有空閑連接時,需要進一步判斷連接是否有過期(超過最大空閑時間idleTimeout)
這些連接有可能很久沒用過了,在數據庫層面已經過期。如果貿然使用可能出現錯誤,因此最好檢查下是否超時
當陷入阻塞時,最好設置超時時間,避免一直沒等到有人歸還連接而一直阻塞
歸還連接時:
先看有沒有阻塞的獲取連接的請求,如果有轉交連接,并喚醒阻塞請求
否則看能否放回去空閑隊列,如果不能直接關閉請求
根據上面總結的流程,連接池還需要維護另外兩個結構:
空閑隊列
阻塞請求的隊列
數據結構:
// channelPool 存放連接信息 type channelPool struct { mu sync.RWMutex // 空閑連接 conns chan *idleConn // 產生新連接的方法 factory func() (interface{}, error) // 關閉連接的方法 close func(interface{}) error ping func(interface{}) error // 最大空閑時間,最大阻塞等待時間(實際沒用到) idleTimeout, waitTimeOut time.Duration // 最大連接數 maxActive int openingConns int // 阻塞的請求 connReqs []chan connReq }
可以看出,silenceper/pool
:
用channel實現了空閑連接隊列conns
為每個阻塞的請求創建一個channel,加入connReqs
中。這樣請求會阻塞在自己的channel上
func (c *channelPool) Get() (interface{}, error) { conns := c.getConns() if conns == nil { return nil, ErrClosed } for { select { // 如果有空閑連接 case wrapConn := <-conns: if wrapConn == nil { return nil, ErrClosed } //判斷是否超時,超時則丟棄 if timeout := c.idleTimeout; timeout > 0 { if wrapConn.t.Add(timeout).Before(time.Now()) { //丟棄并關閉該連接 c.Close(wrapConn.conn) continue } } //判斷是否失效,失效則丟棄,如果用戶沒有設定 ping 方法,就不檢查 if c.ping != nil { if err := c.Ping(wrapConn.conn); err != nil { c.Close(wrapConn.conn) continue } } return wrapConn.conn, nil // 沒有空閑連接 default: c.mu.Lock() log.Debugf("openConn %v %v", c.openingConns, c.maxActive) if c.openingConns >= c.maxActive { // 連接數已經達到上線,不能再創建連接 req := make(chan connReq, 1) c.connReqs = append(c.connReqs, req) c.mu.Unlock() // 將自己阻塞在channel上 ret, ok := <-req if !ok { return nil, ErrMaxActiveConnReached } // 再檢查一次是否超時 if timeout := c.idleTimeout; timeout > 0 { if ret.idleConn.t.Add(timeout).Before(time.Now()) { //丟棄并關閉該連接 c.Close(ret.idleConn.conn) continue } } return ret.idleConn.conn, nil } // 沒有超過最大連接數,創建一個新的連接 if c.factory == nil { c.mu.Unlock() return nil, ErrClosed } conn, err := c.factory() if err != nil { c.mu.Unlock() return nil, err } c.openingConns++ c.mu.Unlock() return conn, nil } } }
這段代碼基本符合上面介紹的Get流程,應該很好理解
需要注意:
當收到別人歸還的連接狗,這里再檢查了一次是否超時。但我認為這次檢查是沒必要的,因為別人剛用完,一般不可能超時
雖然在pool的數據結構定義中有waitTimeOut
字段,但實際沒有使用,即阻塞獲取可能無限期阻塞,這是一個優化點
// Put 將連接放回pool中 func (c *channelPool) Put(conn interface{}) error { if conn == nil { return errors.New("connection is nil. rejecting") } c.mu.Lock() if c.conns == nil { c.mu.Unlock() return c.Close(conn) } // 如果有請求在阻塞獲取連接 if l := len(c.connReqs); l > 0 { req := c.connReqs[0] copy(c.connReqs, c.connReqs[1:]) c.connReqs = c.connReqs[:l-1] // 將連接轉交 req <- connReq{ idleConn: &idleConn{conn: conn, t: time.Now()}, } c.mu.Unlock() return nil } else { // 否則嘗試是否能放回空閑連接隊列 select { case c.conns <- &idleConn{conn: conn, t: time.Now()}: c.mu.Unlock() return nil default: c.mu.Unlock() //連接池已滿,直接關閉該連接 return c.Close(conn) } } }
值得注意的是:
put方法喚醒阻塞請求時,從隊頭開始喚醒,這樣先阻塞的請求先被喚醒,保證了公平性
Go在官方庫sql中就實現了連接池,這樣的好處在于:
對于開發:就不用像java一樣,需要自己找第三方的連接池實現
對于driver的實現:只用關心怎么和數據庫交互,不用考慮連接池的問題
sql.DB
中和連接池相關的字段如下:
type DB struct { /** ... */ // 空閑連接隊列 freeConn []*driverConn // 阻塞請求的隊列 connRequests map[uint64]chan connRequest // 已經打開的連接 numOpen int // number of opened and pending open connections // 最大空閑連接 maxIdle int // zero means defaultMaxIdleConns; negative means 0 // 最大連接數 maxOpen int // <= 0 means unlimited // ... }
繼續看獲取連接:
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { // 檢測連接池是否被關閉 db.mu.Lock() if db.closed { db.mu.Unlock() return nil, errDBClosed } select { default: // 檢測ctx是否超時 case <-ctx.Done(): db.mu.Unlock() return nil, ctx.Err() } lifetime := db.maxLifetime db.numOpen++ // optimistically db.mu.Unlock() ci, err := db.connector.Connect(ctx) if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true, } db.addDepLocked(dc, dc) db.mu.Unlock() return dc, nil }
接下來檢測是否有空閑連接:
numFree := len(db.freeConn) // 如果有空閑連接 if strategy == cachedOrNewConn && numFree > 0 { // 從隊頭取一個 conn := db.freeConn[0] copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] conn.inUse = true db.mu.Unlock() if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } // Reset the session if required. if err := conn.resetSession(ctx); err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } return conn, nil }
以上代碼是1.14版本,但是到了1.18以后,獲取空閑連接的方式發生了變化:
last := len(db.freeConn) - 1 if strategy == cachedOrNewConn && last >= 0 { // 從最后一個位置獲取連接 conn := db.freeConn[last] db.freeConn = db.freeConn[:last] conn.inUse = true if conn.expired(lifetime) { db.maxLifetimeClosed++ db.mu.Unlock() conn.Close() return nil, driver.ErrBadConn }
可以看出,1.14版本從隊首獲取,1.18改成從隊尾獲取連接
為啥從隊尾拿連接?
因為隊尾的連接是才放進去的,該連接過期的概率比隊首連接小
繼續看:
// 如果已經達到最大連接數 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { req := make(chan connRequest, 1) reqKey := db.nextRequestKeyLocked() db.connRequests[reqKey] = req db.waitCount++ db.mu.Unlock() waitStart := time.Now() // 阻塞當前請求,要么ctx超時,要么別人歸還了連接 select { case <-ctx.Done(): db.mu.Lock() // 把自己從阻塞隊列中刪除 delete(db.connRequests, reqKey) db.mu.Unlock() atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) select { default: case ret, ok := <-req: if ok && ret.conn != nil { db.putConn(ret.conn, ret.err, false) } } return nil, ctx.Err() case ret, ok := <-req: // 別人歸還連接 atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) if !ok { return nil, errDBClosed } if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } if ret.conn == nil { return nil, ret.err } return ret.conn, ret.err } }
這里需要注意,在ctx超時分支中:
首先把自己從阻塞隊列中刪除
再檢查一下req中是否有連接,如果有,將連接放回連接池
奇怪的是為啥把自己刪除后,req
還可能收到連接呢?
因為put
連接時,會先拿出一個阻塞連接的req,如果這里刪除req在put拿出req:
之前:那沒問題,put不可能再放該req發送連接
之后:那有可能put往該req發送了連接,因此需要再檢查下req中是否有連接,如果有歸還
也解釋了為啥阻塞隊列要用map
:
用于快速找到自己的req,并刪除
最后看看put:
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { if db.closed { return false } if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } // 有阻塞的請求,轉移連接 if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 for reqKey, req = range db.connRequests { break } delete(db.connRequests, reqKey) // Remove from pending requests. if err == nil { dc.inUse = true } req <- connRequest{ conn: dc, err: err, } return true // 判斷能否放回空閑隊列 } else if err == nil && !db.closed { if db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true } db.maxIdleClosed++ } return false }
以上就是關于“Go連接池設計與實現的方法是什么”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。