<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot+Vue+Websocket開發(fā)聊天系統(tǒng)

          共 9023字,需瀏覽 19分鐘

           ·

          2023-03-10 23:49



          最近在開發(fā)一個(gè)聊天的系統(tǒng),看到了一篇不錯(cuò)的技術(shù)文,如果你對聊天系統(tǒng)開發(fā)也比較感興趣,可以看看今天的文章哦。


          那么今天的話也是帶來這個(gè)非常常用的一個(gè)技術(shù),那就是咱們完成nutty的一個(gè)應(yīng)用,今天的話,我會(huì)介紹地很詳細(xì),這樣的話,拿到這個(gè)博文的代碼就基本上可以按照自己的想法去構(gòu)建自己的一個(gè)在線應(yīng)用了。比如聊天,在線消息推送之類的。


          其實(shí)一開始我原來的想法做在線消息推送是直接mq走起,但是想了想對mq的依賴太高了。而且總感覺不安全,況且還有實(shí)時(shí)在線處理的一些要求,所以的話才覺得切換nutty來做。


          我的構(gòu)想是這樣的:


          在我的構(gòu)想里面的話,基本上除了和客戶端建立的連接之外,會(huì)暴露出我們的一個(gè)服務(wù)器地址和接口。


          其他的業(yè)務(wù)服務(wù),都是通過其他的服務(wù)進(jìn)行調(diào)用后返回的,客戶端和nutty服務(wù)器只是建立長連接,負(fù)責(zé)接收消息,確認(rèn)消息。


          具體的業(yè)務(wù)消息是如何發(fā)送的都是通過其他微服務(wù)的,好處就是確保安全,例如限制用戶的聊天評率(因?yàn)榭赡苁菒阂饽_本)。

          不過的話,我們今天的部分是在這里:



          就是紫色框起來的地方。這部分是基礎(chǔ),也是毛坯房,后面你們可以根據(jù)本文去造自己的房子。


          后端


          首先是我們的服務(wù)后端的搭建,這部分的話其實(shí)可以參考我的這篇文章:實(shí)用水文篇--SpringBoot整合Netty實(shí)現(xiàn)消息推送服務(wù)器


          那么我們這邊只是說說不同的地方,核心的主要的地方。


          項(xiàng)目結(jié)構(gòu)



          這里的話,可以看到我們的這邊的話其實(shí)是和先前的一樣的,其實(shí)沒什么變


          化,區(qū)別在里面:



          這里面我重寫了一下方法,對上次的一些內(nèi)容進(jìn)行了修改,因?yàn)樯洗问敲髦械拿髀铩?/p>


          初始化器


          首先是我們的初始化器,那么在這里的話,我增加了這個(gè)心跳在線的一個(gè)處理。主要是因?yàn)?,?shí)際上,就是說,避免我們的一個(gè)資源的浪費(fèi)嘛。


          public class ServerHandler extends ChannelInitializer<SocketChannel> {
          /** * 初始化通道以及配置對應(yīng)管道的處理器 * @param channel * @throws Exception */ @Override protected void initChannel(SocketChannel channel) throws Exception{
          ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new HttpServerCodec());
          pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(1024*64));
          //===========================增加心跳支持==============================
          /** * 針對客戶端,如果在1分鐘時(shí)間內(nèi)沒有向服務(wù)端發(fā)送讀寫心跳(ALL),則主動(dòng)斷開連接 * 如果有讀空閑和寫空閑,則不做任何處理 */ pipeline.addLast(new IdleStateHandler(8,10,12)); //自定義的空閑狀態(tài)檢測的handler pipeline.addLast(new HeartBeatHandler());

          pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
          //自定義的handler pipeline.addLast(new ServerListenerHandler());

          }}

          對應(yīng)的心跳檢測的實(shí)現(xiàn)類在這里:

          public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
          public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt;//強(qiáng)制類型轉(zhuǎn)化 if(event.state()== IdleState.READER_IDLE){ System.out.println("進(jìn)入讀空閑......"); }else if(event.state() == IdleState.WRITER_IDLE) { System.out.println("進(jìn)入寫空閑......"); }else if(event.state()== IdleState.ALL_IDLE){ System.out.println("channel 關(guān)閉之前:users 的數(shù)量為:"+ UserConnectPool.getChannelGroup().size()); Channel channel = ctx.channel(); //資源釋放 channel.close(); System.out.println("channel 關(guān)閉之后:users 的數(shù)量為:"+UserConnectPool.getChannelGroup().size()); } } }

          }

          服務(wù)類

          之后的話就是我們具體的消息推送,服務(wù)之類的了。

          public class ServerListenerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
          private static final Logger log = LoggerFactory.getLogger(ServerBoot.class);
          @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //獲取客戶端所傳輸?shù)南?/span> String content = msg.text(); //1.獲取客戶端發(fā)來的消息 DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class); assert dataContent != null; System.out.println("----->"+dataContent); Integer action = dataContent.getAction(); Channel channel = ctx.channel();
          //2.判斷消息類型,根據(jù)不同的類型來處理不同的業(yè)務(wù) if(Objects.equals(action, MessageActionEnum.CONNECT.type)){ //2.1 當(dāng)websocket 第一次open的時(shí)候,初始化channel,把用的channel 和 userid 關(guān)聯(lián)起來 String senderId = dataContent.getChatMsg().getSenderId(); UserConnectPool.getChannelMap().put(senderId,channel); //這里是輸出一個(gè)用戶關(guān)系 UserConnectPool.output(); }
          } else if(Objects.equals(action, MessageActionEnum.KEEPALIVE.type)){ //2.4 心跳類型的消息 System.out.println("收到來自channel 為["+channel+"]的心跳包"); }
          }
          @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //接收到請求 log.info("有新的客戶端鏈接:[{}]", ctx.channel().id().asLongText()); UserConnectPool.getChannelGroup().add(ctx.channel()); }
          @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { String chanelId = ctx.channel().id().asShortText(); log.info("客戶端被移除:channel id 為:"+chanelId); UserConnectPool.getChannelGroup().remove(ctx.channel()); }
          @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); //發(fā)生了異常后關(guān)閉連接,同時(shí)從channelgroup移除 ctx.channel().close(); UserConnectPool.getChannelGroup().remove(ctx.channel()); }}

          可以看到這里只保留了兩個(gè)玩意,一個(gè)是把用戶注冊到咱們的這個(gè)nutty服務(wù)器的內(nèi)存里面。還有一個(gè)是心跳包。


          那么其他的數(shù)據(jù)類型什么的,都在整合nutty的那篇博文里面。


          那么我們的聊天怎么處理,很簡單,在Controller接受到消息,然后在那里面調(diào)用Channel完成消息的轉(zhuǎn)發(fā)。


          具體的案例也在那篇nutty的整合里面。


          前端


          那么之后的話,是我們的一個(gè)前端 。


          封裝websocket


          這邊的話對這個(gè)websocket做了一個(gè)封裝,可以在vue、uniapp當(dāng)中使用。我這邊還用到了element-ui主要是來做消息提醒的,你可以選擇刪掉。



          // 導(dǎo)出socket對象export {  socket}import { Message } from 'element-ui'// socket主要對象var socket = {  websock: null,  /**   * 這個(gè)是我們的ws的地址   * */  ws_url: "ws://localhost:9000/ws",  /**   * 開啟標(biāo)識   * */  socket_open: false,  /**   * 心跳timer   * */  hearbeat_timer: null,  /**   * 心跳發(fā)送頻率   * */  hearbeat_interval: 5000,  /**   * 是否開啟重連   * */  is_reonnect: true,  /**   * 重新連接的次數(shù)   * */  reconnect_count: 3,  /**   * 當(dāng)前重新連接的次數(shù),默認(rèn)為:1   * */  reconnect_current: 1,  /**   * 重新連接的時(shí)間類型   * */  reconnect_timer: null,  /**   * 重新連接的間隔   * */  reconnect_interval: 3000,
          /** * 初始化連接 */ init: () => { if (!("WebSocket" in window)) { Message({ message: '當(dāng)前瀏覽器與網(wǎng)站不兼容丫', type: 'error', }); console.log('瀏覽器不支持WebSocket') return null }
          // 已經(jīng)創(chuàng)建過連接不再重復(fù)創(chuàng)建 if (socket.websock) { return socket.websock }
          socket.websock = new WebSocket(socket.ws_url) socket.websock.onmessage = function (e) { socket.receive(e) }
          // 關(guān)閉連接 socket.websock.onclose = function (e) { console.log('連接已斷開') console.log('connection closed (' + e.code + ')') clearInterval(socket.hearbeat_interval) socket.socket_open = false
          // 需要重新連接 if (socket.is_reonnect) { socket.reconnect_timer = setTimeout(() => { // 超過重連次數(shù) if (socket.reconnect_current > socket.reconnect_count) { clearTimeout(socket.reconnect_timer) return }
          // 記錄重連次數(shù) socket.reconnect_current++ socket.reconnect() }, socket.reconnect_interval) } }
          // 連接成功 socket.websock.onopen = function () { Message({ message: '連接成功,歡迎來到WhiteHole', type: 'success', }); console.log('連接成功') socket.socket_open = true socket.is_reonnect = true // 開啟心跳 socket.heartbeat() } // 連接發(fā)生錯(cuò)誤 socket.websock.onerror = function (err) { Message({ message: '服務(wù)連接發(fā)送錯(cuò)誤!', type: 'error', }); console.log('WebSocket連接發(fā)生錯(cuò)誤') } }, /** * 獲取websocket對象 * */
          getSocket:()=>{ //創(chuàng)建了直接返回,反之重來 if (socket.websock) { return socket.websock }else { socket.init(); } },
          getStatus:()=> { if (socket.websock.readyState === 0) { return "未連接"; } else if (socket.websock.readyState === 1) { return "已連接"; } else if (socket.websock.readyState === 2) { return "連接正在關(guān)閉"; } else if (socket.websock.readyState === 3) { return "連接已關(guān)閉"; } },
          /** * 發(fā)送消息 * @param {*} data 發(fā)送數(shù)據(jù) * @param {*} callback 發(fā)送后的自定義回調(diào)函數(shù) */ send: (data, callback = null) => { // 開啟狀態(tài)直接發(fā)送 if (socket.websock.readyState === socket.websock.OPEN) { socket.websock.send(JSON.stringify(data))
          if (callback) { callback() }
          // 正在開啟狀態(tài),則等待1s后重新調(diào)用 } else if (socket.websock.readyState === socket.websock.CONNECTING) { setTimeout(function () { socket.send(data, callback) }, 1000)
          // 未開啟,則等待1s后重新調(diào)用 } else { socket.init() setTimeout(function () { socket.send(data, callback) }, 1000) } },
          /** * 接收消息 * @param {*} message 接收到的消息 */ receive: (message) => { var recData = JSON.parse(message.data) /** *這部分是我們具體的對消息的處理 * */ // 自行擴(kuò)展其他業(yè)務(wù)處理... },
          /** * 心跳 */ heartbeat: () => { console.log('socket', 'ping') if (socket.hearbeat_timer) { clearInterval(socket.hearbeat_timer) }
          socket.hearbeat_timer = setInterval(() => { //發(fā)送心跳包 let data = { "action": 4, "chatMsg": null, "extend": null, } socket.send(data) }, socket.hearbeat_interval) },
          /** * 主動(dòng)關(guān)閉連接 */ close: () => { console.log('主動(dòng)斷開連接') clearInterval(socket.hearbeat_interval) socket.is_reonnect = false socket.websock.close() },
          /** * 重新連接 */ reconnect: () => { console.log('發(fā)起重新連接', socket.reconnect_current)
          if (socket.websock && socket.socket_open) { socket.websock.close() }
          socket.init() },}


          使用


          這個(gè)使用其實(shí)很簡單,我們這邊的話是Vue所以在開啟的時(shí)候就用上了,在我們的這個(gè)App.vue或者是其他的主頁面里面,我這邊是home作為主頁面(App.vue直接展示了home.vue(這個(gè)是你自己編寫的))



          效果


          剛剛的連接效果看到了,那么就來看到這個(gè),我們后端的一個(gè)心跳:


          可以看到以前正常。之后的話,拿著這套毛坯房就可以happy了。

          作者:the_way_inf
          鏈接:https://juejin.cn/post/7158792679466205221


          ---END---



          瀏覽 63
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产精品一区二区久久 | 欧美日韩爽 | 狼人国产香蕉在线 | 亚洲第一福利在线久 | 久久久久久久精 |