这一章,主要介绍下Netty的心跳处理,心跳处理在通讯开发中是最经常使用的,服务端经过心跳能够监控客户端的连接状态,进行相应的处理。java
记得,以前用NIO作了一个客户端和服务端通讯的项目,客户端并非用java写的,并且一个嵌入式的设备,走的lwapp协议栈,有时候嵌入式设备点击复位或者直接掉电后,服务端尚未反应过来,还认为连接是链接状态,资源也就是一直没有获得释放。早在BIO的时候经过检测返回值是不是-1,异常捕获,setSoTimeout(超时时间)来肯定客户端是否链接有效。到了nio只能本身实现一个心跳检测,很是的麻烦。好在Netty为咱们提供了IdleStateHandler类来完成心跳检测功能,它很是简单,只有三个参数:public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) 读超时时间,写超时时间,读写超时时间,而后实现用户事件触发监听userEventTriggered这个方法,在这个方法里作相应的处理就能够了,是否是很是的方便!bootstrap
下面来看一下服务端的代码:服务器
package com.dlb.note.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; /** * 功能:心跳时间服务器 * 版本:1.0 * 日期:2016/12/13 10:51 * 做者:馟苏 */ public class IdleTimerServer { /** * 主函数 */ public static void main(String []args) { // 配置服务端的NIO线程池 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度 .option(ChannelOption.SO_BACKLOG, 1024) .childHandler(new ChannelInitializer() { protected void initChannel(Channel channel) throws Exception { // 添加心跳处理器 5s读,5s写,10s读写 channel.pipeline().addLast(new IdleStateHandler(5, 5, 10)); channel.pipeline().addLast(new IdleTimerServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture future = serverBootstrap.bind(8888).sync(); System.out.println("服务器在8888端口监听hello"); // 等待服务端监听端口关闭 future.channel().closeFuture().sync(); System.out.println("服务器关闭bye"); } catch (Exception e) { e.printStackTrace(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } class IdleTimerServerHandler extends ChannelHandlerAdapter { // 可读 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 读数据 ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("receive:" + body); // 写数据 ByteBuf res = Unpooled.copiedBuffer("hello,client!".getBytes()); ctx.write(res); ctx.flush(); } /** * 用户事件触发 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ // 接受心跳事件 IdleStateEvent event = (IdleStateEvent)evt; if(event.state() == IdleState.ALL_IDLE){ // 读和写状态 System.out.println("心跳结束"); //清除超时会话 ByteBuf res = Unpooled.copiedBuffer("you will close!".getBytes()); ChannelFuture writeAndFlush = ctx.writeAndFlush(res); // 监听结果 writeAndFlush.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { ctx.channel().close(); } }); } } } // 链接 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client come,ip:" + ctx.channel().remoteAddress()); } // 关闭 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("client close,ip:" + ctx.channel().remoteAddress()); ctx.close(); } // 异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(cause.toString()); ctx.close(); } }