您好,登錄后才能下訂單哦!
小編給大家分享一下golang中如何實現并發安全Map以及分段鎖,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
分斷鎖
type SimpleCache struct { mu sync.RWMutex items map[interface{}]*simpleItem }
在日常開發中, 上述這種數據結構肯定不少見,因為golang的原生map是非并發安全的,所以為了保證map的并發安全,最簡單的方式就是給map加鎖。
之前使用過兩個本地內存緩存的開源庫, gcache, cache2go,其中存儲緩存對象的結構都是這樣,對于輕量級的緩存庫,為了設計簡潔(包含清理過期對象等 ) 再加上當需要緩存大量數據時有redis,memcache等明星項目解決。 但是如果拋開這些因素遇到真正數量巨大的數據量時,直接對一個map加鎖,當map中的值越來越多,訪問map的請求越來越多,大家都競爭這一把鎖顯得并發訪問控制變重。 在go1.9引入sync.Map 之前,比較流行的做法就是使用分段鎖,顧名思義就是將鎖分段,將鎖的粒度變小,將存儲的對象分散到各個分片中,每個分片由一把鎖控制,這樣使得當需要對在A分片上的數據進行讀寫時不會影響B分片的讀寫。
分段鎖的實現
// Map 分片 type ConcurrentMap []*ConcurrentMapShared // 每一個Map 是一個加鎖的并發安全Map type ConcurrentMapShared struct { items map[string]interface{} sync.RWMutex // 各個分片Map各自的鎖 }
主流的分段鎖,即通過hash取模的方式找到當前訪問的key處于哪一個分片之上,再對該分片進行加鎖之后再讀寫。分片定位時,常用有BKDR, FNV32等hash算法得到key的hash值。
func New() ConcurrentMap { // SHARD_COUNT 默認32個分片 m := make(ConcurrentMap, SHARD_COUNT) for i := 0; i < SHARD_COUNT; i++ { m[i] = &ConcurrentMapShared{ items: make(map[string]interface{}), } } return m }
在初始化好分片后, 對分片上的數據進行讀寫時就需要用hash取模進行分段定位來確認即將要讀寫的分片。
獲取段定位
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { return m[uint(fnv32(key))%uint(SHARD_COUNT)] } // FNV hash func fnv32(key string) uint32 { hash := uint32(2166136261) const prime32 = uint32(16777619) for i := 0; i < len(key); i++ { hash *= prime32 hash ^= uint32(key[i]) } return hash }
之后對于map的GET SET 就簡單順利成章的完成
Set And Get
func (m ConcurrentMap) Set(key string, value interface{}) { shard := m.GetShard(key) // 段定位找到分片 shard.Lock() // 分片上鎖 shard.items[key] = value // 分片操作 shard.Unlock() // 分片解鎖 } func (m ConcurrentMap) Get(key string) (interface{}, bool) { shard := m.GetShard(key) shard.RLock() val, ok := shard.items[key] shard.RUnlock() return val, ok }
由此一個分段鎖Map就實現了, 但是比起普通的Map, 常用到的方法比如獲取所有key, 獲取所有Val 操作是要比原生Map復雜的,因為要遍歷每一個分片的每一個數據, 好在golang的并發特性使得解決這類問題變得非常簡單
Keys
// 統計當前分段map中item的個數 func (m ConcurrentMap) Count() int { count := 0 for i := 0; i < SHARD_COUNT; i++ { shard := m[i] shard.RLock() count += len(shard.items) shard.RUnlock() } return count } // 獲取所有的key func (m ConcurrentMap) Keys() []string { count := m.Count() ch := make(chan string, count) // 每一個分片啟動一個協程 遍歷key go func() { wg := sync.WaitGroup{} wg.Add(SHARD_COUNT) for _, shard := range m { go func(shard *ConcurrentMapShared) { defer wg.Done() shard.RLock() // 每個分片中的key遍歷后都寫入統計用的channel for key := range shard.items { ch <- key } shard.RUnlock() }(shard) } wg.Wait() close(ch) }() keys := make([]string, count) // 統計各個協程并發讀取Map分片的key for k := range ch { keys = append(keys, k) } return keys }
這里寫了一個benchMark來對該分段鎖Map和原生的Map加鎖方式進行壓測, 場景為將一萬個不重復的鍵值對同時以100萬次寫和100萬次讀,分別進行5次壓測, 如下壓測代碼
func BenchmarkMapShared(b *testing.B) { num := 10000 testCase := genNoRepetTestCase(num) // 10000個不重復的鍵值對 m := New() for _, v := range testCase { m.Set(v.Key, v.Val) } b.ResetTimer() for i := 0; i < 5; i++ { b.Run(strconv.Itoa(i), func(b *testing.B) { b.N = 1000000 wg := sync.WaitGroup{} wg.Add(b.N * 2) for i := 0; i < b.N; i++ { e := testCase[rand.Intn(num)] go func(key string, val interface{}) { m.Set(key, val) wg.Done() }(e.Key, e.Val) go func(key string) { _, _ = m.Get(key) wg.Done() }(e.Key) } wg.Wait() }) } }
原生Map加鎖壓測結果
分段鎖壓測結果
可以看出在將鎖的粒度細化后再面對大量需要控制并發安全的訪問時,分段鎖Map的耗時比原生Map加鎖要快3倍有余
Sync.Map
go1.9之后加入了支持并發安全的Map sync.Map, sync.Map 通過一份只使用原子操作的數據和一份冗余了只讀數據的加鎖數據實現一定程度上的讀寫分離,使得大多數讀操作和更新操作是原子操作,寫入新數據才加鎖的方式來提升性能。以下是 sync.Map源碼剖析, 結構體中的注釋都會在具體實現代碼中提示相呼應
type Map struct { // 保護dirty的鎖 mu Mutex // 只讀數據(修改采用原子操作) read atomic.Value // 包含只讀中所有數據(冗余),寫入新數據時也在dirty中操作 dirty map[interface{}]*entry // 當原子操作訪問只讀read時找不到數據時會去dirty中尋找,此時misses+1,dirty及作為存儲新寫入的數據,又冗余了只讀結構中的數據,所以當misses > dirty 的長度時, 會將dirty升級為read,同時將老的dirty置nil misses int } // Map struct 中的 read 就是readOnly 的指針 type readOnly struct { // 基礎Map m map[interface{}]*entry // 用于表示當前dirty中是否有read中不存在的數據, 在寫入數據時, 如果發現dirty中沒有新數據且dirty為nil時,會將read中未被刪除的數據拷貝一份冗余到dirty中, 過程與Map struct中的 misses相呼應 amended bool } // 數據項 type entry struct { p unsafe.Pointer } // 用于標記數據項已被刪除(主要保證數據冗余時的并發安全) // 上述Map結構中說到有一個將read數據拷貝冗余至dirty的過程, 因為刪除數據項是將*entry置nil, 為了避免冗余過程中因并發問題導致*entry改變而影響到拷貝后的dirty正確性,所以sync.Map使用expunged來標記entry是否被刪除 var expunged = unsafe.Pointer(new(interface{}))
在下面sync.Map具體實現中將會看到很多“雙檢查”代碼,因為通過原子操作獲取的值可能在進行其他非原子操作過程中已改變,所以再非原子操作后需要使用之前原子操作獲取的值需要再次進行原子操作獲取。
compareAndSwap 交換并比較, 用于在多線程編程中實現不被打斷的數據交換操作,從而避免多線程同時改寫某一數據時導致數據不一致問題。
sync.Map Write
func (m *Map) Store(key, value interface{}) { // 先不上鎖,而是從只讀數據中按key讀取, 如果已存在以compareAndSwap操作進行覆蓋(update) read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok && e.tryStore(&value) { return } m.mu.Lock() // 雙檢查獲取read read, _ = m.read.Load().(readOnly) // 如果data在read中,更新entry if e, ok := read.m[key]; ok { // 如果原子操作讀到的數據是被標記刪除的, 則視為新數據寫入dirty if e.unexpungeLocked() { m.dirty[key] = e } // 原子操作寫新數據 e.storeLocked(&value) } else if e, ok := m.dirty[key]; ok { // 原子操作寫新數據 e.storeLocked(&value) } else { // 新數據 // 當dirty中沒有新數據時,將read中數據冗余到dirty if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) } m.mu.Unlock() } func (e *entry) tryStore(i *interface{}) bool { p := atomic.LoadPointer(&e.p) if p == expunged { return false } for { if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } p = atomic.LoadPointer(&e.p) if p == expunged { return false } } } // 在dirty中沒有比read多出的新數據時觸發冗余 func (m *Map) dirtyLocked() { if m.dirty != nil { return } read, _ := m.read.Load().(readOnly) m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { // 檢查entry是否被刪除, 被刪除的數據不冗余 if !e.tryExpungeLocked() { m.dirty[k] = e } } } func (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { // 將被刪除(置nil)的數據以cas原子操作標記為expunged(防止因并發情況下其他操作導致冗余進dirty的數據不正確) if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged }
sync.Map Read
func (m *Map) Load(key interface{}) (value interface{}, ok bool) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 只讀數據中沒有,并且dirty有比read多的數據,加鎖在dirty中找 if !ok && read.amended { m.mu.Lock() // 雙檢查, 因為上鎖之前的語句是非原子性的 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { // 只讀中沒有讀取到的次數+1 e, ok = m.dirty[key] // 檢查是否達到觸發dirty升級read的條件 m.missLocked() } m.mu.Unlock() } if !ok { return nil, false } // atomic.Load 但被標記為刪除的會返回nil return e.load() } func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } m.read.Store(readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }
sync.Map DELETE
func (m *Map) Delete(key interface{}) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 只讀中不存在需要到dirty中去刪除 if !ok && read.amended { m.mu.Lock() // 雙檢查, 因為上鎖之前的語句是非原子性的 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { delete(m.dirty, key) } m.mu.Unlock() } if ok { e.delete() } } func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } } }
同樣以剛剛壓測原生加鎖Map和分段鎖的方式來壓測sync.Map
壓測平均下來sync.Map和分段鎖差別不大,但是比起分段鎖, sync.Map則將鎖的粒度更加的細小到對數據的狀態上,使得大多數據可以無鎖化操作, 同時比分段鎖擁有更好的拓展性,因為分段鎖使用前總是要定一個分片數量, 在做擴容或者縮小時很麻煩, 但要達到sync.Map這種性能既好又能動態擴容的程度,代碼就相對復雜很多。
還有注意在使用sync.Map時切忌不要將其拷貝, go源碼中有對sync.Map注釋到” A Map must not be copied after first use.”因為當sync.Map被拷貝之后, Map類型的dirty還是那個map 但是read 和 鎖卻不是之前的read和鎖(都不在一個世界你拿什么保護我), 所以必然導致并發不安全(為了寫博我把sync.Map代碼復制出來一份把私有成員改成可外部訪問的打印指針)
以上是“golang中如何實現并發安全Map以及分段鎖”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。