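Publish/Subscribe over MQTT with a Gateway Binding
Introduction
Many middleware products implement the MQTT protocol. This article uses the enterprise edition, EMQX Enterprise; readers unfamiliar with it can refer to my earlier blog posts. The focus here is integrating MQTT into Spring Boot 2 (the parent POM uses 2.4.1) to implement message push.
Creating the project
Creating the parent project
Open IntelliJ IDEA, click File > New > Project, select Spring Initializr, choose the JDK version, click Next, and create the project as shown in the figure below.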

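Click Next. Under Developer Tools, tick the first two items (Spring Boot DevTools and Lombok, judging by the POM); under Web, tick the first item (Spring Web). Security and SQL are not needed for now. Under Messaging, again tick the first item (Spring Integration). No Cloud components are needed either.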

Click Next, then Finish to create the project.

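Delete src, .gitignore, HELP.md, mvnw, and mvnw.cmd from the parent project. This article uses the gateway-binding approach, which requires the following dependencies:
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>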
Parent project pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>
    <modules>
        <module>springboot_emqx_common</module>
        <module>springboot_emqx_publish</module>
        <module>springboot_emqx_subscribe</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/>
    </parent>
    <groupId>com.baba.wlb</groupId>
    <artifactId>springboot_emqx</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>springboot_emqx</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.baba.wlb.publish.PublishApplication</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Creating the submodules
In the parent project, click New > Module > Next and create three submodules:
springboot_emqx_common
springboot_emqx_publish
springboot_emqx_subscribe

springboot_emqx_common
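Create the following packages in this module:
Note: the config package does not hold any shared configuration for now. After trying for quite a while, I found that configuration placed there is only picked up by the module configured as 'mainClass'; the other modules cannot load the shared configuration. I am not sure whether I am missing an annotation, and would welcome advice from anyone who knows this area. For now, the configuration is split across the individual modules.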

System constants: Constants.java
package com.baba.wlb.share.common;
/**
 * @Author wulongbo
 * @Date 2020/12/29 13:50
 * @Version 1.0
 */
/**
 * System constants (channel bean names shared by the modules)
 */
public class Constants {
    public static final String MQTT_PUBLISH_CHANNEL = "mqttPublishChannel";
    public static final String MQTT_SUBSCRIBE_CHANNEL = "mqttSubscribeChannel";
}
EMQX properties class: EmqxMqttProperties.java
package com.baba.wlb.share.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * @Author wulongbo
 * @Date 2020/12/29 11:33
 * @Version 1.0
 */
/**
 * Properties bound from the wulongbo.mqtt.emqx section of the YAML configuration
 */
@Data
@Component
@ConfigurationProperties("wulongbo.mqtt.emqx")
public class EmqxMqttProperties {
    private String username;
    private String password;
    private String hostUrl;
    private String clientId;
    private String defaultTopic;
    private Integer timeout;
    private Integer keepAlive;
    private Integer qos;
    private Integer version;
}
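Create a YAML file named application-common.yml in the resources directory.
Note: there are two options. Option 1: name the file in the form application-*.yml. Option 2: without declaring any extra configuration between the modules, add a config folder under the common module's resources directory and create an application.yml file inside it.
The official documentation describes this (link below):
https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#boot-features-external-config-application-property-files
This article uses the first option.
YAML configuration file: application-common.yml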
wulongbo:
  mqtt:
    emqx:
      username: admin
      password: public
      # tcp://ip:port
      host-url: tcp://39.102.56.91:1883
      client-id: wulongbo${random.value}
      default-topic: wulongbo_topic
      #      default-topic: $SYS/brokers/+/clients/#
      timeout: 60
      keep-alive: 60
      # qos: 0 = at most once; 1 = at least once (duplicates possible); 2 = exactly once
      qos: 1
      # MQTT protocol version passed to MqttConnectOptions.setMqttVersion (4 = MQTT 3.1.1)
      version: 4
Note: my EMQX instance has MySQL authentication enabled and anonymous login disabled, so a valid username and password are required.
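The commented-out default-topic in the YAML above, $SYS/brokers/+/clients/#, is an MQTT topic filter with wildcards: + stands for exactly one topic level and # for all remaining levels. As a rough illustration of how such a filter is compared with a published topic, here is a minimal, self-contained sketch. The class name TopicFilter, the method matches, and the sample topic strings are made up for this example; the broker (EMQX here) performs this matching for you, and the sketch does not validate malformed filters.

```java
/** Hypothetical helper illustrating MQTT topic-filter matching; not part of the project. */
final class TopicFilter {

    /** Returns true if the topic is matched by the filter ("+" = one level, "#" = the rest). */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        // A filter that starts with a wildcard must not match topics starting with "$" (e.g. $SYS).
        if (topic.startsWith("$") && (f[0].equals("#") || f[0].equals("+"))) {
            return false;
        }
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return i == f.length - 1;   // "#" is only valid as the last level
            }
            if (i >= t.length) {
                return false;               // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;               // a literal level differs
            }
        }
        return f.length == t.length;        // without "#", the level counts must be equal
    }

    public static void main(String[] args) {
        String filter = "$SYS/brokers/+/clients/#";
        System.out.println(matches(filter, "$SYS/brokers/node1/clients/abc/connected")); // true
        System.out.println(matches(filter, "wulongbo_topic"));                           // false
        System.out.println(matches("wulongbo_topic", "wulongbo_topic"));                 // true
    }
}
```

A plain topic such as wulongbo_topic contains no wildcards, so it only matches itself.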
springboot_emqx_publish
Create the following packages in this module:

Config class: EmqxMqttConfig.java
package com.baba.wlb.publish.config;
/**
 * @Author wulongbo
 * @Date 2020/12/29 11:38
 * @Version 1.0
 */
import com.baba.wlb.share.common.Constants;
import com.baba.wlb.share.properties.EmqxMqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.annotation.Resource;
/**
 * EMQX configuration class
 */
@Configuration
@IntegrationComponentScan // scans for messaging components such as @MessagingGateway interfaces
@Slf4j
public class EmqxMqttConfig {
    @Resource
    private EmqxMqttProperties emqxMqttProperties;
    /**
     * MQTT connection options
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        // Set the connection properties
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());
        mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});
        // Keep-alive (heartbeat) interval
        mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());
        mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());
        mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());
        // false keeps the session state of a previously connected client; true clears it
        mqttConnectOptions.setCleanSession(false);
        // Last-will payload
        String payload = "Device disconnected";
        // Last-will message, published by the broker if the client disconnects unexpectedly
        mqttConnectOptions.setWill("last_topic", payload.getBytes(), emqxMqttProperties.getQos(), false);
        return mqttConnectOptions;
    }
    /**
     * Paho client factory that uses the custom connection options
     */
    @Bean
    public MqttPahoClientFactory getMqttPahoClientFactory() {
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());
        return defaultMqttPahoClientFactory;
    }
    /**
     * Message channel used for publishing
     */
    @Bean(name = Constants.MQTT_PUBLISH_CHANNEL)
    public MessageChannel getMqttPublishMessageChannel() {
        DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }
//    /**
//     * Message channel used for subscribing
//     */
//    @Bean(name = Constants.MQTT_SUBSCRIBE_CHANNEL)
//    public MessageChannel getMqttSubscribeMessageChannel() {
//        DirectChannel directChannel = new DirectChannel();
//        return directChannel;
//    }
//
//
//
//    /**
//     * Listens on the topic: subscriber / consumer
//     */
//    @Bean
//    public MessageProducer inbound() {
//        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
//                emqxMqttProperties.getClientId() + "_wlb", getMqttPahoClientFactory(), emqxMqttProperties.getDefaultTopic().split(",")
//        );
//        mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());
//        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
//        mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());
//        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttPublishMessageChannel());
//        return mqttPahoMessageDrivenChannelAdapter;
//    }
    /**
     * Outbound handler: consumes messages from the publish channel and publishes them to the broker
     */
    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_PUBLISH_CHANNEL)
    public MessageHandler getMessageHandler() {
        MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(), getMqttPahoClientFactory());
        mqttPahoMessageHandler.setAsync(true);
        mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());
        mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());
        return mqttPahoMessageHandler;
    }
}
Controller class: PublishController.java
package com.baba.wlb.publish.controller;
import com.baba.wlb.publish.service.PublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @Author wulongbo
 * @Date 2020/12/29 13:58
 * @Version 1.0
 */
/**
 * Controller that sends messages
 */
@RestController
@RequestMapping("/publish")
public class PublishController {
    /**
     * The publisher gateway service
     */
    @Autowired
    private PublishService publishService;
    /**
     * Sends a message: data is the payload, topic is the MQTT topic to publish to
     */
    @RequestMapping("/emqxPublish")
    public String emqxPublish(String data, String topic) {
        publishService.sendToMqtt(data, topic);
        return "success";
    }
}
Service: PublishService.java
package com.baba.wlb.publish.service;
import com.baba.wlb.share.common.Constants;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * @Author wulongbo
 * @Date 2020/12/29 14:00
 * @Version 1.0
 */
@MessagingGateway(defaultRequestChannel = Constants.MQTT_PUBLISH_CHANNEL)
@Component
public interface PublishService {
    // Publish data to the given topic
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
    // Publish data to the default topic configured on the outbound handler
    void sendToMqtt(String data);
    // Publish with an explicit QoS; qos needs its own header annotation so that only data is left as the payload
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String data);
}
Note: the @Header(MqttHeaders.TOPIC) annotation is required, so that the gateway maps that argument to the topic header rather than to the payload.
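To see why the annotation matters, it helps to picture what a messaging gateway does: the interface has no hand-written implementation, and a proxy turns each call into a message, taking header values from the annotated parameters and using the remaining parameter as the payload. The following self-contained sketch imitates that idea with a JDK dynamic proxy. It is not Spring Integration's implementation: the Header annotation, the Publisher interface, the gateway method, and the header name "mqtt_topic" are stand-ins invented for this example.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Stand-alone illustration of the gateway idea; every name here is hypothetical. */
final class GatewaySketch {

    /** Stand-in header annotation: marks a parameter as a message header. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface Header {
        String value();
    }

    /** Same shape as the first two methods of PublishService. */
    interface Publisher {
        void sendToMqtt(String data, @Header("mqtt_topic") String topic);
        void sendToMqtt(String data);
    }

    /** Builds a proxy that turns each call into (payload, headers) and hands it to the channel. */
    @SuppressWarnings("unchecked")
    static <T> T gateway(Class<T> type, BiConsumer<Object, Map<String, Object>> channel) {
        Object identity = new Object(); // answers equals/hashCode/toString for the proxy
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(identity, args);
            }
            Map<String, Object> headers = new LinkedHashMap<>();
            Object payload = null;
            boolean payloadSeen = false;
            Annotation[][] annotations = method.getParameterAnnotations();
            for (int i = 0; i < annotations.length; i++) {
                Header header = null;
                for (Annotation a : annotations[i]) {
                    if (a instanceof Header) {
                        header = (Header) a;
                    }
                }
                if (header != null) {
                    headers.put(header.value(), args[i]);   // annotated parameter becomes a header
                } else if (!payloadSeen) {
                    payload = args[i];                      // the one unannotated parameter is the payload
                    payloadSeen = true;
                } else {
                    throw new IllegalStateException("More than one payload parameter on " + method.getName());
                }
            }
            channel.accept(payload, headers);
            return null;
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
    }

    public static void main(String[] args) {
        Publisher publisher = gateway(Publisher.class,
                (payload, headers) -> System.out.println(payload + " " + headers));
        publisher.sendToMqtt("hello", "wulongbo_topic"); // prints: hello {mqtt_topic=wulongbo_topic}
        publisher.sendToMqtt("hello");                   // prints: hello {}
    }
}
```

In the sketch, a method with two unannotated parameters is rejected because the proxy cannot tell which one is the payload, which is the same reason the topic (and any QoS argument) has to be marked as a header on the real gateway interface.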
Application startup class: PublishApplication.java
package com.baba.wlb.publish;
import com.baba.wlb.share.properties.EmqxMqttProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
/**
 * @Author wulongbo
 * @Date 2020/12/29 14:04
 * @Version 1.0
 */
/**
 * EMQX publisher startup class
 */
@SpringBootApplication
@EnableConfigurationProperties({EmqxMqttProperties.class})
public class PublishApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublishApplication.class, args);
    }
}
Note: @EnableConfigurationProperties is required here; without it the properties class is not loaded (EmqxMqttProperties sits in com.baba.wlb.share.properties, outside the package scanned from PublishApplication).
YAML file: application.yml
server:
  port: 1001
#spring:
#  profiles:
#    active: common
Note: because the publish module is the one configured as the main class, including the common YAML profile here is optional.
springboot_emqx_subscribe
Create the following packages in this module:

Config class: EmqxMqttConfig.java
package com.baba.wlb.subscribe.config;
/**
 * @Author wulongbo
 * @Date 2020/12/29 11:38
 * @Version 1.0
 */
import com.baba.wlb.share.common.Constants;
import com.baba.wlb.share.properties.EmqxMqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.annotation.Resource;
/**
 * EMQX configuration class
 */
@Configuration
@IntegrationComponentScan // scans for messaging components such as @MessagingGateway interfaces
@Slf4j
public class EmqxMqttConfig {
    @Resource
    private EmqxMqttProperties emqxMqttProperties;
    /**
     * MQTT connection options
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        // Set the connection properties
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());
        mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});
        // Keep-alive (heartbeat) interval
        mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());
        mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());
        mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());
        // false keeps the session state of a previously connected client; true clears it
        mqttConnectOptions.setCleanSession(false);
        // Last-will payload
        String payload = "Device disconnected";
        // Last-will message, published by the broker if the client disconnects unexpectedly
        mqttConnectOptions.setWill("last_topic", payload.getBytes(), emqxMqttProperties.getQos(), false);
        return mqttConnectOptions;
    }
    /**
     * Paho client factory that uses the custom connection options
     */
    @Bean
    public MqttPahoClientFactory getMqttPahoClientFactory() {
        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());
        return defaultMqttPahoClientFactory;
    }
//    /**
//     * Message channel used for publishing
//     */
//    @Bean(name = Constants.MQTT_PUBLISH_CHANNEL)
//    public MessageChannel getMqttPublishMessageChannel() {
//        DirectChannel directChannel = new DirectChannel();
//        return directChannel;
//    }
    /**
     * Message channel used for subscribing
     */
    @Bean(name = Constants.MQTT_SUBSCRIBE_CHANNEL)
    public MessageChannel getMqttSubscribeMessageChannel() {
        DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }
    /**
     * Inbound adapter: subscribes to the topic(s) and forwards received messages to the subscribe channel
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(
                emqxMqttProperties.getClientId() + "_wlb", getMqttPahoClientFactory(), emqxMqttProperties.getDefaultTopic().split(",")
        );
        mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());
        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttSubscribeMessageChannel());
        return mqttPahoMessageDrivenChannelAdapter;
    }
    /**
     * Outbound handler (publisher / producer).
     * Note: it is bound to the same MQTT_SUBSCRIBE_CHANNEL as the handler in SubscribeService.
     */
    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)
    public MessageHandler getMessageHandler() {
        MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(), getMqttPahoClientFactory());
        mqttPahoMessageHandler.setAsync(true);
        mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());
        mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());
        return mqttPahoMessageHandler;
    }
}
Service class: SubscribeService.java
package com.baba.wlb.subscribe.service;
import com.baba.wlb.share.common.Constants;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;
/**
 * @Author wulongbo
 * @Date 2020/12/29 14:11
 * @Version 1.0
 */
/**
 * Subscriber
 */
@Service
public class SubscribeService {
    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)
    public MessageHandler messageHandler() {
        MessageHandler messageHandler = new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("Subscriber received headers: " + message.getHeaders());
                System.out.println("Subscriber received payload: " + message.getPayload());
            }
        };
        return messageHandler;
    }
}
Note: the MessageHandler is placed in a dedicated service class for business processing; putting it in the config class would work just as well.
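One thing to watch in this module: SubscribeService.messageHandler() and EmqxMqttConfig.getMessageHandler() are both bound to MQTT_SUBSCRIBE_CHANNEL. A DirectChannel hands each message to a single subscriber and, by default, rotates between its subscribers round-robin, so the two handlers take turns instead of both seeing every message. The sketch below imitates that dispatch rule with plain JDK classes; RoundRobinChannel and its methods are hypothetical names for illustration, not Spring Integration's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in that imitates one-subscriber-per-message, round-robin dispatch. */
final class RoundRobinChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    /** Delivers the message to exactly one subscriber, rotating through them in order. */
    void send(String message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        Consumer<String> target = subscribers.get(next);
        next = (next + 1) % subscribers.size();
        target.accept(message);
    }

    public static void main(String[] args) {
        RoundRobinChannel channel = new RoundRobinChannel();
        channel.subscribe(m -> System.out.println("print handler got " + m));
        channel.subscribe(m -> System.out.println("outbound handler got " + m));
        channel.send("m1"); // print handler got m1
        channel.send("m2"); // outbound handler got m2
        channel.send("m3"); // print handler got m3
    }
}
```

If the subscribe module only needs to consume messages, removing the outbound getMessageHandler() bean from its config class leaves SubscribeService as the only subscriber on the channel.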
Application startup class: SubscribeApplication.java
package com.baba.wlb.subscribe;
/**
 * @Author wulongbo
 * @Date 2020/12/29 14:16
 * @Version 1.0
 */
import com.baba.wlb.share.properties.EmqxMqttProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
/**
 * Subscriber startup class
 */
@SpringBootApplication
@EnableConfigurationProperties({EmqxMqttProperties.class})
public class SubscribeApplication {
    public static void main(String[] args) {
        SpringApplication.run(SubscribeApplication.class, args);
    }
}
YAML configuration file: application.yml
server:
  port: 1002
spring:
  profiles:
    include: common
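Note: both the publish and subscribe modules depend on the common module. Right-click each module and choose Open Module Settings,

then add the dependencies between the modules as shown in the figure below.

Finally, add the common dependency to the pom.xml of both the publish and subscribe modules:
    <dependencies>
        <dependency>
            <groupId>com.baba.wlb</groupId>
            <artifactId>springboot_emqx_common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>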

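At this point, the multi-module project integrates MQTT message publishing and subscription using the gateway-binding approach.
Starting the project
Start PublishApplication and SubscribeApplication.
They listen on ports 1001 and 1002 respectively.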

Testing with Postman
Open Postman and send a GET request:
localhost:1001/publish/emqxPublish?topic=wulongbo_topic&data=我是一條消息
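Postman encodes the query string for you. When the same request is issued from code, the data value (Chinese text in the request above) has to be percent-encoded first. Here is a minimal sketch using the JDK's URLEncoder, written to stay compatible with Java 8; the class PublishUrl and its methods are hypothetical helpers for illustration, not part of the project.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical helper that builds the test URL with percent-encoded query parameters. */
final class PublishUrl {

    /** Encodes one query-parameter value as UTF-8 form data (a space becomes "+"). */
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Every Java platform is required to support UTF-8.
            throw new IllegalStateException("UTF-8 is not available", e);
        }
    }

    /** Builds the URL of the emqxPublish endpoint for the given topic and data. */
    static String build(String baseUrl, String topic, String data) {
        return baseUrl + "/publish/emqxPublish?topic=" + encode(topic) + "&data=" + encode(data);
    }

    public static void main(String[] args) {
        System.out.println(build("http://localhost:1001", "wulongbo_topic", "hello mqtt"));
        // prints: http://localhost:1001/publish/emqxPublish?topic=wulongbo_topic&data=hello+mqtt
    }
}
```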
The subscriber receives the message:

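As for how the service layer processes messages, whether to filter by topic or to distinguish by payload depends on the business scenario and the design. EMQX also offers a more decoupled approach, the rule engine, which triggers actions in response to events, and it exposes an HTTP API as well; use whichever fits.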
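For the filter-by-topic option, one simple shape is a lookup table from topic name to handler. The sketch below is self-contained and uses only the JDK; TopicDispatcher and its methods are hypothetical names, and the topics are the ones used earlier in this article (wulongbo_topic and the last-will topic last_topic). In SubscribeService the topic of an incoming message would be read from the message headers (Spring Integration's MQTT adapter typically exposes it under MqttHeaders.RECEIVED_TOPIC) before calling something like dispatch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical router: picks a handler by exact topic name, otherwise uses a fallback. */
final class TopicDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final Consumer<String> fallback;

    TopicDispatcher(Consumer<String> fallback) {
        this.fallback = fallback;
    }

    /** Registers a handler for one topic; returns this so calls can be chained. */
    TopicDispatcher on(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
        return this;
    }

    /** Routes the payload to the handler registered for the topic, or to the fallback. */
    void dispatch(String topic, String payload) {
        handlers.getOrDefault(topic, fallback).accept(payload);
    }

    public static void main(String[] args) {
        TopicDispatcher dispatcher = new TopicDispatcher(p -> System.out.println("unhandled: " + p))
                .on("wulongbo_topic", p -> System.out.println("business message: " + p))
                .on("last_topic", p -> System.out.println("device offline: " + p));
        dispatcher.dispatch("wulongbo_topic", "hello");          // business message: hello
        dispatcher.dispatch("last_topic", "Device disconnected"); // device offline: Device disconnected
        dispatcher.dispatch("other", "x");                        // unhandled: x
    }
}
```

Distinguishing by payload would follow the same pattern, with the lookup key taken from a field inside the payload instead of the topic.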

