正在 await 一批任务,但愿在每一个任务完成时对它作一些处理。另外,但愿在任务一完成就当即进行处理,而不须要等待其余任务。html
问题的重点在于但愿任务完成以后当即进行处理,而不去等待其余任务。
这里还沿用文中的例子。
等待几秒钟以后返回等待的秒数,以后当即打印任务等待的秒数。
等待的函数以下git
static async Task<int> DelayAndReturnAsync(int val) { await Task.Delay(TimeSpan.FromSeconds(val)); return val; }
如下方法执行以后的打印结果是“2”, “3”, “1”。想获得结果“1”, “2”, “3”应该如何实现。github
static async Task ProcessTasksAsync() { // 建立任务队列。 Task<int> taskA = DelayAndReturnAsync(2); Task<int> taskB = DelayAndReturnAsync(3); Task<int> taskC = DelayAndReturnAsync(1); var tasks = new[] { taskA, taskB, taskC }; // 按顺序 await 每一个任务。 foreach (var task in tasks) { var result = await task; Trace.WriteLine(result); } }
文中给了两种解决方案。一种是抽出更高级的async方法,一种是借助做者的nuget拓展。做者还推荐了另外两个博客文章。
Processing tasks as they complete
ORDERING BY COMPLETION, AHEAD OF TIME
这两篇文章介绍了更多处理方法。算法
static async Task AwaitAndProcessAsync(Task<int> task) { var result = await task; Trace.WriteLine(result); }
将执行和处理抽象出来,借助Task.WhenAll和LINQ并发执行。并发
var processingTasks = (from t in tasks select AwaitAndProcessAsync(t)).ToArray(); // 等待所有处理过程的完成。 await Task.WhenAll(processingTasks);
或者async
var processingTasks = tasks.Select(async t => { var result = await t; Trace.WriteLine(result); }).ToArray(); // 等待所有处理过程的完成。 await Task.WhenAll(processingTasks);
推荐预发布版本:https://www.nuget.org/packages/Nito.AsyncEx/5.0.0-pre-06
须要添加引用using Nito.AsyncEx;
函数
static async Task UseOrderByCompletionAsync() { // 建立任务队列。 Task<int> taskA = DelayAndReturnAsync(2); Task<int> taskB = DelayAndReturnAsync(3); Task<int> taskC = DelayAndReturnAsync(1); var tasks = new[] { taskA, taskB, taskC }; // 等待每个任务完成。 foreach (var task in tasks.OrderByCompletion()) { var result = await task; Trace.WriteLine(result); } }
使用ConcurrentExclusiveSchedulerPair,使任务串行执行,结果是“2”, “3”, “1”。code
var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler; foreach (var t in tasks) { await t.ContinueWith(completed => { switch (completed.Status) { case TaskStatus.RanToCompletion: Trace.WriteLine(completed.Result); //Process(completed.Result); break; case TaskStatus.Faulted: //Handle(completed.Exception.InnerException); break; } }, scheduler); }
上篇文章中提到了使用Task.WhenAny处理已完成的任务:http://www.javashuo.com/article/p-uxgvedou-cy.html
文中的例子从算法层面是不推荐使用的,做者推荐了他本身的拓展Nito.AsyncEx
,源码地址:https://github.com/StephenCleary/AsyncEx/blob/master/src/Nito.AsyncEx.Tasks/TaskExtensions.cs。
另外两种实现的实现方法差很少,都是借助TaskCompletionSource<T>
和Interlocked.Incrementa处理Task。
这里只列出ORDERING BY COMPLETION, AHEAD OF TIME的解决方案。htm
/// <summary> /// 返回一系列任务,这些任务的输入类型相同和返回结果类型一致 /// 返回的任务将以完成顺序返回 /// </summary> private static IEnumerable<Task<T>> OrderByCompletion<T>(IEnumerable<Task<T>> inputTasks) { // 复制输入,如下的处理将不须要考虑是否会对输入有影响 var inputTaskList = inputTasks.ToList(); var completionSourceList = new List<TaskCompletionSource<T>>(inputTaskList.Count); for (int i = 0; i < inputTaskList.Count; i++) { completionSourceList.Add(new TaskCompletionSource<T>()); } // 索引 // 索引最好是从0开始,可是 Interlocked.Increment 返回的是递增以后的值,因此这里应该赋值-1 int prevIndex = -1; // 能够不用再循环以外处理Action,这样会让代码更清晰。如今有C#7.0的新特性本地方法可使用 /* //本地方法 void continuation(Task<T> completedTask) { int index = Interlocked.Increment(ref prevIndex); var source = completionSourceList[index]; PropagateResult(completedTask, source); }*/ Action<Task<T>> continuation = completedTask => { int index = Interlocked.Increment(ref prevIndex); var source = completionSourceList[index]; PropagateResult(completedTask, source); }; foreach (var inputTask in inputTaskList) { inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } return completionSourceList.Select(source => source.Task); } /// <summary> /// 对 TaskCompletionSource 进行处理 /// </summary> private static void PropagateResult<T>(Task<T> completedTask, TaskCompletionSource<T> completionSource) { switch (completedTask.Status) { case TaskStatus.Canceled: completionSource.TrySetCanceled(); break; case TaskStatus.Faulted: completionSource.TrySetException(completedTask.Exception.InnerExceptions); break; case TaskStatus.RanToCompletion: completionSource.TrySetResult(completedTask.Result); break; default: throw new ArgumentException("Task was not completed"); } }