【Parallel】.Net 并行执行程序的使用心得

1、摘要

官方介绍:提供对并行循环和区域的支持。算法

命名空间:using System.Threading.Tasks并发

三个静态方法:Parallel.Invoke,Parallel.For,Parallel.ForEach函数

经常使用到的参数类型:ParallelLoopResult,ParallelOptions,ParallelLoopStateoop

2、参数

咱们先来介绍参数,明白了参数的做用,在选择和调用三个静态方法及其重载,就游刃有余了。spa

一、ParallelLoopResult:提供执行 Parallel 循环的完成状态

属性:pwa

1)public bool IsCompleted { get; } 线程

   若是该循环已运行完成(该循环的全部迭代均已执行,而且该循环没有收到提早结束的请求),则为 true;不然为 false。code

2)public long? LowestBreakIteration { get; } blog

  返回一个表示从中调用 Break 语句的最低迭代的整数继承

用途:判断当并行循环结束时,是否因调用了break方法或stop方法而提早退出并行循环,或全部迭代均已执行。

判断依据:

条件 结果
IsCompleted 运行完成

!IsCompleted &&

LowestBreakIteration==null

使用了Stop语句而提早终止

!IsCompleted &&

LowestBreakIteration!=null

 使用了Break语句而提早终止

 

 

 

 

 

 

 

 

 

 

二、ParallelLoopState:可用来使 Parallel 循环的迭代与其余迭代交互。此类的实例由 Parallel 类提供给每一个循环;不能在您的用户代码中建立实例。

属性:

1)public bool ShouldExitCurrentIteration { get; }

  获取循环的当前迭代是否应基于此迭代或其余迭代发出的请求退出。若是当前迭代应退出,则为 true;不然为 false。

2) public bool IsStopped { get; } 

  获取循环的任何迭代是否已调用 System.Threading.Tasks.ParallelLoopState.Stop。若是任何迭代已中止循环,则为 true;不然为 false。

3) public bool IsExceptional { get; }

  获取循环的任何迭代是否已引起相应迭代未处理的异常。若是引起了未经处理的异常,则为 true;不然为 false。

4) public long? LowestBreakIteration { get; }

  获取从中调用 System.Threading.Tasks.ParallelLoopState.Break 的最低循环迭代。一个表示从中调用 Break 的最低迭代的整数。

方法:(在下面方法介绍时,有进一步的介绍)

1)Break():通知并行循环在执行完当前迭代以后尽快中止执行,可确保低索引步骤完成。且可确保正在执行的迭代继续运行直到完成。

2)Stop():通知并行循环尽快中止执行。对于还没有运行的迭代不能会尝试执行低索引迭代。不保证全部已运行的迭代都执行完。

用途:提前退出并行循环。

说明:

1)不能同时在同一个并行循环中同时使用Break和Stop。

2)Stop比Break更经常使用。break语句用在并行循环中的效果和用在串行循环中不一样。Break用在并行循环中,委托的主体方法在每次迭代的时候被调用,退出委托的主体方法对并行循环的执行没有影响。Stop中止循环比Break快。

三、ParallelOptions:存储用于配置  Parallel 类的方法的操做的选项。

属性

1)public CancellationToken CancellationToken { get; set; }

  获取或设置传播有关应取消操做的通知。

2)public int MaxDegreeOfParallelism { get; set; }

  获取或设置此 ParallelOptions 实例所容许的最大并行度。

3)public TaskScheduler TaskScheduler { get; set; } [没用过,不知道功效]

  获取或设置与此 System.Threading.Tasks.ParallelOptions 实例关联的 System.Threading.Tasks.TaskScheduler

说明:

1)经过设置CancellationToken来取消并行循环,当前正在运行的迭代会执行完,而后抛出System.OperationCanceledException类型的异常。

2)TPL的方法老是会试图利用全部可用内核以达到最好的效果,可是极可能.NET Framework内部使用的启发式算法所获得的注入和使用的线程数比实际须要的多(一般都会高于硬件线程数,这样会更好地支持CPU和I/O混合型的工做负载)。

   一般将最大并行度设置为小于等于逻辑内核数。若是设置为等于逻辑内核数,那么要确保不会影响其余程序的执行。设置为小于逻辑内核数是为了有空闲内核来处理其余紧急的任务。

用途:

1)从循环外部取消并行循环

2)指定并行度

 3、方法介绍

一、Parallel.Invoke

1)public static void Invoke(params Action[] actions);尽量并行执行提供的每一个操做。

    public class Test
    {
        private void Action()
        {
            Thread.Sleep(1000);
            Console.WriteLine("Action :ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        private void Action1()
        {
            Thread.Sleep(2000);
            Console.WriteLine("Action1:ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        public void Parallel_Invoke()
        {
            Console.WriteLine("开始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                Parallel.Invoke(Action, Action1);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("结束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

运行结果:

注:Action()休眠1s,Action1()休眠2s,执行总耗时为2043ms,能够看作 nvoke方法只有在actions所有执行完才会返回,而且耗时取决于最大耗时的方法。

2)public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);执行所提供的每一个操做,并且尽量并行运行,除非用户取消了操做。

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();
        private void Action()
        {
            Thread.Sleep(1000);
            //标记取消并行操做
            parallelOptions.CancellationToken = new CancellationToken(true);
            Console.WriteLine("Action :ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        private void Action1()
        {
            Thread.Sleep(2000);
            Console.WriteLine("Action1:ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        public void Parallel_Invoke()
        {
            Console.WriteLine("开始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                Parallel.Invoke(parallelOptions, Action1, Action, Action1, Action1, Action1);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("结束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

说明:

1)Invoke方法只有在actions所有执行完才会返回。

2)不能保证actions中的全部操做同时执行。好比actions大小为4,但硬件线程数为2,那么同时运行的操做数最多为2。

3)actions中的操做并行的运行且与顺序无关,若编写与运行顺序有关的并发代码,应选择其余方法。

4)若是使用Invoke加载多个操做,多个操做运行时间迥异,总的运行时间以消耗时间最长操做为基准,这会致使不少逻辑内核长时间处于空闲状态。

 二、Parallel.For

 1)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body); 

    public class Test
    {
        private void Action(int i)
        {
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        public void Parallel_For()
        {
            Console.WriteLine("开始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //fromInclusive: 开始索引(含)。
                //toExclusive: 结束索引(不含)。
                //body: 将为每一个迭代调用一次的委托。
                Parallel.For(0, 5, Action);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("结束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

运行结果:

注:能够看出,方法并非顺序执行

2)public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body);

使用ParallelLoopState.Break() 退出迭代:

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();

        private void Action(int i, ParallelLoopState parallelLoopState)
        {
            //当执行到 索引等于5时,咱们调用Break()
            if (i > 5)
            {
                parallelLoopState.Break();
            }
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        public void Parallel_For()
        {
            Console.WriteLine("开始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //设置最大并行数为3
                parallelOptions.MaxDegreeOfParallelism = 3;
                Parallel.For(0, 10, parallelOptions, Action);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("结束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

 运行结果:

使用ParallelLoopState.Stop() 退出迭代:

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();

        private void Action(int i, ParallelLoopState parallelLoopState)
        {
            //当执行到 索引等于5时,咱们调用Stop()
            if (i > 5)
            {
                parallelLoopState.Stop();
            }
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        public void Parallel_For()
        {
            Console.WriteLine("开始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //设置最大并行数为3
                parallelOptions.MaxDegreeOfParallelism = 3;
                Parallel.For(0, 10, parallelOptions, Action);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("结束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

 

运行结果:

注:当使用Break()退出迭代时,程序保证了索引小于5的方法都执行完成,便可确保低索引步骤完成;当使用Stop()退出迭代时,程序执行到索引为5时,就当即退出了(正在进行的迭代方法,会执行完成),不确保低索引执行完成。

3)public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();

        private void Action(int i, ParallelLoopState parallelLoopState)
        {
            //当执行到 索引等于5时,咱们调用Stop()
            if (i > 5)
            {
                parallelLoopState.Stop();
            }
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        private string LocalInit()
        {
            var init = "go";
            Console.WriteLine("LocalInit:ThreadID-{0}|init={1}", Thread.CurrentThread.ManagedThreadId, init);
            return init;
        }
        private void LocalFinally(string x)
        {
            Console.WriteLine("LocalFinally:ThreadID-{0}|result={1}", Thread.CurrentThread.ManagedThreadId, x);
        }
        private string Body(int i, ParallelLoopState parallelLoopState, string x)
        {
            x = x + "_" + i;
            Console.WriteLine("Body:ThreadID-{0}|i={1}|x={2}", Thread.CurrentThread.ManagedThreadId, i, x);
            return x;
        }
        public void Parallel_For()
        {
            Console.WriteLine("开始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //设置最大并行数为3
                parallelOptions.MaxDegreeOfParallelism = 3;
                //LocalInit: 用于返回每一个任务的本地数据的初始状态的函数委托。
                //Body: 将为每一个迭代调用一次的委托。
                //LocalFinally: 用于对每一个任务的本地状态执行一个最终操做的委托。
                //<TLocal>: 线程本地数据的类型。
                Parallel.For(0, 5, parallelOptions, LocalInit, Body, LocalFinally);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("结束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

运行结果:

注:LocalInit()执行了3次,Body()执行了5次,LocalFinally()执行了3次(这只是执行的一种状况),能够看出在同一线程中,init参数是共享传递的,即LocalInit()=>Body()..n次迭代..Body()=>LoaclFinally()

* localInit只是在每一个 Task/Thread 开始参与到对集合元素的处理时执行一次, 【而不是针对每一个集合元素都执行一次】相似的, localFinally只有在 Task/Thread 完成全部分配给它的任务以后,才被执行一次。
 CLR会为每一个 Task/Thread 维护一个thread - local storage,能够理解为 Task/Thread 在整个执行过程当中的状态。 当一个 Task/Thread 参与到执行中时,localInit中返回的TLocal类型值会被做为这个状态的初始值,随着body的执行,
 这个状态值会被改变,而body的返回类型也是TLocal,意味着每一次body执行结束,会把最新的TLocal值返回给CLR, 而CLR会把这个值设置到 Task/Thread 的thread - local storage上去,从而实现 Task/Thread 状态的更新。
 最后,localFinally能够返回这个状态值,做为 Task/Thread 完成它所负责的全部处理任务后的最终结果。

说明:

1)不支持浮点。

2)没法保证迭代的执行顺序。

3)若是fromInclusive大于或等于toExclusive,方法当即返回而不会执行任何迭代。

4)对于body参数中含有的ParallelLoopState实例,其做用为提前中断并行循环。

5)只有在迭代所有完成之后才会返回结果,不然循环将一直阻塞。

三、Parallel.ForEach 

1)public static ParallelLoopResult ForEach(IEnumerable<TSource> source, Action<TSource> body);

2)public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body);

3)public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body);

用法及其重载和Parallel.For不尽相同(把fromInclusive-toExclusive 换成你想遍历的集合List),就不在这里赘述了(关键是如今好饿啊...要去吃饭了)

4、异常处理

1)异常优先于从循环外部取消和使用Break()方法或Stop()方法提早退出并行循环。

2)并行循环体抛出一个未处理的异常,并行循环就不能再开始新的迭代。

3)默认状况下当某次迭代抛出一个未处理异常,那么正在执行的迭代若是没抛出异常,正在执行的迭代会执行完。
***当全部迭代都执行完(有可能其余的迭代在执行的过程当中也抛出异常),并行循环将在调用它的线程中抛出异常。
***并行循环运行的过程当中,可能有多个迭代抛出异常,因此通常使用AggregateException来捕获异常。AggregateException继承自Exception。
***为了防止仅使用AggregateException未能捕获某些异常,使用AggregateException的同时还要使用Exception。

异常捕获:

try
{
    //Do something
}
catch(AggregateException e)
{
    Foreach(Exception ex in e.InnerExceptions)
    {
        //Do something
    }
}
catch(Exception e)
{
    //Do something
}

5、总结

1.Parallel执行方法组,不是顺序的,和方法位置前后,索引大小无关;

2.迭代所有完成之后才会返回结果(前提没有Break、Stop、Exception),不然循环将一直阻塞;

3.整体耗时通常取决于耗时最长的方法;

4.某一方法出现异常,程序将中止新的迭代,当前正在进行的方法,会继续执行完成,后抛出异常。

5.能够去吃一碗牛肉面了...

相关文章
相关标签/搜索