中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Golang限流庫、漏桶和令牌桶如何使用

發布時間:2023-03-30 11:16:32 來源:億速云 閱讀:132 作者:iii 欄目:開發技術

本篇內容主要講解“Golang限流庫、漏桶和令牌桶如何使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Golang限流庫、漏桶和令牌桶如何使用”吧!

    RateLimit 限流中間件

    為什么需要限流中間件

    在大數據量高并發訪問時,經常會出現服務或接口面對大量的請求而導致數據庫崩潰的情況,甚至引發連鎖反映導致整個系統崩潰。或者有人惡意攻擊網站,大量的無用請求出現會導致緩存穿透的情況出現。使用限流中間件可以在短時間內對請求進行限制數量,起到降級的作用,從而保障了網站的安全性。

    應對大量并發請求的策略

    • 使用消息中間件進行統一限制(降速)

    • 使用限流方案將多余請求返回(限流)

    • 升級服務器

    • 負載均衡升級

    • 等等

    可以看出在代碼已經無法提升的情況下,只能去提升硬件水平。或者改動架構再加一層!也可以使用消息中間件統一處理。而結合看來,限流方案是一種既不需要大幅改動也不需要高額開銷的策略。

    常見的限流方案

    • 令牌桶算法

    • 漏桶算法

    • 滑動窗口算法

    • 等等

    這里主要根據golang的庫介紹令牌桶和漏桶的實現原理以及在實際項目中如何應用。

    漏桶

    引入ratelimit庫

    go get -u go.uber.org/ratelimit

    庫函數源代碼

    // New returns a Limiter that will limit to the given RPS.
    func New(rate int, opts ...Option) Limiter {
    	return newAtomicBased(rate, opts...)
    }
    // newAtomicBased returns a new atomic based limiter.
    func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
    	// TODO consider moving config building to the implementation
    	// independent code.
    	config := buildConfig(opts)
    	perRequest := config.per / time.Duration(rate)
    	l := &atomicLimiter{
    		perRequest: perRequest,
    		maxSlack:   -1 * time.Duration(config.slack) * perRequest,
    		clock:      config.clock,
    	}
    	initialState := state{
    		last:     time.Time{},
    		sleepFor: 0,
    	}
    	atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
    	return l
    }

    該函數使用了函數選項模式對多個結構體對象進行初始化

    首先根據傳入的值來初始化一個桶結構體 rateint傳參 (time.Duration(rate)單位為納秒 = 1/1e9秒)

    初始化過程中包括了

    • 每一滴水需要的時間 perquest = config.per / time.Duration(rate)

    • maxSlack 寬松度(寬松度為負值)-1 * time.Duration(config.slack) * perRequest 松緊度是用來規范等待時間的

    // Clock is the minimum necessary interface to instantiate a rate limiter with
    // a clock or mock clock, compatible with clocks created using
    // github.com/andres-erbsen/clock.
    type Clock interface {
       Now() time.Time
       Sleep(time.Duration)
    }

    同時還需要結構體Clock來記錄當前請求的時間now和此刻的請求所需要花費等待的時間sleep

    type state struct {
       last     time.Time
       sleepFor time.Duration
    }

    state 主要用來記錄上次執行的時間以及當前執行請求需要花費等待的時間(作為中間狀態記錄)

    最重要的Take邏輯

    // Take blocks to ensure that the time spent between multiple
    // Take calls is on average time.Second/rate.
    func (t *atomicLimiter) Take() time.Time {
       var (
          newState state
          taken    bool
          interval time.Duration
       )
       for !taken {
          now := t.clock.Now()
          previousStatePointer := atomic.LoadPointer(&t.state)
          oldState := (*state)(previousStatePointer)
          newState = state{
             last:     now,
             sleepFor: oldState.sleepFor,
          }
          // If this is our first request, then we allow it.
          if oldState.last.IsZero() {
             taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
             continue
          }
          // 計算是否需要進行等待取水操作
          newState.sleepFor += t.perRequest(每兩滴水之間的間隔時間) - now.Sub(oldState.last)(當前時間與上次取水時間的間隔)
           // 如果等待取水時間特別小,就需要松緊度進行維護
          if newState.sleepFor < t.maxSlack {
             newState.sleepFor = t.maxSlack
          }
           // 如果等待時間大于0,就進行更新
          if newState.sleepFor > 0 {
             newState.last = newState.last.Add(newState.sleepFor)
             interval, newState.sleepFor = newState.sleepFor, 0
          }
          taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
       }
       t.clock.Sleep(interval)
       // 最后返回需要等待的時間
        return newState.last
    }

    實現一個Take方法

    • 該Take方法會進行原子性操作(可以理解為加鎖和解鎖),在大量并發請求下仍可以保證正常使用。

    • 記錄下當前的時間 now := t.clock.Now()

    • oldState.last.IsZero() 判斷是不是第一次取水,如果是就直接將state結構體中的值進行返回。而這個結構體中初始化了上次執行時間,如果是第一次取水就作為當前時間直接傳參。

    • 如果 newState.sleepFor 非常小,就會出現問題,因此需要借助寬松度,一旦這個最小值比寬松度小,就用寬松度對取水時間進行維護。

    • 如果newState.sleepFor > 0 就直接更新結構體中上次執行時間newState.last = newState.last.Add(newState.sleepFor)并記錄需要等待的時間interval, newState.sleepFor = newState.sleepFor, 0。

    • 如果允許取水和等待操作,那就說明沒有發生并發競爭的情況,就模擬睡眠時間t.clock.Sleep(interval)。然后將取水的目標時間進行返回,由服務端代碼來判斷是否打回響應或者等待該時間后繼續響應。

    t.clock.Sleep(interval)

    func (c *clock) Sleep(d time.Duration) {<!--{C}%3C!%2D%2D%20%2D%2D%3E--> time.Sleep(d) }

    實際上在一個請求來的時候,限流器就會進行睡眠對應的時間,并在睡眠后將最新取水時間返回。

    實際應用(使用Gin框架)

    func ratelimit1() func(ctx *gin.Context) {
    	r1 := rate1.New(100)
    	return func(ctx *gin.Context) {
    		now := time.Now()
    		//  Take 返回的是一個 time.Duration的時間
    		if r1.Take().Sub(now) > 0 {
    			// 返回的時間比當前的時間還大,說明需要進行等待
    			// 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行
    			// 如果不需要等待請求時間,就直接進行Abort 然后返回
    			response(ctx, http.StatusRequestTimeout, "rate1 limit...")
    			fmt.Println("rate1 limit...")
    			ctx.Abort()
    			return
    		}
    		// 放行
    		ctx.Next()
    	}
    }

    這里你可以進行選擇是否返回。因為Take一定會執行sleep函數,所以當執行take結束后表示當前請求已經接到了水。當前演示使用第一種情況。

    • 如果你的業務要求響應不允許進行等待。那么可以在該請求接完水之后然后,如上例。

    • 如果你的業務允許響應等待,那么該請求等待對應的接水時間后進行下一步。具體代碼就是將if中的內容直接忽略。(建議使用)

    測試代碼

    這里定義了一個響應函數和一個handler函數方便測試

    func response(c *gin.Context, code int, info any) {
       c.JSON(code, info)
    }
    func pingHandler(c *gin.Context) {
       response(c, 200, "ping ok~")
    }

    執行go test -run=Run -v先開啟一個web服務

    func TestRun(t *testing.T) {
       r := gin.Default()
       r.GET("/ping1", ratelimit1(), pingHandler)
       r.GET("/ping2", ratelimit2(), helloHandler)
       _ = r.Run(":4399")
    }

    使用接口壓力測試工具go-wrk進行測試->tsliwowicz/go-wrk: go-wrk

    golang install版本可以直接通過go install github.com/tsliwowicz/go-wrk@latest下載

    使用幫助

       Usage: go-wrk <options> <url>
       Options:
        -H       Header to add to each request (you can define multiple -H flags) (Default )
        -M       HTTP method (Default GET)
        -T       Socket/request timeout in ms (Default 1000)
        -body    request body string or @filename (Default )
        -c       Number of goroutines to use (concurrent connections) (Default 10)
        -ca      CA file to verify peer against (SSL/TLS) (Default )
        -cert    CA certificate file to verify peer against (SSL/TLS) (Default )
        -d       Duration of test in seconds (Default 10)
        -f       Playback file name (Default <empty>)
        -help    Print help (Default false)
        -host    Host Header (Default )
        -http    Use HTTP/2 (Default true)
        -key     Private key file name (SSL/TLS (Default )
        -no-c    Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
        -no-ka   Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
        -no-vr   Skip verifying SSL certificate of the server (Default false)
        -redir   Allow Redirects (Default false)
        -v       Print version details (Default false)

    -t 8個線程 -c 400個連接 -n 模擬1k次請求 -d 替換-n 表示連接時間

    輸入go-wrk -t=8 -c=400 -n=1000 http://127.0.0.1:4399/ping1

    可以稍微等待一下水流積攢否則一個請求也不一定能夠響應。

    Golang限流庫、漏桶和令牌桶如何使用

    可以看出,89個請求全部返回。也就是說在一段請求高峰期,不會有請求進行響應。因此我認為既然內部已經睡眠,那么就應該對請求放行處理。限流器實現的比較純粹!

    令牌桶

    引入ratelimit

    go get -u github.com/juju/ratelimit

    初始化

    // NewBucket returns a new token bucket that fills at the
    // rate of one token every fillInterval, up to the given
    // maximum capacity. Both arguments must be
    // positive. The bucket is initially full.
    func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
       return NewBucketWithClock(fillInterval, capacity, nil)
    }
    // NewBucketWithClock is identical to NewBucket but injects a testable clock
    // interface.
    func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
       return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
    }

    進行Bucket桶的初始化。

    / NewBucketWithQuantumAndClock is like NewBucketWithQuantum, but
    // also has a clock argument that allows clients to fake the passing
    // of time. If clock is nil, the system clock will be used.
    func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
       if clock == nil {
          clock = realClock{}
       }
        // 填充速率
       if fillInterval <= 0 {
          panic("token bucket fill interval is not > 0")
       }
        // 最大令牌容量
       if capacity <= 0 {
          panic("token bucket capacity is not > 0")
       }
        // 單次令牌生成量
       if quantum <= 0 {
          panic("token bucket quantum is not > 0")
       }
       return &Bucket{
          clock:           clock,
          startTime:       clock.Now(),
          latestTick:      0,
          fillInterval:    fillInterval,
          capacity:        capacity,
          quantum:         quantum,
          availableTokens: capacity,
       }
    }

    令牌桶初始化過程,初始化結構體 fillInterval(填充速率) cap(最大令牌量) quannum(每次令牌生成量)。

    如果三個變量有一個小于或者等于0的話直接進行報錯返回。在最開始就將當前令牌數初始化為最大容量。

    調用

    // TakeAvailable takes up to count immediately available tokens from the
    // bucket. It returns the number of tokens removed, or zero if there are
    // no available tokens. It does not block.
    func (tb *Bucket) TakeAvailable(count int64) int64 {
       tb.mu.Lock()
       defer tb.mu.Unlock()
       return tb.takeAvailable(tb.clock.Now(), count)
    }

    調用TakeAvailable函數,傳入參數為需要取出的令牌數量,返回參數是實際能夠取出的令牌數量。

    內部實現

    // takeAvailable is the internal version of TakeAvailable - it takes the
    // current time as an argument to enable easy testing.
    func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
       // 如果需要取出的令牌數小于等于零,那么就返回0個令牌
        if count <= 0 {
          return 0
       }
        // 根據時間對當前桶中令牌數進行計算
       tb.adjustavailableTokens(tb.currentTick(now))
        // 計算之后的令牌總數小于等于0,說明當前令牌不足取出,那么就直接返回0個令牌
       if tb.availableTokens <= 0 {
          return 0
       }
        // 如果當前存儲的令牌數量多于請求數量,那么就返回取出令牌數
       if count > tb.availableTokens {
          count = tb.availableTokens
       }
        // 調整令牌數
       tb.availableTokens -= count
       return count
    }

    調整令牌

    // adjustavailableTokens adjusts the current number of tokens
    // available in the bucket at the given time, which must
    // be in the future (positive) with respect to tb.latestTick.
    func (tb *Bucket) adjustavailableTokens(tick int64) {
       lastTick := tb.latestTick
       tb.latestTick = tick
        // 如果當前令牌數大于最大等于容量,直接返回最大容量
       if tb.availableTokens >= tb.capacity {
          return
       }
        // 當前令牌數 += (當前時間 - 上次取出令牌數的時間) * quannum(每次生成令牌量)
       tb.availableTokens += (tick - lastTick) * tb.quantum
        // 如果當前令牌數大于最大等于容量, 將當前令牌數 = 最大容量 然后返回 當前令牌數
       if tb.availableTokens > tb.capacity {
          tb.availableTokens = tb.capacity
       }
       return
    }

    實現原理

    加鎖 defer 解鎖

    判斷count(想要取出的令牌數) 是否小于等于 0,如果是直接返回 0

    調用函數adjustTokens 獲取可用的令牌數量,該函數實現原理:

    • 如果當前令牌數大于最大等于容量,直接返回最大容量

    • 當前令牌數 += (當前時間 - 上次取出令牌數的時間) * quannum(每次生成令牌量)

    • 如果當前令牌數大于最大等于容量, 將當前令牌數 = 最大容量 然后返回 當前令牌數

    如果當前可以取出的令牌數小于等于0 直接返回 0

    如果當前可以取出的令牌數小于當前想要取出的令牌數(count) count = 當前可以取出的令牌數

    當前的令牌數 -= 取出的令牌數(count)

    返回 count

    額外介紹

    take函數,能夠返回等待時間和布爾值,允許欠賬,沒有令牌也可以取出。

    func (tb *Bucket) Take(count int64) time.Duration

    takeMaxDuration函數,可以根據最大等待時間來進行判斷。

    func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

    測試

    func ratelimit2() func(ctx *gin.Context) {
    	// 生成速率 最大容量
    	r2 := rate2.NewBucket(time.Second, 200)
    	return func(ctx *gin.Context) {
    		//r2.Take() // 允許欠賬,令牌不夠也可以接收請求
    		if r2.TakeAvailable(1) == 1 {
    			// 如果想要取出1個令牌并且能夠取出,就放行
    			ctx.Next()
    			return
    		}
    		response(ctx, http.StatusRequestTimeout, "rate2 limit...")
    		ctx.Abort()
    		return
    	}
    }

    Golang限流庫、漏桶和令牌桶如何使用

    由于壓測速度過于快速,在實際過程中可以根據調整令牌生成速率來進行具體限流!

    到此,相信大家對“Golang限流庫、漏桶和令牌桶如何使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    大安市| 乌鲁木齐市| 京山县| 镇宁| 五华县| 达拉特旗| 从化市| 宁武县| 彝良县| 射洪县| 汶川县| 改则县| 隆尧县| 天祝| 彝良县| 高青县| 昆山市| 新泰市| 土默特右旗| 宜黄县| 密云县| 汝南县| 闽清县| 涟源市| 城市| 凤城市| 古交市| 濉溪县| 洪雅县| 临夏县| 腾冲县| 锡林浩特市| 秭归县| 梧州市| 榆林市| 晋中市| 故城县| 潼南县| 桑日县| 平果县| 奈曼旗|