<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          服務(wù)的心跳機制與斷線重連,Netty底層是怎么實現(xiàn)的?

          共 11785字,需瀏覽 24分鐘

           ·

          2021-01-23 10:19

          點擊上方藍色“小哈學(xué)Java”,選擇“設(shè)為星標(biāo)

          回復(fù)“資源”獲取獨家整理的學(xué)習(xí)資料!

          作者:sprinkle_liz

          www.jianshu.com/p/1a28e48edd92

          提醒:本篇適合有一定netty基礎(chǔ)的讀者閱讀

          心跳機制

          何為心跳

          所謂心跳, 即在 TCP 長連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對方自己還在線, 以確保 TCP 連接的有效性.

          注:心跳包還有另一個作用,經(jīng)常被忽略,即:一個連接如果長時間不用,防火墻或者路由器就會斷開該連接

          如何實現(xiàn)

          核心Handler —— IdleStateHandler

          Netty 中, 實現(xiàn)心跳機制的關(guān)鍵是 IdleStateHandler, 那么這個 Handler 如何使用呢? 先看下它的構(gòu)造器:

          public?IdleStateHandler(int?readerIdleTimeSeconds,?int?writerIdleTimeSeconds,?int?allIdleTimeSeconds)?{
          ????this((long)readerIdleTimeSeconds,?(long)writerIdleTimeSeconds,?(long)allIdleTimeSeconds,?TimeUnit.SECONDS);
          }

          這里解釋下三個參數(shù)的含義:

          • readerIdleTimeSeconds: 讀超時. 即當(dāng)在指定的時間間隔內(nèi)沒有從 Channel 讀取到數(shù)據(jù)時, 會觸發(fā)一個 READER_IDLEIdleStateEvent 事件.

          • writerIdleTimeSeconds: 寫超時. 即當(dāng)在指定的時間間隔內(nèi)沒有數(shù)據(jù)寫入到 Channel 時, 會觸發(fā)一個 WRITER_IDLEIdleStateEvent 事件.

          • allIdleTimeSeconds: 讀/寫超時. 即當(dāng)在指定的時間間隔內(nèi)沒有讀或?qū)懖僮鲿r, 會觸發(fā)一個 ALL_IDLEIdleStateEvent 事件.

          注:這三個參數(shù)默認(rèn)的時間單位是。若需要指定其他時間單位,可以使用另一個構(gòu)造方法:IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)

          在看下面的實現(xiàn)之前,建議先了解一下IdleStateHandler的實現(xiàn)原理。相關(guān)鏈接:

          https://blog.csdn.net/linuu/article/details/51385682

          下面直接上代碼,需要注意的地方,會在代碼中通過注釋進行說明。

          使用IdleStateHandler實現(xiàn)心跳

          下面將使用IdleStateHandler來實現(xiàn)心跳,Client端連接到Server端后,會循環(huán)執(zhí)行一個任務(wù):隨機等待幾秒,然后ping一下Server端,即發(fā)送一個心跳包。當(dāng)?shù)却臅r間超過規(guī)定時間,將會發(fā)送失敗,以為Server端在此之前已經(jīng)主動斷開連接了。代碼如下:

          Client端

          ClientIdleStateTrigger —— 心跳觸發(fā)器

          ClientIdleStateTrigger也是一個Handler,只是重寫了userEventTriggered方法,用于捕獲IdleState.WRITER_IDLE事件(未在指定時間內(nèi)向服務(wù)器發(fā)送數(shù)據(jù)),然后向Server端發(fā)送一個心跳包。

          /**
          ?*?


          ?*??用于捕獲{@link?IdleState#WRITER_IDLE}事件(未在指定時間內(nèi)向服務(wù)器發(fā)送數(shù)據(jù)),然后向Server端發(fā)送一個心跳包。
          ?*?


          ?*/

          public?class?ClientIdleStateTrigger?extends?ChannelInboundHandlerAdapter?{

          ????public?static?final?String?HEART_BEAT?=?"heart?beat!";

          ????@Override
          ????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
          ????????if?(evt?instanceof?IdleStateEvent)?{
          ????????????IdleState?state?=?((IdleStateEvent)?evt).state();
          ????????????if?(state?==?IdleState.WRITER_IDLE)?{
          ????????????????//?write?heartbeat?to?server
          ????????????????ctx.writeAndFlush(HEART_BEAT);
          ????????????}
          ????????}?else?{
          ????????????super.userEventTriggered(ctx,?evt);
          ????????}
          ????}

          }
          Pinger —— 心跳發(fā)射器
          /**
          ?*?

          客戶端連接到服務(wù)器端后,會循環(huán)執(zhí)行一個任務(wù):隨機等待幾秒,然后ping一下Server端,即發(fā)送一個心跳包。


          ?*/

          public?class?Pinger?extends?ChannelInboundHandlerAdapter?{

          ????private?Random?random?=?new?Random();
          ????private?int?baseRandom?=?8;

          ????private?Channel?channel;

          ????@Override
          ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????super.channelActive(ctx);
          ????????this.channel?=?ctx.channel();

          ????????ping(ctx.channel());
          ????}

          ????private?void?ping(Channel?channel)?{
          ????????int?second?=?Math.max(1,?random.nextInt(baseRandom));
          ????????System.out.println("next?heart?beat?will?send?after?"?+?second?+?"s.");
          ????????ScheduledFuture?future?=?channel.eventLoop().schedule(new?Runnable()?{
          ????????????@Override
          ????????????public?void?run()?{
          ????????????????if?(channel.isActive())?{
          ????????????????????System.out.println("sending?heart?beat?to?the?server...");
          ????????????????????channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
          ????????????????}?else?{
          ????????????????????System.err.println("The?connection?had?broken,?cancel?the?task?that?will?send?a?heart?beat.");
          ????????????????????channel.closeFuture();
          ????????????????????throw?new?RuntimeException();
          ????????????????}
          ????????????}
          ????????},?second,?TimeUnit.SECONDS);

          ????????future.addListener(new?GenericFutureListener()?{
          ????????????@Override
          ????????????public?void?operationComplete(Future?future)?throws?Exception?{
          ????????????????if?(future.isSuccess())?{
          ????????????????????ping(channel);
          ????????????????}
          ????????????}
          ????????});
          ????}

          ????@Override
          ????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
          ????????//?當(dāng)Channel已經(jīng)斷開的情況下,?仍然發(fā)送數(shù)據(jù),?會拋異常,?該方法會被調(diào)用.
          ????????cause.printStackTrace();
          ????????ctx.close();
          ????}
          }
          ClientHandlersInitializer —— 客戶端處理器集合的初始化類
          public?class?ClientHandlersInitializer?extends?ChannelInitializer<SocketChannel>?{

          ????private?ReconnectHandler?reconnectHandler;
          ????private?EchoHandler?echoHandler;

          ????public?ClientHandlersInitializer(TcpClient?tcpClient)?{
          ????????Assert.notNull(tcpClient,?"TcpClient?can?not?be?null.");
          ????????this.reconnectHandler?=?new?ReconnectHandler(tcpClient);
          ????????this.echoHandler?=?new?EchoHandler();
          ????}

          ????@Override
          ????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????ChannelPipeline?pipeline?=?ch.pipeline();
          ????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
          ????????pipeline.addLast(new?LengthFieldPrepender(4));
          ????????pipeline.addLast(new?StringDecoder(CharsetUtil.UTF_8));
          ????????pipeline.addLast(new?StringEncoder(CharsetUtil.UTF_8));
          ????????pipeline.addLast(new?Pinger());
          ????}
          }

          注:上面的Handler集合,除了Pinger,其他都是編解碼器和解決粘包,可以忽略。

          TcpClient —— TCP連接的客戶端
          public?class?TcpClient?{

          ????private?String?host;
          ????private?int?port;
          ????private?Bootstrap?bootstrap;
          ????/**?將Channel保存起來,?可用于在其他非handler的地方發(fā)送數(shù)據(jù)?*/
          ????private?Channel?channel;

          ????public?TcpClient(String?host,?int?port)?{
          ????????this(host,?port,?new?ExponentialBackOffRetry(1000,?Integer.MAX_VALUE,?60?*?1000));
          ????}

          ????public?TcpClient(String?host,?int?port,?RetryPolicy?retryPolicy)?{
          ????????this.host?=?host;
          ????????this.port?=?port;
          ????????init();
          ????}

          ????/**
          ?????*?向遠(yuǎn)程TCP服務(wù)器請求連接
          ?????*/

          ????public?void?connect()?{
          ????????synchronized?(bootstrap)?{
          ????????????ChannelFuture?future?=?bootstrap.connect(host,?port);
          ????????????this.channel?=?future.channel();
          ????????}
          ????}

          ????private?void?init()?{
          ????????EventLoopGroup?group?=?new?NioEventLoopGroup();
          ????????//?bootstrap?可重用,?只需在TcpClient實例化的時候初始化即可.
          ????????bootstrap?=?new?Bootstrap();
          ????????bootstrap.group(group)
          ????????????????.channel(NioSocketChannel.class)
          ????????????????.handler(new?ClientHandlersInitializer(TcpClient.this))
          ;
          ????}

          ????public?static?void?main(String[]?args)?{
          ????????TcpClient?tcpClient?=?new?TcpClient("localhost",?2222);
          ????????tcpClient.connect();
          ????}

          }

          Server端

          ServerIdleStateTrigger —— 斷連觸發(fā)器
          /**
          ?*?

          在規(guī)定時間內(nèi)未收到客戶端的任何數(shù)據(jù)包,?將主動斷開該連接


          ?*/

          public?class?ServerIdleStateTrigger?extends?ChannelInboundHandlerAdapter?{
          ????@Override
          ????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
          ????????if?(evt?instanceof?IdleStateEvent)?{
          ????????????IdleState?state?=?((IdleStateEvent)?evt).state();
          ????????????if?(state?==?IdleState.READER_IDLE)?{
          ????????????????//?在規(guī)定時間內(nèi)沒有收到客戶端的上行數(shù)據(jù),?主動斷開連接
          ????????????????ctx.disconnect();
          ????????????}
          ????????}?else?{
          ????????????super.userEventTriggered(ctx,?evt);
          ????????}
          ????}
          }
          ServerBizHandler —— 服務(wù)器端的業(yè)務(wù)處理器
          /**
          ?*?

          收到來自客戶端的數(shù)據(jù)包后,?直接在控制臺打印出來.


          ?*/

          @ChannelHandler.Sharable
          public?class?ServerBizHandler?extends?SimpleChannelInboundHandler<String>?{

          ????private?final?String?REC_HEART_BEAT?=?"I?had?received?the?heart?beat!";

          ????@Override
          ????protected?void?channelRead0(ChannelHandlerContext?ctx,?String?data)?throws?Exception?{
          ????????try?{
          ????????????System.out.println("receive?data:?"?+?data);
          //????????????ctx.writeAndFlush(REC_HEART_BEAT);
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}

          ????@Override
          ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????System.out.println("Established?connection?with?the?remote?client.");

          ????????//?do?something

          ????????ctx.fireChannelActive();
          ????}

          ????@Override
          ????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????System.out.println("Disconnected?with?the?remote?client.");

          ????????//?do?something

          ????????ctx.fireChannelInactive();
          ????}

          ????@Override
          ????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
          ????????cause.printStackTrace();
          ????????ctx.close();
          ????}
          }
          ServerHandlerInitializer —— 服務(wù)器端處理器集合的初始化類
          /**
          ?*?

          用于初始化服務(wù)器端涉及到的所有Handler


          ?*/

          public?class?ServerHandlerInitializer?extends?ChannelInitializer<SocketChannel>?{

          ????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????ch.pipeline().addLast("idleStateHandler",?new?IdleStateHandler(5,?0,?0));
          ????????ch.pipeline().addLast("idleStateTrigger",?new?ServerIdleStateTrigger());
          ????????ch.pipeline().addLast("frameDecoder",?new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
          ????????ch.pipeline().addLast("frameEncoder",?new?LengthFieldPrepender(4));
          ????????ch.pipeline().addLast("decoder",?new?StringDecoder());
          ????????ch.pipeline().addLast("encoder",?new?StringEncoder());
          ????????ch.pipeline().addLast("bizHandler",?new?ServerBizHandler());
          ????}

          }

          注:new IdleStateHandler(5, 0, 0)handler代表如果在5秒內(nèi)沒有收到來自客戶端的任何數(shù)據(jù)包(包括但不限于心跳包),將會主動斷開與該客戶端的連接。

          TcpServer —— 服務(wù)器端
          public?class?TcpServer?{
          ????private?int?port;
          ????private?ServerHandlerInitializer?serverHandlerInitializer;

          ????public?TcpServer(int?port)?{
          ????????this.port?=?port;
          ????????this.serverHandlerInitializer?=?new?ServerHandlerInitializer();
          ????}

          ????public?void?start()?{
          ????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup(1);
          ????????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
          ????????try?{
          ????????????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
          ????????????bootstrap.group(bossGroup,?workerGroup)
          ????????????????????.channel(NioServerSocketChannel.class)
          ????????????????????.childHandler(this.serverHandlerInitializer)
          ;
          ????????????//?綁定端口,開始接收進來的連接
          ????????????ChannelFuture?future?=?bootstrap.bind(port).sync();

          ????????????System.out.println("Server?start?listen?at?"?+?port);
          ????????????future.channel().closeFuture().sync();
          ????????}?catch?(Exception?e)?{
          ????????????bossGroup.shutdownGracefully();
          ????????????workerGroup.shutdownGracefully();
          ????????????e.printStackTrace();
          ????????}
          ????}

          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????int?port?=?2222;
          ????????new?TcpServer(port).start();
          ????}
          }

          至此,所有代碼已經(jīng)編寫完畢。

          測試

          首先啟動客戶端,再啟動服務(wù)器端。啟動完成后,在客戶端的控制臺上,可以看到打印如下類似日志:

          客戶端控制臺輸出的日志

          在服務(wù)器端可以看到控制臺輸出了類似如下的日志:

          服務(wù)器端控制臺輸出的日志

          可以看到,客戶端在發(fā)送4個心跳包后,第5個包因為等待時間較長,等到真正發(fā)送的時候,發(fā)現(xiàn)連接已斷開了;而服務(wù)器端收到客戶端的4個心跳數(shù)據(jù)包后,遲遲等不到下一個數(shù)據(jù)包,所以果斷斷開該連接。

          異常情況

          在測試過程中,有可能會出現(xiàn)如下情況:

          異常情況

          出現(xiàn)這種情況的原因是:在連接已斷開的情況下,仍然向服務(wù)器端發(fā)送心跳包。雖然在發(fā)送心跳包之前會使用channel.isActive()判斷連接是否可用,但也有可能上一刻判斷結(jié)果為可用,但下一刻發(fā)送數(shù)據(jù)包之前,連接就斷了。

          目前尚未找到優(yōu)雅處理這種情況的方案,各位看官如果有好的解決方案,還望不吝賜教。拜謝!!!

          斷線重連

          斷線重連這里就不過多介紹,相信各位都知道是怎么回事。這里只說大致思路,然后直接上代碼。

          實現(xiàn)思路

          客戶端在監(jiān)測到與服務(wù)器端的連接斷開后,或者一開始就無法連接的情況下,使用指定的重連策略進行重連操作,直到重新建立連接或重試次數(shù)耗盡。

          對于如何監(jiān)測連接是否斷開,則是通過重寫ChannelInboundHandler#channelInactive來實現(xiàn),但連接不可用,該方法會被觸發(fā),所以只需要在該方法做好重連工作即可。

          代碼實現(xiàn)

          注:以下代碼都是在上面心跳機制的基礎(chǔ)上修改/添加的。

          因為斷線重連是客戶端的工作,所以只需對客戶端代碼進行修改。

          重試策略

          RetryPolicy —— 重試策略接口

          public?interface?RetryPolicy?{

          ????/**
          ?????*?Called?when?an?operation?has?failed?for?some?reason.?This?method?should?return
          ?????*?true?to?make?another?attempt.
          ?????*
          ?????*?@param?retryCount?the?number?of?times?retried?so?far?(0?the?first?time)
          ?????*?@return?true/false
          ?????*/

          ????boolean?allowRetry(int?retryCount);

          ????/**
          ?????*?get?sleep?time?in?ms?of?current?retry?count.
          ?????*
          ?????*?@param?retryCount?current?retry?count
          ?????*?@return?the?time?to?sleep
          ?????*/

          ????long?getSleepTimeMs(int?retryCount);
          }

          ExponentialBackOffRetry —— 重連策略的默認(rèn)實現(xiàn)

          /**
          ?*?

          Retry?policy?that?retries?a?set?number?of?times?with?increasing?sleep?time?between?retries


          ?*/

          public?class?ExponentialBackOffRetry?implements?RetryPolicy?{

          ????private?static?final?int?MAX_RETRIES_LIMIT?=?29;
          ????private?static?final?int?DEFAULT_MAX_SLEEP_MS?=?Integer.MAX_VALUE;

          ????private?final?Random?random?=?new?Random();
          ????private?final?long?baseSleepTimeMs;
          ????private?final?int?maxRetries;
          ????private?final?int?maxSleepMs;

          ????public?ExponentialBackOffRetry(int?baseSleepTimeMs,?int?maxRetries)?{
          ????????this(baseSleepTimeMs,?maxRetries,?DEFAULT_MAX_SLEEP_MS);
          ????}

          ????public?ExponentialBackOffRetry(int?baseSleepTimeMs,?int?maxRetries,?int?maxSleepMs)?{
          ????????this.maxRetries?=?maxRetries;
          ????????this.baseSleepTimeMs?=?baseSleepTimeMs;
          ????????this.maxSleepMs?=?maxSleepMs;
          ????}

          ????@Override
          ????public?boolean?allowRetry(int?retryCount)?{
          ????????if?(retryCount?????????????return?true;
          ????????}
          ????????return?false;
          ????}

          ????@Override
          ????public?long?getSleepTimeMs(int?retryCount)?{
          ????????if?(retryCount?0)?{
          ????????????throw?new?IllegalArgumentException("retries?count?must?greater?than?0.");
          ????????}
          ????????if?(retryCount?>?MAX_RETRIES_LIMIT)?{
          ????????????System.out.println(String.format("maxRetries?too?large?(%d).?Pinning?to?%d",?maxRetries,?MAX_RETRIES_LIMIT));
          ????????????retryCount?=?MAX_RETRIES_LIMIT;
          ????????}
          ????????long?sleepMs?=?baseSleepTimeMs?*?Math.max(1,?random.nextInt(1?<????????if?(sleepMs?>?maxSleepMs)?{
          ????????????System.out.println(String.format("Sleep?extension?too?large?(%d).?Pinning?to?%d",?sleepMs,?maxSleepMs));
          ????????????sleepMs?=?maxSleepMs;
          ????????}
          ????????return?sleepMs;
          ????}
          }

          ReconnectHandler—— 重連處理器

          @ChannelHandler.Sharable
          public?class?ReconnectHandler?extends?ChannelInboundHandlerAdapter?{

          ????private?int?retries?=?0;
          ????private?RetryPolicy?retryPolicy;

          ????private?TcpClient?tcpClient;

          ????public?ReconnectHandler(TcpClient?tcpClient)?{
          ????????this.tcpClient?=?tcpClient;
          ????}

          ????@Override
          ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????System.out.println("Successfully?established?a?connection?to?the?server.");
          ????????retries?=?0;
          ????????ctx.fireChannelActive();
          ????}

          ????@Override
          ????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????if?(retries?==?0)?{
          ????????????System.err.println("Lost?the?TCP?connection?with?the?server.");
          ????????????ctx.close();
          ????????}

          ????????boolean?allowRetry?=?getRetryPolicy().allowRetry(retries);
          ????????if?(allowRetry)?{

          ????????????long?sleepTimeMs?=?getRetryPolicy().getSleepTimeMs(retries);

          ????????????System.out.println(String.format("Try?to?reconnect?to?the?server?after?%dms.?Retry?count:?%d.",?sleepTimeMs,?++retries));

          ????????????final?EventLoop?eventLoop?=?ctx.channel().eventLoop();
          ????????????eventLoop.schedule(()?->?{
          ????????????????System.out.println("Reconnecting?...");
          ????????????????tcpClient.connect();
          ????????????},?sleepTimeMs,?TimeUnit.MILLISECONDS);
          ????????}
          ????????ctx.fireChannelInactive();
          ????}


          ????private?RetryPolicy?getRetryPolicy()?{
          ????????if?(this.retryPolicy?==?null)?{
          ????????????this.retryPolicy?=?tcpClient.getRetryPolicy();
          ????????}
          ????????return?this.retryPolicy;
          ????}
          }

          ClientHandlersInitializer

          在之前的基礎(chǔ)上,添加了重連處理器ReconnectHandler

          public?class?ClientHandlersInitializer?extends?ChannelInitializer<SocketChannel>?{

          ????private?ReconnectHandler?reconnectHandler;
          ????private?EchoHandler?echoHandler;

          ????public?ClientHandlersInitializer(TcpClient?tcpClient)?{
          ????????Assert.notNull(tcpClient,?"TcpClient?can?not?be?null.");
          ????????this.reconnectHandler?=?new?ReconnectHandler(tcpClient);
          ????????this.echoHandler?=?new?EchoHandler();
          ????}

          ????@Override
          ????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????ChannelPipeline?pipeline?=?ch.pipeline();
          ????????pipeline.addLast(this.reconnectHandler);
          ????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
          ????????pipeline.addLast(new?LengthFieldPrepender(4));
          ????????pipeline.addLast(new?StringDecoder(CharsetUtil.UTF_8));
          ????????pipeline.addLast(new?StringEncoder(CharsetUtil.UTF_8));
          ????????pipeline.addLast(new?Pinger());
          ????}
          }

          TcpClient

          在之前的基礎(chǔ)上添加重連、重連策略的支持。

          public?class?TcpClient?{

          ????private?String?host;
          ????private?int?port;
          ????private?Bootstrap?bootstrap;
          ????/**?重連策略?*/
          ????private?RetryPolicy?retryPolicy;
          ????/**?將Channel保存起來,?可用于在其他非handler的地方發(fā)送數(shù)據(jù)?*/
          ????private?Channel?channel;

          ????public?TcpClient(String?host,?int?port)?{
          ????????this(host,?port,?new?ExponentialBackOffRetry(1000,?Integer.MAX_VALUE,?60?*?1000));
          ????}

          ????public?TcpClient(String?host,?int?port,?RetryPolicy?retryPolicy)?{
          ????????this.host?=?host;
          ????????this.port?=?port;
          ????????this.retryPolicy?=?retryPolicy;
          ????????init();
          ????}

          ????/**
          ?????*?向遠(yuǎn)程TCP服務(wù)器請求連接
          ?????*/

          ????public?void?connect()?{
          ????????synchronized?(bootstrap)?{
          ????????????ChannelFuture?future?=?bootstrap.connect(host,?port);
          ????????????future.addListener(getConnectionListener());
          ????????????this.channel?=?future.channel();
          ????????}
          ????}

          ????public?RetryPolicy?getRetryPolicy()?{
          ????????return?retryPolicy;
          ????}

          ????private?void?init()?{
          ????????EventLoopGroup?group?=?new?NioEventLoopGroup();
          ????????//?bootstrap?可重用,?只需在TcpClient實例化的時候初始化即可.
          ????????bootstrap?=?new?Bootstrap();
          ????????bootstrap.group(group)
          ????????????????.channel(NioSocketChannel.class)
          ????????????????.handler(new?ClientHandlersInitializer(TcpClient.this))
          ;
          ????}

          ????private?ChannelFutureListener?getConnectionListener()?{
          ????????return?new?ChannelFutureListener()?{
          ????????????@Override
          ????????????public?void?operationComplete(ChannelFuture?future)?throws?Exception?{
          ????????????????if?(!future.isSuccess())?{
          ????????????????????future.channel().pipeline().fireChannelInactive();
          ????????????????}
          ????????????}
          ????????};
          ????}

          ????public?static?void?main(String[]?args)?{
          ????????TcpClient?tcpClient?=?new?TcpClient("localhost",?2222);
          ????????tcpClient.connect();
          ????}

          }

          測試

          在測試之前,為了避開 Connection reset by peer 異常,可以稍微修改Pingerping()方法,添加if (second == 5)的條件判斷。如下:

          private?void?ping(Channel?channel)?{
          ????????int?second?=?Math.max(1,?random.nextInt(baseRandom));
          ????????if?(second?==?5)?{
          ????????????second?=?6;
          ????????}
          ????????System.out.println("next?heart?beat?will?send?after?"?+?second?+?"s.");
          ????????ScheduledFuture?future?=?channel.eventLoop().schedule(new?Runnable()?{
          ????????????@Override
          ????????????public?void?run()?{
          ????????????????if?(channel.isActive())?{
          ????????????????????System.out.println("sending?heart?beat?to?the?server...");
          ????????????????????channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
          ????????????????}?else?{
          ????????????????????System.err.println("The?connection?had?broken,?cancel?the?task?that?will?send?a?heart?beat.");
          ????????????????????channel.closeFuture();
          ????????????????????throw?new?RuntimeException();
          ????????????????}
          ????????????}
          ????????},?second,?TimeUnit.SECONDS);

          ????????future.addListener(new?GenericFutureListener()?{
          ????????????@Override
          ????????????public?void?operationComplete(Future?future)?throws?Exception?{
          ????????????????if?(future.isSuccess())?{
          ????????????????????ping(channel);
          ????????????????}
          ????????????}
          ????????});
          ????}

          啟動客戶端

          先只啟動客戶端,觀察控制臺輸出,可以看到類似如下日志:

          斷線重連測試——客戶端控制臺輸出

          可以看到,當(dāng)客戶端發(fā)現(xiàn)無法連接到服務(wù)器端,所以一直嘗試重連。隨著重試次數(shù)增加,重試時間間隔越大,但又不想無限增大下去,所以需要定一個閾值,比如60s。如上圖所示,當(dāng)下一次重試時間超過60s時,會打印Sleep extension too large(*). Pinning to 60000,單位為ms。出現(xiàn)這句話的意思是,計算出來的時間超過閾值(60s),所以把真正睡眠的時間重置為閾值(60s)。

          啟動服務(wù)器端

          接著啟動服務(wù)器端,然后繼續(xù)觀察客戶端控制臺輸出。

          斷線重連測試——服務(wù)器端啟動后客戶端控制臺輸出

          可以看到,在第9次重試失敗后,第10次重試之前,啟動的服務(wù)器,所以第10次重連的結(jié)果為Successfully established a connection to the server.,即成功連接到服務(wù)器。接下來因為還是不定時ping服務(wù)器,所以出現(xiàn)斷線重連、斷線重連的循環(huán)。

          擴展

          在不同環(huán)境,可能會有不同的重連需求。有不同的重連需求的,只需自己實現(xiàn)RetryPolicy接口,然后在創(chuàng)建TcpClient的時候覆蓋默認(rèn)的重連策略即可。

          完!!!

          END


          有熱門推薦??

          1.?面試官:Java 反射是什么?我回答不上來!

          2.?一個員工的離職成本到底有多恐怖!

          3.?面試官問:ZooKeeper是強一致的嗎?怎么實現(xiàn)的?

          4.?阿里 Nacos 驚爆,安全漏洞以繞過身份驗證(附修復(fù)建議)

          最近面試BAT,整理一份面試資料Java面試BATJ通關(guān)手冊,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。

          獲取方式:點“在看”,關(guān)注公眾號并回復(fù)?Java?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

          文章有幫助的話,在看,轉(zhuǎn)發(fā)吧。

          謝謝支持喲 (*^__^*)

          瀏覽 53
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一级全黄60分钟免费看 | 黑人大粗鸡巴肏白人老骚 逼 | 哪里可以免费看av | 大香蕉做爱视频 | 黄片在线免费视频 |