在Golang中使用RabbitMQ實現任務分發與負載均衡的策略可以通過以下步驟實現:
安裝RabbitMQ: 根據你的操作系統,在RabbitMQ官網上下載并安裝RabbitMQ。
創建生產者和消費者: 在Golang中,使用RabbitMQ的AMQP庫可以創建生產者和消費者。生產者負責將任務放入隊列中,消費者則從隊列中取出任務并執行。
// 生產者
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// 連接到RabbitMQ服務器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 創建一個channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
// 聲明一個隊列
q, err := ch.QueueDeclare(
"task_queue", // 隊列名
true, // 是否持久化
false, // 是否自動刪除
false, // 是否獨占連接
false, // 是否阻塞
nil, // 額外的屬性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 發布消息到隊列中
body := "Hello RabbitMQ!"
err = ch.Publish(
"", // 交換器
q.Name, // 路由鍵
false, // 強制性
false, // 立即發送
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久化消息
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("Failed to publish a message: %v", err)
}
log.Printf("Sent message: %s", body)
}
// 消費者
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// 連接到RabbitMQ服務器
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
// 創建一個channel
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
// 聲明一個隊列
q, err := ch.QueueDeclare(
"task_queue", // 隊列名
true, // 是否持久化
false, // 是否自動刪除
false, // 是否獨占連接
false, // 是否阻塞
nil, // 額外的屬性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
// 設置每次從隊列中獲取的消息數量
err = ch.Qos(
1, // 每次獲取的數量
0, // 預取數量
false, // 是否全局
)
if err != nil {
log.Fatalf("Failed to set QoS: %v", err)
}
// 消費消息
msgs, err := ch.Consume(
q.Name, // 隊列名
"", // 消費者標識
false, // 自動回復
false, // 獨占連接
false, // 不阻塞
false, // 額外的屬性
nil, // 可選項
)
if err != nil {
log.Fatalf("Failed to consume messages: %v", err)
}
forever := make(chan bool)
// 處理并執行任務
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 模擬任務執行,這里可以替換為實際的任務處理邏輯
doWork(d.Body)
log.Printf