<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Redis 解決 websocket 在分布式場(chǎng)景下 session 共享問題

          共 7359字,需瀏覽 15分鐘

           ·

          2022-02-13 10:55

          來源:blog.csdn.net/weixin_45089791/article/

          details/118028312

          在顯示項(xiàng)目中遇到了一個(gè)問題,需要使用到websocket與小程序建立長(zhǎng)鏈接。由于項(xiàng)目是負(fù)載均衡的,存在項(xiàng)目部署在多臺(tái)機(jī)器上。這樣就會(huì)存在一個(gè)問題,當(dāng)一次請(qǐng)求負(fù)載到第一臺(tái)服務(wù)器時(shí),socketsession在第一臺(tái)服務(wù)器線程上,第二次請(qǐng)求,負(fù)載到第二臺(tái)服務(wù)器上,需要通過id查找當(dāng)前用戶的session時(shí),是查找不到的。

          可以看到,由于websocket的session并沒有實(shí)現(xiàn)序列化接口。所以無法將session序列化到redis中。

          web的中的httpsession 主要是通過下面的兩個(gè)管理器實(shí)現(xiàn)序列化的。

          ??org.apache.catalina.session.StandardManager

          ??org.apache.catalina.session.PersistentManager

          StandardManager是Tomcat默認(rèn)使用的,在web應(yīng)用程序關(guān)閉時(shí),對(duì)內(nèi)存中的所有HttpSession對(duì)象進(jìn)行持久化,把他們保存到文件系統(tǒng)中。默認(rèn)的存儲(chǔ)文件為

          /work/Catalina/<主機(jī)名>/<應(yīng)用程序名>/sessions.ser

          PersistentManager比StandardManager更為靈活,只要某個(gè)設(shè)備提供了實(shí)現(xiàn)org.apache.catalina.Store接口的驅(qū)動(dòng)類,PersistentManager就可以將HttpSession對(duì)象保存到該設(shè)備。

          所以spring-session-redis?解決分布場(chǎng)景下的session共享就是將session序列化到redis中間件中,使用filter 加裝飾器模式解決分布式場(chǎng)景httpsession 共享問題。

          解決方案

          • 使用消息中間件解決websocket session共享問題。
          • 使用redis的發(fā)布訂閱模式解決

          本文使用方式二

          使用StringRedisTemplate的convertAndSend方法向指定頻道發(fā)送指定消息:

          this.execute((connection)?->?{
          ??????connection.publish(rawChannel,?rawMessage);
          ??????return?null;
          ?},?true);

          redis的命令publish channel message

          添加一個(gè)監(jiān)聽的容器以及一個(gè)監(jiān)聽器適配器

          ?@Bean
          ????RedisMessageListenerContainer?container(RedisConnectionFactory?connectionFactory,?MessageListenerAdapter?listenerAdapter)
          ????
          {
          ????????RedisMessageListenerContainer?container?=?new?RedisMessageListenerContainer();
          ????????container.setConnectionFactory(connectionFactory);
          ????????//?可以添加多個(gè)?messageListener,配置不同的交換機(jī)
          ????????container.addMessageListener(listenerAdapter,?new?PatternTopic(Constants.REDIS_CHANNEL));//?訂閱最新消息頻道
          ????????return?container;
          ????}

          ????@Bean
          ????MessageListenerAdapter?listenerAdapter(RedisReceiver?receiver)
          ????
          {
          ????????//?消息監(jiān)聽適配器
          ????????return?new?MessageListenerAdapter(receiver,?"onMessage");
          ????}

          添加消息接收器

          /**
          ?*?消息監(jiān)聽對(duì)象,接收訂閱消息
          ?*/

          @Component
          public?class?RedisReceiver?implements?MessageListener?{
          ????Logger?log?=?LoggerFactory.getLogger(this.getClass());

          ????@Autowired
          ????private?WebSocketServer?webSocketServer;


          ????/**
          ?????*?處理接收到的訂閱消息
          ?????*/

          ????@Override
          ????public?void?onMessage(Message?message,?byte[]?pattern)
          ????
          {
          ????????String?channel?=?new?String(message.getChannel());//?訂閱的頻道名稱
          ????????String?msg?=?"";
          ????????try
          ????????{
          ????????????msg?=?new?String(message.getBody(),?Constants.UTF8);//注意與發(fā)布消息編碼一致,否則會(huì)亂碼
          ????????????if?(!StringUtils.isEmpty(msg)){
          ????????????????if?(Constants.REDIS_CHANNEL.endsWith(channel))//?最新消息
          ????????????????{
          ????????????????????JSONObject?jsonObject?=?JSON.parseObject(msg);
          ????????????????????webSocketServer.sendMessageByWayBillId(
          ????????????????????????????Long.parseLong(jsonObject.get(Constants.REDIS_MESSAGE_KEY).toString())
          ????????????????????????????,jsonObject.get(Constants.REDIS_MESSAGE_VALUE).toString());
          ????????????????}else{
          ????????????????????//TODO?其他訂閱的消息處理
          ????????????????}

          ????????????}else{
          ????????????????log.info("消息內(nèi)容為空,不處理。");
          ????????????}
          ????????}
          ????????catch?(Exception?e)
          ????????{
          ????????????log.error("處理消息異常:"+e.toString());
          ????????????e.printStackTrace();
          ????????}
          ????}
          }

          websocket的配置類

          /**
          ?*?@description:?websocket的配置類
          ?*/

          @Configuration
          @EnableWebSocket
          public?class?WebSocketConfiguration?{

          ????@Bean
          ????public?ServerEndpointExporter?serverEndpointExporter()?{
          ????????return?new?ServerEndpointExporter();
          ????}
          }

          添加websocket的服務(wù)組件

          @ServerEndpoint("/websocket/{id}")
          @Component
          public?class?WebSocketServer?{

          ????private?static?final?long?sessionTimeout?=?600000;

          ????private?static?final?Logger?log?=?LoggerFactory.getLogger(WebSocketServer.class);

          ????/**
          ?????*?當(dāng)前在線連接數(shù)
          ?????*/

          ????private?static?AtomicInteger?onlineCount?=?new?AtomicInteger(0);

          ????/**
          ?????*?用來存放每個(gè)客戶端對(duì)應(yīng)的?WebSocketServer?對(duì)象
          ?????*/

          ????private?static?ConcurrentHashMap?webSocketMap?=?new?ConcurrentHashMap<>();

          ????/**
          ?????*?與某個(gè)客戶端的連接會(huì)話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
          ?????*/

          ????private?Session?session;

          ????/**
          ?????*?接收?id
          ?????*/

          ????private?Long?id;


          ????@Autowired
          ????private?StringRedisTemplate?template;
          ????/**
          ?????*?連接建立成功調(diào)用的方法
          ?????*/

          ????@OnOpen
          ????public?void?onOpen(Session?session,?@PathParam("id")?Long?id)?{
          ????????session.setMaxIdleTimeout(sessionTimeout);
          ????????this.session?=?session;
          ????????this.id?=?id;
          ????????if?(webSocketMap.containsKey(id))?{
          ????????????webSocketMap.remove(id);
          ????????????webSocketMap.put(id,?this);
          ????????}?else?{
          ????????????webSocketMap.put(id,?this);
          ????????????addOnlineCount();
          ????????}
          ????????log.info("編號(hào)id:"?+?id?+?"連接,當(dāng)前在線數(shù)為:"?+?getOnlineCount());
          ????????try?{
          ????????????sendMessage("連接成功!");
          ????????}?catch?(IOException?e)?{
          ????????????log.error("編號(hào)id:"?+?id?+?",網(wǎng)絡(luò)異常!!!!!!");
          ????????}
          ????}

          ????/**
          ?????*?連接關(guān)閉調(diào)用的方法
          ?????*/

          ????@OnClose
          ????public?void?onClose()?{
          ????????if?(webSocketMap.containsKey(id))?{
          ????????????webSocketMap.remove(id);
          ????????????subOnlineCount();
          ????????}
          ????????log.info("編號(hào)id:"?+?id?+?"退出,當(dāng)前在線數(shù)為:"?+?getOnlineCount());
          ????}

          ????/**
          ?????*?收到客戶端消息后調(diào)用的方法
          ?????*
          ?????*?@param?message?客戶端發(fā)送過來的消息
          ?????*/

          ????@OnMessage
          ????public?void?onMessage(String?message,?Session?session)?{

          ????????log.info("編號(hào)id消息:"?+?id?+?",報(bào)文:"?+?message);
          ????}

          ????/**
          ?????*?發(fā)生錯(cuò)誤時(shí)調(diào)用
          ?????*
          ?????*?@param?session
          ?????*?@param?error
          ?????*/

          ????@OnError
          ????public?void?onError(Session?session,?Throwable?error)?{
          ????????log.error("編號(hào)id錯(cuò)誤:"?+?this.id?+?",原因:"?+?error.getMessage());
          ????????error.printStackTrace();
          ????}
          ????

          ????/**
          ?????*?@description:??分布式??使用redis?去發(fā)布消息
          ?????*?@dateTime:?2021/6/17?10:31
          ?????*/

          ????public?void?sendMessage(@NotNull?String?key,String?message)?{
          ????????String?newMessge=?null;
          ????????try?{
          ????????????newMessge?=?new?String(message.getBytes(Constants.UTF8),?Constants.UTF8);
          ????????}?catch?(UnsupportedEncodingException?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????????Map?map?=?new?HashMap();
          ????????map.put(Constants.REDIS_MESSAGE_KEY,?key);
          ????????map.put(Constants.REDIS_MESSAGE_VALUE,?newMessge);
          ????????template.convertAndSend(Constants.REDIS_CHANNEL,?JSON.toJSONString(map));
          ????}

          ????/**
          ?????*?@description:?單機(jī)使用??外部接口通過指定的客戶id向該客戶推送消息。
          ?????*?@dateTime:?2021/6/16?17:49
          ?????*/

          ????public?void?sendMessageByWayBillId(@NotNull?Long?key,?String?message)?{
          ????????WebSocketServer?webSocketServer?=?webSocketMap.get(key);
          ????????if?(!StringUtils.isEmpty(webSocketServer))?{
          ????????????try?{
          ????????????????webSocketServer.sendMessage(message);
          ????????????????log.info("編號(hào)id為:"+key+"發(fā)送消息:"+message);
          ????????????}?catch?(IOException?e)?{
          ????????????????e.printStackTrace();
          ????????????????log.error("編號(hào)id為:"+key+"發(fā)送消息失敗");
          ????????????}
          ????????}
          ????????log.error("編號(hào)id號(hào)為:"+key+"未連接");
          ????}
          ????/**
          ?????*?實(shí)現(xiàn)服務(wù)器主動(dòng)推送
          ?????*/

          ????public?void?sendMessage(String?message)?throws?IOException?{
          ????????this.session.getBasicRemote().sendText(message);
          ????}

          ????public?static?synchronized?AtomicInteger?getOnlineCount()?{
          ????????return?onlineCount;
          ????}

          ????public?static?synchronized?void?addOnlineCount()?{
          ????????WebSocketServer.onlineCount.getAndIncrement();
          ????}

          ????public?static?synchronized?void?subOnlineCount()?{
          ????????WebSocketServer.onlineCount.getAndDecrement();
          ????}
          }

          項(xiàng)目結(jié)構(gòu)

          將該項(xiàng)目使用三個(gè)端口號(hào)啟動(dòng)三個(gè)服務(wù)

          使用下面的這個(gè)網(wǎng)站進(jìn)行演示。

          • http://www.easyswoole.com/wstool.html

          啟動(dòng)兩個(gè)頁面網(wǎng)址分別是:

          • ws://127.0.0.1:8081/websocket/456

          • ws://127.0.0.1:8082/websocket/456

          使用postman給http://localhost:8080/socket/456?發(fā)送請(qǐng)求

          可以看到,我們給8080服務(wù)發(fā)送的消息,我們訂閱的8081和8082 服務(wù)可以也可以使用該編號(hào)進(jìn)行消息的推送。

          使用8082服務(wù)發(fā)送這個(gè)消息格式{"KEY":456,"VALUE":"aaaa"} 的消息。其他的服務(wù)也會(huì)收到這個(gè)信息。

          以上就是使用redis的發(fā)布訂閱解決websocket 的分布式session 問題。

          碼云地址是:https://gitee.com/jack_whh/dcs-websocket-session

          ——————END——————

          歡迎關(guān)注“Java引導(dǎo)者”,我們分享最有價(jià)值的Java的干貨文章,助力您成為有思想的Java開發(fā)工程師!

          瀏覽 106
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄片无码在线观看 | 91吊逼 | 又大又粗视频 | 免费一级片在线观看 | 日韩AV中文字幕在线免费观看 |