实现SuperSocket模板协议FixedHeaderReceiveFilter与msgpack结

实现SuperSocket模板协议FixedHeaderReceiveFilter与msgpack结合

在群里有个群友在群里一直在问使用FixedHeaderReceiveFilter与msgpack结合的实现问题,搞了几天都没有搞定,因而就写了这个文章来讲明。但愿能帮助到他。 SuperSocket的内置的经常使用协议实现模版文档连接 msgpack的官网git

SuperSocket 简称为SSc#


首先要定义一下协议数组

/// +-------+---+-------------------------------+
/// |request| l |                               |
/// | name  | e |    request body               |
/// |  (4)  | n |                               |
/// |       |(4)|                               |
/// +-------+---+-------------------------------+

如上说明,request name 占4个字节,这个是用来寻找SS里面的命令对象,根据是根据命令类名称来查找的。 len是表示 request body序列化成byte后的长度。 request body 是表示咱们用msgpack序列化后的内容。缓存

*注意:*request name 和len占用的字节是能够本身定义的session

协议搞懂后,咱们就须要来编写代码了,须要编写以下类:数据结构

  1. MsgPackReceiveFilter要继承FixedHeaderReceiveFilter,做用是实现协议解析
  2. MsgPackReceiveFilterFactory要继承IReceiveFilterFactory,做用是使得server能知道是用哪一个协议解析对象来作协议解析。
  3. MsgPackServer继承AppServer,做用是加载协议解析工厂的。
  4. MsgPackSession做用参考一个Telnet示例
  5. MsgPackCommand做用参考一个Telnet示例,同时实现request body的反序列化
  6. 实现一个命令Test继承MsgPackCommand
  7. MyData一个要传输的数据结构

下面来看下代码的实现app

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using SuperSocket.Common;
using SuperSocket.Facility.Protocol;
using SuperSocket.SocketBase.Protocol;

namespace FixedHeader
{
    /// +-------+---+-------------------------------+
    /// |request| l |                               |
    /// | name  | e |    request body               |
    /// |  (4)  | n |                               |
    /// |       |(4)|                               |
    /// +-------+---+-------------------------------+
    public class MsgPackReceiveFilter : FixedHeaderReceiveFilter<BinaryRequestInfo>
    {
        public MsgPackReceiveFilter() : base(8)
        {
        }

        protected override int GetBodyLengthFromHeader(byte[] header, int offset, int length)
        {
            var headerData = new byte[4];
            Array.Copy(header,offset+4,headerData,0,4);
            return BitConverter.ToInt32(headerData, 0);
        }

        protected override BinaryRequestInfo ResolveRequestInfo(ArraySegment<byte> header, byte[] bodyBuffer, int offset, int length)
        {
            return new BinaryRequestInfo(Encoding.UTF8.GetString(header.Array, header.Offset, 4), bodyBuffer.CloneRange(offset, length));
        }
    }
}

首先咱们看到构造函数base(8)里面的输入了一个8,这个8是协议头的长度,也就是request name 加 len的长度。 而后再看实现了方法GetBodyLengthFromHeader,从名字上看,就能够知道是根据协议头的数据来获取打包的数据的长度。 这个方法有三个参数socket

  1. *byte[] header * 缓存的数据,这个并非单纯只包含协议头的数据
  2. int offset 要取的数据的偏移量,也就是在header里面从offset开始就是咱们从客户的发送过来的数据。
  3. int length 就是我在base(8)这里设置的长度也就是8.

在这里咱们能够取获得协议头的数据,就是在header从偏移量offset开始截取长度为length的部分数组,就是咱们的协议头了。可是咱们的协议头是8位,要取打包数据的长度那么就须要从偏移offset上再加4位,代码就是ide

var headerData = new byte[4];
Array.Copy(header,offset+4,headerData,0,4);

而后再把取到的数据转换成为int类型也就是函数

BitConverter.ToInt32(headerData, 0);

ss就能够根据这个长度来帮助咱们获取到打包的数据。而后传给方法ResolveRequestInfo。咱们须要实现这个方法。这个方法有四个参数:

  1. header 咱们的协议头的数据
  2. bodyBuffer 缓存的数据,这个并非只单纯包含打包数据的
  3. offset 打包数据在bodyBuffer里面开始的位置
  4. int length 打包数据的长度

ResolveRequestInfo 返回的是 FixedHeaderReceiveFilter的一个泛型,这个对象是用于注入实现命令的。咱们这里使用的是BinaryRequestInfo。

Encoding.UTF8.GetString(header.Array, header.Offset, 4)

这个代码是把协议头的前四位转换成为字符串,这字符串是用于查找要执行的命令的。

bodyBuffer.CloneRange(offset, length)

这个代码是截取咱们须要的数据,这个数据也将是会传给执行命令的。

到这里咱们的MsgPackReceiveFilter协议解析已经完成了,而后再实现一个工厂来使得server可以加载到MsgPackReceiveFilter来解析咱们的协议


using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using SuperSocket.SocketBase;
using SuperSocket.SocketBase.Protocol;

namespace FixedHeader
{
    public class MsgPackReceiveFilterFactory : IReceiveFilterFactory<BinaryRequestInfo>
    {
        public IReceiveFilter<BinaryRequestInfo> CreateFilter(IAppServer appServer, IAppSession appSession, IPEndPoint remoteEndPoint)
        {
            return new MsgPackReceiveFilter();
        }
    }
}

MsgPackReceiveFilterFactory的实现相对简单,就是实现接口IReceiveFilterFactory的方法返回一个MsgPackReceiveFilter对象,这个就用多作解释

而后要使得server可以加载到,还须要再实现一个server

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using SuperSocket.SocketBase;
using SuperSocket.SocketBase.Protocol;

namespace FixedHeader
{
    public class MsgPackServer : AppServer<MsgPackSession, BinaryRequestInfo>
    {
        public MsgPackServer() : base(new MsgPackReceiveFilterFactory())
        {
            
        }
    }
}

只要实例化工厂MsgPackReceiveFilterFactory而后传给构造函数就能够了。

而后再实现一个MsgPackSession这个只要继承AppSession就能够了,不用作任何的实现。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using SuperSocket.SocketBase;
using SuperSocket.SocketBase.Protocol;

namespace FixedHeader
{
    public class MsgPackSession : AppSession<MsgPackSession, BinaryRequestInfo>
    {
    }
}

而后再实现一个MsgPackCommand。这个主要是为了把打包发送过来的数据统一反序列列化,这样只要继承MsgPackCommand的类,均可以直接获得想要的对象。

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using MsgPack.Serialization;
using SuperSocket.SocketBase.Command;
using SuperSocket.SocketBase.Protocol;

namespace FixedHeader
{
    public abstract class MsgPackCommand<TMsgPack> : CommandBase<MsgPackSession, BinaryRequestInfo> where TMsgPack : class
    {
        public override void ExecuteCommand(MsgPackSession session, BinaryRequestInfo requestInfo)
        {
            var serializer = SerializationContext.Default.GetSerializer<TMsgPack>();
            using (var stream = new MemoryStream(requestInfo.Body))
            {
                var unpackedObject = serializer.Unpack(stream) as TMsgPack;
                ExecuteCommand(session, unpackedObject);
            }
            
        }

        public abstract void ExecuteCommand(MsgPackSession session, TMsgPack pack);
    }
}

最后再作一个测试的命令和一个数据结构,

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using SuperSocket.SocketBase.Command;
using SuperSocket.SocketBase.Protocol;

namespace FixedHeader
{
    public class Test : MsgPackCommand<MyData>
    {
        public override void ExecuteCommand(MsgPackSession session, MyData pack)
        {
            Console.WriteLine(pack.Name+":"+ pack.Other);
        }
    }
}
namespace FixedHeader
{
    public class MyData
    {
        public string Name { get; set; }

        public string Other { get; set; }
    }
}

到这里就服务端就能够了。接下来还须要实现一个客户端来作简单的测试,这个没有什么好说的,直接上代码:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using MsgPack.Serialization;

namespace FixedHeaderClient
{
    class Program
    {
        static void Main(string[] args)
        {
            var socket = new Socket(AddressFamily.InterNetwork,SocketType.Stream, ProtocolType.Tcp);
            socket.Connect("127.0.0.1",2012);
            using (var stream = new MemoryStream())
            {
                var serializer = SerializationContext.Default.GetSerializer<MyData>();
                var myData = new MyData()
                {
                    Name = "Test",
                    Other = "abcd"
                };

                serializer.Pack(stream, myData);

                //var commandData = new byte[4];//协议命令只占4位
                var commandData = Encoding.UTF8.GetBytes("Test");//协议命令只占4位,若是占的位数长过协议,那么协议解析确定会出错的

                var dataBody = stream.ToArray();

                var dataLen = BitConverter.GetBytes(dataBody.Length);//int类型占4位,根据协议这里也只能4位,不然会出错

                var sendData = new byte[8+dataBody.Length];//命令加内容长度为8

                // +-------+---+-------------------------------+
                // |request| l |                               |
                // | name  | e |    request body               |
                // |  (4)  | n |                               |
                // |       |(4)|                               |
                // +-------+---+-------------------------------+

                Array.ConstrainedCopy(commandData, 0, sendData, 0, 4);
                Array.ConstrainedCopy(dataLen, 0, sendData, 4,4);
                Array.ConstrainedCopy(dataBody, 0, sendData, 8, dataBody.Length);

                for (int i = 0; i < 1000; i++)
                {
                    socket.Send(sendData);
                }
                
            }

            Console.Read();
        }
    }
}

源码托管

相关文章
相关标签/搜索