《CLR Via C#》读书笔记:27.计算限制的异步操做

1、CLR 线程池基础

通常来讲若是计算机的 CPU 利用率没有 100% ,那么说明不少进程的部分线程没有运行。可能在等待 文件/网络/数据库等设备读取或者写入数据,又多是等待按键、鼠标移动等事件。算法

执行 I/O 限制的操做时,操做系统经过设备驱动程序通知硬件干活,而 CPU 处于一种空闲状态。而在现代应用程序当中,使用线程池来执行计算限制的操做,而不是手动建立线程。数据库

每一个 CLR 都有本身独立的线程池,而且由各自 CLR 控制的全部 AppDomain 所共享。安全

线程池自己维护了一个请求队列,当程序须要执行一个异步操做的时候,会将一个记录项追加到队列之中,而后由线程池将该记录项分派给线程池线程,若是没有线程则建立一个新的线程。线程任务处理完整以后,将该线程放入线程池中等待之后进行复用。网络

线程池自己是启发式的,结合程序负载,他会本身根据当前线程池内线程的状态销毁/新增线程。数据结构

2、执行简单的计算限制操做

经过 ThreadPool 静态类,咱们能够方便地使用线程池中的线程为咱们执行一些计算限制的异步操做。只须要调用 ThreadPoolQueueUserWorkItem(WaitCallBack callback) 方法,或者是他的另外一个重载方法,接收一个 state 值做为参数。多线程

他的两个方法都是非阻塞的,调用以后会当即返回。异步

WaitCallBack 的方法签名以下:函数

delegate void WaitCallBack(Object state);

在 CLR 的线程池中,将 callback 委托做为工做项添加到队列当中,而后由线程池分发线程进行处理。oop

【注意】性能

一旦回调方法抛出了未处理的异常,CLR 会当即终止进程。

3、执行上下文

每一个线程都有一个执行上下文的数据结构,包含由安全设置,宿主设置和逻辑调用上下文数据(AsyncLocal 与 CallContext)。

当在某个线程(例如主线程)使用了另一个线程(辅助线程),就会产生执行上下文由调用者线程流向被调用线程。这会对性能形成必定的影响,这是由于执行上下文包含有大量地信息。而若是辅助线程又调用了更多的辅助线程,这个时候执行上下问的复制开销就很是大。

咱们能够经过 ExecutionContext 类控制线程的执行上下文是否流向辅助线程,只有辅助线程不须要访问执行上下文时能够阻止执行上下文流动。当阻止了执行上下文流动时,辅助线程会使用最后一次与其关联的任意执行上下文,这个时候对于安全设置等就不可信,不该执行任何依赖于执行上下文的操做。

通常来讲在主线程,能够经过 ExecutionContext.SuppressFlow(); 方法阻止执行上下文流动,而后再经过 ExecutionContext.RestoreFlow(); 恢复流动。

4、协做式取消和超时

.NET 提供了标准的取消操做模式,这个模式是协做式的,也就是你要取消的操做必须显式声明本身能够被取消。这是由于用户在执行某些长耗时的计算限制操做的时候,可能会由于等待时间太长或者其余缘由须要取消这个操做。

首先咱们经过 System.Threading.CancellationTokenSource 对象管理或者取消对象状态,使用时直接 new 一个便可,而该对象拥有一个 CancellationToken 对象。

这个 Token 对象用于传递给执行计算限制操做的方法,经过该 Token 的 IsCancellationRequested 属性你能够在方法内部确认任务是否被取消,若是被取消你就能够进行返回操做。

例子以下:

static void Main(string[] args)
{
    var tokenSource = new CancellationTokenSource();

    ThreadPool.QueueUserWorkItem(z => Calculate(CancellationToken.None, 10000));

    Console.ReadKey();
    tokenSource.Cancel();

    Console.ReadLine();
}

private static void Calculate(CancellationToken token, int count)
{
    for (int i = 0; i < count; i++)
    {
        if (token.IsCancellationRequested)
        {
            Console.WriteLine("用户提早终止操做,退出线程..");
            break;
        }
        
        Console.WriteLine(count);
        Thread.Sleep(200);
    }
    
    Console.WriteLine("计数完成.");
}

【注意】

若是你要执行一个不容许被取消的操做,能够为方法传递一个 CancellationToken.None 对象,由于该对象没有 Source 源,则不可能会被调用 Cancel() 进行取消。

注册取消事件

CancellationToken 容许咱们经过 Register() 方法注册多个委托,这些被注册了的委托会在 TokenSource 调用 Cancel 取消的时候优先调用,其调用的前后顺序为注册时的顺序。

【注意】

调用 Register() 方法的时候,他有两个 bool 类型的参数,分别是 useSyncContextuseExecutionContext。这两个参数用于指定,是否要用调用线程的同步上下文或者执行上下文来调用回调函数。

同时在注册成功以后会返回一个 CancellationTokenRegistration 对象,经过调用该对象的 Dispose 方法能够删除已经注册的委托回调,这样在取消时就不会调用该回调。

TokenSource 连接

能够经过 CancellationTokenSource.CreateLinkedTokenSource() 连接两个或多个对象,连接成功后会返回一个单独的 TokenSource 对象。

一旦这个新对象连接的任何一个 TokenSource 对象被取消的时候,该对象也会被取消掉。

Cancel 的异常处理

在调用 TokenSource 的 Cancel() 方法时(默认为 false),该方法还有另一个重载传入 bool 值,若是为 true 的时候,有多个注册的回调委托,一旦某个出现异常直接会被抛出该异常,不会等待其余回调执行完毕。

若是为 false,则会等到全部回调方法执行完成时,抛出一个 AggregateException 异常,内部的 InnerExceptions 包含有全部在执行过程当中产生的异常信息集合。

超时取消

除了直接调用 Cancel() 当即取消操做以外,还有一个延迟取消的方法 CancelAfter(),经过传递具体的延迟时间,咱们能够在指定的之间以后取消某个任务。(PS:有点像 Polly 的 TimeOut )

5、任务

为啥使用任务,虽然经过 ThreadPool 能够很方便地发起一次计算限制的操做,可是你不知道你的方法啥时候执行完成,也没法在操做完成以后得到返回值。

使用任务执行一个计算限制操做有两种方式,二者也同样的能够传递 CancellationToken 进行取消操做:

  1. new Task(Sum,20).Start();
  2. Task.Run(()=>Sum(20));

除此以外还能够在构造 Task 时 传递一些标志位,用于任务调度器进行一些特殊处理。

等待任务完成并获取结果

任务除了标准的无返回值的 Task 类型以外,还有一个包含有泛型参数的 Task<TResult> 类型,其中 TResult 参数就是任务的返回值类型。

在建立好 Task<TResult> 对象以后,能够经过 Task.Wait() 等待任务执行完成,Task 的 Wait() 方法会阻塞调用者线程直到任务执行完成。执行完成以后,能够经过 Task.Reuslt 获取任务执行以后的返回值。

PS:

这里获取 Result 属性值时,其内部也会调用 Wait() 方法。

若是该 Task 内的计算限制操做抛出了未经处理的异常,这个异常会被吞噬掉,调用 Wait() 方法或者使用 Result 属性的时候,这些异常信息会被包裹在 AggregateException 内部并返回给调用者线程。

【注意】

不推荐直接调用 Wait() ,若是 Task 已经开始执行,该方法会阻塞调用者线程,直到执行完成。第二种状况是任务尚未开始执行的时候,调用者线程不会被阻塞,Task 当即执行并返回。而调度器可能会使用调用者线程来执行这些操做,这个时候,若是调用者线程获取了一个线程同步锁,而 Task 由于也是在调用者线程执行,尝试获取锁的时候,就会产生死锁。

AggregateException 可能会包含有多个异常,这个时候可使用该对象的 Handle(Func<Exception,bool> predicate) 方法来为每个异常进行处理,处理返回 true,未处理返回 false。

在调用了 Handle 方法以后,仍然有异常没有处理,这些没有处理的异常又会形成一个新的 AggregateException 被抛出。

【注意】

若是不知道有哪些 Task 内部未处理的异常,能够经过象任务调度器的 UnobservedTaskException 事件登记一个回调方法,若是存在一个没有处理到的异常,则会调用你的回调方法,并将异常传递给你。

除了 Task.Wait() 方法,还有等待一组任务的 Task.WaitAny()Task.WaitAll()。几个方法都会阻塞调用者线程,前者当传递的一组任务有任意一个完成则当即返回该任务的索引,后者则是要等待这一组任务所有完成以后才会唤醒调用线程。

这两个方法一旦被取消,都会抛出 OperationCanceledException 异常。

取消任务

能够经过一个 CancellationTokenSource 来取消 Task,同样的须要传入的计算限制方法添加 CancellationToken 参数。

只不过呢,在 Task 任务内部咱们不经过 IsCancellationRequested 来判断任务是否取消,而是经过调用 Token 的 ThrowIfCancellationRequested() 方法来抛出异常。

该方法会判断当前任务是否被取消,若是被取消了,则抛出异常。这是由于与直接经过线程池添加任务不一样,线程池没法知道任务什么时候完成,而任务则能够表示是否完成,而且还能返回值。

任务完成时启动新任务

以前说过经过调用 Task.Wait() 或者在任务还没有完成的时候调用 Task.Result 属性,都会形成线程池建立新的线程。而咱们能够经过在任务完成以后,当即开启一个新的任务,这样咱们就能够经过新的任务知道前一个任务是否已经完成了。

建立一个的计算限制任务对象,咱们在启动了该任务对象以后,调用 Task.ContinueWith() 方法来建立一个延续任务,新的延续性任务会有一个 Task 参数,该参数就是最开始的任务。

而在使用 Task.ContinueWith() 时,他还能够传递一个标识位。这个标识位用于代表这个延续性任务是在第一个任务什么状况下才会执行,通常有三种:OnlyOnCanceled(第一个任务取消时才被执行)、OnlyOnFaulted(第一个任务抛出未处理异常时执行)、OnlyOnRanToCompletion(第一个任务顺利完成时执行)。

启动子任务

一个任务在其内部能够建立其子任务,只须要在内部构造 Task 对象的时候,传递一个标识位 TaskCreationOptions.AttachedToParent 将其与父任务关联。这样的话,除非其全部子任务执行完成,父任务不会被认为已经完成。

延续性任务也能够做为第一个任务的子任务,指定对应的标识位便可。

任务的内部构造

任务主要由如下几部分构成:

  1. 任务惟一的 Task Id。
  2. 调度器的引用。
  3. 回调方法的引用。
  4. 执行上下文的引用。
  5. 其余...

能够看到构造一个 Task 仍是须要比较大的开销的,若是你不须要 Task 的附加特性,彻底可使用 TaskPool.QueueUserworkItem 来得到更好的性能与效率。

经过 Task 的只读属性 Task.Status,咱们能够知道任务目前处于哪一种状态,其最终的状态主要有 3 种,分别是:RanToCompletion(已完成)、Canceled(被取消)、Faulted(出现异常失败),这三种状态都属于任务完成状态。

另外值得注意的是,经过 ContinueWith()ContinueWhenAll()ContinueWhenAny() 等方法建立的任务状态都为 WaitingForActivation,这个状态表明任务会自动开始。

任务工厂

若是你须要在执行多个相同配置的 Task 对象,能够经过 TaskFactoryTaskFactory<TResult>,其大概含义与 Task 的含义相同。

在建立工厂时,能够传递一些经常使用的配置标识位和 CancellationToken 对象,以后咱们能够经过 StartNew() 方法来统一执行一堆任务。

任务调度器

任务调度器通常有两种,第一种是线程池任务调度器,通常用于服务端程序。还有一种是同步上下文任务调度器,通常用于 GUI 程序。

6、Parallel 的 For、Foreach、Invoke

For 与 Foreach 基本用于操做一个集合,而后循环处理其值。而若是在某个方法内部须要执行多个方法,则能够经过 Invoke 来进行执行。使用 Parallel 类可让 CPU 最大化地利用起来而不会阻塞主线程。

不过通常不会将全部 For 与 Foreach 都替换为并行化的查询,这是由于某些循环会修改共享数据,这个时候使用 Parallel 的操做则会破坏数据,虽然能够经过增长线程同步锁来解决,不过这样会形成单线程访问,没法享受并行处理的好处。

同时 Parallel 方法自己也会有开销,固然在大量重复性任务中这种开销能够忽略不计,可是若是仅为几十个短耗时的计算限制任务启用 Parallel 就会得不偿失。

这三种操做都接受一个ParallelOptions 对象用于配置最大并行的工做项数目与调度器。

Parallel 的 For 与 Foreach 的一个重载方法容许传入 3 个委托,他们分别是:

  • 任务局部初始化委托(localInit):该委托是在每次工做项处理以前被调用。
  • 任务主体委托(body):具体的工做项处理逻辑,参与工做的各个线程都会调用一次。
  • 任务局部终结器委托(localFinally):本委托是在每一个工做项处理完成以后都会被调用。

从上述逻辑来看,能够看做局部初始化委托为一个父任务,后面两个为子级连续任务的构造。

实例:

static void Main(string[] args)
{
    var files = new List<string>();
    files.AddRange(Directory.GetFiles(@"E:\Program Files","*.*",SearchOption.AllDirectories));
    files.AddRange(Directory.GetFiles(@"E:\Program Files (x86)","*.*",SearchOption.AllDirectories));
    files.AddRange(Directory.GetFiles(@"E:\Project","*.*",SearchOption.AllDirectories));
    files.AddRange(Directory.GetFiles(@"E:\Cache","*.*",SearchOption.AllDirectories));
    files.AddRange(Directory.GetFiles(@"E:\Windows Kits","*.*",SearchOption.AllDirectories));
    files.AddRange(Directory.GetFiles(@"C:\Program Files\dotnet","*.*",SearchOption.AllDirectories));
    
    Console.WriteLine($"总文件数量:{files.Count}");
    long allFileCount = 0;

    var watch = new Stopwatch();
    watch.Start();
    Parallel.ForEach<string, long>(files,
        localInit: () =>
        {
            // 初始化文件大小为 0,
            // 这里的参数类型取决于任务返回的参数
            return 0;
        },
        body: (fileName, parallelStatus, index, fileCount) =>
        {
            // 计算文件大小并返回
            long count = 0;
            try
            {
                var info = new FileInfo(fileName);
                count = info.Length;
            }
            catch (Exception e)
            {
            }
            
            // 这里由于该任务会被线程池复用,因此要进行累加
            return count + fileCount;
        },
        localFinally: fileCount => { Interlocked.Add(ref allFileCount, fileCount); }
    );
    
    watch.Stop();
    Console.WriteLine($"并行效率:{watch.ElapsedMilliseconds} ms");
    Console.WriteLine($"文件总大小:{allFileCount / 1024 / 1024 / 1024} Gb");


    allFileCount = 0;
    watch.Reset();
    
    watch.Start();
    foreach (var file in files)
    {
        long count = 0;
        
        try
        {
            var info = new FileInfo(file);
            count = info.Length;
        }
        catch (Exception e)
        {
        }
        
        allFileCount+=count;
    }
    
    watch.Stop();
    Console.WriteLine($"单线程效率:{watch.ElapsedMilliseconds} ms");
    Console.WriteLine($"文件总大小:{allFileCount / 1024 / 1024 / 1024} Gb");


    Console.ReadLine();
}

性能提高:

经过 Parallel 的 Foreach 与普通的 foreach 遍历计算,性能整体提高了约 56%,越耗时的操做提高的效果就越明显。

在 Body 的主体委托当中,传入了一个 ParallelLoopState 对象,该对象用于每一个线程与其余任务进行交互。主要有两个方法 Stop()Break(),前者用于中止循环,后者用于跳出循环,而且跳出循环以后,其 LowestBreakIteration 会返回调用过 Break() 方法的最低项。

而且 Parallel 还会返回一个 ParallelLoopResult 对象,该经过该对象咱们能够得知这些循环是否正常完成。

7、并行语言集成查询 PLINQ

LINQ 默认查询的方式是一个线程顺序处理数据集合中的全部项,称之为顺序查询。而 PLINQ 就是将这些操做分散到各个 CPU 并行执行,经过 AsParallel() 扩展方法能够将 IEnumerable<TSource> 转换为 ParallelQuery<TSource>

而从并行操做切换回顺序操做,只须要调用 ParallelEnumableAsSequential() 方法便可。

通过 PLINQ 处理后的数据项其结果是无序的,若是须要有序结果,能够调用 AsOrdered() 方法。可是该方法比较损耗性能,通常不推荐使用,若是须要排序应该直接使用与 LINQ 同名的 PLINQ 扩展方法。

PLINQ 通常会本身分析使用最好的查询方式进行查询,有时候使用顺序查询性能更好。

  • WithCancellation() :容许取消某个 PLINQ 查询。
  • WithExecutionMode():容许配置 PLINQ 执行模式,是否强制并行查询。
  • WithMergeOptions():容许配置结果的缓冲与合并方式。
  • WithDegreeOfParallelism():容许配置查询的最大并行数。

PS:

不建议在多线程环镜中使用 Console.Write() 进行输出,由于 Console 类内部会对线程进行同步,确保只有一个线程能够访问控制台窗口,这样会损害性能。

8、定时计算限制操做

经过 CLR 提供的 Timer 定时器,咱们能够传入一个回调方法。这样的话计时器会能够根据传入的周期,来定时将咱们的回调方法经过线程池线程进行调用。

同时计时器还容许传入一个 dueTime 参数来指定这个计时器首次调用回调方法时须要等待多久(当即执行能够传入 0),而 period 能够指定 Timer 调用回调方法的周期。

【原理】

在线程池内部全部的 Timer 对象只使用了一个线程,当某个 Timer 到期的时候,这个线程就会被唤醒。该线程经过 ThreadPool.QueueUserWorkItem() 方法将一个工做项添加到线程池队列,这样你的回调方法就会获得执行。

【注意】

若是回调方法执行的时常超过了你设置的周期时常,这样会形成多个线程都在执行你的回调。由于 Timer 不知道你的回调执行完成没有,他只会到期执行你的回调方法。

解决措施是构造一个 Timer 的时候,为 period 指定一个 Timeout.Infinite 常量,这样计时器只会触发一次。以后在你的回调方法执行完成以后,在其内部经过 Timer.Change() 方法指定一个执行周期,而且设置其 dueTime 为当即执行。

这样作了以后,你的 Timer 就会确保你的回调被执行完成以后再开始下一个周期。

这一点能够参考 Abp 实现的 AbpTimer 对象。

9、线程池如何管理线程

CLR 容许开发人员设置线程池最大工做者线程数,可是通常不要轻易设置该值,但你能够经过 ThreadPool.GetMaxThreads()ThreadPool.GetMinThreads()GetAvailableThreads() 方法来获取一些相关信息。

经过 ThreadPool.QueueUserWorkItem() 方法和 Timer 类处理的工做项老是存储到 CLR 线程池的 全局队列 中。工做者线程采用一个 FIFO 算法将工做项从 全局队列 取出,由于全部工做者线程都有可能去这个队列拿去工做项,这个时候会使用 线程同步锁 以确保工做项只会被工做者线程处理一次。这可能会形成性能瓶颈,对伸缩性和性能会形成某些限制。

默认的任务调度器中,非工做者线程调度 Task 时都是存放在全局队列,而工做者线程调度 Task 则是存放在他本身的本地队列。

工做者线程处理 Task 的步骤:

  • 首先从本地队列采用 LIFO 算法取得一个 Task 进行处理。
  • 若是本地队列没有 Task,则从其余的工做者线程本地队列拿一个 Task 本身来处理。(会使用线程同步锁)
  • 全部本地队列都为空,则工做者线程会使用 FIFO 算法去全局队列拿一个 Task 进行处理。
  • 若是全局队列为空,则线程处于休眠状态,时间过长则销毁自身。

PS:

结合上下文,说明工做项首先被添加到了全局队列,而后由工做者线程取到本身的本地队列进行处理。

线程池会动态地根据工做项的多少动态地调整工做者线程的数量,通常不须要开发人员进行管控。

相关文章
相关标签/搜索