中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Python MQTT客戶端怎么使用

發布時間:2021-11-25 13:52:45 來源:億速云 閱讀:158 作者:iii 欄目:互聯網科技

本篇內容介紹了“Python MQTT客戶端怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

paho-mqtt

paho-mqtt 可以說是 Python MQTT 開源客戶端庫中的佼佼者。它由 Eclipse 基金會主導開發,除了 Python 庫以外,同樣支持各大主流的編程語言,比如 C++、Java、JavaScript、Golang 等。目前 Python 版本已經實現了 3.1 和 3.1.1 MQTT 協議,在最新開發版中實現了 MQTT 5.0。

在基金會的支持下,以每年一個版本的速度更新,本文發布時的最新版本為 1.5.0(于 2019 年 8 月發布)。

在 GitHub 主頁上,它提供了從入門的快速實現到每一個函數的詳細解讀,涵蓋了從初學者到高級使用者需要了解的各個部分。即使遇到超出范圍的問題,在 Google 上搜索,可以得到近 20 萬個相關詞條,是目前最為流行的 MQTT 客戶端。

得到如此多的關注度,除了穩定的代碼外,還有其易用性。Paho 的接口使用非常簡單優雅,您只需要少量的代碼就能實現 MQTT 的訂閱及消息發布。

安裝

pip3 install paho-mqtt

或者

git clone https://github.com/eclipse/paho.mqtt.python
cd paho.mqtt.python
python3 setup.py install

訂閱者

import paho.mqtt.client as mqtt

# 連接的回調函數
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe("$SYS/#")
    
# 收到消息的回調函數
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
    
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("broker.emqx.io", 1883, 60)
client.loop_forever()

發布者

import paho.mqtt.client as mqtt
import time
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    
client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.emqx.io", 1883, 60)
for i in range(3):
    client.publish('a/b', payload=i, qos=0, retain=False)
    print(f"send {i} to a/b")
    time.sleep(1)

client.loop_forever()

甚至,你可以通過一行代碼,實現訂閱、發布。

import paho.mqtt.subscribe as subscribe

# 當調用這個函數時,程序會堵塞在這里,直到有一條消息發送到 paho/test/simple 主題
msg = subscribe.simple("paho/test/simple", hostname="broker.emqx.io")
print(f"{msg.topic} {msg.payload}")
import paho.mqtt.publish as publish

# 發送一條消息
publish.single("a/b", "payload", hostname="broker.emqx.io")
# 或者一次發送多個消息
msgs = [{'topic':"a/b", 'payload':"multiple 1"}, ("a/b", "multiple 2", 0, False)]
publish.multiple(msgs, hostname="broker.emqx.io")

HBMQTT

HBMQTT 基于 Python asyncio 開發,僅支持 3.1.1 的 MQTT 協議。由于使用 asyncio 庫,開發者需要使用 3.4 以上的 Python 版本。

CPU 的速度遠遠快于磁盤、網絡等 IO 操作,而在一個線程中,無論 CPU 執行得再快,遇到 IO 操作時,都得停下來等待讀寫完成,這無疑浪費了許多時間。

為了解決這個問題,Python 加入了異步 IO 的特性。在 Python 3.4 中,正式將 asyncio 納入標準庫中,并在 Python 3.5 中,加入了 async/await 關鍵字。用戶可以很輕松的使用在函數前加入 async 關鍵字,使函數變成異步函數。

HBMQTT 便是建立在 asyncio 標準庫之上。它允許用戶顯示的設置異步斷點,通過異步 IO,MQTT 客戶端在收取消息或發送消息時,掛載當前的任務,繼續處理下一個。

不過 HBMQTT 的知名度卻小得多。在 Google 上搜索,關于 HBMQTT 僅有 6000 多個詞條,在 Stack Overflow 上只有 10 個提問數。這就意味著,如果選擇 HBMQTT 的話你需要很強的解決問題的能力。

有意思的是,HBMQTT 本身也是一個 MQTT 服務器。你可以通過 hbmqtt 命令一鍵開啟。

$ hbmqtt
[2020-08-28 09:35:56,608] :: INFO - Exited state new
[2020-08-28 09:35:56,608] :: INFO - Entered state starting
[2020-08-28 09:35:56,609] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connections=-1)

安裝

pip3 install hbmqtt

或者

git clone https://github.com/beerfactory/hbmqtt
cd hbmqtt
python3 setup.py install

訂閱者

import logging
import asyncio
from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1, QOS_2

async def uptime_coro():
    C = MQTTClient()
    await C.connect('mqtt://broker.emqx.io/')
    await C.subscribe([
            ('$SYS/broker/uptime', QOS_1),
            ('$SYS/broker/load/#', QOS_2),
         ])
    try:
        for i in range(1, 100):
            message = await C.deliver_message()
            packet = message.publish_packet
            print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")
        await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
        await C.disconnect()
    except ClientException as ce:
        logging.error("Client exception: %s" % ce)
        
if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    asyncio.get_event_loop().run_until_complete(uptime_coro())

發布者

import logging
import asyncio
import time
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2

async def test_coro():
    C = MQTTClient()
    await  C.connect('mqtt://broker.emqx.io/')
    tasks = [
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)),
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)),
    ]
    await asyncio.wait(tasks)
    logging.info("messages published")
    await C.disconnect()
    
if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    asyncio.get_event_loop().run_until_complete(test_coro())

更多使用細節情參考官方文檔:https://hbmqtt.readthedocs.io/en/latest/。

gmqtt

gmqtt 是由個人開發者開源的客戶端庫。默認支持 MQTT 5.0 協議,如果連接的 MQTT 代理不支持 5.0 協議,則會降級到 3.1 并重新進行連接。

相較于前兩者,gmqtt 還屬于初級開發階段,本文發布時的版本號是 0.6.7。但它是早期支持 MQTT 5.0 的 Python 庫之一,因此在網絡上知名度尚可。

同樣,它建立在 asyncio 庫上,因此需要使用 Python 3.4 以上的版本。

安裝

pip3 install gmqtt

或者

git clone https://github.com/wialon/gmqtt
cd gmqtt
python3 setup.py install

訂閱者

import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    print('Connected')
    
def on_message(client, topic, payload, qos, properties):
    print(f'RECV MSG: {topic} {payload}')
    
def on_subscribe(client, mid, qos, properties):
    print('SUBSCRIBED')
    
def on_disconnect(client, packet, exc=None):
    print('Disconnected')
    
def ask_exit(*args):
    STOP.set()

async def main(broker_host):
    client = MQTTClient("client-id")
    
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_subscribe = on_subscribe
    client.on_disconnect = on_disconnect
    
    # 連接 MQTT 代理
    await client.connect(broker_host)
    
    # 訂閱主題
    client.subscribe('TEST/#')
    
    # 發送測試數據
    client.publish("TEST/A", 'AAA')
    client.publish("TEST/B", 'BBB')
    
    await STOP.wait()
    await client.disconnect()
    
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    
    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)

    host = 'broker.emqx.io'
    loop.run_until_complete(main(host))

發布者

import asyncio
import os
import signal
import time
from gmqtt import Client as MQTTClient

STOP = asyncio.Event()

def on_connect(client, flags, rc, properties):
    print('Connected')
    client.subscribe('TEST/#', qos=0)
    
def on_message(client, topic, payload, qos, properties):
    print(f'RECV MSG: {topic}, {payload}')
    
def on_disconnect(client, packet, exc=None):
    print('Disconnected')
    
def ask_exit(*args):
    STOP.set()
    
async def main(broker_host):
    client = MQTTClient("client-id")
    
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    
    await client.connect(broker_host)
    
    client.publish('TEST/TIME', str(time.time()), qos=1)
    
    await STOP.wait()
    await client.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    
    loop.add_signal_handler(signal.SIGINT, ask_exit)
    loop.add_signal_handler(signal.SIGTERM, ask_exit)
    
    host = 'broker.emqx.io'  
    loop.run_until_complete(main(host))

如何選擇

在介紹完這三款 Python MQTT 客戶端庫之后,我們再來看看如何為自己選擇合適的 MQTT 客戶端庫。這三個客戶端各有自己的優缺點:

paho-mqtt 有著最優秀的文檔,代碼風格易于理解,同時有著強大的基金會支持,但目前文檔的版本還不支持 MQTT 5.0。

HBMQTT 使用 asyncio 庫實現,可以優化網絡 I/O 帶來的延遲。但是代碼風格不友好,同樣不支持 MQTT 5.0。

gmqtt 同樣通過 asyncio 庫實現,相比 HBMQTT ,代碼風格友好,最重要的是,它支持 MQTT 5.0。但開發進程慢,未來前景不明。

因此,在選擇時,您可以參考一下的思路:

  • 如果您是正常開發,想要將其運用在生產環境中,paho-mqtt 無疑是最好的選擇,其穩定性和代碼易讀性遠遠超過其它兩個庫。在遇到問題時,優秀的文檔和互聯網上大量的詞條,也能幫您找到更多的解決方案。

  • 對于熟練使用 asyncio 庫的讀者,不妨嘗試一下 HBMQTT 和 gmqtt。

  • 如果您想要學習、參與開源項目或者使用 MQTT 5.0, 則不妨試用一下 gmqtt,并嘗試為其共享一份代碼吧。

“Python MQTT客戶端怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

云霄县| 偏关县| 和政县| 阳朔县| 鄢陵县| 广宗县| 崇左市| 邢台县| 淮滨县| 龙山县| 宣威市| 辽宁省| 宁都县| 孟村| 上杭县| 衡南县| 天水市| 福清市| 东莞市| 隆安县| 阳东县| 巩义市| 汝州市| 武夷山市| 玉环县| 闻喜县| 双城市| 抚顺市| 师宗县| 焦作市| 房山区| 牟定县| 岱山县| 乌苏市| 峡江县| 阿克苏市| 桃源县| 屏东县| 禹城市| 札达县| 杨浦区|