中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

讓消息隊列達到最大吞吐量的方法教程

發布時間:2021-10-11 10:31:24 來源:億速云 閱讀:116 作者:iii 欄目:編程語言

這篇文章主要講解了“讓消息隊列達到最大吞吐量的方法教程”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“讓消息隊列達到最大吞吐量的方法教程”吧!

關于吞吐量的一些思考

  • 寫入消息隊列吞吐量取決于以下兩個方面

    最佳吞吐量是讓其中之一打滿,而一般情況下內網帶寬都會非常高,不太可能被打滿,所以自然就是講消息隊列的寫入速度打滿,這就就有兩個點需要平衡

    go-zero 的 PeriodicalExecutorChunkExecutor 就是為了這種情況設計的

    • 批量寫入的消息量大小或者字節數多少

    • 延遲多久寫入

    • 網絡帶寬

    • 消息隊列(比如Kafka)寫入速度

  • 從消息隊列里消費消息的吞吐量取決于以下兩個方面

    這里有個核心問題是不能不考慮業務處理速度,而讀取過多的消息到內存里,否則可能會引起兩個問題:

    • 內存占用過高,甚至出現OOM,pod 也是有 memory limit

    • 停止 pod 時堆積的消息來不及處理而導致消息丟失

    • 消息隊列的讀取速度,一般情況下消息隊列本身的讀取速度相比于處理消息的速度都是足夠快的

    • 處理速度,這個依賴于業務

解決方案和實現

讓消息隊列達到最大吞吐量的方法教程

借用一下 Rob Pike 的一張圖,這個跟隊列消費異曲同工。左邊4個 gopher 從隊列里取,右邊4個 gopher 接過去處理。比較理想的結果是左邊和右邊速率基本一致,沒有誰浪費,沒有誰等待,中間交換處也沒有堆積。

我們來看看 go-zero 是怎么實現的:

  • Producer

	for {
		select {
		case <-q.quit:
			logx.Info("Quitting producer")
			return
		default:
			if v, ok := q.produceOne(producer); ok {
				q.channel <- v
			}
		}
	}

沒有退出事件就會通過 produceOne 去讀取一個消息,成功后寫入 channel。利用 chan 就可以很好的解決讀取和消費的銜接問題。

  • Consumer

	for {
		select {
		case message, ok := <-q.channel:
			if ok {
				q.consumeOne(consumer, message)
			} else {
				logx.Info("Task channel was closed, quitting consumer...")
				return
			}
		case event := <-eventChan:
			consumer.OnEvent(event)
		}
	}

這里如果拿到消息就去處理,當 okfalse 的時候表示 channel 已被關閉,可以退出整個處理循環了。同時我們還在 redis queue 上支持了 pause/resume,我們原來在社交場景里大量使用這樣的隊列,可以通知 consumer 暫停和繼續。

  • 啟動 queue,有了這些我們就可以通過控制 producer/consumer 的數量來達到吞吐量的調優了

func (q *Queue) Start() {
	q.startProducers(q.producerCount)
	q.startConsumers(q.consumerCount)

	q.producerRoutineGroup.Wait()
	close(q.channel)
	q.consumerRoutineGroup.Wait()
}

這里需要注意的是,先要停掉 producer,再去等 consumer 處理完。

到這里核心控制代碼基本就講完了,其實看起來還是挺簡單的,也可以到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看完整實現。

使用

基本的使用流程:

  1. 創建 producerconsumer

  2. 啟動 queue

  3. 生產消息 / 消費消息

對應到 queue 中,大致如下:

創建 queue

// 生產者創建工廠
producer := newMockedProducer()
// 消費者創建工廠
consumer := newMockedConsumer()
// 將生產者以及消費者的創建工廠函數傳遞給 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})

我們看看 NewQueue 需要什么參數:

  1. producer 工廠方法

  2. consumer 工廠方法

producer & consumer 的工廠函數傳遞 queue ,由它去負責創建。框架提供了 ProducerConsumer 的接口以及工廠方法定義,然后整個流程的控制 queue 實現會自動完成。

生產 message

我們通過自定義一個 mockedProducer 來模擬:

type mockedProducer struct {
	total int32
	count int32
  // 使用waitgroup來模擬任務的完成
	wait  sync.WaitGroup
}
// 實現 Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
	if atomic.AddInt32(&p.count, 1) <= p.total {
		p.wait.Done()
		return "item", true
	}
	time.Sleep(time.Second)
	return "", false
}

queue 中的生產者編寫都必須實現:

  • Produce():由開發者編寫生產消息的邏輯

  • AddListener():添加事件 listener

消費 message

我們通過自定義一個 mockedConsumer 來模擬:

type mockedConsumer struct {
	count  int32
}

func (c *mockedConsumer) Consume(string) error {
	atomic.AddInt32(&c.count, 1)
	return nil
}

啟動 queue

啟動,然后驗證我們上述的生產者和消費者之間的數據是否傳輸成功:

func main() {
	// 創建 queue
	q := NewQueue(func() (Producer, error) {
		return newMockedProducer(), nil
	}, func() (Consumer, error) {
		return newMockedConsumer(), nil
	})
  // 啟動panic了也可以確保stop被執行以清理資源
  defer q.Stop()
	// 啟動
	q.Start()
}

以上就是 queue 最簡易的實現示例。我們通過這個 core/queue 框架實現了基于 rediskafka 等的消息隊列服務,在不同業務場景中經過了充分的實踐檢驗。你也可以根據自己的業務實際情況,實現自己的消息隊列服務。

整體設計

讓消息隊列達到最大吞吐量的方法教程

整體流程如上圖:

  1. 全體的通信都由 channel 進行

  2. ProducerConsumer 的數量可以設定以匹配不同業務需求

  3. ProduceConsume 具體實現由開發者定義,queue 負責整體流程

感謝各位的閱讀,以上就是“讓消息隊列達到最大吞吐量的方法教程”的內容了,經過本文的學習后,相信大家對讓消息隊列達到最大吞吐量的方法教程這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

go
AI

胶州市| 镇平县| 定安县| 林西县| 白水县| 二连浩特市| 岳阳市| 长宁县| 冷水江市| 聂拉木县| 房山区| 旬阳县| 临湘市| 岳西县| 碌曲县| 内江市| 介休市| 柯坪县| 高安市| 皋兰县| 安化县| 景德镇市| 曲沃县| 白河县| 肥城市| 沙湾县| 泰州市| 丹凤县| 长子县| 怀仁县| 固镇县| 阳新县| 梁山县| 平顶山市| 巫山县| 内江市| 吉隆县| 临桂县| 商丘市| 崇仁县| 乌拉特前旗|