C#并行编程(3):并行循环

初识并行循环

并行循环主要用来处理数据并行的,如,同时对数组或列表中的多个数据执行相同的操做。编程

在C#编程中,咱们使用并行类System.Threading.Tasks.Parallel提供的静态方法Parallel.ForParallel.ForEach来实现并行循环。从方法名能够看出,这两个方法是对常规循环forforeach的并行化。数组

简单用法

使用并行循环时须要传入循环范围(集合)和操做数据的委托Action<T>多线程

Parallel.For(0, 100, i => { Console.WriteLine(i); });

Parallel.ForEach(Enumerable.Range(0, 100), i => { Console.WriteLine(i); });

使用场景

对于数据的处理须要耗费较长时间的循环适宜使用并行循环,利用多线程加快执行速度。oop

对于简单的迭代操做,且迭代范围较小,使用常规循环更好好,由于并行循环涉及到线程的建立、上下文切换和销毁,使用并行循环反而影响执行效率。性能

对于迭代操做简单但迭代范围很大的状况,咱们能够对数据进行分区,再执行并行循环,减小线程数量。测试

循环结果

Parallel.ForParallel.ForEach方法的全部重载有着一样的返回值类型ParallelLoopResult,并行循环结果包含循环是否完成以及最低迭代次数两项信息。优化

下面的例子使用Parallel.ForEach展现了并行循环的结果。pwa

ParallelLoopResult result = Parallel.ForEach(Enumerable.Range(0, 100), (i,loop) =>
{// 委托传入ParallelLoopState,用来控制循环执行
    Console.WriteLine(i + 1);
    Thread.Sleep(100);
    if (i == 30) // 此处设置循环中止的确切条件
    {
        loop.Break();
        //loop.Stop();
    }
});
Console.WriteLine($"{result.IsCompleted}-{result.LowestBreakIteration}");

值得一提的是,循环的Break()Stop()只能尽早地跳出或者中止循环,而不能当即中止。线程

取消循环操做

有时候,咱们须要在中途取消循环操做,但又不知道确切条件是什么,好比用户触发的取消。这时候,能够利用循环的ParallelOptions传入一个CancellationToken,同时使用异常处理捕获OperationCanceledException以进行取消后的处理。下面是一个简单的例子。调试

/// <summary>
/// 取消通知者
/// </summary>
public static CancellationTokenSource CTSource { get; set; } = new CancellationTokenSource();

/// <summary>
/// 取消并行循环
/// </summary>
public static void CancelParallelLoop()
{
    Task.Factory.StartNew(() =>
    {
        try
        {
            Parallel.ForEach(Enumerable.Range(0, 100), new ParallelOptions { CancellationToken = CTSource.Token },
                i =>
                {
                    Console.WriteLine(i + 1);
                    Thread.Sleep(1000);
                });
        }
        catch (OperationCanceledException oce)
        {
            Console.WriteLine(oce.Message);
        }
    });
}
static void Main(string[] args)
{
    ParallelDemo.CancelParallelLoop();
    Thread.Sleep(3000);
    ParallelDemo.CTSource.Cancel();

    Console.ReadKey();
}

循环异常收集

并行循环执行过程当中,能够捕获并收集迭代操做引起的异常,循环结束时抛出一个AggregateException异常,并将收集到的异常赋给它的内部异常集合InnerExceptions。外部使用时,捕获AggregateException,便可进行并行循环的异常处理。

下面的例子模拟了并行循环的异常抛出、收集及处理的过程。

/// <summary>
/// 捕获循环异常
/// </summary>
public static void CaptureTheLoopExceptions()
{
    ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
    Parallel.ForEach(Enumerable.Range(0, 100), i =>
    {
        try
        {
            if (i % 10 == 0)
            {//模拟抛出异常
                throw new Exception($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] had thrown a exception. [{i}]");
            }
            Console.WriteLine(i + 1);
            Thread.Sleep(100);
        }
        catch (Exception ex)
        {//捕获并收集异常
            exceptions.Enqueue(ex);
        }
    });

    if (!exceptions.IsEmpty)
    {// 方法内部可直接进行异常处理,若需外部处理,将收集到的循环异常抛出
        throw new AggregateException(exceptions);
    }
}

外部处理方式

try
{
    ParallelDemo.CaptureTheLoopExceptions();
}
catch (AggregateException aex)
{
    foreach (Exception ex in aex.InnerExceptions)
    {// 模拟异常处理
        Console.WriteLine(ex.Message);
    }
}

分区并行处理

当循环操做很简单,迭代范围很大的时候,ParallelLoop提供一种分区的方式来优化循环性能。下面的例子展现了分区循环的使用,同时也能比较几种循环方式的执行效率。

/// <summary>
/// 分区并行处理,顺便比较各类循环的效率
/// </summary>
/// <param name="rangeSize">迭代范围</param>
/// <param name="opDuration">操做耗时</param>
public static void PartationParallelLoop(int rangeSize = 10000, int opDuration = 1)
{
    //PartationParallelLoopWithBuffer
    Stopwatch watch0 = Stopwatch.StartNew();
    Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize), EnumerablePartitionerOptions.None),
        i =>
        {//模拟操做
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch0.Stop();

    //PartationParallelLoopWithoutBuffer
    Stopwatch watch1 = Stopwatch.StartNew();
    Parallel.ForEach(Partitioner.Create(Enumerable.Range(0, rangeSize),EnumerablePartitionerOptions.NoBuffering),
        i =>
        {//模拟操做
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch1.Stop();

    //NormalParallelLoop
    Stopwatch watch2 = Stopwatch.StartNew();
    Parallel.ForEach(Enumerable.Range(0, rangeSize),
        i =>
        {//模拟操做
            Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
            Thread.Sleep(opDuration);
        });
    watch2.Stop();

    //NormalLoop
    Stopwatch watch3 = Stopwatch.StartNew();
    foreach (int i in Enumerable.Range(0, rangeSize))
    {//模拟操做
        Console.WriteLine($"{DateTime.Now}=> Thread-[{Thread.CurrentThread.ManagedThreadId}] was running. [{i}]");
        Thread.Sleep(opDuration);
    }
    watch2.Stop();
            
    Console.WriteLine();
    Console.WriteLine($"PartationParallelLoopWithBuffer    => {watch0.ElapsedMilliseconds}ms");
    Console.WriteLine($"PartationParallelLoopWithoutBuffer => {watch1.ElapsedMilliseconds}ms");
    Console.WriteLine($"NormalParallelLoop                 => {watch2.ElapsedMilliseconds}ms");
    Console.WriteLine($"NormalLoop                         => {watch3.ElapsedMilliseconds}ms");
}

在 I7-7700HQ + 16GB 配置 VS调试模式下获得下面一组测试结果。

Loop Condition PartationParallelLoop WithBuffer PartationParallelLoop WithoutBuffer Normal ParallelLoop Normal Loop
10000,1 10527 11799 11155 19434
10000,1 9513 11442 11048 19354
10000,1 9871 11391 14782 19154
100,1000 9107 5951 5081 100363
100,1000 9086 5974 5187 100162
100,1000 9208 5125 5255 100239
100,1 350 439 243 200
100,1 390 227 166 198
100,1 466 225 84 197

应该根据不一样的应用场景选择合适的循环策略,具体如何选择,朋友们可自行体会~

相关文章
相关标签/搜索