中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么在python項目中使用RabbitMQ

發布時間:2021-03-25 17:47:36 來源:億速云 閱讀:400 作者:Leah 欄目:開發技術

怎么在python項目中使用RabbitMQ?針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

一、RabbitMQ 消息隊列介紹

RabbitMQ也是消息隊列,那RabbitMQ和之前python的Queue有什么區別么?

py 消息隊列:
    線程 queue(同一進程下線程之間進行交互)
    進程 Queue(父子進程進行交互 或者 同屬于同一進程下的多個子進程進行交互)

如果是兩個完全獨立的python程序,也是不能用上面兩個queue進行交互的,或者和其他語言交互有哪些實現方式呢。

【Disk、Socket、其他中間件】這里中間件不僅可以支持兩個程序之間交互,可以支持多個程序,可以維護好多個程序的隊列。

像這種公共的中間件有好多成熟的產品:
RabbitMQ
ZeroMQ
ActiveMQ
……

RabbitMQ:erlang語言 開發的。

Python中連接RabbitMQ的模塊:pika 、Celery(分布式任務隊列) 、haigha

可以維護很多的隊列

RabbitMQ 教程官網:http://www.rabbitmq.com/getstarted.html

幾個概念說明:

Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務

二、RabbitMQ基本示例.

1、Rabbitmq 安裝

ubuntu系統

install rabbitmq-server # 直接搞定

以下centos系統

1)Install Erlang

# For EL5:
rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
# For EL6:
rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# For EL7:
rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm

yum install erlang

2)Install RabbitMQ Server

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpm

3)use RabbitMQ Server

chkconfig rabbitmq-server on
service rabbitmq-server stop/start

2、基本示例

發送端 producer

import pika

# 建立一個實例
connection = pika.BlockingConnection(
  pika.ConnectionParameters('localhost',5672) # 默認端口5672,可不寫
  )
# 聲明一個管道,在管道里發消息
channel = connection.channel()
# 在管道里聲明queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
           routing_key='hello', # queue名字
           body='Hello World!') # 消息內容
print(" [x] Sent 'Hello World!'")
connection.close() # 隊列關閉

接收端 consumer

import pika
import time

# 建立實例
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
# 聲明管道
channel = connection.channel()

# 為什么又聲明了一個‘hello'隊列?
# 如果確定已經聲明了,可以不聲明。但是你不知道那個機器先運行,所以要聲明兩次。
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body): # 四個參數為標準格式
  print(ch, method, properties) # 打印看一下是什么
  # 管道內存對象 內容相關信息 后面講
  print(" [x] Received %r" % body)
  time.sleep(15)
  ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生成者,消息處理完成

channel.basic_consume( # 消費消息
    callback, # 如果收到消息,就調用callback函數來處理消息
    queue='hello', # 你要從那個隊列里收消息
    # no_ack=True # 寫的話,如果接收消息,機器宕機消息就丟了
    # 一般不寫。宕機則生產者檢測到發給其他消費者
    )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 開始消費消息

3、RabbitMQ 消息分發輪詢

上面的只是一個生產者、一個消費者,能不能一個生產者多個消費者呢?

可以上面的例子,多啟動幾個消費者consumer,看一下消息的接收情況。

采用輪詢機制;把消息依次分發

假如消費者處理消息需要15秒,如果當機了,那這個消息處理明顯還沒處理完,怎么處理?

(可以模擬消費端斷了,分別注釋和不注釋 no_ack=True 看一下)

你沒給我回復確認,就代表消息沒處理完。

上面的效果消費端斷了就轉到另外一個消費端去了,但是生產者怎么知道消費端斷了呢?

因為生產者和消費者是通過socket連接的,socket斷了,就說明消費端斷開了。

上面的模式只是依次分發,實際情況是機器配置不一樣。怎么設置類似權重的操作?

RabbitMQ怎么辦呢,RabbitMQ做了簡單的處理就能實現公平的分發。

就是RabbitMQ給消費者發消息的時候檢測下消費者里的消息數量,如果超過指定值(比如1條),就不給你發了。

只需要在消費者端,channel.basic_consume前加上就可以了。

channel.basic_qos(prefetch_count=1) # 類似權重,按能力分發,如果有一個消息,就不在給你發
channel.basic_consume( # 消費消息

三、RabbitMQ 消息持久化(durable、properties)

1、RabbitMQ 相關命令

rabbitmqctl list_queues # 查看當前queue數量及queue里消息數量

2、消息持久化

如果隊列里還有消息,RabbitMQ 服務端宕機了呢?消息還在不在?

把RabbitMQ服務重啟,看一下消息在不在。

上面的情況下,宕機了,消息就久了,下面看看如何把消息持久化。

每次聲明隊列的時候,都加上durable,注意每個隊列都得寫,客戶端、服務端聲明的時候都得寫。

# 在管道里聲明queue
channel.queue_declare(queue='hello2', durable=True)

測試結果發現,只是把隊列持久化了,但是隊列里的消息沒了。

durable的作用只是把隊列持久化。離消息持久話還差一步:

發送端發送消息時,加上properties

properties=pika.BasicProperties(
  delivery_mode=2, # 消息持久化
  )

發送端 producer

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost',5672)) # 默認端口5672,可不寫
channel = connection.channel()
#聲明queue
channel.queue_declare(queue='hello2', durable=True) # 若聲明過,則換一個名字
#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
           routing_key='hello2',
           body='Hello World!',
           properties=pika.BasicProperties(
             delivery_mode=2, # make message persistent
             )
           )

print(" [x] Sent 'Hello World!'")
connection.close()

接收端 consumer

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello2', durable=True)

def callback(ch, method, properties, body):
  print(" [x] Received %r" % body)
  time.sleep(10)
  ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生產者,消息處理完成

channel.basic_qos(prefetch_count=1) # 類似權重,按能力分發,如果有一個消息,就不在給你發
channel.basic_consume( # 消費消息
           callback, # 如果收到消息,就調用callback
           queue='hello2',
           # no_ack=True # 一般不寫,處理完接收處理結果。宕機則發給其他消費者
           )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

四、RabbitMQ 廣播模式(exchange)

前面的效果都是一對一發,如果做一個廣播效果可不可以,這時候就要用到exchange了

exchange必須精確的知道收到的消息要發給誰。exchange的類型決定了怎么處理,

類型有以下幾種:

  • fanout: 所有綁定到此exchange的queue都可以接收消息

  • direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息

  • topic: 所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

1、fanout 純廣播、all

需要queue和exchange綁定,因為消費者不是和exchange直連的,消費者是連在queue上,queue綁定在exchange上,消費者只會在queu里度消息

 |------------------------|
     |      /—— queue <—|—> consumer1
producer —|—exchange1 <bind    |         
    \ |      \—— queue <—|—> consumer2
    \-|-exchange2  ……    |
     |------------------------|

發送端 publisher 發布、廣播

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
# 注意:這里是廣播,不需要聲明queue
channel.exchange_declare(exchange='logs', # 聲明廣播管道
             type='fanout')

# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
           routing_key='', # 注意此處空,必須有
           body=message)
print(" [x] Sent %r" % message)
connection.close()

接收端 subscriber 訂閱

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
             type='fanout')
# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
result = channel.queue_declare(exclusive=True)
# 獲取隨機的queue名字
queue_name = result.method.queue
print("random queuename:", queue_name)

channel.queue_bind(exchange='logs', # queue綁定到轉發器上
          queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
  print(" [x] %r" % body)

channel.basic_consume(callback,
           queue=queue_name,
           no_ack=True)

channel.start_consuming()

注意:廣播,是實時的,收不到就沒了,消息不會存下來,類似收音機。

2、direct 有選擇的接收消息

接收者可以過濾消息,只收我想要的消息

發送端publisher

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
             type='direct')
# 重要程度級別,這里默認定義為 info
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
           routing_key=severity,
           body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

接收端subscriber

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
             type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 獲取運行腳本所有的參數
severities = sys.argv[1:]
if not severities:
  sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
  sys.exit(1)
# 循環列表去綁定
for severity in severities:
  channel.queue_bind(exchange='direct_logs',
            queue=queue_name,
            routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
  print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
           queue=queue_name,
           no_ack=True)

channel.start_consuming()

運行接收端,指定接收級別的參數,例:

python direct_sonsumer.py info warning
python direct_sonsumer.py warning error

3、topic 更細致的過濾

比如把error中,apache和mysql的分別或取出來

發送端publisher

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
             type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
           routing_key=routing_key,
           body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

接收端 subscriber

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
             type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
  sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
  sys.exit(1)

for binding_key in binding_keys:
  channel.queue_bind(exchange='topic_logs',
            queue=queue_name,
            routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
  print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
           queue=queue_name,
           no_ack=True)

channel.start_consuming()

運行接收端,指定接收哪些消息,例:

python topic_sonsumer.py *.info
python topic_sonsumer.py *.error mysql.*
python topic_sonsumer.py '#' # 接收所有消息

# 接收所有的 logs run:
# python receive_logs_topic.py "#"

# To receive all logs from the facility "kern":
# python receive_logs_topic.py "kern.*"

# Or if you want to hear only about "critical" logs:
# python receive_logs_topic.py "*.critical"

# You can create multiple bindings:
# python receive_logs_topic.py "kern.*" "*.critical"

# And to emit a log with a routing key "kern.critical" type:
# python emit_log_topic.py "kern.critical" "A critical kernel error"

4、RabbitMQ RPC 實現(Remote procedure call)

不知道你有沒有發現,上面的流都是單向的,如果遠程的機器執行完返回結果,就實現不了了。

如果返回,這種模式叫什么呢,RPC(遠程過程調用),snmp就是典型的RPC

RabbitMQ能不能返回呢,怎么返回呢?既是發送端又是接收端。

但是接收端返回消息怎么返回?可以發送到發過來的queue里么?不可以。

返回時,再建立一個queue,把結果發送新的queue里

為了服務端返回的queue不寫死,在客戶端給服務端發指令的的時候,同時帶一條消息說,你結果返回給哪個queue

RPC client

import pika
import uuid
import time

class FibonacciRpcClient(object):
  def __init__(self):
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue

    self.channel.basic_consume(self.on_response, # 只要一收到消息就調用on_response
                  no_ack=True,
                  queue=self.callback_queue) # 收這個queue的消息

  def on_response(self, ch, method, props, body): # 必須四個參數
    # 如果收到的ID和本機生成的相同,則返回的結果就是我想要的指令返回的結果
    if self.corr_id == props.correlation_id:
      self.response = body

  def call(self, n):
    self.response = None # 初始self.response為None
    self.corr_id = str(uuid.uuid4()) # 隨機唯一字符串
    self.channel.basic_publish(
        exchange='',
        routing_key='rpc_queue', # 發消息到rpc_queue
        properties=pika.BasicProperties( # 消息持久化
          reply_to = self.callback_queue, # 讓服務端命令結果返回到callback_queue
          correlation_id = self.corr_id, # 把隨機uuid同時發給服務器
        ),
        body=str(n)
    )
    while self.response is None: # 當沒有數據,就一直循環
      # 啟動后,on_response函數接到消息,self.response 值就不為空了
      self.connection.process_data_events() # 非阻塞版的start_consuming()
      # print("no msg……")
      # time.sleep(0.5)
    # 收到消息就調用on_response
    return int(self.response)

if __name__ == '__main__':
  fibonacci_rpc = FibonacciRpcClient()
  print(" [x] Requesting fib(7)")
  response = fibonacci_rpc.call(7)
  print(" [.] Got %r" % response)

RPC server

import pika
import time

def fib(n):
  if n == 0:
    return 0
  elif n == 1:
    return 1
  else:
    return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
  n = int(body)
  print(" [.] fib(%s)" % n)
  response = fib(n)

  ch.basic_publish(
      exchange='', # 把執行結果發回給客戶端
      routing_key=props.reply_to, # 客戶端要求返回想用的queue
      # 返回客戶端發過來的correction_id 為了讓客戶端驗證消息一致性
      properties=pika.BasicProperties(correlation_id = props.correlation_id),
      body=str(response)
  )
  ch.basic_ack(delivery_tag = method.delivery_tag) # 任務完成,告訴客戶端

if __name__ == '__main__':
  connection = pika.BlockingConnection(pika.ConnectionParameters(
      host='localhost'))
  channel = connection.channel()
  channel.queue_declare(queue='rpc_queue') # 聲明一個rpc_queue ,

  channel.basic_qos(prefetch_count=1)
  # 在rpc_queue里收消息,收到消息就調用on_request
  channel.basic_consume(on_request, queue='rpc_queue')
  print(" [x] Awaiting RPC requests")
  channel.start_consuming()

關于怎么在python項目中使用RabbitMQ問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

临猗县| 湘乡市| 凤城市| 永定县| 安塞县| 攀枝花市| 潼关县| 安仁县| 且末县| 郁南县| 罗山县| 昌平区| 乌兰县| 鄢陵县| 久治县| 莱州市| 宽城| 海安县| 丁青县| 建阳市| 白银市| 瑞丽市| 赣榆县| 余江县| 松潘县| 渝北区| 靖远县| 武安市| 社会| 定西市| 黄陵县| 麟游县| 宜州市| 霍山县| 依兰县| 新乐市| 手游| 思南县| 顺昌县| 中宁县| 乐清市|