您好,登錄后才能下訂單哦!
本篇內容介紹了“基于MQTT怎么對接EMQ-X服務器”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
MQTT AL用來解耦基于MQTT的業務和MQTT的具體實現,具體來說以后的MQTT業務層應該有且只能使用MQTT AL提供的相關功能(API 數據結構 流程等)。MQTT AL定義MQTT的標準,用來屏蔽各個MQTT協議實現的差異(如軟件庫 或者硬件),讓上層業務無需關心MQTT的實現部分。
MQTT AL的api接口聲明在<mqtt_al.h>中,使用相關的接口需要包含該頭文件,關于函數的詳細參數請參考該頭文件的聲明。
對接服務器的所有信息保存在結構體mqtt_al_conpara_t
中,其定義在mqtt_al.h
中,如下:
/** @brief defines the paramter for the mqtt connect */ typedef struct { mqtt_al_string_t serveraddr; ///< mqtt server:support domain name and dot format int serverport; ///< mqtt server port mqtt_al_security_para_t *security; ///< if NULL,will use en_mqtt_security_none en_mqtt_al_verison version; ///< mqtt version will be used mqtt_al_string_t clientid; ///< mqtt connect client identifier mqtt_al_string_t user; ///< mqtt connect user mqtt_al_string_t passwd; ///< mqtt connect passwd int cleansession; ///< 1 clean the session while 0 not mqtt_al_willmsg_t *willmsg; ///< mqtt connect will message unsigned short keepalivetime;///< keep alive time char conret; ///< mqtt connect code, return by server int timeout; ///< how much time will be blocked }mqtt_al_conpara_t;
其中的一些參數值已經使用枚舉給出:
security:安全連接參數(使用此需要確保mbedtls組件開啟)
枚舉值如下:
/** @brief this enum all the transport encode we support now*/ typedef enum { en_mqtt_al_security_none = 0, ///< no encode en_mqtt_al_security_psk, ///< use the psk mode in transport layer en_mqtt_al_security_cas, ///< use the ca mode in transport layer,only check the server en_mqtt_al_security_cacs, ///< use the ca mode in transport layer,both check the server and client en_mqtt_al_security_end, ///< the end for the mqtt }en_mqtt_al_security_t;
version:使用的MQTT協議版本
枚舉值如下:
/** @brief enum the mqtt version*/ typedef enum { en_mqtt_al_version_3_1_0 = 0, en_mqtt_al_version_3_1_1, }en_mqtt_al_verison;
另外,在復制的時候還需要注意,很多字符串參數都是使用mqtt_al_string_t類型,其定義如下:
/** brief defines for all the ascii or data used in the mqtt engine */ typedef struct { char *data; ///< buffer to storage the data int len; ///< buffer data length }mqtt_al_string_t; //used to represent any type string (maybe not ascii)
在配置結構體完成之后,調用配置函數進行配置并連接,API如下:
/** *@brief: you could use this function to connect to the mqtt server * *@param[in] conparam the parameter we will use in connect, refer to the data mqtt_al_conpara_t *@ *@return: first you should check the return value then the return code in conparam * *@retval NULL which means you could not get the connect to the server,maybe network reason *@retval handle, which means you get the context, please check the conparam for more */ void * mqtt_al_connect( mqtt_al_conpara_t *conparam);
連接之后,首先應該檢查返回的handle指針是否為空,其次應該檢查mqtt_al_conpara_t結構體中conret的值,有以下枚舉值:
/** @brief defines for the mqtt connect code returned by the server */ #define cn_mqtt_al_con_code_ok 0 ///< has been accepted by the server #define cn_mqtt_al_con_code_err_version 1 ///< server not support the version #define cn_mqtt_al_con_code_err_clientID 2 ///< client identifier is error #define cn_mqtt_al_con_code_err_netrefuse 3 ///< server service not ready yet #define cn_mqtt_al_con_code_err_u_p 4 ///< bad user name or password #define cn_mqtt_al_con_code_err_auth 5 ///< the client is not authorized #define cn_mqtt_al_con_code_err_unkown -1 ///< unknown reason #define cn_mqtt_al_con_code_err_network 0x80 ///< network reason,you could try once more
EMQ-X服務器有心跳機制,實際應用中訂閱之前應該先檢查連接狀態,本實驗中暫不檢查。
連接成功后,首先訂閱消息,設置回調函數,方便接收下發的命令。
訂閱消息的API如下:
/** * @brief you could use this function subscribe a topic from the server * * @param[in] handle the handle we get from mqtt_al_connect * * @param[in] subpara refer to the data mqtt_al_subpara_t * * @return 0 success -1 failed * */ int mqtt_al_subscribe(void *handle, mqtt_al_subpara_t *subpara);
兩個參數中,handle參數是之前使用mqtt_al_connect時返回的指針,直接傳入即可,subpara參數需要重點講述。
mqtt_al_subpara_t的定義如下:
/** @brief defines the mqtt subscribe parameter*/ typedef struct { mqtt_al_string_t topic; ///< topic will be subscribe en_mqtt_al_qos_t qos; ///< qos requested fn_mqtt_al_msg_dealer dealer; ///< message dealer:used to deal the received message void *arg; ///< used for the message dealer char subret; ///< subscribe result code int timeout; ///< how much time will be blocked }mqtt_al_subpara_t;
其中訂閱消息質量qos的枚舉值如下:
/** @brief enum all the qos supported for the application */ typedef enum { en_mqtt_al_qos_0 = 0, ///< mqtt QOS 0 en_mqtt_al_qos_1, ///< mqtt QOS 1 en_mqtt_al_qos_2, ///< mqtt QOS 2 en_mqtt_al_qos_err }en_mqtt_al_qos_t;
dealer是一個函數指針,接收到下發命令之后會被回調,arg是回調函數參數,其定義如下:
/** @brief defines the mqtt received message dealer, called by mqtt engine*/ typedef void (*fn_mqtt_al_msg_dealer)(void *arg,mqtt_al_msgrcv_t *msg);
訂閱之后,可以通過mqtt_al_subpara_t結構體中的subret值查看是否訂閱成功。
發布消息的API如下:
/** * @brief you could use this function to publish a message to the server * * @param[in] handle the handle we get from mqtt_al_connect * * @param[in] msg the message we will publish, see the data mqtt_al_pubpara_t * * @return 0 success -1 failed * */ int mqtt_al_publish(void *handle, mqtt_al_pubpara_t *pubpara);
兩個參數中,handle參數是之前使用mqtt_al_connect時返回的指針,直接傳入即可,pubpara參數需要重點講述。
mqtt_al_pubpara_t的定義如下:
/** @brief defines for the mqtt publish */ typedef struct { mqtt_al_string_t topic; ///< selected publish topic mqtt_al_string_t msg; ///< message to be published en_mqtt_al_qos_t qos; ///< message qos int retain; ///< message retain :1 retain while 0 not int timeout; ///< how much time will blocked }mqtt_al_pubpara_t;
MQTT在配置之后,會自動初始化。
在SDK目錄中的IoT_LINK_1.0.0\iot_link\link_main.c
文件中可以看到:
因為本次實驗用到的組件較多:
AT框架
ESP8266設備驅動
串口驅動框架
cJSON組件
SAL組件
MQTT組件
這些實驗代碼全部編譯下來,有350KB,而小熊派開發板所使用的主控芯片STM32L431RCT6的 Flash 僅有256KB,會導致編譯器無法鏈接出可執行文件,所以要在makefile中修改優化選項,修改為-Os
參數,即最大限度的優化代碼尺寸,并去掉-g
參數,即代碼只能下載運行,無法調試,如圖:
在工程目錄中的OS_CONFIG/iot_link_config.h
文件中,配置ESP8266設備的波特率和設備名稱:
SDK:
C:\Users\Administrator\.icode\sdk\IoT_LINK_1.0.0
(其中Administrator是實驗電腦的用戶名)。
在SDK目錄中的iot_link\network\tcpip\esp8266_socket\esp8266_socket_imp.c
文件中,配置連接信息:
之后修改同路徑下的esp8266_socket_imp.mk
文件,如圖,將 TOP_DIR 改為 SDK_DIR :
在SDK目錄中的iot_link\network\mqtt\paho_mqtt\paho_mqtt.mk
文件中,如圖,將 TOP_DIR 改為 SDK_DIR :
對接信息配置如下:
其中ClientID隨機生成一個即可。
使用mqtt.fx連接客戶端,訂閱本次實驗中的兩個主題:
主題led_cmd
:用于發布控制命令
主題lightness
:用于上報亮度
在 Demo 文件夾下創建cloud_test_demo
文件夾,在其中創建emqx_mqtt_demo.c
文件。
編寫代碼:
#include <osal.h> #include <mqtt_al.h> #include <string.h> #define DEFAULT_LIFETIME 60 #define DEFAULT_SERVER_IPV4 "122.51.89.94" #define DEFAULT_SERVER_PORT 1883 #define CN_MQTT_EP_CLIENTID "emqx-test-001" #define CN_MQTT_EP_USERNAME "mculover666" #define CN_MQTT_EP_PASSWD "123456789" #define CN_MQTT_EP_SUB_TOPIC1 "led_cmd" #define CN_MQTT_EP_PUB_TOPIC1 "lightness" #define recv_buf_len 100 static char recv_buffer[recv_buf_len]; //下發數據接收緩沖區 static int recv_datalen; //表示接收數據長度 osal_semp_t recv_sync; //命令接收回調函數和處理函數之間的信號量 char lightness_buf[10]; static void mqtt_al_msg_dealer(void *arg,mqtt_al_msgrcv_t *msg) { if((msg->msg.len) < recv_buf_len) { //保存數據 memcpy(recv_buffer,msg->msg.data,msg->msg.len ); recv_buffer[msg->msg.len] = '\0'; recv_datalen = msg->msg.len; printf("recv buf: %s.\r\n", recv_buffer); //釋放信號量,交由數據處理線程進行處理 osal_semp_post(recv_sync); } else { printf("recv buf is too small, len = %d.\r\n", msg->msg.len); } } static int task_recv_cmd_entry(void *args) { while(1) { /* 阻塞等待信號量 */ osal_semp_pend(recv_sync,cn_osal_timeout_forever); if(strstr(recv_buffer, "on")) { printf("-----------------LED ON !!! --------------------\r\n"); } else if(strstr(recv_buffer, "off")) { printf("-----------------LED OFF !!! --------------------\r\n"); } } return 0; } static int task_report_msg_entry(void *args) { int ret = -1; void *handle = NULL; mqtt_al_conpara_t config; mqtt_al_string_t str_temp; mqtt_al_subpara_t subpara_led_cmd; mqtt_al_pubpara_t pubpara_lightness; int lightness_value = 0; /* 配置結構體 */ str_temp.data = DEFAULT_SERVER_IPV4; str_temp.len = sizeof(DEFAULT_SERVER_IPV4); config.serveraddr = str_temp; config.serverport = DEFAULT_SERVER_PORT; config.security = en_mqtt_al_security_none; config.version = en_mqtt_al_version_3_1_0; str_temp.data = CN_MQTT_EP_CLIENTID; str_temp.len = sizeof(CN_MQTT_EP_CLIENTID); config.clientid = str_temp; str_temp.data = CN_MQTT_EP_USERNAME; str_temp.len = sizeof(CN_MQTT_EP_USERNAME); config.user = str_temp; str_temp.data = CN_MQTT_EP_PASSWD; str_temp.len = sizeof(CN_MQTT_EP_PASSWD); config.passwd = str_temp; config.cleansession = 1; config.willmsg = NULL; config.keepalivetime = DEFAULT_LIFETIME; config.timeout = 30; /* 配置并連接服務器 */ handle = mqtt_al_connect(&config); if(handle == NULL) { /* 連接出錯 */ printf("config error.\r\n"); return -1; } else { /* 進一步檢查服務器返回值 */ if(config.conret != cn_mqtt_al_con_code_ok) { /* 服務器返回值出錯 */ printf("server return error, conret = %d.\r\n", config.conret); return -1; } else { printf("connect to server success.\r\n"); } } /* 連接成功后,訂閱led_cmd主題消息 */ str_temp.data = CN_MQTT_EP_SUB_TOPIC1; str_temp.len = sizeof(CN_MQTT_EP_SUB_TOPIC1); subpara_led_cmd.topic = str_temp; subpara_led_cmd.qos = en_mqtt_al_qos_0; subpara_led_cmd.dealer = mqtt_al_msg_dealer; subpara_led_cmd.arg = NULL; subpara_led_cmd.timeout = 60; ret = mqtt_al_subscribe(handle, &subpara_led_cmd); if(ret < 0) { printf("sub topic %s fail.\r\n", subpara_led_cmd.topic.data); return -1; } else { /* 進一步判斷是否訂閱成功 */ if(cn_mqtt_al_con_code_ok != subpara_led_cmd.subret) { printf("sub topic %s fail, subret = %d.\r\n", subpara_led_cmd.topic.data, subpara_led_cmd.subret); return -1; } else { printf("sub topic %s success.\r\n", subpara_led_cmd.topic.data); } } /* 每隔10s上報一次數據 */ str_temp.data = CN_MQTT_EP_PUB_TOPIC1; str_temp.len = sizeof(CN_MQTT_EP_PUB_TOPIC1); pubpara_lightness.topic = str_temp; pubpara_lightness.qos = en_mqtt_al_qos_0; pubpara_lightness.retain = 0; pubpara_lightness.timeout = 30; while(1) { sprintf(lightness_buf, "%d", lightness_value); str_temp.data = lightness_buf; str_temp.len = strlen(lightness_buf); pubpara_lightness.msg = str_temp; ret = mqtt_al_publish(handle, &pubpara_lightness); if(ret < 0) { printf("publish topic %s fail.\r\n", pubpara_lightness.topic.data); return -1; } else { printf("publish topic %s success. payload = %s, lightness = %d.\r\n", pubpara_lightness.topic.data, pubpara_lightness.msg.data, lightness_value); } lightness_value++; osal_task_sleep(10*1000); } } int standard_app_demo_main() { /* 創建信號量 */ osal_semp_create(&recv_sync,1,0); /* 創建任務 */ osal_task_create("task_reportmsg",task_report_msg_entry,NULL,0x800,NULL,8); osal_task_create("task_recv_cmd",task_recv_cmd_entry,NULL,0x400,NULL,8); return 0; }
在user_demo.mk中添加如下:
#example for emqx_mqtt_demo ifeq ($(CONFIG_USER_DEMO), "emqx_mqtt_demo") user_demo_src = ${wildcard $(TOP_DIR)/targets/STM32L431_BearPi/Demos/cloud_test_demo/emqx_mqtt_demo.c} endif
添加位置如下:
特別說明:實驗時需要關閉shell組件,否則會因動態內存分配失敗而無法連接。
編譯下載之后,可以在串口助手中看到輸出信息:
在訂閱了該主題的客戶端也可以看到上報數據:
在mqtt.fx中下發一條開啟命令:
可以看到設備后作出回應:
再下發一條關閉命令:
可以看到設備后作出回應:
“基于MQTT怎么對接EMQ-X服務器”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。