借鉴以前的Pipeline的操做方式,如今目标是给串口读取操做也使用上Pipeline。稍微改造一下,如下代码能够直接运行。html
协议为使用连续的4个0XFF做为结尾,没有头标志。数据总长为68位定长。c#
我须要判断从开始到选定位置是否长度足够,这里面用来判断segment
长度我用了这个方式。async
buffer.Slice(0, start.Value).Length >= 64
其实最先不是使用这个东西的,而是使用的SequencePosition的GetInteger()方法,获取到了位置的index,天然就知道了长度等信息,并且很是方便进行截取操做。但是在使用的时候,发现一个很是诡异的问题:经过这个方法获取到的index值要大于Buffer的总长度。Slice直接弹出ArgumentOutOfRangeException
,可是不弹出错误,调试的时候很是麻烦。spa
查看这个方法定义的时候,发现签名是这样的:调试
[EditorBrowsable(EditorBrowsableState.Never)] public int GetInteger();
这个东西第一次见到,VS并不会提示,可是你强行写的话,可以正常编译。看来微软并非很像让咱们看到这个玩意。仔细挖掘一下,发现经过PositionOf
方法得到的SequencePosition
内部引用了一段长度为4096的内存。这个GetInteger()有时候返回的是在这段Memory上面的Index值。
看来这个东西是内部使用的,不太推荐咱们使用。code
咱们只能使用GetPosition
方法来得到相对的位置。不过我用的这个设备,协议是尾部标志,若是使用PostionOf
的话,偏移量得是负数。在尝试了不少次不通以后,发现微软文档中有说到这个潜在问题:blog
buffer.Slice(0, pos)
得到最长的片断,并将内容传输给处理程序进行。总之,不要使用操做ReadOnlySpan
为了解决操做的复杂性,.NET Core 3.0引入了一个
SequenceReader<T>
简化了操做,之后有机会使用的时候在写吧。ip
最后程序以下:
private async void Sp_DataReceived(object sender, SerialDataReceivedEventArgs e) { SerialPort p = sender as SerialPort; byte[] bytes = new byte[1024 * 4]; var dataCount = p.Read(bytes, 0, p.BytesToRead); var span = bytes.AsMemory().Slice(0, dataCount); await FillPipeAsync(span); } private void InitPipe() { Pipe pipe = new Pipe(); writer = pipe.Writer; //Task writing = FillPipeAsync(port, pipe.Writer); Task.Run(() => ReadPipeAsync(pipe.Reader)); //await Task.WhenAll(reading, writing); } private PipeWriter writer; private async Task FillPipeAsync(ReadOnlyMemory<byte> memory) { await writer.WriteAsync(memory); //writer.Advance(memory.Length); // Make the data available to the PipeReader FlushResult result = await writer.FlushAsync(); if (result.IsCompleted) writer.Complete(); } private async Task ReadPipeAsync(PipeReader reader) { while (true) { try { ReadResult result = await reader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer; SequencePosition? start = null; var headBytes = new byte[] { 0xff, 0xff, 0xff, 0xff }; do { // Find the EOL start = buffer.PositionOf(headBytes[0]); if (start != null) { if (buffer.Slice(start.Value).Length >= 4) { var headtoCheck = buffer.Slice(start.Value, 4).ToArray(); if (headtoCheck.SequenceEqual(headBytes)) { if(buffer.Slice(0, start.Value).Length >= 64) { var pos = buffer.GetPosition(4, start.Value); var mes = buffer.Slice(0, pos); DataProcess.Process(mes.ToArray()); var next = buffer.GetPosition(4, start.Value); buffer = buffer.Slice(next); } else { var next = buffer.GetPosition(4, start.Value); buffer = buffer.Slice(next); } } else { var next = buffer.GetPosition(1, start.Value); buffer = buffer.Slice(next); } } else { var next = buffer.GetPosition(1, start.Value); buffer = buffer.Slice(next); } } } while (start != null); // We sliced the buffer until no more data could be processed // Tell the PipeReader how much we consumed and how much we left to process if (result.IsCompleted) { continue; } reader.AdvanceTo(buffer.Start, buffer.End); } catch (ArgumentOutOfRangeException e) { //throw e; } } reader.Complete(); }