SpringCloud項目:消息中間件中獲取消息
點擊上方 Java學(xué)習(xí)之道,選擇 設(shè)為星標
來源: blog.csdn.net/yt812100/article/details/111874857
作者: 楊桃桃
Part1外部環(huán)境搭建
發(fā)送消息到MQ和外部環(huán)境的搭建見 : Springcloud項目發(fā)送消息到RabbitMQ以及環(huán)境搭建。
(注:RabbitMQ是安裝在虛擬機上的)
Part2依賴注入
本文不僅導(dǎo)入了上文的amqp依賴坐標還有新的netty依賴坐標
Part3編寫配置文件(yaml)
和上文一樣。不變的是這個。注意端口是5672,路徑看rabbitMQ安裝在本機還是虛擬機
Part4業(yè)務(wù)層邏輯分析
首先聲明本文的業(yè)務(wù)邏輯。各位讀者可能遇到的業(yè)務(wù)邏輯不一樣,所以寫法會有些許不同。但是大致還是一樣,本文在這先聲明本文在處理消息發(fā)送時候的業(yè)務(wù)邏輯
業(yè)務(wù)場景:在用戶已經(jīng)關(guān)注了粉絲的情況下,RabbitMQ中已經(jīng)有了用戶的消息隊列。那么我只需要在作者發(fā)布文章的時候或者點贊的時候,將存入進隊列的消息立刻發(fā)送給已經(jīng)登錄的用戶即可。(注:發(fā)送消息參考上文:發(fā)送消息至MQ)
那么業(yè)務(wù)層的處理首先需要準備一下六個類:
那么接下來就詳解每個類的作用。其中業(yè)務(wù)邏輯復(fù)雜的只有監(jiān)聽器類和業(yè)務(wù)邏輯類
工具類
“ApplicationContextProvider”:返回一些需要的Bean實例以及上下文對象實例(無需改變)
package com.tensquare.notice.config;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
/**
* 上下文對象實例
*/
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 獲取applicationContext
*
* @return
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 通過name獲取 Bean.
*
* @param name
* @return
*/
public Object getBean(String name) {
return getApplicationContext().getBean(name);
}
/**
* 通過class獲取Bean.
*
* @param clazz
* @param <T>
* @return
*/
public <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
/**
* 通過name,以及Clazz返回指定的Bean
*
* @param name
* @param clazz
* @param <T>
* @return
*/
public <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
Nettt服務(wù)類
“NettyServer”:實現(xiàn)NIO的傳輸模式 --固定寫法,配置端口以及協(xié)議名即可(端口自定義,無需改變)
package com.tensquare.notice.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
public class NettyServer {
/**
* 啟動netty服務(wù),傳遞一個端口號
*/
public void start(int port){
System.out.println("準備啟動Netty......");
//服務(wù)器引導(dǎo)程序
ServerBootstrap serverBootstrap = new ServerBootstrap();
//用來處理新的連接
EventLoopGroup boos = new NioEventLoopGroup();
//用來處理業(yè)務(wù)邏輯(讀寫)
EventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap.group(boos,worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
//請求消息解碼器
ch.pipeline().addLast(new HttpServerCodec());
//將多個消息轉(zhuǎn)為單一的request或者response對象
ch.pipeline().addLast(new HttpObjectAggregator(65536));
//處理websocket的消息事件(websocket服務(wù)器協(xié)議處理程序)
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
//創(chuàng)建自己的webscoket處理器,自己用來編寫業(yè)務(wù)邏輯
MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler();
ch.pipeline().addLast(myWebSocketHandler);
}
}).bind(port);
}
}
Netty配置類
“NettyConfig”:NettyConfig是Springcloud項目中的一種配置文件。自動加載。所以會自動開啟線程 因此需要configuration注解以及Bean注解
package com.tensquare.notice.config;
import com.tensquare.notice.netty.NettyServer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NettyConfig {
@Bean
public NettyServer createNettyServer(){
NettyServer nettyServer = new NettyServer();
//啟動netty服務(wù),使用新的線程啟動
new Thread(){
@Override
public void run(){
nettyServer.start(1234);
}
}.start();
return nettyServer;
}
}
消息容器配置類:
“RabbitConfig”類:聲明出需要的消息容器,(注:與后續(xù)的消息監(jiān)聽器相呼應(yīng)。名稱不建議改變)
package com.tensquare.notice.config;
import com.tensquare.notice.listener.SysNoticeListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//配置類
@Configuration
public class RabbitConfig {
@Bean("sysNoticeContainer")
public SimpleMessageListenerContainer create(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//使用Channel
container.setExposeListenerChannel(true);
//設(shè)置自己編寫的監(jiān)聽器
container.setMessageListener(new SysNoticeListener());
return container;
}
}
通訊處理類
**“MyWebSocketHandler”**類:也就是MQ和WebSocket進行交互
一:MyWebSocketHandler是用來進行通訊處理的,也就是MQ和WebSocket進行交互(通訊處理類–核心業(yè)務(wù)類) 二:MyWebSocketHandler進行業(yè)務(wù)處理,獲取消息數(shù)量(業(yè)務(wù)場景:獲取到消息數(shù)量即可) 三:MyWebSocketHandler繼承SimpleChannelInboundHandler< TextWebSocketFrame>,重寫channelRead0(ChannelHandlerContext 這個參數(shù)獲取連接,TextWebSocketFrame 這個參數(shù)獲取頁面參數(shù)
package com.tensquare.notice.netty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tensquare.entity.Result;
import com.tensquare.entity.StatusCode;
import com.tensquare.notice.config.ApplicationContextProvider;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
//核心業(yè)務(wù)類,獲取MQ的消息
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 創(chuàng)建對象監(jiān)聽器
*/
private static ObjectMapper Mapper = new ObjectMapper();
/**
* 從Spring容器中獲取消息監(jiān)聽器容器,處理訂閱消息sysNotice
*/
SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext().getBean("sysNoticeContainer");
/**
* 從spring容器中獲取RabbitTemplate
*
*/
RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext().getBean(RabbitTemplate.class);
// @Autowired
// private RabbitTemplate rabbitTemplate;
/**
* 存放WebScoket連接Map,根據(jù)用戶ID存放
*/
public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();
/**
*用戶請求服務(wù)端,執(zhí)行的方法
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//約定用戶第一次請求攜帶的數(shù)據(jù):{"userid":"1"}
//獲取用戶請求數(shù)據(jù)并解析
String json = msg.text();
//解析數(shù)據(jù)獲取用戶ID
String userId = Mapper.readTree(json).get("userId").asText();
//第一次請求的時候需要建立WebScoket連接
Channel channel = userChannelMap.get(userId);
if (channel==null){
//獲取WebScoket連接
channel = ctx.channel();
//把連接放到容器中
userChannelMap.put(userId,channel);
}
//只用完成新消息的提醒即可,只需要獲取消息的數(shù)量
//獲取RabbitMQ的內(nèi)容,并且發(fā)送給用戶
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
//拼接捕獲隊列的名稱
String queueName = "article_subscribe_"+userId;
//獲取Rabbit的properties容器 (獲取rabbit的屬性容器)
Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
//獲取消息數(shù)量
int noticeCount = 0;
//判斷properties是否不為空
if (queueProperties!=null){
//如果不為空,獲取消息數(shù)量
noticeCount = (int)queueProperties.get("QUEUE_MESSAGE_COUNT");
}
//----------------------------------
//封裝返回的數(shù)據(jù)
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount",noticeCount);
Result result = new Result(true, StatusCode.OK,"查詢成功?。?,countMap);
//把數(shù)據(jù)發(fā)送給用戶
channel.writeAndFlush(new TextWebSocketFrame(Mapper.writeValueAsString(result)));
//把消息從隊列里清空,否則MQ消息監(jiān)聽器會再次消費一次
if (noticeCount>0){
rabbitAdmin.purgeQueue(queueName,true);
}
//為用戶的消息隊列通知注冊監(jiān)聽器,便于用戶在線的時候,
//一旦有新消息,可以主動推送給用戶,不需要用戶請求服務(wù)器獲取數(shù)據(jù)
sysNoticeContainer.addQueueNames(queueName);
}
}
接下來就是關(guān)于這個類的具體解釋了。
務(wù)必細看。截圖都是從剛剛代碼中截取的。和我發(fā)的源碼是一樣的
測試參數(shù)是自定義的,真實開發(fā)環(huán)境不會如此
這個其實就是將參數(shù)獲取到,然后以id為標識將連接存入連接容器的過程
其中有一個Result類可以不用定義,本文是作測試用的所以定義了
通過管家獲取到消息的數(shù)量
發(fā)送消息的代碼
那么以上就是關(guān)于整個MyWebSocketHandler類的詳解。
監(jiān)聽器:
SysNoticeListener類:判斷用戶是否在線,發(fā)送消息
package com.tensquare.notice.listener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.tensquare.entity.Result;
import com.tensquare.entity.StatusCode;
import com.tensquare.notice.netty.MyWebSocketHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import java.util.HashMap;
//消息監(jiān)聽器
public class SysNoticeListener implements ChannelAwareMessageListener {
private static ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//獲取用戶id,可以通過隊列名稱獲取
String queueName = message.getMessageProperties().getConsumerQueue();
String userId = queueName.substring(queueName.lastIndexOf("_")+1);
io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);
//判斷用戶是否在線
if (wsChannel!=null){
//如果連接不為空,代表用戶在線
//封裝返回數(shù)據(jù)
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount",1);
Result result = new Result(true, StatusCode.OK,"查詢成功",countMap);
//把數(shù)據(jù)通過WebScoket連接主動推送給用戶
wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
}
}
}
這里與RabbitConfig工具類中相對應(yīng)
具體作用如注釋所說。
測試:
這里將一個靜態(tài)html頁面用作測試,加載服務(wù)的靜態(tài)資源里面即可
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>測試 notice 微服務(wù)與頁面 websocket 交互</title>
</head>
<body>
<h1>
websocket連接服務(wù)器獲取mq消息測試
</h1>
<form onSubmit="return false;">
<table><tr>
<td><span>服務(wù)器地址:</span></td>
<td><input type="text" id="serverUrl" value="ws://127.0.0.1:1234/ws" /></td>
</tr>
<tr>
<td><input type="button" id="action" value="連接服務(wù)器" onClick="connect()" /></td>
<td><input type="text" id="connStatus" value="未連接 ......" /></td>
</tr></table>
<br />
<hr color="blue" />
<div>
<div style="width: 50%;float:left;">
<div>
<table><tr>
<td><h3>發(fā)送給服務(wù)端的消息</h3></td>
<td><input type="button" value="發(fā)送" onClick="send(this.form.message.value)" /></td>
</tr></table>
</div>
<div><textarea type="text" name="message" style="width:500px;height:300px;">
{
"userId":"1"
}
</textarea></div>
</div>
<div style="width: 50%;float:left;">
<div><table>
<tr>
<td><h3>服務(wù)端返回的應(yīng)答消息</h3></td>
</tr>
</table></div>
<div><textarea id="responseText" name="responseText" style="width: 500px;height: 300px;" onfocus="this.scrollTop = this.scrollHeight ">
這里顯示服務(wù)器推送的信息
</textarea></div>
</div>
</div>
</form>
<script type="text/javascript">
var socket;
var connStatus = document.getElementById('connStatus');;
var respText = document.getElementById('responseText');
var actionBtn = document.getElementById('action');
var sysCount = 0;
var userCount = 0;
function connect() {
connStatus.value = "正在連接 ......";
if(!window.WebSocket){
window.WebSocket = window.MozWebSocket;
}
if(window.WebSocket){
socket = new WebSocket("ws://127.0.0.1:1234/ws");
socket.onmessage = function(event){
respText.scrollTop = respText.scrollHeight;
respText.value += "\r\n" + event.data;
var sysData = JSON.parse(event.data).data.sysNoticeCount;
if(sysData){
sysCount = sysCount + parseInt(sysData)
}
var userData = JSON.parse(event.data).data.userNoticeCount;
if(userData){
userCount = userCount + parseInt(sysData)
}
respText.value += "\r\n現(xiàn)在有" + sysCount + "條訂閱新消息";
respText.value += "\r\n現(xiàn)在有" + userCount + "條點贊新消息";
respText.scrollTop = respText.scrollHeight;
};
socket.onopen = function(event){
respText.value += "\r\nWebSocket 已連接";
respText.scrollTop = respText.scrollHeight;
connStatus.value = "已連接 O(∩_∩)O";
actionBtn.value = "斷開服務(wù)器";
actionBtn.onclick =function(){
disconnect();
};
};
socket.onclose = function(event){
respText.value += "\r\n" + "WebSocket 已關(guān)閉";
respText.scrollTop = respText.scrollHeight;
connStatus.value = "已斷開";
actionBtn.value = "連接服務(wù)器";
actionBtn.onclick = function() {
connect();
};
};
} else {
//alert("您的瀏覽器不支持WebSocket協(xié)議!");
connStatus.value = "您的瀏覽器不支持WebSocket協(xié)議!";
}
}
function disconnect() {
socket.close();
}
function send(message){
if(!window.WebSocket){return;}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}else{
alert("WebSocket 連接沒有建立成功!");
}
}
</script>
</body>
</html>
端口不需要改變。下圖為測試結(jié)果

可以看到,我多發(fā)送2條文章,由于關(guān)聯(lián)了一個粉絲,所以又多了2條消息
而消息中間件中消息總數(shù)始終為01,因為都以及發(fā)送出去了
-
| 更多精彩文章 -
▽加我微信,交個朋友 長按/掃碼添加↑↑↑



