中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

【從0開始Python開發實戰】Python集成Active

發布時間:2020-10-15 13:12:12 來源:網絡 閱讀:359 作者:wx5b3c0a4298f7b 欄目:編程語言

目錄:

1.?Python集成ActiveMQ

2.?封裝服務mq_service.py

3.?接收處理消息mq_listener.py

4.?啟動消息監聽服務mq.py

5.?單元測試test_mq_serivce.py

6.?發送消息功能調用

7.?常見問題和解決方法


ActiveMQ是一個非常流行的消息隊列服務中間件,實現JMS規范,基于STOMP協議(端口為61613)支持Python訪問。


JMS:Java Message Service

STOMP:Simple(or Streaming) Text Orientated Messaging Protocol,簡單(流)文本定向消息協議


JMS規范定義了2類消息發送接收模型:點對點queue,發布訂閱topic,區別是能夠重復消費和是否保存。


1,點對點queue:不可重復消費,消息被消費前一直保存。

生產者發送消息到queue,一個消費者取出并消費消息。

消息被消費后,queue中不再保存,所有只有一個消費者能夠取到消息。

queue支持多個消費者存在,但是一個消息只有一個消費者可以消費。

當前沒有消費者時,消息一直保存,直到被消費者消費。

【從0開始Python開發實戰】Python集成Active

2,發布訂閱topic:可重復消費,發布給所有訂閱者。

生產者發布消息到topic中,多個訂閱者收到并消費消息。

queue不同,發布到topic中的消息會被所有訂閱者消費。

當生產者發布消息時,不管是否有訂閱者,都不保存消息。

【從0開始Python開發實戰】Python集成Active

JMS規范定義的2類消息傳輸模型queue和topic比較:


Queue

Topic

模型

點對點Point-to-Point

發布訂閱publish/subscribe

有無狀態

queue消息在消費前被一直保存在mq服務器文件或者配置DB

topic數據默認不保存,是無狀態的。

完整性保障

queue保證每條消息都被消費者接收到

topic不保證生產者發布的每條消息都被訂閱者接收到

消息是否會丟失

生產者發送消息到queue,消費者接收到消息。如果沒有消費者,將一直保存,不會丟失。

生產者發布消息到topic時,當前的訂閱者都能夠接收到消息。如果當前沒有訂閱者,該消息就丟失。

消息發布接收策略

一對一的消息發布接收策略,一個生產者發送的消息只被一個消費者接收。mq服務器收到回復后,將這個消息刪除。

一對多的消息發布接收策略,同一個topic的多個訂閱者都能收到生產者發布的消息。


Python集成ActiveMQ使用stomp.py,只需簡單配置,本文在Django框架下進一步封裝服務mq_service.py。典型系統架構示意圖和消息隊列:

【從0開始Python開發實戰】Python集成Active

時序圖如下:

【從0開始Python開發實戰】Python集成Active

示例代碼:https://github.com/rickding/HelloPython/tree/master/hello_activemq

├── settings.py

├── mq

│ ??└── mq_service.py

│ ??└── mq_listener.py

├── tests

│ ??└── test_mq_service.py

├── management

│ ??└── commands

│ ???????└── mq.py


一,Python集成ActiveMQ


代碼文件

功能要點

Python集成ActiveMQ

requirements.txt

安裝stomp.py:

stomp.py >= 5.0.1

封裝服務

mq_serivce.py

封裝ActiveMQ的消息發送和處理功能。在Django框架下,將地址等配置在settings.py中集中管理,注意端口為61613

接收處理消息

mq_listener.py

增加消息接收處理類,繼承stomp.ConnectionListener

啟動消息監聽服務

mq.py

在Django框架下,將啟動服務代碼封裝成command,方便調用和維護。

單元測試

test_mq_serivce.py

測試封裝的功能函數

功能調用

views.py

增加REST接口/chk/mq,調用mq_service發送消息


1.?新建Django項目,運行:django-admin startproject hello_activemq

2.?進到目錄hello_activemq,增加應用:python manage.py startapp app

【從0開始Python開發實戰】Python集成Active

項目的目錄文件結構如下:

【從0開始Python開發實戰】Python集成Active

3.?安裝stomp.py,pip install stomp.py >= 5.0.1


二,封裝服務mq_service.py,調用ActiveMQ發送消息

1.?增加mq_service.py:

import json
import logging
import stomp
from django.conf import settings

log = logging.getLogger(__name__)

def send_msg(msg_dict, queue_or_topic=settings.MQ_QUEUE):
????conn = stomp.Connection10([(settings.MQ_URL, settings.MQ_PORT)])
????conn.connect(settings.MQ_USER, settings.MQ_PASSWORD)

????msg_str = json.dumps(msg_dict)
????log.info('Send msg: %s, %s, %s' % (type(msg_dict), type(msg_str), msg_str))
????conn.send(queue_or_topic, msg_str)
????conn.disconnect()

2.?打開settings.py,配置ActiveMQ信息:

MQ_URL = '127.0.0.1'
MQ_PORT = 61613
MQ_USER = 'admin'
MQ_PASSWORD = 'admin'
MQ_QUEUE = '/queue/SampleQueue'
MQ_TOPIC = '/topic/SampleTopic'

3.?為了增加代碼的兼容和容錯能力,封裝get_conn(), close_conn()等輔助函數,詳見代碼文件mq_service.py


三,接收處理消息mq_listener.py

1.?增加mq_listener.py,聲明消息處理類,繼承stomp.ConnectionListener

import json
import logging
import stomp

log = logging.getLogger(__name__)

class MqListener(stomp.ConnectionListener):
????def on_message(self, headers, msg_str):
????????log.info('Receive msg: %s, %s, %s' % (type(msg_str), msg_str, headers))

????????msg_dict = None
????????try:
????????????msg_dict = json.loads(msg_str)
????????except Exception as e:
????????????log.warning('Exception when parse msg: %s' % str(e))

????????log.info('Parsed msg: {}, {}'.format(type(msg_dict), msg_dict))

????def on_error(self, headers, msg_str):
????????log.info('Error msg: %s, %s, %s' % (type(msg_str), msg_str, headers))

2.?on_message()函數中,將消息字符串解析為json,方便業務處理

3.?聲明on_error()函數處理錯誤信息。


四,啟動消息監聽服務mq.py

1.?將循環接收消息代碼封裝成函數consume_msg(),增加在服務中mq_serivce.py:

import logging
import time
import stomp
from django.conf import settings

log = logging.getLogger(__name__)

def consume_msg(listener, queue=settings.MQ_QUEUE, topic=settings.MQ_TOPIC):
????conn = stomp.Connection10([(settings.MQ_URL, settings.MQ_PORT)])
????conn.connect(settings.MQ_USER, settings.MQ_PASSWORD)
????
????conn.set_listener('', listener)
????conn.subscribe(queue)
????conn.subscribe(topic)

????while 1:
????????time.sleep(1000) ?# secs

????conn.disconnect()

2.?調用set_listener()設置消息接收類實例,使用之前創建的MqListener

3.?調用subscribe()訂閱消息,啟動循環監聽。

4.?我們將啟動服務代碼封裝成command,在目錄management/commands中增加mq.py

import logging
from django.core.management.base import BaseCommand
from hello_activemq.mq import mq_service as mq
from hello_activemq.mq.mq_listener import MqListener

log = logging.getLogger(__name__)


class Command(BaseCommand):
????help = 'mq starts listener'

????def handle(self, *args, **options):
????????log.info("mq starts")
????????return mq.consume_msg(MqListener())

5.?運行命令python manage.py mq,看到消息提示,啟動監聽服務成功。

【從0開始Python開發實戰】Python集成Active

五,單元測試test_mq_service.py

增加測試函數,發送消息:

import logging
from django.test import TestCase
from hello_activemq.mq import mq_service as mq

log = logging.getLogger(__name__)

class MQServiceTest(TestCase):
????def test_send_msg(self):
????????msg_dict = {'content': 'test msg dict', 'msg': 'msg from python'}
????????mq.send_msg_to_queue(msg_dict)
????????mq.send_msg_to_topic({'msg': "test msg from python"})

運行python manage.py test,同時看到監聽服務收到并處理消息:

【從0開始Python開發實戰】Python集成Active

六,發送消息功能調用

1.?views.py中發送消息,調用mq_servcie.py

import json
from django.http import HttpResponse
from hello_activemq.mq import mq_service as mq

def chk_mq(req):
????msg_dict = {
????????'url': req.get_raw_uri(),
????????'path': req.get_full_path(),
????????'host': req.get_host(),
????}

????mq.send_msg_to_queue(msg_dict)
????mq.send_msg_to_topic(msg_dict)

????return HttpResponse(json.dumps(msg_dict))

2.?urls.py中配置路由

from django.urls import path
from app.views import chk_mq

urlpatterns = [
????path('', chk_mq, name='chk'),
]

3.?運行命令啟動服務:python manage.py runserver 0.0.0.0:8001

【從0開始Python開發實戰】Python集成Active

4.?REST接口發送消息

【從0開始Python開發實戰】Python集成Active

七,常見問題和解決方法

1.?啟動服務錯誤:[transport.py: 787, attempt_connection] Could not connect to host 127.0.0.1, port 61613

解決:檢查ActiveMQ是否正常啟動,特別注意是否開啟STOMP協議端口61613

原因:Python連接ActiveMQ使用STOMP協議,端口默認61613


2.?發送消息時錯誤:TypeError: message should be a string or bytes, found <class 'dict'>

解決:將消息內容序列化為JSON,發送時調用json.dumps(),接收時調用json.loads()

原因:Python連接ActiveMQ使用的是STOMP協議,消息格式為簡單文本。


注:JMS規范定義的5類消息:

字符串TextMessage,

鍵值對MapMessage,

序列化對象ObjectMessage

字節流BytesMessage

數據流StreamMessage

【從0開始Python開發實戰】Python集成Active

ActiveMQ支持5類JMS消息,增加了二進制大文件消息BlobMessage:

【從0開始Python開發實戰】Python集成Active


3.?跨系統對接時接收到的消息類型不是TextMessage

Python開發的業務處理服務 -> Java開發的API服務,接收到的消息類型為BytesMessage,Python發送時設置conn.send('xx', msg_str, content_type="text/plain")仍然接收不到期望的類型TextMessage

解決:stomp建立連接時配置參數conn = stomp.Connection10([("localhost", 61613)], auto_content_length=False)

原因:Python連接ActiveMQ使用STOMP協議,消息格式為簡單文本,不攜帶類型信息,只通過header中的content-length來判斷TextMessage和BytesMessage,所以發送消息時不在header中添加content-length就可以了。


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

乌鲁木齐县| 威海市| 青州市| 交口县| 同江市| 疏附县| 商城县| 新余市| 隆德县| 平度市| 昌黎县| 三门峡市| 白山市| 澎湖县| 湖南省| 巍山| 青海省| 诏安县| 东海县| 孝感市| 柘城县| 封丘县| 上饶县| 无为县| 中阳县| 彰化县| 思南县| 栾川县| 铜鼓县| 轮台县| 增城市| 绥滨县| 张掖市| 丰都县| 镶黄旗| 石泉县| 依兰县| 内乡县| 读书| 余江县| 大理市|