简易RPC框架-心跳与重连机制

心跳

就是告诉其它人本身还活着。在简易RPC框架中,采用的是TCP长链接,为了确保长链接有效,就须要客户端与服务端之间有一种通知机制告知对方的存活状态。css

如何实现

客户端发送心跳消息

在状态空闲的时候定时给服务端发送消息类型为PING消息。java

服务端接收心跳消息

捕获通道空闲状态事件,若是接收客户端PING消息,则发送PONG消息给服务端。若是在必定时间内没有收到客户端的PING消息,则说明客户端已经不在线,此时关闭通道。git

客户端管理可用链接

因为服务端会由于长时间接收不到服务端的PING消息而关闭通道,这就致使缓存在客户端的链接的可用性发生变化。须要将不可用的从可用列表中转移出去,并对不可用链接进行处理,好比直接丢弃或者是从新链接。github

预备知识

ChannelPipeline与handle的关系。netty中的这些handle和spring mvc中的filter做用是相似的,ChannelPipeline能够理解成handle的容器,里面能够被注册众多处理不一样业务功能的事件处理器,好比:web

  • 编码
  • 解码
  • 心跳
  • 权限
  • 加密
  • 解密
  • 业务代码执行
  • ......

具体实现

空闲状态处理器

能够利用netty提供的IdleStateHandler来发送PING-PONG消息。这个处理器主要是捕获通道超时事件,主要有三类spring

  • 读超时,必定时间内没有从通道内读取到任何数据
  • 写超时,必定时间内没有从通道内写入任何数据
  • 读写超时,必定时间内没有从通道内读取或者是写入任何数据

客户端加入空闲状态处理器

客户端捕获读写超时,若是事件触发就给服务端发送PING消息。缓存

服务端加入空闲状态处理器

服务端只须要捕获读超时便可,当读超时触发后就关闭通道。mvc

为何在空闲状态才发送心跳消息框架

在正常客户端与服务端有交互的状况下,说明双方都在正常工做不须要额外的心跳来告知对方的存活。只有双方在必定时间内没有接收到对方的消息时才开始采用心跳消息来探测对方的存活,这也是一种提高效率的作法。ide

抽象心跳处理器

建立AbstractHeartbeatHandler,并继承ChannelInboundHandlerAdapter,服务于客户端与服务端的心跳处理器。在读取方法中判断消息类型:

  • 若是是PING消息就发送PONG消息给客户端
  • 若是收到的是PONG消息,则直接打印消息说明客户端已经成功接收到服务端返回的PONG消息
  • 若是是其它类型的消息,则通知下一个处理器处理消息
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {

        if(!(msg instanceof RpcMessage)){
            channelHandlerContext.fireChannelRead(msg);
            return;
        }
        RpcMessage message=(RpcMessage)msg;

        if(null==message||null==message.getMessageHeader()){
            channelHandlerContext.fireChannelRead(msg);
            return;
        }
        if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){
            logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody());
        }
        else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){
            this.sendPong(channelHandlerContext);
        }
        else {
            channelHandlerContext.fireChannelRead(msg);
        }

    }

空闲状态事件,能够根据不一样的状态作不一样的行为处理,定义三个可重写事件供客户端与服务端处理器具体确认处理事件。

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    this.handleReaderIdle(ctx);
                    break;
                case WRITER_IDLE:
                    this.handleWriterIdle(ctx);
                    break;
                case ALL_IDLE:
                    this.handleAllIdle(ctx);
                    break;
                default:
                    break;
            }
        }
    }

客户端心跳处理器

继承抽象心跳处理器,并重写事件发送PING消息。

public class ClientHeartbeatHandler extends AbstractHeartbeatHandler {

    @Override
    protected void handleAllIdle(ChannelHandlerContext ctx) {
        this.sendPing(ctx);
    }
}

服务端心跳处理器

继承抽象心跳处理器,并重写事件关闭通道。

public class ServerHeartbeatHandler extends AbstractHeartbeatHandler {

    @Override
    protected void handleReaderIdle(ChannelHandlerContext ctx) {
        logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel");
        ctx.close();
    }

}

客户端ChannelPipeline中加入心跳处理器

好比5秒内未写入或者读取通道数据就触发超时事件。

.addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));

服务端ChannelPipeline中加入心跳处理器

好比10秒未接收到通道消息就触发读超时事件。

.addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))

客户端消息示例

正常状况下心跳消息显示以下图所示,消息的内容能够根据本身的状况自行定义。

客户端下线消息示例

中止客户端程序,而后服务端读超时事件触发,并关闭通道。

客户端可用链接管理

因为上述的服务端心跳处理器,在触发读超时后会关闭通讯管道,这致使客户端缓存的链接状态会出现不可用的状况,为了让客户端一直只能取到可用链接就须要对从缓存中获取到的链接作状态判断,若是可用直接返回,若是不可用则将链接从可用列表中删除而后取下一个可用链接。

修改获取链接方法

经过channel的isActive属性能够判断链接是否可用,若是不能够作删除并从新获取的操做。

public RpcClientInvoker getInvoker() {
        // ...
        int index = loadbalanceService.index(size);
        RpcClientInvoker invoker= RpcClientInvokerCache.get(index);
        if(invoker.getChannel().isActive()) {
            return invoker;
        }
        else {
            RpcClientInvokerCache.removeHandler(invoker);
            logger.info("invoker is not active,so remove it and get next one");
            return this.getInvoker();
        }
    }

后台启动任务处理不可用链接

启动一个每隔5秒执行一次任务的线程,定时取出不可用链接,而后重连,并将不可用链接删除。

这里我处理的重连是直接丢弃原有不可用链接,而后从新建立新链接。

private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class);

    static {
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers();
                    if (!CollectionUtils.isEmpty(notConnectedHandlers)) {
                        for (RpcClientInvoker invoker : notConnectedHandlers) {
                            RpcClientInvokerManager.getInstance(referenceConfig).connect();
                        }
                        RpcClientInvokerCache.clearNotConnectedHandler();
                    }
                }
            }
        }, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS);

    }

本文源码

https://github.com/jiangmin168168/jim-framework

文中代码是依赖上述项目的,若是有不明白的可下载源码

引用

本文中的图取自于网格

相关文章
相关标签/搜索