bit0chat 是一个基于 Netty 的 IM 即时通信框架java
PS:bit0chat,bit后面没有0,开源中国认为我文章中包含辱骂的信息,想了半天可能只有这个缘由了!git
项目地址: https://github.com/all4you/bit0chat (将0删掉)github
特性:算法
TODO:数据库
bit0chat-example 模块提供了一个服务端与客户端的实现示例,能够参照该示例进行本身的业务实现。promise
要启动服务端,须要获取一个 Server 的实例,能够经过 ServerFactory 来获取。服务器
目前只实现了单机模式下的 Server ,经过 SimpleServerFactory 只须要定义一个端口便可获取一个单机的 Server 实例,以下所示:架构
public class StandaloneServerApplication { public static void main(String[] args) { Server server = SimpleServerFactory.getInstance() .newServer(8864); server.start(); } }
服务端启动成功后,将显示以下信息:框架
目前只实现了直连服务器的客户端,经过 SimpleClientFactory 只须要指定一个 ServerAttr 便可获取一个客户端,而后进行客户端与服务端的链接,以下所示:异步
public class DirectConnectServerClientApplication { public static void main(String[] args) { Client client = SimpleClientFactory.getInstance() .newClient(ServerAttr.getLocalServer(8864)); client.connect(); doClientBiz(client); } }
客户端链接上服务端后,将显示以下信息:
目前客户端提供了三种 Func,分别是:登陆,查看在线用户列表,发送单聊消息,每种 Func 有不一样的命令格式。
经过在客户端中执行如下命令 -lo houyi 123456
便可实现登陆,目前用户中心还未实现,经过 Mock 的方式实现一个假的用户服务,因此输入任何的用户名密码都会登陆成功,而且会为用户建立一个用户id。
登陆成功后,显示以下:
再启动一个客户端,而且也执行登陆,登陆成功后,能够执行 -lu
命令,获取在线用户列表,目前用户是保存在内存中,获取的结果以下所示:
用 gris 这个用户向 houyi 这个用户发送单聊信息,只要执行 -pc 1 hello,houyi
命令便可
其中第二个参数数要发送消息给那个用户的用户id,第三个参数是消息内容
消息发送方,发送完消息:
消息接收方,接收到消息:
客户端和服务端之间维持着心跳,双方都会检查链接是否可用,客户端每隔5s会向服务端发送一个 PingPacket,而服务端接收到这个 PingPacket 以后,会回复一个 PongPacket,这样表示双方都是健康的。
当由于某种缘由,服务端没有收到客户端发送的消息,服务端将会把该客户端的链接断开,一样的客户端也会作这样的检查。
当客户端与服务端之间的链接断开以后,将会触发客户端 HealthyChecker 的 channelInactive 方法,从而进行客户端的断线重连。
单机版的架构只涉及到服务端、客户端,另外有二者之间的协议层,以下图所示:
除了服务端和客户端以外,还有三大中心:消息中心,用户中心,连接中心。
单机版没法作到高可用,性能与可服务的用户数也有必定的限制,因此须要有可扩展的集群版,集群版在单机版的基础上增长了一个路由层,客户端经过路由层来得到可用的服务端地址,而后与服务端进行通信,以下图所示:
客户端发送消息给另外一个用户,服务端接收到这个请求后,从 Connection中心中获取目标用户“挂”在哪一个服务端下,若是在本身名下,那最简单直接将消息推送给目标用户便可,若是在其余服务端,则须要将该请求转交给目标服务端,让目标服务端将消息推送给目标用户。
经过一个自定义协议来实现服务端与客户端之间的通信,协议中有以下几个字段:
* * <p> * The structure of a Packet is like blow: * +----------+----------+----------------------------+ * | size | value | intro | * +----------+----------+----------------------------+ * | 1 bytes | 0xBC | magic number | * | 1 bytes | | serialize algorithm | * | 4 bytes | | packet symbol | * | 4 bytes | | content length | * | ? bytes | | the content | * +----------+----------+----------------------------+ * </p> *
每一个字段的含义
所占字节 | 用途 |
---|---|
1 | 魔数,默认为 0xBC |
1 | 序列化的算法 |
4 | Packet 的类型 |
4 | Packet 的内容长度 |
? | Packet 的内容 |
序列化算法将会决定该 Packet 在编解码时,使用何种序列化方式。
Packet 的类型将会决定到达服务端的字节流将被反序列化为什么种 Packet,也决定了该 Packet 将会被哪一个 PacketHandler 进行处理。
内容长度将会解决 Packet 的拆包与粘包问题,服务端在解析字节流时,将会等到字节的长度达到内容的长度时,才进行字节的读取。
除此以外,Packet 中还会存储一个 sync 字段,该字段将指定服务端在处理该 Packet 的数据时是否须要使用异步的业务线程池来处理。
服务端与客户端各自维护了一个健康检查的服务,即 Netty 为咱们提供的 IdleStateHandler,经过继承该类,而且实现 channelIdle 方法便可实现链接 “空闲” 时的逻辑处理,当出现空闲时,目前咱们只关心读空闲,咱们既能够认为这条连接出现问题了。
那么只须要在连接出现问题时,将这条连接关闭便可,以下所示:
public class IdleStateChecker extends IdleStateHandler { private static final int DEFAULT_READER_IDLE_TIME = 15; private int readerTime; public IdleStateChecker(int readerIdleTime) { super(readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime, 0, 0, TimeUnit.SECONDS); readerTime = readerIdleTime == 0 ? DEFAULT_READER_IDLE_TIME : readerIdleTime; } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { log.warn("[{}] Hasn't read data after {} seconds, will close the channel:{}", IdleStateChecker.class.getSimpleName(), readerTime, ctx.channel()); ctx.channel().close(); } }
另外,客户端须要额外再维护一个健康检查器,正常状况下他负责定时向服务端发送心跳,当连接的状态变成 inActive 时,该检查器将负责进行重连,以下所示:
public class HealthyChecker extends ChannelInboundHandlerAdapter { private static final int DEFAULT_PING_INTERVAL = 5; private Client client; private int pingInterval; public HealthyChecker(Client client, int pingInterval) { Assert.notNull(client, "client can not be null"); this.client = client; this.pingInterval = pingInterval <= 0 ? DEFAULT_PING_INTERVAL : pingInterval; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); schedulePing(ctx); } private void schedulePing(ChannelHandlerContext ctx) { ctx.executor().schedule(() -> { Channel channel = ctx.channel(); if (channel.isActive()) { log.debug("[{}] Send a PingPacket", HealthyChecker.class.getSimpleName()); channel.writeAndFlush(new PingPacket()); schedulePing(ctx); } }, pingInterval, TimeUnit.SECONDS); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ctx.executor().schedule(() -> { log.info("[{}] Try to reconnecting...", HealthyChecker.class.getSimpleName()); client.connect(); }, 5, TimeUnit.SECONDS); ctx.fireChannelInactive(); } }
咱们知道,Netty 中维护着两个 IO 线程池,一个 boss 主要负责连接的创建,另一个 worker 主要负责连接上的数据读写,咱们不该该使用 IO 线程来处理咱们的业务,由于这样极可能会对 IO 线程形成阻塞,致使新连接没法及时创建或者数据没法及时读写。
为了解决这个问题,咱们须要在业务线程池中来处理咱们的业务逻辑,可是这并非绝对的,若是咱们要执行的逻辑很简单,不会形成太大的阻塞,则能够直接在 IO 线程中处理,好比客户端发送一个 Ping 服务端回复一个 Pong,这种状况是没有必要在业务线程池中进行处理的,由于处理完了最终仍是要交给 IO 线程去写数据。可是若是一个业务逻辑须要查询数据库或者读取文件,这每每比较耗时间,因此就须要将这些操做封装起来交给业务线程池去处理。
服务端容许客户端在传输的 Packet 中指定采用何种方式进行业务的处理,服务端在将字节流解码成 Packet 以后,会根据 Packet 中的 sync 字段的值,肯定怎样对该 Packet 进行处理,以下所示:
public class ServerPacketDispatcher extends SimpleChannelInboundHandler<Packet> { @Override public void channelRead0(ChannelHandlerContext ctx, Packet request) { // if the packet should be handled async if (request.getAsync() == AsyncHandle.ASYNC) { EventExecutor channelExecutor = ctx.executor(); // create a promise Promise<Packet> promise = new DefaultPromise<>(channelExecutor); // async execute and get a future Future<Packet> future = executor.asyncExecute(promise, ctx, request); future.addListener(new GenericFutureListener<Future<Packet>>() { @Override public void operationComplete(Future<Packet> f) throws Exception { if (f.isSuccess()) { Packet response = f.get(); writeResponse(ctx, response); } } }); } else { // sync execute and get the response packet Packet response = executor.execute(ctx, request); writeResponse(ctx, response); } } }
bit0chat 除了能够做为 IM 框架以外,还能够做为一个通用的通信框架。
Packet 做为通信的载体,经过继承 AbstractPacket 便可快速实现本身的业务,搭配 PacketHandler 做为数据处理器便可实现客户端与服务端的通信。