Netty学习(八)-Netty的心跳机制

版权声明:本文为博主原创文章,未经博主容许不得转载。 https://blog.csdn.net/a953713428/article/details/69378412
咱们知道在TCP长链接或者WebSocket长链接中通常咱们都会使用心跳机制–即发送特殊的数据包来通告对方本身的业务尚未办完,不要关闭连接。那么心跳机制能够用来作什么呢?咱们知道网络的传输是不可靠的,当咱们发起一个连接请求的过程之中会发生什么事情谁都没法预料,或者断电,服务器重启,断网线之类。若是有这种状况的发生对方也没法判断你是否还在线。因此这时候咱们引入心跳机制,在长连接中双方没有数据交互的时候互相发送数据(多是空包,也多是特殊数据),对方收到该数据以后也回复相应的数据用以确保双方都在线,这样就能够确保当前连接是有效的。bootstrap

1. 如何实现心跳机制
通常实现心跳机制由两种方式:服务器

TCP协议自带的心跳机制来实现;
在应用层来实现。
可是TCP协议自带的心跳机制系统默认是设置的是2小时的心跳频率。它检查不到机器断电、网线拔出、防火墙这些断线。并且逻辑层处理断线可能也不是那么好处理。另外该心跳机制是与TCP协议绑定的,那若是咱们要是使用UDP协议岂不是用不了?因此通常咱们都不用。网络

而通常咱们本身实现呢大体的策略是这样的:socket

Client启动一个定时器,不断发送心跳;
Server收到心跳后,作出回应;
Server启动一个定时器,判断Client是否存在,这里作判断有两种方法:时间差和简单标识。
时间差:ide

收到一个心跳包以后记录当前时间;
判判定时器到达时间,计算多久没收到心跳时间=当前时间-上次收到心跳时间。若是改时间大于设定值则认为超时。
简单标识:oop

收到心跳后设置链接标识为true;
判判定时器到达时间,若是未收到心跳则设置链接标识为false;
今天咱们来看一下Netty的心跳机制的实现,在Netty中提供了IdleStateHandler类来进行心跳的处理,它能够对一个 Channel 的 读/写设置定时器, 当 Channel 在必定事件间隔内没有数据交互时(即处于 idle 状态), 就会触发指定的事件。测试

该类能够对三种类型的超时作心跳机制检测:this

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
1
2
3
readerIdleTimeSeconds:设置读超时时间;
writerIdleTimeSeconds:设置写超时时间;
allIdleTimeSeconds:同时为读或写设置超时时间;
下面咱们仍是经过一个例子来说解IdleStateHandler的使用。.net

服务端:netty

public class HeartBeatServer {
private int port;

public HeartBeatServer(int port) {
this.port = port;
}

public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HeartBeatServerChannelInitializer());

try {
ChannelFuture future = server.bind(port).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
HeartBeatServer server = new HeartBeatServer(7788);
server.start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
服务端Initializer:

public class HeartBeatServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast("handler",new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatServerHandler());
}
}
1
2
3
4
5
6
7
8
9
10
11
在这里IdleStateHandler也是handler的一种,因此加入addLast。咱们分别设置4个参数:读超时时间为3s,写超时和读写超时为0,而后加入时间控制单元。

服务端handler:

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
private int loss_connect_time = 0;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "Server :" + msg.toString());
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
//服务端对应着读事件,当为READER_IDLE时触发
IdleStateEvent event = (IdleStateEvent)evt;
if(event.state() == IdleState.READER_IDLE){
loss_connect_time++;
System.out.println("接收消息超时");
if(loss_connect_time > 2){
System.out.println("关闭不活动的连接");
ctx.channel().close();
}
}else{
super.userEventTriggered(ctx,evt);
}
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
咱们看到在handler中调用了userEventTriggered方法,IdleStateEvent的state()方法一个有三个值:
READER_IDLE,WRITER_IDLE,ALL_IDLE。正好对应读事件写事件和读写事件。

再来写一下客户端:

public class HeartBeatsClient {
private int port;
private String address;

public HeartBeatsClient(int port, String address) {
this.port = port;
this.address = address;
}

public void start(){
EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());

try {
ChannelFuture future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}

}

public static void main(String[] args) {
HeartBeatsClient client = new HeartBeatsClient(7788,"127.0.0.1");
client.start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
客户端Initializer:

public class HeartBeatsClientChannelInitializer extends ChannelInitializer<SocketChannel> {

protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast("handler", new IdleStateHandler(0, 3, 0, TimeUnit.SECONDS));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new HeartBeatClientHandler());
}
}
1
2
3
4
5
6
7
8
9
10
11
这里咱们设置了IdleStateHandler的写超时为3秒,客户端执行的动做为写消息到服务端,服务端执行读动做。

客户端handler:

public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8));

private static final int TRY_TIMES = 3;

private int currentTime = 0;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("激活时间是:"+new Date());
System.out.println("连接已经激活");
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("中止时间是:"+new Date());
System.out.println("关闭连接");
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("当前轮询时间:"+new Date());
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
if(currentTime <= TRY_TIMES){
System.out.println("currentTime:"+currentTime);
currentTime++;
ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
}
}
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message);
if (message.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
启动服务端和客户端咱们看到输出为:

 

咱们再来屡一下思路:

首先客户端激活channel,由于客户端中并无发送消息因此会触发客户端的IdleStateHandler,它设置的写超时时间为3s;
而后触发客户端的事件机制进入userEventTriggered方法,在触发器中计数并向客户端发送消息;
服务端接收消息;
客户端触发器继续轮询发送消息,直到计数器满再也不向服务端发送消息;
服务端在IdleStateHandler设置的读消息超时时间5s内未收到消息,触发了服务端中handler的userEventTriggered方法,因而关闭客户端的连接。
大致咱们的简单心跳机制就是这样的思路,经过事件触发机制以及计数器的方式来实现,上面咱们的案例中最后客户端没有发送消息的时候咱们是强制断开了客户端的连接,那么既然能够关闭,咱们是否是也但是从新连接客户端呢?由于万一客户端自己并不想关闭而是因为别的缘由致使他没法与服务端通讯。下面咱们来讲一下重连机制。

当咱们的服务端在未读到客户端消息超时而关闭客户端的时候咱们通常在客户端的finally块中方的是关闭客户端的代码,这时咱们能够作一下修改的,finally是必定会被执行新的,因此咱们能够在finally块中从新调用一下启动客户端的代码,这样就又从新启动了客户端了,上客户端代码:

/**
* 本Client为测试netty重连机制
* Server端代码都同样,因此不作修改
* 只用在client端中作一下判断便可
*/
public class HeartBeatsClient2 {

private int port;
private String address;
ChannelFuture future;

public HeartBeatsClient2(int port, String address) {
this.port = port;
this.address = address;
}

public void start(){
EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new HeartBeatsClientChannelInitializer());

try {
future = bootstrap.connect(address,port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
//group.shutdownGracefully();
if (null != future) {
if (future.channel() != null && future.channel().isOpen()) {
future.channel().close();
}
}
System.out.println("准备重连");
start();
System.out.println("重连成功");
}

}

public static void main(String[] args) { HeartBeatsClient2 client = new HeartBeatsClient2(7788,"127.0.0.1"); client.start(); }}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748其他部分的代码与上面的实例并没有异同,只需改造客户端便可,咱们再运行服务端和客户端会看到客户端虽然被关闭了,可是立马又被重启:

相关文章
相关标签/搜索