TPL DataFlow初探(一)

属性TPL Dataflow是微软面向高并发应用而推出的一个类库。借助于异步消息传递与管道,它能够提供比线程池更好的控制,也比手工线程方式具有更好的性能。咱们经常能够消息传递,生产-消费模式或Actor-Agent模式中使用。在TDF是构建于Task Parallel Library (TPL)之上的,它是咱们开发高性能,高并发的应用程序的又一利器。您能够在NuGet中下载使用,目前最新的版本只支持.net framework 4.5。最先支持.net framework 4.0是做为Microsoft Visual Studio Async CTP中的一部分发布的,你能够在这里下载到。html

TDP的主要做用就是Buffering Data和Processing Data,在TDF中,有两个很是重要的接口,ISourceBlock<T> 和ITargetBlock<T>接口。继承于ISourceBlock<T>的对象时做为提供数据的数据源对象-生产者,而继承于ITargetBlock<T>接口类主要是扮演目标对象-消费者。在这个类库中,System.Threading.Tasks.Dataflow名称空间下,提供了不少以Block名字结尾的类,ActionBlock,BufferBlock,TransformBlock,BroadcastBlock等9个Block,咱们在开发中一般使用单个或多个Block组合的方式来实现一些功能。如下咱们逐个来简单介绍一下。web

 

BufferBlock并发

BufferBlock是TDF中最基础的Block。BufferBlock提供了一个有界限或没有界限的Buffer,该Buffer中存储T。该Block很像BlockingCollection<T>。能够用过Post往里面添加数据,也能够经过Receive方法阻塞或异步的的获取数据,数据处理的顺序是FIFO的。它也能够经过Link向其余Block输出数据。异步

image

 

 

简单的同步的生产者消费者代码示例:async

 
private static BufferBlock<int> m_buffer = new BufferBlock<int>();

// Producer
private static void Producer()
{
    while(true)
    {
        int item = Produce();
        m_buffer.Post(item);
    }
}

// Consumer
private static void Consumer()
{
    while(true)
    {
        int item = m_buffer.Receive();
        Process(item);
    }
}

// Main
public static void Main()
{
    var p = Task.Factory.StartNew(Producer);
    var c = Task.Factory.StartNew(Consumer);
    Task.WaitAll(p,c);
}
 

 

 

 

 

ActionBlock函数

 

ActionBlock实现ITargetBlock,说明它是消费数据的,也就是对输入的一些数据进行处理。它在构造函数中,容许输入一个委托,来对每个进来的数据进行一些操做。若是使用Action(T)委托,那说明每个数据的处理完成须要等待这个委托方法结束,若是使用了Func<TInput, Task>)来构造的话,那么数据的结束将不是委托的返回,而是Task的结束。默认状况下,ActionBlock会FIFO的处理每个数据,并且一次只能处理一个数据,一个处理完了再处理第二个,但也能够经过配置来并行的执行多个数据。高并发

 

image

先看一个例子:post

 
public ActionBlock<int> abSync = new ActionBlock<int>((i) =>
            {
                Thread.Sleep(1000);
                Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
            }
        );

        public void TestSync()
        {
            for (int i = 0; i < 10; i++)
            {
                abSync.Post(i);
            }

            Console.WriteLine("Post finished");
        }
 

 

6{%7WGLQU90CW4[(OF)H6DC

 

可见,ActionBlock是顺序处理数据的,这也是ActionBlock一大特性之一。主线程在往ActionBlock中Post数据之后立刻返回,具体数据的处理是另一个线程来作的。数据是异步处理的,但处理自己是同步的,这样在必定程度上保证数据处理的准确性。下面的例子是使用async和await。性能

public ActionBlock<int> abSync2 = new ActionBlock<int>(async (i) =>
        {
            await Task.Delay(1000);
            Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
        }

U55C4LS4`0SY0O)}[5W]{%C

虽然仍是1秒钟处理一个数据,可是处理数据的线程会有不一样。测试

 

若是你想异步处理多个消息的话,ActionBlock也提供了一些接口,让你轻松实现。在ActionBlock的构造函数中,能够提供一个ExecutionDataflowBlockOptions的类型,让你定义ActionBlock的执行选项,在下面了例子中,咱们定义了MaxDegreeOfParallelism选项,设置为3。目的的让ActionBlock中的Item最多能够3个并行处理。

 
public ActionBlock<int> abAsync = new ActionBlock<int>((i) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
        }
        , new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 3 });

public void TestAsync()
        {
            for (int i = 0; i < 10; i++)
            {
                abAsync.Post(i);
            }
            Console.WriteLine("Post finished");
        }
 

XVGW}JJK7YY7(%E}])11J7V

 

 

运行程序,咱们看见,每3个数据几乎同时处理,而且他们的线程ID也是不同的。

 

 

ActionBlock也有本身的生命周期,全部继承IDataflowBlock的类型都有Completion属性和Complete方法。调用Complete方法是让ActionBlock中止接收数据,而Completion属性则是一个Task,是在ActionBlock处理完全部数据时候会执行的任务,咱们可使用Completion.Wait()方法来等待ActionBlock完成全部的任务,Completion属性只有在设置了Complete方法后才会有效。

 

 

 
public void TestAsync()
        {
            for (int i = 0; i < 10; i++)
            {
                abAsync.Post(i);
            }
            abAsync.Complete();
            Console.WriteLine("Post finished");
            abAsync.Completion.Wait();
            Console.WriteLine("Process finished");
        }
 

 

 

$WSKZ$6M1`[J7T_W@~Y~WZ3

 

TransformBlock

TransformBlock是TDF提供的另外一种Block,顾名思义它经常在数据流中充当数据转换处理的功能。在TransformBlock内部维护了2个Queue,一个InputQueue,一个OutputQueue。InputQueue存储输入的数据,而经过Transform处理之后的数据则放在OutputQueue,OutputQueue就好像是一个BufferBlock。最终咱们能够经过Receive方法来阻塞的一个一个获取OutputQueue中的数据。TransformBlock的Completion.Wait()方法只有在OutputQueue中的数据为0的时候才会返回。

image

举个例子,咱们有一组网址的URL,咱们须要对每一个URL下载它的HTML数据并存储。那咱们经过以下的代码来完成:

 
public TransformBlock<string, string> tbUrl = new TransformBlock<string, string>((url) =>
        {
            WebClient webClient = new WebClient();
            return webClient.DownloadString(new Uri(url));
        }

        public void TestDownloadHTML()
        {
            tbUrl.Post("www.baidu.com");
            tbUrl.Post("www.sina.com.cn");

            string baiduHTML = tbUrl.Receive();
            string sinaHTML = tbUrl.Receive();
        }
 

固然,Post操做和Receive操做能够在不一样的线程中进行,Receive操做一样也是阻塞操做,在OutputQueue中有可用的数据时,才会返回。

 

TransformManyBlock

TransformManyBlock和TransformBlock很是相似,关键的不一样点是,TransformBlock对应于一个输入数据只有一个输出数据,而TransformManyBlock能够有多个,及能够从InputQueue中取一个数据出来,而后放多个数据放入到OutputQueue中。

image

 

 
TransformManyBlock<int, int> tmb = new TransformManyBlock<int, int>((i) => { return new int[] { i, i + 1 }; });

        ActionBlock<int> ab = new ActionBlock<int>((i) => Console.WriteLine(i));

        public void TestSync()
        {
            tmb.LinkTo(ab);

            for (int i = 0; i < 4; i++)
            {
                tmb.Post(i);
            }

            Console.WriteLine("Finished post");
        }
 

GC(K]J4DB4UKP$S@8C9ZVMV

 

BroadcastBlock

BroadcastBlock的做用不像BufferBlock,它是使命是让全部和它相联的目标Block都收到数据的副本,这点从它的命名上面就能够看出来了。还有一点不一样的是,BroadcastBlock并不保存数据,在每个数据被发送到全部接收者之后,这条数据就会被后面最新的一条数据所覆盖。如没有目标Block和BroadcastBlock相连的话,数据将被丢弃。但BroadcastBlock总会保存最后一个数据,无论这个数据是否是被发出去过,若是有一个新的目标Block连上来,那么这个Block将收到这个最后一个数据。

image

 
        BroadcastBlock<int> bb = new BroadcastBlock<int>((i) => { return i; });

        ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i));

        ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i));

        ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i));

        public void TestSync()
        {
            bb.LinkTo(displayBlock);
            bb.LinkTo(saveBlock);
            bb.LinkTo(sendBlock);

            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }

            Console.WriteLine("Post finished");
        }
 

A][PVWN1@4UMGZ[YTEV$[E9

 

若是咱们在Post之后再添加链接Block的话,那些Block就只会收到最后一个数据了。

 
public void TestSync()
        {
            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }

            Thread.Sleep(5000);

            bb.LinkTo(displayBlock);
            bb.LinkTo(saveBlock);
            bb.LinkTo(sendBlock);
            Console.WriteLine("Post finished");
        }
 

 

AC}VT(NM__HO1@UJ948)$@W

 

WriteOnceBlock

若是说BufferBlock是最基本的Block,那么WriteOnceBock则是最最简单的Block。它最多只能存储一个数据,一旦这个数据被发送出去之后,这个数据仍是会留在Block中,但不会被删除或被新来的数据替换,一样全部的接收者都会收到这个数据的备份。

image

和BroadcastBlock一样的代码,可是结果不同:

 
WriteOnceBlock<int> bb = new WriteOnceBlock<int>((i) => { return i; });

        ActionBlock<int> displayBlock = new ActionBlock<int>((i) => Console.WriteLine("Displayed " + i));

        ActionBlock<int> saveBlock = new ActionBlock<int>((i) => Console.WriteLine("Saved " + i));

        ActionBlock<int> sendBlock = new ActionBlock<int>((i) => Console.WriteLine("Sent " + i));

        public void TestSync()
        {
            bb.LinkTo(displayBlock);
            bb.LinkTo(saveBlock);
            bb.LinkTo(sendBlock);
            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }

            Console.WriteLine("Post finished");
        }
 

@2[203}OL`G6VH2K}}9}DNE

WriteOnceBock只会接收一次数据。并且始终保留那个数据。

一样使用Receive方法来获取数据也是同样的结果,获取到的都是第一个数据:

 
public void TestReceive()
        {
            for (int i = 0; i < 4; i++)
            {
                bb.Post(i);
            }
            Console.WriteLine("Post finished");

            Console.WriteLine("1st Receive:" + bb.Receive());
            Console.WriteLine("2nd Receive:" + bb.Receive());
            Console.WriteLine("3rd Receive:" + bb.Receive());
        }
 

7M5Q]MH5K82OVQ}N]E(J8MV

 

 BatchBlock

 

 image

BatchBlock提供了可以把多个单个的数据组合起来处理的功能,如上图。应对有些需求须要固定多个数据才能处理的问题。在构造函数中须要制定多少个为一个Batch,一旦它收到了那个数量的数据后,会打包放在它的OutputQueue中。当BatchBlock被调用Complete告知Post数据结束的时候,会把InputQueue中余下的数据打包放入OutputQueue中等待处理,而无论InputQueue中的数据量是否是知足构造函数的数量。

 
        BatchBlock<int> bb = new BatchBlock<int>(3);

        ActionBlock<int[]> ab = new ActionBlock<int[]>((i) => 
            {
                string s = string.Empty;

                foreach (int m in i)
                {
                    s += m + " ";
                }
                Console.WriteLine(s);
            });

        public void TestSync()
        {
            bb.LinkTo(ab);

            for (int i = 0; i < 10; i++)
            {
                bb.Post(i);
            }
            bb.Complete();

            Console.WriteLine("Finished post");
        }
 

@D__1{B5V72~T7`AGM74D_0

BatchBlock执行数据有两种模式:贪婪模式和非贪婪模式。贪婪模式是默认的。贪婪模式是指任何Post到BatchBlock,BatchBlock都接收,并等待个数满了之后处理。非贪婪模式是指BatchBlock须要等到构造函数中设置的BatchSize个数的Source都向BatchBlock发数据,Post数据的时候才会处理。否则都会留在Source的Queue中。也就是说BatchBlock可使用在每次从N个Source那个收一个数据打包处理或从1个Source那里收N个数据打包处理。这里的Source是指其余的继承ISourceBlock的,用LinkTo链接到这个BatchBlock的Block。

在另外一个构造参数中GroupingDataflowBlockOptions,能够经过设置Greedy属性来选择是否贪婪模式和MaxNumberOfGroups来设置最大产生Batch的数量,若是到达了这个数量,BatchBlock将不会再接收数据。

 

JoinBlock

image

JoinBlock一看名字就知道是须要和两个或两个以上的Source Block相链接的。它的做用就是等待一个数据组合,这个组合须要的数据都到达了,它才会处理数据,并把这个组合做为一个Tuple传递给目标Block。举个例子,若是定义了JoinBlock<int, string>类型,那么JoinBlock内部会有两个ITargetBlock,一个接收int类型的数据,一个接收string类型的数据。那只有当两个ITargetBlock都收到各自的数据后,才会放到JoinBlock的OutputQueue中,输出。

 

 

 

 
JoinBlock<int, string> jb = new JoinBlock<int, string>();
        ActionBlock<Tuple<int, string>> ab = new ActionBlock<Tuple<int, string>>((i) =>
            {
                Console.WriteLine(i.Item1 + " " + i.Item2);
            });
            
        public void TestSync()
        {
            jb.LinkTo(ab);

            for (int i = 0; i < 5; i++)
            {
                jb.Target1.Post(i);
            }

            for (int i = 5; i > 0; i--)
            {
                Thread.Sleep(1000);
                jb.Target2.Post(i.ToString());
            }

            Console.WriteLine("Finished post");
        }
 

)DNMCJE%H41G[2YBPD%W4%B

 

BatchedJoinBlock

image

BatchedJoinBlock一看就是BacthBlock和JoinBlick的组合。JoinBlick是组合目标队列的一个数据,而BatchedJoinBlock是组合目标队列的N个数据,固然这个N能够在构造函数中配置。若是咱们定义的是BatchedJoinBlock<int, string>, 那么在最后的OutputQueue中存储的是Tuple<IList<int>, IList<string>>,也就是说最后获得的数据是Tuple<IList<int>, IList<string>>。它的行为是这样的,仍是假设上文的定义,BatchedJoinBlock<int, string>, 构造BatchSize输入为3。那么在这个BatchedJoinBlock种会有两个ITargetBlock,会接收Post的数据。那何时会生成一个Tuple<IList<int>,IList<string>>到OutputQueue中呢,测试下来并非咱们想的须要有3个int数据和3个string数据,而是只要2个ITargetBlock中的数据个数加起来等于3就能够了。3和0,2和1,1和2或0和3的组合都会生成Tuple<IList<int>,IList<string>>到OutputQueue中。能够参看下面的例子:

 
BatchedJoinBlock<int, string> bjb = new BatchedJoinBlock<int, string>(3);

        ActionBlock<Tuple<IList<int>, IList<string>>> ab = new ActionBlock<Tuple<IList<int>, IList<string>>>((i) =>
            {
                Console.WriteLine("-----------------------------");

                foreach (int m in i.Item1)
                {
                    Console.WriteLine(m);
                };

                foreach (string s in i.Item2)
                {
                    Console.WriteLine(s);
                };
            });

        public void TestSync()
        {
            bjb.LinkTo(ab);

            for (int i = 0; i < 5; i++)
            {
                bjb.Target1.Post(i);
            }

            for (int i = 5; i > 0; i--)
            {
                bjb.Target2.Post(i.ToString());
            }

            Console.WriteLine("Finished post");
        }
 

GZ}X_[]DM}42_()PXL05A(T

最后剩下的一个数据1,因为没有满3个,因此一直被保留在Target2中。

 

TDF中最有用的功能之一就是多个Block之间能够组合应用。ISourceBlock能够链接ITargetBlock,一对一,一对多,或多对多。下面的例子就是一个TransformBlock和一个ActionBlock的组合。TransformBlock用来把数据*2,并转换成字符串,而后把数据扔到ActionBlock中,而ActionBlock则用来最后的处理数据打印结果。

 

 

 

 

 
public ActionBlock<string> abSync = new ActionBlock<string>((i) =>
        {
            Thread.Sleep(1000);
            Console.WriteLine(i + " ThreadId:" + Thread.CurrentThread.ManagedThreadId + " Execute Time:" + DateTime.Now);
        }
);

        public TransformBlock<int, string> tbSync = new TransformBlock<int, string>((i) =>
            {
                i = i * 2;
                return i.ToString();
            }
        );

        public void TestSync()
        {
            tbSync.LinkTo(abSync);

            for (int i = 0; i < 10; i++)
            {
                tbSync.Post(i);
            }
            tbSync.Complete();
            Console.WriteLine("Post finished");

            tbSync.Completion.Wait();
            Console.WriteLine("TransformBlock process finished");
        }
 

7S`N)T79TI4~0X8${XF8[PB


测试代码能够在这里下载。下一篇将介绍Block的一些配置,来应对一些高级应用。

相关文章
相关标签/搜索