使用高性能Pipelines构建.NET通信程序

原文: 使用高性能Pipelines构建.NET通信程序

.NET Standard支持一组新的API,System.Span , System.Memory ,还有System.IO.Pipelines。这几个新的API极大了提高了.NET程序的效能,未来.NET不少基础API都会使用它们进行重写。 html

Pipelines旨在解决.NET编写Socket通讯程序时的不少困难,相信读者也对此不胜其烦,使用stream模型进行编程,就算可以解决,也是实在麻烦。shell

System.IO.Pipelines使用简单的内存片断来管理数据,能够极大的简化编写程序的过程。关于Pipelines的详细介绍,能够看看这里。如今ASP.NET Core中使用的Kestrel已经在使用这个API。(话说这个东西貌似就是Kestrel团队搞出来的。)编程

多是直接须要用Socket场景有限(物联网用的还挺多的),Pipelines相关的资料感受不是不少。官方给出的示例是基于ASCII协议的,有固定结尾的协议,这里我以物联网设备经常使用的BINARY二进制自定义协议为例,讲解基于Pipelines的程序套路。c#

System.IO.Pipelines

与基于Stream的方式不一样,pipelines提供一个pipe,用于存储数据,pipe中间存储的数据有点链表的感受,能够基于SequencePosition进行slice操做,这样就能获得一个ReadOnlySequence<T>对象。reader能够进行自定义操做,并在操做完成以后告诉pipe已经处理了多少数据,整个过程是不须要进行内存复制操做的,所以性能获得了提高,还少了不少麻烦。能够简单理解做为服务器端,流程:数组

接受数据循环:接到数据->放pipe里面->告诉pipe放了多少数据
处理数据循环:在pipe里面找一条完整数据->交给处理流程->告诉pipe处理了多少数据服务器

协议

有一款设备,binary协议,数据包开头0x75, 0xbd, 0x7e, 0x97一共4个字节,随后跟数据包长度2个字节(固定2400字节,不固定长度也能够参照),随后是数据区。在设备链接成功以后,数据主动从设备发送到PC。socket

关键代码

虽然是.NET Core平台的,可是.NET FRAMEWORK 4.6.1上面也能够nuget安装,直接async

install-package system.io.pipelines

进行安装就能够了。Socket相关处理的代码再也不写了,只列关键的。函数

代码第一步是声明pipe。性能

private async void InitPipe(Socket socket)
{
    Pipe pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(socket, pipe.Reader);

    await Task.WhenAll(reading, writing);
}

pipe有reader还有一个writer,reader负责读取pipe数据,主要用在数据处理循环,writer负责将数据写入pipe,主要用在数据接受循环。

//写入循环
private async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    //数据流量比较大,用1M字节做为buffer
    const int minimumBufferSize = 1024 * 1024;

    while (running)
    {
        try
        {
            //从writer中,得到一段很多于指定大小的内存空间
            Memory<byte> memory = writer.GetMemory(minimumBufferSize);

            //将内存空间变成ArraySegment,提供给socket使用
            if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)memory, out ArraySegment<byte> arraySegment))
            {
                throw new InvalidOperationException("Buffer backed by array was expected");
            }
            //接受数据
            int bytesRead = await SocketTaskExtensions.ReceiveAsync(socket, arraySegment, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }

            //一次接受完毕,数据已经在pipe中,告诉pipe已经给它写了多少数据。
            writer.Advance(bytesRead);
        }
        catch
        {
            break;
        }

        // 提示reader能够进行读取数据,reader能够继续执行readAsync()方法
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    // 告诉pipe完事了
    writer.Complete();
}

//读取循环
private async Task ReadPipeAsync(Socket socket, PipeReader reader)
{
    while (running)
    {
        //等待writer写数据
        ReadResult result = await reader.ReadAsync();
        //得到内存区域
        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition? position = null;

        do
        {
            //寻找head的第一个字节所在的位置
            position = buffer.PositionOf((byte)0x75);
            if (position != null)
            {
                //因为是连续四个字节做为head,须要进行比对,我这里直接使用了ToArray方法,仍是有了内存拷贝动做,不是很理想,可是写起来很方便。
                //对性能有更高要求的场景,能够进行slice操做后的单独比对,这样不须要内存拷贝动做
                var headtoCheck = buffer.Slice(position.Value, 4).ToArray();
                //SequenceEqual须要引用System.Linq
                if (headtoCheck.SequenceEqual(new byte[] { 0x75, 0xbd, 0x7e, 0x97 }))
                {
                    //到这里,认为找到包开头了(从position.value开始),接下来须要从开头处截取整包的长度,须要先判断长度是否足够
                    if (buffer.Slice(position.Value).Length >= 2400)
                    {
                        //长度足够,那么取出ReadOnlySequence,进行操做
                        var mes = buffer.Slice(position.Value, 2400);
                        //这里是数据处理的函数,能够参考官方文档对ReadOnlySequence进行操做,文档里面使用了span,那样性能会好一些。我这里简单实用ToArray()操做,这样也有了内存拷贝的问题,可是处理的直接是byte数组了。
                        await ProcessMessage(mes.ToArray());
                        //这一段就算是完成了,从开头位置,一整个包的长度就算完成了
                        var next = buffer.GetPosition(2400, position.Value);
                        //将buffer处理过的舍弃,替换为剩余的buffer引用
                        buffer = buffer.Slice(next);
                    }
                    else
                    {
                        //长度不够,说明数据包不完整,等下一波数据进来再拼接,跳出循环。
                        break;
                    }
                }
                else
                {
                    //第一个是0x75可是后面不匹配,可能有数据传输问题,那么须要舍弃第一个,0x75后面的字节开始再从新找0x75
                    var next = buffer.GetPosition(1, position.Value);
                    buffer = buffer.Slice(next);
                }
            }
        }
        while (position != null);

        //数据处理完毕,告诉pipe还剩下多少数据没有处理(数据包不完整的数据,找不到head)
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
        {
            break;
        }
    }

    reader.Complete();
}

以上代码基本解决了如下问题:

  • 数据接收不完整,找不到开头结尾,致使数据大量丢弃,或者本身维护一个queue的代码复杂性
  • 数据接收与处理的同步问题
  • 一次性收到多条数据的状况

后记

本文只是解释了pipeline处理的模式,对于茫茫多的ToArray方法,能够使用基于Span的操做进行优化(有时间就来填坑)。另外,若是在await ProcessMessage(mes.ToArray());这里,直接使用Task.Run(()=>ProcessMessage(mes);代替的话,实测会出现莫名其妙的问题,颇有多是pipe运行快,在系统调度Task以前,已经将内存释放致使的,若是须要优化这一块的话,须要格外注意。

相关文章
相关标签/搜索