<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Gateway綁定MQTT實現(xiàn)發(fā)布訂閱

          共 17643字,需瀏覽 36分鐘

           ·

          2021-01-06 18:42

          作者:isWulongbo
          來源:SegmentFault 思否社區(qū)



          前言


          實現(xiàn)MQTT協(xié)議的中間件有很多,本文使用的是企業(yè)級 EMQX EnterPrise,不了解的小伙伴可以翻閱之前的博客。這里,主要介紹SpringBoot2.0集成MQTT實現(xiàn)消息推送的功能。




          創(chuàng)建項目


          創(chuàng)建父工程


          打開?idea?點擊?File>New>Project?選擇Spring Initializr >JDK版本>Next?并按下圖創(chuàng)建項目




          點擊 next ,開發(fā)者工具 Developer Tools我們勾選前兩個,Web 我們勾選第一個,安全框架和SQL這里暫時不需要勾選,Messaging中間件,我們同樣勾選第一個就好,Cloud組件我們也不用勾選。



          依次點擊?next?finish創(chuàng)建好項目



          刪除 src ,.gitignore,HELP.md,mvnw和mvnw.cmd 目錄,本文采用Gateway綁定的方式,需要引入以下依賴:



          ????org.springframework.integration
          ????spring-integration-stream



          ????org.springframework.integration
          ????spring-integration-mqtt


          父工程pom文件:


          "1.0"?encoding="UTF-8"?>
          "http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          ?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd">
          ????4.0.0
          ????pom
          ????
          ????????springboot_emqx_common
          ????????springboot_emqx_publish
          ????????springboot_emqx_subscribe
          ????

          ????
          ????????org.springframework.boot
          ????????spring-boot-starter-parent
          ????????2.4.1
          ?????????
          ????

          ????com.baba.wlb
          ????springboot_emqx
          ????1.0-SNAPSHOT
          ????springboot_emqx
          ????Demo?project?for?Spring?Boot

          ????
          ????????1.8
          ????


          ????
          ????????
          ????????????org.springframework.boot
          ????????????spring-boot-starter-integration
          ????????


          ????????
          ????????????org.springframework.integration
          ????????????spring-integration-stream
          ????????


          ????????
          ????????????org.springframework.integration
          ????????????spring-integration-mqtt
          ????????


          ????????
          ????????????org.springframework.boot
          ????????????spring-boot-starter-web
          ????????


          ????????
          ????????????org.springframework.boot
          ????????????spring-boot-devtools
          ????????????runtime
          ????????????true
          ????????

          ????????
          ????????????org.projectlombok
          ????????????lombok
          ????????????true
          ????????

          ????????
          ????????????org.springframework.boot
          ????????????spring-boot-starter-test
          ????????????test
          ????????

          ????????
          ????????????org.springframework.integration
          ????????????spring-integration-test
          ????????????test
          ????????

          ????


          ????
          ????????
          ????????????
          ????????????????org.springframework.boot
          ????????????????spring-boot-maven-plugin








          ????????????????
          ????????????????????com.baba.wlb.publish.PublishApplication
          ????????????????

          ????????????

          ????????

          ????




          創(chuàng)建子工程


          在父工程中點擊New>>module>Next 分別創(chuàng)建三個子工程:


          springboot_emqx_common
          springboot_emqx_publish
          springboot_emqx_subscribe



          springboot_emqx_common

          在該模塊下新建如下package包


          注:(config包下暫時沒放公共配置,因為我試過好久,發(fā)現(xiàn)丟進(jìn)來的配置只有主類'mainClass'才能加載到,其他模塊加載不到通用配置,不清楚是不是我漏了什么注解,望了解這部分的人多多指教!所以只好拆分配置到各個模塊中了)



          系統(tǒng)常量:Constants.java


          package?com.baba.wlb.share.common;

          /**
          ?*?@Author?wulongbo
          ?*?@Date?2020/12/29?13:50
          ?*?@Version?1.0
          ?*/

          /**
          ?*?系統(tǒng)常量
          ?*/
          public?class?Constants?{

          ????public?static?final?String?MQTT_PUBLISH_CHANNEL?=?"mqttPublishChannel";
          ????public?static?final?String?MQTT_SUBSCRIBE_CHANNEL?=?"mqttSubscribeChannel";

          }


          Emqx配置類:EmqxMqttProperties.java


          package?com.baba.wlb.share.properties;

          import?lombok.Data;
          import?org.springframework.boot.context.properties.ConfigurationProperties;
          import?org.springframework.stereotype.Component;

          /**
          ?*?@Author?wulongbo
          ?*?@Date?2020/12/29?11:33
          ?*?@Version?1.0
          ?*/

          /**
          ?*?配置文件
          ?*/

          @Data
          @Component
          @ConfigurationProperties("wulongbo.mqtt.emqx")
          public?class?EmqxMqttProperties?{
          ????private?String?username;
          ????private?String?password;
          ????private?String?hostUrl;
          ????private?String?clientId;
          ????private?String?defaultTopic;
          ????private?Integer?timeout;
          ????private?Integer?keepAlive;
          ????private?Integer?qos;
          ????private?Integer?version;
          }


          在 resource資源目錄下新建一個 application-common.yml的yml文件。


          注:方法一:以application-*.yml的形式命名。方法二:模塊之間并不用寫依賴配置,直接在common模塊的resource目錄,添加一個config文件夾,在里面創(chuàng)建application.yml文件即可


          官網(wǎng)是這么介紹的(附上官網(wǎng)地址)

          https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#boot-features-external-config-application-property-files


          這里選擇第一種方式。


          yml配置文件:application-common.yml


          wulongbo:
          ??mqtt:
          ????emqx:
          ??????username:?admin
          ??????password:?public
          ??????#tcp://ip:port
          ??????host-url:?tcp://39.102.56.91:1883
          ??????client-id:?wulongbo${random.value}
          ??????default-topic:?wulongbo_topic
          ??????#??????default-topic:?$SYS/brokers/+/clients/#
          ??????timeout:?60
          ??????keep-alive:?60
          ??????# qos:{0:至多一次的傳輸?/1:至少分發(fā)一次,可重復(fù)?/2:只分發(fā)一次,不可重復(fù)}
          ??????qos:?1
          ??????version:?4


          注:我自身的EMQX 是啟用了Mysql認(rèn)證登錄的,并且關(guān)閉了匿名登錄的哈,所以需要正確的用戶名和密碼


          springboot_emqx_publish

          在該模塊下新建如下package包



          config類:EmqxMqttConfig.java


          package?com.baba.wlb.publish.config;

          /**
          ?*?@Author?wulongbo
          ?*?@Date?2020/12/29?11:38
          ?*?@Version?1.0
          ?*/

          import?com.baba.wlb.share.common.Constants;
          import?com.baba.wlb.share.properties.EmqxMqttProperties;
          import?lombok.extern.slf4j.Slf4j;
          import?org.eclipse.paho.client.mqttv3.MqttConnectOptions;
          import?org.springframework.context.annotation.Bean;
          import?org.springframework.context.annotation.Configuration;
          import?org.springframework.integration.annotation.IntegrationComponentScan;
          import?org.springframework.integration.annotation.ServiceActivator;
          import?org.springframework.integration.channel.DirectChannel;
          import?org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
          import?org.springframework.integration.mqtt.core.MqttPahoClientFactory;
          import?org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
          import?org.springframework.messaging.MessageChannel;
          import?org.springframework.messaging.MessageHandler;

          import?javax.annotation.Resource;

          /**
          ?*?EMQX配置工具類
          ?*/

          @Configuration
          @IntegrationComponentScan?//消息掃描件
          @Slf4j
          public?class?EmqxMqttConfig?{

          ????@Resource
          ????private?EmqxMqttProperties?emqxMqttProperties;

          ????/**
          ?????*?MQTT的連接
          ?????*/
          ????@Bean
          ????public?MqttConnectOptions?getMqttConnectOptions()?{
          ????????//?設(shè)置相關(guān)的屬性
          ????????MqttConnectOptions?mqttConnectOptions?=?new?MqttConnectOptions();
          ????????mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());
          ????????mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());
          ????????mqttConnectOptions.setServerURIs(new?String[]{emqxMqttProperties.getHostUrl()});
          ????????//?心跳
          ????????mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());
          ????????mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());
          ????????mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());
          ????????//?保留/清空曾經(jīng)連接的客戶端信息
          ????????mqttConnectOptions.setCleanSession(false);
          ????????//?qos
          ????????String?playload?=?"設(shè)備已斷開連接";
          ????????//?遺囑消息
          ????????mqttConnectOptions.setWill("last_topic",?playload.getBytes(),?emqxMqttProperties.getQos(),?false);
          ????????return?mqttConnectOptions;
          ????}

          ????/**
          ?????*?paho?factory,mqtt自定義的連接放入factory工廠中
          ?????*/
          ????@Bean
          ????public?MqttPahoClientFactory?getMqttPahoClientFactory()?{
          ????????DefaultMqttPahoClientFactory?defaultMqttPahoClientFactory?=?new?DefaultMqttPahoClientFactory();
          ????????defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());
          ????????return?defaultMqttPahoClientFactory;
          ????}

          ????/**
          ?????*?開啟連接通道
          ?????*/
          ????@Bean(name?=?Constants.MQTT_PUBLISH_CHANNEL)
          ????public?MessageChannel?getMqttPublishMessageChannel()?{
          ????????DirectChannel?directChannel?=?new?DirectChannel();
          ????????return?directChannel;
          ????}


          //????/**
          //?????*?開啟連接通道
          //?????*/
          //????@Bean(name?=?Constants.MQTT_SUBSCRIBE_CHANNEL)
          //????public?MessageChannel?getMqttSubscribeMessageChannel()?{
          //????????DirectChannel?directChannel?=?new?DirectChannel();
          //????????return?directChannel;
          //????}
          //
          //
          //
          //????/**
          //?????*?監(jiān)聽topic.訂閱者,消費者
          //?????*/
          //????@Bean
          //????public?MessageProducer?inbound()?{
          //????????MqttPahoMessageDrivenChannelAdapter?mqttPahoMessageDrivenChannelAdapter?=?new?MqttPahoMessageDrivenChannelAdapter(
          //????????????????emqxMqttProperties.getClientId()?+?"_wlb",?getMqttPahoClientFactory(),?emqxMqttProperties.getDefaultTopic().split(",")
          //????????);
          //????????mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());
          //????????mqttPahoMessageDrivenChannelAdapter.setConverter(new?DefaultPahoMessageConverter());
          //????????mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());
          //????????mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttPublishMessageChannel());
          //????????return?mqttPahoMessageDrivenChannelAdapter;
          //????}

          ????/**
          ?????*?訂閱者,消費者
          ?????*/
          ????@Bean
          ????@ServiceActivator(inputChannel?=?Constants.MQTT_PUBLISH_CHANNEL)
          ????public?MessageHandler?getMessageHandler()?{
          ????????MqttPahoMessageHandler?mqttPahoMessageHandler?=?new?MqttPahoMessageHandler(emqxMqttProperties.getClientId(),getMqttPahoClientFactory());
          ????????mqttPahoMessageHandler.setAsync(true);
          ????????mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());
          ????????mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());
          ????????return?mqttPahoMessageHandler;
          ????}

          }

          controller類:PublishController.java

          package?com.baba.wlb.publish.controller;


          import?com.baba.wlb.publish.service.PublishService;
          import?org.springframework.beans.factory.annotation.Autowired;
          import?org.springframework.web.bind.annotation.RequestMapping;
          import?org.springframework.web.bind.annotation.RestController;

          /**
          ?*?@Author?wulongbo
          ?*?@Date?2020/12/29?13:58
          ?*?@Version?1.0
          ?*/

          /**
          ?*?發(fā)送消息的Controller
          ?*/

          @RestController
          @RequestMapping("/publish")
          public?class?PublishController?{

          ????/**
          ?????*?注入發(fā)布者的service服務(wù)
          ?????*/
          ????@Autowired
          ????private?PublishService?publishService;

          ????/**
          ?????*?發(fā)送消息
          ?????*/
          ????@RequestMapping("/emqxPublish")
          ????public?String?emqxPublish(String?data,String?topic){
          ????????publishService.sendToMqtt(data,topic);
          ????????return?"success";
          ????}
          }

          service:?PublishService.java

          package?com.baba.wlb.publish.service;

          import?com.baba.wlb.share.common.Constants;
          import?org.springframework.integration.annotation.MessagingGateway;
          import?org.springframework.integration.mqtt.support.MqttHeaders;
          import?org.springframework.messaging.handler.annotation.Header;
          import?org.springframework.stereotype.Component;

          /**
          ?*?@Author?wulongbo
          ?*?@Date?2020/12/29?14:00
          ?*?@Version?1.0
          ?*/

          @MessagingGateway(defaultRequestChannel?=?Constants.MQTT_PUBLISH_CHANNEL)
          @Component
          public?interface?PublishService?{
          ????void?sendToMqtt(String?data,?@Header(MqttHeaders.TOPIC)?String?topic);

          ????void?sendToMqtt(String?data);

          ????void?sendToMqtt(@Header(MqttHeaders.TOPIC)String?topic,?int?qos,?String?data);
          }


          注:必須加@Header(MqttHeaders.TOPIC)注解哈


          application啟動類:PublishApplication.java


          package?com.baba.wlb.publish;

          import?com.baba.wlb.share.properties.EmqxMqttProperties;
          import?org.springframework.boot.SpringApplication;
          import?org.springframework.boot.autoconfigure.SpringBootApplication;
          import?org.springframework.boot.context.properties.EnableConfigurationProperties;

          /**
          ?*?@Author?wulongbo
          ?*?@Date?2020/12/29?14:04
          ?*?@Version?1.0
          ?*/

          /**
          ?*?emqx?發(fā)布者啟動程序
          ?*/
          @SpringBootApplication
          @EnableConfigurationProperties({EmqxMqttProperties.class})
          public?class?PublishApplication?{

          ????public?static?void?main(String[]?args)?{
          ????????SpringApplication.run(PublishApplication.class,args);
          ????}
          }

          注:須加入@EnableConfigurationProperties,才能加載到配置文件

          yml文件:application.yml

          server:
          ??port:?1001

          #spring:
          #??profiles:
          #????active:?common


          注:這里我們因為把publish模塊設(shè)置成為了主類,所以可引入common yml,也可以不引入


          springboot_emqx_subscribe

          在該模塊下新建如下package包



          config類:EmqxMqttConfig.java


          package?com.baba.wlb.subscribe.config;

          /**
          ?*?@Author?wulongbo
          ?*?@Date?2020/12/29?11:38
          ?*?@Version?1.0
          ?*/

          import?com.baba.wlb.share.common.Constants;
          import?com.baba.wlb.share.properties.EmqxMqttProperties;
          import?lombok.extern.slf4j.Slf4j;
          import?org.eclipse.paho.client.mqttv3.MqttConnectOptions;
          import?org.springframework.context.annotation.Bean;
          import?org.springframework.context.annotation.Configuration;
          import?org.springframework.integration.annotation.IntegrationComponentScan;
          import?org.springframework.integration.annotation.ServiceActivator;
          import?org.springframework.integration.channel.DirectChannel;
          import?org.springframework.integration.core.MessageProducer;
          import?org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
          import?org.springframework.integration.mqtt.core.MqttPahoClientFactory;
          import?org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
          import?org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
          import?org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
          import?org.springframework.messaging.MessageChannel;
          import?org.springframework.messaging.MessageHandler;

          import?javax.annotation.Resource;

          /**
          ?*?EMQX配置工具類
          ?*/

          @Configuration
          @IntegrationComponentScan?//消息掃描件
          @Slf4j
          public?class?EmqxMqttConfig?{

          ????@Resource
          ????private?EmqxMqttProperties?emqxMqttProperties;

          ????/**
          ?????*?MQTT的連接
          ?????*/
          ????@Bean
          ????public?MqttConnectOptions?getMqttConnectOptions()?{
          ????????//?設(shè)置相關(guān)的屬性
          ????????MqttConnectOptions?mqttConnectOptions?=?new?MqttConnectOptions();
          ????????mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());
          ????????mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());
          ????????mqttConnectOptions.setServerURIs(new?String[]{emqxMqttProperties.getHostUrl()});
          ????????//?心跳
          ????????mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());
          ????????mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());
          ????????mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());
          ????????//?保留/清空曾經(jīng)連接的客戶端信息
          ????????mqttConnectOptions.setCleanSession(false);
          ????????//?qos
          ????????String?playload?=?"設(shè)備已斷開連接";
          ????????//?遺囑消息
          ????????mqttConnectOptions.setWill("last_topic",?playload.getBytes(),?emqxMqttProperties.getQos(),?false);
          ????????return?mqttConnectOptions;
          ????}

          ????/**
          ?????*?paho?factory,mqtt自定義的連接放入factory工廠中
          ?????*/
          ????@Bean
          ????public?MqttPahoClientFactory?getMqttPahoClientFactory()?{
          ????????DefaultMqttPahoClientFactory?defaultMqttPahoClientFactory?=?new?DefaultMqttPahoClientFactory();
          ????????defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());
          ????????return?defaultMqttPahoClientFactory;
          ????}

          //????/**
          //?????*?開啟連接通道
          //?????*/
          //????@Bean(name?=?Constants.MQTT_PUBLISH_CHANNEL)
          //????public?MessageChannel?getMqttPublishMessageChannel()?{
          //????????DirectChannel?directChannel?=?new?DirectChannel();
          //????????return?directChannel;
          //????}


          ????/**
          ?????*?開啟連接通道
          ?????*/
          ????@Bean(name?=?Constants.MQTT_SUBSCRIBE_CHANNEL)
          ????public?MessageChannel?getMqttSubscribeMessageChannel()?{
          ????????DirectChannel?directChannel?=?new?DirectChannel();
          ????????return?directChannel;
          ????}



          ????/**
          ?????*?監(jiān)聽topic.訂閱者,消費者
          ?????*/
          ????@Bean
          ????public?MessageProducer?inbound()?{
          ????????MqttPahoMessageDrivenChannelAdapter?mqttPahoMessageDrivenChannelAdapter?=?new?MqttPahoMessageDrivenChannelAdapter(
          ????????????????emqxMqttProperties.getClientId()?+?"_wlb",?getMqttPahoClientFactory(),?emqxMqttProperties.getDefaultTopic().split(",")
          ????????);
          ????????mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());
          ????????mqttPahoMessageDrivenChannelAdapter.setConverter(new?DefaultPahoMessageConverter());
          ????????mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());
          ????????mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttSubscribeMessageChannel());
          ????????return?mqttPahoMessageDrivenChannelAdapter;
          ????}

          ????/**
          ?????*?發(fā)布者,生產(chǎn)者
          ?????*/
          ????@Bean
          ????@ServiceActivator(inputChannel?=?Constants.MQTT_SUBSCRIBE_CHANNEL)
          ????public?MessageHandler?getMessageHandler()?{
          ????????MqttPahoMessageHandler?mqttPahoMessageHandler?=?new?MqttPahoMessageHandler(emqxMqttProperties.getClientId(),getMqttPahoClientFactory());
          ????????mqttPahoMessageHandler.setAsync(true);
          ????????mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());
          ????????mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());
          ????????return?mqttPahoMessageHandler;
          ????}

          }


          service業(yè)務(wù)類:SubscribeService.java

          package?com.baba.wlb.subscribe.service;

          import?com.baba.wlb.share.common.Constants;
          import?org.springframework.context.annotation.Bean;
          import?org.springframework.integration.annotation.ServiceActivator;
          import?org.springframework.messaging.Message;
          import?org.springframework.messaging.MessageHandler;
          import?org.springframework.messaging.MessagingException;
          import?org.springframework.stereotype.Service;

          /**
          ?*?@Author?wulongbo
          ?*?@Date?2020/12/29?14:11
          ?*?@Version?1.0
          ?*/

          /**
          ?*?訂閱者
          ?*/
          @Service
          public?class?SubscribeService?{

          ????@Bean
          ????@ServiceActivator(inputChannel?=?Constants.MQTT_SUBSCRIBE_CHANNEL)
          ????public?MessageHandler?messageHandler()?{
          ????????MessageHandler?messageHandler?=?new?MessageHandler()?{
          ????????????@Override
          ????????????public?void?handleMessage(Message?message)?throws?MessagingException?{
          ????????????????System.out.println("訂閱者訂閱消息頭是:"?+?message.getHeaders());
          ????????????????System.out.println("訂閱者訂閱消息主體是:"?+?message.getPayload());
          ????????????}
          ????????};
          ????????return?messageHandler;
          ????}
          }


          注:我們把MessageHandler放入了專門的server做業(yè)務(wù)處理,其實放config類也是OK的


          application啟動類:SubscribeApplication.java


          package?com.baba.wlb.subscribe;

          /**
          ?*?@Author?wulongbo
          ?*?@Date?2020/12/29?14:16
          ?*?@Version?1.0
          ?*/

          import?com.baba.wlb.share.properties.EmqxMqttProperties;
          import?org.springframework.boot.SpringApplication;
          import?org.springframework.boot.autoconfigure.SpringBootApplication;
          import?org.springframework.boot.context.properties.EnableConfigurationProperties;

          /**
          ?*?訂閱者啟動類
          ?*/
          @SpringBootApplication
          @EnableConfigurationProperties({EmqxMqttProperties.class})
          public?class?SubscribeApplication?{
          ????public?static?void?main(String[]?args)?{
          ????????SpringApplication.run(SubscribeApplication.class,args);
          ????}
          }


          yml配置文件:application.yml


          server:
          ??port:?1002

          spring:
          ??profiles:
          ????include:?common


          注:當(dāng)然我們上面的publish和subscribe模塊都是依賴于common模塊的我們需要在各個模塊上右擊--Open Model Settings



          并按下圖依次來添加模塊之間的依賴關(guān)系




          最后,我們在分別在 publish和subscribe模塊的pom文件中引入common依賴就Ok了

              

          ????????
          ????????????com.baba.wlb
          ????????????springboot_emqx_common
          ????????????1.0-SNAPSHOT
          ????????

          ????




          至此,我們多模塊用Gateway綁定的方式就集成好了MQTT消息推送和消息訂閱功能。




          啟動項目


          分別啟動PublishApplication和?SubscribeApplication


          端口分別為:1001,1002





          PostMan測試


          打開postman:發(fā)起Get請求


          localhost:1001/publish/emqxPublish?topic=wulongbo_topic&data=我是一條消息


          可以看到我們訂閱者訂閱到了這條消息:



          至于service業(yè)務(wù)模塊對消息的處理:具體是根據(jù)主題來篩選,還是根據(jù)playload來區(qū)分,看具體的業(yè)務(wù)場景和設(shè)計需要。當(dāng)然EMQX 有更解耦的方式就是規(guī)則引擎來對各個事件響應(yīng)動作,也有HTTP API供我們調(diào)用,讀者靈活運用即可。




          點擊左下角閱讀原文,到?SegmentFault 思否社區(qū)?和文章作者展開更多互動和交流。

          -?END -

          瀏覽 49
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产在线精品婷婷 | 人人操人人超碰 | 可以免费看的黄色网址 | A在线大香蕉 | 蜜桃Av久久精品人人槡 |