Netty 中的心跳機制,還有誰不會?
作者:rickiyang
出處:www.cnblogs.com/rickiyang/p/11074231.html我們知道在TCP長連接或者WebSocket長連接中一般我們都會使用心跳機制–即發(fā)送特殊的數(shù)據(jù)包來通告對方自己的業(yè)務(wù)還沒有辦完,不要關(guān)閉鏈接。
那么心跳機制可以用來做什么呢? 我們知道網(wǎng)絡(luò)的傳輸是不可靠的,當我們發(fā)起一個鏈接請求的過程之中會發(fā)生什么事情誰都無法預(yù)料,或者斷電,服務(wù)器重啟,斷網(wǎng)線之類。
如果有這種情況的發(fā)生對方也無法判斷你是否還在線。所以這時候我們引入心跳機制,在長鏈接中雙方?jīng)]有數(shù)據(jù)交互的時候互相發(fā)送數(shù)據(jù)(可能是空包,也可能是特殊數(shù)據(jù)),對方收到該數(shù)據(jù)之后也回復(fù)相應(yīng)的數(shù)據(jù)用以確保雙方都在線,這樣就可以確保當前鏈接是有效的。 1. 如何實現(xiàn)心跳機制
一般實現(xiàn)心跳機制由兩種方式:
TCP協(xié)議自帶的心跳機制來實現(xiàn); 在應(yīng)用層來實現(xiàn)。
但是TCP協(xié)議自帶的心跳機制系統(tǒng)默認是設(shè)置的是2小時的心跳頻率。它檢查不到機器斷電、網(wǎng)線拔出、防火墻這些斷線。而且邏輯層處理斷線可能也不是那么好處理。另外該心跳機制是與TCP協(xié)議綁定的,那如果我們要是使用UDP協(xié)議豈不是用不了?所以一般我們都不用。
而一般我們自己實現(xiàn)呢大致的策略是這樣的:
Client啟動一個定時器,不斷發(fā)送心跳; Server收到心跳后,做出回應(yīng); Server啟動一個定時器,判斷Client是否存在,這里做判斷有兩種方法:時間差和簡單標識。
收到一個心跳包之后記錄當前時間; 判斷定時器到達時間,計算多久沒收到心跳時間=當前時間-上次收到心跳時間。如果改時間大于設(shè)定值則認為超時。
收到心跳后設(shè)置連接標識為true; 判斷定時器到達時間,如果未收到心跳則設(shè)置連接標識為false;
今天我們來看一下Netty的心跳機制的實現(xiàn),在Netty中提供了IdleStateHandler類來進行心跳的處理,它可以對一個 Channel 的 讀/寫設(shè)置定時器, 當 Channel 在一定事件間隔內(nèi)沒有數(shù)據(jù)交互時(即處于 idle 狀態(tài)), 就會觸發(fā)指定的事件。
該類可以對三種類型的超時做心跳機制檢測:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
下面我們還是通過一個例子來講解IdleStateHandler的使用。
public class HeartBeatServer {
private int port;
public HeartBeatServer(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HeartBeatServerChannelInitializer());
try {
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
HeartBeatServer server = new HeartBeatServer(7788);
server.start();
}
}
public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("handler",new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatServerHandler());
}
}
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
private int loss_connect_time = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "Server :" + msg.toString());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
//服務(wù)端對應(yīng)著讀事件,當為READER_IDLE時觸發(fā)
IdleStateEvent event = (IdleStateEvent)evt;
if(event.state() == IdleState.READER_IDLE){
loss_connect_time++;
System.out.println("接收消息超時");
if(loss_connect_time > 2){
System.out.println("關(guān)閉不活動的鏈接");
ctx.channel().close();
}
}else{
super.userEventTriggered(ctx,evt);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
再來寫一下客戶端:
public class HeartBeatsClient {
private int port;
private String address;
public HeartBeatsClient(int port, String address) {
this.port = port;
this.address = address;
}
public void start(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());
try {
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
HeartBeatsClient client = new HeartBeatsClient(7788,"127.0.0.1");
client.start();
}
}
public class HeartBeatsClientChannelInitializer extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast("handler", new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatClientHandler());
}
}
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8));
private static final int TRY_TIMES = 3;
private int currentTime = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("激活時間是:"+new Date());
System.out.println("鏈接已經(jīng)激活");
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("停止時間是:"+new Date());
System.out.println("關(guān)閉鏈接");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("當前輪詢時間:"+new Date());
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
if(currentTime <= TRY_TIMES){
System.out.println("currentTime:"+currentTime);
currentTime++;
ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
}
}
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message);
if (message.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
首先客戶端激活channel,因為客戶端中并沒有發(fā)送消息所以會觸發(fā)客戶端的IdleStateHandler,它設(shè)置的寫超時時間為3s; 然后觸發(fā)客戶端的事件機制進入userEventTriggered方法,在觸發(fā)器中計數(shù)并向客戶端發(fā)送消息; 服務(wù)端接收消息; 客戶端觸發(fā)器繼續(xù)輪詢發(fā)送消息,直到計數(shù)器滿不再向服務(wù)端發(fā)送消息; 服務(wù)端在IdleStateHandler設(shè)置的讀消息超時時間5s內(nèi)未收到消息,觸發(fā)了服務(wù)端中handler的userEventTriggered方法,于是關(guān)閉客戶端的鏈接。
/**
* 本Client為測試netty重連機制
* Server端代碼都一樣,所以不做修改
* 只用在client端中做一下判斷即可
*/
public class HeartBeatsClient2 {
private int port;
private String address;
ChannelFuture future;
public HeartBeatsClient2(int port, String address) {
this.port = port;
this.address = address;
}
public void start(){
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());
try {
future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
//group.shutdownGracefully();
if (null != future) {
if (future.channel() != null && future.channel().isOpen()) {
future.channel().close();
}
}
System.out.println("準備重連");
start();
System.out.println("重連成功");
}
}
public static void main(String[] args) {
HeartBeatsClient2 client = new HeartBeatsClient2(7788,"127.0.0.1");
client.start();
}
}
其余部分的代碼與上面的實例并無異同,只需改造客戶端即可,我們再運行服務(wù)端和客戶端會看到客戶端雖然被關(guān)閉了,但是立馬又被重啟:
當然生產(chǎn)級別的代碼應(yīng)該不是這樣實現(xiàn)的吧,哈哈。
正文結(jié)束
1.不認命,從10年流水線工人,到谷歌上班的程序媛,一位湖南妹子的勵志故事
3.從零開始搭建創(chuàng)業(yè)公司后臺技術(shù)棧
5.37歲程序員被裁,120天沒找到工作,無奈去小公司,結(jié)果懵了...
一個人學習、工作很迷茫?
點擊「閱讀原文」加入我們的小圈子!

