Redis 解決 websocket 在分布式場(chǎng)景下 session 共享問題


來源:blog.csdn.net/weixin_45089791/article/
details/118028312
在顯示項(xiàng)目中遇到了一個(gè)問題,需要使用到websocket與小程序建立長(zhǎng)鏈接。由于項(xiàng)目是負(fù)載均衡的,存在項(xiàng)目部署在多臺(tái)機(jī)器上。這樣就會(huì)存在一個(gè)問題,當(dāng)一次請(qǐng)求負(fù)載到第一臺(tái)服務(wù)器時(shí),socketsession在第一臺(tái)服務(wù)器線程上,第二次請(qǐng)求,負(fù)載到第二臺(tái)服務(wù)器上,需要通過id查找當(dāng)前用戶的session時(shí),是查找不到的。

可以看到,由于websocket的session并沒有實(shí)現(xiàn)序列化接口。所以無法將session序列化到redis中。
web的中的httpsession 主要是通過下面的兩個(gè)管理器實(shí)現(xiàn)序列化的。
??org.apache.catalina.session.StandardManager
??org.apache.catalina.session.PersistentManager
StandardManager是Tomcat默認(rèn)使用的,在web應(yīng)用程序關(guān)閉時(shí),對(duì)內(nèi)存中的所有HttpSession對(duì)象進(jìn)行持久化,把他們保存到文件系統(tǒng)中。默認(rèn)的存儲(chǔ)文件為
/work/Catalina/<主機(jī)名>/<應(yīng)用程序名>/sessions.ser
PersistentManager比StandardManager更為靈活,只要某個(gè)設(shè)備提供了實(shí)現(xiàn)org.apache.catalina.Store接口的驅(qū)動(dòng)類,PersistentManager就可以將HttpSession對(duì)象保存到該設(shè)備。

所以spring-session-redis?解決分布場(chǎng)景下的session共享就是將session序列化到redis中間件中,使用filter 加裝飾器模式解決分布式場(chǎng)景httpsession 共享問題。
解決方案
使用消息中間件解決websocket session共享問題。 使用redis的發(fā)布訂閱模式解決
本文使用方式二
使用StringRedisTemplate的convertAndSend方法向指定頻道發(fā)送指定消息:
this.execute((connection)?->?{
??????connection.publish(rawChannel,?rawMessage);
??????return?null;
?},?true);
redis的命令publish channel message
添加一個(gè)監(jiān)聽的容器以及一個(gè)監(jiān)聽器適配器
?@Bean
????RedisMessageListenerContainer?container(RedisConnectionFactory?connectionFactory,?MessageListenerAdapter?listenerAdapter)
????{
????????RedisMessageListenerContainer?container?=?new?RedisMessageListenerContainer();
????????container.setConnectionFactory(connectionFactory);
????????//?可以添加多個(gè)?messageListener,配置不同的交換機(jī)
????????container.addMessageListener(listenerAdapter,?new?PatternTopic(Constants.REDIS_CHANNEL));//?訂閱最新消息頻道
????????return?container;
????}
????@Bean
????MessageListenerAdapter?listenerAdapter(RedisReceiver?receiver)
????{
????????//?消息監(jiān)聽適配器
????????return?new?MessageListenerAdapter(receiver,?"onMessage");
????}
添加消息接收器
/**
?*?消息監(jiān)聽對(duì)象,接收訂閱消息
?*/
@Component
public?class?RedisReceiver?implements?MessageListener?{
????Logger?log?=?LoggerFactory.getLogger(this.getClass());
????@Autowired
????private?WebSocketServer?webSocketServer;
????/**
?????*?處理接收到的訂閱消息
?????*/
????@Override
????public?void?onMessage(Message?message,?byte[]?pattern)
????{
????????String?channel?=?new?String(message.getChannel());//?訂閱的頻道名稱
????????String?msg?=?"";
????????try
????????{
????????????msg?=?new?String(message.getBody(),?Constants.UTF8);//注意與發(fā)布消息編碼一致,否則會(huì)亂碼
????????????if?(!StringUtils.isEmpty(msg)){
????????????????if?(Constants.REDIS_CHANNEL.endsWith(channel))//?最新消息
????????????????{
????????????????????JSONObject?jsonObject?=?JSON.parseObject(msg);
????????????????????webSocketServer.sendMessageByWayBillId(
????????????????????????????Long.parseLong(jsonObject.get(Constants.REDIS_MESSAGE_KEY).toString())
????????????????????????????,jsonObject.get(Constants.REDIS_MESSAGE_VALUE).toString());
????????????????}else{
????????????????????//TODO?其他訂閱的消息處理
????????????????}
????????????}else{
????????????????log.info("消息內(nèi)容為空,不處理。");
????????????}
????????}
????????catch?(Exception?e)
????????{
????????????log.error("處理消息異常:"+e.toString());
????????????e.printStackTrace();
????????}
????}
}
websocket的配置類
/**
?*?@description:?websocket的配置類
?*/
@Configuration
@EnableWebSocket
public?class?WebSocketConfiguration?{
????@Bean
????public?ServerEndpointExporter?serverEndpointExporter()?{
????????return?new?ServerEndpointExporter();
????}
}
添加websocket的服務(wù)組件
@ServerEndpoint("/websocket/{id}")
@Component
public?class?WebSocketServer?{
????private?static?final?long?sessionTimeout?=?600000;
????private?static?final?Logger?log?=?LoggerFactory.getLogger(WebSocketServer.class);
????/**
?????*?當(dāng)前在線連接數(shù)
?????*/
????private?static?AtomicInteger?onlineCount?=?new?AtomicInteger(0);
????/**
?????*?用來存放每個(gè)客戶端對(duì)應(yīng)的?WebSocketServer?對(duì)象
?????*/
????private?static?ConcurrentHashMap?webSocketMap?=?new?ConcurrentHashMap<>();
????/**
?????*?與某個(gè)客戶端的連接會(huì)話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
?????*/
????private?Session?session;
????/**
?????*?接收?id
?????*/
????private?Long?id;
????@Autowired
????private?StringRedisTemplate?template;
????/**
?????*?連接建立成功調(diào)用的方法
?????*/
????@OnOpen
????public?void?onOpen(Session?session,?@PathParam("id")?Long?id)?{
????????session.setMaxIdleTimeout(sessionTimeout);
????????this.session?=?session;
????????this.id?=?id;
????????if?(webSocketMap.containsKey(id))?{
????????????webSocketMap.remove(id);
????????????webSocketMap.put(id,?this);
????????}?else?{
????????????webSocketMap.put(id,?this);
????????????addOnlineCount();
????????}
????????log.info("編號(hào)id:"?+?id?+?"連接,當(dāng)前在線數(shù)為:"?+?getOnlineCount());
????????try?{
????????????sendMessage("連接成功!");
????????}?catch?(IOException?e)?{
????????????log.error("編號(hào)id:"?+?id?+?",網(wǎng)絡(luò)異常!!!!!!");
????????}
????}
????/**
?????*?連接關(guān)閉調(diào)用的方法
?????*/
????@OnClose
????public?void?onClose()?{
????????if?(webSocketMap.containsKey(id))?{
????????????webSocketMap.remove(id);
????????????subOnlineCount();
????????}
????????log.info("編號(hào)id:"?+?id?+?"退出,當(dāng)前在線數(shù)為:"?+?getOnlineCount());
????}
????/**
?????*?收到客戶端消息后調(diào)用的方法
?????*
?????*?@param?message?客戶端發(fā)送過來的消息
?????*/
????@OnMessage
????public?void?onMessage(String?message,?Session?session)?{
????????log.info("編號(hào)id消息:"?+?id?+?",報(bào)文:"?+?message);
????}
????/**
?????*?發(fā)生錯(cuò)誤時(shí)調(diào)用
?????*
?????*?@param?session
?????*?@param?error
?????*/
????@OnError
????public?void?onError(Session?session,?Throwable?error)?{
????????log.error("編號(hào)id錯(cuò)誤:"?+?this.id?+?",原因:"?+?error.getMessage());
????????error.printStackTrace();
????}
????
????/**
?????*?@description:??分布式??使用redis?去發(fā)布消息
?????*?@dateTime:?2021/6/17?10:31
?????*/
????public?void?sendMessage(@NotNull?String?key,String?message)?{
????????String?newMessge=?null;
????????try?{
????????????newMessge?=?new?String(message.getBytes(Constants.UTF8),?Constants.UTF8);
????????}?catch?(UnsupportedEncodingException?e)?{
????????????e.printStackTrace();
????????}
????????Map?map?=?new?HashMap();
????????map.put(Constants.REDIS_MESSAGE_KEY,?key);
????????map.put(Constants.REDIS_MESSAGE_VALUE,?newMessge);
????????template.convertAndSend(Constants.REDIS_CHANNEL,?JSON.toJSONString(map));
????}
????/**
?????*?@description:?單機(jī)使用??外部接口通過指定的客戶id向該客戶推送消息。
?????*?@dateTime:?2021/6/16?17:49
?????*/
????public?void?sendMessageByWayBillId(@NotNull?Long?key,?String?message)?{
????????WebSocketServer?webSocketServer?=?webSocketMap.get(key);
????????if?(!StringUtils.isEmpty(webSocketServer))?{
????????????try?{
????????????????webSocketServer.sendMessage(message);
????????????????log.info("編號(hào)id為:"+key+"發(fā)送消息:"+message);
????????????}?catch?(IOException?e)?{
????????????????e.printStackTrace();
????????????????log.error("編號(hào)id為:"+key+"發(fā)送消息失敗");
????????????}
????????}
????????log.error("編號(hào)id號(hào)為:"+key+"未連接");
????}
????/**
?????*?實(shí)現(xiàn)服務(wù)器主動(dòng)推送
?????*/
????public?void?sendMessage(String?message)?throws?IOException?{
????????this.session.getBasicRemote().sendText(message);
????}
????public?static?synchronized?AtomicInteger?getOnlineCount()?{
????????return?onlineCount;
????}
????public?static?synchronized?void?addOnlineCount()?{
????????WebSocketServer.onlineCount.getAndIncrement();
????}
????public?static?synchronized?void?subOnlineCount()?{
????????WebSocketServer.onlineCount.getAndDecrement();
????}
}
項(xiàng)目結(jié)構(gòu)

將該項(xiàng)目使用三個(gè)端口號(hào)啟動(dòng)三個(gè)服務(wù)

使用下面的這個(gè)網(wǎng)站進(jìn)行演示。
http://www.easyswoole.com/wstool.html

啟動(dòng)兩個(gè)頁面網(wǎng)址分別是:
ws://127.0.0.1:8081/websocket/456
ws://127.0.0.1:8082/websocket/456
使用postman給http://localhost:8080/socket/456?發(fā)送請(qǐng)求

可以看到,我們給8080服務(wù)發(fā)送的消息,我們訂閱的8081和8082 服務(wù)可以也可以使用該編號(hào)進(jìn)行消息的推送。
使用8082服務(wù)發(fā)送這個(gè)消息格式{"KEY":456,"VALUE":"aaaa"} 的消息。其他的服務(wù)也會(huì)收到這個(gè)信息。

以上就是使用redis的發(fā)布訂閱解決websocket 的分布式session 問題。
碼云地址是:https://gitee.com/jack_whh/dcs-websocket-session
歡迎關(guān)注“Java引導(dǎo)者”,我們分享最有價(jià)值的Java的干貨文章,助力您成為有思想的Java開發(fā)工程師!
