众所周知,推送和 IM 在 Android 应用中很常见,但真正本身去实现的比较少,咱们大多会去选择第三方提供的成熟方案,如极光推送、云信等,由于移动网络具备不肯定性,所以本身实现一套稳定的方案会耗费不少精力,这对于小公司来讲是得不偿失的。git
推送和 IM 咱们平时用的不少,但真正了解原理的很少,真正动手实现过的很少。推送和 IM 本质上都是长链接,无非是业务方向不一样,所以咱们如下统称为长链接。今天咱们一块儿来揭开长链接的神秘面纱。github
虽然不少人都对 netty 比较熟悉了,可是可能仍是有不了解的同窗,所以咱们先简单介绍下 netty。编程
Netty是由 JBOSS 开发的一个 Java 开源框架api
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.promise
Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。缓存
这段简介摘自 netty 官网,是对 netty 的高度归纳。已经帮大家翻译好了 ^ _ ^bash
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
'Quick and easy' doesn't mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.服务器Netty是一个NIO客户端服务器框架,能够快速简单地开发协议服务器和客户端等网络应用程序。 它极大地简化和简化了TCP和UDP套接字服务器等网络编程。
“快速而简单”并不意味着由此产生的应用程序将受到可维护性或性能问题的困扰。 Netty的设计经验很是丰富,包括FTP,SMTP,HTTP以及各类基于二进制和文本的传统协议。 所以,Netty已经成功地找到了一个方法来实现轻松的开发,性能,稳定性和灵活性,而不用妥协。网络
一复制就停不下来了 =。= 主要是以为官网介绍的很准确。并发
这里提到了 事件驱动
,可能你们以为有点陌生,事件驱动其实很简单,好比你点了下鼠标,软件执行相应的操做,这就是一个事件驱动模型,再举一个例子,Android 中的 Message Looper Handler 也是事件驱动,经过 Handler 发送一个消息,这个消息就至关于一个事件,Looper 取出事件,再由 Handler 处理。
这些特性就使得 netty 很适合用于高并发的长链接。
今天,咱们就一块儿使用 netty 实现一个 Android IM,包括客户端和服务端。
做为一个 IM 应用,咱们须要识别用户,客户端创建长链接后须要汇报本身的信息,服务器验证经过后将其缓存起来,代表该用户在线。
客户端是一个一个的个体,服务器做为中转,好比,A 给 B 发送消息,A 先把消息发送到服务器,并告诉服务器这条消息要发给谁,而后服务器把消息发送给 B。
服务器在收到消息后能够对消息进行存储,若是 B 不在线,就等 B 上线后再将消息发送过去。
新建一个项目
添加 netty 依赖
implementation 'io.netty:netty-all:4.1.9.Final'
复制代码
netty 已经出了 5.x 的测试版,为了稳定,咱们使用最新稳定版。
// 修改成本身的主机和端口
private static final String HOST = "10.240.78.82";
private static final int PORT = 8300;
private SocketChannel socketChannel;
NioEventLoopGroup group = new NioEventLoopGroup();
new Bootstrap()
.channel(NioSocketChannel.class)
.group(group)
.option(ChannelOption.TCP_NODELAY, true) // 不延迟,直接发送
.option(ChannelOption.SO_KEEPALIVE, true) // 保持长链接状态
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 30, 0));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new ChannelHandle());
}
})
.connect(new InetSocketAddress(HOST, PORT))
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// 链接成功
socketChannel = (SocketChannel) future.channel();
} else {
Log.e(TAG, "connect failed");
// 这里必定要关闭,否则一直重试会引起OOM
future.channel().close();
group.shutdownGracefully();
}
});
复制代码
LoginInfo loginInfo = new LoginInfo();
loginInfo.setAccount(account);
loginInfo.setToken(token);
CMessage loginMsg = new CMessage();
loginMsg.setFrom(account);
loginMsg.setType(MsgType.LOGIN);
loginMsg.setContent(loginInfo.toJson());
socketChannel.writeAndFlush(loginMsg.toJson())
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// 发送成功,等待服务器响应
} else {
// 发送成功
close(); // 关闭链接,节约资源
}
});
复制代码
private class ChannelHandle extends SimpleChannelInboundHandler<String> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
// 链接失效
PushService.this.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
// 空闲了,发个心跳吧
CMessage message = new CMessage();
message.setFrom(myInfo.getAccount());
message.setType(MsgType.PING);
ctx.writeAndFlush(message.toJson());
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Gson gson = new Gson();
CMessage message = gson.fromJson(msg, CMessage.class);
if (message.getType() == MsgType.LOGIN) {
// 服务器返回登陆结果
} else if (message.getType() == MsgType.PING) {
Log.d(TAG, "receive ping from server");
// 收到服务器回应的心跳
} else if (message.getType() == MsgType.TEXT) {
Log.d(TAG, "receive text message " + message.getContent());
// 收到消息
}
ReferenceCountUtil.release(msg);
}
}
复制代码
这些代码要长期在后台执行,所以咱们放在 Service 中。
新建一个 Android Library 模块做为服务端,添加一样的依赖
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.TCP_NODELAY, true) // 不延迟,直接发送
.childOption(ChannelOption.SO_KEEPALIVE, true) // 保持长链接状态
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new NettyServerHandler());
}
})
.bind(port)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
System.out.println("netty server start");
} else {
System.out.println("netty server start failed");
}
});
复制代码
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelInactive(ChannelHandlerContext ctx) {
// Channel失效,从Map中移除
NettyChannelMap.remove(ctx.channel());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
Gson gson = new Gson();
CMessage message = gson.fromJson(msg, CMessage.class);
if (message.getType() == MsgType.PING) {
System.out.println("received ping from " + message.getFrom());
// 收到 Ping,回应一下
Channel channel = NettyChannelMap.get(message.getFrom());
if (channel != null) {
channel.writeAndFlush(message.toJson());
}
} else if (message.getType() == MsgType.LOGIN) {
// 用户登陆
LoginInfo loginInfo = gson.fromJson(message.getContent(), LoginInfo.class);
if (UserManager.get().verify(loginInfo)) {
loginInfo.setCode(200);
loginInfo.setMsg("success");
message.setContent(loginInfo.toJson());
ctx.channel().writeAndFlush(message.toJson());
NettyChannelMap.add(loginInfo.getAccount(), ctx.channel());
System.out.println(loginInfo.getAccount() + " login");
} else {
loginInfo.setCode(400);
loginInfo.setMsg("用户名或密码错误");
message.setContent(loginInfo.toJson());
ctx.channel().writeAndFlush(message.toJson());
}
} else if (message.getType() == MsgType.TEXT) {
// 发送消息
Channel channel = NettyChannelMap.get(message.getTo());
if (channel != null) {
channel.isWritable();
channel.writeAndFlush(message.toJson()).addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
System.out.println("send msg to " + message.getTo() + " failed");
}
});
}
}
ReferenceCountUtil.release(msg);
}
}
复制代码
已登陆的用户缓存在 NettyChannelMap 中。
这里能够加入离线消息缓存逻辑,若是消息发送失败,须要缓存起来,等待用户上线后再发送。
若是服务端在本机运行,须要和客户端在同一个局域网,若是是在公网运行则不须要。
只看上面的代码可能仍是有点懵逼,建议你们跑一下源码,会对 netty 有一个更清晰的认识。 github.com/wangchenyan…
今天咱们一块儿认识了 netty,并使用 netty 实现了一个简单的 IM 应用。这里咱们仅仅实现了 IM 核心功能,其余好比保活机制、断线重连不在本文讨论范围以内。
咱们今天实现的长链接和第三方长链接服务商提供的长链接服务其实并没有太大差别,无非是后者具备成熟的保活、短线重连机制。
读完本文,是否以为长链接其实也没那么神秘?
可是不要骄傲,咱们今天学习的只是最简单的用法,这只是皮毛,要想彻底了解其中的原理仍是要花费不少功夫的。
迁移自个人简书 2017.12.27