BeetleX之TCP服务应用详解

BeetleX.net core平台下的一个开源TCP 通信组件,它不只使用简便还提供了出色性能的支持,能够轻易让你实现上百万级别RPS吞吐的服务应用。组件所提供的基础功能也很是完善,可让你轻易扩展本身的服务应用,如下提组件集成的功能:linux

  • 完善的会话管理机制,能够根据链接状态和相关日志git

  • 专门针对内存池实现的异步流读写,支持标准Stream的同并提供高效的性能github

  • 消息IO合并,广播序列化合并等性能强化功能web

  • 提供简洁的协议扩展规范,轻易实现http,websocket,mqtt等应用通信协议json

  • 支持TLS,让你构建的通信服务更安全可靠安全

扩展的组件

如下是Beetlex扩展的一些功能组件websocket

性能

一开始说组可让你现上百万级别RPS吞吐的服务应用其实一点不假,BeetleX的基础性能有这样的支撑能力;虽然组件不能说是.net core上性能最好的,但在功能和综合性能上绝对很是出色(详细能够https://tfb-status.techempower.com/ 查看测试结果,惋惜这个网站提交的.net core组件比较少,大部都是基于aspcore的通信模块扩展).如下是JSON serialization基础输出的一个测试结果(Plaintext在官方的测试环境一直没办法跑起来....) 网络

在测试中组件只落后于aspcore-rhtx 这是红帽专门针对 .net core编写的linux网络驱动.session

Single query多线程

构建基础TCP应用

组件在构建TCP服务的时候很是简单,主要归功于它提供了完善的Stream读写功能,而这些功能让你彻底不用关心bytes的读写。基于Stream的好处就是能够轻松和第三方序列化的组件进行整合。如下是简单地构建一个Hello服务。

    class Program : ServerHandlerBase
    {
        private static IServer server;
        public static void Main(string[] args)
        {
            server = SocketFactory.CreateTcpServer<Program>();
            //server.Options.DefaultListen.Port =9090;
            //server.Options.DefaultListen.Host = "127.0.0.1";
            server.Open();
            Console.Read();
        }
        public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
        {
            var pipeStream = e.Stream.ToPipeStream();
            if (pipeStream.TryReadLine(out string name))
            {
                Console.WriteLine(name);
                pipeStream.WriteLine("hello " + name);
                e.Session.Stream.Flush();
            }
            base.SessionReceive(server, e);
        }
    }

以上就是一个简单的TCP服务,让以代码正常运行须要引用Beetlex最新版的组件能够在Nuget上找到。以上服务的功能很简单当接收数据后尝试从流中读取一行字符,若是读取成功则把内容写入到流中提交返回。经过以上代码是否是感受写个服务比较简单(可是PipeStream并非线程安全的,因此不能涉及到多线程读写它)

协议处理规则

其实PipeStream处理数据已经很是方便了,那为何还须要制定一个协议处理规范呢?前面已经说了PipeStream并非线程安全的,很容易带来使用上的风险,因此引入协议处理规则来进行一个安全约束的同时能够实现多线程消息处理。组件提供了这样一个接口来规范消息的处理,接口以下:

    public interface IPacket : IDisposable
    {

        EventHandler<EventArgs.PacketDecodeCompletedEventArgs> Completed
        {
            get; set;
        }

        IPacket Clone();

        void Decode(ISession session, System.IO.Stream stream);

        void Encode(object data, ISession session, System.IO.Stream stream);

        byte[] Encode(object data, IServer server);

        ArraySegment<byte> Encode(object data, IServer server, byte[] buffer);
    }

若是你要处理消息对象,则须要实现以上接口(固然这个接口的实现不是必须的,只要把握好PipeStream安全上的控制就好);但实现这接口来处理消息能够带不少好处,能够多消息合并IO,广播消息合并序列化等高效的功能。不过在不了解组件的状况实现这个接口的确也是有些难度的,因此组件提供了一个基础的类FixedHeaderPacket,它是一个抽像类用于描述有个消息头长的信息流处理。

字符消息分包

接下来经过FixedHeaderPacket来实现一个简单的字符分包协议消息;主要在发送消息的时候添加一个大小头用来描述消息的长度(这是在TCP中解决粘包的主要手段)。

    public class StringPacket : BeetleX.Packets.FixedHeaderPacket
    {
        public override IPacket Clone()
        {
            return new StringPacket();
        }

        protected override object OnReader(ISession session, PipeStream stream)
        {
            return stream.ReadString(CurrentSize);
        }
        protected override void OnWrite(ISession session, object data, PipeStream stream)
        {
            stream.Write((string)data);
        }
    }

经过FixedHeaderPacket制定一个分包规则是很是简单的,主要实现读写两个方法。下面便可在服务中引用这个包做为TCP数据流的分析规则:

    class Program : ServerHandlerBase
    {
        private static IServer server;
        public static void Main(string[] args)
        {
            server = SocketFactory.CreateTcpServer<Program,StringPacket>();
            //server.Options.DefaultListen.Port =9090;
            //server.Options.DefaultListen.Host = "127.0.0.1";
            server.Open();
            Console.Read();
        }
        protected override void OnReceiveMessage(IServer server, ISession session, object message)
        {
            Console.WriteLine(message);
            server.Send($"hello {message}", session);
        }
    }

通过分析器包装后,就不再用流来处理数据了,能够直接进行对像的发送处理。

集成Protobuf

处理String并非友好的事情,毕竟没有对象来得直观和操做方便;如下是经过FixedHeaderPacket扩展Protobuf对象传输,如下是针对Protobuf的规则扩展:

    public class ProtobufPacket : BeetleX.Packets.FixedHeaderPacket
    {
        static ProtobufPacket()
        {
            TypeHeader.Register(typeof(ProtobufClientPacket).Assembly);
        }
        public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT);

        public override IPacket Clone()
        {
            return new ProtobufPacket();
        }

        protected override object OnReader(ISession session, PipeStream stream)
        {
            Type type = TypeHeader.ReadType(stream);
            var size = CurrentSize - 4;
            return ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(stream, null, type, size);
        }

        protected override void OnWrite(ISession session, object data, PipeStream stream)
        {
            TypeHeader.WriteType(data, stream);
            ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(stream, data);
        }
    }

使用规则分析器

    class Program : ServerHandlerBase
    {
        private static IServer server;
        public static void Main(string[] args)
        {
            server = SocketFactory.CreateTcpServer<Program, Messages.ProtobufPacket>();
            //server.Options.DefaultListen.Port =9090;
            //server.Options.DefaultListen.Host = "127.0.0.1";
            server.Open();
            Console.Read();
        }
        protected override void OnReceiveMessage(IServer server, ISession session, object message)
        {
            ((Messages.Register)message).DateTime = DateTime.Now;
            server.Send(message, session);
        }
    }

不一样序列化的扩展

既然有了一个Protobuf做为样本,那针对其余序列化的实现就比较简单了

  • json
    public class JsonPacket : BeetleX.Packets.FixedHeaderPacket
    {
        static JsonPacket()
        {
            TypeHeader.Register(typeof(JsonClientPacket).Assembly);
        }
        public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT);

        public override IPacket Clone()
        {
            return new JsonPacket();
        }

        protected override object OnReader(ISession session, PipeStream stream)
        {
            Type type = TypeHeader.ReadType(stream);
            var size = CurrentSize - 4;
            var buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(size);
            stream.Read(buffer, 0, size);
            try
            {
                return SpanJson.JsonSerializer.NonGeneric.Utf8.Deserialize(new ReadOnlySpan<byte>(buffer, 0, size), type);
            }
            finally
            {
                System.Buffers.ArrayPool<byte>.Shared.Return(buffer);
            }
        }

        protected override void OnWrite(ISession session, object data, PipeStream stream)
        {
            TypeHeader.WriteType(data, stream);
            var buffer = SpanJson.JsonSerializer.NonGeneric.Utf8.SerializeToArrayPool(data);
            try
            {
                stream.Write(buffer.Array, buffer.Offset, buffer.Count);
            }
            finally
            {
                System.Buffers.ArrayPool<byte>.Shared.Return(buffer.Array);
            }
        }
    }
  • messagepack
    public class MsgpackPacket : BeetleX.Packets.FixedHeaderPacket
    {
        static MsgpackPacket()
        {
            TypeHeader.Register(typeof(MsgpackClientPacket).Assembly);
        }
        public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT);

        public override IPacket Clone()
        {
            return new MsgpackPacket();
        }

        protected override object OnReader(ISession session, PipeStream stream)
        {
            Type type = TypeHeader.ReadType(stream);
            var size = CurrentSize - 4;
            return MessagePackSerializer.NonGeneric.Deserialize(type, stream, true);
        }

        protected override void OnWrite(ISession session, object data, PipeStream stream)
        {
            TypeHeader.WriteType(data, stream);
            MessagePackSerializer.NonGeneric.Serialize(data.GetType(), stream, data);
        }
    }

更多示例

https://github.com/IKende/BeetleX-Samples

相关文章
相关标签/搜索