<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          萬(wàn)字好文!Netty實(shí)現(xiàn)心跳機(jī)制與斷線重連

          共 11597字,需瀏覽 24分鐘

           ·

          2021-01-23 09:48

          心跳機(jī)制

          何為心跳

          所謂心跳, 即在?TCP?長(zhǎng)連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對(duì)方自己還在線, 以確保?TCP?連接的有效性.

          注:心跳包還有另一個(gè)作用,經(jīng)常被忽略,即:一個(gè)連接如果長(zhǎng)時(shí)間不用,防火墻或者路由器就會(huì)斷開該連接。

          如何實(shí)現(xiàn)

          核心Handler —— IdleStateHandler

          在?Netty?中, 實(shí)現(xiàn)心跳機(jī)制的關(guān)鍵是?IdleStateHandler, 那么這個(gè)?Handler?如何使用呢? 先看下它的構(gòu)造器:

          public?IdleStateHandler(int?readerIdleTimeSeconds,?int?writerIdleTimeSeconds,?int?allIdleTimeSeconds)?{
          ????this((long)readerIdleTimeSeconds,?(long)writerIdleTimeSeconds,?(long)allIdleTimeSeconds,?TimeUnit.SECONDS);
          }

          這里解釋下三個(gè)參數(shù)的含義:

          • readerIdleTimeSeconds: 讀超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有從?Channel?讀取到數(shù)據(jù)時(shí), 會(huì)觸發(fā)一個(gè)?READER_IDLE?的?IdleStateEvent?事件.
          • writerIdleTimeSeconds: 寫超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有數(shù)據(jù)寫入到?Channel?時(shí), 會(huì)觸發(fā)一個(gè)?WRITER_IDLE?的?IdleStateEvent?事件.
          • allIdleTimeSeconds: 讀/寫超時(shí). 即當(dāng)在指定的時(shí)間間隔內(nèi)沒有讀或?qū)懖僮鲿r(shí), 會(huì)觸發(fā)一個(gè)?ALL_IDLE?的?IdleStateEvent?事件.

          注:這三個(gè)參數(shù)默認(rèn)的時(shí)間單位是。若需要指定其他時(shí)間單位,可以使用另一個(gè)構(gòu)造方法:IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)

          在看下面的實(shí)現(xiàn)之前,建議先了解一下IdleStateHandler的實(shí)現(xiàn)原理。

          下面直接上代碼,需要注意的地方,會(huì)在代碼中通過注釋進(jìn)行說明。

          使用IdleStateHandler實(shí)現(xiàn)心跳

          下面將使用IdleStateHandler來(lái)實(shí)現(xiàn)心跳,Client端連接到Server端后,會(huì)循環(huán)執(zhí)行一個(gè)任務(wù):隨機(jī)等待幾秒,然后ping一下Server端,即發(fā)送一個(gè)心跳包。當(dāng)?shù)却臅r(shí)間超過規(guī)定時(shí)間,將會(huì)發(fā)送失敗,以為Server端在此之前已經(jīng)主動(dòng)斷開連接了。代碼如下:

          Client端

          ClientIdleStateTrigger —— 心跳觸發(fā)器

          ClientIdleStateTrigger也是一個(gè)Handler,只是重寫了userEventTriggered方法,用于捕獲IdleState.WRITER_IDLE事件(未在指定時(shí)間內(nèi)向服務(wù)器發(fā)送數(shù)據(jù)),然后向Server端發(fā)送一個(gè)心跳包。

          /**
          ?*?


          ?*??用于捕獲{@link?IdleState#WRITER_IDLE}事件(未在指定時(shí)間內(nèi)向服務(wù)器發(fā)送數(shù)據(jù)),然后向Server端發(fā)送一個(gè)心跳包。
          ?*?


          ?*/

          public?class?ClientIdleStateTrigger?extends?ChannelInboundHandlerAdapter?{

          ????public?static?final?String?HEART_BEAT?=?"heart?beat!";

          ????@Override
          ????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
          ????????if?(evt?instanceof?IdleStateEvent)?{
          ????????????IdleState?state?=?((IdleStateEvent)?evt).state();
          ????????????if?(state?==?IdleState.WRITER_IDLE)?{
          ????????????????//?write?heartbeat?to?server
          ????????????????ctx.writeAndFlush(HEART_BEAT);
          ????????????}
          ????????}?else?{
          ????????????super.userEventTriggered(ctx,?evt);
          ????????}
          ????}

          }
          Pinger —— 心跳發(fā)射器
          /**
          ?*?

          客戶端連接到服務(wù)器端后,會(huì)循環(huán)執(zhí)行一個(gè)任務(wù):隨機(jī)等待幾秒,然后ping一下Server端,即發(fā)送一個(gè)心跳包。


          ?*/

          public?class?Pinger?extends?ChannelInboundHandlerAdapter?{

          ????private?Random?random?=?new?Random();
          ????private?int?baseRandom?=?8;

          ????private?Channel?channel;

          ????@Override
          ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????super.channelActive(ctx);
          ????????this.channel?=?ctx.channel();

          ????????ping(ctx.channel());
          ????}

          ????private?void?ping(Channel?channel)?{
          ????????int?second?=?Math.max(1,?random.nextInt(baseRandom));
          ????????System.out.println("next?heart?beat?will?send?after?"?+?second?+?"s.");
          ????????ScheduledFuture?future?=?channel.eventLoop().schedule(new?Runnable()?{
          ????????????@Override
          ????????????public?void?run()?{
          ????????????????if?(channel.isActive())?{
          ????????????????????System.out.println("sending?heart?beat?to?the?server...");
          ????????????????????channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
          ????????????????}?else?{
          ????????????????????System.err.println("The?connection?had?broken,?cancel?the?task?that?will?send?a?heart?beat.");
          ????????????????????channel.closeFuture();
          ????????????????????throw?new?RuntimeException();
          ????????????????}
          ????????????}
          ????????},?second,?TimeUnit.SECONDS);

          ????????future.addListener(new?GenericFutureListener()?{
          ????????????@Override
          ????????????public?void?operationComplete(Future?future)?throws?Exception?{
          ????????????????if?(future.isSuccess())?{
          ????????????????????ping(channel);
          ????????????????}
          ????????????}
          ????????});
          ????}

          ????@Override
          ????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
          ????????//?當(dāng)Channel已經(jīng)斷開的情況下,?仍然發(fā)送數(shù)據(jù),?會(huì)拋異常,?該方法會(huì)被調(diào)用.
          ????????cause.printStackTrace();
          ????????ctx.close();
          ????}
          }
          ClientHandlersInitializer —— 客戶端處理器集合的初始化類
          public?class?ClientHandlersInitializer?extends?ChannelInitializer<SocketChannel>?{

          ????private?ReconnectHandler?reconnectHandler;
          ????private?EchoHandler?echoHandler;

          ????public?ClientHandlersInitializer(TcpClient?tcpClient)?{
          ????????Assert.notNull(tcpClient,?"TcpClient?can?not?be?null.");
          ????????this.reconnectHandler?=?new?ReconnectHandler(tcpClient);
          ????????this.echoHandler?=?new?EchoHandler();
          ????}

          ????@Override
          ????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????ChannelPipeline?pipeline?=?ch.pipeline();
          ????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
          ????????pipeline.addLast(new?LengthFieldPrepender(4));
          ????????pipeline.addLast(new?StringDecoder(CharsetUtil.UTF_8));
          ????????pipeline.addLast(new?StringEncoder(CharsetUtil.UTF_8));
          ????????pipeline.addLast(new?Pinger());
          ????}
          }

          注:上面的Handler集合,除了Pinger,其他都是編解碼器和解決粘包,可以忽略。

          TcpClient —— TCP連接的客戶端
          public?class?TcpClient?{

          ????private?String?host;
          ????private?int?port;
          ????private?Bootstrap?bootstrap;
          ????/**?將Channel保存起來(lái),?可用于在其他非handler的地方發(fā)送數(shù)據(jù)?*/
          ????private?Channel?channel;

          ????public?TcpClient(String?host,?int?port)?{
          ????????this(host,?port,?new?ExponentialBackOffRetry(1000,?Integer.MAX_VALUE,?60?*?1000));
          ????}

          ????public?TcpClient(String?host,?int?port,?RetryPolicy?retryPolicy)?{
          ????????this.host?=?host;
          ????????this.port?=?port;
          ????????init();
          ????}

          ????/**
          ?????*?向遠(yuǎn)程TCP服務(wù)器請(qǐng)求連接
          ?????*/

          ????public?void?connect()?{
          ????????synchronized?(bootstrap)?{
          ????????????ChannelFuture?future?=?bootstrap.connect(host,?port);
          ????????????this.channel?=?future.channel();
          ????????}
          ????}

          ????private?void?init()?{
          ????????EventLoopGroup?group?=?new?NioEventLoopGroup();
          ????????//?bootstrap?可重用,?只需在TcpClient實(shí)例化的時(shí)候初始化即可.
          ????????bootstrap?=?new?Bootstrap();
          ????????bootstrap.group(group)
          ????????????????.channel(NioSocketChannel.class)
          ????????????????.handler(new?ClientHandlersInitializer(TcpClient.this))
          ;
          ????}

          ????public?static?void?main(String[]?args)?{
          ????????TcpClient?tcpClient?=?new?TcpClient("localhost",?2222);
          ????????tcpClient.connect();
          ????}

          }

          Server端

          ServerIdleStateTrigger —— 斷連觸發(fā)器
          /**
          ?*?

          在規(guī)定時(shí)間內(nèi)未收到客戶端的任何數(shù)據(jù)包,?將主動(dòng)斷開該連接


          ?*/

          public?class?ServerIdleStateTrigger?extends?ChannelInboundHandlerAdapter?{
          ????@Override
          ????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
          ????????if?(evt?instanceof?IdleStateEvent)?{
          ????????????IdleState?state?=?((IdleStateEvent)?evt).state();
          ????????????if?(state?==?IdleState.READER_IDLE)?{
          ????????????????//?在規(guī)定時(shí)間內(nèi)沒有收到客戶端的上行數(shù)據(jù),?主動(dòng)斷開連接
          ????????????????ctx.disconnect();
          ????????????}
          ????????}?else?{
          ????????????super.userEventTriggered(ctx,?evt);
          ????????}
          ????}
          }
          ServerBizHandler —— 服務(wù)器端的業(yè)務(wù)處理器
          /**
          ?*?

          收到來(lái)自客戶端的數(shù)據(jù)包后,?直接在控制臺(tái)打印出來(lái).


          ?*/

          @ChannelHandler.Sharable
          public?class?ServerBizHandler?extends?SimpleChannelInboundHandler<String>?{

          ????private?final?String?REC_HEART_BEAT?=?"I?had?received?the?heart?beat!";

          ????@Override
          ????protected?void?channelRead0(ChannelHandlerContext?ctx,?String?data)?throws?Exception?{
          ????????try?{
          ????????????System.out.println("receive?data:?"?+?data);
          //????????????ctx.writeAndFlush(REC_HEART_BEAT);
          ????????}?catch?(Exception?e)?{
          ????????????e.printStackTrace();
          ????????}
          ????}

          ????@Override
          ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????System.out.println("Established?connection?with?the?remote?client.");

          ????????//?do?something

          ????????ctx.fireChannelActive();
          ????}

          ????@Override
          ????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????System.out.println("Disconnected?with?the?remote?client.");

          ????????//?do?something

          ????????ctx.fireChannelInactive();
          ????}

          ????@Override
          ????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
          ????????cause.printStackTrace();
          ????????ctx.close();
          ????}
          }
          ServerHandlerInitializer —— 服務(wù)器端處理器集合的初始化類
          /**
          ?*?

          用于初始化服務(wù)器端涉及到的所有Handler


          ?*/

          public?class?ServerHandlerInitializer?extends?ChannelInitializer<SocketChannel>?{

          ????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????ch.pipeline().addLast("idleStateHandler",?new?IdleStateHandler(5,?0,?0));
          ????????ch.pipeline().addLast("idleStateTrigger",?new?ServerIdleStateTrigger());
          ????????ch.pipeline().addLast("frameDecoder",?new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
          ????????ch.pipeline().addLast("frameEncoder",?new?LengthFieldPrepender(4));
          ????????ch.pipeline().addLast("decoder",?new?StringDecoder());
          ????????ch.pipeline().addLast("encoder",?new?StringEncoder());
          ????????ch.pipeline().addLast("bizHandler",?new?ServerBizHandler());
          ????}

          }

          注:new IdleStateHandler(5, 0, 0)handler代表如果在5秒內(nèi)沒有收到來(lái)自客戶端的任何數(shù)據(jù)包(包括但不限于心跳包),將會(huì)主動(dòng)斷開與該客戶端的連接。

          TcpServer —— 服務(wù)器端
          public?class?TcpServer?{
          ????private?int?port;
          ????private?ServerHandlerInitializer?serverHandlerInitializer;

          ????public?TcpServer(int?port)?{
          ????????this.port?=?port;
          ????????this.serverHandlerInitializer?=?new?ServerHandlerInitializer();
          ????}

          ????public?void?start()?{
          ????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup(1);
          ????????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
          ????????try?{
          ????????????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
          ????????????bootstrap.group(bossGroup,?workerGroup)
          ????????????????????.channel(NioServerSocketChannel.class)
          ????????????????????.childHandler(this.serverHandlerInitializer)
          ;
          ????????????//?綁定端口,開始接收進(jìn)來(lái)的連接
          ????????????ChannelFuture?future?=?bootstrap.bind(port).sync();

          ????????????System.out.println("Server?start?listen?at?"?+?port);
          ????????????future.channel().closeFuture().sync();
          ????????}?catch?(Exception?e)?{
          ????????????bossGroup.shutdownGracefully();
          ????????????workerGroup.shutdownGracefully();
          ????????????e.printStackTrace();
          ????????}
          ????}

          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????int?port?=?2222;
          ????????new?TcpServer(port).start();
          ????}
          }

          至此,所有代碼已經(jīng)編寫完畢。

          測(cè)試

          首先啟動(dòng)客戶端,再啟動(dòng)服務(wù)器端。啟動(dòng)完成后,在客戶端的控制臺(tái)上,可以看到打印如下類似日志:

          客戶端控制臺(tái)輸出的日志

          在服務(wù)器端可以看到控制臺(tái)輸出了類似如下的日志:

          服務(wù)器端控制臺(tái)輸出的日志

          可以看到,客戶端在發(fā)送4個(gè)心跳包后,第5個(gè)包因?yàn)榈却龝r(shí)間較長(zhǎng),等到真正發(fā)送的時(shí)候,發(fā)現(xiàn)連接已斷開了;而服務(wù)器端收到客戶端的4個(gè)心跳數(shù)據(jù)包后,遲遲等不到下一個(gè)數(shù)據(jù)包,所以果斷斷開該連接。

          在測(cè)試過程中,有可能會(huì)出現(xiàn)如下情況:

          異常情況

          出現(xiàn)這種情況的原因是:在連接已斷開的情況下,仍然向服務(wù)器端發(fā)送心跳包。雖然在發(fā)送心跳包之前會(huì)使用判斷連接是否可用,但也有可能上一刻判斷結(jié)果為可用,但下一刻發(fā)送數(shù)據(jù)包之前,連接就斷了。

          目前尚未找到優(yōu)雅處理這種情況的方案,各位看官如果有好的解決方案,還望不吝賜教。拜謝?。?!

          斷線重連

          斷線重連這里就不過多介紹,相信各位都知道是怎么回事。這里只說大致思路,然后直接上代碼。

          實(shí)現(xiàn)思路

          客戶端在監(jiān)測(cè)到與服務(wù)器端的連接斷開后,或者一開始就無(wú)法連接的情況下,使用指定的重連策略進(jìn)行重連操作,直到重新建立連接或重試次數(shù)耗盡。

          對(duì)于如何監(jiān)測(cè)連接是否斷開,則是通過重寫ChannelInboundHandler#channelInactive來(lái)實(shí)現(xiàn),但連接不可用,該方法會(huì)被觸發(fā),所以只需要在該方法做好重連工作即可。

          代碼實(shí)現(xiàn)

          注:以下代碼都是在上面心跳機(jī)制的基礎(chǔ)上修改/添加的。

          因?yàn)閿嗑€重連是客戶端的工作,所以只需對(duì)客戶端代碼進(jìn)行修改。

          重試策略

          RetryPolicy —— 重試策略接口

          public?interface?RetryPolicy?{

          ????/**
          ?????*?Called?when?an?operation?has?failed?for?some?reason.?This?method?should?return
          ?????*?true?to?make?another?attempt.
          ?????*
          ?????*?@param?retryCount?the?number?of?times?retried?so?far?(0?the?first?time)
          ?????*?@return?true/false
          ?????*/

          ????boolean?allowRetry(int?retryCount);

          ????/**
          ?????*?get?sleep?time?in?ms?of?current?retry?count.
          ?????*
          ?????*?@param?retryCount?current?retry?count
          ?????*?@return?the?time?to?sleep
          ?????*/

          ????long?getSleepTimeMs(int?retryCount);
          }

          ExponentialBackOffRetry —— 重連策略的默認(rèn)實(shí)現(xiàn)

          /**
          ?*?

          Retry?policy?that?retries?a?set?number?of?times?with?increasing?sleep?time?between?retries


          ?*/

          public?class?ExponentialBackOffRetry?implements?RetryPolicy?{

          ????private?static?final?int?MAX_RETRIES_LIMIT?=?29;
          ????private?static?final?int?DEFAULT_MAX_SLEEP_MS?=?Integer.MAX_VALUE;

          ????private?final?Random?random?=?new?Random();
          ????private?final?long?baseSleepTimeMs;
          ????private?final?int?maxRetries;
          ????private?final?int?maxSleepMs;

          ????public?ExponentialBackOffRetry(int?baseSleepTimeMs,?int?maxRetries)?{
          ????????this(baseSleepTimeMs,?maxRetries,?DEFAULT_MAX_SLEEP_MS);
          ????}

          ????public?ExponentialBackOffRetry(int?baseSleepTimeMs,?int?maxRetries,?int?maxSleepMs)?{
          ????????this.maxRetries?=?maxRetries;
          ????????this.baseSleepTimeMs?=?baseSleepTimeMs;
          ????????this.maxSleepMs?=?maxSleepMs;
          ????}

          ????@Override
          ????public?boolean?allowRetry(int?retryCount)?{
          ????????if?(retryCount?????????????return?true;
          ????????}
          ????????return?false;
          ????}

          ????@Override
          ????public?long?getSleepTimeMs(int?retryCount)?{
          ????????if?(retryCount?0)?{
          ????????????throw?new?IllegalArgumentException("retries?count?must?greater?than?0.");
          ????????}
          ????????if?(retryCount?>?MAX_RETRIES_LIMIT)?{
          ????????????System.out.println(String.format("maxRetries?too?large?(%d).?Pinning?to?%d",?maxRetries,?MAX_RETRIES_LIMIT));
          ????????????retryCount?=?MAX_RETRIES_LIMIT;
          ????????}
          ????????long?sleepMs?=?baseSleepTimeMs?*?Math.max(1,?random.nextInt(1?<????????if?(sleepMs?>?maxSleepMs)?{
          ????????????System.out.println(String.format("Sleep?extension?too?large?(%d).?Pinning?to?%d",?sleepMs,?maxSleepMs));
          ????????????sleepMs?=?maxSleepMs;
          ????????}
          ????????return?sleepMs;
          ????}
          }

          ReconnectHandler—— 重連處理器

          @ChannelHandler.Sharable
          public?class?ReconnectHandler?extends?ChannelInboundHandlerAdapter?{

          ????private?int?retries?=?0;
          ????private?RetryPolicy?retryPolicy;

          ????private?TcpClient?tcpClient;

          ????public?ReconnectHandler(TcpClient?tcpClient)?{
          ????????this.tcpClient?=?tcpClient;
          ????}

          ????@Override
          ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????System.out.println("Successfully?established?a?connection?to?the?server.");
          ????????retries?=?0;
          ????????ctx.fireChannelActive();
          ????}

          ????@Override
          ????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
          ????????if?(retries?==?0)?{
          ????????????System.err.println("Lost?the?TCP?connection?with?the?server.");
          ????????????ctx.close();
          ????????}

          ????????boolean?allowRetry?=?getRetryPolicy().allowRetry(retries);
          ????????if?(allowRetry)?{

          ????????????long?sleepTimeMs?=?getRetryPolicy().getSleepTimeMs(retries);

          ????????????System.out.println(String.format("Try?to?reconnect?to?the?server?after?%dms.?Retry?count:?%d.",?sleepTimeMs,?++retries));

          ????????????final?EventLoop?eventLoop?=?ctx.channel().eventLoop();
          ????????????eventLoop.schedule(()?->?{
          ????????????????System.out.println("Reconnecting?...");
          ????????????????tcpClient.connect();
          ????????????},?sleepTimeMs,?TimeUnit.MILLISECONDS);
          ????????}
          ????????ctx.fireChannelInactive();
          ????}


          ????private?RetryPolicy?getRetryPolicy()?{
          ????????if?(this.retryPolicy?==?null)?{
          ????????????this.retryPolicy?=?tcpClient.getRetryPolicy();
          ????????}
          ????????return?this.retryPolicy;
          ????}
          }

          ClientHandlersInitializer

          在之前的基礎(chǔ)上,添加了重連處理器ReconnectHandler。

          public?class?ClientHandlersInitializer?extends?ChannelInitializer<SocketChannel>?{

          ????private?ReconnectHandler?reconnectHandler;
          ????private?EchoHandler?echoHandler;

          ????public?ClientHandlersInitializer(TcpClient?tcpClient)?{
          ????????Assert.notNull(tcpClient,?"TcpClient?can?not?be?null.");
          ????????this.reconnectHandler?=?new?ReconnectHandler(tcpClient);
          ????????this.echoHandler?=?new?EchoHandler();
          ????}

          ????@Override
          ????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
          ????????ChannelPipeline?pipeline?=?ch.pipeline();
          ????????pipeline.addLast(this.reconnectHandler);
          ????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
          ????????pipeline.addLast(new?LengthFieldPrepender(4));
          ????????pipeline.addLast(new?StringDecoder(CharsetUtil.UTF_8));
          ????????pipeline.addLast(new?StringEncoder(CharsetUtil.UTF_8));
          ????????pipeline.addLast(new?Pinger());
          ????}
          }

          TcpClient

          在之前的基礎(chǔ)上添加重連、重連策略的支持。

          public?class?TcpClient?{

          ????private?String?host;
          ????private?int?port;
          ????private?Bootstrap?bootstrap;
          ????/**?重連策略?*/
          ????private?RetryPolicy?retryPolicy;
          ????/**?將Channel保存起來(lái),?可用于在其他非handler的地方發(fā)送數(shù)據(jù)?*/
          ????private?Channel?channel;

          ????public?TcpClient(String?host,?int?port)?{
          ????????this(host,?port,?new?ExponentialBackOffRetry(1000,?Integer.MAX_VALUE,?60?*?1000));
          ????}

          ????public?TcpClient(String?host,?int?port,?RetryPolicy?retryPolicy)?{
          ????????this.host?=?host;
          ????????this.port?=?port;
          ????????this.retryPolicy?=?retryPolicy;
          ????????init();
          ????}

          ????/**
          ?????*?向遠(yuǎn)程TCP服務(wù)器請(qǐng)求連接
          ?????*/

          ????public?void?connect()?{
          ????????synchronized?(bootstrap)?{
          ????????????ChannelFuture?future?=?bootstrap.connect(host,?port);
          ????????????future.addListener(getConnectionListener());
          ????????????this.channel?=?future.channel();
          ????????}
          ????}

          ????public?RetryPolicy?getRetryPolicy()?{
          ????????return?retryPolicy;
          ????}

          ????private?void?init()?{
          ????????EventLoopGroup?group?=?new?NioEventLoopGroup();
          ????????//?bootstrap?可重用,?只需在TcpClient實(shí)例化的時(shí)候初始化即可.
          ????????bootstrap?=?new?Bootstrap();
          ????????bootstrap.group(group)
          ????????????????.channel(NioSocketChannel.class)
          ????????????????.handler(new?ClientHandlersInitializer(TcpClient.this))
          ;
          ????}

          ????private?ChannelFutureListener?getConnectionListener()?{
          ????????return?new?ChannelFutureListener()?{
          ????????????@Override
          ????????????public?void?operationComplete(ChannelFuture?future)?throws?Exception?{
          ????????????????if?(!future.isSuccess())?{
          ????????????????????future.channel().pipeline().fireChannelInactive();
          ????????????????}
          ????????????}
          ????????};
          ????}

          ????public?static?void?main(String[]?args)?{
          ????????TcpClient?tcpClient?=?new?TcpClient("localhost",?2222);
          ????????tcpClient.connect();
          ????}

          }

          測(cè)試

          在測(cè)試之前,為了避開?Connection reset by peer?異常,可以稍微修改Pingerping()方法,添加if (second == 5)的條件判斷。如下:

          private?void?ping(Channel?channel)?{
          ????????int?second?=?Math.max(1,?random.nextInt(baseRandom));
          ????????if?(second?==?5)?{
          ????????????second?=?6;
          ????????}
          ????????System.out.println("next?heart?beat?will?send?after?"?+?second?+?"s.");
          ????????ScheduledFuture?future?=?channel.eventLoop().schedule(new?Runnable()?{
          ????????????@Override
          ????????????public?void?run()?{
          ????????????????if?(channel.isActive())?{
          ????????????????????System.out.println("sending?heart?beat?to?the?server...");
          ????????????????????channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
          ????????????????}?else?{
          ????????????????????System.err.println("The?connection?had?broken,?cancel?the?task?that?will?send?a?heart?beat.");
          ????????????????????channel.closeFuture();
          ????????????????????throw?new?RuntimeException();
          ????????????????}
          ????????????}
          ????????},?second,?TimeUnit.SECONDS);

          ????????future.addListener(new?GenericFutureListener()?{
          ????????????@Override
          ????????????public?void?operationComplete(Future?future)?throws?Exception?{
          ????????????????if?(future.isSuccess())?{
          ????????????????????ping(channel);
          ????????????????}
          ????????????}
          ????????});
          ????}

          啟動(dòng)客戶端

          先只啟動(dòng)客戶端,觀察控制臺(tái)輸出,可以看到類似如下日志:

          斷線重連測(cè)試——客戶端控制臺(tái)輸出

          可以看到,當(dāng)客戶端發(fā)現(xiàn)無(wú)法連接到服務(wù)器端,所以一直嘗試重連。隨著重試次數(shù)增加,重試時(shí)間間隔越大,但又不想無(wú)限增大下去,所以需要定一個(gè)閾值,比如60s。如上圖所示,當(dāng)下一次重試時(shí)間超過60s時(shí),會(huì)打印Sleep extension too large(*). Pinning to 60000,單位為ms。出現(xiàn)這句話的意思是,計(jì)算出來(lái)的時(shí)間超過閾值(60s),所以把真正睡眠的時(shí)間重置為閾值(60s)。

          啟動(dòng)服務(wù)器端

          接著啟動(dòng)服務(wù)器端,然后繼續(xù)觀察客戶端控制臺(tái)輸出。

          圖片

          斷線重連測(cè)試——服務(wù)器端啟動(dòng)后客戶端控制臺(tái)輸出

          可以看到,在第9次重試失敗后,第10次重試之前,啟動(dòng)的服務(wù)器,所以第10次重連的結(jié)果為,即成功連接到服務(wù)器。接下來(lái)因?yàn)檫€是不定時(shí)服務(wù)器,所以出現(xiàn)斷線重連、斷線重連的循環(huán)。

          擴(kuò)展

          在不同環(huán)境,可能會(huì)有不同的重連需求。有不同的重連需求的,只需自己實(shí)現(xiàn)RetryPolicy接口,然后在創(chuàng)建TcpClient的時(shí)候覆蓋默認(rèn)的重連策略即可。

          來(lái)源:jianshu.com/p/1a28e48edd92

          版權(quán)申明:內(nèi)容來(lái)源網(wǎng)絡(luò),版權(quán)歸原創(chuàng)者所有。除非無(wú)法確認(rèn),我們都會(huì)標(biāo)明作者及出處,如有侵權(quán)煩請(qǐng)告知,我們會(huì)立即刪除并表示歉意。謝謝!





          感謝閱讀



          瀏覽 52
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  奇米无码| 欧美精品射精 | 亚洲AV无码成人片在线 | 天天操天天玩 | 91福利资源在线 |