Spring整合Netty、WebSocket的互聯(lián)網(wǎng)聊天系統(tǒng)
作者:Kanarien
來源:SegmentFault 思否社區(qū)
前言
最近一段時間在學(xué)習(xí)Netty網(wǎng)絡(luò)框架,又趁著計算機網(wǎng)絡(luò)的課程設(shè)計,決定以Netty為核心,以WebSocket為應(yīng)用層通信協(xié)議做一個互聯(lián)網(wǎng)聊天系統(tǒng),整體而言就像微信網(wǎng)頁版一樣,但考慮到這個聊天系統(tǒng)的功能非常多,因此只打算實現(xiàn)核心的聊天功能,包括單發(fā)、群發(fā)、文件發(fā)送,然后把項目與Spring整合做成開源、可拓展的方式,給大家參考、討論、使用,歡迎大家的指點。
關(guān)于Netty
Netty 是一個利用 Java 的高級網(wǎng)絡(luò)的能力,隱藏其背后的復(fù)雜性而提供一個易于使用的 API 的客戶端/服務(wù)器框架。
這里借用《Essential Netty In Action》的一句話來簡單介紹Netty,詳細的可參考閱讀該書的電子版
Essential Netty in Action 《Netty 實戰(zhàn)(精髓)》: https://legacy.gitbook.com/book/waylau/essential-netty-in-action/details
關(guān)于WebSocket通信協(xié)議
簡單說一下WebSocket通信協(xié)議,WebSocket是為了解決HTTP協(xié)議中通信只能由客戶端發(fā)起這個弊端而出現(xiàn)的,WebSocket基于HTTP5協(xié)議,借用HTTP進行握手、升級,能夠做到輕量的、高效的、雙向的在客戶端和服務(wù)端之間傳輸文本數(shù)據(jù)。詳細可參考以下文章:
WebSocket 是什么原理?為什么可以實現(xiàn)持久連接?: https://www.zhihu.com/question/20215561 WebSocket 教程 - 阮一峰的網(wǎng)絡(luò)日志: http://www.ruanyifeng.com/blog/2017/05/websocket.html
1. 技術(shù)準備
操作平臺:Windows 7, 64bit 8G IDE:MyEclipse 2016 JDK版本:1.8.0_121 瀏覽器:谷歌瀏覽器、360瀏覽器(極速模式)(涉及網(wǎng)頁前端設(shè)計,后端開發(fā)表示很苦悶) 涉及技術(shù): Netty 4 WebSocket + HTTP Spring MVC + Spring JQuery Bootstrap 3 + Bootstrap-fileinput Maven 3.5 Tomcat 8.0
2. 整體說明
2.1 設(shè)計思想
整個通信系統(tǒng)以Tomcat作為核心服務(wù)器運行,其下另開一個線程運行Netty WebSocket服務(wù)器,Tomcat服務(wù)器主要處理客戶登錄、個人信息管理等的HTTP類型請求(通常的業(yè)務(wù)類型),端口為8080,Netty WebSockt服務(wù)器主要處理用戶消息通信的WebSocket類型請求,端口為3333。用戶通過瀏覽器登錄后,瀏覽器會維持一個Session對象(有效時間30分鐘)來保持登錄狀態(tài),Tomcat服務(wù)器會返回用戶的個人信息,同時記錄在線用戶,根據(jù)用戶id建立一條WebSocket連接并保存在后端以便進行實時通信。當(dāng)一個用戶向另一用戶發(fā)起通信,服務(wù)器會根據(jù)消息內(nèi)容中的對話方用戶id,找到保存的WebSocket連接,通過該連接發(fā)送消息,對方就能夠收到即時收到消息。當(dāng)用戶注銷或退出時,釋放WebSocket連接,清空Session對象中的登錄狀態(tài)。
事實上Netty也可以用作一個HTTP服務(wù)器,而這里使用Spring MVC處理HTTP請求是出于熟悉的緣故,也比較接近傳統(tǒng)開發(fā)的方式。
2.2 系統(tǒng)結(jié)構(gòu)
系統(tǒng)采用B/S(Browser/Server),即瀏覽器/服務(wù)器的結(jié)構(gòu),主要事務(wù)邏輯在服務(wù)器端(Server)實現(xiàn)。借鑒MVC模式的思想,從上至下具體又分為視圖層(View)、控制層(Controller)、業(yè)務(wù)層(Service)、模型層(Model)、數(shù)據(jù)訪問層(Data Access)
2.3 項目結(jié)構(gòu)
項目后端結(jié)構(gòu):

項目前端結(jié)構(gòu):

2.4 系統(tǒng)功能模塊
系統(tǒng)只包括兩個模塊:登錄模塊和聊天管理模塊。
登錄模塊:既然作為一個系統(tǒng),那么登錄的角色認證是必不可少的,這里使用簡單、傳統(tǒng)的Session方式維持登錄狀態(tài),當(dāng)然也有對應(yīng)的注銷功能,但這里的注銷除了清空Session對象,還要釋放WebSocket連接,否則造成內(nèi)存泄露。 聊天管理模塊:系統(tǒng)的核心模塊,這部分主要使用Netty框架實現(xiàn),功能包括信息、文件的單條和多條發(fā)送,也支持表情發(fā)送。 其他模塊:如好友管理模塊、聊天記錄管理、注冊模塊等,我并沒有實現(xiàn),有興趣的話可以自行實現(xiàn),與傳統(tǒng)的開發(fā)方式類似。

到這里,可能會有人出現(xiàn)疑問了,首先是前面的涉及技術(shù)中沒有ORM框架(Mybatis或Hibernate),這里又沒有實現(xiàn)好友管理的功能,那用戶如何確定自己的好友并發(fā)送信息呢?
其實,這里我在dao層的實現(xiàn)里并沒有連接數(shù)據(jù)庫,而是用硬編碼的方式固定好系統(tǒng)的用戶以及用戶的好友表、群組表,之所以這么做是因為當(dāng)初考慮到這個課程設(shè)計要是連接數(shù)據(jù)庫就太麻煩了光是已有的模塊(包括前后端)就差不多3k行代碼了,時間上十分劃不來,于是就用了硬編碼的方式偷懶,后面會再說明系統(tǒng)用戶的情況。
2.5 用戶狀態(tài)轉(zhuǎn)換圖
由于本系統(tǒng)涉及多個用戶狀態(tài),有必要進行說明,下面給出本系統(tǒng)的用戶狀態(tài)轉(zhuǎn)換圖。

2.6 系統(tǒng)界面
系統(tǒng)聊天界面如下:


3. 核心編碼
3.1 Netty WebSocket服務(wù)端
3.1.1 Netty服務(wù)器啟動與關(guān)閉
不得不說的是,當(dāng)關(guān)閉Tomcat服務(wù)器時,也要釋放Netty相關(guān)資源,否則會造成內(nèi)存泄漏,關(guān)閉方法如下面的close(),如果只是使用shutdownGracefully()方法的話,關(guān)閉時會報內(nèi)存泄露Memory Leak異常(但IDE可能來不及輸出到控制臺)
/**
?*?描述:?Netty?WebSocket服務(wù)器
?*??????使用獨立的線程啟動
?*?@author?Kanarien
?*?@version?1.0
?*?@date?2018年5月18日?上午11:22:51
?*/
public?class?WebSocketServer?implements?Runnable{
????private?final?Logger?logger?=?LoggerFactory.getLogger(WebSocketServer.class);
????@Autowired
????private?EventLoopGroup?bossGroup;
????@Autowired
????private?EventLoopGroup?workerGroup;
????@Autowired
????private?ServerBootstrap?serverBootstrap;
????private?int?port;
????private?ChannelHandler?childChannelHandler;
????private?ChannelFuture?serverChannelFuture;
??//?構(gòu)造方法少了會報錯
????public?WebSocketServer()?{}
????@Override
????public?void?run()?{
????????build();
????}
????/**
?????*?描述:啟動Netty Websocket服務(wù)器
?????*/
????public?void?build()?{
????????try?{
??????long?begin?=?System.currentTimeMillis();
????serverBootstrap.group(bossGroup,?workerGroup)?//boss輔助客戶端的tcp連接請求??worker負責(zé)與客戶端之前的讀寫操作
?????????????.channel(NioServerSocketChannel.class)?//配置客戶端的channel類型
?????????????.option(ChannelOption.SO_BACKLOG,?1024)?//配置TCP參數(shù),握手字符串長度設(shè)置
?????????????.option(ChannelOption.TCP_NODELAY,?true)?//TCP_NODELAY算法,盡可能發(fā)送大塊數(shù)據(jù),減少充斥的小塊數(shù)據(jù)
?????????????.childOption(ChannelOption.SO_KEEPALIVE,?true)//開啟心跳包活機制,就是客戶端、服務(wù)端建立連接處于ESTABLISHED狀態(tài),超過2小時沒有交流,機制會被啟動
?????????????.childOption(ChannelOption.RCVBUF_ALLOCATOR,?new?FixedRecvByteBufAllocator(592048))//配置固定長度接收緩存區(qū)分配器
?????????????.childHandler(childChannelHandler);?//綁定I/O事件的處理類,WebSocketChildChannelHandler中定義
????long?end?=?System.currentTimeMillis();
????????????logger.info("Netty?Websocket服務(wù)器啟動完成,耗時?"?+?(end?-?begin)?+?"?ms,已綁定端口?"?+?port?+?"?阻塞式等候客戶端連接");
????????????serverChannelFuture?=?serverBootstrap.bind(port).sync();
????????}?catch?(Exception?e)?{
????????????logger.info(e.getMessage());
????????????bossGroup.shutdownGracefully();
????????????workerGroup.shutdownGracefully();
????????????e.printStackTrace();
????????}
????}
??/**
?????*?描述:關(guān)閉Netty Websocket服務(wù)器,主要是釋放連接
?????*?????連接包括:服務(wù)器連接serverChannel,
?????*?????客戶端TCP處理連接bossGroup,
?????*?????客戶端I/O操作連接workerGroup
?????*
?????*?????若只使用
?????*?????????bossGroupFuture?=?bossGroup.shutdownGracefully();
?????*?????????workerGroupFuture?=?workerGroup.shutdownGracefully();
?????*?????會造成內(nèi)存泄漏。
?????*/
????public?void?close(){
????????serverChannelFuture.channel().close();
????????Future>?bossGroupFuture?=?bossGroup.shutdownGracefully();
????????Future>?workerGroupFuture?=?workerGroup.shutdownGracefully();
????????try?{
????????????bossGroupFuture.await();
????????????workerGroupFuture.await();
????????}?catch?(InterruptedException?ignore)?{
????????????ignore.printStackTrace();
????????}
????}
????public?ChannelHandler?getChildChannelHandler()?{
????????return?childChannelHandler;
????}
????public?void?setChildChannelHandler(ChannelHandler?childChannelHandler)?{
????????this.childChannelHandler?=?childChannelHandler;
????}
????public?int?getPort()?{
????????return?port;
????}
????public?void?setPort(int?port)?{
????????this.port?=?port;
????}
}
3.1.2 Netty服務(wù)器處理鏈
獨立出處理器鏈類,方便修改與注入,免得混在一起顯得混亂。
@Component
public?class?WebSocketChildChannelHandler?extends?ChannelInitializer{
????@Resource(name?=?"webSocketServerHandler")
????private?ChannelHandler?webSocketServerHandler;
????@Resource(name?=?"httpRequestHandler")
????private?ChannelHandler?httpRequestHandler;
????@Override
????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
????????ch.pipeline().addLast("http-codec",?new?HttpServerCodec());?//?HTTP編碼解碼器
????????ch.pipeline().addLast("aggregator",?new?HttpObjectAggregator(65536));?//?把HTTP頭、HTTP體拼成完整的HTTP請求
????????ch.pipeline().addLast("http-chunked",?new?ChunkedWriteHandler());?//?分塊,方便大文件傳輸,不過實質(zhì)上都是短的文本數(shù)據(jù)
????????ch.pipeline().addLast("http-handler",?httpRequestHandler);
????????ch.pipeline().addLast("websocket-handler",webSocketServerHandler);
????}
}
3.1.3 Netty服務(wù)器HTTP請求處理器
值得一提的是,當(dāng)在處理鏈中使用Spring注入處理器的bean的時候,如果處理器類不使用@Sharable標(biāo)簽的話,會出現(xiàn)錯誤。如果不使用Spring注入bean的方式,那么應(yīng)該new一個新的處理器對象,如ch.pipeline().addLast("http-handler", new HttpRequestHandler())。另外,判斷HTTP請求還是WebSocket請求的方式稍微不太優(yōu)雅,但我按照《Essential Netty in Action》中的方法去試,結(jié)果有問題的,只好用下面的if語句判斷。
@Component
@Sharable
public?class?HttpRequestHandler?extends?SimpleChannelInboundHandler
3.1.4 Netty服務(wù)器WebSocket請求處理器
考慮到規(guī)范性與可維護性,switch語句中的case常量應(yīng)該放在常量類中聲明比較好。另外說下群發(fā)的邏輯(屬于業(yè)務(wù)邏輯,這里沒有給出代碼),群發(fā)也就是在一個群中發(fā)言,后端會掃描群中在線的用戶,逐一發(fā)送信息。用戶的WebSocket連接(即ChannelHandlerContext對象),會保存在全局變量onlineUserMap中,以用戶id作鍵,方便操作連接。關(guān)于表情的發(fā)送邏輯,與單發(fā)邏輯相同,不同的是發(fā)送內(nèi)容為對應(yīng)的img標(biāo)簽字符串。
@Component
@Sharable
public?class?WebSocketServerHandler?extends?SimpleChannelInboundHandler?{
????private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(WebSocketServerHandler.class);
????@Autowired
????private?ChatService?chatService;
????/**
?????*?描述:讀取完連接的消息后,對消息進行處理。
?????*??????這里主要是處理WebSocket請求
?????*/
????@Override
????protected?void?channelRead0(ChannelHandlerContext?ctx,?WebSocketFrame?msg)?throws?Exception?{
????????handlerWebSocketFrame(ctx,?msg);
????}
????/**
?????*?描述:處理WebSocketFrame
?????*?@param?ctx
?????*?@param?frame
?????*?@throws?Exception
?????*/
????private?void?handlerWebSocketFrame(ChannelHandlerContext?ctx,?WebSocketFrame?frame)?throws?Exception?{
????????//?關(guān)閉請求
????????if?(frame?instanceof?CloseWebSocketFrame)?{
????????????WebSocketServerHandshaker?handshaker?=
????????????????????Constant.webSocketHandshakerMap.get(ctx.channel().id().asLongText());
????????????if?(handshaker?==?null)?{
????????????????sendErrorMessage(ctx,?"不存在的客戶端連接!");
????????????}?else?{
????????????????handshaker.close(ctx.channel(),?(CloseWebSocketFrame)?frame.retain());
????????????}
????????????return;
????????}
????????//?ping請求
????????if?(frame?instanceof?PingWebSocketFrame)?{
????????????ctx.channel().write(new?PongWebSocketFrame(frame.content().retain()));
????????????return;
????????}
????????//?只支持文本格式,不支持二進制消息
????????if?(!(frame?instanceof?TextWebSocketFrame))?{
????????????sendErrorMessage(ctx,?"僅支持文本(Text)格式,不支持二進制消息");
????????}
????????//?客服端發(fā)送過來的消息
????????String?request?=?((TextWebSocketFrame)frame).text();
????????LOGGER.info("服務(wù)端收到新信息:"?+?request);
????????JSONObject?param?=?null;
????????try?{
????????????param?=?JSONObject.parseObject(request);
????????}?catch?(Exception?e)?{
????????????sendErrorMessage(ctx,?"JSON字符串轉(zhuǎn)換出錯!");
????????????e.printStackTrace();
????????}
????????if?(param?==?null)?{
????????????sendErrorMessage(ctx,?"參數(shù)為空!");
????????????return;
????????}
????????String?type?=?(String)?param.get("type");
????????switch?(type)?{
????????????case?"REGISTER":
????????????????chatService.register(param,?ctx);
????????????????break;
????????????case?"SINGLE_SENDING":
????????????????chatService.singleSend(param,?ctx);
????????????????break;
????????????case?"GROUP_SENDING":
????????????????chatService.groupSend(param,?ctx);
????????????????break;
????????????case?"FILE_MSG_SINGLE_SENDING":
????????????????chatService.FileMsgSingleSend(param,?ctx);
????????????????break;
????????????case?"FILE_MSG_GROUP_SENDING":
????????????????chatService.FileMsgGroupSend(param,?ctx);
????????????????break;
????????????default:
????????????????chatService.typeError(ctx);
????????????????break;
????????}
????}
????/**
?????*?描述:客戶端斷開連接
?????*/
????@Override
????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
????????chatService.remove(ctx);
????}
????/**
?????*?異常處理:關(guān)閉channel
?????*/
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
????????cause.printStackTrace();
????????ctx.close();
????}
????private?void?sendErrorMessage(ChannelHandlerContext?ctx,?String?errorMsg)?{
????????String?responseJson?=?new?ResponseJson()
????????????????.error(errorMsg)
????????????????.toString();
????????ctx.channel().writeAndFlush(new?TextWebSocketFrame(responseJson));
????}
}
3.1.5 文件上傳
文件上傳的思路是先把文件上傳到服務(wù)器上,再返回給對方文件的url以及相關(guān)信息。文件上傳并沒有使用WebSocket連接來上傳,而是直接使用HTTP連接結(jié)合Spring的接口簡單的實現(xiàn)了,可自行修改實現(xiàn)使用WebSocket連接來上傳文件。另外,文件保存的路徑是http://localhost:8080/WebSocket/UploadFile,如果Tomcat端口不是8080或者想改變存儲路徑的話,請注意修改常量。
@Service
public?class?FileUploadServiceImpl?implements?FileUploadService{
????private?final?static?String?SERVER_URL_PREFIX?=?"http://localhost:8080/WebSocket/";
????private?final?static?String?FILE_STORE_PATH?=?"UploadFile";
????@Override
????public?ResponseJson?upload(MultipartFile?file,?HttpServletRequest?request)?{
????????//?重命名文件,防止重名
????????String?filename?=?getRandomUUID();
????????String?suffix?=?"";
????????String?originalFilename?=?file.getOriginalFilename();
????????String?fileSize?=?FileUtils.getFormatSize(file.getSize());
????????//?截取文件的后綴名
????????if?(originalFilename.contains("."))?{
????????????suffix?=?originalFilename.substring(originalFilename.lastIndexOf("."));
????????}
????????filename?=?filename?+?suffix;
????????String?prefix?=?request.getSession().getServletContext().getRealPath("/")?+?FILE_STORE_PATH;
????????System.out.println("存儲路徑為:"?+?prefix?+?"\\"?+?filename);
????????Path?filePath?=?Paths.get(prefix,?filename);
????????try?{
????????????Files.copy(file.getInputStream(),?filePath);
????????}?catch?(IOException?e)?{
????????????e.printStackTrace();
????????????return?new?ResponseJson().error("文件上傳發(fā)生錯誤!");
????????}
????????return?new?ResponseJson().success()
????????????????.setData("originalFilename",?originalFilename)
????????????????.setData("fileSize",?fileSize)
????????????????.setData("fileUrl",?SERVER_URL_PREFIX?+?FILE_STORE_PATH?+?"\\"?+?filename);
????}
????private?String?getRandomUUID()?{
????????return?UUID.randomUUID().toString().replace("-",?"");
????}
}
3.2 WebSocket客戶端
3.2.1 瀏覽器客戶端代碼
下面只展示核心的websocket連接代碼。補充說明:考慮到瀏覽器的兼容性,經(jīng)測試,建議使用谷歌瀏覽器和360瀏覽器(極速模式),火狐瀏覽器和IE11的界面有點問題。也說明一下,UI設(shè)計的排版是從網(wǎng)上找的,由修改了下,自己嘔心瀝血的用JS補充了動態(tài)功能,包括:
新消息紅標(biāo)簽提醒 新消息置頂 客戶端保存已發(fā)聊天記錄 用戶己方聊天信息靠左,接收信息靠右 聊天信息框的寬度動態(tài)計算
詳細可見chatroom.js文件
4. 效果及操作演示
4.1 登錄操作
登錄入口為:
http://localhost/或?http://localhost/當(dāng)前系統(tǒng)用戶固定為9個,群組1個,包括9人用戶。
用戶1 用戶名:Member001 密碼:001 用戶2 用戶名:Member002 密碼:002
······
用戶9 用戶名:Member009 密碼:009


4.2 聊天演示

4.3 文件上傳演示

5.補充

