<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty 中的心跳機制,還有誰不會?

          共 18481字,需瀏覽 37分鐘

           ·

          2021-06-11 20:51

          上一篇:深夜看了張一鳴的微博,讓我越想越后怕

          作者:rickiyang
          出處:www.cnblogs.com/rickiyang/p/11074231.html

          我們知道在TCP長連接或者WebSocket長連接中一般我們都會使用心跳機制–即發(fā)送特殊的數(shù)據(jù)包來通告對方自己的業(yè)務(wù)還沒有辦完,不要關(guān)閉鏈接。

          那么心跳機制可以用來做什么呢?

          我們知道網(wǎng)絡(luò)的傳輸是不可靠的,當我們發(fā)起一個鏈接請求的過程之中會發(fā)生什么事情誰都無法預(yù)料,或者斷電,服務(wù)器重啟,斷網(wǎng)線之類。

          如果有這種情況的發(fā)生對方也無法判斷你是否還在線。所以這時候我們引入心跳機制,在長鏈接中雙方?jīng)]有數(shù)據(jù)交互的時候互相發(fā)送數(shù)據(jù)(可能是空包,也可能是特殊數(shù)據(jù)),對方收到該數(shù)據(jù)之后也回復(fù)相應(yīng)的數(shù)據(jù)用以確保雙方都在線,這樣就可以確保當前鏈接是有效的。

          1. 如何實現(xiàn)心跳機制

          一般實現(xiàn)心跳機制由兩種方式:

          • TCP協(xié)議自帶的心跳機制來實現(xiàn);
          • 在應(yīng)用層來實現(xiàn)。

          但是TCP協(xié)議自帶的心跳機制系統(tǒng)默認是設(shè)置的是2小時的心跳頻率。它檢查不到機器斷電、網(wǎng)線拔出、防火墻這些斷線。而且邏輯層處理斷線可能也不是那么好處理。另外該心跳機制是與TCP協(xié)議綁定的,那如果我們要是使用UDP協(xié)議豈不是用不了?所以一般我們都不用。

          而一般我們自己實現(xiàn)呢大致的策略是這樣的:

          1. Client啟動一個定時器,不斷發(fā)送心跳;
          2. Server收到心跳后,做出回應(yīng);
          3. Server啟動一個定時器,判斷Client是否存在,這里做判斷有兩種方法:時間差和簡單標識。
          時間差:
          1. 收到一個心跳包之后記錄當前時間;
          2. 判斷定時器到達時間,計算多久沒收到心跳時間=當前時間-上次收到心跳時間。如果改時間大于設(shè)定值則認為超時。
          簡單標識:
          1. 收到心跳后設(shè)置連接標識為true;
          2. 判斷定時器到達時間,如果未收到心跳則設(shè)置連接標識為false;

          今天我們來看一下Netty的心跳機制的實現(xiàn),在Netty中提供了IdleStateHandler類來進行心跳的處理,它可以對一個 Channel 的 讀/寫設(shè)置定時器, 當 Channel 在一定事件間隔內(nèi)沒有數(shù)據(jù)交互時(即處于 idle 狀態(tài)), 就會觸發(fā)指定的事件。

          該類可以對三種類型的超時做心跳機制檢測:

          public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
              this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
          }

          下面我們還是通過一個例子來講解IdleStateHandler的使用。

          服務(wù)端:

          public class HeartBeatServer {
              private int port;

              public HeartBeatServer(int port) {
                  this.port = port;
              }

              public void start(){
                  EventLoopGroup bossGroup = new NioEventLoopGroup();
                  EventLoopGroup workGroup = new NioEventLoopGroup();

                  ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                          .channel(NioServerSocketChannel.class)
                          .childHandler(new HeartBeatServerChannelInitializer());

                  try {
                      ChannelFuture future = server.bind(port).sync();
                      future.channel().closeFuture().sync();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }finally {
                      bossGroup.shutdownGracefully();
                      workGroup.shutdownGracefully();
                  }
              }

              public static void main(String[] args) {
                  HeartBeatServer server = new HeartBeatServer(7788);
                  server.start();
              }
          }

          服務(wù)端Initializer:

          public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
              @Override
              protected void initChannel(SocketChannel socketChannel) throws Exception {
                  ChannelPipeline pipeline = socketChannel.pipeline();

                  pipeline.addLast("handler",new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                  pipeline.addLast("decoder", new StringDecoder());
                  pipeline.addLast("encoder", new StringEncoder());
                  pipeline.addLast(new HeartBeatServerHandler());
              }
          }

          在這里IdleStateHandler也是handler的一種,所以加入addLast。我們分別設(shè)置4個參數(shù):讀超時時間為3s,寫超時和讀寫超時為0,然后加入時間控制單元。另外,關(guān)注公眾號互聯(lián)網(wǎng)架構(gòu)師,在后臺回復(fù):2T,可以獲取我整理的 Java 系列面試題和答案,非常齊全。

          服務(wù)端handler:

          public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
              private int loss_connect_time = 0;

              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  System.out.println(ctx.channel().remoteAddress() + "Server :" + msg.toString());
              }

              @Override
              public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                  if(evt instanceof IdleStateEvent){
                      //服務(wù)端對應(yīng)著讀事件,當為READER_IDLE時觸發(fā)
                          IdleStateEvent event = (IdleStateEvent)evt;
                      if(event.state() == IdleState.READER_IDLE){
                          loss_connect_time++;
                          System.out.println("接收消息超時");
                          if(loss_connect_time > 2){
                              System.out.println("關(guān)閉不活動的鏈接");
                              ctx.channel().close();
                          }
                      }else{
                          super.userEventTriggered(ctx,evt);
                      }
                  }
              }

              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  ctx.close();
              }
          }
          我們看到在handler中調(diào)用了userEventTriggered方法,IdleStateEvent的state()方法一個有三個值:READER_IDLE,WRITER_IDLE,ALL_IDLE。正好對應(yīng)讀事件寫事件和讀寫事件。

          再來寫一下客戶端:

          public class HeartBeatsClient {
              private  int port;
              private  String address;

              public HeartBeatsClient(int port, String address) {
                  this.port = port;
                  this.address = address;
              }

              public void start(){
                  EventLoopGroup group = new NioEventLoopGroup();

                  Bootstrap bootstrap = new Bootstrap();
                  bootstrap.group(group)
                          .channel(NioSocketChannel.class)
                          .handler(new HeartBeatsClientChannelInitializer());

                  try {
                      ChannelFuture future = bootstrap.connect(address,port).sync();
                      future.channel().closeFuture().sync();
                  } catch (Exception e) {
                      e.printStackTrace();
                  }finally {
                      group.shutdownGracefully();
                  }

              }

              public static void main(String[] args) {
                  HeartBeatsClient client = new HeartBeatsClient(7788,"127.0.0.1");
                  client.start();
              }
          }
          客戶端Initializer:
          public class HeartBeatsClientChannelInitializer extends  ChannelInitializer<SocketChannel> {

              protected void initChannel(SocketChannel socketChannel) throws Exception {
                  ChannelPipeline pipeline = socketChannel.pipeline();

                  pipeline.addLast("handler", new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
                  pipeline.addLast("decoder", new StringDecoder());
                  pipeline.addLast("encoder", new StringEncoder());
                  pipeline.addLast(new HeartBeatClientHandler());
              }
          }

          這里我們設(shè)置了IdleStateHandler的寫超時為3秒,客戶端執(zhí)行的動作為寫消息到服務(wù)端,服務(wù)端執(zhí)行讀動作。

          客戶端handler:

          public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

              private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
                      CharsetUtil.UTF_8));

              private static final int TRY_TIMES = 3;

              private int currentTime = 0;

              @Override
              public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  System.out.println("激活時間是:"+new Date());
                  System.out.println("鏈接已經(jīng)激活");
                  ctx.fireChannelActive();
              }

              @Override
              public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                  System.out.println("停止時間是:"+new Date());
                  System.out.println("關(guān)閉鏈接");
              }

              @Override
              public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                  System.out.println("當前輪詢時間:"+new Date());
                  if (evt instanceof IdleStateEvent) {
                          IdleStateEvent event = (IdleStateEvent) evt;
                      if (event.state() == IdleState.WRITER_IDLE) {
                          if(currentTime <= TRY_TIMES){
                              System.out.println("currentTime:"+currentTime);
                              currentTime++;
                              ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
                          }
                      }
                  }
              }

              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  String message = (String) msg;
                  System.out.println(message);
                  if (message.equals("Heartbeat")) {
                      ctx.write("has read message from server");
                      ctx.flush();
                  }
                  ReferenceCountUtil.release(msg);
              }

              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  ctx.close();
              }
          }

          啟動服務(wù)端和客戶端我們看到輸出為:

          我們再來屢一下思路:
          1. 首先客戶端激活channel,因為客戶端中并沒有發(fā)送消息所以會觸發(fā)客戶端的IdleStateHandler,它設(shè)置的寫超時時間為3s;
          2. 然后觸發(fā)客戶端的事件機制進入userEventTriggered方法,在觸發(fā)器中計數(shù)并向客戶端發(fā)送消息;
          3. 服務(wù)端接收消息;
          4. 客戶端觸發(fā)器繼續(xù)輪詢發(fā)送消息,直到計數(shù)器滿不再向服務(wù)端發(fā)送消息;
          5. 服務(wù)端在IdleStateHandler設(shè)置的讀消息超時時間5s內(nèi)未收到消息,觸發(fā)了服務(wù)端中handler的userEventTriggered方法,于是關(guān)閉客戶端的鏈接。
          大體我們的簡單心跳機制就是這樣的思路,通過事件觸發(fā)機制以及計數(shù)器的方式來實現(xiàn),上面我們的案例中最后客戶端沒有發(fā)送消息的時候我們是強制斷開了客戶端的鏈接,那么既然可以關(guān)閉,我們是不是也可是重新鏈接客戶端呢?因為萬一客戶端本身并不想關(guān)閉而是由于別的原因?qū)е滤麩o法與服務(wù)端通信。下面我們來說一下重連機制。

          當我們的服務(wù)端在未讀到客戶端消息超時而關(guān)閉客戶端的時候我們一般在客戶端的finally塊中方的是關(guān)閉客戶端的代碼,這時我們可以做一下修改的,finally是一定會被執(zhí)行新的,所以我們可以在finally塊中重新調(diào)用一下啟動客戶端的代碼,這樣就又重新啟動了客戶端了,上客戶端代碼:

          /**
           * 本Client為測試netty重連機制
           * Server端代碼都一樣,所以不做修改
           * 只用在client端中做一下判斷即可
           */
          public class HeartBeatsClient2 {

              private  int port;
              private  String address;
              ChannelFuture future;

              public HeartBeatsClient2(int port, String address) {
                  this.port = port;
                  this.address = address;
              }

              public void start(){
                  EventLoopGroup group = new NioEventLoopGroup();

                  Bootstrap bootstrap = new Bootstrap();
                  bootstrap.group(group)
                          .channel(NioSocketChannel.class)
                          .handler(new HeartBeatsClientChannelInitializer());

                  try {
                      future = bootstrap.connect(address,port).sync();
                      future.channel().closeFuture().sync();
                  } catch (Exception e) {
                      e.printStackTrace();
                  }finally {
                      //group.shutdownGracefully();
                      if (null != future) {
                          if (future.channel() != null && future.channel().isOpen()) {
                              future.channel().close();
                          }
                      }
                      System.out.println("準備重連");
                      start();
                      System.out.println("重連成功");
                  }

              }

              public static void main(String[] args) {
                  HeartBeatsClient2 client = new HeartBeatsClient2(7788,"127.0.0.1");
                  client.start();
              }
          }

          其余部分的代碼與上面的實例并無異同,只需改造客戶端即可,我們再運行服務(wù)端和客戶端會看到客戶端雖然被關(guān)閉了,但是立馬又被重啟:

          當然生產(chǎn)級別的代碼應(yīng)該不是這樣實現(xiàn)的吧,哈哈。

          感謝您的閱讀,也歡迎您發(fā)表關(guān)于這篇文章的任何建議,關(guān)注我,技術(shù)不迷茫!小編到你上高速。


              · END ·
          最后,關(guān)注公眾號互聯(lián)網(wǎng)架構(gòu)師,在后臺回復(fù):2T,可以獲取我整理的 Java 系列面試題和答案,非常齊全


          正文結(jié)束


          推薦閱讀 ↓↓↓

          1.不認命,從10年流水線工人,到谷歌上班的程序媛,一位湖南妹子的勵志故事

          2.如何才能成為優(yōu)秀的架構(gòu)師?

          3.從零開始搭建創(chuàng)業(yè)公司后臺技術(shù)棧

          4.程序員一般可以從什么平臺接私活?

          5.37歲程序員被裁,120天沒找到工作,無奈去小公司,結(jié)果懵了...

          6.IntelliJ IDEA 2019.3 首個最新訪問版本發(fā)布,新特性搶先看

          7.漫畫:程序員相親圖鑒,笑屎我了~

          8.15張圖看懂瞎忙和高效的區(qū)別!

          一個人學習、工作很迷茫?


          點擊「閱讀原文」加入我們的小圈子!

          瀏覽 47
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲视频VS在线免费观看 | 人妻无码喷水 | 人妻水蜜桃| 国产精品色哟 | 免费精品久久久久久中文字幕-无删减 |