Netty實戰(zhàn),Springboot + netty +websocket 實現(xiàn)推送消息
你知道的越多,不知道的就越多,業(yè)余的像一棵小草!
你來,我們一起精進!你不來,我和你的競爭對手一起精進!
編輯:業(yè)余草
blog.csdn.net/weixin_44912855
推薦:https://www.xttblog.com/?p=5315
學過 Netty 的都知道,Netty 對 NIO 進行了很好的封裝,簡單的 API,龐大的開源社區(qū)。深受廣大程序員喜愛。基于此本文分享一下基礎的 netty 使用。實戰(zhàn)制作一個 Netty + websocket 的消息推送小栗子。
netty服務器
@Component
public?class?NettyServer?{
????static?final?Logger?log?=?LoggerFactory.getLogger(NettyServer.class);
????/**
?????*?端口號
?????*/
????@Value("${webSocket.netty.port:8888}")
????int?port;
????EventLoopGroup?bossGroup;
????EventLoopGroup?workGroup;
????@Autowired
????ProjectInitializer?nettyInitializer;
????@PostConstruct
????public?void?start()?throws?InterruptedException?{
????????new?Thread(()?->?{
????????????bossGroup?=?new?NioEventLoopGroup();
????????????workGroup?=?new?NioEventLoopGroup();
????????????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
????????????//?bossGroup輔助客戶端的tcp連接請求,?workGroup負責與客戶端之前的讀寫操作
????????????bootstrap.group(bossGroup,?workGroup);
????????????//?設置NIO類型的channel
????????????bootstrap.channel(NioServerSocketChannel.class);
????????????//?設置監(jiān)聽端口
????????????bootstrap.localAddress(new?InetSocketAddress(port));
????????????//?設置管道
????????????bootstrap.childHandler(nettyInitializer);
????????????//?配置完成,開始綁定server,通過調(diào)用sync同步方法阻塞直到綁定成功
????????????ChannelFuture?channelFuture?=?null;
????????????try?{
????????????????channelFuture?=?bootstrap.bind().sync();
????????????????log.info("Server?started?and?listen?on:{}",?channelFuture.channel().localAddress());
????????????????//?對關閉通道進行監(jiān)聽
????????????????channelFuture.channel().closeFuture().sync();
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????}).start();
????}
????/**
?????*?釋放資源
?????*/
????@PreDestroy
????public?void?destroy()?throws?InterruptedException?{
????????if?(bossGroup?!=?null)?{
????????????bossGroup.shutdownGracefully().sync();
????????}
????????if?(workGroup?!=?null)?{
????????????workGroup.shutdownGracefully().sync();
????????}
????}
}
Netty配置
管理全局Channel以及用戶對應的channel(推送消息)
public?class?NettyConfig?{
????/**
?????*?定義全局單利channel組?管理所有channel
?????*/
????private?static?volatile?ChannelGroup?channelGroup?=?null;
????/**
?????*?存放請求ID與channel的對應關系
?????*/
????private?static?volatile?ConcurrentHashMap?channelMap?=?null;
????/**
?????*?定義兩把鎖
?????*/
????private?static?final?Object?lock1?=?new?Object();
????private?static?final?Object?lock2?=?new?Object();
????public?static?ChannelGroup?getChannelGroup()?{
????????if?(null?==?channelGroup)?{
????????????synchronized?(lock1)?{
????????????????if?(null?==?channelGroup)?{
????????????????????channelGroup?=?new?DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
????????????????}
????????????}
????????}
????????return?channelGroup;
????}
????public?static?ConcurrentHashMap?getChannelMap()? {
????????if?(null?==?channelMap)?{
????????????synchronized?(lock2)?{
????????????????if?(null?==?channelMap)?{
????????????????????channelMap?=?new?ConcurrentHashMap<>();
????????????????}
????????????}
????????}
????????return?channelMap;
????}
????public?static?Channel?getChannel(String?userId)?{
????????if?(null?==?channelMap)?{
????????????return?getChannelMap().get(userId);
????????}
????????return?channelMap.get(userId);
????}
}
管道配置
@Component
public?class?ProjectInitializer?extends?ChannelInitializer<SocketChannel>?{
????/**
?????*?webSocket協(xié)議名
?????*/
????static?final?String?WEBSOCKET_PROTOCOL?=?"WebSocket";
????/**
?????*?webSocket路徑
?????*/
????@Value("${webSocket.netty.path:/webSocket}")
????String?webSocketPath;
????@Autowired
????WebSocketHandler?webSocketHandler;
????@Override
????protected?void?initChannel(SocketChannel?socketChannel)?throws?Exception?{
????????//?設置管道
????????ChannelPipeline?pipeline?=?socketChannel.pipeline();
????????//?流水線管理通道中的處理程序(Handler),用來處理業(yè)務
????????//?webSocket協(xié)議本身是基于http協(xié)議的,所以這邊也要使用http編解碼器
????????pipeline.addLast(new?HttpServerCodec());
????????pipeline.addLast(new?ObjectEncoder());
????????//?以塊的方式來寫的處理器
????????pipeline.addLast(new?ChunkedWriteHandler());
????????pipeline.addLast(new?HttpObjectAggregator(8192));
????????pipeline.addLast(new?WebSocketServerProtocolHandler(webSocketPath,?WEBSOCKET_PROTOCOL,?true,?65536?*?10));
????????//?自定義的handler,處理業(yè)務邏輯
????????pipeline.addLast(webSocketHandler);
????}
}
自定義handler
@Component
@ChannelHandler.Sharable
public?class?WebSocketHandler?extends?SimpleChannelInboundHandler<TextWebSocketFrame>?{
????private?static?final?Logger?log?=?LoggerFactory.getLogger(NettyServer.class);
????/**
?????*?一旦連接,第一個被執(zhí)行
?????*/
????@Override
????public?void?handlerAdded(ChannelHandlerContext?ctx)?throws?Exception?{
????????log.info("有新的客戶端鏈接:[{}]",?ctx.channel().id().asLongText());
????????//?添加到channelGroup?通道組
????????NettyConfig.getChannelGroup().add(ctx.channel());
????}
????/**
?????*?讀取數(shù)據(jù)
?????*/
????@Override
????protected?void?channelRead0(ChannelHandlerContext?ctx,?TextWebSocketFrame?msg)?throws?Exception?{
????????log.info("服務器收到消息:{}",?msg.text());
????????//?獲取用戶ID,關聯(lián)channel
????????JSONObject?jsonObject?=?JSONUtil.parseObj(msg.text());
????????String?uid?=?jsonObject.getStr("uid");
????????NettyConfig.getChannelMap().put(uid,?ctx.channel());
????????//?將用戶ID作為自定義屬性加入到channel中,方便隨時channel中獲取用戶ID
????????AttributeKey?key?=?AttributeKey.valueOf("userId");
????????ctx.channel().attr(key).setIfAbsent(uid);
????????//?回復消息
????????ctx.channel().writeAndFlush(new?TextWebSocketFrame("服務器收到消息啦"));
????}
????@Override
????public?void?handlerRemoved(ChannelHandlerContext?ctx)?throws?Exception?{
????????log.info("用戶下線了:{}",?ctx.channel().id().asLongText());
????????//?刪除通道
????????NettyConfig.getChannelGroup().remove(ctx.channel());
????????removeUserId(ctx);
????}
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
????????log.info("異常:{}",?cause.getMessage());
????????//?刪除通道
????????NettyConfig.getChannelGroup().remove(ctx.channel());
????????removeUserId(ctx);
????????ctx.close();
????}
????/**
?????*?刪除用戶與channel的對應關系
?????*/
????private?void?removeUserId(ChannelHandlerContext?ctx)?{
????????AttributeKey?key?=?AttributeKey.valueOf("userId");
????????String?userId?=?ctx.channel().attr(key).get();
????????NettyConfig.getChannelMap().remove(userId);
????}
}
推送消息接口及實現(xiàn)類
public?interface?PushMsgService?{
????/**
?????*?推送給指定用戶
?????*/
????void?pushMsgToOne(String?userId,?String?msg);
????/**
?????*?推送給所有用戶
?????*/
????void?pushMsgToAll(String?msg);
}
@Service
public?class?PushMsgServiceImpl?implements?PushMsgService?{
????@Override
????public?void?pushMsgToOne(String?userId,?String?msg)?{
????????Channel?channel?=?NettyConfig.getChannel(userId);
????????if?(Objects.isNull(channel))?{
????????????throw?new?RuntimeException("未連接socket服務器");
????????}
????????channel.writeAndFlush(new?TextWebSocketFrame(msg));
????}
????@Override
????public?void?pushMsgToAll(String?msg)?{
????????NettyConfig.getChannelGroup().writeAndFlush(new?TextWebSocketFrame(msg));
????}
}
測試

鏈接服務器


發(fā)送消息


調(diào)用接口,往前端推送消息!


OK!
一個簡單的 netty 小栗子就完成了。如果需要本文完整源碼,加我微信領取!
評論
圖片
表情
