中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python中RabbitMQ如何實現進程間通信

發布時間:2020-07-02 14:38:24 來源:億速云 閱讀:258 作者:清晨 欄目:開發技術

這篇文章主要介紹Python中RabbitMQ如何實現進程間通信,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

RabbitMQ    消息隊列

PY
threading Queue
進程Queue 父進程與子進程,或同一父進程下的多個子進程進行交互
缺點:兩個不同Python文件不能通過上面兩個Queue進行交互

erlong
基于這個語言創建的一種中間商
win中需要先安裝erlong才能使用
rabbitmq_server start

安裝 Python module

pip install pika

or

easy_install pika

or
源碼

rabbit      默認端口15672
查看當前時刻的隊列數
rabbitmqctl.bat list_queue

exchange
在定義的時候就是有類型的,決定到底哪些queue符合條件,可以接受消息
fanout:所有bind到此exchange的queue都可以收到消息
direct:通過routingkey和exchange決定唯一的queue可以接受消息
topic: 所有符合routingkey(此時可以是一個表達式)的routingkey所bind的queue都可以接受消息
      表達式符號說明:
      # 代表一個或多個字符     * 代表任何字符

RPC
remote procedure call           雙向傳輸,指令<-------->指令執行結果
實現方法:                        創建兩個隊列,一個隊列收指令,一個隊列發送執行結果

用rabbitmq實現簡單的生產者消費者模型

1) rabbit_producer.py

# Author : Xuefeng

import pika

connection = pika.BlockingConnection(pika.Connection.Parameters(
  "localhost"
))
# statement a channel
channel = connection.channel()

# create the queue, the name of queue is "hello"
# durable=True can make the queue be exist, although the service have stopped before.
channel.queue_declare(queue="hello", durable=True)

# n RabbitMQ a message can never be sent directly to queue,it always need to go through
channel.basic_publish(exchange = " ",
           routing_key = "hello",
           body = "Hello world!",
           properties = pika.BasicPropreties(
             delivery_mode=2, # make the message persistence
           )
           )
print("[x] sent 'Hello world!'")
connection.close()

2) rabbit_consumer.py

# Author : Xuefeng

import pika

connection = pika.BlockingConnection(pika.Connection.Parameters(
  "localhost"
))
# statement a channel
channel = connection.channel()
channel.queue_declare(queue="hello", durable=True)

def callback(ch, method, properties, body):
  '''
  Handle the recieved data
  :param ch: The address of the channel
  :param method: Information about the connection
  :param properties:
  :param body:
  :return:
  '''
  print("------>", ch, method, properties )
  print("[x] Recieved %r" % body)
  # ack by ourself
  ch.basic_ack(delivery_tag = method.delivery_tag)

# follow is for consumer to auto change with the ability
channel.basic_qos(profetch_count=1)
# no_ack = True  represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback,   # If have recieved message, enable the callback() function to handle the message.
           queue = "hello",
           no_ack = True)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()

用rabbitmq中的fanout模式實現廣播模式

1) fanout_rabbit_publish.py

# Author : Xuefeng

import pika
import sys

# 廣播模式:
# 生產者發送一條消息,所有的開通鏈接的消費者都可以接收到消息

connection = pika.BlockingConnection(pika.Connection.Parameters(
  "localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="logs",
             type="fanout")
message = ' '.join(sys.argv[1:]) or "info:Hello world!"
channel.basic_publish(
  exchange="logs",
  routing_key="",
  body=message
)
print("[x] Send %r" % message)

connection.close()

2) fanout_rabbit_consumer.py

# Author : Xuefeng


import pika
import sys

connection = pika.BlockingConnection(pika.Connection.Parameters(
  "localhost"
))
# statement a channel
channel = connection.channel()
# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

channel.queue_bind(exchange="logs",
          queue=queue_name)


def callback(ch, method, properties, body):
  '''
  Handle the recieved data
  :param ch: The address of the channel
  :param method: Information about the connection
  :param properties:
  :param body:
  :return:
  '''
  print("------>", ch, method, properties )
  print("[x] Recieved %r" % body)
  # ack by ourself
  ch.basic_ack(delivery_tag = method.delivery_tag)

# no_ack = True  represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback,   # If have recieved message, enable the callback() function to handle the message.
           queue = "hello",
           no_ack = True)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()

用rabbitmq中的direct模式實現消息過濾模式

1) direct_rabbit_publisher.py

# Author : Xuefeng
import pika
import sys

# 消息過濾模式:
# 生產者發送一條消息,通過severity優先級來確定是否可以接收到消息

connection = pika.BlockingConnection(pika.Connection.Parameters(
  "localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="direct_logs",
             type="direct")
severity = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "info:Hello world!"

channel.basic_publish(
  exchange="direct_logs",
  routing_key=severity,
  body=message
)
print("[x] Send %r:%r" % (severity, message))

connection.close()

2) direct_rabbit_consumer.py

# Author : Xuefeng

import pika
import sys

connection = pika.BlockingConnection(pika.Connection.Parameters(
  "localhost"
))
# statement a channel
channel = connection.channel()

channel.exchange_declare(exchange="direct_logs",
             type="direct")

# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

severities = sys.argv[1:]
if not severities:
  sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
  sys.exit(1)

for severity in severities:
  channel.queue_bind(exchange="direct_logs",
            queue=queue_name,
            routing_key=severity)
  


def callback(ch, method, properties, body):
  '''
  Handle the recieved data
  :param ch: The address of the channel
  :param method: Information about the connection
  :param properties:
  :param body:
  :return:
  '''
  print("------>", ch, method, properties )
  print("[x] Recieved %r" % body)
  # ack by ourself
  ch.basic_ack(delivery_tag = method.delivery_tag)

# no_ack = True  represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback,   # If have recieved message, enable the callback() function to handle the message.
           queue = "hello",
           no_ack = True)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming() 


用rabbitmq中的topic模式實現細致消息過濾模式

1) topic_rabbit_publisher.py

# Author : Xuefeng

import pika
import sys

# 消息細致過濾模式:
# 生產者發送一條消息,通過運行腳本 *.info 等確定接收消息類型進行對應接收
connection = pika.BlockingConnection(pika.Connection.Parameters(
  "localhost"
))
# statement a channel
channel = connection.channel()
channel.exchange_declare(exchange="topic_logs",
             type="topic")
binding_key = sys.argv[1] if len(sys.argv) > 1 else "info"
message = ' '.join(sys.argv[2:]) or "info:Hello world!"

channel.basic_publish(
  exchange="topic_logs",
  routing_key=binding_key,
  body=message
)
print("[x] Send %r:%r" % (binding_key, message))

connection.close()

2) topic_rabbit_consumer.py

# Author : Xuefeng

import pika
import sys

connection = pika.BlockingConnection(pika.Connection.Parameters(
  "localhost"
))
# statement a channel
channel = connection.channel()

channel.exchange_declare(exchange="topic_logs",
             type="topic")

# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

binding_keys = sys.argv[1:]
if not binding_keys:
  sys.stderr.write("Usage:%s [info] [warning] [error]\n" % sys.argv[0])
  sys.exit(1)

for binding_key in binding_keys:
  channel.queue_bind(exchange="topic_logs",
            queue=queue_name,
            routing_key=binding_key)


def callback(ch, method, properties, body):
  '''
  Handle the recieved data
  :param ch: The address of the channel
  :param method: Information about the connection
  :param properties:
  :param body:
  :return:
  '''
  print("------>", ch, method, properties)
  print("[x] Recieved %r" % body)
  # ack by ourself
  ch.basic_ack(delivery_tag=method.delivery_tag)


# no_ack = True  represent that the message cannot be transfor to next consumer,
# when the current consumer is stop by accident.
channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message.
           queue="hello",
           no_ack=True)

print("[*] Waiting for messages. To Exit press CTRL+C")
channel.start_consuming()

用rabbitmq實現rpc操作

1) Rpc_rabbit_client.py

# Author : Xuefeng

import pika
import time
import uuid

class FibonacciRpcClient(object):
  def __init__(self):
    self.connection = pika.BlockingConnection(pika.Connection.Parameters(
  "localhost"))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue    # 隨機的生成一個接收命令執行結果的隊列
    self.channel.basic_consume(self.on_response,  # 只要收到消息就調用
                  no_ack=True,
                  queue=self.callback_queue)

  def on_response(self, ch, method, props, body):
    if self.corr_id == props.correlation_id:
      self.response = body

  def call(self,n):
    self.response = None
    self.corr_id = str(uuid.uuid4())
    self.channel.basic_publish(
      exchange="",
      routing_key="rpc_queue",
      properties=pika.BasicPropreties(
        rely_to=self.callback_queue,
        correlation_id=self.corr_id     # 通過隨機生成的ID來驗證指令執行結果與指令的匹配性
      ),
      body=str(n)
    )
    while self.response is None:
      self.connection.process_data_events()  # 非阻塞版的start_consume,有沒有消息都繼續
      print("no message...")
      time.sleep(0.5)
    return int(self.response)

fibonacci_rcp = FibonacciRpcClient()

print("[x] Requesting fib(30)")
response = fibonacci_rcp.call(30)
print("[x] Rec %r" % response)

2) Rpc_rabbit_server.py

# Author : Xuefeng

import pika
import sys

connection = pika.BlockingConnection(pika.Connection.Parameters(
  "localhost"
))
# statement a channel
channel = connection.channel()

channel.queue_declare(queue="rpc_queue")

def fib(n):
  if n == 0:
    return 0
  elif n == 1:
    return 1
  else:
    return fib(n-1)+fib(n-2)

def on_request(ch, method, props, body):
  n = int(body)
  print("[.] fib(%s)" % n)
  response = fib(n)
  ch.basic_publish(
    exchange="",
    routing_key=props.rely_to,
    properties=pika.BasicPropreties(correlation_id=\
                    props.correlation),
    body = str(body)
  )
  ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue="rpc_queue")

print("[x] Awaiting RPC requests")
channel.start_consumeing()



channel.exchange_declare(exchange="direct_logs",
             type="direct")

# exclusive 排他,唯一的 隨機生成queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
print("Random queue name:", queue_name)

severities = sys.argv[1:]


以上是Python中RabbitMQ如何實現進程間通信的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

阳春市| 中超| 东兴市| 永川市| 岳普湖县| 池州市| 宁城县| 松溪县| 锡林浩特市| 许昌市| 绵竹市| 灌阳县| 昌邑市| 响水县| 中超| 南充市| 林口县| 龙海市| 长海县| 万宁市| 鸡东县| 梓潼县| 呼玛县| 汉源县| 五寨县| 垦利县| 峨眉山市| 英超| 天镇县| 淮阳县| 绥宁县| 格尔木市| 宁城县| 朔州市| 双辽市| 东乌珠穆沁旗| 黄平县| 合山市| 公安县| 东丰县| 彭阳县|