C#并行编程(4):基于任务的并行

C#中的任务Task

在C#编程中,实现并行能够直接使用线程,但使用起来很繁琐;也可使用线程池,线程池很大程度上简化了线程的使用,可是也有着一些局限,好比咱们不知道做业何时完成,也取不到做业的返回值;解决线程池局限性的方案是使用任务。本文将总结C#中Task的使用。编程

相似于线程池工做项对异步操做的封装,任务是对异步操做的另外一种形式的封装,这种封装抽象层次更高,让咱们可以对异步操做进行更多的控制。异步

任务启动后,经过任务调度器TaskScheduler来调度。.NET中提供两种任务调度器,一种是线程池任务调度器,也是默认调度器,它会将任务派发给线程池工做者线程;另外一种是上下文同步任务调度器,它会将任务派发给当前上下文线程,例如GUI线程。此外,咱们也能自定义任务调度器,例如能够将异步IO任务派发给线程池IO线程。测试

Task的使用方法

隐式使用

Parallel静态类除了提供并行循环的各类重载,还提供了一个方法Parallel.Invoke。这个方法能够建立并执行一个或多个异步任务,使用方法以下:this

/// <summary>
/// 任务模拟
/// </summary>
private static void DoWork(int workId = 0)
{
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started work[{workId}].");
    Thread.Sleep(3000);
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] done work[{workId}].");
}

/// <summary>
/// 任务的隐式使用
/// </summary>
public static void ImplicitUsingOfTask()
{
    Parallel.Invoke(()=>DoWork(1),()=>DoWork(2),() => DoWork(3));
}

上例的运行结果以下:pwa

2019/3/27 20:40:18=> Thread[9] started work[1].
2019/3/27 20:40:18=> Thread[12] started work[3].
2019/3/27 20:40:18=> Thread[10] started work[2].
2019/3/27 20:40:21=> Thread[9] done work[1].
2019/3/27 20:40:21=> Thread[12] done work[3].
2019/3/27 20:40:21=> Thread[10] done work[2].线程

对于简单的多任务并行,使用上述的方式很方便,可是这种方式与线程池同样,咱们不能控制任务的执行或者获取任务返回值。code

显式使用

相对于使用Parallel.Invoke执行并行操做,更经常使用的是使用TaskTask<T>提供的方法进行异步和并行处理。下面是任务最基本的使用:继承

Task.Run(() =>
{
    //TODO
});
Task.Factory.StartNew(() =>
{
    //TODO
});

任务的经常使用操做

获取任务的返回值

具备返回值的任务使用Task<T>,T可根据咱们的需求指定,下面是获取任务返回值的方法。token

Task<int> task = Task<int>.Factory.StartNew(() =>
{
    Thread.Sleep(3000);//模拟操做用时
    return DateTime.Now.Day;
});
int day = task.Result;

须要说明的是,获取任务的结果会阻塞当前线程。ci

等待任务完成

有时候,咱们须要等待一些任务所有完成后才能执行后续操做,有时候只要多个任务中的一个完成了,就能够执行后续操做。Task提供了WaitWaitAllWaitAny等方法知足咱们的需求。下面的例子展现了各类等待方法的使用。

/// <summary>
/// 任务等待测试
/// </summary>
public static void TaskWait()
{
    Stopwatch watch = new Stopwatch();

    #region 场景1:等待一个任务完成
    Task task = Task.Run(() => DoWorkOfTask(1000));
    Console.WriteLine("start wait. work duration: 1000");
    watch.Start();
    task.Wait();//等待1秒左右
    watch.Stop();
    Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
    #endregion

    #region 场景2:等待多个任务完成
    Task[] tasks = new Task[3]
    {
        Task.Run(() => DoWorkOfTask(1000)),
        Task.Run(() => DoWorkOfTask(2000)),
        Task.Run(() => DoWorkOfTask(3000)),
    };

    Console.WriteLine("start wait all. work duration: min 1000 max 3000.");
    watch.Restart();
    Task.WaitAll(tasks);//等待3秒左右
    watch.Stop();
    Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
    #endregion

    #region 场景3:等待某个任务完成
    tasks = new Task[3]
    {
        Task.Run(() => DoWorkOfTask(1000)),
        Task.Run(() => DoWorkOfTask(2000)),
        Task.Run(() => DoWorkOfTask(3000)),
    };
    Console.WriteLine("start wait any. work duration: min 1000 max 3000.");
    watch.Restart();
    Task.WaitAny(tasks);//等待1秒左右
    watch.Stop();
    Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}");
    #endregion
}

/// <summary>
/// 作任务
/// </summary>
/// <param name="workDuration">任务时长</param>
private static void DoWorkOfTask(int workDuration)
{
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task[{Task.CurrentId}].");
    Thread.Sleep(workDuration);
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task[{Task.CurrentId}].");
}

使用WaitWaitAllWaitAny方法时,咱们能够设置超时时间或者传入取消Token,以控制等待时间。但这些方法返回布尔值,只能代表是否等待成功;假如咱们须要知道所等待的任务返回值,则可使用WhenAllWhenAny方法,这两个方法不能控制等待时间,但会返回一个完成的任务。以下例:

Task<int>[] tasks = new Task<int>[3]
{
    Task<int>.Factory.StartNew(() =>
    {
        Console.WriteLine($"task #{Task.CurrentId} run");
        Thread.Sleep(100);
        Console.WriteLine($"task #{Task.CurrentId} done");
        return 100;
    }),
    Task<int>.Factory.StartNew(() =>
    {
        Console.WriteLine($"task #{Task.CurrentId} run");
        Thread.Sleep(500);
        Console.WriteLine($"task #{Task.CurrentId} done");
        return 1000;
    }),
    Task<int>.Factory.StartNew(() =>
    {
        Console.WriteLine($"task #{Task.CurrentId} run");
        Thread.Sleep(1000);
        Console.WriteLine($"task #{Task.CurrentId} done");
        return 10000;
    }),
};

//int[] results = Task.WhenAll(tasks).Result;
//Console.WriteLine($"[{string.Join(",",results)}]");

Task<int> task = Task.WhenAny(tasks).Result;
Console.WriteLine($"task #{task.Id}. result {task.Result}");

Task.WhenAllTask.WhenAny在等待结束时,都会建立一个完成状态的任务,WhenAll将等待的全部已完成任务的结果放入建立任务的结果中,WhenAny则将等待的已完成任务放到建立任务的结果中。

任务延续

有时候,咱们须要在一个任务完成时开始另外一个任务。对于这种需求,咱们可使用TaskContinueWith等方法来处理。

Task task = Task.Run(() => DoWorkOfTask(3000));
task.ContinueWith(t => DoWorkOfTask(1000));

运行结果:

2019/3/27 21:25:09=> Thread[10] started task[1].

2019/3/27 21:25:12=> Thread[10] completed task[1].

2019/3/27 21:25:12=> Thread[11] started task[2].

2019/3/27 21:25:13=> Thread[11] completed task[2].

咱们还能够经过TaskContinuationOptions指定延续任务的执行条件,如任务取消时或者任务出现异常时才执行,等。

子任务的使用

有时候,咱们要在一个任务里面建立一些其余任务,而且还要在任务里面等待建立的任务完成,此时咱们可使用子任务。

Task parent = Task.Factory.StartNew(() =>
{
    Console.WriteLine($"parent task #{Task.CurrentId} run.");
    for (int i = 0; i < 10; i++)
    {
        Task.Factory.StartNew(() =>
        {
            Console.WriteLine($"child task #{Task.CurrentId} run.");
            Thread.Sleep(1000);
            Console.WriteLine($"child task #{Task.CurrentId} done.");
        }, TaskCreationOptions.AttachedToParent);
    }
});
parent.Wait();
Console.WriteLine($"parent task #{parent.Id} done.");

在一个任务中建立的新任务,默认状况下与父级任务是分离的,各自的运行不受影响,除非在建立任务时显式附加到父级任务中。例如,上例中若是不指定TaskCreationOptions.AttachedToParent,parent.Wait()就不会持续到全部子任务都执行完成。

任务的取消

咱们在启动任务时,传入取消令牌CancellationToken,当收到取消请求时,抛出取消异常并在等待任务完成时捕获异常TaskCanceledException。咱们经过这种方式控制任务的取消。

/// <summary>
/// 任务取消
/// </summary>
public static void TaskCancle()
{
    Console.WriteLine("Press any key to begin. Press 'c' to cancel. ");
    Console.ReadKey(true);
    Console.WriteLine();

    CancellationTokenSource tokenSource = new CancellationTokenSource();
    ConcurrentBag<Task> tasks = new ConcurrentBag<Task>();
    //单任务取消
    Task task1 = Task.Factory.StartNew(() => DoWorkOfTask(5000, tokenSource.Token), tokenSource.Token);
    tasks.Add(task1);

    //嵌套任务取消
    Task task2 = Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < 10; i++)
        {
            int duration = 1000 * i;
            tasks.Add(Task.Factory.StartNew(()=>DoWorkOfTask(duration, tokenSource.Token), tokenSource.Token));
        }
        DoWorkOfTask(5000,tokenSource.Token);
    }, tokenSource.Token);
    tasks.Add(task2);

    char ch = Console.ReadKey().KeyChar;
    if (ch == 'c' || ch == 'C')
    {
        tokenSource.Cancel();
        Console.WriteLine($"{DateTime.Now}=> Task cancellation requested.");
    }

    try
    {
        Task.WaitAll(tasks.ToArray());
    }
    catch (AggregateException ae)
    {
        foreach (Exception ex in ae.InnerExceptions)
        {//任务取消经过抛出TaskCanceledException实现
            TaskCanceledException tce = ex as TaskCanceledException;
            string cancelledTask = tce == null ? string.Empty : $"Task #{tce.Task.Id}";
            Console.WriteLine($"Exception: {ex.GetType().Name}. {cancelledTask}");
        }
    }
    finally
    {
        tokenSource.Dispose();
    }

    Console.WriteLine();
    //显示任务状态
    foreach (Task task in tasks)
    {
        Console.WriteLine($"Task: #{task.Id} now is {task.Status}");
    }
}

/// <summary>
/// 带取消令牌的做业
/// </summary>
/// <param name="workDuration">做业时长</param>
/// <param name="cancleToken">取消令牌</param>
private static void DoWorkOfTask(int workDuration, CancellationToken cancleToken)
{
    if (cancleToken.IsCancellationRequested)
    {//开始以前取消
        Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled before it got started.");
        cancleToken.ThrowIfCancellationRequested();
    }

    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task #{Task.CurrentId}.");
    Thread.Sleep(workDuration);

    if (cancleToken.IsCancellationRequested)
    {//开始以后取消
        Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled.");
        cancleToken.ThrowIfCancellationRequested();
    }
    Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task #{Task.CurrentId}.");
}

任务的异常处理

上面提到经过取消令牌抛出TaskCanceledException的方式控制任务的取消,实际上,Task会把自身执行过程当中的全部异常都包装到一个AggregateException中,并传回调用线程。咱们在主线程中经过捕获AggregateException来进行异常处理。

简单的处理方式

咱们能够在任务的调用线程捕获并遍历AggregateException的内部异常,或者使用AggregateException提供的Handle方法进行处理,以下:

Task task = Task.Run(() =>
{
    throw new Exception($"Task #{Task.CurrentId} thrown an exception");
});
try
{
    task.Wait();
}
catch (AggregateException ae)
{
    //处理方式1:遍历内部异常进行处理
    foreach (Exception ex in ae.InnerExceptions)
    {
        Console.WriteLine($"foreach: {ex.Message}");
    }

    //处理方式2:使用AggregateException的Handle方法
    ae.Handle(ex=>
    {
        Console.WriteLine($"handle: {ex.Message}");
        return true ;
    });
}

使用延续任务处理任务的异常

有时候,咱们能够给任务附加一个任务异常时才会执行的延续任务,并在延续任务中进行异常处理。

Task.Run(() => { throw new Exception($"Task #{Task.CurrentId} thrown an exception"); })
    .ContinueWith(t =>
    {
        Console.WriteLine($"{t.Exception?.InnerException?.Message}");
    }, TaskContinuationOptions.OnlyOnFaulted);

嵌套任务的异常处理

下面是一个3层嵌套的任务。

Task parent = Task.Factory.StartNew(() =>
{//父级任务
    for (int i = 0; i < 10; i++)
    {
        Task.Factory.StartNew(() =>
        {//1代子任务
            for (int j = 0; j < 10; j++)
            {
                Task.Factory.StartNew(() =>
                {//2代子任务
                    throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
                }/*, TaskCreationOptions.AttachedToParent*/);
            }

            throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
        }/*, TaskCreationOptions.AttachedToParent*/);
    }

    throw new Exception($"Task #{Task.CurrentId} thrown an exception. ");
});
try
{
    parent.Wait();
}
catch (AggregateException ae)
{
    ae.Flatten().Handle(ex =>
    {
        Console.WriteLine(ex.Message);
        return true;
    });
}

运行上面的代码只会获得一行输出:

Task #1 thrown an exception.

看起来有点奇怪,为何只捕获到一个异常呢?其实也是在情理之中的:任务默认只会把自身异常传递到它本身的调用线程,子任务是在父任务中调用的,其异常只会传递到父任务的执行线程,因此咱们在父任务的调用线程,也就是咱们的主线程中是捕获不到子任务的异常的。

取消上面代码的两处/*, TaskCreationOptions.AttachedToParent*/,就会捕获到全部异常。

任务调度器

.NET提供的任务调度器

任务是由TaskScheduler调度的,启动任务时,默认使用线程池任务调度器,任务将会被派发到线程池工做线程。线程池的调度前面已经总结过,这里再也不展开。.NET提供的另外一种任务调度器是同步上下文调度器,用TaskScheduler.FromCurrentSynchronizationContext()获取,这个调度器会把任务派发给当前的上下文线程,经常使用在GUI应用程序中。

例如,咱们在一个窗体中新建一个ListBox,新建几个任务向其中添加项,代码以下:

this.lbxMsg.Items.Add($"{DateTime.Now:O}=>Current thread is thread #{Thread.CurrentThread.ManagedThreadId} .");
for (int i = 0; i < 10; i++)
{
    new Task(() =>
    {
        for (int j = 0; j < 3; j++)
        {
            this.lbxMsg.Items.Add($"{DateTime.Now:O}=> Task #{Task.CurrentId} add an item with thread #{Thread.CurrentThread.ManagedThreadId}.");
        }
                        
    }).Start(TaskScheduler.FromCurrentSynchronizationContext());
}

运行上面的代码能够发现建立的任务都是由界面线程执行的。这里若是使用默认的任务调度器将产生"线程间操做无效"的异常。

实际使用时,能够给一个异步任务添加延续任务,来处理异步任务的结果或者异常等。以下:

Task.Run(() =>
{
    Thread.Sleep(3000); // 模拟操做过程
    return 1000; // 模拟结果
}).ContinueWith(t =>
{
    this.lbxMsg.Items.Add(t.Result); // 在界面呈现结果或作其余处理
}, TaskScheduler.FromCurrentSynchronizationContext());

自定义任务调度器

除了使用.NET提供的调度器外,咱们可以继承类TaskScheduler来实现本身的任务调度器。这里再也不展开,须要了解的能够参考Samples for Parallel Programming with the .NET Framework

相关文章
相关标签/搜索