<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty實戰(zhàn),Springboot + netty +websocket 實現(xiàn)推送消息

          共 6809字,需瀏覽 14分鐘

           ·

          2022-02-19 12:30

          你知道的越多,不知道的就越多,業(yè)余的像一棵小草!

          你來,我們一起精進!你不來,我和你的競爭對手一起精進!

          編輯:業(yè)余草

          blog.csdn.net/weixin_44912855

          推薦:https://www.xttblog.com/?p=5315

          學過 Netty 的都知道,Netty 對 NIO 進行了很好的封裝,簡單的 API,龐大的開源社區(qū)。深受廣大程序員喜愛。基于此本文分享一下基礎的 netty 使用。實戰(zhàn)制作一個 Netty + websocket 的消息推送小栗子。

          netty服務器

          @Component
          public?class?NettyServer?{

          ????static?final?Logger?log?=?LoggerFactory.getLogger(NettyServer.class);

          ????/**
          ?????*?端口號
          ?????*/

          ????@Value("${webSocket.netty.port:8888}")
          ????int?port;

          ????EventLoopGroup?bossGroup;
          ????EventLoopGroup?workGroup;

          ????@Autowired
          ????ProjectInitializer?nettyInitializer;

          ????@PostConstruct
          ????public?void?start()?throws?InterruptedException?{
          ????????new?Thread(()?->?{
          ????????????bossGroup?=?new?NioEventLoopGroup();
          ????????????workGroup?=?new?NioEventLoopGroup();
          ????????????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
          ????????????//?bossGroup輔助客戶端的tcp連接請求,?workGroup負責與客戶端之前的讀寫操作
          ????????????bootstrap.group(bossGroup,?workGroup);
          ????????????//?設置NIO類型的channel
          ????????????bootstrap.channel(NioServerSocketChannel.class);
          ????????????//?設置監(jiān)聽端口
          ????????????bootstrap.localAddress(new?InetSocketAddress(port));
          ????????????//?設置管道
          ????????????bootstrap.childHandler(nettyInitializer);

          ????????????//?配置完成,開始綁定server,通過調(diào)用sync同步方法阻塞直到綁定成功
          ????????????ChannelFuture?channelFuture?=?null;
          ????????????try?{
          ????????????????channelFuture?=?bootstrap.bind().sync();
          ????????????????log.info("Server?started?and?listen?on:{}",?channelFuture.channel().localAddress());
          ????????????????//?對關閉通道進行監(jiān)聽
          ????????????????channelFuture.channel().closeFuture().sync();
          ????????????}?catch?(InterruptedException?e)?{
          ????????????????e.printStackTrace();
          ????????????}
          ????????}).start();
          ????}

          ????/**
          ?????*?釋放資源
          ?????*/

          ????@PreDestroy
          ????public?void?destroy()?throws?InterruptedException?{
          ????????if?(bossGroup?!=?null)?{
          ????????????bossGroup.shutdownGracefully().sync();
          ????????}
          ????????if?(workGroup?!=?null)?{
          ????????????workGroup.shutdownGracefully().sync();
          ????????}
          ????}
          }

          Netty配置

          管理全局Channel以及用戶對應的channel(推送消息)

          public?class?NettyConfig?{

          ????/**
          ?????*?定義全局單利channel組?管理所有channel
          ?????*/

          ????private?static?volatile?ChannelGroup?channelGroup?=?null;

          ????/**
          ?????*?存放請求ID與channel的對應關系
          ?????*/

          ????private?static?volatile?ConcurrentHashMap?channelMap?=?null;

          ????/**
          ?????*?定義兩把鎖
          ?????*/

          ????private?static?final?Object?lock1?=?new?Object();
          ????private?static?final?Object?lock2?=?new?Object();


          ????public?static?ChannelGroup?getChannelGroup()?{
          ????????if?(null?==?channelGroup)?{
          ????????????synchronized?(lock1)?{
          ????????????????if?(null?==?channelGroup)?{
          ????????????????????channelGroup?=?new?DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
          ????????????????}
          ????????????}
          ????????}
          ????????return?channelGroup;
          ????}

          ????public?static?ConcurrentHashMap?getChannelMap()?{
          ????????if?(null?==?channelMap)?{
          ????????????synchronized?(lock2)?{
          ????????????????if?(null?==?channelMap)?{
          ????????????????????channelMap?=?new?ConcurrentHashMap<>();
          ????????????????}
          ????????????}
          ????????}
          ????????return?channelMap;
          ????}

          ????public?static?Channel?getChannel(String?userId)?{
          ????????if?(null?==?channelMap)?{
          ????????????return?getChannelMap().get(userId);
          ????????}
          ????????return?channelMap.get(userId);
          ????}
          }

          管道配置

          @Component
          public?class?ProjectInitializer?extends?ChannelInitializer<SocketChannel>?{

          ????/**
          ?????*?webSocket協(xié)議名
          ?????*/

          ????static?final?String?WEBSOCKET_PROTOCOL?=?"WebSocket";

          ????/**
          ?????*?webSocket路徑
          ?????*/

          ????@Value("${webSocket.netty.path:/webSocket}")
          ????String?webSocketPath;
          ????@Autowired
          ????WebSocketHandler?webSocketHandler;

          ????@Override
          ????protected?void?initChannel(SocketChannel?socketChannel)?throws?Exception?{
          ????????//?設置管道
          ????????ChannelPipeline?pipeline?=?socketChannel.pipeline();
          ????????//?流水線管理通道中的處理程序(Handler),用來處理業(yè)務
          ????????//?webSocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http編解碼器
          ????????pipeline.addLast(new?HttpServerCodec());
          ????????pipeline.addLast(new?ObjectEncoder());
          ????????//?以塊的方式來寫的處理器
          ????????pipeline.addLast(new?ChunkedWriteHandler());
          ????????pipeline.addLast(new?HttpObjectAggregator(8192));
          ????????pipeline.addLast(new?WebSocketServerProtocolHandler(webSocketPath,?WEBSOCKET_PROTOCOL,?true,?65536?*?10));
          ????????//?自定義的handler,處理業(yè)務邏輯
          ????????pipeline.addLast(webSocketHandler);
          ????}
          }

          自定義handler

          @Component
          @ChannelHandler.Sharable
          public?class?WebSocketHandler?extends?SimpleChannelInboundHandler<TextWebSocketFrame>?{
          ????private?static?final?Logger?log?=?LoggerFactory.getLogger(NettyServer.class);

          ????/**
          ?????*?一旦連接,第一個被執(zhí)行
          ?????*/

          ????@Override
          ????public?void?handlerAdded(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????log.info("有新的客戶端鏈接:[{}]",?ctx.channel().id().asLongText());
          ????????//?添加到channelGroup?通道組
          ????????NettyConfig.getChannelGroup().add(ctx.channel());
          ????}

          ????/**
          ?????*?讀取數(shù)據(jù)
          ?????*/

          ????@Override
          ????protected?void?channelRead0(ChannelHandlerContext?ctx,?TextWebSocketFrame?msg)?throws?Exception?{
          ????????log.info("服務器收到消息:{}",?msg.text());

          ????????//?獲取用戶ID,關聯(lián)channel
          ????????JSONObject?jsonObject?=?JSONUtil.parseObj(msg.text());
          ????????String?uid?=?jsonObject.getStr("uid");
          ????????NettyConfig.getChannelMap().put(uid,?ctx.channel());

          ????????//?將用戶ID作為自定義屬性加入到channel中,方便隨時channel中獲取用戶ID
          ????????AttributeKey?key?=?AttributeKey.valueOf("userId");
          ????????ctx.channel().attr(key).setIfAbsent(uid);

          ????????//?回復消息
          ????????ctx.channel().writeAndFlush(new?TextWebSocketFrame("服務器收到消息啦"));
          ????}

          ????@Override
          ????public?void?handlerRemoved(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????log.info("用戶下線了:{}",?ctx.channel().id().asLongText());
          ????????//?刪除通道
          ????????NettyConfig.getChannelGroup().remove(ctx.channel());
          ????????removeUserId(ctx);
          ????}

          ????@Override
          ????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
          ????????log.info("異常:{}",?cause.getMessage());
          ????????//?刪除通道
          ????????NettyConfig.getChannelGroup().remove(ctx.channel());
          ????????removeUserId(ctx);
          ????????ctx.close();
          ????}

          ????/**
          ?????*?刪除用戶與channel的對應關系
          ?????*/

          ????private?void?removeUserId(ChannelHandlerContext?ctx)?{
          ????????AttributeKey?key?=?AttributeKey.valueOf("userId");
          ????????String?userId?=?ctx.channel().attr(key).get();
          ????????NettyConfig.getChannelMap().remove(userId);
          ????}
          }

          推送消息接口及實現(xiàn)類

          public?interface?PushMsgService?{

          ????/**
          ?????*?推送給指定用戶
          ?????*/

          ????void?pushMsgToOne(String?userId,?String?msg);

          ????/**
          ?????*?推送給所有用戶
          ?????*/

          ????void?pushMsgToAll(String?msg);

          }
          @Service
          public?class?PushMsgServiceImpl?implements?PushMsgService?{

          ????@Override
          ????public?void?pushMsgToOne(String?userId,?String?msg)?{
          ????????Channel?channel?=?NettyConfig.getChannel(userId);
          ????????if?(Objects.isNull(channel))?{
          ????????????throw?new?RuntimeException("未連接socket服務器");
          ????????}

          ????????channel.writeAndFlush(new?TextWebSocketFrame(msg));
          ????}

          ????@Override
          ????public?void?pushMsgToAll(String?msg)?{
          ????????NettyConfig.getChannelGroup().writeAndFlush(new?TextWebSocketFrame(msg));
          ????}
          }

          測試

          e849b25c907a76a0a5dcad9a1bcadf63.webp

          鏈接服務器

          b32b604a78a9db2ae16b3747fe29de33.webpc11b76e0670fa7a6051794211cbf1a62.webp

          發(fā)送消息

          3354f52cbd6d0da442eef2d563dbb9bd.webp5f88ffb357e15abe0665c177f4b6740c.webp

          調(diào)用接口,往前端推送消息!

          6eb592e1ace342abfcbb3954384df23b.webpc4bdfdeaf7bf533b0eb6103ec5a16614.webp

          OK!

          一個簡單的 netty 小栗子就完成了。如果需要本文完整源碼,加我微信領取!

          瀏覽 92
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  靠逼网站在线观看 | 韩国啪啪免费视频 | 天堂无码不卡 | 日本黄色日批视频网站 | 男人天堂久草视频 |