Socket长链接是Android开发中很基础的功能,不少的App都会用到这个功能,实现的方式有不少,我今天就只写基于Apache Mina框架的实现方式,至于TCP/IP协议相关的知识就不涉及了,想了解更多关于Socket长链接知识的,能够去看看刚哥的文章java
Mina是一个基于NIO的网络框架,使用它编写程序时,能够专一于业务处理,而不用过于关心IO操做。不论应用程序采用什么协议(TCP、UDP)或者其它的,Mina提供了一套公用的接口,来支持这些协议。目前能够处理的协议有:HTTP, XML, TCP, LDAP, DHCP, NTP, DNS, XMPP, SSH, FTP… 。从这一点来讲,Mina不单单是一个基于NIO的框架,更是一个网络层协议的实现。git
完整项目代码我会上传到GitHub 添加相关编码协议,日志,设置心跳和Handlergithub
NioSocketConnector mSocketConnector = new NioSocketConnector();
// mSocketConnector.setConnectTimeoutMillis(Constants.TIMEOUT);
//设置协议封装解析处理
mSocketConnector.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new FrameCodecFactory()));
// 设置日志输出工厂
mSocketConnector.getFilterChain().addLast("logger", new LoggingFilter());
//设置心跳包
/*KeepAliveFilter heartFilter = new KeepAliveFilter(new HeartBeatMessageFactory()); //每 1 分钟发送一个心跳包 heartFilter.setRequestInterval(1 * 60); //心跳包超时时间 10s heartFilter.setRequestTimeout(10); // heartFilter.setRequestTimeoutHandler(new HeartBeatTimeoutHandler()); mSocketConnector.getFilterChain().addLast("heartbeat", heartFilter);*/
//设置 handler 处理业务逻辑
mSocketConnector.setHandler(new MessageHandler(context));
mSocketConnector.addListener(new MessageListener(mSocketConnector));
// 设置接收和发送缓冲区大小
mSocketConnector.getSessionConfig().setReceiveBufferSize(1024);
// mSocketConnector.getSessionConfig().setSendBufferSize(1024);
// 设置读取空闲时间:单位为s
mSocketConnector.getSessionConfig().setReaderIdleTime(60);
//配置服务器地址
InetSocketAddress mSocketAddress = new InetSocketAddress(Constants.HOST, Constants.PORT);
//发起链接
ConnectFuture mFuture = mSocketConnector.connect(mSocketAddress);
复制代码
断线重连处理,有一点不一样,mSocketConnector在添加相关协议的时候,要先判断apache
int count = 0;// 记录尝试重连的次数
NioSocketConnector mSocketConnector = null;
while (!isRepeat[0] && count < 10) {
try {
count++;
if (mSocketConnector == null) {
mSocketConnector = new NioSocketConnector();
}
// mSocketConnector.setConnectTimeoutMillis(Constants.TIMEOUT);
if (!mSocketConnector.getFilterChain().contains("protocol")) {
//设置协议封装解析处理
mSocketConnector.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new FrameCodecFactory()));
}
if (!mSocketConnector.getFilterChain().contains("logger")) {
// 设置日志输出工厂
mSocketConnector.getFilterChain().addLast("logger", new LoggingFilter());
}
/*if (!mSocketConnector.getFilterChain().contains("heartbeat")) { //设置心跳包 KeepAliveFilter heartFilter = new KeepAliveFilter(new HeartBeatMessageFactory()); //每 1 分钟发送一个心跳包 heartFilter.setRequestInterval(1 * 60); //心跳包超时时间 10s heartFilter.setRequestTimeout(10); // heartFilter.setRequestTimeoutHandler(new HeartBeatTimeoutHandler()); mSocketConnector.getFilterChain().addLast("heartbeat", heartFilter); }*/
//设置 handler 处理业务逻辑
mSocketConnector.setHandler(new MessageHandler(context));
mSocketConnector.addListener(new MessageListener(mSocketConnector));
// 设置接收和发送缓冲区大小
mSocketConnector.getSessionConfig().setReceiveBufferSize(1024);
// mSocketConnector.getSessionConfig().setSendBufferSize(1024);
// 设置读取空闲时间:单位为s
mSocketConnector.getSessionConfig().setReaderIdleTime(60);
//配置服务器地址
InetSocketAddress mSocketAddress = new InetSocketAddress(Constants.HOST, Constants.PORT);
//发起链接
ConnectFuture mFuture = mSocketConnector.connect(mSocketAddress);
mFuture.awaitUninterruptibly();
IoSession mSession = mFuture.getSession();
if (mSession.isConnected()) {
isRepeat[0] = true;
e.onNext(mSession);
e.onComplete();
break;
}
} catch (Exception e1) {
e1.printStackTrace();
if (count == Constants.REPEAT_TIME) {
System.out.println(Constants.stringNowTime() + " : 断线重连"
+ Constants.REPEAT_TIME + "次以后仍然未成功,结束重连.....");
break;
} else {
System.out.println(Constants.stringNowTime() + " : 本次断线重连失败,5s后进行第" + (count + 1) + "次重连.....");
try {
Thread.sleep(5000);
System.out.println(Constants.stringNowTime() + " : 开始第" + (count + 1) + "次重连.....");
} catch (InterruptedException e12) {
}
}
}
}
复制代码
编码,解码协议,根据实际状况而定服务器
public class FrameDecoder extends CumulativeProtocolDecoder {
private final static Charset charset = Charset.forName("UTF-8");
@Override
protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
//数据粘包,断包处理
int startPosition = ioBuffer.position();
while (ioBuffer.hasRemaining()) {
byte b = ioBuffer.get();
if (b == '\n') {//读取到\n时候认为一行已经读取完毕
int currentPosition = ioBuffer.position();
int limit = ioBuffer.limit();
ioBuffer.position(startPosition);
ioBuffer.limit(limit);
IoBuffer buffer = ioBuffer.slice();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
String message = new String(bytes, charset);
protocolDecoderOutput.write(message);
ioBuffer.position(currentPosition);
ioBuffer.limit(limit);
return true;
}
}
ioBuffer.position(startPosition);
return false;
}
}
复制代码
public class FrameEncoder implements ProtocolEncoder {
private final static Charset charset = Charset.forName("UTF-8");
@Override
public void encode(IoSession ioSession, Object message, ProtocolEncoderOutput protocolEncoderOutput) throws Exception {
IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);
buff.putString(message.toString(), charset.newEncoder());
// put 当前系统默认换行符 WINDOWS:\r\n, Linux:\n
buff.putString(LineDelimiter.WINDOWS.getValue(), charset.newEncoder());
// 为下一次读取数据作准备
buff.flip();
protocolEncoderOutput.write(buff);
}
@Override
public void dispose(IoSession ioSession) throws Exception {
}
}
复制代码
心跳包处理,Android设备加心跳并不能完成保持长链接的状态,毕竟保活你懂哒!网络
public class HeartBeatMessageFactory implements KeepAliveMessageFactory {
@Override
public boolean isRequest(IoSession ioSession, Object message) {
//若是是客户端主动向服务器发起的心跳包, return true, 该框架会发送 getRequest() 方法返回的心跳包内容.
if(message instanceof String && message.equals(Constants.PING_MESSAGE)){
return true;
}
return false;
}
@Override
public boolean isResponse(IoSession ioSession, Object message) {
//若是是服务器发送过来的心跳包, return true后会在 getResponse() 方法中处理心跳包.
if(message instanceof String && message.equals(Constants.PONG_MESSAGE)){
return true;
}
return false;
}
@Override
public Object getRequest(IoSession ioSession) {
//自定义向服务器发送的心跳包内容.
return Constants.PING_MESSAGE;
}
@Override
public Object getResponse(IoSession ioSession, Object message) {
//自定义解析服务器发送过来的心跳包.
return Constants.PONG_MESSAGE;
}
}
复制代码