您好,登錄后才能下訂單哦!
這篇文章主要講解了“讓消息隊列達到最大吞吐量的方法教程”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“讓消息隊列達到最大吞吐量的方法教程”吧!
寫入消息隊列吞吐量取決于以下兩個方面
最佳吞吐量是讓其中之一打滿,而一般情況下內網帶寬都會非常高,不太可能被打滿,所以自然就是講消息隊列的寫入速度打滿,這就就有兩個點需要平衡
go-zero 的 PeriodicalExecutor
和 ChunkExecutor
就是為了這種情況設計的
批量寫入的消息量大小或者字節數多少
延遲多久寫入
網絡帶寬
消息隊列(比如Kafka)寫入速度
從消息隊列里消費消息的吞吐量取決于以下兩個方面
這里有個核心問題是不能不考慮業務處理速度,而讀取過多的消息到內存里,否則可能會引起兩個問題:
內存占用過高,甚至出現OOM,pod
也是有 memory limit
的
停止 pod
時堆積的消息來不及處理而導致消息丟失
消息隊列的讀取速度,一般情況下消息隊列本身的讀取速度相比于處理消息的速度都是足夠快的
處理速度,這個依賴于業務
借用一下 Rob Pike
的一張圖,這個跟隊列消費異曲同工。左邊4個 gopher
從隊列里取,右邊4個 gopher
接過去處理。比較理想的結果是左邊和右邊速率基本一致,沒有誰浪費,沒有誰等待,中間交換處也沒有堆積。
我們來看看 go-zero
是怎么實現的:
Producer
端
for { select { case <-q.quit: logx.Info("Quitting producer") return default: if v, ok := q.produceOne(producer); ok { q.channel <- v } } }
沒有退出事件就會通過 produceOne
去讀取一個消息,成功后寫入 channel
。利用 chan
就可以很好的解決讀取和消費的銜接問題。
Consumer
端
for { select { case message, ok := <-q.channel: if ok { q.consumeOne(consumer, message) } else { logx.Info("Task channel was closed, quitting consumer...") return } case event := <-eventChan: consumer.OnEvent(event) } }
這里如果拿到消息就去處理,當 ok
為 false
的時候表示 channel
已被關閉,可以退出整個處理循環了。同時我們還在 redis queue
上支持了 pause/resume
,我們原來在社交場景里大量使用這樣的隊列,可以通知 consumer
暫停和繼續。
啟動 queue
,有了這些我們就可以通過控制 producer/consumer
的數量來達到吞吐量的調優了
func (q *Queue) Start() { q.startProducers(q.producerCount) q.startConsumers(q.consumerCount) q.producerRoutineGroup.Wait() close(q.channel) q.consumerRoutineGroup.Wait() }
這里需要注意的是,先要停掉 producer
,再去等 consumer
處理完。
到這里核心控制代碼基本就講完了,其實看起來還是挺簡單的,也可以到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看完整實現。
基本的使用流程:
創建 producer
或 consumer
啟動 queue
生產消息 / 消費消息
對應到 queue
中,大致如下:
// 生產者創建工廠 producer := newMockedProducer() // 消費者創建工廠 consumer := newMockedConsumer() // 將生產者以及消費者的創建工廠函數傳遞給 NewQueue() q := queue.NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil })
我們看看 NewQueue
需要什么參數:
producer
工廠方法
consumer
工廠方法
將 producer & consumer
的工廠函數傳遞 queue
,由它去負責創建。框架提供了 Producer
和 Consumer
的接口以及工廠方法定義,然后整個流程的控制 queue
實現會自動完成。
message
我們通過自定義一個 mockedProducer
來模擬:
type mockedProducer struct { total int32 count int32 // 使用waitgroup來模擬任務的完成 wait sync.WaitGroup } // 實現 Producer interface 的方法:Produce() func (p *mockedProducer) Produce() (string, bool) { if atomic.AddInt32(&p.count, 1) <= p.total { p.wait.Done() return "item", true } time.Sleep(time.Second) return "", false }
queue
中的生產者編寫都必須實現:
Produce()
:由開發者編寫生產消息的邏輯
AddListener()
:添加事件 listener
message
我們通過自定義一個 mockedConsumer
來模擬:
type mockedConsumer struct { count int32 } func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) return nil }
queue
啟動,然后驗證我們上述的生產者和消費者之間的數據是否傳輸成功:
func main() { // 創建 queue q := NewQueue(func() (Producer, error) { return newMockedProducer(), nil }, func() (Consumer, error) { return newMockedConsumer(), nil }) // 啟動panic了也可以確保stop被執行以清理資源 defer q.Stop() // 啟動 q.Start() }
以上就是 queue
最簡易的實現示例。我們通過這個 core/queue
框架實現了基于 redis
和 kafka
等的消息隊列服務,在不同業務場景中經過了充分的實踐檢驗。你也可以根據自己的業務實際情況,實現自己的消息隊列服務。
整體流程如上圖:
全體的通信都由 channel
進行
Producer
和 Consumer
的數量可以設定以匹配不同業務需求
Produce
和 Consume
具體實現由開發者定義,queue
負責整體流程
感謝各位的閱讀,以上就是“讓消息隊列達到最大吞吐量的方法教程”的內容了,經過本文的學習后,相信大家對讓消息隊列達到最大吞吐量的方法教程這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。