中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

消息隊列原理之如何掌握rabbitmq

發布時間:2021-10-21 13:52:42 來源:億速云 閱讀:123 作者:iii 欄目:編程語言

這篇文章主要講解了“消息隊列原理之如何掌握rabbitmq”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“消息隊列原理之如何掌握rabbitmq”吧!

介紹

RabbitMQ 是一個由 Erlang 開發的 AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)的開源實現,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。支持多種客戶端語言。

架構

整體架構對照下面的圖說明

消息隊列原理之如何掌握rabbitmq

先看看圖片上各個名次的解釋:

  • Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,簡單來說就是消息隊列服務器實體。

  • Connection: 客戶端與 Rabbitmq Broker 直接的 TCP 連接,通常一個客戶端與 Broker 之間只需要一個連接即可。

  • Channel: 消息通道,在客戶端的每個連接里,可建立多個channel,最好每個線程都用獨立的Channel,后續的對 QueueExchange 的操作都是在 Channel 中完成的。

  • Producer: 消息生產者,通過和 Broker 建立 Connection 和 Channel ,向 Exchange 發送消息。

  • Consumer: 消息消費者,通過和 Broker 建立 Connection 和 Channel,從 Queue 中消費消息。

  • Exchange: 消息交換機,按照一定的策略把 Producer 生產的消息投遞到 Queue 中,等待消費者消費。

  • Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列。

  • Vhost: 虛擬主機,一個broker里可以開設多個vhost,用作權限分離,把不同的系統使用的rabbitmq區分開,共用一個消息隊列服務器,但看上去就像各自在用不用的rabbitmq服務器一樣。

  • Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。

  • RoutingKey:路由關鍵字,生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。

這里面比較難理解的概念是 RoutingKey,Exchange,Binding ,消費發送時不會直接發送給 Queue ,而是先發送給 Exchange,由 Exchange 按照一定的規則投遞到與它綁定的 Queue 中,那這個規則是什么呢? 規則就與 Exchange 的 Type、BindingRoutingKey 相關,Exchange 支持的類型有 4 種,direct,fanout,topic,headers,含義如下:

  • direct: QueueExchange 在綁定時需要指定一個 key, 我們稱為 BindkeyProducerExchange 發送消息時,也需要指定一個 key ,這個 key 就是 Routekey。這種模式下 Exchange 會把消息投遞給 RoutekeyBindkey 相同的隊列

  • fanout: 類似于廣播的方式,會把消息投遞給和 Exchange 綁定的所有隊列,不需要檢查 RoutekeyBindkey

  • topic: 類似于組播的方式,這種模式下 Bingkey 支持模糊匹配,* 代表匹配一個任意詞組#代表匹配0個或多個詞組。如 Producer 產生一條 RouteKey 為 benz.car 的消息, 同時這個 Exchange 綁定了3組隊列(請注意是3組不是3個,意思是Exchange可以和同一個Queue進行多次綁定,通過Bindkey 的不同,它們之間是多對多的關系),Bindkey 分別為: car ,*.car ,benz.car ,那么會把這個消息投遞到 *.carbenz.car 對應的 Queue 中。

  • headers: 這個類型 RoutekeyBindkey 的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。

對照上面圖和名次解釋應該比較清晰明了了,下面我們通過幾個例子說明如何使用。

用法(golang)

direct

消息隊列原理之如何掌握rabbitmq

先看看 Rabbitmq 默認的 exchange ,其中第一個(AMQP default) 是默認的,默認綁定了所有的 Queue ,會把消息投遞到 Routekey 對應的隊列中,即: Routekey==QueueName

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func handlerError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

var url = "amqp://username:password@ip:port"

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	queueNameCar := "car"
	if _, err := channel.QueueDeclare(queueNameCar, false, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decare Queue")
	}

	if err := channel.Publish("", queueNameCar, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}
  • 這里是一個完整的 Demo, 后面只會提供main() 函數的示例代碼,其他的和這里這里類似。

  • 申明了一個名稱為 car 的消息隊列,并沒有做任何的綁定,往 defalut exchange 發送一條消息,routekey 為 car ,可以看到和隊列名相同。

  • 為了方便演示,結果以圖片的方式展現,可以看到這里有 car 的隊列,并且有一條消息。 消息隊列原理之如何掌握rabbitmq

在創建隊列有幾個參數可以關注一下 消息隊列原理之如何掌握rabbitmq

  • Durability: 持久化,是否將隊列持久化到磁盤,當選擇持久化時當 rabbitmq 重啟了,這個隊列還在,否則當重啟了之后這個隊列就沒有了,需要重新創建,這個需要設計程序時考慮到。

  • Auto delete: 當其中一個消費者已經完成之后,會刪除這個隊列并斷開與其他的消費者的連接。

  • Arguments:

    • x-message-ttl: 消息的過期時間,發布到隊列中的消息在被丟棄之前可以存活多久。

    • x-expires: 隊列的過期時間,一個隊列在多長時間內未使用會被自動刪除。

    • x-max-length: 隊列的長度,最多剋容納多少條消息。

    • x-max-length-bytes: 隊列最大可以包含多大的消息。

    • x-dead-letter-exchange: 當消息過期或者被客戶端reject 之后應該重新投遞到那個exchange ,類似與一個producer發送消息時選擇exchange

    • x-dead-letter-routing-key: 當消息過期或者被客戶端reject 之后重新投遞時的 Routekey,類似與一個producer發送消息時設置routekey,默認是原消息的 routekey

    • x-max-priority: 消息的優先級設置,設置可以支持的最大優先級,如設置為10,則可以在發送消息設置優先級,可以根據優先級處理消息,默認為空,當為空時則不支持優先級

    • x-queue-mode: 將隊列設置為懶惰模式,盡可能多地將消息保留在磁盤上,以減少RAM的使用量;如果不設置,隊列將保留內存中的緩存,以盡可能快地傳遞消息。

我們自己創建一個 direct 類型的 exchange 并綁定一些隊列看看是什么效果。

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	directExchangeNameCar := "direct.car"
	if err := channel.ExchangeDeclare(directExchangeNameCar, "direct", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "big.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameMiddleCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameMiddleCar, "middler.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameSmallCar, "car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameSmallCar, "small.car", directExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.Publish(directExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}
  • 代碼中申明了 1 一個 Exchange ,4個 Queue,7個 Binding ,其中一個 Binding 詳情如下: 消息隊列原理之如何掌握rabbitmq

  • 可以看到向這個 Exchange 中發消息,Routekey 為 car ,匹配的隊列有個,那么這4個隊列中都應該有消息才對 消息隊列原理之如何掌握rabbitmq 和我們的設想是一直

Queue 的創建上面已經講過了,這里有 Exchange 的創建,那么再看看創建 Exchange 有哪些參數 消息隊列原理之如何掌握rabbitmq

  • Type: 類型,上面已經涉及到了

  • Durability: 持久化

  • Auto delete: 是否自動刪除,如果為yes 則當其中隊列完成 unbind 操作,則其他的 queue 或者 exchange 也會 unbind 并且刪除這個 exchange

  • Internal: 如果為yes ,則客戶端不能直接往這個 exchange 上發送消息,只能用作和 exchange 綁定。

fanout

fanout 工作方式類似于廣播,看看下面的代碼

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	fanoutExchangeNameCar := "fanout.car"
	if err := channel.ExchangeDeclare(fanoutExchangeNameCar, "fanout", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameBigCar, "big.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameMiddleCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameMiddleCar, "middler.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.QueueBind(queueNameSmallCar, "car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}
	if err := channel.QueueBind(queueNameSmallCar, "small.car", fanoutExchangeNameCar, false, nil); err != nil {
		handlerError(err, "Failed to bind queue to exchange")
	}

	if err := channel.Publish(fanoutExchangeNameCar, "middle.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
}
  • 這個申明了一個 fanout 類型的 exchange ,和上面的代碼類似,只有 exchange 不同。

  • 可以先在腦海中想想每個 queue 中有幾條消息。

  • fanout.car 這個 exchange 發消息指定 Routekey 為 middle.car ,但是由于是廣播模式,所以和 routekey 是沒有關系的,每個消息隊列中各有一條消息。

  • 請注意有些 binding 指向的是同一個 queue ,那么會產生多條消息到相同的 queue 中,答案是否定的。producer 產生一條消息,根據一定的規則,每個隊列只會收到一條(如何符合投遞規則的話)。 消息隊列原理之如何掌握rabbitmq

topic

topic 比較有意思了,和之前的簡單粗暴的用法不一樣了,先看看下面的代碼,聲明了一個 topic 類型的 exchange, 4個 queue

func main() {
	conn, err := amqp.Dial(url)
	handlerError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	channel, err := conn.Channel()
	handlerError(err, "Failed to open a Channel")
	defer channel.Close()

	topicExchangeNameCar := "topic.car"
	if err := channel.ExchangeDeclare(topicExchangeNameCar, "topic", true, false, false, false, nil); err != nil {
		handlerError(err, "Failed to decalare exchange")
	}

	queueNameCar := "car"
	queueNameBigCar := "big-car"
	queueNameMiddleCar := "middle-car"
	queueNameSmallCar := "small-car"
	channel.QueueDeclare(queueNameCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil)
	channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil)

	if err := channel.QueueBind(queueNameCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameBigCar, "big.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }

    if err := channel.QueueBind(queueNameMiddleCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameMiddleCar, "middler.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }

    if err := channel.QueueBind(queueNameSmallCar, "car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "*.small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
    if err := channel.QueueBind(queueNameSmallCar, "#.small.car", topicExchangeNameCar, false, nil); err != nil {
        handlerError(err, "Failed to bind queue to exchange")
    }
}

現在思考每個 producer 產生消息之后,會有哪些 queue 會收到消息。

	if err := channel.Publish(topicExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • 每個 queue 都會收到消息

	if err := channel.Publish(topicExchangeNameCar, "small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • small-car 這一個隊列會收到消息。

    • 符合 Routekey 為 small.car*.small.car#.small.car 的binding

	if err := channel.Publish(topicExchangeNameCar, "benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • small-car 這一個隊列會收到消息。

    • 符合 Routekey 為 *.small.car#.small.car 的binding

	if err := channel.Publish(topicExchangeNameCar, "auto.blue.benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • small-car 這一個隊列會收到消息。

    • 符合 Routekey 為 #.small.car 的binding

	if err := channel.Publish(topicExchangeNameCar, "bike", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil {
		handlerError(err, "Failed to publish message")
	}
  • 都不會收到消息,沒有符合的 routekey 。

headers

這種類型很少有實際的應用場景。

感謝各位的閱讀,以上就是“消息隊列原理之如何掌握rabbitmq”的內容了,經過本文的學習后,相信大家對消息隊列原理之如何掌握rabbitmq這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

金华市| 仁怀市| 息烽县| 江陵县| 普定县| 新昌县| 包头市| 楚雄市| 清丰县| 正宁县| 杭州市| 花莲市| 建湖县| 汝城县| 武穴市| 太湖县| 余江县| 平塘县| 康马县| 富顺县| 施甸县| 三穗县| 沅江市| 德化县| 拜泉县| 高清| 玉屏| 蕉岭县| 中牟县| 霸州市| 深水埗区| 类乌齐县| 沭阳县| 萨迦县| 岢岚县| 横山县| 颍上县| 连平县| 鸡泽县| 南汇区| 昌宁县|