<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          WebSocket 集群解決方案

          共 10886字,需瀏覽 22分鐘

           ·

          2021-10-28 15:48

          點擊關注公眾號,利用碎片時間學習

          問題起因

          最近做項目時遇到了需要多用戶之間通信的問題,涉及到了WebSocket握手請求,以及集群中WebSocket Session共享的問題。

          期間我經過了幾天的研究,總結出了幾個實現分布式WebSocket集群的辦法,從zuul到spring cloud gateway的不同嘗試,總結出了這篇文章,希望能幫助到某些人,并且能一起分享這方面的想法與研究。

          以下是我的場景描述

          • 資源:4臺服務器。其中只有一臺服務器具備ssl認證域名,一臺redis+mysql服務器,兩臺應用服務器(集群)
          • 應用發(fā)布限制條件:由于場景需要,應用場所需要ssl認證的域名才能發(fā)布。因此ssl認證的域名服務器用來當api網關,負責https請求與wss(安全認證的ws)連接。俗稱https卸載,用戶請求https域名服務器(eg:https://oiscircle.com/xxx),但真實訪問到的是http+ip地址的形式。只要網關配置高,能handle多個應用
          • 需求:用戶登錄應用,需要與服務器建立wss連接,不同角色之間可以單發(fā)消息,也可以群發(fā)消息
          • 集群中的應用服務類型:每個集群實例都負責http無狀態(tài)請求服務與ws長連接服務

          系統(tǒng)架構圖

          在我的實現里,每個應用服務器都負責http and ws請求,其實也可以將ws請求建立的聊天模型單獨成立為一個模塊。從分布式的角度來看,這兩種實現類型差不多,但從實現方便性來說,一個應用服務http+ws請求的方式更為方便。下文會有解釋

          本文涉及的技術棧

          • Eureka 服務發(fā)現與注冊
          • Redis Session共享
          • Redis 消息訂閱
          • Spring Boot
          • Zuul 網關
          • Spring Cloud Gateway 網關
          • Spring WebSocket 處理長連接
          • Ribbon 負載均衡
          • Netty 多協(xié)議NIO網絡通信框架
          • Consistent Hash 一致性哈希算法

          相信能走到這一步的人都了解過我上面列舉的技術棧了,如果還沒有,可以先去網上找找入門教程了解一下。下面的內容都與上述技術相關,題主默認大家都了解過了...

          技術可行性分析

          下面我將描述session特性,以及根據這些特性列舉出n個解決分布式架構中處理ws請求的集群方案

          WebSocketSession與HttpSession

          在Spring所集成的WebSocket里面,每個ws連接都有一個對應的session:WebSocketSession,在Spring WebSocket中,我們建立ws連接之后可以通過類似這樣的方式進行與客戶端的通信:

          protected?void?handleTextMessage(WebSocketSession?session,?TextMessage?message)?{
          ???System.out.println("服務器接收到的消息:?"+?message?);
          ???//send?message?to?client
          ???session.sendMessage(new?TextMessage("message"));
          }

          那么問題來了:ws的session無法序列化到redis,因此在集群中,我們無法將所有WebSocketSession都緩存到redis進行session共享。每臺服務器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前沒有websocket session共享的方案,因此走redis websocket session共享這條路是行不通的。

          有的人可能會想:我可不可以將sessin關鍵信息緩存到redis,集群中的服務器從redis拿取session關鍵信息然后重新構建websocket session...我只想說這種方法如果有人能試出來,請告訴我一聲...

          以上便是websocket session與http session共享的區(qū)別,總的來說就是http session共享已經有解決方案了,而且很簡單,只要引入相關依賴:spring-session-data-redisspring-boot-starter-redis,大家可以從網上找個demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底層實現的方式,我們無法做到真正的websocket session共享。

          解決方案的演變

          Netty與Spring WebSocket

          剛開始的時候,我嘗試著用netty實現了websocket服務端的搭建。在netty里面,并沒有websocket session這樣的概念,與其類似的是channel,每一個客戶端連接都代表一個channel。前端的ws請求通過netty監(jiān)聽的端口,走websocket協(xié)議進行ws握手連接之后,通過一些列的handler(責鏈模式)進行消息處理。與websocket session類似地,服務端在連接建立后有一個channel,我們可以通過channel進行與客戶端的通信

          ???/**
          ????*?TODO?根據服務器傳進來的id,分配到不同的group
          ????*/

          ???private?static?final?ChannelGroup?GROUP?=?new?DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
          ?
          ???@Override
          ???protected?void?channelRead0(ChannelHandlerContext?ctx,?TextWebSocketFrame?msg)?throws?Exception?{
          ???????//retain增加引用計數,防止接下來的調用引用失效
          ???????System.out.println("服務器接收到來自?"?+?ctx.channel().id()?+?"?的消息:?"?+?msg.text());
          ???????//將消息發(fā)送給group里面的所有channel,也就是發(fā)送消息給客戶端
          ???????GROUP.writeAndFlush(msg.retain());
          ???}

          那么,服務端用netty還是用spring websocket?以下我將從幾個方面列舉這兩種實現方式的優(yōu)缺點

          使用netty實現websocket

          玩過netty的人都知道netty是的線程模型是nio模型,并發(fā)量非常高,spring5之前的網絡線程模型是servlet實現的,而servlet不是nio模型,所以在spring5之后,spring的底層網絡實現采用了netty。如果我們單獨使用netty來開發(fā)websocket服務端,速度快是絕對的,但是可能會遇到下列問題:

          1. 與系統(tǒng)的其他應用集成不方便,在rpc調用的時候,無法享受springcloud里feign服務調用的便利性
          2. 業(yè)務邏輯可能要重復實現
          3. 使用netty可能需要重復造輪子
          4. 怎么連接上服務注冊中心,也是一件麻煩的事情
          5. restful服務與ws服務需要分開實現,如果在netty上實現restful服務,有多麻煩可想而知,用spring一站式restful開發(fā)相信很多人都習慣了。

          使用spring websocket實現ws服務

          spring websocket已經被springboot很好地集成了,所以在springboot上開發(fā)ws服務非常方便,做法非常簡單

          第一步:添加依賴

          <dependency>
          ???<groupId>org.springframework.bootgroupId>
          ???<artifactId>spring-boot-starter-websocketartifactId>
          dependency>

          第二步:添加配置類

          @Configuration
          public?class?WebSocketConfig?implements?WebSocketConfigurer?{
          @Override
          public?void?registerWebSocketHandlers(WebSocketHandlerRegistry?registry)?{
          ????registry.addHandler(myHandler(),?"/")
          ????????.setAllowedOrigins("*");
          }
          ?
          @Bean
          ?public?WebSocketHandler?myHandler()?{
          ?????return?new?MessageHandler();
          ?}
          }

          第三步:實現消息監(jiān)聽類

          @Component
          @SuppressWarnings("unchecked")
          public?class?MessageHandler?extends?TextWebSocketHandler?{
          ???private?List?clients?=?new?ArrayList<>();
          ?
          ???@Override
          ???public?void?afterConnectionEstablished(WebSocketSession?session)?{
          ???????clients.add(session);
          ???????System.out.println("uri?:"?+?session.getUri());
          ???????System.out.println("連接建立:?"?+?session.getId());
          ???????System.out.println("current?seesion:?"?+?clients.size());
          ???}
          ?
          ???@Override
          ???public?void?afterConnectionClosed(WebSocketSession?session,?CloseStatus?status)?{
          ???????clients.remove(session);
          ???????System.out.println("斷開連接:?"?+?session.getId());
          ???}
          ?
          ???@Override
          ???protected?void?handleTextMessage(WebSocketSession?session,?TextMessage?message)?{
          ???????String?payload?=?message.getPayload();
          ???????Map?map?=?JSONObject.parseObject(payload,?HashMap.class);
          ???????System.out.println("接受到的數據"?+?map);
          ???????clients.forEach(s?->?{
          ???????????try?{
          ???????????????System.out.println("發(fā)送消息給:?"?+?session.getId());
          ???????????????s.sendMessage(new?TextMessage("服務器返回收到的信息,"?+?payload));
          ???????????}?catch?(Exception?e)?{
          ???????????????e.printStackTrace();
          ???????????}
          ???????});
          ???}
          }

          從這個demo中,使用spring websocket實現ws服務的便利性大家可想而知了。為了能更好地向spring cloud大家族看齊,我最終采用了spring websocket實現ws服務。

          因此我的應用服務架構是這樣子的:一個應用既負責restful服務,也負責ws服務。沒有將ws服務模塊拆分是因為拆分出去要使用feign來進行服務調用。第一本人比較懶惰,第二拆分與不拆分相差在多了一層服務間的io調用,所以就沒有這么做了。

          學習資料:Java進階視頻資源

          從zuul技術轉型到spring cloud gateway

          要實現websocket集群,我們必不可免地得從zuul轉型到spring cloud gateway。原因如下:

          zuul1.0版本不支持websocket轉發(fā),zuul 2.0開始支持websocket,zuul2.0幾個月前開源了,但是2.0版本沒有被spring boot集成,而且文檔不健全。因此轉型是必須的,同時轉型也很容易實現。

          在gateway中,為了實現ssl認證和動態(tài)路由負載均衡,yml文件中以下的某些配置是必須的,在這里提前避免大家采坑

          server:
          ??port:?443
          ??ssl:
          ????enabled:?true
          ????key-store:?classpath:xxx.jks
          ????key-store-password:?xxxx
          ????key-store-type:?JKS
          ????key-alias:?alias
          spring:
          ??application:
          ????name:?api-gateway
          ??cloud:
          ????gateway:
          ??????httpclient:
          ????????ssl:
          ??????????handshake-timeout-millis:?10000
          ??????????close-notify-flush-timeout-millis:?3000
          ??????????close-notify-read-timeout-millis:?0
          ??????????useInsecureTrustManager:?true
          ??????discovery:
          ????????locator:
          ??????????enabled:?true
          ??????????lower-case-service-id:?true
          ??????routes:
          ??????-?id:?dc
          ????????uri:?lb://dc
          ????????predicates:
          ????????-?Path=/dc/**
          ??????-?id:?wecheck
          ????????uri:?lb://wecheck
          ????????predicates:
          ????????-?Path=/wecheck/**

          如果要愉快地玩https卸載,我們還需要配置一個filter,否則請求網關時會出現錯誤not an SSL/TLS record

          @Component
          public?class?HttpsToHttpFilter?implements?GlobalFilter,?Ordered?{
          ??private?static?final?int?HTTPS_TO_HTTP_FILTER_ORDER?=?10099;
          ??@Override
          ??public?Mono?filter(ServerWebExchange?exchange,?GatewayFilterChain?chain)?{
          ??????URI?originalUri?=?exchange.getRequest().getURI();
          ??????ServerHttpRequest?request?=?exchange.getRequest();
          ??????ServerHttpRequest.Builder?mutate?=?request.mutate();
          ??????String?forwardedUri?=?request.getURI().toString();
          ??????if?(forwardedUri?!=?null?&&?forwardedUri.startsWith("https"))?{
          ??????????try?{
          ??????????????URI?mutatedUri?=?new?URI("http",
          ??????????????????????originalUri.getUserInfo(),
          ??????????????????????originalUri.getHost(),
          ??????????????????????originalUri.getPort(),
          ??????????????????????originalUri.getPath(),
          ??????????????????????originalUri.getQuery(),
          ??????????????????????originalUri.getFragment());
          ??????????????mutate.uri(mutatedUri);
          ??????????}?catch?(Exception?e)?{
          ??????????????throw?new?IllegalStateException(e.getMessage(),?e);
          ??????????}
          ??????}
          ??????ServerHttpRequest?build?=?mutate.build();
          ??????ServerWebExchange?webExchange?=?exchange.mutate().request(build).build();
          ??????return?chain.filter(webExchange);
          ??}
          ?
          ??@Override
          ??public?int?getOrder()?{
          ??????return?HTTPS_TO_HTTP_FILTER_ORDER;
          ??}
          }

          這樣子我們就可以使用gateway來卸載https請求了,到目前為止,我們的基本框架已經搭建完畢,網關既可以轉發(fā)https請求,也可以轉發(fā)wss請求。接下來就是用戶多對多之間session互通的通訊解決方案了。接下來,我將根據方案的優(yōu)雅性,從最不優(yōu)雅的方案開始講起。

          session廣播

          這是最簡單的websocket集群通訊解決方案。場景如下:

          教師A想要群發(fā)消息給他的學生們

          • 教師的消息請求發(fā)給網關,內容包含{我是教師A,我想把xxx消息發(fā)送我的學生們}
          • 網關接收到消息,獲取集群所有ip地址,逐個調用教師的請求
          • 集群中的每臺服務器獲取請求,根據教師A的信息查找本地有沒有與學生關聯(lián)的session,有則調用sendMessage方法,沒有則忽略請求

          session廣播實現很簡單,但是有一個致命缺陷:計算力浪費現象,當服務器沒有消息接收者session的時候,相當于浪費了一次循環(huán)遍歷的計算力,該方案在并發(fā)需求不高的情況下可以優(yōu)先考慮,實現很容易。

          spring cloud中獲取服務集群中每臺服務器信息的方法如下

          @Resource
          private?EurekaClient?eurekaClient;
          ?
          Application?app?=?eurekaClient.getApplication("service-name");
          //instanceInfo包括了一臺服務器ip,port等消息
          InstanceInfo?instanceInfo?=?app.getInstances().get(0);
          System.out.println("ip?address:?"?+?instanceInfo.getIPAddr());

          服務器需要維護關系映射表,將用戶的id與session做映射,session建立時在映射表中添加映射關系,session斷開后要刪除映射表內關聯(lián)關系

          一致性哈希算法實現(本文的要點)

          這種方法是本人認為最優(yōu)雅的實現方案,理解這種方案需要一定的時間,如果你耐心看下去,相信你一定會有所收獲。再強調一次,不了解一致性哈希算法的同學請先看這里,現先假設哈希環(huán)是順時針查找的。

          首先,想要將一致性哈希算法的思想應用到我們的websocket集群,我們需要解決以下新問題:

          • 集群節(jié)點DOWN,會影響到哈希環(huán)映射到狀態(tài)是DOWN的節(jié)點。
          • 集群節(jié)點UP,會影響到舊key映射不到對應的節(jié)點。
          • 哈希環(huán)讀寫共享。

          在集群中,總會出現服務UP/DOWN的問題。

          針對節(jié)點DOWN的問題分析如下:

          一個服務器DOWN的時候,其擁有的websocket session會自動關閉連接,并且前端會收到通知。此時會影響到哈希環(huán)的映射錯誤。我們只需要當監(jiān)聽到服務器DOWN的時候,刪除哈希環(huán)上面對應的實際結點和虛結點,避免讓網關轉發(fā)到狀態(tài)是DOWN的服務器上。

          實現方法:在eureka治理中心監(jiān)聽集群服務DOWN事件,并及時更新哈希環(huán)。

          學習資料:Java進階視頻資源

          針對節(jié)點UP的問題分析如下:

          現假設集群中有服務 CacheB上線了,該服務器的ip地址剛好被映射到key1和 cacheA之間。那么key1對應的用戶每次要發(fā)消息時都跑去 CacheB發(fā)送消息,結果明顯是發(fā)送不了消息,因為 CacheB沒有key1對應的session。

          此時我們有兩種解決方案。

          方案A簡單,動作大:

          eureka監(jiān)聽到節(jié)點UP事件之后,根據現有集群信息,更新哈希環(huán)。并且斷開所有session連接,讓客戶端重新連接,此時客戶端會連接到更新后的哈希環(huán)節(jié)點,以此避免消息無法送達的情況。

          方案B復雜,動作小:

          我們先看看沒有虛擬節(jié)點的情況,假設 CacheC和 CacheA之間上線了服務器 CacheB。所有映射在 CacheC到 CacheB的用戶發(fā)消息時都會去 CacheB里面找session發(fā)消息。也就是說 CacheB一但上線,便會影響到 CacheC到 CacheB之間的用戶發(fā)送消息。所以我們只需要將 CacheA斷開 CacheC到 CacheB的用戶所對應的session,讓客戶端重連。

          接下來是有虛擬節(jié)點的情況,假設淺色的節(jié)點是虛擬節(jié)點。我們用長括號來代表某段區(qū)域映射的結果屬于某個 Cache。首先是C節(jié)點未上線的情況。圖大家應該都懂吧,所有B的虛擬節(jié)點都會指向真實的B節(jié)點,所以所有B節(jié)點逆時針那一部分都會映射到B(因為我們規(guī)定哈希環(huán)順時針查找)。

          接下來是C節(jié)點上線的情況,可以看到某些區(qū)域被C占領了。

          由以上情況我們可以知道:節(jié)點上線,會有許多對應虛擬節(jié)點也同時上線,因此我們需要將多段范圍key對應的session斷開連接(上圖紅色的部分)。具體算法有點復雜,實現的方式因人而異,大家可以嘗試一下自己實現算法。

          哈希環(huán)應該放在哪里?

          • gateway本地創(chuàng)建并維護哈希環(huán)。當ws請求進來的時候,本地獲取哈希環(huán)并獲取映射服務器信息,轉發(fā)ws請求。這種方法看上去不錯,但實際上是不太可取的,回想一下上面服務器DOWN的時候只能通過eureka監(jiān)聽,那么eureka監(jiān)聽到DOWN事件之后,需要通過io來通知gateway刪除對應節(jié)點嗎?顯然太麻煩了,將eureka的職責分散到gateway,不建議這么做。
          • eureka創(chuàng)建,并放到redis共享讀寫。這個方案可行,當eureka監(jiān)聽到服務DOWN的時候,修改哈希環(huán)并推送到redis上。為了請求響應時間盡量地短,我們不可以讓gateway每次轉發(fā)ws請求的時候都去redis取一次哈希環(huán)。哈希環(huán)修改的概率的確很低,gateway只需要應用redis的消息訂閱模式,訂閱哈希環(huán)修改事件便可以解決此問題。

          至此我們的spring websocket集群已經搭建的差不多了,最重要的地方還是一致性哈希算法。現在有最后一個技術瓶頸,網關如何根據ws請求轉發(fā)到指定的集群服務器上?

          答案在負載均衡。spring cloud gateway或zuul都默認集成了ribbon作為負載均衡,我們只需要根據建立ws請求時客戶端發(fā)來的user id,重寫ribbon負載均衡算法,根據user id進行hash,并在哈希環(huán)上尋找ip,并將ws請求轉發(fā)到該ip便完事了。流程如下圖所示:

          接下來用戶溝通的時候,只需要根據id進行hash,在哈希環(huán)上獲取對應ip,便可以知道與該用戶建立ws連接時的session存在哪臺服務器上了!

          spring cloud Finchley.RELEASE 版本中ribbon未完善的地方

          題主在實際操作的時候發(fā)現了ribbon兩個不完善的地方......

          • 根據網上找的方法,繼承AbstractLoadBalancerRule重寫負載均衡策略之后,多個不同應用的請求變得混亂。假如eureka上有兩個service A和B,重寫負載均衡策略之后,請求A或B的服務,最終只會映射到其中一個服務上。非常奇怪!可能spring cloud gateway官網需要給出一個正確的重寫負載均衡策略的demo。
          • 一致性哈希算法需要一個key,類似user id,根據key進行hash之后在哈希環(huán)上搜索并返回ip。但是ribbon沒有完善choose函數的key參數,直接寫死了default!

          難道這樣子我們就沒有辦法了嗎?其實還有一個可行并且暫時可替代的辦法!

          如下圖所示,客戶端發(fā)送一個普通的http請求(包含id參數)給網關,網關根據id進行hash,在哈希環(huán)中尋找ip地址,將ip地址返回給客戶端,客戶端再根據該ip地址進行ws請求。

          由于ribbon未完善key的處理,我們暫時無法在ribbon上實現一致性哈希算法。只能間接地通過客戶端發(fā)起兩次請求(一次http,一次ws)的方式來實現一致性哈希。希望不久之后ribbon能更新這個缺陷!讓我們的websocket集群實現得更優(yōu)雅一點。

          后記

          以上便是我這幾天探索的結果。期間遇到了許多問題,并逐一解決難題,列出兩個websocket集群解決方案。第一個是session廣播,第二個是一致性哈希。

          這兩種方案針對不同場景各有優(yōu)缺點,本文并未用到ActiveMQ,Karfa等消息隊列實現消息推送,只是想通過自己的想法,不依靠消息隊列來簡單地實現多用戶之間的長連接通訊。希望能為大家提供一條不同于尋常的思路。

          來源:blog.csdn.net/weixin_34194702/

          article/details/88701309

          Stackoverflow 高贊答案,為什么牛逼的程序員都不用 “ ! = null ' 做判空

          你用什么軟件做筆記?

          Java17,有史以來最快 JDK

          如何保護 SpringBoot 配置文件中的敏感信息

          最近面試BAT,整理一份面試資料Java面試BATJ通關手冊,覆蓋了Java核心技術、JVM、Java并發(fā)、SSM、微服務、數據庫、數據結構等等。

          獲取方式:點“在看”,關注公眾號并回復?Java?領取,更多內容陸續(xù)奉上。

          文章有幫助的話,在看,轉發(fā)吧。

          謝謝支持喲 (*^

          瀏覽 65
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产无码av在线 国产一级内射视频 | av天堂中文网 | 成人做爰黄 片视频免费看 | 国产高潮久久 | 亚洲色噜噜噜 |