跟我一块儿开发商业级IM(3)—— 长链接稳定性之链接及重连

写在前面

贴个Kula高清图镇楼:
git

在以前的跟我一块儿开发商业级IM(1)—— 技术选型及协议定义跟我一块儿开发商业级IM(2)—— 接口定义及封装两篇文章,咱们已经了解IMS的技术选型及接口定义与封装,接下来,咱们来真正实现链接及重连部分。github

一个社交产品,长链接稳定是前提,绝大部分业务逻辑的正常运行都须要稳定的长链接支撑,可谓重中之重。本篇文章将会讲述如何去实现并维护一个稳定的长链接,以及各类异常状况的处理等。阅读完本篇文章,你将会学到链接、重连机制、心跳机制等知识。同时,会在Github(https://github.com/FreddyChen)上开源相关代码(包含Android客户端/Java服务端、基于TCP/WebSocket),废话不说,咱们开始吧。web

初始化配置

初始化配置,也就是在应用程序启动并进行IMS初始化时,传入所需配置参数,可根据本身的业务需求自定义。下面咱们来看看NettyTCPIMS初始化接口的代码实现(「因为基于NettyWebSocket实现的NettyWebSocketIMS大部分代码及逻辑都与NettyTCPIMS相同,就不单独贴出NettyWebSocketIMS代码了,下面只会讲解WebSocket对比TCP实现所不一样的地方」,有须要完整代码的话能够跳转Github(https://github.com/FreddyChen/ims_kula)查看):算法

 /**
  * 初始化
  * @param context
  * @param options               IMS初始化配置
  * @param connectStatusListener IMS链接状态监听
  * @param msgReceivedListener   IMS消息接收监听
  * @return
  */
 @Override
 public boolean init(Context context, IMSOptions options, IMSConnectStatusListener connectStatusListener, IMSMsgReceivedListener msgReceivedListener) {
    if (context == null) {
        Log.d(TAG, "初始化失败:Context is null.");
        initialized = false;
        return false;
    }

    if (options == null) {
        Log.d(TAG, "初始化失败:IMSOptions is null.");
        initialized = false;
        return false;
    }
    this.mContext = context;
    this.mIMSOptions = options;
    this.mIMSConnectStatusListener = connectStatusListener;
    this.mIMSMsgReceivedListener = msgReceivedListener;
    executors = new ExecutorServiceFactory();
    // 初始化重连线程池
    executors.initBossLoopGroup();
    // 注册网络链接状态监听
    NetworkManager.getInstance().registerObserver(context, this);
    // 标识ims初始化成功
    initialized = true;
    // 标识ims已打开
    isClosed = false;
    callbackIMSConnectStatus(IMSConnectStatus.Unconnected);
    return true;
}

如上图,简单讲讲初始化的几个步骤:bootstrap

  1. 「参数」
  • 「context」应用程序上下文,方便IMS获取系统资源并进行一些系统操做等。
  • 「options」IMS初始化所需配置,其中定义了通讯实现方式、通讯协议、传输协议、链接超时时间、重连延时时间、重连次数、心跳先后台间隔时间、服务器地址等一些支持自定义的参数。
  • 「connectStatusListener」IMS链接状态回调,便于把链接状态反馈到应用层。
  • 「msgReceivedListener」消息接收回调,便于IMS把接收到的消息回调到应用层( 「本篇文章主要讲解链接及重连,因此不涉及消息部分,后续会详细讲解」)。
  1. 「建立线程池组」
    线程池组分为boss线程池和work线程池,其中boss线程池负责链接及重连部分;work线程池负责心跳部分,均为单线程线程池(由于同时只能有一个线程进行链接或心跳)。至于为何用线程池,纯属我的习惯,你们也能够分别用一个子线程实现便可。
  2. 「注册网络状态监听」
    网络变化时,进行IMS重连。

初始Bootstrap

初始化Bootstrap,可参考Netty ChannelOption并根据实际业务场景进行定制,下面贴出我本身定制的配置:服务器

/**
 * 初始化bootstrap
 */
void initBootstrap() {
    closeBootstrap();// 初始化前先关闭
    NioEventLoopGroup loopGroup = new NioEventLoopGroup(4);
    bootstrap = new Bootstrap();
    bootstrap.group(loopGroup).channel(NioSocketChannel.class)
            // 设置该选项之后,若是在两小时内没有数据的通讯时,TCP会自动发送一个活动探测数据报文
            .option(ChannelOption.SO_KEEPALIVE, true)
            // 设置禁用nagle算法,若是要求高实时性,有数据发送时就立刻发送,就将该选项设置为true关闭Nagle算法;若是要减小发送次数减小网络交互,就设置为false等累积必定大小后再发送。默认为false
            .option(ChannelOption.TCP_NODELAY, true)
            // 设置TCP发送缓冲区大小(字节数)
            .option(ChannelOption.SO_SNDBUF, 32 * 1024)
            // 设置TCP接收缓冲区大小(字节数)
            .option(ChannelOption.SO_RCVBUF, 32 * 1024)
            // 设置链接超时时长,单位:毫秒
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mIMSOptions.getConnectTimeout())
            // 设置初始化ChannelHandler
            .handler(new NettyTCPChannelInitializerHandler(this));
}

至于参数的含义,你们可参照官方文档的介绍。微信

链接

链接也能够认为是重连,执行重连响应逻辑便可:websocket

/**
 * 链接
 */
@Override
public void connect() {
    if(!initialized) {
        Log.w(TAG, "IMS初始化失败,请查看日志");
        return;
    }
    isExecConnect = true;// 标识已执行过链接
    this.reconnect(true);
}

「因此咱们直接看重连部分,也是整篇文章中最核心最复杂的部分」网络

重连

由于链接及重连部分代码较多及逻辑较复杂,为了使NettyTCPIMS代码尽可能简洁及逻辑清晰,因此将链接及重连部分代码抽取到NettyTCPReconnectTask框架

public class NettyTCPReconnectTask implements Runnable {

    private static final String TAG = NettyTCPReconnectTask.class.getSimpleName();
    private NettyTCPIMS ims;
    private IMSOptions mIMSOptions;

    NettyTCPReconnectTask(NettyTCPIMS ims) {
        this.ims = ims;
        this.mIMSOptions = ims.getIMSOptions();
    }

    @Override
    public void run() {
        try {
            // 重连时,释放工做线程组,也就是中止心跳
            ims.getExecutors().destroyWorkLoopGroup();

            // ims未关闭而且网络可用的状况下,才去链接
            while (!ims.isClosed() && ims.isNetworkAvailable()) {
                IMSConnectStatus status;
                if ((status = connect()) == IMSConnectStatus.Connected) {
                    ims.callbackIMSConnectStatus(status);
                    break;// 链接成功,跳出循环
                }

                if (status == IMSConnectStatus.ConnectFailed
                        || status == IMSConnectStatus.ConnectFailed_IMSClosed
                        || status == IMSConnectStatus.ConnectFailed_ServerListEmpty
                        || status == IMSConnectStatus.ConnectFailed_ServerEmpty
                        || status == IMSConnectStatus.ConnectFailed_ServerIllegitimate
                        || status == IMSConnectStatus.ConnectFailed_NetworkUnavailable) {
                    ims.callbackIMSConnectStatus(status);

                    if(ims.isClosed() || !ims.isNetworkAvailable()) {
                        return;
                    }
                    // 一个服务器地址列表都链接失败后,说明网络状况可能不好,延时指定时间(重连间隔时间*2)再去进行下一个服务器地址的链接
                    Log.w(TAG, String.format("一个周期链接失败,等待%1$dms后再次尝试重连", mIMSOptions.getReconnectInterval() * 2));
                    try {
                        Thread.sleep(mIMSOptions.getReconnectInterval() * 2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            // 标识重连任务中止
            ims.setReconnecting(false);
        }
    }

    /**
     * 链接服务器
     * @return
     */
    private IMSConnectStatus connect() {
        if (ims.isClosed()) return IMSConnectStatus.ConnectFailed_IMSClosed;
        ims.initBootstrap();
        List<String> serverList = mIMSOptions.getServerList();
        if (serverList == null || serverList.isEmpty()) {
            return IMSConnectStatus.ConnectFailed_ServerListEmpty;
        }

        for (int i = 0; i < serverList.size(); i++) {
            String server = serverList.get(i);
            if (StringUtil.isNullOrEmpty(server)) {
                return IMSConnectStatus.ConnectFailed_ServerEmpty;
            }

            String[] params = null;
            try {
                params = server.split(" ");
            } catch (Exception e) {
                e.printStackTrace();
            }
            if (params == null || params.length < 2) {
                return IMSConnectStatus.ConnectFailed_ServerIllegitimate;
            }

            if(i == 0) {
                ims.callbackIMSConnectStatus(IMSConnectStatus.Connecting);
            }

            // +1是由于首次链接也认为是重连,因此若是重连次数设置为3,则最大链接次数为3+1次
            for (int j = 0; j < mIMSOptions.getReconnectCount() + 1; j++) {
                if (ims.isClosed()) {
                    return IMSConnectStatus.ConnectFailed_IMSClosed;
                }
                if (!ims.isNetworkAvailable()) {
                    return IMSConnectStatus.ConnectFailed_NetworkUnavailable;
                }

                Log.d(TAG, String.format("正在进行【%1$s】的第%2$d次链接", server, j + 1));
                try {
                    String host = params[0];
                    int port = Integer.parseInt(params[1]);
                    Channel channel = toServer(host, port);
                    if (channel != null && channel.isOpen() && channel.isActive() && channel.isRegistered() && channel.isWritable()) {
                        ims.setChannel(channel);
                        return IMSConnectStatus.Connected;
                    } else {
                        if (j == mIMSOptions.getReconnectCount()) {
                            // 若是当前已达到最大重连次数,而且是最后一个服务器地址,则回调链接失败
                            if(i == serverList.size() - 1) {
                                Log.w(TAG, String.format("【%1$s】链接失败", server));
                                return IMSConnectStatus.ConnectFailed;
                            }
                            // 不然,无需回调链接失败,等待一段时间再去进行下一个服务器地址链接便可
                            // 也就是说,当服务器地址列表里的地址都链接失败,才认为是链接失败
                            else {
                                // 一个服务器地址链接失败后,延时指定时间再去进行下一个服务器地址的链接
                                Log.w(TAG, String.format("【%1$s】链接失败,正在等待进行下一个服务器地址的重连,当前重连延时时长:%2$dms", server, mIMSOptions.getReconnectInterval()));
                                Log.w(TAG, "=========================================================================================");
                                Thread.sleep(mIMSOptions.getReconnectInterval());
                            }
                        } else {
                            // 链接失败,则线程休眠(重连间隔时长 / 2 * n) ms
                            int delayTime = mIMSOptions.getReconnectInterval() + mIMSOptions.getReconnectInterval() / 2 * j;
                            Log.w(TAG, String.format("【%1$s】链接失败,正在等待重连,当前重连延时时长:%2$dms", server, delayTime));
                            Thread.sleep(delayTime);
                        }
                    }
                } catch (InterruptedException e) {
                    break;// 线程被中断,则强制关闭
                }
            }
        }

        return IMSConnectStatus.ConnectFailed;
    }

    /**
     * 真正链接服务器的地方
     * @param host
     * @param port
     * @return
     */
    private Channel toServer(String host, int port) {
        Channel channel;
        try {
            channel = ims.getBootstrap().connect(host, port).sync().channel();
        } catch (Exception e) {
            e.printStackTrace();
            channel = null;
        }

        return channel;
    }
}

从以上代码,能够看到主要分为三个方法:

  • 「run()」重连任务是一个Thread,run()方法也就是线程启动时执行的方法,主要是判断IMS是否关闭和网络状态,知足这两个条件就一直循环去链接,链接成功后,回调链接状态并中止线程,不然,一个周期链接失败后(一个链接失败周期,表明从开始链接到全部服务器地址达到最大重连次数),延时一段时间再去尝试重连(你们可能会问为何要去延时,直接链接很差吗?主要是由于若是链接失败的话,大多数状况下多是客户端网络环境很差或者是服务端存在问题,延时是为了在下一个时间节点时网络恢复等,避免频繁链接,节约性能),直至链接成功为止。

  • 「toServer()」toServer()主要是Netty框架进行TCP长链接的代码,比较简单。

  • 「connect()」链接及重连的全部逻辑,都放到connect()方法中进行。TCPWebSocket的方式有细微的区别,下面主要以TCP为例,至于WebSocket的区别,稍后会列出来。
    「注:ims_kulaSDK中固定的TCP服务器地址的格式为:IP地址 端口号,例:192.168.0.1 8808,你们也能够根据本身的需求来定义格式。」

    connect()方法大致逻辑以下:

    • 判断IMS是否已关闭或网络是否不可用,若知足两个条件的其中之一,即返回链接失败状态;
    • 判断用户是否设置了服务器地址列表,若未设置,即返回链接失败状态;
    • 若以上条件都未知足,也就是IMS未关闭,网络可用,而且服务器地址已设置,则初始化Bootstrap;
    • 接着须要两个for循环,外层循环负责遍历服务器地址列表,取出每个服务器地址;内层循环负责遍历用户设置的最大重连次数,默认为3次,加上链接所需的一次,也就是说在不设置最大重连次数的状况下, ims_kulaSDK会对每一个服务器地址进行 「4」次链接。同时,重连间隔时间为 reconnectInterval + reconnectInterval / 2 * n,也就是若是设置重连间隔时间为8000ms,那么第二次重连间隔时间将为12000ms,第三次为16000ms,以此类推;
    • 获取服务器地址后,对地址进行字符串分割,分别获取host和port;
    • 接着调用Netty链接TCP的方法( toServer(String host, int port))进行链接便可。

「注:WebSocket链接方式与TCP大同小异,惟一的区别就是WebSocket的服务器地址格式与TCP不一样,ws://IP地址:端口号/websocket,例:ws://192.168.0.1:8808/websocket,因此WebSocket获取host和port代码以下:」
(伪代码,具体代码可见NettyWebSocketReconnectTask

URI uri = URI.create(server);
String host = uri.getHost();
int port = uri.getPort();

至于其他链接及重连部分代码,WebSocketTCP是一致的,由于WebSocket自己就是基于TCP协议做一层封装。

什么时候重连及断开链接

首先明确一下,「重连是相对于客户端来讲的,服务端不存在主动链接;断开链接是相对于服务端来讲的,严格来讲是移除响应的Channel。」

客户端重连时机:

  • 网络切换
  • 断开网络链接
  • 可感知的服务端异常
  • 心跳超时等

服务端断开链接时机:

  • 可感知的客户端异常
  • 心跳超时
  • 同一IP的客户端重复链接等

不知道你们注意到没有,上述的客户端重连时机和服务端断开链接时机,都分别有一个可感知异常。什么是可感知异常?也就是不管客户端仍是服务端,在对方断开链接的时候,能够感知到,就是可感知异常。

经测试,在双方创建链接成功的状态下,对于客户端来讲,若是服务端手动中止服务,Netty会回调exceptionCaught()方法,以下:

「服务端直接关机或者拔网线时,客户端没法感知,须要利用心跳超时机制进行重连。」

同理,对于服务端来讲,若是客户端手动杀死进程,Netty会回调channelInactive()方法,以下:

「客户端直接关机或者断网时,服务端没法感知,一样须要利用心跳机制进行断开客户端链接(移除channel)。」

「注:利用心跳超时机制进行重连及断开链接会在后续文章讲解,本篇文章主要讲解链接及重连,就不在此展开了。」

效果展现

考虑到GIF图片体积过大,暂时先把链接超时时间和重连间隔时间适当缩短,下面展现几种状况下的客户端链接变化:

  • 正常链接
  • 客户端主动断网重连
  • 服务端中止服务重连

「注:以上GIF图,客户端与服务端创建链接成功时,会显示“消息”字样,不然会显示链接状态。」

客户端日志以下:

服务端日志以下:

因为客户端杀死进程及服务端主动中止服务,日志会清空,因此就不贴更详细的日志了,感兴趣的同窗能够pull代码自行验证。

写在最后

经过以上代码实现,若是不考虑长链接稳定性的状况下(未加入心跳超时重连逻辑),已经能够进行客户端与服务端消息的收发,本文主要讲解链接及重连模块,因此暂未加入消息收发功能。

在下一篇文章中,将会讲解TCP/WebSocket的拆包与粘包处理,因为Netty已封装了各类不一样的消息编解码器,因此若是使用我定义的消息格式,拆包与粘包的处理将会很简单,直接拿来用便可。考虑到你们可能有不一样业务协议的需求,因此会加入自定义协议的消息编解码器的实现,敬请期待。

相关代码已提交Github,须要自取:

  • KulaChat(https://github.com/FreddyChen/KulaChat)
  • kulachat-server(https://github.com/FreddyChen/kulachat-server)
  • ims_kula(https://github.com/FreddyChen/ims_kula)

下篇文章见,古德拜~ ~ ~


本文分享自微信公众号 - FreddyChen(FreddyChenAndroid)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索