您好,登錄后才能下訂單哦!
本篇內容主要講解“怎么打造高性能的 Go 緩存庫”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“怎么打造高性能的 Go 緩存庫”吧!
我在看一些優秀的開源庫的時候看到一個有意思的緩存庫 fastcache,在它的介紹主要有以下幾點特點:
讀寫數據要快,即使在并發下;
即使在數 GB 的緩存中,也要保持很好的性能,以及盡可能減少 GC 次數;
設計盡可能簡單;
本文會通過模仿它寫一個簡單的緩存庫,從而研究其內核是如何實現這樣的目標的。希望各位能有所收獲。
在項目中,我們經常會用到 Go 緩存庫比如說 patrickmn/go-cache庫。但很多緩存庫其實都是用一個簡單的 Map 來存放數據,這些庫在使用的時候,當并發低,數據量少的時候是沒有問題的,但是在數據量比較大并發比較高的時候會延長 GC 時間,增加內存分配次數。
比如,我們使用一個簡單的例子:
func main() { a := make(map[string]string, 1e9) for i := 0; i < 10; i++ { runtime.GC() } runtime.KeepAlive(a) }
在這個例子中,預分配了大小是10億(1e9) 的 map,然后我們通過 gctrace 輸出一下 GC 情況:
做實驗的環境是 Linux,機器配置是 16C 8G ,想要更深入理解 GC,可以看這篇:《 Go 語言 GC 實現原理及源碼分析 https://www.luozhiyun.com/archives/475 》
[root@localhost gotest]# GODEBUG=gctrace=1 go run main.go ... gc 6 @13.736s 17%: 0.010+1815+0.004 ms clock, 0.17+0/7254/21744+0.067 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced) gc 7 @15.551s 18%: 0.012+1796+0.005 ms clock, 0.20+0/7184/21537+0.082 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced) gc 8 @17.348s 19%: 0.008+1794+0.004 ms clock, 0.14+0/7176/21512+0.070 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced) gc 9 @19.143s 19%: 0.010+1819+0.005 ms clock, 0.16+0/7275/21745+0.085 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced) gc 10 @20.963s 19%: 0.011+1844+0.004 ms clock, 0.18+0/7373/22057+0.076 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
上面展示了最后 5 次 GC 的情況,下面我們看看具體的含義是什么:
gc 1 @0.004s 4%: 0.22+1.4+0.021 ms clock, 1.7+0.009/0.40/0.073+0.16 ms cpu, 4->5->1 MB, 5 MB goal, 8 P gc 10 @20.963s 19%: 0.011+1844+0.004 ms clock, 0.18+0/7373/22057+0.076 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced) gc 10 :程序啟動以來第10次GC @20.963s:距離程序啟動到現在的時間 19%:當目前為止,GC 的標記工作所用的CPU時間占總CPU的百分比
垃圾回收的時間
gc 1 @0.004s 4%: 0.22+1.4+0.021 ms clock, 1.7+0.009/0.40/0.073+0.16 ms cpu, 4->5->1 MB, 5 MB goal, 8 P gc 10 @20.963s 19%: 0.011+1844+0.004 ms clock, 0.18+0/7373/22057+0.076 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced) gc 10 :程序啟動以來第10次GC @20.963s:距離程序啟動到現在的時間 19%:當目前為止,GC 的標記工作所用的CPU時間占總CPU的百分比 垃圾回收的時間 0.011 ms:標記開始 STW 時間 1844 ms:并發標記時間 0.004 ms:標記終止 STW 時間 垃圾回收占用cpu時間 0.18 ms:標記開始 STW 時間 0 ms:mutator assists占用的時間 7373 ms:標記線程占用的時間 22057 ms:idle mark workers占用的時間 0.076 ms:標記終止 STW 時間 內存 73984 MB:標記開始前堆占用大小 73984 MB:標記結束后堆占用大小 73984 MB:標記完成后存活堆的大小 147968 MB goal:標記完成后正在使用的堆內存的目標大小 16 P:使用了多少處理器
可以從上面的輸出看到每次 GC 處理的時間非常的長,占用的 CPU 資源也非常多。那么造成這樣的原因是什么呢?
string 實際上底層數據結構是由兩部分組成,其中包含指向字節數組的指針和數組的大小:
type StringHeader struct { Data uintptr Len int }
由于 StringHeader中包含指針,所以每次 GC 的時候都會掃描每個指針,那么在這個巨大的 map中是包含了非常多的指針的,所以造成了巨大的資源消耗。
在上面的例子 map a 中數據大概是這樣存儲:
一個 map 中里面有多個 bucket ,bucket 里面有一個 bmap 數組用來存放數據,但是由于 key 和 value 都是 string 類型的,所以在 GC 的時候還需要根據 StringHeader中的 Data指針掃描 string 數據。
對于這種情況,如果所有的 string 字節都在一個單一內存片段中,我們就可以通過偏移來追蹤某個字符串在這段內存中的開始和結束位置。通過追蹤偏移,我們不在需要在我們大數組中存儲指針,GC 也不在會被困擾。如下:
如同上面所示,如果我們將字符串中的字節數據拷貝到一個連續的字節數組 chunks 中,并為這個字節數組提前分配好內存,并且僅存儲字符串在數組中的偏移而不是指針。
除了上面所說的優化內容以外,還有其他的方法嗎?
其實我們還可以直接從系統 OS 中調用 mmap syscall 進行內存分配,這樣 GC 就永遠不會對這塊內存進行內存管理,因此也就不會掃描到它。如下:
func main() { test := "hello syscall" data, _ := syscall.Mmap(-1, 0, 13, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE) p := (*[13]byte)(unsafe.Pointer(&data[0])) for i := 0; i < 13; i++ { p[i] = test[i] } fmt.Println(string(p[:])) }
通過系統調用直接向 OS 申請了 13bytes 的內存,然后將一個字符串寫入到申請的內存數組中。
所以我們也可以通過提前向 OS 申請一塊內存,而不是用的時候才申請內存,減少頻繁的內存分配從而達到提高性能的目的。
我們在開發前先把這個庫的 API 定義一下:
func New(maxBytes int) *Cache
創建一個 Cache 結構體,傳入預設的緩存大小,單位是字節。
func (c *Cache) Get(k []byte) []byte
獲取 Cache 中的值,傳入的參數是 byte 數組。
func (c *Cache) Set(k, v []byte)
設置鍵值對到緩存中,k 是鍵,v 是值,參數都是 byte 數組。
const bucketsCount = 512 type Cache struct { buckets [bucketsCount]bucket } type bucket struct { // 讀寫鎖 mu sync.RWMutex // 二維數組,存放數據的地方,是一個環形鏈表 chunks [][]byte // 索引字典 m map[uint64]uint64 // 索引值 idx uint64 // chunks 被重寫的次數,用來校驗環形鏈表中數據有效性 gen uint64 }
通過我們上面的分析,可以看到,實際上真正存放數據的地方是 chunks 二維數組,在實現上是通過 m 字段來映射索引路徑,根據 chunks 和 gen 兩個字段來構建一個環形鏈表,環形鏈表每轉一圈 gen 就會加一。
func New(maxBytes int) *Cache { if maxBytes <= 0 { panic(fmt.Errorf("maxBytes must be greater than 0; got %d", maxBytes)) } var c Cache // 算出每個桶的大小 maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount) for i := range c.buckets[:] { // 對桶進行初始化 c.buckets[i].Init(maxBucketBytes) } return &c }
我們會設置一個 New 函數來初始化我們 Cache 結構體,在 Cache 結構體中會將緩存的數據大小平均分配到每個桶中,然后對每個桶進行初始化。
const bucketSizeBits = 40 const maxBucketSize uint64 = 1 << bucketSizeBits const chunkSize = 64 * 1024 func (b *bucket) Init(maxBytes uint64) { if maxBytes == 0 { panic(fmt.Errorf("maxBytes cannot be zero")) } // 我們這里限制每個桶最大的大小是 1024 GB if maxBytes >= maxBucketSize { panic(fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize)) } // 初始化 Chunks 中每個 Chunk 大小為 64 KB,計算 chunk 數量 maxChunks := (maxBytes + chunkSize - 1) / chunkSize b.chunks = make([][]byte, maxChunks) b.m = make(map[uint64]uint64) // 初始化 bucket 結構體 b.Reset() }
在這里會將桶里面的內存按 chunk 進行分配,每個 chunk 占用內存約為 64 KB。在最后會調用 bucket 的 Reset 方法對 bucket 結構體進行初始化。
func (b *bucket) Reset() { b.mu.Lock() chunks := b.chunks // 遍歷 chunks for i := range chunks { // 將 chunk 中的內存歸還到緩存中 putChunk(chunks[i]) chunks[i] = nil } // 刪除索引字典中所有的數據 bm := b.m for k := range bm { delete(bm, k) } b.idx = 0 b.gen = 1 b.mu.Unlock() }
Reset 方法十分簡單,主要就是清空 chunks 數組、刪除索引字典中所有的數據以及重置索引 idx 和 gen 的值。
在上面這個方法中有一個 putChunk ,其實這個就是直接操作我們提前向 OS 申請好的內存,相應的還有一個 getChunk 方法。下面我們具體看看 Chunk 的操作。
const chunksPerAlloc = 1024 const chunkSize = 64 * 1024 var ( freeChunks []*[chunkSize]byte freeChunksLock sync.Mutex ) func getChunk() []byte { freeChunksLock.Lock() if len(freeChunks) == 0 { // 分配 64 * 1024 * 1024 = 64 MB 內存 data, err := syscall.Mmap(-1, 0, chunkSize*chunksPerAlloc, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE) if err != nil { panic(fmt.Errorf("cannot allocate %d bytes via mmap: %s", chunkSize*chunksPerAlloc, err)) } // 循環遍歷 data 數據 for len(data) > 0 { //將從系統分配的內存分為 64 * 1024 = 64 KB 大小,存放到 freeChunks中 p := (*[chunkSize]byte)(unsafe.Pointer(&data[0])) freeChunks = append(freeChunks, p) data = data[chunkSize:] } } //從 freeChunks 獲取最后一個元素 n := len(freeChunks) - 1 p := freeChunks[n] freeChunks[n] = nil freeChunks = freeChunks[:n] freeChunksLock.Unlock() return p[:] }
初次調用 getChunk 函數時會使用系統調用分配 64MB 的內存,然后循環將內存切成 1024 份,每份 64KB 放入到 freeChunks 空閑列表中。然后獲取每次都獲取 freeChunks 空閑列表最后一個元素 64KB 內存返回。需要注意的是 getChunk 會下下面將要介紹到的 Cache 的 set 方法中使用到,所以需要考慮到并發問題,所以在這里加了鎖。
func putChunk(chunk []byte) { if chunk == nil { return } chunk = chunk[:chunkSize] p := (*[chunkSize]byte)(unsafe.Pointer(&chunk[0])) freeChunksLock.Lock() freeChunks = append(freeChunks, p) freeChunksLock.Unlock() }
putChunk 函數就是將內存數據還回到 freeChunks 空閑列表中,會在 bucket 的 Reset 方法中被調用。
const bucketsCount = 512 func (c *Cache) Set(k, v []byte) { h := xxhash.Sum64(k) idx := h % bucketsCount c.buckets[idx].Set(k, v, h) }
Set 方法里面會根據 k 的值做一個 hash,然后取模映射到 buckets 桶中,這里用的 hash 庫是 cespare/xxhash。
最主要的還是 buckets 里面的 Set 方法:
func (b *bucket) Set(k, v []byte, h uint64) { // 限定 k v 大小不能超過 2bytes if len(k) >= (1<<16) || len(v) >= (1<<16) { return } // 4個byte 設置每條數據的數據頭 var kvLenBuf [4]byte kvLenBuf[0] = byte(uint16(len(k)) >> 8) kvLenBuf[1] = byte(len(k)) kvLenBuf[2] = byte(uint16(len(v)) >> 8) kvLenBuf[3] = byte(len(v)) kvLen := uint64(len(kvLenBuf) + len(k) + len(v)) // 校驗一下大小 if kvLen >= chunkSize { return } b.mu.Lock() // 當前索引位置 idx := b.idx // 存放完數據后索引的位置 idxNew := idx + kvLen // 根據索引找到在 chunks 的位置 chunkIdx := idx / chunkSize chunkIdxNew := idxNew / chunkSize // 新的索引是否超過當前索引 // 因為還有chunkIdx等于chunkIdxNew情況,所以需要先判斷一下 if chunkIdxNew > chunkIdx { // 校驗是否新索引已到chunks數組的邊界 // 已到邊界,那么循環鏈表從頭開始 if chunkIdxNew >= uint64(len(b.chunks)) { idx = 0 idxNew = kvLen chunkIdx = 0 b.gen++ // 當 gen 等于 1<<genSizeBits時,才會等于0 // 也就是用來限定 gen 的邊界為1<<genSizeBits if b.gen&((1<<genSizeBits)-1) == 0 { b.gen++ } } else { // 未到 chunks數組的邊界,從下一個chunk開始 idx = chunkIdxNew * chunkSize idxNew = idx + kvLen chunkIdx = chunkIdxNew } // 重置 chunks[chunkIdx] b.chunks[chunkIdx] = b.chunks[chunkIdx][:0] } chunk := b.chunks[chunkIdx] if chunk == nil { chunk = getChunk() // 清空切片 chunk = chunk[:0] } // 將數據 append 到 chunk 中 chunk = append(chunk, kvLenBuf[:]...) chunk = append(chunk, k...) chunk = append(chunk, v...) b.chunks[chunkIdx] = chunk // 因為 idx 不能超過bucketSizeBits,所以用一個 uint64 同時表示gen和idx // 所以高于bucketSizeBits位置表示gen // 低于bucketSizeBits位置表示idx b.m[h] = idx | (b.gen << bucketSizeBits) b.idx = idxNew b.mu.Unlock() }
鴻蒙官方戰略合作共建——HarmonyOS技術社區
在這段代碼開頭實際上我會限制鍵值的大小不能超過 2bytes;
然后將 2bytes 大小長度的鍵值封裝到 4bytes 的 kvLenBuf 作為數據頭,數據頭和鍵值的總長度是不能超過一個 chunk 長度,也就是 64 * 1024;
然后計算出原索引 chunkIdx 和新索引 chunkIdxNew,用來判斷這次添加的數據加上原來的數據有沒有超過一個 chunk 長度;
根據新的索引找到對應的 chunks 中的位置,然后將鍵值以及 kvLenBuf 追加到 chunk 后面;
設置新的 idx 以及 m 字典對應的值,m 字典中存放的是 gen 和 idx 通過取與的放置存放。
在 Set 一個鍵值對會有 4bytes 的 kvLenBuf 作為數據頭,后面的數據會接著 key 和 value ,在 kvLenBuf 中,前兩個 byte 分別代表了 key 長度的低位和高位;后兩個 byte 分別代表了 value 長度的低位和高位,數據圖大致如下:
下面舉個例子來看看是是如何利用 chunks 這個二維數組來實現環形鏈表的。
我們在 bucket 的 Init 方法中會根據傳入 maxBytes 桶字節數來設置 chunks 的長度大小,由于每個 chunk 大小都是 64 * 1024bytes,那么我們設置 3 * 64 * 1024bytes 大小的桶,那么 chunks 數組長度就為 3。
如果當前算出 chunkIdx 在 chunks 數組為 1 的位置,并且在 chunks[1] 的位置中,還剩下 6bytes 未被使用,那么有如下幾種情況:
現在假設放入的鍵值長度都是 1byte,那么在 chunks[1] 的位置中剩下的 6bytes 剛好可以放下;
現在假設放入的鍵值長度超過了 1byte,那么在 chunks[1] 的位置中剩下的位置就放不下,只能放入到 chunks[2] 的位置中。
如果當前算出 chunkIdx 在 chunks 數組為 2 的位置,并且現在 Set 一個鍵值,經過計算 chunkIdxNew 為 3,已經超過了 chunks 數組長度,那么會將索引重置,重新將數據從 chunks[0] 開始放置,并將 gen 加一,表示已經跑完一圈了。
func (c *Cache) Get(dst, k []byte) []byte { h := xxhash.Sum64(k) idx := h % bucketsCount dst, _ = c.buckets[idx].Get(dst, k, h, true) return dst }
這里和 Set 方法是一樣的,首先是要找到對應的桶的位置,然后才去桶里面拿數據。需要注意的是,這里的 dst 可以從外部傳入一個切片,以達到減少重復分配返回值。
func (b *bucket) Get(dst, k []byte, h uint64,returnDst bool) ([]byte, bool) { found := false b.mu.RLock() v := b.m[h] bGen := b.gen & ((1 << genSizeBits) - 1) if v > 0 { // 高于bucketSizeBits位置表示gen gen := v >> bucketSizeBits // 低于bucketSizeBits位置表示idx idx := v & ((1 << bucketSizeBits) - 1) // 這里說明chunks還沒被寫滿 if gen == bGen && idx < b.idx || // 這里說明chunks已被寫滿,并且當前數據沒有被覆蓋 gen+1 == bGen && idx >= b.idx || // 這里是邊界條件gen已是最大,并且chunks已被寫滿bGen從1開始,,并且當前數據沒有被覆蓋 gen == maxGen && bGen == 1 && idx >= b.idx { chunkIdx := idx / chunkSize // chunk 索引位置不能超過 chunks 數組長度 if chunkIdx >= uint64(len(b.chunks)) { goto end } // 找到數據所在的 chunk chunk := b.chunks[chunkIdx] // 通過取模找到該key 對應的數據在 chunk 中的位置 idx %= chunkSize if idx+4 >= chunkSize { goto end } // 前 4bytes 是數據頭 kvLenBuf := chunk[idx : idx+4] // 通過數據頭算出鍵值的長度 keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1]) valLen := (uint64(kvLenBuf[2]) << 8) | uint64(kvLenBuf[3]) idx += 4 if idx+keyLen+valLen >= chunkSize { goto end } // 如果鍵值是一致的,表示找到該數據 if string(k) == string(chunk[idx:idx+keyLen]) { idx += keyLen // 返回該鍵對應的值 if returnDst { dst = append(dst, chunk[idx:idx+valLen]...) } found = true } } } end: b.mu.RUnlock() return dst, found }
Get 方法主要是考慮環形鏈表的邊界問題。我們在 Set 方法中會將每一個 key 對應的 gen 和 idx 索引存放到 m 字典中,所以我們通過 hash 獲取 m 字典的值之后通過位運算就可以獲取到 gen 和 idx 索引。
找到 gen 和 idx 索引之后就是邊界條件的判斷了,用一個 if 條件來進行判斷:
gen == bGen && idx < b.idx
這里是判斷如果是在環形鏈表的同一次循環中,那么 key 對應的索引應該小于當前桶的索引;
gen+1 == bGen && idx >= b.idx
這里表示當前桶已經進入到下一個循環中,所以需要判斷 key 對應的索引是不是大于當前索引,以表示當前 key 對應的值沒有被覆蓋;
gen == maxGen && bGen == 1 && idx >= b.idx
因為 gen 和 idx 索引要塞到 uint64 類型的字段中,所以留給 gen 的最大值只有 maxGen = 1<< 24 -1,超過了 maxGen 會讓 gen 從 1 開始。所以這里如果 key 對應 gen 等于 maxGen ,那么當前的 bGen 應該等于 1,并且 key 對應的索引還應該大于當前 idx,這樣才這個鍵值對才不會被覆蓋。
判斷完邊界條件之后就會找到對應的 chunk ,然后取模后找到數據位置,通過偏移量找到并取出值。
下面我上一下過后的 Benchmark:
代碼位置: https://github.com/devYun/mycache/blob/main/cache_timing_test.go
GOMAXPROCS=4 go test -bench='Set|Get' -benchtime=10s goos: linux goarch: amd64 pkg: gotest // GoCache BenchmarkGoCacheSet-4 836 14595822 ns/op 4.49 MB/s 2167340 B/op 65576 allocs/op BenchmarkGoCacheGet-4 3093 3619730 ns/op 18.11 MB/s 5194 B/op 23 allocs/op BenchmarkGoCacheSetGet-4 236 54379268 ns/op 2.41 MB/s 2345868 B/op 65679 allocs/op // BigCache BenchmarkBigCacheSet-4 1393 12763995 ns/op 5.13 MB/s 6691115 B/op 8 allocs/op BenchmarkBigCacheGet-4 2526 4342561 ns/op 15.09 MB/s 650870 B/op 131074 allocs/op BenchmarkBigCacheSetGet-4 1063 11180201 ns/op 11.72 MB/s 4778699 B/op 131081 allocs/op // standard map BenchmarkStdMapSet-4 1484 7299296 ns/op 8.98 MB/s 270603 B/op 65537 allocs/op BenchmarkStdMapGet-4 4278 2480523 ns/op 26.42 MB/s 2998 B/op 15 allocs/op BenchmarkStdMapSetGet-4 343 39367319 ns/op 3.33 MB/s 298764 B/op 65543 allocs/op // sync.map BenchmarkSyncMapSet-4 756 15951363 ns/op 4.11 MB/s 3420214 B/op 262320 allocs/op BenchmarkSyncMapGet-4 11826 1010283 ns/op 64.87 MB/s 1075 B/op 33 allocs/op BenchmarkSyncMapSetGet-4 1910 5507036 ns/op 23.80 MB/s 3412764 B/op 262213 allocs/op PASS ok gotest 215.182s
上面的測試是 GoCache、BigCache、Map、sync.Map 的情況。下面是本篇文章中所開發的緩存庫的測試:
// myCachce BenchmarkCacheSet-4 4371 2723208 ns/op 24.07 MB/s 1306 B/op 2 allocs/op BenchmarkCacheGet-4 6003 1884611 ns/op 34.77 MB/s 951 B/op 1 allocs/op BenchmarkCacheSetGet-4 2044 6611759 ns/op 19.82 MB/s 2797 B/op 5 allocs/op
可以看到內存分配是幾乎就不存在,操作速度在上面的庫中也是佼佼者的存在。
在本文中根據其他緩存庫,并分析了如果用 Map 作為緩存所存在的問題,然后引出存在這個問題的原因,并提出解決方案;在我們的緩存庫中,第一是通過使用索引加內存塊的方式來存放緩存數據,再來是通過 OS 系統調用來進行內存分配讓我們的緩存數據塊脫離了 GC 的控制,從而做到降低 GC 頻率提高并發的目的。
其實不只是緩存庫,在我們的項目中當遇到需要使用大量的帶指針的數據結構并需要長時間保持引用的時候,也是需要注意這樣做可能會引發 GC 問題,從而給系統帶來隱患。
到此,相信大家對“怎么打造高性能的 Go 緩存庫”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。