t-io做者在开源其框架的同时还附带了几个demo,如:简单的hello world、im等。接下来这篇博客将会围绕tio-examples-im-simple-client、tio-examples-im-simple-server展开分析和学习。java
im-simple分红三个maven子项目:nginx
整片博客是围绕着网页版聊天工具而写的。web
首先介绍一下这个聊天系统demo大概交互的逻辑是怎么样子的,以下图:算法
2.代码分析 apache
.. // 定义handler,全部的请求数据所有都由这个handler来处理,decode/encode/handler等等 // 若是您作web开发必定知道dispatcher的概念,这个handler会将数据解码,而后将数据分发给对应的handler处理业务 aioHandler = new ImServerAioHandler(); // listenr 能够在链接上、接收到消息、发送消息后等等回调其内部方法 aioListener = new ImServerAioListener(); // 服务端上下文初始化 serverGroupContext = new ServerGroupContext<>(aioHandler, aioListener); serverGroupContext.setEncodeCareWithChannelContext(true); aioServer = new AioServer<>(serverGroupContext); aioServer.start(ip, port); ..
从初始化的几句代码中能够看出ImServerAioHandler 是聊天系统中的重中之重,咱们来看一下里面的代码浏览器
/.. // 握手请求 handlerMap.put(Command.COMMAND_HANDSHAKE_REQ, new HandshakeReqHandler()); // 受权请求 handlerMap.put(Command.COMMAND_AUTH_REQ, new AuthReqHandler()); // 聊天请求 handlerMap.put(Command.COMMAND_CHAT_REQ, new ChatReqHandler()); // 加入群组请求 handlerMap.put(Command.COMMAND_JOIN_GROUP_REQ, new JoinReqHandler()); // 心跳请求 handlerMap.put(Command.COMMAND_HEARTBEAT_REQ, new HeartbeatReqHandler()); // 关闭链接请求 handlerMap.put(Command.COMMAND_CLOSE_REQ, new CloseReqHandler()); /.. /.. 在这里经过解析消息头以后获取下一步调用哪一个handler来处理业务 public Object handler(ImPacket packet, ChannelContext<ImSessionContext, ImPacket, Object> channelContext) throws Exception { Command command = packet.getCommand(); ImBsHandlerIntf handler = handlerMap.get(command); if (handler != null) { Object obj = handler.handler(packet, channelContext); CommandStat.getCount(command).handled.incrementAndGet(); return obj; } else { CommandStat.getCount(command).handled.incrementAndGet(); log.warn("找不到对应的命令码[{}]处理类", command); return null; } } /..
固然,上面的其实都是业务代码,尚未很明显的看到框架代码,接下来咱们看看怎样发送一条聊天的消息给一个群组。服务器
// protobuf反序列化消息体 ChatReqBody chatReqBody = ChatReqBody.parseFrom(packet.getBody()); // demo这里写死了一些信息,消息发送者 Integer fromId = 111; String fromNick = "test"; // 把消息发送给谁 Integer toId = chatReqBody.getToId(); String toNick = chatReqBody.getToNick(); String toGroup = chatReqBody.getGroup(); // 其实demo的代码这里写的不太好,若是chatReqBody==null,前面就报错了 if (chatReqBody != null) { // 构建消息体响应类 ChatRespBody.Builder builder = ChatRespBody.newBuilder(); builder.setType(chatReqBody.getType()); builder.setText(chatReqBody.getText()); builder.setFromId(fromId); builder.setFromNick(fromNick); builder.setToId(toId); builder.setToNick(toNick); builder.setGroup(toGroup); builder.setTime(SystemTimer.currentTimeMillis()); //一样protobuf序列化 ChatRespBody chatRespBody = builder.build(); byte[] bodybyte = chatRespBody.toByteArray(); //组建即时聊天响应包 ImPacket respPacket = new ImPacket(); respPacket.setCommand(Command.COMMAND_CHAT_RESP); respPacket.setBody(bodybyte); // 若是是对群组发送,直接调用Aio.sendToGroup便可(框架代码) if (Objects.equals(ChatType.CHAT_TYPE_PUBLIC, chatReqBody.getType())) { Aio.sendToGroup(channelContext.getGroupContext(), toGroup, respPacket); } else if (Objects.equals(ChatType.CHAT_TYPE_PRIVATE, chatReqBody.getType())) { // 若是是对单我的发送,也有Aio.sendToUser方法(框架代码) if (toId != null) { Aio.sendToUser(channelContext.getGroupContext(), toId + "", respPacket); } } }
看看,真正使用到框架的代码实际上就一行:Aio.sendToXXX静态方法。websocket
注意看众多handler中,其中有一个JoinReqHandler是在客户端调用加入群组命令的时候执行的:框架
public Object handler(ImPacket packet, ChannelContext<ImSessionContext, ImPacket, Object> channelContext) throws Exception { // .. // 链接上下文绑定群组 Aio.bindGroup(channelContext, group); // .. }
一个简单的bindGroup便可把这个链接绑定到一个组里面,从而经过Aio.sendToGroup便可轻松的给一个组发送消息。而在AuthReqHandler中一样调用了Aio.bindUser方法来绑定有用户链接(这个不贴代码了,贴多了界面太长,让人不想看)。socket
保存user和group,做者使用的是一个带了读写锁的本身封装的DualHashBidiMap (apache工具包里面的,双向map,能够经过key获取value,能够经过value获取key)。
ByteBuffer buffer = ByteBuffer.allocate(allLen); buffer.order(groupContext.getByteOrder()); //这里比较有迷惑性,这个version并不是随便乱定,而是须要根据后面是否使用压缩、序号同步等等标记出来的二进制 buffer.put(ImPacket.VERSION); buffer.put((byte) packet.getCommand().getNumber()); buffer.put(isCompress ? (byte)1 : (byte)0); buffer.putInt(packet.getSynSeq()); buffer.putShort((short)bodyLen);
这里注意,我差点被做者绕进去了,ImPacket.VERSION比较有迷惑性
//这是客户端解析第一个字节的定义,不是浏览器解析规则 //其实做者定义得version本身规定的只占用4个比特位,如:0B00001111,后四位都是用来真正标示版本号 //实际上前面的几位是另外分出来标记是否压缩,是否同步序号等等 byte version = ImPacket.decodeVersion(firstbyte); boolean isCompress = ImPacket.decodeCompress(firstbyte); boolean hasSynSeq = ImPacket.decodeHasSynSeq(firstbyte); boolean is4ByteLength = ImPacket.decode4ByteLength(firstbyte);
同时感谢t-io做者对个人指导 !