System.IO.Pipelines
是一个新的库,旨在简化在.NET中执行高性能IO的过程。它是一个依赖.NET Standard的库,适用于全部.NET实现。html
Pipelines诞生于.NET Core团队,为使Kestrel成为业界最快的Web服务器之一。最初从做为Kestrel内部的实现细节发展成为可重用的API,它在.Net Core 2.1中做为可用于全部.NET开发人员的最高级BCL API(System.IO.Pipelines)提供。git
为了正确解析Stream或Socket中的数据,代码有固定的样板,而且有许多极端状况,为了处理他们,不得不编写难以维护的复杂代码。
实现高性能和正确性,同时也难以处理这种复杂性。Pipelines旨在解决这种复杂性。github
让咱们从一个简单的问题开始吧。咱们想编写一个TCP服务器,它接收来自客户端的用行分隔的消息(由\n
分隔)。(译者注:即一行为一条消息)数组
声明:与全部对性能敏感的工做同样,应在应用程序中测量每一个方案的实际状况。根据您的网络应用程序须要处理的规模,可能不须要在意的各类技术的开销。缓存
在Pipelines以前用.NET编写的典型代码以下所示:安全
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; await stream.ReadAsync(buffer, 0, buffer.Length); // 在buffer中处理一行消息 ProcessLine(buffer); }
此代码可能在本地测试时正确工做,但它有几个潜在错误:服务器
ReadAsync
调用可能没有收到整个消息(行尾)。stream.ReadAsync()
返回值中实际填充到buffer
中的数据量。(译者注:即不必定将buffer
填充满)ReadAsync
调用不能处理多条消息。这些是读取流数据时常见的一些缺陷。为了解决这个问题,咱们须要作一些改变:网络
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; var bytesBuffered = 0; var bytesConsumed = 0; while (true) { var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered); if (bytesRead == 0) { // EOF 已经到末尾 break; } // 跟踪已缓冲的字节数 bytesBuffered += bytesRead; var linePosition = -1; do { // 在缓冲数据中查找找一个行末尾 linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // 根据偏移量计算一行的长度 var lineLength = linePosition - bytesConsumed; // 处理这一行 ProcessLine(buffer, bytesConsumed, lineLength); // 移动bytesConsumed为了跳过咱们已经处理掉的行 (包括\n) bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }
这一次,这可能适用于本地开发,但一行可能大于1KiB(1024字节)。咱们须要调整输入缓冲区的大小,直到找到新行。数据结构
所以,咱们能够在堆上分配缓冲区去处理更长的一行。咱们从客户端解析较长的一行时,能够经过使用ArrayPool<byte>
避免重复分配缓冲区来改进这一点。并发
async Task ProcessLinesAsync(NetworkStream stream) { byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); var bytesBuffered = 0; var bytesConsumed = 0; while (true) { // 在buffer中计算中剩余的字节数 var bytesRemaining = buffer.Length - bytesBuffered; if (bytesRemaining == 0) { // 将buffer size翻倍 而且将以前缓冲的数据复制到新的缓冲区 var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2); Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length); // 将旧的buffer丢回池中 ArrayPool<byte>.Shared.Return(buffer); buffer = newBuffer; bytesRemaining = buffer.Length - bytesBuffered; } var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining); if (bytesRead == 0) { // EOF 末尾 break; } // 跟踪已缓冲的字节数 bytesBuffered += bytesRead; do { // 在缓冲数据中查找找一个行末尾 linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // 根据偏移量计算一行的长度 var lineLength = linePosition - bytesConsumed; // 处理这一行 ProcessLine(buffer, bytesConsumed, lineLength); // 移动bytesConsumed为了跳过咱们已经处理掉的行 (包括\n) bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }
这段代码有效,但如今咱们正在从新调整缓冲区大小,从而产生更多缓冲区副本。它将使用更多内存,由于根据代码在处理一行行后不会缩缓冲区的大小。为避免这种状况,咱们能够存储缓冲区序列,而不是每次超过1KiB大小时调整大小。
此外,咱们不会增加1KiB的 缓冲区,直到它彻底为空。这意味着咱们最终传递给ReadAsync
愈来愈小的缓冲区,这将致使对操做系统的更多调用。
为了缓解这种状况,咱们将在现有缓冲区中剩余少于512个字节时分配一个新缓冲区:
译者注:这段代码太复杂了,懒得翻译注释了,你们将就看吧
public class BufferSegment { public byte[] Buffer { get; set; } public int Count { get; set; } public int Remaining => Buffer.Length - Count; } async Task ProcessLinesAsync(NetworkStream stream) { const int minimumBufferSize = 512; var segments = new List<BufferSegment>(); var bytesConsumed = 0; var bytesConsumedBufferIndex = 0; var segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) }; segments.Add(segment); while (true) { // Calculate the amount of bytes remaining in the buffer if (segment.Remaining < minimumBufferSize) { // Allocate a new segment segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) }; segments.Add(segment); } var bytesRead = await stream.ReadAsync(segment.Buffer, segment.Count, segment.Remaining); if (bytesRead == 0) { break; } // Keep track of the amount of buffered bytes segment.Count += bytesRead; while (true) { // Look for a EOL in the list of segments var (segmentIndex, segmentOffset) = IndexOf(segments, (byte)'\n', bytesConsumedBufferIndex, bytesConsumed); if (segmentIndex >= 0) { // Process the line ProcessLine(segments, segmentIndex, segmentOffset); bytesConsumedBufferIndex = segmentOffset; bytesConsumed = segmentOffset + 1; } else { break; } } // Drop fully consumed segments from the list so we don't look at them again for (var i = bytesConsumedBufferIndex; i >= 0; --i) { var consumedSegment = segments[i]; // Return all segments unless this is the current segment if (consumedSegment != segment) { ArrayPool<byte>.Shared.Return(consumedSegment.Buffer); segments.RemoveAt(i); } } } } (int segmentIndex, int segmentOffest) IndexOf(List<BufferSegment> segments, byte value, int startBufferIndex, int startSegmentOffset) { var first = true; for (var i = startBufferIndex; i < segments.Count; ++i) { var segment = segments[i]; // Start from the correct offset var offset = first ? startSegmentOffset : 0; var index = Array.IndexOf(segment.Buffer, value, offset, segment.Count - offset); if (index >= 0) { // Return the buffer index and the index within that segment where EOL was found return (i, index); } first = false; } return (-1, -1); }
此代码只是获得不少更加复杂。当咱们正在寻找分隔符时,咱们同时跟踪已填充的缓冲区序列。为此,咱们此处使用List<BufferSegment>
查找新行分隔符时表示缓冲数据。其结果是,ProcessLine
和IndexOf
如今接受List<BufferSegment>
做为参数,而不是一个byte[],offset和count
。咱们的解析逻辑如今须要处理一个或多个缓冲区序列。
咱们的服务器如今处理部分消息,它使用池化内存来减小整体内存消耗,但咱们还须要进行更多更改:
byte[]
和ArrayPool<byte>
的只是普通的托管数组。这意味着不管什么时候咱们执行ReadAsync
或WriteAsync
,这些缓冲区都会在异步操做的生命周期内被固定(以便与操做系统上的本机IO API互操做)。这对GC有性能影响,由于没法移动固定内存,这可能致使堆碎片。根据异步操做挂起的时间长短,池的实现可能须要更改。复杂性已经到了极端(咱们甚至没有涵盖全部案例)。高性能网络应用一般意味着编写很是复杂的代码,以便从系统中得到更高的性能。
System.IO.Pipelines的目标是使这种类型的代码更容易编写。
让咱们来看看这个例子的样子System.IO.Pipelines:
async Task ProcessLinesAsync(Socket socket) { var pipe = new Pipe(); Task writing = FillPipeAsync(socket, pipe.Writer); Task reading = ReadPipeAsync(pipe.Reader); return Task.WhenAll(reading, writing); } async Task FillPipeAsync(Socket socket, PipeWriter writer) { const int minimumBufferSize = 512; while (true) { // 从PipeWriter至少分配512字节 Memory<byte> memory = writer.GetMemory(minimumBufferSize); try { int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None); if (bytesRead == 0) { break; } // 告诉PipeWriter从套接字读取了多少 writer.Advance(bytesRead); } catch (Exception ex) { LogError(ex); break; } // 标记数据可用,让PipeReader读取 FlushResult result = await writer.FlushAsync(); if (result.IsCompleted) { break; } } // 告诉PipeReader没有更多的数据 writer.Complete(); } async Task ReadPipeAsync(PipeReader reader) { while (true) { ReadResult result = await reader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer; SequencePosition? position = null; do { // 在缓冲数据中查找找一个行末尾 position = buffer.PositionOf((byte)'\n'); if (position != null) { // 处理这一行 ProcessLine(buffer.Slice(0, position.Value)); // 跳过 这一行+\n (basically position 主要位置?) buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); } } while (position != null); // 告诉PipeReader咱们以及处理多少缓冲 reader.AdvanceTo(buffer.Start, buffer.End); // 若是没有更多的数据,中止都去 if (result.IsCompleted) { break; } } // 将PipeReader标记为完成 reader.Complete(); }
咱们的行读取器的pipelines版本有2个循环:
FillPipeAsync
从Socket读取并写入PipeWriter。ReadPipeAsync
从PipeReader中读取并解析传入的行。与原始示例不一样,在任何地方都没有分配显式缓冲区。这是管道的核心功能之一。全部缓冲区管理都委托给PipeReader/PipeWriter实现。
这使得使用代码更容易专一于业务逻辑而不是复杂的缓冲区管理。
在第一个循环中,咱们首先调用PipeWriter.GetMemory(int)
从底层编写器获取一些内存; 而后咱们调用PipeWriter.Advance(int)
告诉PipeWriter咱们实际写入缓冲区的数据量。而后咱们调用PipeWriter.FlushAsync()
来提供数据给PipeReader。
在第二个循环中,咱们正在使用PipeWriter最终来自的缓冲区Socket。当调用PipeReader.ReadAsync()
返回时,咱们获得一个ReadResult包含2条重要信息,包括以ReadOnlySequence<byte>
形式读取的数据和bool IsCompleted
,让reader知道writer是否写完(EOF)。在找到行尾(EOL)分隔符并解析该行以后,咱们将缓冲区切片以跳过咱们已经处理过的内容,而后咱们调用PipeReader.AdvanceTo
告诉PipeReader咱们消耗了多少数据。
在每一个循环结束时,咱们完成了reader和writer。这容许底层Pipe释放它分配的全部内存。
除了处理内存管理以外,其余核心管道功能还包括可以在Pipe不实际消耗数据的状况下查看数据。
PipeReader有两个核心API ReadAsync
和AdvanceTo
。ReadAsync
获取Pipe数据,AdvanceTo
告诉PipeReader再也不须要这些缓冲区,以即可以丢弃它们(例如返回到底层缓冲池)。
这是一个http解析器的示例,它在接收Pipe到有效起始行以前读取部分数据缓冲区数据。
该Pipe实现存储了在PipeWriter和PipeReader之间传递的缓冲区的连接列表。PipeReader.ReadAsync暴露一个ReadOnlySequence<T>新的BCL类型,它表示一个或多个ReadOnlyMemory<T>段的视图,相似于Span<T>和Memory<T>提供数组和字符串的视图。
该Pipe内部维护指向reader和writer能够分配或更新它们的数据集合,。SequencePosition表示缓冲区链表中的单个点,可用于有效地对ReadOnlySequence<T>进行切片。
这段实在翻译困难,给出原文
The Pipe internally maintains pointers to where the reader and writer are in the overall set of allocated data and updates them as data is written or read. The SequencePosition represents a single point in the linked list of buffers and can be used to efficiently slice the ReadOnlySequence.
因为ReadOnlySequence<T>能够支持一个或多个段,所以高性能处理逻辑一般基于单个或多个段来分割快速和慢速路径(fast and slow paths?)。
例如,这是一个将ASCII ReadOnlySequence<byte>转换为string如下内容的例程:
string GetAsciiString(ReadOnlySequence<byte> buffer) { if (buffer.IsSingleSegment) { return Encoding.ASCII.GetString(buffer.First.Span); } return string.Create((int)buffer.Length, buffer, (span, sequence) => { foreach (var segment in sequence) { Encoding.ASCII.GetChars(segment.Span, span); span = span.Slice(segment.Length); } }); }
在一个完美的世界中,读取和解析工做是一个团队:读取线程消耗来自网络的数据并将其放入缓冲区,而解析线程负责构建适当的数据结构。一般,解析将比仅从网络复制数据块花费更多时间。结果,读取线程能够轻易地压倒解析线程。结果是读取线程必须减慢或分配更多内存来存储解析线程的数据。为得到最佳性能,在频繁暂停和分配更多内存之间存在平衡。
为了解决这个问题,管道有两个设置来控制数据的流量,PauseWriterThreshold和ResumeWriterThreshold。PauseWriterThreshold决定有多少数据应该在调用PipeWriter.FlushAsync
以前进行缓冲停顿。ResumeWriterThreshold控制reader消耗多少后写入能够恢复。
当Pipe的数据量超过PauseWriterThreshold,PipeWriter.FlushAsync
会异步阻塞。数据量变得低于ResumeWriterThreshold,它会解锁时。两个值用于防止在极限附近发生反复阻塞和解锁。
一般在使用async / await时,会在线程池线程或当前线程上调用continuation SynchronizationContext。
在执行IO时,对执行IO的位置进行细粒度控制很是重要,这样能够更有效地利用CPU缓存,这对于Web服务器等高性能应用程序相当重要。Pipelines公开了一个PipeScheduler肯定异步回调运行位置的方法。这使得调用者能够精确控制用于IO的线程。
实践中的一个示例是在Kestrel Libuv传输中,其中IO回调在专用事件循环线程上运行。
做为制做System.IO.Pipelines的一部分,咱们还添加了许多新的原始BCL类型:
MemoryPool<T>
,IMemoryOwner<T>
,MemoryManager<T>
- .NET Core 1.0添加了ArrayPool<T>
,在.NET Core 2.1中,咱们如今有一个更通用的抽象,适用于任何工做的池Memory<T>
。这提供了一个可扩展点,容许您插入更高级的分配策略以及控制缓冲区的管理方式(例如,提供预先固定的缓冲区而不是纯托管的阵列)。IBufferWriter<T>
- 表示用于写入同步缓冲数据的接收器。(PipeWriter实现这个)ValueTask<T>
自.NET Core 1.1以来就已存在,但在.NET Core 2.1中得到了一些超级权限,容许无分配的等待异步操做。有关详细信息,请参阅https://github.com/dotnet/corefx/issues/27445。API存在于System.IO.Pipelines
nuget包中。
如下是使用管道处理基于行的消息的.NET Core 2.1服务器应用程序的示例(上面的示例)https://github.com/davidfowl/TcpEcho。它应该运行`dotnet run`(或经过在Visual Studio中运行)。它侦听端口8087上的套接字并将收到的消息写入控制台。您可使用netcat或putty等客户端创建与8087的链接,并发送基于行的消息以使其正常工做。
今天Pipelines为Kestrel和SignalR提供支持,咱们但愿看见它做为.NET社区中许多网络库和组件的核心。
PS: 首次翻译英文文章,不足错漏请指出,多谢支持