<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringCloud項目:消息中間件中獲取消息

          共 30052字,需瀏覽 61分鐘

           ·

          2021-06-21 21:04

          點擊上方 Java學(xué)習(xí)之道,選擇 設(shè)為星標

          每天18:30點,干貨準時奉上!

          來源: blog.csdn.net/yt812100/article/details/111874857
          作者: 楊桃桃


          Part1外部環(huán)境搭建

          發(fā)送消息到MQ和外部環(huán)境的搭建見 : Springcloud項目發(fā)送消息RabbitMQ以及環(huán)境搭建。

          (注:RabbitMQ是安裝在虛擬機上的)

          Part2依賴注入

          本文不僅導(dǎo)入了上文的amqp依賴坐標還有新的netty依賴坐標

          Part3編寫配置文件(yaml)

          和上文一樣。不變的是這個。注意端口是5672,路徑看rabbitMQ安裝在本機還是虛擬機

          Part4業(yè)務(wù)層邏輯分析

          首先聲明本文的業(yè)務(wù)邏輯。各位讀者可能遇到的業(yè)務(wù)邏輯不一樣,所以寫法會有些許不同。但是大致還是一樣,本文在這先聲明本文在處理消息發(fā)送時候的業(yè)務(wù)邏輯

          業(yè)務(wù)場景:在用戶已經(jīng)關(guān)注了粉絲的情況下,RabbitMQ中已經(jīng)有了用戶的消息隊列。那么我只需要在作者發(fā)布文章的時候或者點贊的時候,將存入進隊列的消息立刻發(fā)送給已經(jīng)登錄的用戶即可。(注:發(fā)送消息參考上文:發(fā)送消息至MQ)

          那么業(yè)務(wù)層的處理首先需要準備一下六個類:那么接下來就詳解每個類的作用。其中業(yè)務(wù)邏輯復(fù)雜的只有監(jiān)聽器類和業(yè)務(wù)邏輯類

          工具類

          “ApplicationContextProvider”:返回一些需要的Bean實例以及上下文對象實例(無需改變)

          package com.tensquare.notice.config;

          import org.springframework.beans.BeansException;
          import org.springframework.context.ApplicationContext;
          import org.springframework.context.ApplicationContextAware;
          import org.springframework.stereotype.Component;

          @Component
          public class ApplicationContextProvider implements ApplicationContextAware {
              /**
               * 上下文對象實例
               */

              private static ApplicationContext applicationContext;

              @Override
              public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
                  this.applicationContext = applicationContext;
              }

              /**
               * 獲取applicationContext
               *
               * @return
               */

              public static ApplicationContext getApplicationContext() {
                  return applicationContext;
              }

              /**
               * 通過name獲取 Bean.
               *
               * @param name
               * @return
               */

              public Object getBean(String name) {
                  return getApplicationContext().getBean(name);
              }

              /**
               * 通過class獲取Bean.
               *
               * @param clazz
               * @param <T>
               * @return
               */

              public <T> getBean(Class<T> clazz) {
                  return getApplicationContext().getBean(clazz);
              }

              /**
               * 通過name,以及Clazz返回指定的Bean
               *
               * @param name
               * @param clazz
               * @param <T>
               * @return
               */

              public <T> getBean(String name, Class<T> clazz) {
                  return getApplicationContext().getBean(name, clazz);
              }
          }

          Nettt服務(wù)類

          “NettyServer”:實現(xiàn)NIO的傳輸模式 --固定寫法,配置端口以及協(xié)議名即可(端口自定義,無需改變)

          package com.tensquare.notice.netty;

          import io.netty.bootstrap.ServerBootstrap;
          import io.netty.channel.Channel;
          import io.netty.channel.ChannelInitializer;
          import io.netty.channel.EventLoopGroup;
          import io.netty.channel.nio.NioEventLoopGroup;
          import io.netty.channel.socket.nio.NioServerSocketChannel;
          import io.netty.handler.codec.http.HttpObjectAggregator;
          import io.netty.handler.codec.http.HttpServerCodec;
          import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

          public class NettyServer {

              /**
               *  啟動netty服務(wù),傳遞一個端口號
               */

              public void start(int port){
                  System.out.println("準備啟動Netty......");
                  //服務(wù)器引導(dǎo)程序
                  ServerBootstrap serverBootstrap = new ServerBootstrap();
                  //用來處理新的連接
                  EventLoopGroup boos = new NioEventLoopGroup();
                  //用來處理業(yè)務(wù)邏輯(讀寫)
                  EventLoopGroup worker = new NioEventLoopGroup();
                  serverBootstrap.group(boos,worker)
                          .channel(NioServerSocketChannel.class)
                          .childHandler(new ChannelInitializer() 
          {
                              @Override
                              protected void initChannel(Channel ch) throws Exception {
                                 //請求消息解碼器
                                  ch.pipeline().addLast(new HttpServerCodec());
                                  //將多個消息轉(zhuǎn)為單一的request或者response對象
                                  ch.pipeline().addLast(new HttpObjectAggregator(65536));
                                  //處理websocket的消息事件(websocket服務(wù)器協(xié)議處理程序)
                                  ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
                                  //創(chuàng)建自己的webscoket處理器,自己用來編寫業(yè)務(wù)邏輯
                                  MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler();
                                  ch.pipeline().addLast(myWebSocketHandler);
                              }
                          }).bind(port);
              }
          }

          Netty配置類

          “NettyConfig”:NettyConfig是Springcloud項目中的一種配置文件。自動加載。所以會自動開啟線程 因此需要configuration注解以及Bean注解

          package com.tensquare.notice.config;

          import com.tensquare.notice.netty.NettyServer;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;

          @Configuration
          public class NettyConfig {

              @Bean
              public NettyServer createNettyServer(){
                  NettyServer nettyServer = new NettyServer();
                  //啟動netty服務(wù),使用新的線程啟動
                  new Thread(){
                      @Override
                      public void run(){
                          nettyServer.start(1234);
                      }

                  }.start();
                  return nettyServer;
              }
          }

          消息容器配置類:

          “RabbitConfig”類:聲明出需要的消息容器,(注:與后續(xù)的消息監(jiān)聽器相呼應(yīng)。名稱不建議改變)

          package com.tensquare.notice.config;

          import com.tensquare.notice.listener.SysNoticeListener;
          import org.springframework.amqp.rabbit.connection.ConnectionFactory;
          import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;

          //配置類
          @Configuration
          public class RabbitConfig {

              @Bean("sysNoticeContainer")
              public SimpleMessageListenerContainer create(ConnectionFactory connectionFactory){
                  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
                  //使用Channel
                  container.setExposeListenerChannel(true);
                  //設(shè)置自己編寫的監(jiān)聽器
                  container.setMessageListener(new SysNoticeListener());
                  return container;
              }
          }

          通訊處理類

          **“MyWebSocketHandler”**類:也就是MQ和WebSocket進行交互

          • 一:MyWebSocketHandler是用來進行通訊處理的,也就是MQ和WebSocket進行交互(通訊處理類–核心業(yè)務(wù)類)
          • 二:MyWebSocketHandler進行業(yè)務(wù)處理,獲取消息數(shù)量(業(yè)務(wù)場景:獲取到消息數(shù)量即可)
          • 三:MyWebSocketHandler繼承SimpleChannelInboundHandler< TextWebSocketFrame>,重寫channelRead0(ChannelHandlerContext 這個參數(shù)獲取連接,TextWebSocketFrame 這個參數(shù)獲取頁面參數(shù)
          package com.tensquare.notice.netty;

          import com.fasterxml.jackson.databind.ObjectMapper;
          import com.tensquare.entity.Result;
          import com.tensquare.entity.StatusCode;
          import com.tensquare.notice.config.ApplicationContextProvider;
          import io.netty.channel.Channel;
          import io.netty.channel.ChannelHandlerContext;
          import io.netty.channel.SimpleChannelInboundHandler;
          import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
          import org.springframework.amqp.rabbit.core.RabbitAdmin;
          import org.springframework.amqp.rabbit.core.RabbitTemplate;
          import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
          import org.springframework.beans.factory.annotation.Autowired;

          import java.util.HashMap;
          import java.util.Properties;
          import java.util.concurrent.ConcurrentHashMap;

          //核心業(yè)務(wù)類,獲取MQ的消息
          public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame{
              /**
               * 創(chuàng)建對象監(jiān)聽器
               */

              private static ObjectMapper Mapper = new ObjectMapper();
              /**
               * 從Spring容器中獲取消息監(jiān)聽器容器,處理訂閱消息sysNotice
               */

              SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext().getBean("sysNoticeContainer");
              /**
               * 從spring容器中獲取RabbitTemplate
               *
               */

              RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext().getBean(RabbitTemplate.class);
             // @Autowired
             // private RabbitTemplate rabbitTemplate;
              /**
               * 存放WebScoket連接Map,根據(jù)用戶ID存放
               */

              public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();
              /**
               *用戶請求服務(wù)端,執(zhí)行的方法
               */

              @Override
              protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
                  //約定用戶第一次請求攜帶的數(shù)據(jù):{"userid":"1"}
                  //獲取用戶請求數(shù)據(jù)并解析
                  String json = msg.text();
                  //解析數(shù)據(jù)獲取用戶ID
                  String userId = Mapper.readTree(json).get("userId").asText();
                  //第一次請求的時候需要建立WebScoket連接
                  Channel channel = userChannelMap.get(userId);
                  if (channel==null){
                      //獲取WebScoket連接
                      channel  = ctx.channel();
                      //把連接放到容器中
                      userChannelMap.put(userId,channel);
                  }
                  //只用完成新消息的提醒即可,只需要獲取消息的數(shù)量
                  //獲取RabbitMQ的內(nèi)容,并且發(fā)送給用戶
                  RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
                  //拼接捕獲隊列的名稱
                  String queueName = "article_subscribe_"+userId;
                  //獲取Rabbit的properties容器 (獲取rabbit的屬性容器)
                  Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
                  //獲取消息數(shù)量
                  int noticeCount = 0;
                  //判斷properties是否不為空
                  if (queueProperties!=null){
                      //如果不為空,獲取消息數(shù)量
                      noticeCount = (int)queueProperties.get("QUEUE_MESSAGE_COUNT");
                  }
                  //----------------------------------
                  //封裝返回的數(shù)據(jù)
                  HashMap countMap = new HashMap();
                  countMap.put("sysNoticeCount",noticeCount);
                  Result result = new Result(true, StatusCode.OK,"查詢成功?。?,countMap);
                  //把數(shù)據(jù)發(fā)送給用戶
                  channel.writeAndFlush(new TextWebSocketFrame(Mapper.writeValueAsString(result)));
                  //把消息從隊列里清空,否則MQ消息監(jiān)聽器會再次消費一次
                  if (noticeCount>0){
                      rabbitAdmin.purgeQueue(queueName,true);
                  }
                  //為用戶的消息隊列通知注冊監(jiān)聽器,便于用戶在線的時候,
                  //一旦有新消息,可以主動推送給用戶,不需要用戶請求服務(wù)器獲取數(shù)據(jù)
                  sysNoticeContainer.addQueueNames(queueName);
              }
          }

          接下來就是關(guān)于這個類的具體解釋了。

          務(wù)必細看。截圖都是從剛剛代碼中截取的。和我發(fā)的源碼是一樣的

          測試參數(shù)是自定義的,真實開發(fā)環(huán)境不會如此 這個其實就是將參數(shù)獲取到,然后以id為標識將連接存入連接容器的過程 其中有一個Result類可以不用定義,本文是作測試用的所以定義了通過管家獲取到消息的數(shù)量發(fā)送消息的代碼那么以上就是關(guān)于整個MyWebSocketHandler類的詳解。

          監(jiān)聽器:

          SysNoticeListener類:判斷用戶是否在線,發(fā)送消息

          package com.tensquare.notice.listener;

          import com.fasterxml.jackson.databind.ObjectMapper;
          import com.rabbitmq.client.Channel;
          import com.tensquare.entity.Result;
          import com.tensquare.entity.StatusCode;
          import com.tensquare.notice.netty.MyWebSocketHandler;
          import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
          import org.springframework.amqp.core.Message;
          import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

          import java.util.HashMap;

          //消息監(jiān)聽器
          public class SysNoticeListener implements ChannelAwareMessageListener {

              private static ObjectMapper MAPPER = new ObjectMapper();

              @Override
              public void onMessage(Message message, Channel channel) throws Exception {
                  //獲取用戶id,可以通過隊列名稱獲取
                  String queueName = message.getMessageProperties().getConsumerQueue();
                  String userId = queueName.substring(queueName.lastIndexOf("_")+1);
                  io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);
                  //判斷用戶是否在線
                  if (wsChannel!=null){
                      //如果連接不為空,代表用戶在線
                      //封裝返回數(shù)據(jù)
                      HashMap countMap = new HashMap();
                      countMap.put("sysNoticeCount",1);
                      Result result = new Result(true, StatusCode.OK,"查詢成功",countMap);
                      //把數(shù)據(jù)通過WebScoket連接主動推送給用戶
                      wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
                  }
              }
          }

          這里與RabbitConfig工具類中相對應(yīng) 具體作用如注釋所說。

          測試:

          這里將一個靜態(tài)html頁面用作測試,加載服務(wù)的靜態(tài)資源里面即可

          <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
          <html xmlns="http://www.w3.org/1999/xhtml">
          <head>
              <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
              <title>測試 notice 微服務(wù)與頁面 websocket 交互</title>
          </head>
          <body>
          <h1>
              websocket連接服務(wù)器獲取mq消息測試
          </h1>
          <form onSubmit="return false;">
              <table><tr>
                  <td><span>服務(wù)器地址:</span></td>
                  <td><input type="text" id="serverUrl" value="ws://127.0.0.1:1234/ws" /></td>
                </tr>
                <tr>
                  <td><input type="button" id="action" value="連接服務(wù)器" onClick="connect()" /></td>
                  <td><input type="text" id="connStatus" value="未連接 ......" /></td>
              </tr></table>
              <br />
              <hr color="blue" />
              <div>
                  <div style="width: 50%;float:left;">
                      <div>
                          <table><tr>
                              <td><h3>發(fā)送給服務(wù)端的消息</h3></td>
                              <td><input type="button" value="發(fā)送" onClick="send(this.form.message.value)" /></td>
                          </tr></table>
                      </div>
                      <div><textarea type="text" name="message" style="width:500px;height:300px;">
          {
              "userId":"1"
          }
                          </textarea></div>
                  </div>
                  <div style="width: 50%;float:left;">
                      <div><table>
                          <tr>
                              <td><h3>服務(wù)端返回的應(yīng)答消息</h3></td>
                          </tr>
                      </table></div>
                      <div><textarea id="responseText" name="responseText" style="width: 500px;height: 300px;" onfocus="this.scrollTop = this.scrollHeight ">
          這里顯示服務(wù)器推送的信息
                      </textarea></div>
                  </div>
              </div>

          </form>

          <script type="text/javascript">
              var socket;
              var connStatus = document.getElementById('connStatus');;
              var respText = document.getElementById('responseText');
              var actionBtn = document.getElementById('action');
              var sysCount = 0;
              var userCount = 0;

              function connect({
                  connStatus.value = "正在連接 ......";

                  if(!window.WebSocket){
                      window.WebSocket = window.MozWebSocket;
                  }
                  if(window.WebSocket){

                      socket = new WebSocket("ws://127.0.0.1:1234/ws");

                      socket.onmessage = function(event){
                          respText.scrollTop = respText.scrollHeight;
                          respText.value += "\r\n" + event.data;
                          var sysData = JSON.parse(event.data).data.sysNoticeCount;
                          if(sysData){
                              sysCount = sysCount + parseInt(sysData)
                          }
                          var userData = JSON.parse(event.data).data.userNoticeCount;
                          if(userData){
                              userCount = userCount + parseInt(sysData)
                          }
                          respText.value += "\r\n現(xiàn)在有" + sysCount + "條訂閱新消息";
                          respText.value += "\r\n現(xiàn)在有" + userCount + "條點贊新消息";
                          respText.scrollTop = respText.scrollHeight;
                      };
                      socket.onopen = function(event){
                          respText.value += "\r\nWebSocket 已連接";
                          respText.scrollTop = respText.scrollHeight;

                          connStatus.value = "已連接 O(∩_∩)O";

                          actionBtn.value = "斷開服務(wù)器";
                          actionBtn.onclick =function(){
                              disconnect();
                          };

                      };
                      socket.onclose = function(event){
                          respText.value += "\r\n" + "WebSocket 已關(guān)閉";
                          respText.scrollTop = respText.scrollHeight;

                          connStatus.value = "已斷開";

                          actionBtn.value = "連接服務(wù)器";
                          actionBtn.onclick = function({
                              connect();
                          };
                      };

                  } else {
                      //alert("您的瀏覽器不支持WebSocket協(xié)議!");
                      connStatus.value = "您的瀏覽器不支持WebSocket協(xié)議!";
                  }
              }

              function disconnect({
                  socket.close();
              }

              function send(message){
                  if(!window.WebSocket){return;}
                  if(socket.readyState == WebSocket.OPEN){
                      socket.send(message);
                  }else{
                      alert("WebSocket 連接沒有建立成功!");
                  }
              }
          </script>
          </body>
          </html>

          端口不需要改變。下圖為測試結(jié)果

          可以看到,我多發(fā)送2條文章,由于關(guān)聯(lián)了一個粉絲,所以又多了2條消息 而消息中間件中消息總數(shù)始終為01,因為都以及發(fā)送出去了

          -- END --

           | 更多精彩文章 -




          加我微信,交個朋友
          長按/掃碼添加↑↑↑

          瀏覽 102
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚州sese| 东京热一区二区三区 | 自拍偷拍第一页 | 亚洲成人无码网站 | 好吊视频一区二区三区四区五区六区七区八区 |