服務(wù)的心跳機制與斷線重連,Netty底層是怎么實現(xiàn)的?
點擊上方藍色“小哈學(xué)Java”,選擇“設(shè)為星標(biāo)”
回復(fù)“資源”獲取獨家整理的學(xué)習(xí)資料!
作者:sprinkle_liz
www.jianshu.com/p/1a28e48edd92
提醒:本篇適合有一定
netty基礎(chǔ)的讀者閱讀
心跳機制
何為心跳
所謂心跳, 即在 TCP 長連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對方自己還在線, 以確保 TCP 連接的有效性.
注:心跳包還有另一個作用,經(jīng)常被忽略,即:一個連接如果長時間不用,防火墻或者路由器就會斷開該連接。
如何實現(xiàn)
核心Handler —— IdleStateHandler
在 Netty 中, 實現(xiàn)心跳機制的關(guān)鍵是 IdleStateHandler, 那么這個 Handler 如何使用呢? 先看下它的構(gòu)造器:
public?IdleStateHandler(int?readerIdleTimeSeconds,?int?writerIdleTimeSeconds,?int?allIdleTimeSeconds)?{
????this((long)readerIdleTimeSeconds,?(long)writerIdleTimeSeconds,?(long)allIdleTimeSeconds,?TimeUnit.SECONDS);
}
這里解釋下三個參數(shù)的含義:
readerIdleTimeSeconds: 讀超時. 即當(dāng)在指定的時間間隔內(nèi)沒有從
Channel讀取到數(shù)據(jù)時, 會觸發(fā)一個READER_IDLE的IdleStateEvent事件.writerIdleTimeSeconds: 寫超時. 即當(dāng)在指定的時間間隔內(nèi)沒有數(shù)據(jù)寫入到
Channel時, 會觸發(fā)一個WRITER_IDLE的IdleStateEvent事件.allIdleTimeSeconds: 讀/寫超時. 即當(dāng)在指定的時間間隔內(nèi)沒有讀或?qū)懖僮鲿r, 會觸發(fā)一個
ALL_IDLE的IdleStateEvent事件.
注:這三個參數(shù)默認(rèn)的時間單位是秒。若需要指定其他時間單位,可以使用另一個構(gòu)造方法:
IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
在看下面的實現(xiàn)之前,建議先了解一下IdleStateHandler的實現(xiàn)原理。相關(guān)鏈接:
https://blog.csdn.net/linuu/article/details/51385682
下面直接上代碼,需要注意的地方,會在代碼中通過注釋進行說明。
使用IdleStateHandler實現(xiàn)心跳
下面將使用IdleStateHandler來實現(xiàn)心跳,Client端連接到Server端后,會循環(huán)執(zhí)行一個任務(wù):隨機等待幾秒,然后ping一下Server端,即發(fā)送一個心跳包。當(dāng)?shù)却臅r間超過規(guī)定時間,將會發(fā)送失敗,以為Server端在此之前已經(jīng)主動斷開連接了。代碼如下:
Client端
ClientIdleStateTrigger —— 心跳觸發(fā)器
類ClientIdleStateTrigger也是一個Handler,只是重寫了userEventTriggered方法,用于捕獲IdleState.WRITER_IDLE事件(未在指定時間內(nèi)向服務(wù)器發(fā)送數(shù)據(jù)),然后向Server端發(fā)送一個心跳包。
/**
?*?
?*??用于捕獲{@link?IdleState#WRITER_IDLE}事件(未在指定時間內(nèi)向服務(wù)器發(fā)送數(shù)據(jù)),然后向Server端發(fā)送一個心跳包。
?*?
?*/
public?class?ClientIdleStateTrigger?extends?ChannelInboundHandlerAdapter?{
????public?static?final?String?HEART_BEAT?=?"heart?beat!";
????@Override
????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
????????if?(evt?instanceof?IdleStateEvent)?{
????????????IdleState?state?=?((IdleStateEvent)?evt).state();
????????????if?(state?==?IdleState.WRITER_IDLE)?{
????????????????//?write?heartbeat?to?server
????????????????ctx.writeAndFlush(HEART_BEAT);
????????????}
????????}?else?{
????????????super.userEventTriggered(ctx,?evt);
????????}
????}
}
Pinger —— 心跳發(fā)射器
/**
?*?客戶端連接到服務(wù)器端后,會循環(huán)執(zhí)行一個任務(wù):隨機等待幾秒,然后ping一下Server端,即發(fā)送一個心跳包。
?*/
public?class?Pinger?extends?ChannelInboundHandlerAdapter?{
????private?Random?random?=?new?Random();
????private?int?baseRandom?=?8;
????private?Channel?channel;
????@Override
????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
????????super.channelActive(ctx);
????????this.channel?=?ctx.channel();
????????ping(ctx.channel());
????}
????private?void?ping(Channel?channel)?{
????????int?second?=?Math.max(1,?random.nextInt(baseRandom));
????????System.out.println("next?heart?beat?will?send?after?"?+?second?+?"s.");
????????ScheduledFuture>?future?=?channel.eventLoop().schedule(new?Runnable()?{
????????????@Override
????????????public?void?run()?{
????????????????if?(channel.isActive())?{
????????????????????System.out.println("sending?heart?beat?to?the?server...");
????????????????????channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
????????????????}?else?{
????????????????????System.err.println("The?connection?had?broken,?cancel?the?task?that?will?send?a?heart?beat.");
????????????????????channel.closeFuture();
????????????????????throw?new?RuntimeException();
????????????????}
????????????}
????????},?second,?TimeUnit.SECONDS);
????????future.addListener(new?GenericFutureListener()?{
????????????@Override
????????????public?void?operationComplete(Future?future)?throws?Exception?{
????????????????if?(future.isSuccess())?{
????????????????????ping(channel);
????????????????}
????????????}
????????});
????}
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
????????//?當(dāng)Channel已經(jīng)斷開的情況下,?仍然發(fā)送數(shù)據(jù),?會拋異常,?該方法會被調(diào)用.
????????cause.printStackTrace();
????????ctx.close();
????}
}
ClientHandlersInitializer —— 客戶端處理器集合的初始化類
public?class?ClientHandlersInitializer?extends?ChannelInitializer<SocketChannel>?{
????private?ReconnectHandler?reconnectHandler;
????private?EchoHandler?echoHandler;
????public?ClientHandlersInitializer(TcpClient?tcpClient)?{
????????Assert.notNull(tcpClient,?"TcpClient?can?not?be?null.");
????????this.reconnectHandler?=?new?ReconnectHandler(tcpClient);
????????this.echoHandler?=?new?EchoHandler();
????}
????@Override
????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
????????ChannelPipeline?pipeline?=?ch.pipeline();
????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
????????pipeline.addLast(new?LengthFieldPrepender(4));
????????pipeline.addLast(new?StringDecoder(CharsetUtil.UTF_8));
????????pipeline.addLast(new?StringEncoder(CharsetUtil.UTF_8));
????????pipeline.addLast(new?Pinger());
????}
}
注:上面的
Handler集合,除了Pinger,其他都是編解碼器和解決粘包,可以忽略。
TcpClient —— TCP連接的客戶端
public?class?TcpClient?{
????private?String?host;
????private?int?port;
????private?Bootstrap?bootstrap;
????/**?將Channel保存起來,?可用于在其他非handler的地方發(fā)送數(shù)據(jù)?*/
????private?Channel?channel;
????public?TcpClient(String?host,?int?port)?{
????????this(host,?port,?new?ExponentialBackOffRetry(1000,?Integer.MAX_VALUE,?60?*?1000));
????}
????public?TcpClient(String?host,?int?port,?RetryPolicy?retryPolicy)?{
????????this.host?=?host;
????????this.port?=?port;
????????init();
????}
????/**
?????*?向遠(yuǎn)程TCP服務(wù)器請求連接
?????*/
????public?void?connect()?{
????????synchronized?(bootstrap)?{
????????????ChannelFuture?future?=?bootstrap.connect(host,?port);
????????????this.channel?=?future.channel();
????????}
????}
????private?void?init()?{
????????EventLoopGroup?group?=?new?NioEventLoopGroup();
????????//?bootstrap?可重用,?只需在TcpClient實例化的時候初始化即可.
????????bootstrap?=?new?Bootstrap();
????????bootstrap.group(group)
????????????????.channel(NioSocketChannel.class)
????????????????.handler(new?ClientHandlersInitializer(TcpClient.this));
????}
????public?static?void?main(String[]?args)?{
????????TcpClient?tcpClient?=?new?TcpClient("localhost",?2222);
????????tcpClient.connect();
????}
}
Server端
ServerIdleStateTrigger —— 斷連觸發(fā)器
/**
?*?在規(guī)定時間內(nèi)未收到客戶端的任何數(shù)據(jù)包,?將主動斷開該連接
?*/
public?class?ServerIdleStateTrigger?extends?ChannelInboundHandlerAdapter?{
????@Override
????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
????????if?(evt?instanceof?IdleStateEvent)?{
????????????IdleState?state?=?((IdleStateEvent)?evt).state();
????????????if?(state?==?IdleState.READER_IDLE)?{
????????????????//?在規(guī)定時間內(nèi)沒有收到客戶端的上行數(shù)據(jù),?主動斷開連接
????????????????ctx.disconnect();
????????????}
????????}?else?{
????????????super.userEventTriggered(ctx,?evt);
????????}
????}
}
ServerBizHandler —— 服務(wù)器端的業(yè)務(wù)處理器
/**
?*?收到來自客戶端的數(shù)據(jù)包后,?直接在控制臺打印出來.
?*/
@ChannelHandler.Sharable
public?class?ServerBizHandler?extends?SimpleChannelInboundHandler<String>?{
????private?final?String?REC_HEART_BEAT?=?"I?had?received?the?heart?beat!";
????@Override
????protected?void?channelRead0(ChannelHandlerContext?ctx,?String?data)?throws?Exception?{
????????try?{
????????????System.out.println("receive?data:?"?+?data);
//????????????ctx.writeAndFlush(REC_HEART_BEAT);
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}
????@Override
????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
????????System.out.println("Established?connection?with?the?remote?client.");
????????//?do?something
????????ctx.fireChannelActive();
????}
????@Override
????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
????????System.out.println("Disconnected?with?the?remote?client.");
????????//?do?something
????????ctx.fireChannelInactive();
????}
????@Override
????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
????????cause.printStackTrace();
????????ctx.close();
????}
}
ServerHandlerInitializer —— 服務(wù)器端處理器集合的初始化類
/**
?*?用于初始化服務(wù)器端涉及到的所有Handler
?*/
public?class?ServerHandlerInitializer?extends?ChannelInitializer<SocketChannel>?{
????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
????????ch.pipeline().addLast("idleStateHandler",?new?IdleStateHandler(5,?0,?0));
????????ch.pipeline().addLast("idleStateTrigger",?new?ServerIdleStateTrigger());
????????ch.pipeline().addLast("frameDecoder",?new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
????????ch.pipeline().addLast("frameEncoder",?new?LengthFieldPrepender(4));
????????ch.pipeline().addLast("decoder",?new?StringDecoder());
????????ch.pipeline().addLast("encoder",?new?StringEncoder());
????????ch.pipeline().addLast("bizHandler",?new?ServerBizHandler());
????}
}
注:
new IdleStateHandler(5, 0, 0)該handler代表如果在5秒內(nèi)沒有收到來自客戶端的任何數(shù)據(jù)包(包括但不限于心跳包),將會主動斷開與該客戶端的連接。
TcpServer —— 服務(wù)器端
public?class?TcpServer?{
????private?int?port;
????private?ServerHandlerInitializer?serverHandlerInitializer;
????public?TcpServer(int?port)?{
????????this.port?=?port;
????????this.serverHandlerInitializer?=?new?ServerHandlerInitializer();
????}
????public?void?start()?{
????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup(1);
????????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
????????try?{
????????????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
????????????bootstrap.group(bossGroup,?workerGroup)
????????????????????.channel(NioServerSocketChannel.class)
????????????????????.childHandler(this.serverHandlerInitializer);
????????????//?綁定端口,開始接收進來的連接
????????????ChannelFuture?future?=?bootstrap.bind(port).sync();
????????????System.out.println("Server?start?listen?at?"?+?port);
????????????future.channel().closeFuture().sync();
????????}?catch?(Exception?e)?{
????????????bossGroup.shutdownGracefully();
????????????workerGroup.shutdownGracefully();
????????????e.printStackTrace();
????????}
????}
????public?static?void?main(String[]?args)?throws?Exception?{
????????int?port?=?2222;
????????new?TcpServer(port).start();
????}
}
至此,所有代碼已經(jīng)編寫完畢。
測試
首先啟動客戶端,再啟動服務(wù)器端。啟動完成后,在客戶端的控制臺上,可以看到打印如下類似日志:

在服務(wù)器端可以看到控制臺輸出了類似如下的日志:

可以看到,客戶端在發(fā)送4個心跳包后,第5個包因為等待時間較長,等到真正發(fā)送的時候,發(fā)現(xiàn)連接已斷開了;而服務(wù)器端收到客戶端的4個心跳數(shù)據(jù)包后,遲遲等不到下一個數(shù)據(jù)包,所以果斷斷開該連接。
異常情況
在測試過程中,有可能會出現(xiàn)如下情況:

出現(xiàn)這種情況的原因是:在連接已斷開的情況下,仍然向服務(wù)器端發(fā)送心跳包。雖然在發(fā)送心跳包之前會使用channel.isActive()判斷連接是否可用,但也有可能上一刻判斷結(jié)果為可用,但下一刻發(fā)送數(shù)據(jù)包之前,連接就斷了。
目前尚未找到優(yōu)雅處理這種情況的方案,各位看官如果有好的解決方案,還望不吝賜教。拜謝!!!
斷線重連
斷線重連這里就不過多介紹,相信各位都知道是怎么回事。這里只說大致思路,然后直接上代碼。
實現(xiàn)思路
客戶端在監(jiān)測到與服務(wù)器端的連接斷開后,或者一開始就無法連接的情況下,使用指定的重連策略進行重連操作,直到重新建立連接或重試次數(shù)耗盡。
對于如何監(jiān)測連接是否斷開,則是通過重寫ChannelInboundHandler#channelInactive來實現(xiàn),但連接不可用,該方法會被觸發(fā),所以只需要在該方法做好重連工作即可。
代碼實現(xiàn)
注:以下代碼都是在上面心跳機制的基礎(chǔ)上修改/添加的。
因為斷線重連是客戶端的工作,所以只需對客戶端代碼進行修改。
重試策略
RetryPolicy —— 重試策略接口
public?interface?RetryPolicy?{
????/**
?????*?Called?when?an?operation?has?failed?for?some?reason.?This?method?should?return
?????*?true?to?make?another?attempt.
?????*
?????*?@param?retryCount?the?number?of?times?retried?so?far?(0?the?first?time)
?????*?@return?true/false
?????*/
????boolean?allowRetry(int?retryCount);
????/**
?????*?get?sleep?time?in?ms?of?current?retry?count.
?????*
?????*?@param?retryCount?current?retry?count
?????*?@return?the?time?to?sleep
?????*/
????long?getSleepTimeMs(int?retryCount);
}
ExponentialBackOffRetry —— 重連策略的默認(rèn)實現(xiàn)
/**
?*?Retry?policy?that?retries?a?set?number?of?times?with?increasing?sleep?time?between?retries
?*/
public?class?ExponentialBackOffRetry?implements?RetryPolicy?{
????private?static?final?int?MAX_RETRIES_LIMIT?=?29;
????private?static?final?int?DEFAULT_MAX_SLEEP_MS?=?Integer.MAX_VALUE;
????private?final?Random?random?=?new?Random();
????private?final?long?baseSleepTimeMs;
????private?final?int?maxRetries;
????private?final?int?maxSleepMs;
????public?ExponentialBackOffRetry(int?baseSleepTimeMs,?int?maxRetries)?{
????????this(baseSleepTimeMs,?maxRetries,?DEFAULT_MAX_SLEEP_MS);
????}
????public?ExponentialBackOffRetry(int?baseSleepTimeMs,?int?maxRetries,?int?maxSleepMs)?{
????????this.maxRetries?=?maxRetries;
????????this.baseSleepTimeMs?=?baseSleepTimeMs;
????????this.maxSleepMs?=?maxSleepMs;
????}
????@Override
????public?boolean?allowRetry(int?retryCount)?{
????????if?(retryCount?????????????return?true;
????????}
????????return?false;
????}
????@Override
????public?long?getSleepTimeMs(int?retryCount)?{
????????if?(retryCount?0)?{
????????????throw?new?IllegalArgumentException("retries?count?must?greater?than?0.");
????????}
????????if?(retryCount?>?MAX_RETRIES_LIMIT)?{
????????????System.out.println(String.format("maxRetries?too?large?(%d).?Pinning?to?%d",?maxRetries,?MAX_RETRIES_LIMIT));
????????????retryCount?=?MAX_RETRIES_LIMIT;
????????}
????????long?sleepMs?=?baseSleepTimeMs?*?Math.max(1,?random.nextInt(1?<????????if?(sleepMs?>?maxSleepMs)?{
????????????System.out.println(String.format("Sleep?extension?too?large?(%d).?Pinning?to?%d",?sleepMs,?maxSleepMs));
????????????sleepMs?=?maxSleepMs;
????????}
????????return?sleepMs;
????}
}
ReconnectHandler—— 重連處理器
@ChannelHandler.Sharable
public?class?ReconnectHandler?extends?ChannelInboundHandlerAdapter?{
????private?int?retries?=?0;
????private?RetryPolicy?retryPolicy;
????private?TcpClient?tcpClient;
????public?ReconnectHandler(TcpClient?tcpClient)?{
????????this.tcpClient?=?tcpClient;
????}
????@Override
????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
????????System.out.println("Successfully?established?a?connection?to?the?server.");
????????retries?=?0;
????????ctx.fireChannelActive();
????}
????@Override
????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
????????if?(retries?==?0)?{
????????????System.err.println("Lost?the?TCP?connection?with?the?server.");
????????????ctx.close();
????????}
????????boolean?allowRetry?=?getRetryPolicy().allowRetry(retries);
????????if?(allowRetry)?{
????????????long?sleepTimeMs?=?getRetryPolicy().getSleepTimeMs(retries);
????????????System.out.println(String.format("Try?to?reconnect?to?the?server?after?%dms.?Retry?count:?%d.",?sleepTimeMs,?++retries));
????????????final?EventLoop?eventLoop?=?ctx.channel().eventLoop();
????????????eventLoop.schedule(()?->?{
????????????????System.out.println("Reconnecting?...");
????????????????tcpClient.connect();
????????????},?sleepTimeMs,?TimeUnit.MILLISECONDS);
????????}
????????ctx.fireChannelInactive();
????}
????private?RetryPolicy?getRetryPolicy()?{
????????if?(this.retryPolicy?==?null)?{
????????????this.retryPolicy?=?tcpClient.getRetryPolicy();
????????}
????????return?this.retryPolicy;
????}
}
ClientHandlersInitializer
在之前的基礎(chǔ)上,添加了重連處理器ReconnectHandler。
public?class?ClientHandlersInitializer?extends?ChannelInitializer<SocketChannel>?{
????private?ReconnectHandler?reconnectHandler;
????private?EchoHandler?echoHandler;
????public?ClientHandlersInitializer(TcpClient?tcpClient)?{
????????Assert.notNull(tcpClient,?"TcpClient?can?not?be?null.");
????????this.reconnectHandler?=?new?ReconnectHandler(tcpClient);
????????this.echoHandler?=?new?EchoHandler();
????}
????@Override
????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
????????ChannelPipeline?pipeline?=?ch.pipeline();
????????pipeline.addLast(this.reconnectHandler);
????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
????????pipeline.addLast(new?LengthFieldPrepender(4));
????????pipeline.addLast(new?StringDecoder(CharsetUtil.UTF_8));
????????pipeline.addLast(new?StringEncoder(CharsetUtil.UTF_8));
????????pipeline.addLast(new?Pinger());
????}
}
TcpClient
在之前的基礎(chǔ)上添加重連、重連策略的支持。
public?class?TcpClient?{
????private?String?host;
????private?int?port;
????private?Bootstrap?bootstrap;
????/**?重連策略?*/
????private?RetryPolicy?retryPolicy;
????/**?將Channel保存起來,?可用于在其他非handler的地方發(fā)送數(shù)據(jù)?*/
????private?Channel?channel;
????public?TcpClient(String?host,?int?port)?{
????????this(host,?port,?new?ExponentialBackOffRetry(1000,?Integer.MAX_VALUE,?60?*?1000));
????}
????public?TcpClient(String?host,?int?port,?RetryPolicy?retryPolicy)?{
????????this.host?=?host;
????????this.port?=?port;
????????this.retryPolicy?=?retryPolicy;
????????init();
????}
????/**
?????*?向遠(yuǎn)程TCP服務(wù)器請求連接
?????*/
????public?void?connect()?{
????????synchronized?(bootstrap)?{
????????????ChannelFuture?future?=?bootstrap.connect(host,?port);
????????????future.addListener(getConnectionListener());
????????????this.channel?=?future.channel();
????????}
????}
????public?RetryPolicy?getRetryPolicy()?{
????????return?retryPolicy;
????}
????private?void?init()?{
????????EventLoopGroup?group?=?new?NioEventLoopGroup();
????????//?bootstrap?可重用,?只需在TcpClient實例化的時候初始化即可.
????????bootstrap?=?new?Bootstrap();
????????bootstrap.group(group)
????????????????.channel(NioSocketChannel.class)
????????????????.handler(new?ClientHandlersInitializer(TcpClient.this));
????}
????private?ChannelFutureListener?getConnectionListener()?{
????????return?new?ChannelFutureListener()?{
????????????@Override
????????????public?void?operationComplete(ChannelFuture?future)?throws?Exception?{
????????????????if?(!future.isSuccess())?{
????????????????????future.channel().pipeline().fireChannelInactive();
????????????????}
????????????}
????????};
????}
????public?static?void?main(String[]?args)?{
????????TcpClient?tcpClient?=?new?TcpClient("localhost",?2222);
????????tcpClient.connect();
????}
}
測試
在測試之前,為了避開 Connection reset by peer 異常,可以稍微修改Pinger的ping()方法,添加if (second == 5)的條件判斷。如下:
private?void?ping(Channel?channel)?{
????????int?second?=?Math.max(1,?random.nextInt(baseRandom));
????????if?(second?==?5)?{
????????????second?=?6;
????????}
????????System.out.println("next?heart?beat?will?send?after?"?+?second?+?"s.");
????????ScheduledFuture>?future?=?channel.eventLoop().schedule(new?Runnable()?{
????????????@Override
????????????public?void?run()?{
????????????????if?(channel.isActive())?{
????????????????????System.out.println("sending?heart?beat?to?the?server...");
????????????????????channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
????????????????}?else?{
????????????????????System.err.println("The?connection?had?broken,?cancel?the?task?that?will?send?a?heart?beat.");
????????????????????channel.closeFuture();
????????????????????throw?new?RuntimeException();
????????????????}
????????????}
????????},?second,?TimeUnit.SECONDS);
????????future.addListener(new?GenericFutureListener()?{
????????????@Override
????????????public?void?operationComplete(Future?future)?throws?Exception?{
????????????????if?(future.isSuccess())?{
????????????????????ping(channel);
????????????????}
????????????}
????????});
????}
啟動客戶端
先只啟動客戶端,觀察控制臺輸出,可以看到類似如下日志:

可以看到,當(dāng)客戶端發(fā)現(xiàn)無法連接到服務(wù)器端,所以一直嘗試重連。隨著重試次數(shù)增加,重試時間間隔越大,但又不想無限增大下去,所以需要定一個閾值,比如60s。如上圖所示,當(dāng)下一次重試時間超過60s時,會打印Sleep extension too large(*). Pinning to 60000,單位為ms。出現(xiàn)這句話的意思是,計算出來的時間超過閾值(60s),所以把真正睡眠的時間重置為閾值(60s)。
啟動服務(wù)器端
接著啟動服務(wù)器端,然后繼續(xù)觀察客戶端控制臺輸出。

可以看到,在第9次重試失敗后,第10次重試之前,啟動的服務(wù)器,所以第10次重連的結(jié)果為Successfully established a connection to the server.,即成功連接到服務(wù)器。接下來因為還是不定時ping服務(wù)器,所以出現(xiàn)斷線重連、斷線重連的循環(huán)。
擴展
在不同環(huán)境,可能會有不同的重連需求。有不同的重連需求的,只需自己實現(xiàn)RetryPolicy接口,然后在創(chuàng)建TcpClient的時候覆蓋默認(rèn)的重連策略即可。
完!!!
END
有熱門推薦??
最近面試BAT,整理一份面試資料《Java面試BATJ通關(guān)手冊》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。
獲取方式:點“在看”,關(guān)注公眾號并回復(fù)?Java?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。
文章有幫助的話,在看,轉(zhuǎn)發(fā)吧。
謝謝支持喲 (*^__^*)



