您好,登錄后才能下訂單哦!
這篇“Spring boot怎么集成MQTT”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Spring boot怎么集成MQTT”文章吧。
MQTT
(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基于發布/訂閱(publish/subscribe)模式的"輕量級"通訊協議,可以以極少的代碼和有限的帶寬為連接遠程設備提供實時可靠的消息服務。目前在物聯網、小型設備、移動應用等方面有較廣泛的應用。
(1)使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合。
(2)對負載內容屏蔽的消息傳輸。
(3)使用TCP/IP提供網絡連接。
(4)有三種消息發布服務質量:
“至多一次”,消息發布完全依賴底層TCP/IP網絡。會發生消息丟失或重復。這一級別可用于如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久后還會有第二次發送。這一種方式主要普通APP的推送,倘若你的智能設備在消息推送時未聯網,推送過去沒收到,再次聯網也就收不到了。
“至少一次”,確保消息到達,但消息重復可能會發生。
“只有一次”,確保消息到達一次。在一些要求比較嚴格的計費系統中,可以使用此級別。在計費系統中,消息重復或丟失會導致不正確的結果。這種最高質量的消息發布服務還可以用于即時通訊類的APP的推送,確保用戶收到且只會收到一次。
(5)小型傳輸,開銷很小(固定長度的頭部是2字節),協議交換最小化,以降低網絡流量。
(6)使用Last Will和Testament特性通知有關各方客戶端異常中斷的機制。
Last Will:即遺言機制,用于通知同一主題下的其他設備發送遺言的設備已經斷開了連接。
Testament:遺囑機制,功能類似于Last Will。
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
@Configuration public class MqttConfig { @Autowired private MqttProperties mqttProperties; /** * 連接器 * @return */ @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); // 設置是否清空session,false表示服務器會保留客戶端的連接記錄,true表示每次連接到服務器都以新的身份連接 mqttConnectOptions.setCleanSession(true); // 設置超時時間,默認30秒 mqttConnectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeOut()); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive()); mqttConnectOptions.setAutomaticReconnect(true); // 設置連接的用戶名 mqttConnectOptions.setUserName(mqttProperties.getUsername()); // 設置連接的密碼 mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); //服務器地址 mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getUrl()}); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } /*** * MQTT客戶端 * @return */ @Bean("mqttClientFactory") public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } /*----------------- 消息生產者的配置 ---------------------*/ /** * MQTT生產端發布處理器 * * @return {@link org.springframework.messaging.MessageHandler} */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getProducerClientId(), mqttClientFactory()); messageHandler.setAsync(true); return messageHandler; } /** * MQTT生產端發布通道 * @return */ @Bean("mqttOutboundChannel") public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /*----------------- 消息消費者的配置 ---------------------*/ /** * MQTT消費端訂閱通道 * * @return {@link org.springframework.messaging.MessageChannel} */ @Bean(name = "mqttInboundChannel") public MessageChannel mqttInboundChannel() { return new DirectChannel(); } /** * MQTT消費端連接配置 * * @param channel {@link org.springframework.messaging.MessageChannel} * @param factory {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory} * @return {@link org.springframework.integration.core.MessageProducer} */ @Bean public MessageProducer inbound( @Qualifier("mqttInboundChannel") MessageChannel channel, @Qualifier("mqttClientFactory") MqttPahoClientFactory factory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getConsumerClientId(), factory, "test"); adapter.setCompletionTimeout(30000); adapter.setConverter(new DefaultPahoMessageConverter()); // 0 至多一次,數據可能丟失 // 1 至少一次,數據可能重復 // 2 只有一次,且僅有一次,最耗性能 adapter.setQos(1); // 設置訂閱通道 adapter.setOutputChannel(channel); return adapter; } }
@ConfigurationProperties("mqtt") @Component public class MqttProperties implements Serializable { private static final long serialVersionUID = -1425980007744001158L; private String url; private String username; private String password; private int keepAlive; private int connectionTimeOut; private String producerClientId; private String producerQos; private String consumerClientId; private String consumerQos; private String consumerTopic; private int completionTimeout; private String defaultTopic; //get、set方法省略 }
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { void sendToMqtt(byte[] data,@Header(MqttHeaders.TOPIC) String topic); }
@Autowired private MqttGateway mqttGateway; @RequestMapping("/sendTest") public String sendMqttTest(String msg) { mqttGateway.send("test",msg); return "OK"; }
mqtt: url: tcp://localhost:1883 username: test password: test1234 keep-alive: 30 connection-timeout: 3000 producerClientId: test-producer producerQos: 1 consumerClientId: test-consumer consumerQos: 1 deafultTopic : test
以上就是關于“Spring boot怎么集成MQTT”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。