中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

golang高并發系統限流策略漏桶和令牌桶算法源碼分析

發布時間:2022-06-17 13:59:47 來源:億速云 閱讀:126 作者:iii 欄目:開發技術

本篇內容主要講解“golang高并發系統限流策略漏桶和令牌桶算法源碼分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“golang高并發系統限流策略漏桶和令牌桶算法源碼分析”吧!

漏桶算法

漏桶算法比較好理解,假設我們現在有一個水桶,我們向這個水桶里添水,雖然我們我們無法預計一次會添多少水,也無法預計水流入的速度,但是可以固定出水的速度,不論添水的速率有多大,都按照固定的速率流出,如果桶滿了,溢出的上方水直接拋棄。我們把水當作HTTP請求,每次都把請求放到一個桶中,然后以固定的速率處理請求,說了這么多,不如看一個圖加深理解:

golang高并發系統限流策略漏桶和令牌桶算法源碼分析

原理其實很簡單,就看我們怎么實現它了,uber團隊有一個開源的uber-go/ratelimit庫,這個庫就是漏桶的一種實現,下面我們一起來看一看他的實現思路。

樣例

學習一個新東西的時候,往往是從會用開始的,慢慢才能明白其實現原理,所以我們先來看看這個庫是怎樣使用的,這里我們直接提供一個實際使用例子,配合Gin框架,我們添加一個限流中間件,來達到請求限流的作用,測試代碼如下:

// 定義全局限流器對象
var rateLimit ratelimit.Limiter
// 在 gin.HandlerFunc 加入限流邏輯
func leakyBucket() gin.HandlerFunc {
	prev := time.Now()
	return func(c *gin.Context) {
		now := rateLimit.Take()
		fmt.Println(now.Sub(prev)) // 為了打印時間間隔
		prev = now // 記錄上一次的時間,沒有這個打印的會有問題
	}
}
func main() {
	rateLimit = ratelimit.New(10)
	r := gin.Default()
	r.GET("/ping", leakyBucket(), func(c *gin.Context) {
		c.JSON(200, true)
	})
	r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}

我們簡單使用壓測工具ab測試一下:ab -n 10 -c 2 http://127.0.0.1:8080/ping,執行結果部分如下:

golang高并發系統限流策略漏桶和令牌桶算法源碼分析

golang高并發系統限流策略漏桶和令牌桶算法源碼分析

觀察結果可知,每次處理請求的時間間隔是10ms,并且后面的請求耗時越來越久,為什么會這樣呢? 這里先賣個小關子,看完uber的實現你就知道了~

源碼實現

我們首先來看一下其核心結構:

type limiter struct {
	sync.Mutex
	last       time.Time
	sleepFor   time.Duration
	perRequest time.Duration
	maxSlack   time.Duration
	clock      Clock
}
type Limiter interface {
	// Take should block to make sure that the RPS is met.
	Take() time.Time
}

限制器接口只提供了一個方法take(),take()方法會阻塞確保兩次請求之間的時間走完,具體實現我們在下面進行分析。實現限制器接口的結構體中各個字段的意義如下:

sync.Mutext:互斥鎖,控制并發的作用

last:記錄上一次的時刻

sleepFor:距離處理下一次請求需要等待的時間

perRequest:每次請求的時間間隔

maxSlack:最大松弛量,用來解決突發流量

clock:一個時鐘或模擬時鐘,提供了now和sleep方法,是實例化速率限制器

要是用該限制器,首先需要通過New方法進行初始化,一個必傳的參數是rate,代表的是每秒請求量(RPS),還有一個可選參數,參數類型option,也就是我們可以自定義limit,不過一般使用場景不多,這里就不過多介紹了。我主要看一下他是怎么保證固定速率的,截取New方法部分代碼如下:

l := &limiter{
		perRequest: time.Second / time.Duration(rate),
		maxSlack:   -10 * time.Second / time.Duration(rate),
	}

根據我們傳入的請求數量,能計算出1s內要通過n個請求,每個請求之間的間隔時間是多少,這樣在take方法中就可以根據這個字段來處理請求的固定速率問題,這里還初始化了最大松弛化字段,他的值是負數,默認最大松弛量是10個請求的時間間隔。

接下來我們主要看一下take方法:

func (t *limiter) Take() time.Time {
	t.Lock()
	defer t.Unlock()
	now := t.clock.Now()
	if t.last.IsZero() {
		t.last = now
		return t.last
	}
	t.sleepFor += t.perRequest - now.Sub(t.last)
	if t.sleepFor < t.maxSlack {
		t.sleepFor = t.maxSlack
	}
	if t.sleepFor > 0 {
		t.clock.Sleep(t.sleepFor)
		t.last = now.Add(t.sleepFor)
		t.sleepFor = 0
	} else {
		t.last = now
	}
	return t.last
}

take()方法的執行步驟如下:

  • 為了控制并發,所以進入該方法就需要進行上鎖,該鎖的粒度比較大,整個方法都加上了鎖

  • 通過IsZero方法來判斷當前是否是第一次請求,如果是第一次請求,直接取now時間即可返回。

  • 如果不是第一次請求,就需要計算距離處理下一次請求需要等待的時間,這里有一個要注意點的是累加需要等待的時間,目的是可以給后面的抵消使用

  • 如果當前累加需要等待的時間大于最大松弛量了,將等待的時間設置為最大松弛量的時間。

  • 如果當前請求多余的時間無法完全抵消此次的所需量,調用sleep方法進行阻塞,同時清空等待的時間。如果sleepFor小于0,說明此次請求時間間隔大于預期間隔,也就說無需等待可以直接處理請求。

步驟其實不是很多,主要需要注意一個知識點 &mdash;&mdash; 最大松弛量。

漏桶算法有個天然缺陷就是無法應對突發流量(勻速,兩次請求 req1 和 req2 之間的延遲至少應該 >=perRequest),舉個例子說明:假設我們現在有三個請求req1、req2、req3按順序處理,每個請求處理間隔為100ms,req1請求處理完成之后150ms,req2請求到來,依據限速策略可以對 req2 立即處理,當 req2 完成后,50ms 后, req3 到來,這個時候距離上次請求還不足 100ms,因此還需要等待 50ms 才能繼續執行, 但是,對于這種情況,實際上這三個請求一共消耗了 250ms 才完成,并不是預期的 200ms。

golang高并發系統限流策略漏桶和令牌桶算法源碼分析

對于上面這種情況,我們可以把之前間隔比較長的請求的時間勻給后面的請求判斷限流時使用,減少請求等待的時間了,但是當兩個請求之間到達的間隔比較大時,就會產生很大的可抵消時間,以至于后面大量請求瞬間到達時,也無法抵消這個時間,那樣就已經失去了限流的意義,所以引入了最大松弛量 (maxSlack) 的概念, 該值為負值,表示允許抵消的最長時間,防止以上情況的出現。

以上就是漏桶實現的基本思路了,整體還是很簡單的,你學會了嗎?

令牌桶算法

令牌桶其實和漏桶的原理類似,令牌桶就是想象有一個固定大小的桶,系統會以恒定速率向桶中放 Token,桶滿則暫時不放。從網上找了圖,表述非常恰當:

golang高并發系統限流策略漏桶和令牌桶算法源碼分析

關于令牌桶限流算法的實現,Github有一個高效的基于令牌桶限流算法實現的限流庫:github.com/juju/ratelimit,Golang的timer/rate也是令牌桶的一種實現,本文就不介紹juju/ratelimit庫了,有興趣的自己學習一下的他的實現思想吧,我們主要來看一看time/rate是如何實現的。

樣例

還是老樣子,我們還是結合gin寫一個限流中間件看看他是怎么使用的,例子如下:

import (
	"net/http"
	"time"
	"github.com/gin-gonic/gin"
	"golang.org/x/time/rate"
)
var rateLimit *rate.Limiter
func tokenBucket() gin.HandlerFunc {
	return func(c *gin.Context) {
		if rateLimit.Allow() {
			c.String(http.StatusOK, "rate limit,Drop")
			c.Abort()
			return
		}
		c.Next()
	}
}
func main() {
	limit := rate.Every(100 * time.Millisecond)
	rateLimit = rate.NewLimiter(limit, 10)
	r := gin.Default()
	r.GET("/ping", tokenBucket(), func(c *gin.Context) {
		c.JSON(200, true)
	})
	r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}

上面的例子我們首先調用NewLimiter方法構造一個限流器,第一個參數是r limit,代表每秒可以向Token桶中產生多少token,第二個參數是b int,代表Token桶的容量大小,對于上面的例子,表示每100ms往桶中放一個token,也就是1s鐘產生10個,桶的容量就是10。消費token的方法這里我們使用Allow方法,Allow 實際上就是 AllowN(time.Now(),1),AllowN 方法表示,截止到某一時刻,目前桶中數目是否至少為 n 個,滿足則返回 true,同時從桶中消費 n 個 token。反之返回不消費 Token。對應上面的例子,當桶中的數目不足于1個時,就會丟掉該請求。

源碼剖析

Limit類型

time/rate自定義了一個limit類型,其實他本質就是float64的別名,Limit定了事件的最大頻率,表示每秒事件的數據量,0就表示無限制。Inf是無限的速率限制;它允許所有事件(即使突發為0)。還提供 Every 方法來指定向 Token 桶中放置 Token 的間隔,計算出每秒時間的數據量。

type Limit float64
// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)
// Every converts a minimum time interval between events to a Limit.
func Every(interval time.Duration) Limit {
	if interval &lt;= 0 {
		return Inf
	}
	return 1 / Limit(interval.Seconds())
}

Limiter結構體

type Limiter struct {
	mu     sync.Mutex
	limit  Limit
	burst  int
	tokens float64
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}

各個字段含義如下:

  • mu:互斥鎖、為了控制并發

  • limit:每秒允許處理的事件數量,即每秒處理事件的頻率

  • burst:令牌桶的最大數量,如果burst為0,并且limit == Inf,則允許處理任何事件,否則不允許

  • tokens:令牌桶中可用的令牌數量

  • last:記錄上次limiter的tokens被更新的時間

  • lastEvent:lastEvent記錄速率受限制(桶中沒有令牌)的時間點,該時間點可能是過去的,也可能是將來的(Reservation預定的結束時間點)

Reservation結構體

type Reservation struct {
	ok        bool
	lim       *Limiter
	tokens    int
	timeToAct time.Time
	// This is the Limit at reservation time, it can change later.
	limit Limit
}

各個字段含義如下:

ok:到截至時間是否可以獲取足夠的令牌

lim:limiter對象

tokens:需要獲取的令牌數量

timeToAct:需要等待的時間點

limit:代表預定的時間,是可以更改的。

reservation就是一個預定令牌的操作,timeToAct是本次預約需要等待到的指定時間點才有足夠預約的令牌。

Limiter消費token

Limiter有三個token的消費方法,分別是Allow、Reserve和Wait,最終三種消費方式都調用了 reserveN 、advance這兩個方法來生成和消費 Token。所以我們主要看看reserveN、advance函數的具體實現。

advance方法的實現:

func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	//last不能在當前時間now之后,否則計算出來的elapsed為負數,會導致令牌桶數量減少
  last := lim.last
	if now.Before(last) {
		last = now
	}
	//根據令牌桶的缺數計算出令牌桶未進行更新的最大時間
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last) //令牌桶未進行更新的時間段
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}
	//根據未更新的時間(未向桶中加入令牌的時間段)計算出產生的令牌數
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta //計算出可用的令牌數
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	return now, last, tokens
}

advance方法的作用是更新令牌桶的狀態,計算出令牌桶未更新的時間(elapsed),根據elapsed算出需要向桶中加入的令牌數delta,然后算出桶中可用的令牌數newTokens.

reserveN方法的實現:reserveN是 AllowN, ReserveN及 WaitN的輔助方法,用于判斷在maxFutureReserve時間內是否有足夠的令牌。

// @param n 要消費的token數量
// @param maxFutureReserve 愿意等待的最長時間
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	// 如果沒有限制
	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true, //桶中有足夠的令牌
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
	//更新令牌桶的狀態,tokens為目前可用的令牌數量
	now, last, tokens := lim.advance(now)
  // 計算取完之后桶還能剩能下多少token
	tokens -= float64(n)
	var waitDuration time.Duration
  // 如果token < 0, 說明目前的token不夠,需要等待一段時間
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}
	ok := n <= lim.burst && waitDuration <= maxFutureReserve
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
  // timeToAct表示當桶中滿足token數目等于n的時間
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}
  // 更新桶里面的token數目
	// 更新last時間
	// lastEvent
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}
	lim.mu.Unlock()
	return r
}

上面的代碼我已經進行了注釋,這里在總結一下流程:

  • 首選判斷是否擁有速率限制,沒有速率限制也就是桶中一致擁有足夠的令牌。

  • 計算從上次取 Token 的時間到當前時刻,期間一共新產生了多少 Token:我們只在取 Token 之前生成新的 Token,也就意味著每次取Token的間隔,實際上也是生成 Token 的間隔。我們可以利用 tokensFromDuration, 輕易的算出這段時間一共產生 Token 的數目。所以當前 Token 數目 = 新產生的 Token 數目 + 之前剩余的 Token 數目 - 要消費的 Token 數目。

  • 如果消費后剩余 Token 數目大于零,說明此時 Token 桶內仍不為空,此時 Token 充足,無需調用側等待。 如果 Token 數目小于零,則需等待一段時間。那么這個時候,我們可以利用 durationFromTokens 將當前負值的 Token 數轉化為需要等待的時間。

  • 將需要等待的時間等相關結果返回給調用方

其實整個過程就是利用了 Token 數可以和時間相互轉化 的原理。而如果 Token 數為負,則需要等待相應時間即可。

上面提到了durationFromTokens、tokensFromDuration這兩個方法,是關鍵,他們的實現如下:

func (limit Limit) durationFromTokens(tokens float64) time.Duration {
	seconds := tokens / float64(limit)
	return time.Nanosecond * time.Duration(1e9*seconds)
}
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	// Split the integer and fractional parts ourself to minimize rounding errors.
	// See golang.org/issues/34861.
	sec := float64(d/time.Second) * float64(limit)
	nsec := float64(d%time.Second) * float64(limit)
	return sec + nsec/1e9
}
  • durationFromTokens:功能是計算出生成N 個新的 Token 一共需要多久。

  • tokensFromDuration:給定一段時長,這段時間一共可以生成多少個 Token。

細心的網友會發現tokensFromDuration方法既然是計算一段時間一共可以生成多少個 Token,為什么不直接進行相乘呢?其實Golang最初的版本就是采用d.Seconds() * float64(limit)直接相乘實現的,雖然看上去一點問題沒有,但是這里是兩個小數相乘,會帶來精度損失,所以采用現在這種方法實現,分別求出秒的整數部分和小數部分,進行相乘后再相加,這樣可以得到最精確的精度。

limiter歸還Token

既然我們可以消費Token,那么對應也可以取消此次消費,將token歸還,當調用 Cancel() 函數時,消費的 Token 數將會盡可能歸還給 Token 桶。歸還也并不是那么簡單,接下我們我們看看歸還token是如何實現的。

func (r *Reservation) CancelAt(now time.Time) {
	if !r.ok {
		return
	}
	r.lim.mu.Lock()
	defer r.lim.mu.Unlock()
  /*
  1.如果無需限流
	2. tokens為0 (需要獲取的令牌數量為0)
	3. 已經過了截至時間
	以上三種情況無需處理取消操作
	*/
	if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
		return
	}
	//計算出需要還原的令牌數量
	//這里的r.lim.lastEvent可能是本次Reservation的結束時間,也可能是后來的Reservation的結束時間,所以要把本次結束時間點(r.timeToAct)之后產生的令牌數減去
	restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
  // 當小于0,表示已經都預支完了,不能歸還了
	if restoreTokens &lt;= 0 {
		return
	}
	//從新計算令牌桶的狀態
	now, _, tokens := r.lim.advance(now)
	//還原當前令牌桶的令牌數量,當前的令牌數tokens加上需要還原的令牌數restoreTokens
	tokens += restoreTokens
  //如果tokens大于桶的最大容量,則將tokens置為桶的最大容量
	if burst := float64(r.lim.burst); tokens &gt; burst {
		tokens = burst
	}
	// update state
	r.lim.last = now //記錄桶的更新時間
	r.lim.tokens = tokens //更新令牌數量
 // 如果都相等,說明跟沒消費一樣。直接還原成上次的狀態吧
	if r.timeToAct == r.lim.lastEvent {
		prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
		if !prevEvent.Before(now) {
			r.lim.lastEvent = prevEvent
		}
	}
	return
}

注釋已經添加,就不在詳細解釋了,重點是這一行代碼:

restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)),

  • r.tokens指的是本次消費的token數,

  • r.timeToAcr指的是Token桶可以滿足本次消費數目的時刻,也就是消費的時刻+等待的時長

  • r.lim.lastEvent指的是最近一次消費的timeToAct的值,

通過r.limit.tokensFromDuration方法得出的結果指的是從該次消費到當前時間,一共又消費了多少Token數目,所以最終得出這一段的代碼含義是:

要歸還的Token = 該次消費的Token - 新消費的token。

到此,相信大家對“golang高并發系統限流策略漏桶和令牌桶算法源碼分析”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

温宿县| 张掖市| 马龙县| 灵宝市| 马关县| 红安县| 凌源市| 郁南县| 陈巴尔虎旗| 新建县| 大埔区| 樟树市| 晋城| 彭州市| 静安区| 泸溪县| SHOW| 永善县| 收藏| 磴口县| 太湖县| 沾益县| 龙里县| 罗甸县| 鲜城| 和硕县| 陕西省| 酉阳| 神木县| 布尔津县| 全南县| 西吉县| 且末县| 晋州市| 鲁甸县| 松滋市| 始兴县| 多伦县| 大化| 自治县| 峡江县|