<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spring Boot+Netty+Websocket實(shí)現(xiàn)后臺(tái)向前端推送信息

          共 15045字,需瀏覽 31分鐘

           ·

          2022-11-01 10:31

          學(xué)過 Netty 的都知道,Netty 對(duì) NIO 進(jìn)行了很好的封裝,簡單的 API,龐大的開源社區(qū)。深受廣大程序員喜愛?;诖吮疚姆窒硪幌禄A(chǔ)的 netty 使用。實(shí)戰(zhàn)制作一個(gè) Netty + websocket 的消息推送小栗子。

          netty服務(wù)器

          @Component
          public class NettyServer {

              static final Logger log = LoggerFactory.getLogger(NettyServer.class);

              /**
               * 端口號(hào)
               */

              @Value("${webSocket.netty.port:8888}")
              int port;

              EventLoopGroup bossGroup;
              EventLoopGroup workGroup;

              @Autowired
              ProjectInitializer nettyInitializer;

              @PostConstruct
              public void start() throws InterruptedException {
                  new Thread(() -> {
                      bossGroup = new NioEventLoopGroup();
                      workGroup = new NioEventLoopGroup();
                      ServerBootstrap bootstrap = new ServerBootstrap();
                      // bossGroup輔助客戶端的tcp連接請求, workGroup負(fù)責(zé)與客戶端之前的讀寫操作
                      bootstrap.group(bossGroup, workGroup);
                      // 設(shè)置NIO類型的channel
                      bootstrap.channel(NioServerSocketChannel.class);
                      // 設(shè)置監(jiān)聽端口
                      bootstrap.localAddress(new InetSocketAddress(port));
                      // 設(shè)置管道
                      bootstrap.childHandler(nettyInitializer);

                      // 配置完成,開始綁定server,通過調(diào)用sync同步方法阻塞直到綁定成功
                      ChannelFuture channelFuture = null;
                      try {
                          channelFuture = bootstrap.bind().sync();
                          log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
                          // 對(duì)關(guān)閉通道進(jìn)行監(jiān)聽
                          channelFuture.channel().closeFuture().sync();
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }).start();
              }

              /**
               * 釋放資源
               */

              @PreDestroy
              public void destroy() throws InterruptedException {
                  if (bossGroup != null) {
                      bossGroup.shutdownGracefully().sync();
                  }
                  if (workGroup != null) {
                      workGroup.shutdownGracefully().sync();
                  }
              }
          }

          Netty配置

          管理全局Channel以及用戶對(duì)應(yīng)的channel(推送消息)

          public class NettyConfig {

              /**
               * 定義全局單利channel組 管理所有channel
               */

              private static volatile ChannelGroup channelGroup = null;

              /**
               * 存放請求ID與channel的對(duì)應(yīng)關(guān)系
               */

              private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

              /**
               * 定義兩把鎖
               */

              private static final Object lock1 = new Object();
              private static final Object lock2 = new Object();


              public static ChannelGroup getChannelGroup() {
                  if (null == channelGroup) {
                      synchronized (lock1) {
                          if (null == channelGroup) {
                              channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                          }
                      }
                  }
                  return channelGroup;
              }

              public static ConcurrentHashMap<String, Channel> getChannelMap() {
                  if (null == channelMap) {
                      synchronized (lock2) {
                          if (null == channelMap) {
                              channelMap = new ConcurrentHashMap<>();
                          }
                      }
                  }
                  return channelMap;
              }

              public static Channel getChannel(String userId) {
                  if (null == channelMap) {
                      return getChannelMap().get(userId);
                  }
                  return channelMap.get(userId);
              }
          }

          管道配置

          @Component
          public class ProjectInitializer extends ChannelInitializer<SocketChannel{

              /**
               * webSocket協(xié)議名
               */

              static final String WEBSOCKET_PROTOCOL = "WebSocket";

              /**
               * webSocket路徑
               */

              @Value("${webSocket.netty.path:/webSocket}")
              String webSocketPath;
              @Autowired
              WebSocketHandler webSocketHandler;

              @Override
              protected void initChannel(SocketChannel socketChannel) throws Exception {
                  // 設(shè)置管道
                  ChannelPipeline pipeline = socketChannel.pipeline();
                  // 流水線管理通道中的處理程序(Handler),用來處理業(yè)務(wù)
                  // webSocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http編解碼器
                  pipeline.addLast(new HttpServerCodec());
                  pipeline.addLast(new ObjectEncoder());
                  // 以塊的方式來寫的處理器
                  pipeline.addLast(new ChunkedWriteHandler());
                  pipeline.addLast(new HttpObjectAggregator(8192));
                  pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true65536 * 10));
                  // 自定義的handler,處理業(yè)務(wù)邏輯
                  pipeline.addLast(webSocketHandler);
              }
          }

          自定義handler

          @Component
          @ChannelHandler.Sharable
          public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame{
              private static final Logger log = LoggerFactory.getLogger(NettyServer.class);

              /**
               * 一旦連接,第一個(gè)被執(zhí)行
               */

              @Override
              public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                  log.info("有新的客戶端鏈接:[{}]", ctx.channel().id().asLongText());
                  // 添加到channelGroup 通道組
                  NettyConfig.getChannelGroup().add(ctx.channel());
              }

              /**
               * 讀取數(shù)據(jù)
               */

              @Override
              protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
                  log.info("服務(wù)器收到消息:{}", msg.text());

                  // 獲取用戶ID,關(guān)聯(lián)channel
                  JSONObject jsonObject = JSONUtil.parseObj(msg.text());
                  String uid = jsonObject.getStr("uid");
                  NettyConfig.getChannelMap().put(uid, ctx.channel());

                  // 將用戶ID作為自定義屬性加入到channel中,方便隨時(shí)channel中獲取用戶ID
                  AttributeKey<String> key = AttributeKey.valueOf("userId");
                  ctx.channel().attr(key).setIfAbsent(uid);

                  // 回復(fù)消息
                  ctx.channel().writeAndFlush(new TextWebSocketFrame("服務(wù)器收到消息啦"));
              }

              @Override
              public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                  log.info("用戶下線了:{}", ctx.channel().id().asLongText());
                  // 刪除通道
                  NettyConfig.getChannelGroup().remove(ctx.channel());
                  removeUserId(ctx);
              }

              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  log.info("異常:{}", cause.getMessage());
                  // 刪除通道
                  NettyConfig.getChannelGroup().remove(ctx.channel());
                  removeUserId(ctx);
                  ctx.close();
              }

              /**
               * 刪除用戶與channel的對(duì)應(yīng)關(guān)系
               */

              private void removeUserId(ChannelHandlerContext ctx) {
                  AttributeKey<String> key = AttributeKey.valueOf("userId");
                  String userId = ctx.channel().attr(key).get();
                  NettyConfig.getChannelMap().remove(userId);
              }
          }

          推送消息接口及實(shí)現(xiàn)類

          public interface PushMsgService {

              /**
               * 推送給指定用戶
               */

              void pushMsgToOne(String userId, String msg);

              /**
               * 推送給所有用戶
               */

              void pushMsgToAll(String msg);

          }
          @Service
          public class PushMsgServiceImpl implements PushMsgService {

              @Override
              public void pushMsgToOne(String userId, String msg) {
                  Channel channel = NettyConfig.getChannel(userId);
                  if (Objects.isNull(channel)) {
                      throw new RuntimeException("未連接socket服務(wù)器");
                  }

                  channel.writeAndFlush(new TextWebSocketFrame(msg));
              }

              @Override
              public void pushMsgToAll(String msg) {
                  NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
              }
          }

          測試

          圖片

          鏈接服務(wù)器

          圖片

          發(fā)送消息

          圖片


          圖片

          調(diào)用接口,往前端推送消息!

          圖片


          圖片

          OK!

          一個(gè)簡單的 netty 小栗子就完成了。

          來源:https://blog.csdn.net/weixin_44912855

          /article/details/122667977



          最近熬夜給大家準(zhǔn)備了非常全的一套Java一線大廠面試題。全面覆蓋BATJ等一線互聯(lián)網(wǎng)公司的面試題及解答,由BAT一線互聯(lián)網(wǎng)公司大牛帶你深度剖析面試題背后的原理,不僅授你以魚,更授你以漁,為你面試掃除一切障礙。




          資源,怎么領(lǐng)???


          掃二維碼,加我微信,備注:面試題


          一定要備注:面試題,不要急哦,工作忙完后就會(huì)通過!



          瀏覽 69
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天插天天干天天操 | se婷婷 | 久久Y成人电影 | 久久久久久久视频 | 大香蕉福利视频 |