写在前面
贴个Kula高清图镇楼:git
在以前的跟我一块儿开发商业级IM(1)—— 技术选型及协议定义和跟我一块儿开发商业级IM(2)—— 接口定义及封装两篇文章,咱们已经了解IMS的技术选型及接口定义与封装,接下来,咱们来真正实现链接及重连部分。github
一个社交产品,长链接稳定是前提,绝大部分业务逻辑的正常运行都须要稳定的长链接支撑,可谓重中之重。本篇文章将会讲述如何去实现并维护一个稳定的长链接,以及各类异常状况的处理等。阅读完本篇文章,你将会学到链接、重连机制、心跳机制等知识。同时,会在Github(https://github.com/FreddyChen)上开源相关代码(包含Android客户端/Java服务端、基于TCP/WebSocket),废话不说,咱们开始吧。web
初始化配置
初始化配置,也就是在应用程序启动并进行IMS
初始化时,传入所需配置参数,可根据本身的业务需求自定义。下面咱们来看看NettyTCPIMS
初始化接口的代码实现(「因为基于Netty
和WebSocket
实现的NettyWebSocketIMS
大部分代码及逻辑都与NettyTCPIMS
相同,就不单独贴出NettyWebSocketIMS
代码了,下面只会讲解WebSocket
对比TCP
实现所不一样的地方」,有须要完整代码的话能够跳转Github(https://github.com/FreddyChen/ims_kula)查看):算法
/**
* 初始化
* @param context
* @param options IMS初始化配置
* @param connectStatusListener IMS链接状态监听
* @param msgReceivedListener IMS消息接收监听
* @return
*/
@Override
public boolean init(Context context, IMSOptions options, IMSConnectStatusListener connectStatusListener, IMSMsgReceivedListener msgReceivedListener) {
if (context == null) {
Log.d(TAG, "初始化失败:Context is null.");
initialized = false;
return false;
}
if (options == null) {
Log.d(TAG, "初始化失败:IMSOptions is null.");
initialized = false;
return false;
}
this.mContext = context;
this.mIMSOptions = options;
this.mIMSConnectStatusListener = connectStatusListener;
this.mIMSMsgReceivedListener = msgReceivedListener;
executors = new ExecutorServiceFactory();
// 初始化重连线程池
executors.initBossLoopGroup();
// 注册网络链接状态监听
NetworkManager.getInstance().registerObserver(context, this);
// 标识ims初始化成功
initialized = true;
// 标识ims已打开
isClosed = false;
callbackIMSConnectStatus(IMSConnectStatus.Unconnected);
return true;
}
如上图,简单讲讲初始化的几个步骤:bootstrap
-
「参数」
-
「context」应用程序上下文,方便IMS获取系统资源并进行一些系统操做等。 -
「options」IMS初始化所需配置,其中定义了通讯实现方式、通讯协议、传输协议、链接超时时间、重连延时时间、重连次数、心跳先后台间隔时间、服务器地址等一些支持自定义的参数。 -
「connectStatusListener」IMS链接状态回调,便于把链接状态反馈到应用层。 -
「msgReceivedListener」消息接收回调,便于IMS把接收到的消息回调到应用层( 「本篇文章主要讲解链接及重连,因此不涉及消息部分,后续会详细讲解」)。
-
「建立线程池组」
线程池组分为boss线程池和work线程池,其中boss线程池负责链接及重连部分;work线程池负责心跳部分,均为单线程线程池(由于同时只能有一个线程进行链接或心跳)。至于为何用线程池,纯属我的习惯,你们也能够分别用一个子线程实现便可。 -
「注册网络状态监听」
网络变化时,进行IMS重连。
初始Bootstrap
初始化Bootstrap,可参考Netty ChannelOption并根据实际业务场景进行定制,下面贴出我本身定制的配置:服务器
/**
* 初始化bootstrap
*/
void initBootstrap() {
closeBootstrap();// 初始化前先关闭
NioEventLoopGroup loopGroup = new NioEventLoopGroup(4);
bootstrap = new Bootstrap();
bootstrap.group(loopGroup).channel(NioSocketChannel.class)
// 设置该选项之后,若是在两小时内没有数据的通讯时,TCP会自动发送一个活动探测数据报文
.option(ChannelOption.SO_KEEPALIVE, true)
// 设置禁用nagle算法,若是要求高实时性,有数据发送时就立刻发送,就将该选项设置为true关闭Nagle算法;若是要减小发送次数减小网络交互,就设置为false等累积必定大小后再发送。默认为false
.option(ChannelOption.TCP_NODELAY, true)
// 设置TCP发送缓冲区大小(字节数)
.option(ChannelOption.SO_SNDBUF, 32 * 1024)
// 设置TCP接收缓冲区大小(字节数)
.option(ChannelOption.SO_RCVBUF, 32 * 1024)
// 设置链接超时时长,单位:毫秒
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, mIMSOptions.getConnectTimeout())
// 设置初始化ChannelHandler
.handler(new NettyTCPChannelInitializerHandler(this));
}
至于参数的含义,你们可参照官方文档的介绍。微信
链接
链接也能够认为是重连,执行重连响应逻辑便可:websocket
/**
* 链接
*/
@Override
public void connect() {
if(!initialized) {
Log.w(TAG, "IMS初始化失败,请查看日志");
return;
}
isExecConnect = true;// 标识已执行过链接
this.reconnect(true);
}
「因此咱们直接看重连部分,也是整篇文章中最核心最复杂的部分」。网络
重连
由于链接及重连部分代码较多及逻辑较复杂,为了使NettyTCPIMS
代码尽可能简洁及逻辑清晰,因此将链接及重连部分代码抽取到NettyTCPReconnectTask
:框架
public class NettyTCPReconnectTask implements Runnable {
private static final String TAG = NettyTCPReconnectTask.class.getSimpleName();
private NettyTCPIMS ims;
private IMSOptions mIMSOptions;
NettyTCPReconnectTask(NettyTCPIMS ims) {
this.ims = ims;
this.mIMSOptions = ims.getIMSOptions();
}
@Override
public void run() {
try {
// 重连时,释放工做线程组,也就是中止心跳
ims.getExecutors().destroyWorkLoopGroup();
// ims未关闭而且网络可用的状况下,才去链接
while (!ims.isClosed() && ims.isNetworkAvailable()) {
IMSConnectStatus status;
if ((status = connect()) == IMSConnectStatus.Connected) {
ims.callbackIMSConnectStatus(status);
break;// 链接成功,跳出循环
}
if (status == IMSConnectStatus.ConnectFailed
|| status == IMSConnectStatus.ConnectFailed_IMSClosed
|| status == IMSConnectStatus.ConnectFailed_ServerListEmpty
|| status == IMSConnectStatus.ConnectFailed_ServerEmpty
|| status == IMSConnectStatus.ConnectFailed_ServerIllegitimate
|| status == IMSConnectStatus.ConnectFailed_NetworkUnavailable) {
ims.callbackIMSConnectStatus(status);
if(ims.isClosed() || !ims.isNetworkAvailable()) {
return;
}
// 一个服务器地址列表都链接失败后,说明网络状况可能不好,延时指定时间(重连间隔时间*2)再去进行下一个服务器地址的链接
Log.w(TAG, String.format("一个周期链接失败,等待%1$dms后再次尝试重连", mIMSOptions.getReconnectInterval() * 2));
try {
Thread.sleep(mIMSOptions.getReconnectInterval() * 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} finally {
// 标识重连任务中止
ims.setReconnecting(false);
}
}
/**
* 链接服务器
* @return
*/
private IMSConnectStatus connect() {
if (ims.isClosed()) return IMSConnectStatus.ConnectFailed_IMSClosed;
ims.initBootstrap();
List<String> serverList = mIMSOptions.getServerList();
if (serverList == null || serverList.isEmpty()) {
return IMSConnectStatus.ConnectFailed_ServerListEmpty;
}
for (int i = 0; i < serverList.size(); i++) {
String server = serverList.get(i);
if (StringUtil.isNullOrEmpty(server)) {
return IMSConnectStatus.ConnectFailed_ServerEmpty;
}
String[] params = null;
try {
params = server.split(" ");
} catch (Exception e) {
e.printStackTrace();
}
if (params == null || params.length < 2) {
return IMSConnectStatus.ConnectFailed_ServerIllegitimate;
}
if(i == 0) {
ims.callbackIMSConnectStatus(IMSConnectStatus.Connecting);
}
// +1是由于首次链接也认为是重连,因此若是重连次数设置为3,则最大链接次数为3+1次
for (int j = 0; j < mIMSOptions.getReconnectCount() + 1; j++) {
if (ims.isClosed()) {
return IMSConnectStatus.ConnectFailed_IMSClosed;
}
if (!ims.isNetworkAvailable()) {
return IMSConnectStatus.ConnectFailed_NetworkUnavailable;
}
Log.d(TAG, String.format("正在进行【%1$s】的第%2$d次链接", server, j + 1));
try {
String host = params[0];
int port = Integer.parseInt(params[1]);
Channel channel = toServer(host, port);
if (channel != null && channel.isOpen() && channel.isActive() && channel.isRegistered() && channel.isWritable()) {
ims.setChannel(channel);
return IMSConnectStatus.Connected;
} else {
if (j == mIMSOptions.getReconnectCount()) {
// 若是当前已达到最大重连次数,而且是最后一个服务器地址,则回调链接失败
if(i == serverList.size() - 1) {
Log.w(TAG, String.format("【%1$s】链接失败", server));
return IMSConnectStatus.ConnectFailed;
}
// 不然,无需回调链接失败,等待一段时间再去进行下一个服务器地址链接便可
// 也就是说,当服务器地址列表里的地址都链接失败,才认为是链接失败
else {
// 一个服务器地址链接失败后,延时指定时间再去进行下一个服务器地址的链接
Log.w(TAG, String.format("【%1$s】链接失败,正在等待进行下一个服务器地址的重连,当前重连延时时长:%2$dms", server, mIMSOptions.getReconnectInterval()));
Log.w(TAG, "=========================================================================================");
Thread.sleep(mIMSOptions.getReconnectInterval());
}
} else {
// 链接失败,则线程休眠(重连间隔时长 / 2 * n) ms
int delayTime = mIMSOptions.getReconnectInterval() + mIMSOptions.getReconnectInterval() / 2 * j;
Log.w(TAG, String.format("【%1$s】链接失败,正在等待重连,当前重连延时时长:%2$dms", server, delayTime));
Thread.sleep(delayTime);
}
}
} catch (InterruptedException e) {
break;// 线程被中断,则强制关闭
}
}
}
return IMSConnectStatus.ConnectFailed;
}
/**
* 真正链接服务器的地方
* @param host
* @param port
* @return
*/
private Channel toServer(String host, int port) {
Channel channel;
try {
channel = ims.getBootstrap().connect(host, port).sync().channel();
} catch (Exception e) {
e.printStackTrace();
channel = null;
}
return channel;
}
}
从以上代码,能够看到主要分为三个方法:
-
「run()」重连任务是一个Thread,
run()
方法也就是线程启动时执行的方法,主要是判断IMS是否关闭和网络状态,知足这两个条件就一直循环去链接,链接成功后,回调链接状态并中止线程,不然,一个周期链接失败后(一个链接失败周期,表明从开始链接到全部服务器地址达到最大重连次数),延时一段时间再去尝试重连(你们可能会问为何要去延时,直接链接很差吗?主要是由于若是链接失败的话,大多数状况下多是客户端网络环境很差或者是服务端存在问题,延时是为了在下一个时间节点时网络恢复等,避免频繁链接,节约性能),直至链接成功为止。 -
「toServer()」
toServer()
主要是Netty框架进行TCP长链接的代码,比较简单。 -
「connect()」链接及重连的全部逻辑,都放到
connect()
方法中进行。TCP
和WebSocket
的方式有细微的区别,下面主要以TCP
为例,至于WebSocket
的区别,稍后会列出来。
「注:ims_kula
SDK中固定的TCP服务器地址的格式为:IP地址 端口号,例:192.168.0.1 8808,你们也能够根据本身的需求来定义格式。」connect()
方法大致逻辑以下: -
判断IMS是否已关闭或网络是否不可用,若知足两个条件的其中之一,即返回链接失败状态; -
判断用户是否设置了服务器地址列表,若未设置,即返回链接失败状态; -
若以上条件都未知足,也就是IMS未关闭,网络可用,而且服务器地址已设置,则初始化Bootstrap; -
接着须要两个for循环,外层循环负责遍历服务器地址列表,取出每个服务器地址;内层循环负责遍历用户设置的最大重连次数,默认为3次,加上链接所需的一次,也就是说在不设置最大重连次数的状况下, ims_kula
SDK会对每一个服务器地址进行 「4」次链接。同时,重连间隔时间为 reconnectInterval + reconnectInterval / 2 * n,也就是若是设置重连间隔时间为8000ms,那么第二次重连间隔时间将为12000ms,第三次为16000ms,以此类推; -
获取服务器地址后,对地址进行字符串分割,分别获取host和port; -
接着调用Netty链接TCP的方法( toServer(String host, int port)
)进行链接便可。
「注:WebSocket
链接方式与TCP
大同小异,惟一的区别就是WebSocket
的服务器地址格式与TCP
不一样,ws://IP地址:端口号/websocket,例:ws://192.168.0.1:8808/websocket,因此WebSocket
获取host和port代码以下:」
(伪代码,具体代码可见NettyWebSocketReconnectTask
)
URI uri = URI.create(server);
String host = uri.getHost();
int port = uri.getPort();
至于其他链接及重连部分代码,WebSocket
与TCP
是一致的,由于WebSocket
自己就是基于TCP
协议做一层封装。
什么时候重连及断开链接
首先明确一下,「重连是相对于客户端来讲的,服务端不存在主动链接;断开链接是相对于服务端来讲的,严格来讲是移除响应的Channel。」
客户端重连时机:
-
网络切换 -
断开网络链接 -
可感知的服务端异常 -
心跳超时等
服务端断开链接时机:
-
可感知的客户端异常 -
心跳超时 -
同一IP的客户端重复链接等
不知道你们注意到没有,上述的客户端重连时机和服务端断开链接时机,都分别有一个可感知异常。什么是可感知异常?也就是不管客户端仍是服务端,在对方断开链接的时候,能够感知到,就是可感知异常。
经测试,在双方创建链接成功的状态下,对于客户端来讲,若是服务端手动中止服务,Netty会回调exceptionCaught()
方法,以下:
「服务端直接关机或者拔网线时,客户端没法感知,须要利用心跳超时机制进行重连。」
同理,对于服务端来讲,若是客户端手动杀死进程,Netty会回调channelInactive()
方法,以下:
「客户端直接关机或者断网时,服务端没法感知,一样须要利用心跳机制进行断开客户端链接(移除channel)。」
「注:利用心跳超时机制进行重连及断开链接会在后续文章讲解,本篇文章主要讲解链接及重连,就不在此展开了。」
效果展现
考虑到GIF图片体积过大,暂时先把链接超时时间和重连间隔时间适当缩短,下面展现几种状况下的客户端链接变化:
-
正常链接
-
客户端主动断网重连
-
服务端中止服务重连
「注:以上GIF图,客户端与服务端创建链接成功时,会显示“消息”字样,不然会显示链接状态。」
客户端日志以下:
服务端日志以下:
因为客户端杀死进程及服务端主动中止服务,日志会清空,因此就不贴更详细的日志了,感兴趣的同窗能够pull代码自行验证。
写在最后
经过以上代码实现,若是不考虑长链接稳定性的状况下(未加入心跳超时重连逻辑),已经能够进行客户端与服务端消息的收发,本文主要讲解链接及重连模块,因此暂未加入消息收发功能。
在下一篇文章中,将会讲解TCP/WebSocket的拆包与粘包处理,因为Netty已封装了各类不一样的消息编解码器,因此若是使用我定义的消息格式,拆包与粘包的处理将会很简单,直接拿来用便可。考虑到你们可能有不一样业务协议的需求,因此会加入自定义协议的消息编解码器的实现,敬请期待。
相关代码已提交Github,须要自取:
-
KulaChat(https://github.com/FreddyChen/KulaChat) -
kulachat-server(https://github.com/FreddyChen/kulachat-server) -
ims_kula(https://github.com/FreddyChen/ims_kula)
下篇文章见,古德拜~ ~ ~
本文分享自微信公众号 - FreddyChen(FreddyChenAndroid)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。