就是告诉其它人本身还活着。在简易RPC框架中,采用的是TCP长链接,为了确保长链接有效,就须要客户端与服务端之间有一种通知机制告知对方的存活状态。css
在状态空闲的时候定时给服务端发送消息类型为PING消息。java
捕获通道空闲状态事件,若是接收客户端PING消息,则发送PONG消息给服务端。若是在必定时间内没有收到客户端的PING消息,则说明客户端已经不在线,此时关闭通道。git
因为服务端会由于长时间接收不到服务端的PING消息而关闭通道,这就致使缓存在客户端的链接的可用性发生变化。须要将不可用的从可用列表中转移出去,并对不可用链接进行处理,好比直接丢弃或者是从新链接。github
ChannelPipeline与handle的关系。netty中的这些handle和spring mvc中的filter做用是相似的,ChannelPipeline能够理解成handle的容器,里面能够被注册众多处理不一样业务功能的事件处理器,好比:web
能够利用netty提供的IdleStateHandler来发送PING-PONG消息。这个处理器主要是捕获通道超时事件,主要有三类spring
客户端捕获读写超时,若是事件触发就给服务端发送PING消息。缓存
服务端只须要捕获读超时便可,当读超时触发后就关闭通道。mvc
为何在空闲状态才发送心跳消息框架
在正常客户端与服务端有交互的状况下,说明双方都在正常工做不须要额外的心跳来告知对方的存活。只有双方在必定时间内没有接收到对方的消息时才开始采用心跳消息来探测对方的存活,这也是一种提高效率的作法。ide
建立AbstractHeartbeatHandler,并继承ChannelInboundHandlerAdapter,服务于客户端与服务端的心跳处理器。在读取方法中判断消息类型:
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { if(!(msg instanceof RpcMessage)){ channelHandlerContext.fireChannelRead(msg); return; } RpcMessage message=(RpcMessage)msg; if(null==message||null==message.getMessageHeader()){ channelHandlerContext.fireChannelRead(msg); return; } if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){ logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody()); } else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){ this.sendPong(channelHandlerContext); } else { channelHandlerContext.fireChannelRead(msg); } }
空闲状态事件,能够根据不一样的状态作不一样的行为处理,定义三个可重写事件供客户端与服务端处理器具体确认处理事件。
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case READER_IDLE: this.handleReaderIdle(ctx); break; case WRITER_IDLE: this.handleWriterIdle(ctx); break; case ALL_IDLE: this.handleAllIdle(ctx); break; default: break; } } }
继承抽象心跳处理器,并重写事件发送PING消息。
public class ClientHeartbeatHandler extends AbstractHeartbeatHandler { @Override protected void handleAllIdle(ChannelHandlerContext ctx) { this.sendPing(ctx); } }
继承抽象心跳处理器,并重写事件关闭通道。
public class ServerHeartbeatHandler extends AbstractHeartbeatHandler { @Override protected void handleReaderIdle(ChannelHandlerContext ctx) { logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel"); ctx.close(); } }
好比5秒内未写入或者读取通道数据就触发超时事件。
.addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));
好比10秒未接收到通道消息就触发读超时事件。
.addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))
正常状况下心跳消息显示以下图所示,消息的内容能够根据本身的状况自行定义。
中止客户端程序,而后服务端读超时事件触发,并关闭通道。
因为上述的服务端心跳处理器,在触发读超时后会关闭通讯管道,这致使客户端缓存的链接状态会出现不可用的状况,为了让客户端一直只能取到可用链接就须要对从缓存中获取到的链接作状态判断,若是可用直接返回,若是不可用则将链接从可用列表中删除而后取下一个可用链接。
经过channel的isActive属性能够判断链接是否可用,若是不能够作删除并从新获取的操做。
public RpcClientInvoker getInvoker() { // ... int index = loadbalanceService.index(size); RpcClientInvoker invoker= RpcClientInvokerCache.get(index); if(invoker.getChannel().isActive()) { return invoker; } else { RpcClientInvokerCache.removeHandler(invoker); logger.info("invoker is not active,so remove it and get next one"); return this.getInvoker(); } }
启动一个每隔5秒执行一次任务的线程,定时取出不可用链接,而后重连,并将不可用链接删除。
这里我处理的重连是直接丢弃原有不可用链接,而后从新建立新链接。
private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class); static { executorService.schedule(new Runnable() { @Override public void run() { while (true) { List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers(); if (!CollectionUtils.isEmpty(notConnectedHandlers)) { for (RpcClientInvoker invoker : notConnectedHandlers) { RpcClientInvokerManager.getInstance(referenceConfig).connect(); } RpcClientInvokerCache.clearNotConnectedHandler(); } } } }, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS); }
https://github.com/jiangmin168168/jim-framework
文中代码是依赖上述项目的,若是有不明白的可下载源码
本文中的图取自于网格