客户端和服务端的链接属于socket链接,也属于长链接,每每会存在客户端在链接了服务端以后就没有任何操做了,但仍是占用了一个链接;当愈来愈多相似的客户端出现就会浪费不少链接,netty中能够经过心跳检测来找出必定程度(自定义规则判断哪些链接是无效连接)的无效连接并断开链接,保存真正活跃的链接。
我理解的心跳检测应该是客户端/服务端定时发送一个数据包给服务端/客户端,检测对方是否有响应; 若是是存活的链接,在必定的时间内应该会收到响应回来的数据包; 若是在必定时间内仍是收不到接收方的响应的话,就能够当作是挂机,能够断开此链接; 若是检测到了掉线以后还能够进行重连;
idleStateHandler在通道注册以后会开启一个定时任务,定时去检测通道中后续是否还有进行数据传输,若是在规定的时间内没有进行数据传输则会触发对应的超时事件,使用者能够根据对应的事件自定义规则来判别当前链接是不是活跃,是否须要关闭链接等来进行操做。 通常idleStateHandler触发的事件IdleStateEvent会在心跳handler中的userEventTriggered方法中捕获到对应的超时事件。
IdleStateHandler的继承关系:经过ChannelDuplexHandler类继承ChannelInboundHandler和实现ChannelOutboundHandler来实现对入站和出站的重写和监控java
IdleStateHandler初始化:为0表明不监控git
/** * @observeOutput 观察输出 * @readerIdleTime 读超时时间 自定义时间内检测Channel通道有没有读取到数据,为0表明不监控 * @writerIdleTime 写超时时间 自定义时间内检测Channel通道有没有write数据,为0表明不监控 * @allIdleTime 总超时时间 自定义时间内检测Channel通道有没有读/写数据,为0表明不监控 * @unit 时间单位 */ public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { if (unit == null) { throw new NullPointerException("unit"); } this.observeOutput = observeOutput; // 初始化读取空闲时间,最小值为0 if (readerIdleTime <= 0) { readerIdleTimeNanos = 0; } else { // 定义读取超时时间为自定义设置时间 readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS); } if (writerIdleTime <= 0) { writerIdleTimeNanos = 0; } else { // 设置写超时时间 writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS); } if (allIdleTime <= 0) { allIdleTimeNanos = 0; } else { // 设置总超时时间 allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS); } }
IdleStateHandler和channel通道的关联:经过类图能够得知,idleStateHandler能够重写入站和出站的方法,不过只是经过channelRead和write方法来记录阅读的时间等不作其余操做github
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 初始化检测器,开启定时任务 initialize(ctx); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 在通道非活跃状态的时候销毁定时任务 destroy(); super.channelInactive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 若是设置的读超时时间大于0则设置是否读操做为true if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { reading = true; firstReaderIdleEvent = firstAllIdleEvent = true; } // 记录时间和标识标志以后就直接fire当前read到下一个ChannelHandler处理类中 ctx.fireChannelRead(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 当读取完毕时,设置是否正在读取为false,设置最后读取时间为系统当前时间 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) { lastReadTime = ticksInNanos(); reading = false; } // 一样直接fire掉,跳入到下一个handler中 ctx.fireChannelReadComplete(); } // idleStateHandler重写的write方法 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // Allow writing with void promise if handler is only configured for read timeout events. if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { ctx.write(msg, promise.unvoid()).addListener(writeListener); } else { ctx.write(msg, promise); } } // ChannelOutboundHandler提供的write接口方法 /** * Called once a write operation is made. The write operation will write the messages through the * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once * {@link Channel#flush()} is called * 执行一次写操做;写操做经过ChannelPipeline来传输信息;最后经过channel的flush()方法来刷新 * * @param ctx the {@link ChannelHandlerContext} for which the write operation is made 实际写操做者 * @param msg the message to write 写的消息 * @param promise the {@link ChannelPromise} to notify once the operation completes 在操做完成时当即通知(相似future异步通知) * @throws Exception thrown if an error occurs */ void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
定时器初始化/销毁:将使用者自定义的超时时间设置为延迟定时任务bootstrap
初始化: private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 // state; // 0 - none, 1 - 初始化, 2 - 销毁 switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); // 最后读取时间 lastReadTime = lastWriteTime = ticksInNanos(); // 若是设置的空闲时间大于0则开启定时任务进行监控 if (readerIdleTimeNanos > 0) { readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } // 写超时时间 if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } // 总超时时间 if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }
销毁: /** * @readerIdleTimeout ==> ScheduledFuture类 * @writerIdleTimeout ==> ScheduledFuture类 */ private void destroy() { // 设置销毁状态 state = 2; // 销毁线程 // ScheduledFuture if (readerIdleTimeout != null) { readerIdleTimeout.cancel(false); readerIdleTimeout = null; } if (writerIdleTimeout != null) { writerIdleTimeout.cancel(false); writerIdleTimeout = null; } if (allIdleTimeout != null) { allIdleTimeout.cancel(false); allIdleTimeout = null; } }
开启定时任务promise
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { return ctx.executor().schedule(task, delay, unit); }
读/写定时任务:定时任务启动的时候,经过设置的超时时间和上一次触发channelRead的时间进行相减比较来判断是否超时了网络
// 仅分析读取超时时间定时任务,写超时差很少就是触发的时间不同,比对的变量换成了设置的写超时时间 private final class ReaderIdleTimeoutTask extends AbstractIdleTask { ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { super(ctx); } @Override protected void run(ChannelHandlerContext ctx) { // readerIdleTimeNanos:初始化IdleStateHandler设置的读取超时时间 long nextDelay = readerIdleTimeNanos; // 若是没有任何读取操做 if (!reading) { // 判断是否有超时 // nextDelay = nextDelay-(ticksInNanos() - lastReadTime) // 即设置的超时时间减去距离上一次读取的时间 nextDelay -= ticksInNanos() - lastReadTime; } // 若是小于等于0 则触发读取超时事件,设置新的延迟时间 if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); // 标记为第一次 boolean first = firstReaderIdleEvent; // 设置成非第一次 firstReaderIdleEvent = false; try { IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. // 超时的时候发生的读取事件,则从新延迟执行 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } }
项目结构异步
├─src │ ├─main │ │ ├─java │ │ │ └─com │ │ │ └─hetangyuese │ │ │ └─netty │ │ │ ├─client │ │ │ │ MyChannelFutureListener.java │ │ │ │ MyClient05.java │ │ │ │ MyClientChannelHandler.java │ │ │ │ MyClientChannelInitializer.java │ │ │ │ │ │ │ └─server │ │ │ │ MyServer05.java │ │ │ │ MyServerChannelInitializer.java │ │ │ │ MyServerHandler.java │ │ │ │ │ │ │ └─decoder
在ChannelPipeline中注册IdleStateHandlersocket
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline(). addLast(new StringEncoder(Charset.forName("GBK"))) .addLast(new StringDecoder(Charset.forName("GBK"))) .addLast(new LoggingHandler(LogLevel.INFO)) // 设置读取超时时间为5秒,写超时和总超时为0即不作监控 .addLast(new IdleStateHandler(5, 0, 0)) .addLast(new MyServerHandler()); } }
服务端处理handler(其中userEventTriggered为接收心跳任务触发的事件,此次作了计数三次触发读空闲超时则断开链接)ide
public class MyServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger count = new AtomicInteger(1); /** * 心跳检测机制会进入 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("心跳检测触发了事件, object: , time: " + evt + new Date().toLocaleString()); super.userEventTriggered(ctx, evt); if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; // 客户端链接应该是请求 write if (e.state() == IdleState.READER_IDLE) { System.out.println("服务端监测到了读取超时"); count.incrementAndGet(); if (count.get() > 3) { System.out.println("客户端还在?? 已经3次检测没有访问了,我要断开了哦!!!"); ctx.channel().close(); } } else if (e.state() == IdleState.WRITER_IDLE) { // 若是一直有交互则会发送writer_idle System.out.println("服务端收到了写入超时"); } else { System.out.println("服务端收到了All_idle"); } } else { super.userEventTriggered(ctx,evt); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("myServerHandler is active, time: " + new Date().toLocaleString()); ctx.writeAndFlush("成功链接服务端, 当前时间:" + new Date().toLocaleString()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("服务端与客户端断开了链接, time: " + new Date().toLocaleString()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("myServerHandler 收到了客户端的信息 msg:" + msg + ", time: " + new Date().toLocaleString()); ctx.writeAndFlush("您好,客户端,我是服务端"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
运行结果oop
myServer is start time: 2019-11-8 14:47:16 myServerHandler is active, time: 2019-11-8 14:47:18 十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] REGISTERED 十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] ACTIVE 十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler write 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] WRITE: 成功链接服务端, 当前时间:2019-11-8 14:47:18 十一月 08, 2019 2:47:18 下午 io.netty.handler.logging.LoggingHandler flush 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] FLUSH 心跳检测触发了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@a3a0212019-11-8 14:47:23 服务端监测到了读取超时 心跳检测触发了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@18692702019-11-8 14:47:28 服务端监测到了读取超时 心跳检测触发了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@18692702019-11-8 14:47:33 服务端监测到了读取超时 客户端还在?? 已经3次检测没有访问了,我要断开了哦!!! 十一月 08, 2019 2:47:33 下午 io.netty.handler.logging.LoggingHandler close 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 - R:/127.0.0.1:62195] CLOSE 十一月 08, 2019 2:47:33 下午 io.netty.handler.logging.LoggingHandler channelInactive 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 ! R:/127.0.0.1:62195] INACTIVE 服务端与客户端断开了链接, time: 2019-11-8 14:47:33 十一月 08, 2019 2:47:33 下午 io.netty.handler.logging.LoggingHandler channelUnregistered 信息: [id: 0x8d924d06, L:/127.0.0.1:9001 ! R:/127.0.0.1:62195] UNREGISTERED
出现断线重连的状况:
ChannelFutureListener进行重连
public class MyChannelFutureListener implements ChannelFutureListener { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("当前已链接"); return; } System.out.println("启动链接客户端失败,开始重连"); final EventLoop loop = future.channel().eventLoop(); loop.schedule(new Runnable() { @Override public void run() { try { MyClient05.reConnection(); System.out.println("客户端重连成功"); } catch (Exception e){ e.printStackTrace(); } } }, 1L, TimeUnit.SECONDS); } }
client启动 不知道为啥想要测试listener的效果时,connect的sync()不能带,是因为同步阻塞的缘由?
public void start() { EventLoopGroup group = new NioEventLoopGroup(); try { bootstrap = getBootstrap(); bootstrap.group(group) .option(ChannelOption.AUTO_READ, true) .option(ChannelOption.TCP_NODELAY, true) .channel(NioSocketChannel.class) .handler(new MyClientChannelInitializer()); // ChannelFuture future = bootstrap.connect(new InetSocketAddress(ip, port)).sync(); ChannelFuture future = bootstrap.connect(new InetSocketAddress(ip, port)); // 增长监听 future.addListener(new MyChannelFutureListener()); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } }
直接启动客户端,能够看到listener输出:启动链接客户端失败,开始重连
channelInactive进行重连(我是直接new了一个线程去重连)
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端断开了链接, time: " + new Date().toLocaleString()); new Thread(new Runnable() { @Override public void run() { try { new MyClient05("127.0.0.1", 9001).start(); System.out.println("客户端从新链接了服务端 time:" + new Date().toLocaleString()); } catch (Exception e) { e.printStackTrace(); } } }).start(); }
测试方法:
1.能够直接经过上面的心跳机制断开链接后,客户端的channelInactive检测到断开会自动执行重连
测试结果:
服务端: ------ 心跳检测触发了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@ab81552019-11-8 16:43:12 服务端监测到了读取超时 心跳检测触发了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@15192622019-11-8 16:43:17 服务端监测到了读取超时 心跳检测触发了事件, object: , time: io.netty.handler.timeout.IdleStateEvent@15192622019-11-8 16:43:22 服务端监测到了读取超时 客户端还在?? 已经3次检测没有访问了,我要断开了哦!!! 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler close 信息: [id: 0x6bfc0d90, L:/192.168.0.118:9001 - R:/192.168.0.118:51031] CLOSE 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelInactive 信息: [id: 0x6bfc0d90, L:/192.168.0.118:9001 ! R:/192.168.0.118:51031] INACTIVE 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelUnregistered 信息: [id: 0x6bfc0d90, L:/192.168.0.118:9001 ! R:/192.168.0.118:51031] UNREGISTERED 服务端与客户端断开了链接, time: 2019-11-8 16:43:22 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelRegistered 信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] REGISTERED 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler channelActive 信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] ACTIVE myServerHandler is active, time: 2019-11-8 16:43:22 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler write 信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] WRITE: 成功链接服务端, 当前时间:2019-11-8 16:43:22 十一月 08, 2019 4:43:22 下午 io.netty.handler.logging.LoggingHandler flush 信息: [id: 0xbcb2ec62, L:/127.0.0.1:9001 - R:/127.0.0.1:51061] FLUSH 十一月 08, 2019 4:43:26 下午 io.netty.handler.logging.LoggingHandler channelReadComplete ------------------------------------------------------------------------------------- 客户端: ------- 当前已链接 客户端与服务端创建了链接 time: 2019-11-8 16:43:07 客户端接收到了服务的响应的数据 msg: 成功链接服务端, 当前时间:2019-11-8 16:43:07, time: 2019-11-8 16:43:07 客户端断开了链接, time: 2019-11-8 16:43:22 当前已链接 客户端与服务端创建了链接 time: 2019-11-8 16:43:22 客户端接收到了服务的响应的数据 msg: 成功链接服务端, 当前时间:2019-11-8 16:43:22, time: 2019-11-8 16:43:22