TPL 数据流库向具备高吞吐量和低滞后时间的占用大量 CPU 和 I/O 操做的应用程序的并行化和消息传递提供了基础。 它还能显式控制缓存数据的方式以及在系统中移动的方式。 为了更好地了解数据流编程模型,请考虑一个以异步方式从磁盘加载图像并建立复合图像的应用程序。 传统编程模型一般须要使用回调和同步对象(例如锁)来协调任务和访问共享数据。 经过使用数据流编程模型,您能够从磁盘读取时建立处理图像的数据流对象。 在数据流模型下,您能够声明当数据可用时的处理方式,以及数据之间的全部依赖项。 因为运行时管理数据之间的依赖项,所以一般能够避免这种要求来同步访问共享数据。 此外,由于运行时计划基于数据的异步到达,因此数据流能够经过有效管理基础线程提升响应能力和吞吐量。web
System.Threading.Tasks.Dataflow 命名空间提供基于角色的编程模型,用以支持粗粒度数据流和流水线操做任务的进程内消息传递。TDP的主要做用就是Buffering Data和Processing Data,在TDF中,有两个很是重要的接口,ISourceBlock<T> 和ITargetBlock<T>接口。继承于ISourceBlock<T>的对象时做为提供数据的数据源对象-生产者,而继承于ITargetBlock<T>接口类主要是扮演目标对象-消费者。在这个类库中,System.Threading.Tasks.Dataflow名称空间下,提供了不少以Block名字结尾的类,ActionBlock,BufferBlock,TransformBlock,BroadcastBlock等9个Block,咱们在开发中一般使用单个或多个Block组合的方式来实现一些功能,如下逐个来简单介绍一下。编程
BufferBlock缓存
BufferBlock是TDF中最基础的Block。BufferBlock提供了一个有界限或没有界限的Buffer,该Buffer中存储T。该Block很像BlockingCollection<T>。能够用过Post往里面添加数据,也能够经过Receive方法阻塞或异步的的获取数据,数据处理的顺序是FIFO的。它也能够经过Link向其余Block输出数据。负载均衡
简单的同步的生产者消费者代码示例:异步
private static BufferBlock<int> m_buffer = new BufferBlock<int>(); // Producer private static void Producer() { while(true) { int item = Produce(); m_buffer.Post(item); } } // Consumer private static void Consumer() { while(true) { int item = m_buffer.Receive(); Process(item); } } // Main public static void Main() { var p = Task.Factory.StartNew(Producer); var c = Task.Factory.StartNew(Consumer); Task.WaitAll(p,c); }
ActionBlockasync
ActionBlock实现ITargetBlock,说明它是消费数据的,也就是对输入的一些数据进行处理。它在构造函数中,容许输入一个委托,来对每个进来的数据进行一些操做。若是使用Action(T)委托,那说明每个数据的处理完成须要等待这个委托方法结束,若是使用了Func<TInput, Task>)来构造的话,那么数据的结束将不是委托的返回,而是Task的结束。默认状况下,ActionBlock会FIFO的处理每个数据,并且一次只能处理一个数据,一个处理完了再处理第二个,但也能够经过配置来并行的执行多个数据。函数
先看一个例子:post
public ActionBlock<int> abSync = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now); } ); public void TestSync() { for (int i = 0; i < 10; i++) { abSync.Post(i); } Console.WriteLine("Post finished"); }
可见,ActionBlock是顺序处理数据的,这也是ActionBlock一大特性之一。主线程在往ActionBlock中Post数据之后立刻返回,具体数据的处理是另一个线程来作的。数据是异步处理的,但处理自己是同步的,这样在必定程度上保证数据处理的准确性。下面的例子是使用async和await。性能
public ActionBlock<int> abSync2 = new ActionBlock<int>(async (i) => { await Task.Delay(1000); Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now); }
虽然仍是1秒钟处理一个数据,可是处理数据的线程会有不一样。测试
若是你想异步处理多个消息的话,ActionBlock也提供了一些接口,让你轻松实现。在ActionBlock的构造函数中,能够提供一个ExecutionDataflowBlockOptions的类型,让你定义ActionBlock的执行选项,在下面了例子中,咱们定义了MaxDegreeOfParallelism选项,设置为3。目的的让ActionBlock中的Item最多能够3个并行处理。
public ActionBlock<int> abAsync = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 3 }); public void TestAsync() { for (int i = 0; i < 10; i++) { abAsync.Post(i); } Console.WriteLine("Post finished"); }
运行程序,咱们看见,每3个数据几乎同时处理,而且他们的线程ID也是不同的。
ActionBlock也有本身的生命周期,全部继承IDataflowBlock的类型都有Completion属性和Complete方法。调用Complete方法是让ActionBlock中止接收数据,而Completion属性则是一个Task,是在ActionBlock处理完全部数据时候会执行的任务,咱们可使用Completion.Wait()方法来等待ActionBlock完成全部的任务,Completion属性只有在设置了Complete方法后才会有效。
public void TestAsync() { for (int i = 0; i < 10; i++) { abAsync.Post(i); } abAsync.Complete(); Console.WriteLine("Post finished"); abAsync.Completion.Wait(); Console.WriteLine("Process finished"); }
TransformBlock
TransformBlock是TDF提供的另外一种Block,顾名思义它经常在数据流中充当数据转换处理的功能。在TransformBlock内部维护了2个Queue,一个InputQueue,一个OutputQueue。InputQueue存储输入的数据,而经过Transform处理之后的数据则放在OutputQueue,OutputQueue就好像是一个BufferBlock。最终咱们能够经过Receive方法来阻塞的一个一个获取OutputQueue中的数据。TransformBlock的Completion.Wait()方法只有在OutputQueue中的数据为0的时候才会返回。
举个例子,咱们有一组网址的URL,咱们须要对每一个URL下载它的HTML数据并存储。那咱们经过以下的代码来完成:
public TransformBlock<string, string> tbUrl = new TransformBlock<string, string>((url) => { WebClient webClient = new WebClient(); return webClient.DownloadString(new Uri(url)); } public void TestDownloadHTML() { tbUrl.Post("www.baidu.com"); tbUrl.Post("www.sina.com.cn"); string baiduHTML = tbUrl.Receive(); string sinaHTML = tbUrl.Receive(); }
固然,Post操做和Receive操做能够在不一样的线程中进行,Receive操做一样也是阻塞操做,在OutputQueue中有可用的数据时,才会返回。
TransformManyBlock
TransformManyBlock和TransformBlock很是相似,关键的不一样点是,TransformBlock对应于一个输入数据只有一个输出数据,而TransformManyBlock能够有多个,及能够从InputQueue中取一个数据出来,而后放多个数据放入到OutputQueue中。
TransformManyBlock<int, int> tmb = new TransformManyBlock<int, int>((i) => { return new int[] { i, i + 1 }; }); ActionBlock<int> ab = new ActionBlock<int>((i) => Console.WriteLine(i)); public void TestSync() { tmb.LinkTo(ab); for (int i = 0; i < 4; i++) { tmb.Post(i); } Console.WriteLine("Finished post"); }
BroadcastBlock
BroadcastBlock的做用不像BufferBlock,它是使命是让全部和它相联的目标Block都收到数据的副本,这点从它的命名上面就能够看出来了。还有一点不一样的是,BroadcastBlock并不保存数据,在每个数据被发送到全部接收者之后,这条数据就会被后面最新的一条数据所覆盖。如没有目标Block和BroadcastBlock相连的话,数据将被丢弃。但BroadcastBlock总会保存最后一个数据,无论这个数据是否是被发出去过,若是有一个新的目标Block连上来,那么这个Block将收到这个最后一个数据。
BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; }); ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i)); ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i)); ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i)); public void TestSync() { bb.LinkTo(displayBlock); bb.LinkTo(saveBlock); bb.LinkTo(sendBlock); for (int i = 0; i < 4; i++) { bb.Post(i); } Console.WriteLine("Post finished"); }
若是咱们在Post之后再添加链接Block的话,那些Block就只会收到最后一个数据了。
public void TestSync() { for (int i = 0; i < 4; i++) { bb.Post(i); } Thread.Sleep(5000); bb.LinkTo(displayBlock); bb.LinkTo(saveBlock); bb.LinkTo(sendBlock); Console.WriteLine("Post finished"); }
WriteOnceBlock
若是说BufferBlock是最基本的Block,那么WriteOnceBock则是最最简单的Block。它最多只能存储一个数据,一旦这个数据被发送出去之后,这个数据仍是会留在Block中,但不会被删除或被新来的数据替换,一样全部的接收者都会收到这个数据的备份。
和BroadcastBlock一样的代码,可是结果不同:
WriteOnceBlock<int> bb = new WriteOnceBlock<int>((i) => { return i; }); ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i)); ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i)); ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i)); public void TestSync() { bb.LinkTo(displayBlock); bb.LinkTo(saveBlock); bb.LinkTo(sendBlock); for (int i = 0; i < 4; i++) { bb.Post(i); } Console.WriteLine("Post finished"); }
WriteOnceBock只会接收一次数据。并且始终保留那个数据。
一样使用Receive方法来获取数据也是同样的结果,获取到的都是第一个数据:
public void TestReceive() { for (int i = 0; i < 4; i++) { bb.Post(i); } Console.WriteLine("Post finished"); Console.WriteLine("1st Receive:" + bb.Receive()); Console.WriteLine("2nd Receive:" + bb.Receive()); Console.WriteLine("3rd Receive:" + bb.Receive()); }
BatchBlock
BatchBlock提供了可以把多个单个的数据组合起来处理的功能,如上图。应对有些需求须要固定多个数据才能处理的问题。在构造函数中须要制定多少个为一个Batch,一旦它收到了那个数量的数据后,会打包放在它的OutputQueue中。当BatchBlock被调用Complete告知Post数据结束的时候,会把InputQueue中余下的数据打包放入OutputQueue中等待处理,而无论InputQueue中的数据量是否是知足构造函数的数量。
BatchBlock<int> bb = new BatchBlock<int>(3); ActionBlock<int[]> ab = new ActionBlock<int[]>((i) => { string s = string.Empty; foreach (int m in i) { s += m + " "; } Console.WriteLine(s); }); public void TestSync() { bb.LinkTo(ab); for (int i = 0; i < 10; i++) { bb.Post(i); } bb.Complete(); Console.WriteLine("Finished post"); }
BatchBlock执行数据有两种模式:贪婪模式和非贪婪模式。贪婪模式是默认的。贪婪模式是指任何Post到BatchBlock,BatchBlock都接收,并等待个数满了之后处理。非贪婪模式是指BatchBlock须要等到构造函数中设置的BatchSize个数的Source都向BatchBlock发数据,Post数据的时候才会处理。否则都会留在Source的Queue中。也就是说BatchBlock可使用在每次从N个Source那个收一个数据打包处理或从1个Source那里收N个数据打包处理。这里的Source是指其余的继承ISourceBlock的,用LinkTo链接到这个BatchBlock的Block。
在另外一个构造参数中GroupingDataflowBlockOptions,能够经过设置Greedy属性来选择是否贪婪模式和MaxNumberOfGroups来设置最大产生Batch的数量,若是到达了这个数量,BatchBlock将不会再接收数据。
JoinBlock
JoinBlock一看名字就知道是须要和两个或两个以上的Source Block相链接的。它的做用就是等待一个数据组合,这个组合须要的数据都到达了,它才会处理数据,并把这个组合做为一个Tuple传递给目标Block。举个例子,若是定义了JoinBlock<int, string>类型,那么JoinBlock内部会有两个ITargetBlock,一个接收int类型的数据,一个接收string类型的数据。那只有当两个ITargetBlock都收到各自的数据后,才会放到JoinBlock的OutputQueue中,输出。
JoinBlock<int, string> jb = new JoinBlock<int, string>(); ActionBlock<Tuple<int, string>> ab = new ActionBlock<Tuple<int, string>>((i) => { Console.WriteLine(i.Item1 + " " + i.Item2); }); public void TestSync() { jb.LinkTo(ab); for (int i = 0; i < 5; i++) { jb.Target1.Post(i); } for (int i = 5; i > 0; i--) { Thread.Sleep(1000); jb.Target2.Post(i.ToString()); } Console.WriteLine("Finished post"); }
BatchedJoinBlock
BatchedJoinBlock一看就是BacthBlock和JoinBlick的组合。JoinBlick是组合目标队列的一个数据,而BatchedJoinBlock是组合目标队列的N个数据,固然这个N能够在构造函数中配置。若是咱们定义的是BatchedJoinBlock<int, string>, 那么在最后的OutputQueue中存储的是Tuple<IList<int>, IList<string>>,也就是说最后获得的数据是Tuple<IList<int>, IList<string>>。它的行为是这样的,仍是假设上文的定义,BatchedJoinBlock<int, string>, 构造BatchSize输入为3。那么在这个BatchedJoinBlock种会有两个ITargetBlock,会接收Post的数据。那何时会生成一个Tuple<IList<int>,IList<string>>到OutputQueue中呢,测试下来并非咱们想的须要有3个int数据和3个string数据,而是只要2个ITargetBlock中的数据个数加起来等于3就能够了。3和0,2和1,1和2或0和3的组合都会生成Tuple<IList<int>,IList<string>>到OutputQueue中。能够参看下面的例子:
BatchedJoinBlock<int, string> bjb = new BatchedJoinBlock<int, string>(3); ActionBlock<Tuple<IList<int>, IList<string>>> ab = new ActionBlock<Tuple<IList<int>, IList<string>>>((i) => { Console.WriteLine("-----------------------------"); foreach (int m in i.Item1) { Console.WriteLine(m); }; foreach (string s in i.Item2) { Console.WriteLine(s); }; }); public void TestSync() { bjb.LinkTo(ab); for (int i = 0; i < 5; i++) { bjb.Target1.Post(i); } for (int i = 5; i > 0; i--) { bjb.Target2.Post(i.ToString()); } Console.WriteLine("Finished post"); }
最后剩下的一个数据1,因为没有满3个,因此一直被保留在Target2中。
TDF中最有用的功能之一就是多个Block之间能够组合应用。ISourceBlock能够链接ITargetBlock,一对一,一对多,或多对多。下面的例子就是一个TransformBlock和一个ActionBlock的组合。TransformBlock用来把数据*2,并转换成字符串,而后把数据扔到ActionBlock中,而ActionBlock则用来最后的处理数据打印结果。
public ActionBlock<string> abSync = new ActionBlock<string>((i) => { Thread.Sleep(1000); Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now); } ); public TransformBlock<int, string> tbSync = new TransformBlock<int, string>((i) => { i = i * 2; return i.ToString(); } ); public void TestSync() { tbSync.LinkTo(abSync); for (int i = 0; i < 10; i++) { tbSync.Post(i); } tbSync.Complete(); Console.WriteLine("Post finished"); tbSync.Completion.Wait(); Console.WriteLine("TransformBlock process finished"); }
TDF提供的一些Block,经过对这些Block配置和组合,能够知足不少的数据处理的场景。这一篇将继续介绍与这些Block配置的相关类,和挖掘一些高级功能。
在一些Block的构造函数中,咱们经常能够看见须要你输入DataflowBlockOptions 类型或者它的两个派生类型ExecutionDataflowBlockOptions 和 GroupingDataflowBlockOptions。
DataflowBlockOptions
DataflowBlockOptions有五个属性:BoundedCapacity,CancellationToken,MaxMessagesPerTask,NameFormat和TaskScheduler。
用BoundedCapacity来限定容量
这个属性用来限制一个Block中最多能够缓存数据项的数量,大多数Block都支持这个属性,这个值默认是DataflowBlockOptions.Unbounded = -1,也就是说没有限制。开发人员能够制定这个属性设置数量的上限。那后面的新数据将会延迟。好比说用一个BufferBlock链接一个ActionBlock,若是在ActionBlock上面设置了上限,ActionBlock处理的操做速度比较慢,留在ActionBlock中的数据到达了上限,那么余下的数据将留在BufferBlock中,直到ActionBlock中的数据量低于上限。这种状况经常会发生在生产者生产的速度大于消费者速度的时候,致使的问题是内存愈来愈大,数据操做愈来愈延迟。咱们能够经过一个BufferBlock链接多个ActionBlock来解决这样的问题,也就是负载均衡。一个ActionBlock满了,就会放到另一个ActionBlock中去了。
用CancellationToken来取消操做
TPL中经常使用的类型。在Block的构造函数中放入CancellationToken,Block将在它的整个生命周期中全程监控这个对象,只要在这个Block结束运行(调用Complete方法)前,用CancellationToken发送取消请求,该Block将会中止运行,若是Block中还有没有处理的数据,那么将不会再被处理。
用MaxMessagesPerTask控制公平性
每个Block内部都是异步处理,都是使用TPL的Task。TDF的设计是在保证性能的状况下,尽可能使用最少的任务对象来完成数据的操做,这样效率会高一些,一个任务执行完成一个数据之后,任务对象并不会销毁,而是会保留着去处理下一个数据,直到没有数据处理的时候,Block才会回收掉这个任务对象。可是若是数据来自于多个Source,公平性就很难保证。从其余Source来的数据必需要等到早前的那些Source的数据都处理完了才能被处理。这时咱们就能够经过MaxMessagesPerTask来控制。这个属性的默认值仍是DataflowBlockOptions.Unbounded=-1,表示没有上限。假如这个数值被设置为1的话,那么单个任务只会处理一个数据。这样就会带来极致的公平性,可是将带来更多的任务对象消耗。
用NameFormat来定义Block名称
MSDN上说属性NameFormat用来获取或设置查询块的名称时要使用的格式字符串。
Block的名字Name=string.format(NameFormat, block.GetType ().Name, block.Completion.Id)。因此当咱们输入”{0}”的时候,名字就是block.GetType ().Name,若是咱们数据的是”{1}”,那么名字就是block.Completion.Id。若是是“{2}”,那么就会抛出异常。
用TaskScheduler来调度Block行为
TaskScheduler是很是重要的属性。一样这个类型来源于TPL。每一个Block里面都使用TaskScheduler来调度行为,不管是源Block和目标Block之间的数据传递,仍是用户自定义的执行数据方法委托,都是使用的TaskScheduler。若是没有特别设置的话,将使用TaskScheduler.Default(System.Threading.Tasks.ThreadPoolTaskScheduler)来调度。咱们可使用其余的一些继承于TaskScheduler的类型来设置这个调度器,一旦设置了之后,Block中的全部行为都会使用这个调度器来执行。.Net Framework 4中内建了两个Scheduler,一个是默认的ThreadPoolTaskScheduler,另外一个是用于UI线程切换的SynchronizationContextTaskScheduler。若是你使用的Block设计到UI的话,那可使用后者,这样在UI线程切换上面将更加方便。
.Net Framework 4.5 中,还有一个类型被加入到System.Threading.Tasks名称空间下:ConcurrentExclusiveSchedulerPair。这个类是两个TaskScheduler的组合。它提供两个TaskScheduler:ConcurrentScheduler和ExclusiveScheduler;咱们能够把这两个TaskScheduler构造进要使用的Block中。他们保证了在没有排他任务的时候(使用ExclusiveScheduler的任务),其余任务(使用ConcurrentScheduler)能够同步进行,当有排他任务在运行的时候,其余任务都不能运行。其实它里面就是一个读写锁。这在多个Block操做共享资源的问题上是一个很方便的解决方案。
public ActionBlock<int> readerAB1; public ActionBlock<int> readerAB2; public ActionBlock<int> readerAB3; public ActionBlock<int> writerAB1; public BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; }); public void Test() { ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair(); readerAB1 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB1 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); readerAB2 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB2 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); readerAB3 = new ActionBlock<int>((i) => { Console.WriteLine("ReaderAB3 begin handling." + " Execute Time:" + DateTime.Now); Thread.Sleep(500); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ConcurrentScheduler }); writerAB1 = new ActionBlock<int>((i) => { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine("WriterAB1 begin handling." + " Execute Time:" + DateTime.Now); Console.ResetColor(); Thread.Sleep(3000); } , new ExecutionDataflowBlockOptions() { TaskScheduler = pair.ExclusiveScheduler }); bb.LinkTo(readerAB1); bb.LinkTo(readerAB2); bb.LinkTo(readerAB3); Task.Run(() => { while (true) { bb.Post(1); Thread.Sleep(1000); } }); Task.Run(() => { while (true) { Thread.Sleep(6000); writerAB1.Post(1); } }); }
用MaxDegreeOfParallelism来并行处理
一般,Block中处理数据都是单线程的,一次只能处理一个数据,好比说ActionBlock中自定义的代理。使用MaxDegreeOfParallelism可让你并行处理这些数据。属性的定义是最大的并行处理个数。若是定义成-1的话,那就是没有限制。用户须要在实际状况中选择这个值的大小,并非越大越好。若是是平行处理的话,还应该考虑是否有共享资源。
TDF中的负载均衡
咱们可使用Block很方便的构成一个生产者消费者的模式来处理数据。当生产者产生数据的速度快于消费者的时候,消费者Block的Buffer中的数据会愈来愈多,消耗大量的内存,数据处理也会延时。这时,咱们能够用一个生产者Block链接多个消费者Block来解决这个问题。因为多个消费者Block必定是并行处理,因此对共享资源的处理必定要作同步处理。
使用BoundedCapacity属性来实现
当链接多个ActionBlock的时候,能够经过设置ActionBlock的BoundedCapacity属性。当第一个满了,就会放到第二个,第二个满了就会放到第三个。
public BufferBlock<int> bb = new BufferBlock<int>(); public ActionBlock<int> ab1 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab1 handle data" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public ActionBlock<int> ab2 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab2 handle data" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public ActionBlock<int> ab3 = new ActionBlock<int>((i) => { Thread.Sleep(1000); Console.WriteLine("ab3 handle data:" + i + " Execute Time:" + DateTime.Now); } , new ExecutionDataflowBlockOptions() { BoundedCapacity = 2 }); public void Test() { bb.LinkTo(ab1); bb.LinkTo(ab2); bb.LinkTo(ab3); for (int i = 0; i < 9; i++) { bb.Post(i); } }