未讀消息,前端與RabbitMQ實(shí)時(shí)消息推送實(shí)踐,賊簡(jiǎn)單~

前幾天粉絲群里有個(gè)小伙伴問過:web 頁(yè)面的未讀消息(小紅點(diǎn))怎么實(shí)現(xiàn)比較簡(jiǎn)單,剛好本周手頭有類似的開發(fā)任務(wù),索性就整理出來供小伙伴們參考,沒準(zhǔn)哪天就能用得上呢。
web 端實(shí)時(shí)消息推送,常用的實(shí)現(xiàn)方式比較多,但萬變不離其宗,底層基本上還是依賴于 websocket,MQTT 協(xié)議也不例外。
RabbitMQ 搭建
RabbitMQ的基礎(chǔ)搭建就不詳細(xì)說了,自行百度一步一步搞問題不大,這里主要說一下兩個(gè)比較重要的配置。
1、開啟 mqtt 協(xié)議
默認(rèn)情況下RabbitMQ是不開啟MQTT 協(xié)議的,所以需要我們手動(dòng)的開啟相關(guān)的插件,而RabbitMQ的MQTT 協(xié)議分為兩種。
第一種 rabbitmq_mqtt 提供與后端服務(wù)交互使用,對(duì)應(yīng)端口1883。
rabbitmq-plugins?enable?rabbitmq_mqtt
第二種 rabbitmq_web_mqtt 提供與前端交互使用,對(duì)應(yīng)端口15675。
rabbitmq-plugins?enable?rabbitmq_web_mqtt?
在 RabbitMQ 管理后臺(tái)看到如下的顯示,就表示MQTT 協(xié)議開啟成功,到這中間件環(huán)境就搭建完畢了。
使用MQTT 協(xié)議默認(rèn)的交換機(jī) Exchange 為 amp.topic,而我們訂閱的主題會(huì)在 Queues 注冊(cè)一個(gè)客戶端隊(duì)列,路由 Routing key 就是我們?cè)O(shè)置的主題。
服務(wù)端消息發(fā)送
web 端實(shí)時(shí)消息推送一般都是單向的推送,前端接收服務(wù)端推送的消息顯示即可,所以就只實(shí)現(xiàn)消息發(fā)送即可。
1、mqtt 客戶端依賴包
引入 spring-integration-mqtt、org.eclipse.paho.client.mqttv3 兩個(gè)工具包實(shí)現(xiàn)
<dependency>
????<groupId>org.springframework.integrationgroupId>
????<artifactId>spring-integration-mqttartifactId>
dependency>
<dependency>
????<groupId>org.eclipse.pahogroupId>
???????<artifactId>org.eclipse.paho.client.mqttv3artifactId>
????<version>1.2.0version>
dependency>
2、消息發(fā)送者
消息的發(fā)送比較簡(jiǎn)單,主要是應(yīng)用到 @ServiceActivator 注解,需要注意messageHandler.setAsync屬性,如果設(shè)置成 false,關(guān)閉異步模式發(fā)送消息時(shí)可能會(huì)阻塞。
@Configuration
public?class?IotMqttProducerConfig?{
????@Autowired
????private?MqttConfig?mqttConfig;
????@Bean
????public?MqttPahoClientFactory?mqttClientFactory()?{
????????DefaultMqttPahoClientFactory?factory?=?new?DefaultMqttPahoClientFactory();
????????factory.setServerURIs(mqttConfig.getServers());
????????return?factory;
????}
????@Bean
????public?MessageChannel?mqttOutboundChannel()?{
????????return?new?DirectChannel();
????}
????@Bean
????@ServiceActivator(inputChannel?=?"iotMqttInputChannel")
????public?MessageHandler?mqttOutbound()?{
????????MqttPahoMessageHandler?messageHandler?=?new?MqttPahoMessageHandler(mqttConfig.getServerClientId(),?mqttClientFactory());
????????messageHandler.setAsync(false);
????????messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
????????return?messageHandler;
????}
}
MQTT 對(duì)外提供發(fā)送消息的 API 時(shí),需要使用 @MessagingGateway 注解,去提供一個(gè)消息網(wǎng)關(guān)代理,參數(shù) defaultRequestChannel 指定發(fā)送消息綁定的channel。
可以實(shí)現(xiàn)三種API接口,payload 為發(fā)送的消息,topic 發(fā)送消息的主題,qos 消息質(zhì)量。
@MessagingGateway(defaultRequestChannel?=?"iotMqttInputChannel")
public?interface?IotMqttGateway?{
????//?向默認(rèn)的?topic?發(fā)送消息
????void?sendMessage2Mqtt(String?payload);
????//?向指定的?topic?發(fā)送消息
????void?sendMessage2Mqtt(String?payload,@Header(MqttHeaders.TOPIC)?String?topic);
????//?向指定的?topic?發(fā)送消息,并指定服務(wù)質(zhì)量參數(shù)
????void?sendMessage2Mqtt(@Header(MqttHeaders.TOPIC)?String?topic,?@Header(MqttHeaders.QOS)?int?qos,?String?payload);
}
前端消息訂閱
前端使用與服務(wù)端對(duì)應(yīng)的工具 paho-mqtt mqttws31.js實(shí)現(xiàn),實(shí)現(xiàn)方式與傳統(tǒng)的 websocket 方式差不多,核心方法 client = new Paho.MQTT.Client 和 各種監(jiān)聽事件,代碼比較簡(jiǎn)潔。
注意:要保證前后端 clientId的全局唯一性,我這里就簡(jiǎn)單用隨機(jī)數(shù)解決了
大鸡巴久久久久久久
|
韩国一级中文无码
|
91乱伦|
99视频一区
|
欧美日韩日日夜夜
|



