介绍部分参考博客:TPL DataFlow初探(一)
侵权请联系删除html
官方解释:git
TPL(任务并行库) 数据流库向具备高吞吐量和低滞后时间的占用大量 CPU 和 I/O 操做的应用程序的并行化和消息传递提供了基础。 它还能显式控制缓存数据的方式以及在系统中移动的方式。传统编程模型一般须要使用回调和同步对象(例如锁)来协调任务和访问共享数据。在数据流模型下,您能够声明当数据可用时的处理方式,以及数据之间的全部依赖项。 因为运行时管理数据之间的依赖项,所以一般能够避免这种要求来同步访问共享数据。 此外,由于运行时计划基于数据的异步到达,因此数据流能够经过有效管理基础线程提升响应能力和吞吐量。
借助于异步消息传递与管道,它能够提供比线程池更好的控制,也比手工线程方式具有更好的性能。咱们经常能够消息传递,生产-消费模式或Actor-Agent模式中使用。在TDF是构建于Task Parallel Library (TPL)之上的,它是咱们开发高性能,高并发的应用程序的又一利器。github
TDP的主要做用就是Buffering Data和Processing Data,在TDF中,有两个很是重要的接口,ISourceBlock<T> 和ITargetBlock<T>接口。继承于ISourceBlock<T>的对象时做为提供数据的数据源对象-生产者,而继承于ITargetBlock<T>接口类主要是扮演目标对象-消费者。在这个类库中,System.Threading.Tasks.Dataflow名称空间下,提供了不少以Block名字结尾的类,ActionBlock,BufferBlock,TransformBlock,BroadcastBlock等9个Block,咱们在开发中一般使用单个或多个Block组合的方式来实现一些功能。web
BufferBlock是TDF中最基础的Block。BufferBlock提供了一个有界限或没有界限的Buffer,该Buffer中存储T。该Block很像BlockingCollection<T>。能够用过Post往里面添加数据,也能够经过Receive方法阻塞或异步的的获取数据,数据处理的顺序是FIFO的。它也能够经过Link向其余Block输出数据。编程
private static BufferBlock<int> m_buffer = new BufferBlock<int>(); // Producer private static void Producer() { while(true) { int item = Produce(); m_buffer.Post(item); } } // Consumer private static void Consumer() { while(true) { int item = m_buffer.Receive(); Process(item); } } // Main public static void Main() { var p = Task.Factory.StartNew(Producer); var c = Task.Factory.StartNew(Consumer); Task.WaitAll(p,c); }
ActionBlock实现ITargetBlock,说明它是消费数据的,也就是对输入的一些数据进行处理。它在构造函数中,容许输入一个委托,来对每个进来的数据进行一些操做。若是使用Action(T)委托,那说明每个数据的处理完成须要等待这个委托方法结束,若是使用了Func<TInput, Task>)来构造的话,那么数据的结束将不是委托的返回,而是Task的结束。默认状况下,ActionBlock会FIFO的处理每个数据,并且一次只能处理一个数据,一个处理完了再处理第二个,但也能够经过配置来并行的执行多个数据。缓存
public ActionBlock<int> abSync = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now); } ); public void TestSync() { for (int i = 0; i < 10; i++) { abSync.Post(i); } Console.WriteLine("Post finished"); }
TransformBlock是TDF提供的另外一种Block,顾名思义它经常在数据流中充当数据转换处理的功能。在TransformBlock内部维护了2个Queue,一个InputQueue,一个OutputQueue。InputQueue存储输入的数据,而经过Transform处理之后的数据则放在OutputQueue,OutputQueue就好像是一个BufferBlock。最终咱们能够经过Receive方法来阻塞的一个一个获取OutputQueue中的数据。TransformBlock的Completion.Wait()方法只有在OutputQueue中的数据为0的时候才会返回。服务器
举个例子,咱们有一组网址的URL,咱们须要对每一个URL下载它的HTML数据并存储。那咱们经过以下的代码来完成:网络
public TransformBlock<string, string> tbUrl = new TransformBlock<string, string>((url) => { WebClient webClient = new WebClient(); return webClient.DownloadString(new Uri(url)); } public void TestDownloadHTML() { tbUrl.Post("www.baidu.com"); tbUrl.Post("www.sina.com.cn"); string baiduHTML = tbUrl.Receive(); string sinaHTML = tbUrl.Receive(); }
其余的Block请参考上述博客架构
咱们须要采集三个通道的数据$x$,$y$,$z$. 而后使用$x$,$y$,$z$组合通过以下公式计算获得一个结果$m$并发
$$m=x*y+z $$
而后对$m$数列作中值滤波,每5个数值求中间值,生成一个最终的值。
获得最终值以后,在界面中显示波形,存二进制文件,经过网络发送到数据服务器。
其流式处理图以下:
按照C# Dataflow的思想,流式处理的各个节点可使用的Block以下图:
细心的人能够看到,最后的业务处理前面加上了一个BroadCastBlock
,是为了同时给三个业务分发消息。
本博客采用WPF窗体框架,界面以下
因为手上确实没有合适的板卡作测试,我就用三个Task模拟数据生成,而后放入三个ConcurrentQueue
.
代码以下:
/// <summary> /// 通道1队列 /// </summary> private readonly ConcurrentQueue<double> _queue1 = new ConcurrentQueue<double>(); /// <summary> /// 通道2队列 /// </summary> private readonly ConcurrentQueue<double> _queue2 = new ConcurrentQueue<double>(); /// <summary> /// 通道3队列 /// </summary> private readonly ConcurrentQueue<double> _queue3 = new ConcurrentQueue<double>(); /// <summary> /// 生成通道1数据按钮事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void GenerateChannel1Button_OnClick(object sender, RoutedEventArgs e) { Task.Factory.StartNew(() => GenerateData(this._queue1)); } /// <summary> /// 生成通道2数据按钮事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void GenerateChannel2Button_OnClick(object sender, RoutedEventArgs e) { Task.Factory.StartNew(() => GenerateData(this._queue2)); } /// <summary> /// 生成通道3数据按钮事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void GenerateChannel3Button_OnClick(object sender, RoutedEventArgs e) { Task.Factory.StartNew(() => GenerateData(this._queue3)); } /// <summary> /// 生成数据 /// </summary> /// <param name="queue"></param> /// <returns></returns> private async Task GenerateData(ConcurrentQueue<double> queue) { var random = new Random(); while (this._stop) { queue.Enqueue(random.NextDouble() * 10); await Task.Delay(TimeSpan.FromMilliseconds(50)); } }
初始化各类Block
/// <summary> /// 通道1BufferBlock /// </summary> private BufferBlock<double> _bufferBlock1 = new BufferBlock<double>(); /// <summary> /// 通道2BufferBlock /// </summary> private BufferBlock<double> _bufferBlock2 = new BufferBlock<double>(); /// <summary> /// 通道3BufferBlock /// </summary> private BufferBlock<double> _bufferBlock3 = new BufferBlock<double>(); /// <summary> /// 拼接3个通道JoinBlock /// </summary> private JoinBlock<double, double,double> _joinBlock = new JoinBlock<double, double, double>(); /// <summary> /// 计算M的TransformBlock /// </summary> private TransformBlock<Tuple<double,double,double>, double> _calculateMTransformBlock = new TransformBlock<Tuple<double, double, double>, double>(t => t.Item1 * t.Item2 + t.Item3); /// <summary> /// 每5个m组成一组BatchBlock /// </summary> private BatchBlock<double> _mBatchBlock = new BatchBlock<double>(5); /// <summary> /// m的中值滤波TransformBlock /// </summary> private TransformBlock<double[], double> _mMiddleFilterTransformBlock = new TransformBlock<double[], double>( t => { Array.Sort(t); return t[2]; }); /// <summary> /// 广播mBroadcastBlock /// </summary> private BroadcastBlock<double> _broadcastBlock = new BroadcastBlock<double>(t => t); /// <summary> /// 界面显示ActionBlock /// </summary> private ActionBlock<double> _showPlotActionBlock; /// <summary> /// 写入文件ActionBlock /// </summary> private ActionBlock<double> _writeFileActionBlock; /// <summary> /// 网络上传ActionBlock /// </summary> private ActionBlock<double> _netUpActionBlock;
因为Lambda
须要访问外部变量,则须要在laod
事件中初始化:
//UI显示ActionBlock this._showPlotActionBlock = new ActionBlock<double>(t => { if (this.Datas.Count >= 10000) { this.Datas.RemoveAt(0); } this.Datas.Add(new DataPoint(_xIndex++, t)); Application.Current.Dispatcher.Invoke(() => { Plot?.InvalidatePlot(); }); }, new ExecutionDataflowBlockOptions() { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }); //写入文件 this._writeFileActionBlock = new ActionBlock<double>(t => { this._binaryWriter.Write(t); }); //上传数据,暂时不实现 this._netUpActionBlock = new ActionBlock<double>(t => Console.WriteLine($@"Net upload value: {t}"));
连接这些Block
/// <summary> /// 连接Blocks /// </summary> private void LinkBlocks() { this._bufferBlock1.LinkTo(this._joinBlock.Target1); this._bufferBlock2.LinkTo(this._joinBlock.Target2); this._bufferBlock3.LinkTo(this._joinBlock.Target3); this._joinBlock.LinkTo(this._calculateMTransformBlock); this._calculateMTransformBlock.LinkTo(this._mBatchBlock); this._mBatchBlock.LinkTo(this._mMiddleFilterTransformBlock); this._mMiddleFilterTransformBlock.LinkTo(this._broadcastBlock); this._broadcastBlock.LinkTo(this._showPlotActionBlock); this._broadcastBlock.LinkTo(this._writeFileActionBlock); this._broadcastBlock.LinkTo(this._netUpActionBlock); }
开始测量按钮事件:
this._stop = false; this._fileStream = new FileStream($"{DateTime.Now:yyyy_MM_dd_HH_mm_ss}.dat", FileMode.OpenOrCreate, FileAccess.Write); this._binaryWriter = new BinaryWriter(this._fileStream); Task.Factory.StartNew(async () => { while (!this._stop) { if (this._queue1.Count > 0) { double result; this._queue1.TryDequeue(out result); this._bufferBlock1.Post(result); } else { await Task.Delay(TimeSpan.FromMilliseconds(30)); } } }); Task.Factory.StartNew(async () => { while (!this._stop) { if (this._queue2.Count > 0) { double result; this._queue2.TryDequeue(out result); this._bufferBlock2.Post(result); } else { await Task.Delay(TimeSpan.FromMilliseconds(30)); } } }); Task.Factory.StartNew(async () => { while (!this._stop) { if (this._queue3.Count > 0) { double result; this._queue3.TryDequeue(out result); this._bufferBlock3.Post(result); } else { await Task.Delay(TimeSpan.FromMilliseconds(30)); } } });
结束测量事件
this._stop = true; this._binaryWriter.Flush(); this._binaryWriter.Close(); this._fileStream.Close();
最终的效果:
本文源码已发布到github,地址:https://github.com/spartajet/...