您好,登錄后才能下訂單哦!
如何理解Go里面的互斥鎖mutex,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
在鎖的實現中現在越來越多的采用CAS來進行,通過利用處理器的CAS指令來實現對給定變量的值交換來進行鎖的獲取
在多線程并發的情況下很有可能會有線程CAS失敗,通常就會配合for循環采用輪詢的方式去嘗試重新獲取鎖
鎖從公平性上通常會分為公平鎖和非公平鎖,主要取決于在鎖獲取的過程中,先進行鎖獲取的線程是否比后續的線程更先獲得鎖,如果是則就是公平鎖:多個線程按照獲取鎖的順序依次獲得鎖,否則就是非公平性
鎖饑餓是指因為大量線程都同時進行獲取鎖,某些線程可能在鎖的CAS過程中一直失敗,從而長時間獲取不到鎖
上面提到了CAS和輪詢鎖進行鎖獲取的方式,可以發現如果已經有線程獲取了鎖,但是在當前線程在多次輪詢獲取鎖失敗的時候,就沒有必要再繼續進行反復嘗試浪費系統資源,通常就會采用一種排隊機制,來進行排隊等待
在大多數編程語言中針對實現基于CAS的鎖的時候,通常都會采用一個32位的整數來進行鎖狀態的存儲
在go的mutex中核心成員變量只有兩個state和sema,其通過state來進行鎖的計數,而通過sema來實現排隊
type Mutex struct { state int32 sema uint32 }
鎖模式主要分為兩種
描述 | 公平性 | |
---|---|---|
正常模式 | 正常模式下所有的goroutine按照FIFO的順序進行鎖獲取,被喚醒的goroutine和新請求鎖的goroutine同時進行鎖獲取,通常新請求鎖的goroutine更容易獲取鎖 | 否 |
饑餓模式 | 饑餓模式所有嘗試獲取鎖的goroutine進行等待排隊,新請求鎖的goroutine不會進行鎖獲取,而是加入隊列尾部等待獲取鎖 | 是 |
上面可以看到其實在正常模式下,其實鎖的性能是最高的如果多個goroutine進行鎖獲取后立馬進行釋放則可以避免多個線程的排隊消耗 同理在切換到饑餓模式后,在進行鎖獲取的時候,如果滿足一定的條件也會切換回正常模式,從而保證鎖的高性能
在mutex中鎖有三個標志位,其中其二進制位分別位001(mutexLocked)、010(mutexWoken)、100(mutexStarving), 注意這三者并不是互斥的關系,比如一個鎖的狀態可能是鎖定的饑餓模式并且已經被喚醒
mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving
mutex中通過低3位存儲了當前mutex的三種狀態,剩下的29位全部用來存儲嘗試正在等待獲取鎖的goroutine的數量
mutexWaiterShift = iota // 3
喚醒標志其實就是上面說的第二位,喚醒標志主要用于標識當前嘗試獲取goroutine是否有正在處于喚醒狀態的,記得上面公平模式下,當前正在cpu上運行的goroutine可能會先獲取到鎖
當釋放鎖的時候,如果當前有goroutine正在喚醒狀態,則只需要修改鎖狀態為釋放鎖,則處于woken狀態的goroutine就可以直接獲取鎖,否則則需要喚醒一個goroutine, 并且等待這個goroutine修改state狀態為mutexWoken,才退出
如果當前沒有goroutine加鎖,則并且直接進行CAS成功,則直接獲取鎖成功
// Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return }
// 注意這里其實包含兩個信息一個是如果當前已經是鎖定狀態,然后允許自旋iter主要是計數次數實際上只允許自旋4次 // 其實就是在自旋然后等待別人釋放鎖,如果有人釋放鎖,則會立刻進行下面的嘗試獲取鎖的邏輯 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // !awoke 如果當前線程不處于喚醒狀態 // old&mutexWoken == 0如果當前沒有其他正在喚醒的節點,就將當前節點處于喚醒的狀態 // old>>mutexWaiterShift != 0 :右移3位,如果不位0,則表明當前有正在等待的goroutine // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)設置當前狀態為喚醒狀態 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 嘗試自旋, runtime_doSpin() // 自旋計數 iter++ // 從新獲取狀態 old = m.state continue }
流程走到這里會有兩種可能: 1.鎖狀態當前已經不是鎖定狀態 2.自旋超過指定的次數,不再允許自旋了
new := old if old&mutexStarving == 0 { // 如果當前不是饑餓模式,則這里其實就可以嘗試進行鎖的獲取了|=其實就是將鎖的那個bit位設為1表示鎖定狀態 new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { // 如果當前被鎖定或者處于饑餓模式,則增等待一個等待計數 new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { // 如果當前已經處于饑餓狀態,并且當前鎖還是被占用,則嘗試進行饑餓模式的切換 new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // awoke為true則表明當前線程在上面自旋的時候,修改mutexWoken狀態成功 // 清除喚醒標志位 // 為什么要清除標志位呢? // 實際上是因為后續流程很有可能當前線程會被掛起,就需要等待其他釋放鎖的goroutine來喚醒 // 但如果unlock的時候發現mutexWoken的位置不是0,則就不會去喚醒,則該線程就無法再醒來加鎖 new &^= mutexWoken }
再加鎖的時候實際上只會有一個goroutine加鎖CAS成功,而其他線程則需要重新獲取狀態,進行上面的自旋與喚醒狀態的重新計算,從而再次CAS
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { // 如果原來的狀態等于0則表明當前已經釋放了鎖并且也不處于饑餓模式下 // 實際的二進制位可能是這樣的 1111000, 后面三位全是0,只有記錄等待goroutine的計數器可能會不為0 // 那就表明其實 break // locked the mutex with CAS } // 排隊邏輯,如果發現waitStatrTime不為0,則表明當前線程之前已經再排隊來,后面可能因為 // unlock被喚醒,但是本次依舊沒獲取到鎖,所以就將它移動到等待隊列的頭部 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 這里就會進行排隊等待其他節點進行喚醒 runtime_SemacquireMutex(&m.sema, queueLifo) // 如果等待超過指定時間,則切換為饑餓模式 starving=true // 如果一個線程之前不是饑餓狀態,并且也沒超過starvationThresholdNs,則starving為false // 就會觸發下面的狀態切換 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 重新獲取狀態 old = m.state if old&mutexStarving != 0 { // 如果發現當前已經是饑餓模式,注意饑餓模式喚醒的是第一個goroutine // 當前所有的goroutine都在排隊等待 // 一致性檢查, if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 獲取當前的模式 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // 如果當前goroutine不是饑餓狀態,就從饑餓模式切換會正常模式 // 就從mutexStarving狀態切換出去 delta -= mutexStarving } // 最后進行cas操作 atomic.AddInt32(&m.state, delta) break } // 重置計數 awoke = true iter = 0 } else { old = m.state }
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // 直接進行cas操作 new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { // 如果釋放鎖并且不是饑餓模式 old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { // 如果已經有等待者并且已經被喚醒,就直接返回 return } // 減去一個等待計數,然后將當前模式切換成mutexWoken new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 喚醒一個goroutine runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // 喚醒等待的線程 runtime_Semrelease(&m.sema, true) } }
看完上述內容,你們掌握如何理解Go里面的互斥鎖mutex的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。