t-io 入门篇(三)即时消息发送demo学习

前言

     t-io做者在开源其框架的同时还附带了几个demo,如:简单的hello world、im等。接下来这篇博客将会围绕tio-examples-im-simple-client、tio-examples-im-simple-server展开分析和学习。java

demo项目结构

    im-simple分红三个maven子项目:nginx

  • tio-examples-im-simple-client就是客户端聊天工具项目
  • tio-examples-im-simple-common保存了一些公用的类型定义和utils,
  • tio-examples-im-simple-server包含了聊天系统的服务端和一套附带了nginx的网页聊天工具。

   整片博客是围绕着网页版聊天工具而写的。web

聊天系统

  1. 概述

      首先介绍一下这个聊天系统demo大概交互的逻辑是怎么样子的,以下图:算法

  • demo中web聊天系统使用了websocket协议来和服务端进行通讯。
  • demo中数据交互不管是浏览器端、仍是服务端数据传输方式都使用了proto-buf(这个让我眼前一亮,以前只据说过客户端和服务器端采用proto-buf,demo里面居然使用了js的proto-buf,由于以前接触js这一块很少,第一次看到就给跪了)。

    2.代码分析 apache

  • 服务端初始化demo:
..
// 定义handler,全部的请求数据所有都由这个handler来处理,decode/encode/handler等等
// 若是您作web开发必定知道dispatcher的概念,这个handler会将数据解码,而后将数据分发给对应的handler处理业务
aioHandler = new ImServerAioHandler();
// listenr 能够在链接上、接收到消息、发送消息后等等回调其内部方法
aioListener = new ImServerAioListener();
// 服务端上下文初始化
serverGroupContext = new ServerGroupContext<>(aioHandler, aioListener);

serverGroupContext.setEncodeCareWithChannelContext(true);		
aioServer = new AioServer<>(serverGroupContext);
aioServer.start(ip, port);
..

从初始化的几句代码中能够看出ImServerAioHandler 是聊天系统中的重中之重,咱们来看一下里面的代码浏览器

/..
// 握手请求
handlerMap.put(Command.COMMAND_HANDSHAKE_REQ, new HandshakeReqHandler());
// 受权请求
handlerMap.put(Command.COMMAND_AUTH_REQ, new AuthReqHandler());
// 聊天请求
handlerMap.put(Command.COMMAND_CHAT_REQ, new ChatReqHandler());
// 加入群组请求
handlerMap.put(Command.COMMAND_JOIN_GROUP_REQ, new JoinReqHandler());
// 心跳请求
handlerMap.put(Command.COMMAND_HEARTBEAT_REQ, new HeartbeatReqHandler());
// 关闭链接请求
handlerMap.put(Command.COMMAND_CLOSE_REQ, new CloseReqHandler());
/..

/.. 在这里经过解析消息头以后获取下一步调用哪一个handler来处理业务
public Object handler(ImPacket packet, ChannelContext<ImSessionContext, ImPacket, Object> channelContext) throws Exception
	{
		Command command = packet.getCommand();
		ImBsHandlerIntf handler = handlerMap.get(command);
		if (handler != null)
		{
			Object obj = handler.handler(packet, channelContext);
			CommandStat.getCount(command).handled.incrementAndGet();
			return obj;
		} else
		{
			CommandStat.getCount(command).handled.incrementAndGet();
			log.warn("找不到对应的命令码[{}]处理类", command);
			return null;
		}

	}

/..

固然,上面的其实都是业务代码,尚未很明显的看到框架代码,接下来咱们看看怎样发送一条聊天的消息给一个群组。服务器

  • ChatReqHandler.java是处理发送聊天消息的类:
// protobuf反序列化消息体         
        ChatReqBody chatReqBody = ChatReqBody.parseFrom(packet.getBody());
        // demo这里写死了一些信息,消息发送者
		Integer fromId = 111;
		String fromNick = "test";
        // 把消息发送给谁
		Integer toId = chatReqBody.getToId();
		String toNick = chatReqBody.getToNick();
		String toGroup = chatReqBody.getGroup();
        // 其实demo的代码这里写的不太好,若是chatReqBody==null,前面就报错了
		if (chatReqBody != null)
		{
            // 构建消息体响应类
			ChatRespBody.Builder builder = ChatRespBody.newBuilder();
			builder.setType(chatReqBody.getType());
			builder.setText(chatReqBody.getText());
			builder.setFromId(fromId);
			builder.setFromNick(fromNick);
			builder.setToId(toId);
			builder.setToNick(toNick);
			builder.setGroup(toGroup);
			builder.setTime(SystemTimer.currentTimeMillis());
            //一样protobuf序列化
			ChatRespBody chatRespBody = builder.build();
			byte[] bodybyte = chatRespBody.toByteArray();
            
            //组建即时聊天响应包
			ImPacket respPacket = new ImPacket();
			respPacket.setCommand(Command.COMMAND_CHAT_RESP);

			respPacket.setBody(bodybyte);
            // 若是是对群组发送,直接调用Aio.sendToGroup便可(框架代码)
			if (Objects.equals(ChatType.CHAT_TYPE_PUBLIC, chatReqBody.getType()))
			{
				Aio.sendToGroup(channelContext.getGroupContext(), toGroup, respPacket);
			} else if (Objects.equals(ChatType.CHAT_TYPE_PRIVATE, chatReqBody.getType()))
			{   // 若是是对单我的发送,也有Aio.sendToUser方法(框架代码)
				if (toId != null)
				{
					Aio.sendToUser(channelContext.getGroupContext(), toId + "", respPacket);
				}
			}
		}

     看看,真正使用到框架的代码实际上就一行:Aio.sendToXXX静态方法。websocket

  • 那也许会有人问,框架是怎么知道这个group里面有哪些链接啊?框架怎么知道会有哪些链接的user呢?

     注意看众多handler中,其中有一个JoinReqHandler是在客户端调用加入群组命令的时候执行的:框架

public Object handler(ImPacket packet, ChannelContext<ImSessionContext, ImPacket, Object> channelContext) throws Exception
{
// ..
// 链接上下文绑定群组
Aio.bindGroup(channelContext, group);
// ..

}

     一个简单的bindGroup便可把这个链接绑定到一个组里面,从而经过Aio.sendToGroup便可轻松的给一个组发送消息。而在AuthReqHandler中一样调用了Aio.bindUser方法来绑定有用户链接(这个不贴代码了,贴多了界面太长,让人不想看)。socket

     保存user和group,做者使用的是一个带了读写锁的本身封装的DualHashBidiMap (apache工具包里面的,双向map,能够经过key获取value,能够经过value获取key)。

     总结t-io在聊天系统中的使用方法

  • 建立了一个server端
  • 客户端在鉴权的时候Aio.bindUser
  • 客户端在加入群组的时候Aio.bindGroup
  • 客户端在发送消息时Aio.sendToUser/Aio.sendToGroup便可
  • 固然,只要你实现一下ClientAioHandler的heartbeatPacket方法,框架自己给实现了自动心跳检测,重连也只须要在初始化链接上下文时传入ReconnConf便可。
  • 后面我本身设计config-server的时候,给客户端推送一组服务IP过去也是至关容易的事情啦。
  • 棋牌类游戏交互也相似了,一桌四我的,一个的操做,发送到服务端作算法逻辑校验后,同时推送给其余三我的,好像这样实现起来也不复杂了。而须要关心的仅仅只是server自己的集群问题了。

    demo中我最感兴趣的和我学到的

  • 框架提供的user和group概念很方便
  • ByteBuffer消息头的定义方法和解析,demo里面定义消息头定义得很节省空间
    ByteBuffer buffer = ByteBuffer.allocate(allLen);
    buffer.order(groupContext.getByteOrder());
    //这里比较有迷惑性,这个version并不是随便乱定,而是须要根据后面是否使用压缩、序号同步等等标记出来的二进制
    buffer.put(ImPacket.VERSION);
    buffer.put((byte) packet.getCommand().getNumber());
    buffer.put(isCompress ? (byte)1 : (byte)0);
    buffer.putInt(packet.getSynSeq());
    buffer.putShort((short)bodyLen);

    这里注意,我差点被做者绕进去了,ImPacket.VERSION比较有迷惑性

    //这是客户端解析第一个字节的定义,不是浏览器解析规则
    //其实做者定义得version本身规定的只占用4个比特位,如:0B00001111,后四位都是用来真正标示版本号
    //实际上前面的几位是另外分出来标记是否压缩,是否同步序号等等
    byte version = ImPacket.decodeVersion(firstbyte);
    boolean isCompress = ImPacket.decodeCompress(firstbyte);
    boolean hasSynSeq = ImPacket.decodeHasSynSeq(firstbyte);
    boolean is4ByteLength = ImPacket.decode4ByteLength(firstbyte);

     

  • 消息点对点发送和消息的群组发送性能和稳定性还有待我作作测试,后续在全面了解完t-io框架后准备弄一份关于性能测试的报告出来试试。

     同时感谢t-io做者对个人指导 !

相关文章
相关标签/搜索