中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何用Python集成ActiveMQ

發布時間:2020-05-29 20:45:18 來源:億速云 閱讀:1919 作者:鴿子 欄目:編程語言

ActiveMQ是一個非常流行的消息隊列服務中間件,實現JMS規范,基于STOMP協議(端口為61613)支持Python訪問。


JMS:Java Message Service

STOMP:Simple(or Streaming) Text Orientated Messaging Protocol,簡單(流)文本定向消息協議


JMS規范定義了2類消息發送接收模型:點對點queue,發布訂閱topic,區別是能夠重復消費和是否保存。


1,點對點queue:不可重復消費,消息被消費前一直保存。

生產者發送消息到queue,一個消費者取出并消費消息。

消息被消費后,queue中不再保存,所有只有一個消費者能夠取到消息。

queue支持多個消費者存在,但是一個消息只有一個消費者可以消費。

當前沒有消費者時,消息一直保存,直到被消費者消費。

如何用Python集成ActiveMQ

2,發布訂閱topic:可重復消費,發布給所有訂閱者。

生產者發布消息到topic中,多個訂閱者收到并消費消息。

queue不同,發布到topic中的消息會被所有訂閱者消費。

當生產者發布消息時,不管是否有訂閱者,都不保存消息。

如何用Python集成ActiveMQ

JMS規范定義的2類消息傳輸模型queue和topic比較:


Queue

Topic

模型

點對點Point-to-Point

發布訂閱publish/subscribe

有無狀態

queue消息在消費前被一直保存在mq服務器文件或者配置DB

topic數據默認不保存,是無狀態的。

完整性保障

queue保證每條消息都被消費者接收到

topic不保證生產者發布的每條消息都被訂閱者接收到

消息是否會丟失

生產者發送消息到queue,消費者接收到消息。如果沒有消費者,將一直保存,不會丟失。

生產者發布消息到topic時,當前的訂閱者都能夠接收到消息。如果當前沒有訂閱者,該消息就丟失。

消息發布接收策略

一對一的消息發布接收策略,一個生產者發送的消息只被一個消費者接收。mq服務器收到回復后,將這個消息刪除。

一對多的消息發布接收策略,同一個topic的多個訂閱者都能收到生產者發布的消息。


Python集成ActiveMQ使用stomp.py,只需簡單配置,本文在Django框架下進一步封裝服務mq_service.py。典型系統架構示意圖和消息隊列:

如何用Python集成ActiveMQ

時序圖如下:

如何用Python集成ActiveMQ

示例代碼:https://github.com/rickding/HelloPython/tree/master/hello_activemq

├── settings.py

├── mq

│   └── mq_service.py

│   └── mq_listener.py

├── tests

│   └── test_mq_service.py

├── management

│   └── commands

│        └── mq.py


一,Python集成ActiveMQ


代碼文件

功能要點

Python集成ActiveMQ

requirements.txt

安裝stomp.py:

stomp.py >= 5.0.1

封裝服務

mq_serivce.py

封裝ActiveMQ的消息發送和處理功能。在Django框架下,將地址等配置在settings.py中集中管理,注意端口為61613

接收處理消息

mq_listener.py

增加消息接收處理類,繼承stomp.ConnectionListener

啟動消息監聽服務

mq.py

在Django框架下,將啟動服務代碼封裝成command,方便調用和維護。

單元測試

test_mq_serivce.py

測試封裝的功能函數

功能調用

views.py

增加REST接口/chk/mq,調用mq_service發送消息


1. 新建Django項目,運行:django-admin startproject hello_activemq

2. 進到目錄hello_activemq,增加應用:python manage.py startapp app

如何用Python集成ActiveMQ

項目的目錄文件結構如下:

如何用Python集成ActiveMQ

3. 安裝stomp.py,pip install stomp.py >= 5.0.1


二,封裝服務mq_service.py,調用ActiveMQ發送消息

1. 增加mq_service.py:

import json
import logging
import stomp
from django.conf import settings

log = logging.getLogger(__name__)

def send_msg(msg_dict, queue_or_topic=settings.MQ_QUEUE):
    conn = stomp.Connection10([(settings.MQ_URL, settings.MQ_PORT)])
    conn.connect(settings.MQ_USER, settings.MQ_PASSWORD)

    msg_str = json.dumps(msg_dict)
    log.info('Send msg: %s, %s, %s' % (type(msg_dict), type(msg_str), msg_str))
    conn.send(queue_or_topic, msg_str)
    conn.disconnect()

2. 打開settings.py,配置ActiveMQ信息:

MQ_URL = '127.0.0.1'
MQ_PORT = 61613
MQ_USER = 'admin'
MQ_PASSWORD = 'admin'
MQ_QUEUE = '/queue/SampleQueue'
MQ_TOPIC = '/topic/SampleTopic'

3. 為了增加代碼的兼容和容錯能力,封裝get_conn(), close_conn()等輔助函數,詳見代碼文件mq_service.py


三,接收處理消息mq_listener.py

1. 增加mq_listener.py,聲明消息處理類,繼承stomp.ConnectionListener

import json
import logging
import stomp

log = logging.getLogger(__name__)

class MqListener(stomp.ConnectionListener):
    def on_message(self, headers, msg_str):
        log.info('Receive msg: %s, %s, %s' % (type(msg_str), msg_str, headers))

        msg_dict = None
        try:
            msg_dict = json.loads(msg_str)
        except Exception as e:
            log.warning('Exception when parse msg: %s' % str(e))

        log.info('Parsed msg: {}, {}'.format(type(msg_dict), msg_dict))

    def on_error(self, headers, msg_str):
        log.info('Error msg: %s, %s, %s' % (type(msg_str), msg_str, headers))

2. on_message()函數中,將消息字符串解析為json,方便業務處理

3. 聲明on_error()函數處理錯誤信息。


四,啟動消息監聽服務mq.py

1. 將循環接收消息代碼封裝成函數consume_msg(),增加在服務中mq_serivce.py:

import logging
import time
import stomp
from django.conf import settings

log = logging.getLogger(__name__)

def consume_msg(listener, queue=settings.MQ_QUEUE, topic=settings.MQ_TOPIC):
    conn = stomp.Connection10([(settings.MQ_URL, settings.MQ_PORT)])
    conn.connect(settings.MQ_USER, settings.MQ_PASSWORD)
    
    conn.set_listener('', listener)
    conn.subscribe(queue)
    conn.subscribe(topic)

    while 1:
        time.sleep(1000)  # secs

    conn.disconnect()

2. 調用set_listener()設置消息接收類實例,使用之前創建的MqListener

3. 調用subscribe()訂閱消息,啟動循環監聽。

4. 我們將啟動服務代碼封裝成command,在目錄management/commands中增加mq.py

import logging
from django.core.management.base import BaseCommand
from hello_activemq.mq import mq_service as mq
from hello_activemq.mq.mq_listener import MqListener

log = logging.getLogger(__name__)


class Command(BaseCommand):
    help = 'mq starts listener'

    def handle(self, *args, **options):
        log.info("mq starts")
        return mq.consume_msg(MqListener())

5. 運行命令python manage.py mq,看到消息提示,啟動監聽服務成功。

如何用Python集成ActiveMQ

五,單元測試test_mq_service.py

增加測試函數,發送消息:

import logging
from django.test import TestCase
from hello_activemq.mq import mq_service as mq

log = logging.getLogger(__name__)

class MQServiceTest(TestCase):
    def test_send_msg(self):
        msg_dict = {'content': 'test msg dict', 'msg': 'msg from python'}
        mq.send_msg_to_queue(msg_dict)
        mq.send_msg_to_topic({'msg': "test msg from python"})

運行python manage.py test,同時看到監聽服務收到并處理消息:

如何用Python集成ActiveMQ

六,發送消息功能調用

1. views.py中發送消息,調用mq_servcie.py

import json
from django.http import HttpResponse
from hello_activemq.mq import mq_service as mq

def chk_mq(req):
    msg_dict = {
        'url': req.get_raw_uri(),
        'path': req.get_full_path(),
        'host': req.get_host(),
    }

    mq.send_msg_to_queue(msg_dict)
    mq.send_msg_to_topic(msg_dict)

    return HttpResponse(json.dumps(msg_dict))

2. urls.py中配置路由

from django.urls import path
from app.views import chk_mq

urlpatterns = [
    path('', chk_mq, name='chk'),
]

3. 運行命令啟動服務:python manage.py runserver 0.0.0.0:8001

如何用Python集成ActiveMQ

4. REST接口發送消息

如何用Python集成ActiveMQ

七,常見問題和解決方法

1. 啟動服務錯誤:[transport.py: 787, attempt_connection] Could not connect to host 127.0.0.1, port 61613

解決:檢查ActiveMQ是否正常啟動,特別注意是否開啟STOMP協議端口61613

原因:Python連接ActiveMQ使用STOMP協議,端口默認61613


2. 發送消息時錯誤:TypeError: message should be a string or bytes, found <class 'dict'>

解決:將消息內容序列化為JSON,發送時調用json.dumps(),接收時調用json.loads()

原因:Python連接ActiveMQ使用的是STOMP協議,消息格式為簡單文本。


注:JMS規范定義的5類消息:

字符串TextMessage,

鍵值對MapMessage,

序列化對象ObjectMessage

字節流BytesMessage

數據流StreamMessage

如何用Python集成ActiveMQ

ActiveMQ支持5類JMS消息,增加了二進制大文件消息BlobMessage:

如何用Python集成ActiveMQ


3. 跨系統對接時接收到的消息類型不是TextMessage

Python開發的業務處理服務 -> Java開發的API服務,接收到的消息類型為BytesMessage,Python發送時設置conn.send('xx', msg_str, content_type="text/plain")仍然接收不到期望的類型TextMessage

解決:stomp建立連接時配置參數conn = stomp.Connection10([("localhost", 61613)], auto_content_length=False)

原因:Python連接ActiveMQ使用STOMP協議,消息格式為簡單文本,不攜帶類型信息,只通過header中的content-length來判斷TextMessage和BytesMessage,所以發送消息時不在header中添加content-length就可以了。

向AI問一下細節
推薦閱讀:
  1. ActiveMQ Tips
  2. ActiveMQ安裝

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

涿鹿县| 吐鲁番市| 修水县| 昌都县| 潞西市| 延川县| 洛南县| 北流市| 逊克县| 高雄县| 德州市| 甘肃省| 新晃| 垦利县| 略阳县| 广灵县| 荣昌县| 乌苏市| 朝阳市| 花垣县| 都兰县| 佛坪县| 屏东县| 昌都县| 宜川县| 梧州市| 广汉市| 聊城市| 大连市| 体育| 通江县| 开封市| 如东县| 五台县| 丘北县| 桓台县| 霍州市| 遵义县| 文水县| 淅川县| 乌鲁木齐市|