任务调度

>>返回《C# 并发编程》html

1. 调度到线程池

Task task = Task.Run(() =>
{
    Thread.Sleep(TimeSpan.FromSeconds(2));
});

Task.Run 也能正常地返回结果,能使用异步 Lambda 表达式。下面代码中 Task.Run 返回的 task 会在 2 秒后完成,并返回结果 13:编程

Task<int> task = Task.Run(async () =>
{ 
    await Task.Delay(TimeSpan.FromSeconds(2));
    return 13;
});

Task.Run 返回一个 Task (或 Task<T>)对象,该对象能够被异步或响应式代码正常使用。并发

注意: 但不要在 ASP.NET 中使用 Task.Run,除非你有绝对的把握。在 ASP.NET 中, 处理请求的代码原本就是在 ASP.NET 线程池线程中运行的,强行把它放到另外一个线程池线程一般会拔苗助长。
但UI程序,使用Task.Run能够执行耗时操做,有效的防止页面卡住问题。框架

在进行动态并行开发时, 必定要用 Task.Factory.StartNew 来代替 Task.Run异步

  • 由于根据默认配置, Task.Run 返回的 Task 对象适合被异步调用(即被异步代码或响应式代码使用)。
  • Task.Run 也不支持动态并行代码中广泛使用的高级概念,例如 父/子任务。

2. 任务调度器

须要让多个代码段按照指定的方式运行。例如async

  • 让全部代码段在 UI 线程中运行
  • 只容许特定数量的代码段同时运行。

2.1. Default 调度器

TaskScheduler.Default,它的做用是让任务在线程池中排队, Task.Run、并行、数据流的代码用的都是 TaskScheduler.Defaultspa

2.2. 捕获当前同步上下文 调度器

能够捕获一个特定的上下文,用 TaskScheduler.FromCurrentSynchronizationContext 调度任务,让它回到该上下文:pwa

TaskScheduler scheduler = TaskScheduler.FromCurrentSynchronizationContext();
这条语句建立了一个捕获当前 同步上下文TaskScheduler 对象,并将代码调度到这个上下文中线程

  • SynchronizationContext 类表示一个通用的调度上下文。
  • 大多数 UI 框架有一个表示 UI 线程的 同步上下文
  • ASP.NET 有一个表示 HTTP 请求的 同步上下文

建议:
在 UI 线程上执行代码时,永远不要使用针对特定平台的类型。 + WPF、IOS、Android 都有 Dispatchercode

  • Windows 应用商店平台使用 CoreDispatcher
  • WinForms 有 ISynchronizeInvoke 接口(即 Control.Invoke

不要在新写的代码中使用这些类型,就当它们不存在吧。使用这些类型会使代码无谓地绑定在某个特定平台上。

同步上下文 是通用的、基于上述类型的抽象类。

2.3. ConcurrentExclusiveSchedulerPair 调度器

它其实是两个互相关联的调度器。 只要 ExclusiveScheduler 上没有运行任务, ConcurrentScheduler 就可让多个任务同时执行。只有当 ConcurrentScheduler 没有执行任务时, ExclusiveScheduler 才能够执行任务,而且每次只容许运行一个任务:

public static void ConcurrentExclusiveSchedulerPairRun()
{
    var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 2);
    //因为并行被限流,因此ConcurrentScheduler 会两个两个输出,而后执行完这两个开启的8个串行任务
    TaskScheduler concurrent = schedulerPair.ConcurrentScheduler;
    TaskScheduler exclusive = schedulerPair.ExclusiveScheduler;

    //Default 因为没有限制,因此第一层会先输出,所有随机
    // TaskScheduler concurrent = TaskScheduler.Default;
    // TaskScheduler exclusive =TaskScheduler.Default;

    var list = new List<List<int>>();
    for (int i = 0; i < 4; i++)
    {
        var actionList = new List<int>();
        list.Add(actionList);
        for (int j = 0; j < 4; j++)
        {
            actionList.Add(i * 10 + j);
        }
    }

    var tasks = list.Select(u => Task.Factory.StartNew(state =>
    {
        System.Console.WriteLine($"ConcurrentScheduler");
        ((List<int>)state).Select(i => Task.Factory.StartNew(state2 => System.Console.WriteLine($"ExclusiveScheduler:{state2}"), i, CancellationToken.None, TaskCreationOptions.None, exclusive)).ToArray();
    }, u, CancellationToken.None, TaskCreationOptions.None, concurrent));


    Task.WaitAll(tasks.ToArray());
}

输出:

ConcurrentScheduler
ConcurrentScheduler
ExclusiveScheduler:0
ExclusiveScheduler:1
ExclusiveScheduler:2
ExclusiveScheduler:3
ExclusiveScheduler:10
ExclusiveScheduler:11
ExclusiveScheduler:12
ExclusiveScheduler:13
ConcurrentScheduler
ConcurrentScheduler
ExclusiveScheduler:20
ExclusiveScheduler:21
ExclusiveScheduler:22
ExclusiveScheduler:23
ExclusiveScheduler:30
ExclusiveScheduler:31
ExclusiveScheduler:32
ExclusiveScheduler:33

ConcurrentExclusiveSchedulerPair 的常见用法是

  • ExclusiveScheduler 来确保每次只运行一个任务。
  • ExclusiveScheduler 执行的代码会在线程池中运行,可是使用了同一个 ExclusiveScheduler 对象的其余代码不能同时运行。

ConcurrentExclusiveSchedulerPair 的另外一个用法是做为限流调度器。

  • 建立的 ConcurrentExclusiveSchedulerPair 对象能够限制自身的并发数量。
  • 这时一般不使用 ExclusiveScheduler
var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default,maxConcurrencyLevel: 8);
TaskScheduler scheduler = schedulerPair.ConcurrentScheduler;

3. 调度并行代码

public static void RotateMatricesRun()
{
    List<List<Action<float>>> actionLists = new List<List<Action<float>>>();
    for (int i = 0; i < 15; i++)
    {
        var actionList = new List<Action<float>>();
        actionLists.Add(actionList);
        for (int j = 0; j < 15; j++)
        {
            actionList.Add(new Action<float>(degrees =>
            {
                Thread.Sleep(200);
                System.Console.WriteLine("degrees:" + degrees + " " + DateTime.Now.ToString("HHmmss.fff"));
            }));
        }
    }
    RotateMatrices(actionLists, 10);
    //虽然两个并行嵌套可是因为调度器的设置,致使任务是8个8个执行的,结果是8个后200ms再8个
}

static void RotateMatrices(IEnumerable<IEnumerable<Action<float>>> collections, float degrees)
{
    var schedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 8);
    TaskScheduler scheduler = schedulerPair.ConcurrentScheduler;
    ParallelOptions options = new ParallelOptions
    {
        TaskScheduler = scheduler
    };
    Parallel.ForEach(collections, options,
        matrices =>
        {
            Parallel.ForEach(matrices,
                options,
                matrix => matrix.Invoke(degrees)
            );
            System.Console.WriteLine($"============");
        });
}

输出:

degrees:10 190424.120
...  118个 ...
degrees:10 190426.963
============
============
============
============
============
============
============
============
degrees:10 190427.167
...  6个 ...
degrees:10 190427.167
... 5个 ...
degrees:10 190428.589
...  6个 ...
degrees:10 190428.589
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.791
============
degrees:10 190428.791
degrees:10 190428.791
degrees:10 190428.994
...  6个 ...
degrees:10 190428.994
============
degrees:10 190429.194
...  6个 ...
degrees:10 190429.194
============
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
============
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.395
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
============
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
degrees:10 190429.598
============
degrees:10 190429.800
============

4. 用调度器实现数据流的同步

Stopwatch sw = Stopwatch.StartNew();
// 模拟 UI同步上下文
AsyncContext.Run(() =>
{
    var options = new ExecutionDataflowBlockOptions
    {
        //使用次调度器,则代码会放到建立线程的同步上下文上执行(如果当前同步上下文是UI Context 或 此例的AsyncContext)
        //运行和注释下行运行观察Creator和Executor线程Id的变化
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
    };
    var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
    System.Console.WriteLine($"Creator ThreadId: {Thread.CurrentThread.ManagedThreadId}.");
    var displayBlock = new ActionBlock<int>(result =>
    {
        // ListBox.Items.Add(result)
        System.Console.WriteLine($"Executor ThreadId: {Thread.CurrentThread.ManagedThreadId} res:{result}.");
    }, options);
    multiplyBlock.LinkTo(displayBlock);

    for (int i = 0; i < 5; i++)
    {
        multiplyBlock.Post(i);
        System.Console.WriteLine($"Post {i}");
    }
    multiplyBlock.Completion.Wait(2000);
});
System.Console.WriteLine($"Cost {sw.ElapsedMilliseconds}ms.");

输出:

Creator ThreadId: 1.
Post 0
Post 1
Post 2
Post 3
Post 4
Executor ThreadId: 1 res:0.
Executor ThreadId: 1 res:2.
Executor ThreadId: 1 res:4.
Executor ThreadId: 1 res:6.
Executor ThreadId: 1 res:8.
Cost 2062ms.
相关文章
相关标签/搜索