【Netty】利用Netty实现心跳检测和重连机制

1、前言

  心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道本身还活着,以确保链接的有效性的机制。
  咱们用到的不少框架都用到了心跳检测,好比服务注册到 Eureka Server 以后会维护一个心跳链接,告诉 Eureka Server 本身还活着。本文就是利用 Netty 来实现心跳检测,以及客户端重连。html

2、设计思路

  1. 分为客户端和服务端
  2. 创建链接后,客户端先发送一个消息询问服务端是否能够进行通讯了。
  3. 客户端收到服务端 Yes 的应答后,主动发送心跳消息,服务端接收到心跳消息后,返回心跳应答,周而复始。
  4. 心跳超时利用 Netty 的 ReadTimeOutHandler 机制,当必定周期内(默认值50s)没有读取到对方任何消息时,须要主动关闭链路。若是是客户端,从新发起链接。
  5. 为了不出现粘/拆包问题,使用 DelimiterBasedFrameDecoderStringDecoder 来处理消息。

3、编码

  1. 先编写客户端 NettyClient
  1. public class NettyClient
  2.  
  3. private static final String HOST = "127.0.0.1"
  4.  
  5. private static final int PORT = 9911
  6.  
  7. private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); 
  8.  
  9. EventLoopGroup group = new NioEventLoopGroup(); 
  10.  
  11.  
  12. private void connect(String host,int port)
  13. try
  14. Bootstrap b = new Bootstrap(); 
  15. b.group(group) 
  16. .channel(NioSocketChannel.class) 
  17. .option(ChannelOption.TCP_NODELAY,true
  18. .remoteAddress(new InetSocketAddress(host,port)) 
  19. .handler(new ChannelInitializer<SocketChannel>() { 
  20. @Override 
  21. protected void initChannel(SocketChannel ch) throws Exception
  22. ByteBuf delimiter = Unpooled.copiedBuffer("$_", CharsetUtil.UTF_8); 
  23. ch.pipeline() 
  24. .addLast(new DelimiterBasedFrameDecoder(1024,delimiter)) 
  25. .addLast(new StringDecoder()) 
  26. // 当必定周期内(默认50s)没有收到对方任何消息时,须要主动关闭连接 
  27. .addLast("readTimeOutHandler",new ReadTimeoutHandler(50)) 
  28. .addLast("heartBeatHandler",new HeartBeatReqHandler()); 
  29. }); 
  30. // 发起异步链接操做 
  31. ChannelFuture future = b.connect().sync(); 
  32. future.channel().closeFuture().sync(); 
  33. }catch (Exception e){ 
  34. e.printStackTrace(); 
  35. }finally
  36. // 全部资源释放完以后,清空资源,再次发起重连操做 
  37. executor.execute(()->{ 
  38. try
  39. TimeUnit.SECONDS.sleep(5); 
  40. //发起重连操做 
  41. connect(NettyClient.HOST,NettyClient.PORT); 
  42. } catch (InterruptedException e) { 
  43. e.printStackTrace(); 
  44. }); 
  45.  
  46. public static void main(String[] args)
  47. new NettyClient().connect(NettyClient.HOST,NettyClient.PORT); 
  48.  

这里稍微复杂点的就是38行开始的重连部分。
2. 心跳消息发送类 HeartBeatReqHandlerjava

  1. package cn.sp.heartbeat; 
  2.  
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandler; 
  5. import io.netty.channel.ChannelHandlerContext; 
  6. import io.netty.channel.SimpleChannelInboundHandler; 
  7.  
  8. import java.util.concurrent.ScheduledFuture; 
  9. import java.util.concurrent.TimeUnit; 
  10.  
  11. /** 
  12. * Created by 2YSP on 2019/5/23. 
  13. */ 
  14. @ChannelHandler.Sharable 
  15. public class HeartBeatReqHandler extends SimpleChannelInboundHandler<String>
  16.  
  17. private volatile ScheduledFuture<?> heartBeat; 
  18.  
  19. private static final String hello = "start notify with server$_"
  20.  
  21. @Override 
  22. public void channelActive(ChannelHandlerContext ctx) throws Exception
  23. ctx.writeAndFlush(Unpooled.copiedBuffer(hello.getBytes())); 
  24. System.out.println("================"); 
  25.  
  26. @Override 
  27. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
  28. if (heartBeat != null){ 
  29. heartBeat.cancel(true); 
  30. heartBeat = null
  31. ctx.fireExceptionCaught(cause); 
  32.  
  33. @Override 
  34. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception
  35. if ("ok".equalsIgnoreCase(msg)){ 
  36. //服务端返回ok开始心跳 
  37. heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatTask(ctx),0,5000, TimeUnit.MILLISECONDS); 
  38. }else
  39. System.out.println("Client receive server heart beat message : --->"+msg); 
  40.  
  41. private class HeartBeatTask implements Runnable
  42.  
  43. private final ChannelHandlerContext ctx; 
  44.  
  45. public HeartBeatTask(ChannelHandlerContext ctx)
  46. this.ctx = ctx; 
  47.  
  48.  
  49. @Override 
  50. public void run()
  51. String heartBeat = "I am ok"
  52. System.out.println("Client send heart beat message to server: ----->"+heartBeat); 
  53. ctx.writeAndFlush(Unpooled.copiedBuffer((heartBeat+"$_").getBytes())); 
  54.  

channelActive()方法在首次创建链接后向服务端问好,若是服务端返回了 "ok" 就建立一个线程每隔5秒发送一次心跳消息。若是发生了异常,就取消定时任务并将其设置为 null,等待 GC 回收。
3. 服务端 NettyServergit

  1. public class NettyServer
  2.  
  3. public static void main(String[] args)
  4. new NettyServer().bind(9911); 
  5.  
  6. private void bind(int port)
  7. EventLoopGroup group = new NioEventLoopGroup(); 
  8. try
  9. ServerBootstrap b = new ServerBootstrap(); 
  10. b.group(group) 
  11. .channel(NioServerSocketChannel.class) 
  12. .childHandler(new ChannelInitializer<SocketChannel>() { 
  13. @Override 
  14. protected void initChannel(SocketChannel ch) throws Exception
  15. ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); 
  16.  
  17. ch.pipeline() 
  18. .addLast(new DelimiterBasedFrameDecoder(1024,delimiter)) 
  19. .addLast(new StringDecoder()) 
  20. .addLast("readTimeOutHandler",new ReadTimeoutHandler(50)) 
  21. .addLast("HeartBeatHandler",new HeartBeatRespHandler()); 
  22. }); 
  23. // 绑定端口,同步等待成功 
  24. b.bind(port).sync(); 
  25. System.out.println("Netty Server start ok ...."); 
  26. }catch (Exception e){ 
  27. e.printStackTrace(); 
  1. 心跳响应类 HeartBeatRespHandler
  1. package cn.sp.heartbeat; 
  2.  
  3. import io.netty.buffer.Unpooled; 
  4. import io.netty.channel.ChannelHandler; 
  5. import io.netty.channel.ChannelHandlerContext; 
  6. import io.netty.channel.SimpleChannelInboundHandler; 
  7.  
  8. /** 
  9. * Created by 2YSP on 2019/5/23. 
  10. */ 
  11. @ChannelHandler.Sharable 
  12. public class HeartBeatRespHandler extends SimpleChannelInboundHandler<String>
  13.  
  14. private static final String resp = "I have received successfully$_"
  15.  
  16. @Override 
  17. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception
  18. if (msg.equals("start notify with server")){ 
  19. ctx.writeAndFlush(Unpooled.copiedBuffer("ok$_".getBytes())); 
  20. }else
  21. //返回心跳应答信息 
  22. System.out.println("Receive client heart beat message: ---->"+ msg); 
  23. ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); 
  24.  
  25.  

第一次告诉客户端我已经准备好了,后面打印客户端发过来的信息并告诉客户端我已经收到你的消息了。github

4、测试

启动服务端再启动客户端,能够看到心跳检测正常,以下图。框架

 

服务端控制台
服务端控制台

 

 

客户端控制台
客户端控制台

如今让服务端宕机一段时间,看客户端可否重连并开始正常工做。

 

关闭服务端后,客户端周期性的链接失败,控制台输出如图:异步

 

链接失败
链接失败

从新启动服务端,过一下子发现重连成功了。

 

 

成功重连
成功重连

 

5、总结

总得来讲,使用 Netty 实现心跳检测仍是比较简单的,这里比较懒没有使用其余序列化协议(如 ProtoBuf 等),若是感兴趣的话你们能够本身试试。
代码地址,点击这里
有篇SpringBoot 整合长链接心跳机制的文章写的也很不错,地址https://crossoverjie.top/2018/05/24/netty/Netty(1)TCP-Heartbeat/ide