线程 Z

原文:http://www.albahari.com/threading/part5.aspxhtml

专题:C#中的多线程前端

1并行编程Permalink

在这一部分,咱们讨论 Framework 4.0 加入的多线程 API,它们能够充分利用多核处理器。程序员

这些 API 能够统称为 PFX(Parallel Framework,并行框架)。Parallel类与任务并行构造一块儿被称为 TPL(Task Parallel Library,任务并行库)。web

Framework 4.0 也增长了一些更底层的线程构造,它们针对传统的多线程。咱们以前讲过的:算法

在继续阅读前,你须要了解第 1 部分 - 第 4 部分中的基本原理,特别是线程安全shell

并行编程这一部分提供的全部代码均可以在LINQPad中试验。LINQPad 是一个 C# 代码草稿板,能够用来测试代码段,而无需建立类、项目或解决方案。想要获取这些示例代码,能够在 LINQPad 左下方的 Samples 标签页中点击 Download More Samples,而且选择 C# 4.0 in a Nutshell: More Chapters。(译者注:如今应该是 C# 5.0 in a Nutshell 和 C# 6.0 in a Nutshell 了)数据库

2为什么须要 PFX?Permalink

近年来,CPU 时钟频率发展陷于停滞,制造商已经将重心转移至增长核心数量。这对咱们程序员来讲是个问题,由于标准的单线程代码没法自动利用那些增长的核心来提高程序运行速度。编程

利用多个核心对大多数服务端应用程序来讲很容易,每一个线程能够独立处理单独的客户端请求,但在桌面环境下就不那么容易了,由于一般这须要你优化计算密集型代码,按以下步骤进行:数组

  1. 将工做分解成块。
  2. 多线程并行处理这些工做块。
  3. 以线程安全和高效的方式整理结果。

尽管你能够使用传统的多线程构造,但那比较笨拙,尤为是在分解工做和整理结果的步骤。而且,为确保线程安全,一般的策略是使用,而它在不少线程同时访问一份数据时会致使大量竞争。缓存

PFX 库就是专门被设计用来为这些场景提供帮助的。

利用多核心或多处理器的编程被称为并行编程(parallel programming)。它是多线程这个更宽泛概念的子集。

2.1PFX 概念Permalink

有两种分解工做的策略:数据并行(data parallelism)和任务并行(task parallelism)。

当一系列任务须要处理不少数据时,可让每一个线程都执行这一系列(相同的)任务来处理一部分数据(即全部数据的一个子集)。这样实现的并行化称为数据并行,由于咱们是为线程分解了数据。与此相对,任务并行是指对任务进行分解,换句话说就是让每一个线程执行不一样的任务。

一般,对高度并行的硬件来讲,数据并行更简单,可伸缩性也更好,由于它减小或消除了共享数据(也就减小了竞争和线程安全问题)。而且,事实上通常都是数据比任务要多,因此数据并行能够增长并发的可能。

数据并行也有利于结构化并行(structured parallelism),意思是说并行工做单元的启动和完成是在程序中的同一位置。相对的,任务并行趋向于非结构化,就是说并行工做单元的启动和完成可 能分散在程序各处。结构化并行比较简单,而且不易出错,也让你能够把工做分解和线程协调(甚至包括结果整理)这些复杂的任务交给 PFX 库来完成。

2.2PFX 组件Permalink

PFX 包含两层功能。上层是由结构化数据并行 API:PLINQParallel类组成。下层包含任务并行的类,以及一组额外的构造,来帮助你实现并行编程。

并行编程组件

PLINQ 提供了最丰富的功能:它可以自动化并行的全部步骤,包括分解工做、多线程执行、最后把结果整理成一个输出序列。它被称为声明式(declarative) 的,由于你只是声明但愿并行化你的工做(构造一个 LINQ 查询),而后让 Framework 来处理实现细节。相对的,另外一种方式是指令式(imperative)的,这种方式是须要你显式编写代码来处理工做分解和结果整理。例如使用Parallel类时,你必须本身整理结果,而若是使用任务并行构造,你还必须本身分解工做。

  分解工做 整理结果
PLINQ    
Parallel   -
PFX 的任务并行 - -

并发集合自旋基元可 以帮助你实现低层次的并行编程。这很重要,由于 PFX 不只被设计适用于当今的硬件,也适用于将来更多核心的处理器。若是你但愿搬运一堆木块,而且有 32 个工人,最麻烦的是如何让工人们搬运木块时不互相挡道。这与把算法分解运行在 32 个核心上相似:若是普通的锁被用于保护公共资源,所产生的阻塞可能意味着同时只有一小部分核心真正在工做。并发集合专门针对于高并发访问,致力于最小化或消除阻塞。PLINQ 和 Parallel类就依赖于并发集合和自旋基元来实现高效的工做管理。

PFX 与传统的多线程Permalink

传统多线程的场景是,即便在单核的机器上,使用多线程也有好处,而此时并无真正的并行发生。就像咱们以前讨论过的:保持用户界面的响应以及同时下载两个网页。

这一部分将要讲到的一些构造有时对于传统多线程也有用。特别是:

2.3什么时候使用 PFXPermalink

PFX 主要用于并行编程:充分利用多核处理器来加速执行计算密集型代码。

充分利用多个核心的挑战在于阿姆达尔定律(Amdahl’s law),它指出经过并行化产生的最大性能提高,取决于有多少必须顺序执行的代码段。例如,若是一个算法只有三分之二的执行时间能够并行,即便有无数核心,也没法得到超过三倍的性能提高。

所以,在使用 PFX 前,有必要先检查可并行代码中的瓶颈。还须要考虑下,你的代码是否有必要是计算密集的,优化这里每每是最简单有效的方法。然而,这也须要平衡,由于一些优化技术会使代码难以并行化。

最容易获益的是“很差意思不并行的问题(embarrassingly parallel problems)”:工做能够很容易地被分解为多个任务,每一个任务本身能够高效执行(结构化并行很是适合这种问题)。例如:不少图片处理任务、光线跟踪 算法、数学和密码学方面的暴力计算和破解。而相反的例子是:实现快速排序算法的优化版本,想把它实现得好须要必定思考,而且可能须要非结构化并行。

3PLINQPermalink

PLINQ 会自动并行化本地的 LINQ 查询。其优点在于使用简单,由于将工做分解和结果整理的负担交给了 Framework。

使用 PLINQ 时,只要在输入序列上调用AsParallel(),而后像日常同样继续 LINQ 查询就能够了。下边的查询计算 3 到 100,000 内的素数,这会充分利用目标机器上的全部核心。

// 使用一个简单的(未优化)算法计算素数。 // // 注意:这一部分提供的全部代码均可以在 LINQPad 中试验。 IEnumerable<int> numbers = Enumerable.Range (3, 100000-3); var parallelQuery = from n in numbers.AsParallel() where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0) select n; int[] primes = parallelQuery.ToArray(); 

AsParallelSystem.Linq.ParallelEnumerable中的一个扩展方法。它使用ParallelQuery<TSource>来封装输入,就会将你随后调用的 LINQ 查询操做符绑定在ParallelEnumerable中定义的另一组扩展方法上。它们提供了全部标准查询操做符的并行化实现。本质上,它们就是将输入序列进行分区,造成工做块,并在不一样的线程上执行,以后再将结果整理成一个输出序列:

PLINQ 执行

调用AsSequential()能够拆封ParallelQuery,使随后的查询操做符绑定到标准查询操做符来顺序执行。在调用有反作用或非线程安全的方法前,有必要这样作。

对于那些接受两个输入序列的查询操做符(JoinGroupJoinContactUnionIntersectZip)来讲,必须在这两个输入序列上都使用AsParallel()(不然将抛出异常)。然而,不须要为中间过程的查询使用AsParallel,由于 PLINQ 的查询操做符会输出另外一个ParallelQuery序列。实际上,在这个输出序列上再次调用AsParallel会下降效率,它会强制对序列进行合并和从新分区。

mySequence.AsParallel() // 使用 ParallelQuery<int> 封装序列 .Where (n => n > 100) // 输出另外一个 ParallelQuery<int> .AsParallel() // 不须要,会下降效率! .Select (n => n * n) 

并不是全部的查询操做符均可以被有效地并行化。对于那些不能的,PLINQ 使用了顺序的实现。若是 PLINQ 认为并行化的开销实际会使查询变慢,它也会顺序执行。

PLINQ 仅适用于本地集合:它没法在 LINQ to SQL 或 Entity Framework 中使用,由于在那些场景中,LINQ 会被翻译成 SQL 语句,而后在数据库服务器上执行。然而,你能够使用 PLINQ 对从数据库查询得到的结果执行进一步的本地查询。

若是 PLINQ 查询抛出异常,它会被封装进AggregateException从新抛出,其InnerExceptions属性包含真正的异常。详见使用 AggregateException

为何 AsParallel 不是默认的?Permalink

咱们知道AsParallel能够透明的并行化 LINQ 查询,那么问题来了,“微软为何不直接并行化标准查询操做符,使 PLINQ 成为默认的?”

有不少缘由使其成为这种选择使用(opt-in)的方式。首先,要使 PLINQ 有用,必需要有必定数量的计算密集型任务,它们能够被分配到多个工做线程。大多数 LINQ to Objects 的查询执行很是快,根本不须要并行化,并行化过程当中的任务分区、结果整理以及线程协调反而会使程序变慢。

其次:

  • PLINQ 查询的输出(默认状况下)在元素排序方面不一样于 LINQ 查询
  • PLINQ 将异常封装在AggregateException中(可以处理抛出的多个异常)。
  • 若是查询引用了非线程安全的方法,PLINQ 会给出不可靠的结果。

最后,PLINQ 为了进行微调提供了一些钩子(hook)。把这些累赘加入标准的 LINQ to Objects 的 API 会增长使用障碍。

3.1并行执行的特征Permalink

与普通的 LINQ 查询同样,PLINQ 查询也是延迟估值的。这意味着只有当结果开始被使用时,查询才会被触发执行。一般结果是经过一个foreach循环被使用(经过转换操做符也会触发,例如ToArray,还有返回单个元素或值的操做符)。

当枚举结果时,执行过程与普通的顺序查询略有不一样。顺序查询彻底由使用方经过“拉”的方式驱动:每一个元素都在使用方须要时从输入序列中被提取。并行 查询一般使用独立的线程从输入序列中提取元素,这可能比使用方的须要稍微提早了一些(很像一个给播报员使用的提词机,或者 CD 机中的防震缓冲区)。而后经过查询链并行处理这些元素,将结果保存在一个小缓冲区中,以准备在须要的时候提供给使用方。若是使用方在枚举过程当中暂停或中 断,查询也会暂停或中止,这样能够不浪费 CPU 时间或内存。

你能够经过在AsParallel以后调用WithMergeOptions来调整 PLINQ 的缓冲行为。默认值AutoBuffered一般能产生最佳的总体效果;NotBuffered禁用缓冲,若是你但愿尽快看到结果能够使用这个;FullyBuffered在呈现给使用方前缓存整个查询的输出(OrderByReverse操做符天生以这种方式工做,取元素、聚合和转换操做符也是同样)。

3.2PLINQ 与排序Permalink

并行化查询操做符的一个反作用是:当整理结果时,不必定能与它们提交时的顺序保持一致,就如同以前图中所示的那样。换句话说,就是没法像普通的 LINQ 那样能保证序列的正常顺序。

若是你须要保持序列顺序,能够经过在AsParallel后调用AsOrdered()来强制它保证:

myCollection.AsParallel().AsOrdered()... 

在大量元素的状况下调用AsOrdered会形成必定性能损失,由于 PLINQ 必须跟踪每一个元素原始位置。

以后你能够经过调用AsUnordered来取消AsOrdered的效果:这会引入一个“随机洗牌点(random shuffle point)”,容许查询从这个点开始更高效的执行。所以,若是你但愿仅为前两个查询操做保持输入序列的顺序,能够这样作:

inputSequence.AsParallel().AsOrdered() .QueryOperator1() .QueryOperator2() .AsUnordered() // 从这开始顺序可有可无 .QueryOperator3() // ... 

AsOrdered不是默认的,由于对于大多数查询来讲,原始的输入顺序可有可无。换句话说,若是AsOrdered是默认的,你就不得不为大多数并行查询使用AsUnordered来得到最好的性能,这会成为负担。

3.3PLINQ 的限制Permalink

目前,PLINQ 在可以并行化的操做上有些实用性限制。这些限制可能会在以后的更新包或 Framework 版本中解决。

下列查询操做符会阻止查询的并行化,除非源元素是在它们原始的索引位置:

  • TakeTakeWhileSkipSkipWhile
  • SelectSelectManyElementAt这几个操做符的带索引版本

大多数查询操做符都会改变元素的索引位置(包括可能移除元素的那些操做符,例如Where)。这意味着若是你但愿使用上述操做符,就要在查询开始的地方使用。

下列查询操做符能够并行化,但会使用代价高昂的分区策略,有时可能比顺序执行还慢。

  • JoinGroupByGroupJoinDistinctUnionIntersectExcept

Aggregate操做符的带种子(seed)的重载是不能并行化的,PLINQ 提供了专门的重载来解决。

其它全部操做符都是能够并行化的,然而使用这些操做符并不能确保你的查询会被并行化。若是 PLINQ 认为进行分区的开销会致使部分查询变慢,它也许会顺序执行查询。你能够覆盖这个行为,方法是在AsParallel()以后调用以下代码来强制并行化:

.WithExecutionMode (ParallelExecutionMode.ForceParallelism) 

3.4例:并行拼写检查Permalink

假设咱们但愿实现一个拼写检查程序,它在处理大文档时,可以经过充分利用全部可用的核心来快速运行。咱们把算法设计成一个 LINQ 查询,这样就能够很容易的并行化它。

第一步是下载英文单词字典,为了可以高效查找,将其放在一个HashSet中:

if (!File.Exists ("WordLookup.txt")) // 包含约 150,000 个单词 new WebClient().DownloadFile ( "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt"); var wordLookup = new HashSet<string> ( File.ReadAllLines ("WordLookup.txt"), StringComparer.InvariantCultureIgnoreCase); 

而后,使用wordLookup来建立一个测试“文档”,该“文档”是个包含了一百万个随机单词的数组。建立完数组后,引入两个拼写错误:

var random = new Random(); string[] wordList = wordLookup.ToArray(); string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); wordsToTest [12345] = "woozsh"; // 引入两个 wordsToTest [23456] = "wubsie"; // 拼写错误 

如今,经过对比wordLookup检查wordsToTest,来完成这个并行的拼写检查程序。PLINQ 让这变得很简单:

var query = wordsToTest .AsParallel() .Select ((word, index) => new IndexedWord { Word=word, Index=index }) .Where (iword => !wordLookup.Contains (iword.Word)) .OrderBy (iword => iword.Index); query.Dump(); // 在 LINQPad 中显示输出 

下边是 LINQPad 中的显示的输出:

LINQPad的查询输出

IndexedWord是一个自定义的结构体,定义以下:

struct IndexedWord { public string Word; public int Index; } 

断定器中的wordLookup.Contains方法做为查询的主要部分,它使得这个查询值得并行化。

咱们能够使用匿名类型来代替IndexedWord结构体,从而稍微简化下这个查询。然而这会下降性能,由于匿名类型(是类,所以是引用类型)会产生分配堆内存的开销,以及以后的垃圾回收。

这个区别对于顺序查询来讲没太大关系,但对于并行查询来讲,基于栈的内存分配则至关有利。这是由于基于栈的内存分配是能够高度并行化的(由于每一个线程有其本身的栈),反之基于堆的内存分配会使全部线程竞争同一个堆,它是由单一的内存管理器和垃圾回收器管理的。

使用 ThreadLocal<T>Permalink

来扩展一下咱们的例子,让建立随机测试单词列表的过程并行化。咱们把它做为 LINQ 查询来构造,这样事情就简单多了。如下是顺序执行版本:

string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); 

不幸的是,对Random.Next的调用不是线程安全的,因此实现并行化不是向查询语句直接插入AsParallel()这么简单。一个可能的解决办法是写个方法对random.Next加锁,然而这会限制并发能力。更好的处理办法是使用ThreadLocal<Random>为每一个线程建立独立的Random对象。而后咱们能够使用以下代码来并行化查询:

var localRandom = new ThreadLocal<Random> ( () => new Random (Guid.NewGuid().GetHashCode()) ); string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel() .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)]) .ToArray(); 

在实例化Random对象的工厂方法中,咱们传递了一个Guid的散列值,用来确保:若是两个Random对象在很短的时间范围内被建立,它们能够生成不一样的随机数序列。

什么时候使用 PLINQPermalink

在你的程序中寻找 LINQ 查询,尝试并行化它们貌似是很诱人的。然而这一般没什么用,由于绝大多数明显应该使用 LINQ 的地方执行都很快,因此并行化并无什么好处。更好的方法是找到 CPU 密集型工做的瓶颈,而后考虑“这能写成 LINQ 查询吗?”(这样重构的一个好处是 LINQ 一般能够使代码变得更短,而且更具可读性。)

PLINQ 很是适合于“很差意思不并行的问题(embarrassingly parallel problems)”。它也能很好的应用于结构化阻塞任务(structured blocking tasks),例如同时调用多个 web 服务(见调用阻塞或 I/O 密集型功能)。

对于图像处理来讲 PLINQ 是个糟糕的选择,由于整理几百万个像素到输出序列将造成瓶颈。更好的方法是把像素直接写入数组或非托管的内存块,而后使用Parallel类或任务并行来管理多线程。(也能够使用ForAll来绕过结果整理。若是该图像处理算法天生适合 LINQ,这么作可能有益。)

3.5纯方法Permalink

(译者注:pure function 译为纯方法,是指一个方法 / 函数不能改变任何状态,也不能进行任何 I/O 操做,它的返回值不能依赖任何可能被改变的状态,而且使用相同的输入调用就会产生相同的输出。)

由于 PLINQ 会在并行的线程上运行查询,所以必须注意不要执行非线程安全的操做。特别须要注意,对变量进行写操做有反作用(side-effecting),是非线程安全的。

// 下列查询将每一个元素与其索引相乘。 // 给定一个 0 到 999 的输入序列, 它应该输出元素的平方。 int i = 0; var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++; 

能够经过使用锁或Interlocked来确保i的自增是线程安全的,可是问题仍然存在,i并不能保证对应输入元素的原始索引。而且加上AsOrdered也没法解决这个问题,由于AsOrdered仅仅确保输出是按顺序的,就像顺序执行的输出顺序同样。但这并不意味着实际的处理过程也是按顺序的。

替代方法是将这个查询重写,使用带索引的Select版本。

var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i); 

为了达到最佳性能,任何被查询操做符调用的方法必须是线程安全的:不要给字段或属性赋值(无反作用,纯方法)。若是用来保证线程安全,查询的并行能力将会受到限制。这个限制能够经过锁定的持续时间除以花费在方法上的总时间来计算。

3.6调用阻塞或 I/O 密集型功能Permalink

有时一个查询的长时间运行并非由于是 CPU 密集型操做,而是由于它在等待某些东西,例如等待网页下载或是硬件的响应。PLINQ 可以有效地并行化这种类型的查询,能够经过在AsParallel后调用WithDegreeOfParallelism来提示这种特征。例如,假设咱们但愿同时 ping 6 个网站。比起使用异步委托或手动让 6 个线程自旋,使用 PLINQ 查询能够轻松实现它:

from site in new[] { "www.albahari.com", "www.linqpad.net", "www.oreilly.com", "www.takeonit.com", "stackoverflow.com", "www.rebeccarey.com" } .AsParallel().WithDegreeOfParallelism(6) let p = new Ping().Send (site) select new { site, Result = p.Status, Time = p.RoundtripTime } 

WithDegreeOfParallelism强制 PLINQ 同时运行指定数量的任务。在调用阻塞方法(例如Ping.Send)时有必要这么作,不然的话,PLINQ 会认为这个查询是 CPU 密集型的,并进行相应的任务分配。在双核机器上,PLINQ 会默认同时运行 2 个任务,对于上述状况来讲这显然不是咱们但愿看到的。

线程池的影响,PLINQ 一般为每一个任务分配一个线程。能够经过调用ThreadPool.SetMinThreads来加速初始线程的建立速度。

再给一个例子:假设咱们要实现一个监控系统,但愿它不断未来自 4 个安全摄像头的图像合并成一个图像,并在闭路电视上显示。使用下边的类来表示一个摄像头:

class Camera { public readonly int CameraID; public Camera (int cameraID) { CameraID = cameraID; } // 获取来自摄像头的图像: 返回一个字符串来代替图像 public string GetNextFrame() { Thread.Sleep (123); // 模拟获取图像的时间 return "Frame from camera " + CameraID; } } 

要获取一个合成图像,咱们必须分别在 4 个摄像头对象上调用GetNextFrame。假设操做主要是受 I/O 影响的,经过并行化咱们能将帧率提高 4 倍,即便是在单核机器上。PLINQ 使用一小段程序就能实现它:

Camera[] cameras = Enumerable.Range (0, 4) // 建立 4 个摄像头对象 .Select (i => new Camera (i)) .ToArray(); while (true) { string[] data = cameras .AsParallel().AsOrdered().WithDegreeOfParallelism (4) .Select (c => c.GetNextFrame()).ToArray(); Console.WriteLine (string.Join (", ", data)); // 显示数据... } 

GetNextFrame是一个阻塞方法,因此咱们使用了WithDegreeOfParallelism来得到指望的并发度。在咱们的例子中,阻塞是在调用Sleep时发生。而在真实状况下,阻塞的发生是由于从摄像头中获取图像是 I/O 密集型操做,而不是 CPU 密集型操做。

调用AsOrdered能够确保图像按照一致的顺序显示。由于序列中只有 4 个元素,因此它对性能的影响能够忽略不计。

改变并发度Permalink

在一个 PLINQ 查询内,仅可以调用WithDegreeOfParallelism一次。若是你须要再次调用它,必须在查询中经过再次调用AsParallel()强制进行查询的合并和从新分区:

"The Quick Brown Fox" .AsParallel().WithDegreeOfParallelism (2) .Where (c => !char.IsWhiteSpace (c)) .AsParallel().WithDegreeOfParallelism (3) // 强制合并和从新分区 .Select (c => char.ToUpper (c)) 

3.7取消Permalink

当在foreach循环中使用 PLINQ 查询的结果时,取消该查询很简单:使用break退出循环就能够了。查询会被自动取消,由于枚举器会被隐式销毁。

对于结束一个使用转换、取元素或聚合操做符的查询来讲,你能够在其它线程使用取消标记来取消它。在AsParallel后调用WithCancellation来添加一个标记,并把CancellationTokenSource对象的Token属性做为参数传递。以后另外一个线程就能够在这个CancellationTokenSource对象上调用Cancel,它会在查询的使用方那边抛出OperationCanceledException异常。

IEnumerable<int> million = Enumerable.Range (3, 1000000); var cancelSource = new CancellationTokenSource(); var primeNumberQuery = from n in million.AsParallel().WithCancellation (cancelSource.Token) where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0) select n; new Thread (() => { Thread.Sleep (100); // 在 100 毫秒后 cancelSource.Cancel(); // 取消查询 } ).Start(); try { // 开始运行查询 int[] primes = primeNumberQuery.ToArray(); // 永远到不了这里,由于其它线程会进行取消操做。 } catch (OperationCanceledException) { Console.WriteLine ("Query canceled"); } 

PLINQ 不会直接停止线程,由于这么作是危险的。在取消时,它会等待全部工做线程处理完当前的元素,而后结束查询。这意味着查询调用的任何外部方法都会执行完成。

3.8优化 PLINQPermalink

输出端优化Permalink

PLINQ 的一个优势是它可以很容易地将并行化任务的结果整理成一个输出序列。然而有时,最终要作的是在输出序列的每一个元素上运行一些方法:

foreach (int n in parallelQuery) DoSomething (n); 

若是是上述状况,而且不关心元素的处理顺序,那么能够使用 PLINQ 的ForAll方法来提升效率。

ForAll方法在ParallelQuery的每一个输出元素上运行一个委托。它直接挂钩(hook)到 PLINQ 内部,绕过整理和枚举结果的步骤。举个栗子:

"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write); 

ForAll 方法

整理和枚举结果的开销不是很是大,因此当有大量输入元素且处理执行很快的时候,才能最大化ForAll优化的收益。

输入端优化Permalink

PLINQ 有 3 种分区策略,用来分配输入元素到线程:

策略 元素分配 相对性能
块分区(Chunk partitioning) 动态 平均
范围分区(Range partitioning) 静态 差 - 极好
散列分区(Hash partitioning) 静态

对于那些须要比较元素的查询操做符(GroupByJoinGroupJoinIntersectExceptUnionDistinct),PLINQ 老是使用散列分区。散列分区相对低效,由于它必须预先计算每一个元素的散列值(拥有一样散列值的元素会在同一个线程中被处理)。若是发现运行太慢,惟一的选择是调用AsSequential来禁止并行处理。

对于其它全部查询操做符,你能够选择使用范围分区或块分区。默认状况下:

  • 若是输入序列能够经过索引访问(数组或是IList<T>的实现),PLINQ 选用范围分区。
  • 不然,PLINQ 选用块分区。

归纳来说,对于较长的序列且处理每一个元素所需的 CPU 时间比较近似时,范围分区更快。不然,块分区一般更快。

若是想强制使用范围分区:

  • 若是查询以Enumerable.Range开始,将其替换为ParallelEnumerable.Range
  • 不然,在输入序列上调用ToListToArray(显然,你须要考虑在这里产生的性能开销)。

ParallelEnumerable.Range并非对Enumerable.Range(…).AsParallel()的简单封装。它经过激活范围分区改变了查询的性能。

若是想强制使用块分区,就经过调用Partitioner.Create(在命名空间System.Collection.Concurrent中)来封装输入序列,例如:

int[] numbers = { 3, 4, 5, 6, 7, 8, 9 }; var parallelQuery = Partitioner.Create (numbers, true).AsParallel() .Where (...) 

Partitioner.Create的第二个参数表示:但愿对查询开启负载均衡(load-balance),这是另外一个使用块分区的动机。

块分区的工做方式是按期从输入序列中抓取小块元素来处理。PLINQ 一开始会分配很是小的块(一次 1 到 2 个元素),而后随着查询的进行增长块的大小:这确保小序列可以被有效地并行化,而大序列不会致使过多的抓取工做。若是一个工做线程碰巧拿到了一些“容易” 的元素(处理很快),它最终将拿到更多的块。这个系统使每一个线程保持均等的繁忙程度(使核心负载均衡)。惟一的不利因素是从共享的输入序列中获取元素须要 同步(一般使用一个排它锁),这会产生必定的开销和竞争。

分区策略

范围分区会绕过正常的输入端枚举,而且为每一个工做线程预分配相同数量的元素,避免了在输入序列上的竞争。可是若是某些线程拿到了容易的元素并很早就 完成了处理,在其它工做线程仍在继续工做的时候它就会是空闲的。咱们以前的素数计算的例子在使用范围分区时就性能不高。举个范围分区适用的例子,计算 1000 万之内数字的平方和:

ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i)) 

ParallelEnumerable.Range返回一个ParallelQuery<T>,所以不须要在以后调用AsParallel

范围分区不是必须把元素分红相邻的块,它也许会选用一种 “条纹式(striping)”策略。例如,有两个工做线程,一个工做线程可能会处理奇数位置的元素,而另外一个工做线程处理偶数位置的元素。TakeWhile操做符几乎必定会触发条纹式策略,用来避免处理序列后边没必要要的元素。

3.9并行化自定义聚合Permalink

PLINQ 能够在无需额外干预的状况下有效地并行化SumAverageMinMax操做符。然而,Aggregate操做符对于 PLINQ 来讲是个特殊的麻烦。

若是不熟悉Aggregate操做符,你能够认为它就是一个SumAverageMinMax的泛化版本,换句话说,就是一个能够使你经过自定义的聚合算法实现非一般聚合操做的操做符。以下代码展示了Aggregate如何实现Sum操做符的工做:

int[] numbers = { 1, 2, 3 }; int sum = numbers.Aggregate (0, (total, n) => total + n); // 6 

Aggregate的第一个参数是 seed(种子,初值),聚合操做从这里开始。第二个参数是一个用于更新聚合值的表达式,该表达式生成一个新的元素。第三个参数是可选的,用来表示如何经过聚合值生成最终的结果值。

大多数Aggregate被设计用来解决的问题都可以使用foreach循环轻松解决,而且这也是更熟悉的语法。而Aggregate的优势在于对庞大或复杂的聚合操做能够使用 PLINQ 来进行声明式的并行化。

无种子的聚合Permalink

调用Aggregate时能够省略种子值,这种状况下第一个元素会被隐式看成种子,以后聚合处理会从第二个元素开始进行。下边是一个无种子的例子:

int[] numbers = { 1, 2, 3 }; int sum = numbers.Aggregate ((total, n) => total + n); // 6 

这获得了与以前相同的结果,然而实际上倒是进行了不一样的计算。以前例子计算的是 0+1+2+3,而如今计算的是1+2+3。经过乘法运算来代替加法运算可以更好地说明这个不一样:

int[] numbers = { 1, 2, 3 }; int x = numbers.Aggregate (0, (prod, n) => prod * n); // 0*1*2*3 = 0 int y = numbers.Aggregate ( (prod, n) => prod * n); // 1*2*3 = 6 

如同咱们立刻将要看到的,无种子的聚合的优势在于被并行化时不须要使用特殊的重载。然而,无种子的聚合存在一个陷阱:无种子的聚合方法指望使用的委 托中的计算应知足交换律和结合律。若是用在别的状况下,结果要否则是反直觉的(普通查询),要否则是不肯定的(PLINQ 并行化查询)。例如考虑以下函数:

(total, n) => total + n * n 

它既不知足交换律也不知足结合律。(例如:1+2*2 != 2+1*1)。咱们来看一下使用它来对数字 二、三、4 计算平方和时会发生什么:

int[] numbers = { 2, 3, 4 }; int sum = numbers.Aggregate ((total, n) => total + n * n); // 27 

原本的计算应该是:

2*2 + 3*3 + 4*4 // 29 

但如今的计算是:

2 + 3*3 + 4*4 // 27 

能够经过多种方法解决这个问题。首先,咱们能够在序列最前端加入 0 做为第一个元素:

int[] numbers = { 0, 2, 3, 4 }; 

这不只不优雅,并且在并行执行的状况下仍然会产生错误的结果,由于 PLINQ 会选择多个元素做为种子,这至关于假定了计算知足结合律。为说明这个问题,用以下方式表示咱们的聚合函数:

f(total, n) => total + n * n 

LINQ to Objects 会这样计算:

f(f(f(0, 2),3),4) 

PLINQ 可能会这样计算:

f(f(0,2),f(3,4)) 

结果是:

第一个分区:   a = 0 + 2*2  (= 4)
第二个分区:   b = 3 + 4*4  (= 19)
最终结果:     a + b*b  (= 365)
甚至多是:    b + a*a  (= 35)

有两种好的解决方案:第一种是将其转换为有种子的聚合,使用 0 做为种子。这种方案带来的复杂度的提高仅仅是使用 PLINQ 时,咱们须要使用特殊的重载,确保查询并行执行(立刻会看到)。

第二种解决方案是:重构查询,使聚合函数知足交换律和结合律:

int sum = numbers.Select (n => n * n).Aggregate ((total, n) => total + n); 

固然,在这种简单的场景下你能够(而且应该)使用Sum操做符来代替Aggregate

int sum = numbers.Sum (n => n * n); 

实际上能够更进一步使用SumAverage。例如,能够使用Average来计算均方根(root-mean-square):

Math.Sqrt (numbers.Average (n => n * n)) 

甚至是标准差:

double mean = numbers.Average(); double sdev = Math.Sqrt (numbers.Average (n => { double dif = n - mean; return dif * dif; })); 

上述两个方法都是安全、高效而且可彻底并行化的。

并行化聚合Permalink

咱们刚刚看到了无种子的聚合,提供的委托必须知足交换律和结合律。若是违反这个规则,PLINQ 会给出错误的结果,由于它可能使用输入序列中多个的元素做为种子,来同时聚合多个分区。

指定种子的聚合也许看起来像是使用 PLINQ 的安全选择,然而不幸的是,这样一般会致使顺序执行,由于它依赖于单独一个种子。为减缓这个问题,PLINQ 提供了另外一个Aggregate的重载,容许你指定多个种子,或者是一个种子工厂方法。对每一个线程,它执行这个方法来生成一个独立的种子,这就造成了一个线程局部的累加器,经过它在聚合局部元素。

你必须再提供一个方法来指示如何合并局部累加器至主累加器。最后,Aggregate的这个重载还须要一个委托,用来对结果进行任意的最终变换(有些不必,你能够以后对结果运行一些代码完成一样操做)。因此,这里有 4 个委托,按照它们被传递的顺序:

  • 种子工厂(seedFactory):
    返回一个新的局部累加器
  • 更新累加器方法(updateAccumulatorFunc):
    聚合元素至局部累加器
  • 合并累加器方法(combineAccumulatorFunc):
    合并局部累加器至主累加器
  • 结果选择器(resultSelector):
    在结果上应用任意最终变换

在简单的场景中,你能够指定一个种子值来代替种子工厂。当种子是你须要改变的引用类型时这种策略行不通,由于同一个实例将在线程间共享。

提供一个简单的例子,下边的代码对numbers数组中的值进行求和:

numbers.AsParallel().Aggregate ( () => 0, // 种子工厂 (localTotal, n) => localTotal + n, // 更新累加器方法 (mainTot, localTot) => mainTot + localTot, // 合并累加器方法 finalResult => finalResult) // 结果选择器 

这个例子有些刻意,咱们能够使用更简单的方式获取相同的结果(例如无种子的聚合,或者更好的选择是使用Sum操做符)。给一个更加实际的例子,假设咱们要计算字符串中每一个英文字母的出现频率。简单的顺序执行方案看起来是这样:

string text = "Let’s suppose this is a really long string"; var letterFrequencies = new int[26]; foreach (char c in text) { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) letterFrequencies [index]++; }; 

基因序列是一个输入文本可能会很是长的例子,它的“字母表”是由字母 acgt 组成。

为了将它并行化,咱们能够把foreach替换为Parallel.ForEach(在接下来的一节会讲到),但这会致使共享数组上的并发问题。对数组的访问加能够解决问题,但会下降并发的可能性。

Aggregate提供了一个好的解决方案。这种状况下,累加器是一个数组,就像是以前例子中letterFrequencies数组。使用Aggregate的顺序执行版本以下:

int[] result = text.Aggregate ( new int[26], // 建立“累加器” (letterFrequencies, c) => // 聚合一个字母至累加器 { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) letterFrequencies [index]++; return letterFrequencies; }); 

下面是并行版本,它使用 PLINQ 的专门重载:

int[] result = text.AsParallel().Aggregate ( () => new int[26], // 新建局部累加器 (localFrequencies, c) => // 聚合至局部累加器 { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) localFrequencies [index]++; return localFrequencies; }, // 聚合局部累加器至主累加器 (mainFreq, localFreq) => mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(), finalResult => finalResult // 对结果进行 ); // 最终变换 

注意:局部累加方法会改动localFrequencies数组。这个优化是很是重要的,也是合法的,由于localFrequencies是每一个线程的局部变量。

4Parallel 类Permalink

PFX 经过Parallel类上的三个静态方法提供告终构化并行的基本形式:

Parallel.Invoke
并行执行一组委托
Parallel.For
C# for循环的并行版本
Parallel.ForEach
C# foreach循环的并行版本

三个方法都是在工做完成前会阻塞。相似于PLINQ,若是有未处理的异常,其它工做线程会在当前迭代完成以后中止,异常会被封装在AggregateException中抛给调用方。

4.1Parallel.InvokePermalink

Parallel.Invoke并行执行一组Action类型的委托,而后等待它们完成。这个方法最简单的版本以下:

public static void Invoke (params Action[] actions); 

下面是使用Parallel.Invoke来同时下载两个网页:

Parallel.Invoke ( () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"), () => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html")); 

这表面上看起来像是建立了两个Task对象(或异步委托)并等待它们。可是有个重要的区别:Parallel.Invoke在你传递一百万个委托时仍然能高效工做。这是由于它会对大量元素进行分区(partition),造成多个块,再对其分配底层的Task。而不是直接对每个委托建立独立的Task

使用Parallel上的全部方法时,都须要自行实现整理结果的代码。这意味着你须要注意线程安全。例如,下面的代码不是线程安全的:

var data = new List<string>(); Parallel.Invoke ( () => data.Add (new WebClient().DownloadString ("http://www.foo.com")), () => data.Add (new WebClient().DownloadString ("http://www.far.com"))); 

对添加的过程加能够解决问题,可是若是你的委托数量更多,它们每个执行的又很快,那么锁可能形成瓶颈。更好的解决方案是使用线程安全的集合,好比ConcurrentBag就是这里的理想方案。

Parallel.Invoke也有接受ParallelOptions对象的重载:

public static void Invoke (ParallelOptions options, params Action[] actions); 

经过ParallelOptions,你能够添加取消标记、限制最大并发数量和指定自定义任务调度器。若是要执行的委托数量(大体上)大于核心数,那么使用取消标记才有意义:在取消时,全部未启动的委托都会被抛弃。而全部已经在执行的委托会继续完成。对于如何使用取消标记,能够参考取消中的例子。

4.2Parallel.For 和 Parallel.ForEachPermalink

Parallel.ForParallel.ForEach与 C# forforeach相似,但会并行执行,而不是顺序执行。下面是它们(最简单的)方法签名:

public static ParallelLoopResult For ( int fromInclusive, int toExclusive, Action<int> body) public static ParallelLoopResult ForEach<TSource> ( IEnumerable<TSource> source, Action<TSource> body) 

对于下面的for循环:

for (int i = 0; i < 100; i++) Foo (i); 

并行版本是这样:

Parallel.For (0, 100, i => Foo (i)); 

或更简洁的:

Parallel.For (0, 100, Foo); 

而对于下面的foreach循环:

foreach (char c in "Hello, world") Foo (c); 

并行版本是这样:

Parallel.ForEach ("Hello, world", Foo); 

给一个实际点的例子。引入System.Security.Cryptography命名空间,而后咱们能够像这样并行生成六组密钥对的字符串形式:

var keyPairs = new string[6]; Parallel.For (0, keyPairs.Length, i => keyPairs[i] = RSA.Create().ToXmlString (true)); 

Parallel.Invoke一样,咱们也可让Parallel.ForParallel.ForEach执行大量工做项,它们也会被分区,分配给任务高效执行。

上面的例子也能够使用PLINQ来实现:

string[] keyPairs = ParallelEnumerable.Range (0, 6) .Select (i => RSA.Create().ToXmlString (true)) .ToArray(); 

外循环 vs 内循环Permalink

Parallel.ForParallel.ForEach一般更适合用于外循环,而不是内循环。这是由于前者会带来更大的分区块,就稀释了管理并行的开销。通常没有必要同时并行内外循环。对于下面的例子,咱们须要 100 个核心才能让内循环的并行有益处:

Parallel.For (0, 100, i => { Parallel.For (0, 50, j => Foo (i, j)); // 对于内循环, }); // 顺序执行更好。 

带索引的 Parallel.ForEachPermalink

有时须要获知循环迭代的索引。在顺序的foreach中这很简单:

int i = 0; foreach (char c in "Hello, world") Console.WriteLine (c.ToString() + i++); 

然而在并行环境中,让共享变量自增并非线程安全的。你必须使用下面这个ForEach版本:

public static ParallelLoopResult ForEach<TSource> ( IEnumerable<TSource> source, Action<TSource,ParallelLoopState,long> body) 

先忽略ParallelLoopState(下一节会讲)。如今咱们关注的是Action的第三个long类型的参数,它表明了循环的索引:

Parallel.ForEach ("Hello, world", (c, state, i) => { Console.WriteLine (c.ToString() + i); }); 

为了把它用到实际场景中,咱们来回顾下使用 PLINQ 的拼写检查。下面的代码加载了一个字典,并生成了一个用来测试的数组,有一百万个测试项:

if (!File.Exists ("WordLookup.txt")) // 包含约 150,000 个单词 new WebClient().DownloadFile ( "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt"); var wordLookup = new HashSet<string> ( File.ReadAllLines ("WordLookup.txt"), StringComparer.InvariantCultureIgnoreCase); var random = new Random(); string[] wordList = wordLookup.ToArray(); string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); wordsToTest [12345] = "woozsh"; // 引入两个 wordsToTest [23456] = "wubsie"; // 拼写错误 

咱们能够使用带索引的Parallel.ForEach来对wordsToTest数组进行拼写检查,以下:

var misspellings = new ConcurrentBag<Tuple<int,string>>(); Parallel.ForEach (wordsToTest, (word, state, i) => { if (!wordLookup.Contains (word)) misspellings.Add (Tuple.Create ((int) i, word)); }); 

注意,必须使用线程安全的集合来整理结果:这一点是相对于使用 PLINQ 的劣势。而优点是咱们能够避免使用带索引的Select查询操做符,它没有带索引的ForEach高效。

ParallelLoopState:提早退出循环Permalink

由于对于并行的ForForEach循环,循环体是一个委托,因此就没法使用break语句来提早退出循环。在这里,你必须使用ParallelLoopState对象上的BreakStop

public class ParallelLoopState { public void Break(); public void Stop(); public bool IsExceptional { get; } public bool IsStopped { get; } public long? LowestBreakIteration { get; } public bool ShouldExitCurrentIteration { get; } } 

获取ParallelLoopState很容易:全部版本的ForForEach都有重载能够接受Action<TSource,ParallelLoopState>类型的循环体。因此,若是要并行化:

foreach (char c in "Hello, world") if (c == ',') break; else Console.Write (c); 

能够使用:

Parallel.ForEach ("Hello, world", (c, loopState) => { if (c == ',') loopState.Break(); else Console.Write (c); }); 

输出:

Hlloe

从结果中能够发现,循环体会以随机顺序完成。除这点不一样之外,调用Break会给出与顺序循环至少相同数量的元素:在上例中老是以必定顺序至少输出 Hello 这几个字母。而若是改成调用Stop,会强制全部线程在当前迭代完成后当即结束。在上例中,若是有些线程滞后了,调用Stop可能给出 Hello 的子集。当发现已经找到了须要的东西时,或是发现出错了不想看结果的状况下,Stop比较适用。

Parallel.ForParallel.ForEach方法都返回一个ParallelLoopResult对象,它暴露了IsCompletedLowestBreakIteration属性。它们能够告知循环是否完成,若是没有完成,是在哪一个迭代中断的。

若是LowestBreakIteration返回null,意味着在循环中调用了Stop(而不是Break)。

若是你的循环体很长,可能会但愿其它线程可以在执行中途中断循环体,来让使用BreakStop时更快的退出。实现方法是,在代码中多个地方查询ShouldExitCurrentIteration属性,它会在调用Stop后当即为true,或者是在Break后很快为true

ShouldExitCurrentIteration在请求取消或者循环中有异常抛出时也会为true

IsExceptional属性能够告知其它线程上是否有异常产生。任何未处理的异常都会致使循环在全部线程完成当前迭代后结束:若是想要避免,必须在代码中显式处理异常。

使用局部值进行优化Permalink

Parallel.ForParallel.ForEach都提供了拥有TLocal泛型变量的重载。这是为了协助你优化密集迭代的循环中的数据整理工做。最简单的形式以下:

public static ParallelLoopResult For <TLocal> ( int fromInclusive, int toExclusive, Func <TLocal> localInit, Func <int, ParallelLoopState, TLocal, TLocal> body, Action <TLocal> localFinally); 

这些方法在实际中不多用到,由于它们的目标场景基本都被PLINQ覆盖了(好开森,由于这些重载真可怕!)。

本质上,问题在于:假设咱们要计算从 1 到 10,000,000 的平方根的和。并行计算一千万个平方根很容易,可是求和是个问题,由于必须像这样加才能更新和值:

object locker = new object(); double total = 0; Parallel.For (1, 10000000, i => { lock (locker) total += Math.Sqrt (i); }); 

并行化的收益都被获取一千万个锁的开销抵消了,还不算致使的阻塞。

然而,实际上并不须要一千万个锁。想象一队志愿者捡一大堆垃圾的场景,若是你们都共享单独一个垃圾桶,那冲突就会使整个过程极端低效。明显的方案是每一个人都有本身“局部”的垃圾桶,偶尔去一趟主垃圾桶倾倒干净。

ForForEachTLocal版本就是这样工做的。志愿者就是内部的工做线程,局部值(local value)就是局部垃圾桶。想要让Parallel以这种方式工做,那么必须提供两个额外的委托:

  1. 如何初始化新的局部值
  2. 如何将局部的聚合值合并到主值

另外,循环体委托如今不能返回void,而是应该返回局部值新的聚合结果。下面是重构后的例子:

object locker = new object(); double grandTotal = 0; Parallel.For (1, 10000000, () => 0.0, // 初始化局部值 (i, state, localTotal) => // 循环体委托。注意如今 localTotal + Math.Sqrt (i), // 返回新的局部值 localTotal => // 把局部值 { lock (locker) grandTotal += localTotal; } // 加入主值 ); 

咱们仍是须要锁,可是只须要锁定将局部和加入总和的过程。这让处理效率有了极大的提高。

前面说过,PLINQ 通常更适合这些场景。咱们的例子若是使用 PLINQ 来并行会很简单:

ParallelEnumerable.Range(1, 10000000) .Sum (i => Math.Sqrt (i)) 

(注意咱们使用了ParallelEnumerable来强制范围分区:在这里能够提升性能,由于对全部数字的计算都是相等时间的。)

更复杂的场景中,你可能会用到 LINQ 的Aggregate操做符而不是Sum。若是指定了局部种子工厂,那状况就和使用局部值的Parallel.For差很少了。

5任务并行Permalink

任务并行(task parallelism)是 PFX 中最底层的并行方式。这一层次的类定义在System.Threading.Tasks命名空间中,以下所示:

做用
Task 管理工做单元
Task<TResult> 管理有返回值的工做单元
TaskFactory 建立任务
TaskFactory<TResult> 建立有相同返回类型的任务和任务延续
TaskScheduler 管理任务调度
TaskCompletionSource 手动控制任务的工做流

本质上,任务是用来管理可并行工做单元的轻量级对象。任务使用 CLR 的线程池来避免启动独立线程的开销:它和ThreadPool.QueueUserWorkItem使用的是同一个线程池,在 CLR 4.0 中这个线程池被调节过,让Task工做的更有效率(通常来讲)。

须要并行执行代码时均可以使用任务。然而,它们是为了充分利用多核而调节的:事实上,Parallel类和PLINQ内部就是基于任务并行构建的。

任务并不仅是提供了简单高效的使用线程池的方式。它们还提供了一些强大的功能来管理工做单元,包括:

任务也实现了局部工做队列(local work queues),这个优化可以让你高效的建立不少快速执行的子任务,而不会带来单一工做队列会致使的竞争开销。

TPL 可让你使用极小的开销建立几百个(甚至几千个)任务,但若是你要建立上百万个任务,那须要把这些任务分红大一些的工做单元才能有效率。Parallel类和 PLINQ 能够自动实现这种工做分解。

Visual Studio 2010 提供了一个新的窗口来监视任务(调试 | 窗口 | 并行任务)。它和线程窗口相似,只是用于任务。并行栈窗口也有一个专门的模式用于任务。

5.1建立与启动任务Permalink

如同咱们在第 1 部分线程池的讨论中那样,你能够调用Task.Factory.StartNew,并给它传递一个Action委托来建立并启动Task

Task.Factory.StartNew (() => Console.WriteLine ("Hello from a task!")); 

泛型的版本Task<TResult>Task的子类)可让你在任务结束时得到返回的数据:

Task<string> task = Task.Factory.StartNew<string> (() => // 开始任务 { using (var wc = new System.Net.WebClient()) return wc.DownloadString ("http://www.linqpad.net"); }); RunSomeOtherMethod(); // 咱们能够并行的作其它工做... string result = task.Result; // 等待任务结束并获取结果 

Task.Factory.StartNew是一步建立并启动任务。你也能够分解它,先建立Task实例,再调用Start

var task = new Task (() => Console.Write ("Hello")); // ... task.Start(); 

使用这种方式建立的任务也能够同步运行(在当前线程上):使用RunSynchronously替代Start

能够使用Status属性来追踪任务的执行状态。

指定状态对象Permalink

当建立任务实例或调用Task.Factory.StartNew时,能够指定一个状态对象(state object),它会被传递给目标方法。若是你但愿直接调用方法而不是 lambda 表达式,则能够使用它。

static void Main() { var task = Task.Factory.StartNew (Greet, "Hello"); task.Wait(); // 等待任务结束 } static void Greet (object state) { Console.Write (state); } // 打印 "Hello" 

由于 C# 中有 lambda 表达式,咱们能够更好的使用状态对象,用它来给任务赋予一个有意义的名字。而后就能够使用AsyncState属性来查询这个名字:

static void Main() { var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting"); Console.WriteLine (task.AsyncState); // 打印 "Greeting" task.Wait(); } static void Greet (string message) { Console.Write (message); } 

Visual Studio 会在并行任务窗口显示每一个任务的AsyncState属性,因此指定有意义的名字能够很大程度的简化调试。

TaskCreationOptionsPermalink

在调用StartNew(或实例化Task)时,能够指定一个TaskCreationOptions枚举来调节线程的执行。TaskCreationOptions是一个按位组合的枚举,它有下列(可组合的)值:LongRunningPreferFairnessAttachedToParent

LongRunning向调度器建议为任务使用一个独立的线程。这对长时间运行的任务有好处,由于它们可能会“霸占”队列,强迫短期任务等待过长的时间后才能被调度。LongRunning对于会阻塞的任务也有好处。

因为任务调度器通常会试图保持恰好足够数量的任务在线程上运行,来保持全部 CPU 核心都工做。因此不要超额分配(oversubscribing) CPU,或者说不要使用过多的活动线程,以免因为操做系统被迫进行大量耗时的时间切片和上下文切换致使的性能降低。

PreferFairness让调度器试图确保任务以它们启动的顺序被调度。默认状况下是使用另外一种方式,由于内部使用了局部工做窃取队列来优化任务调度。这个优化对于很是小的(细粒度)任务有实际的好处。

AttachedToParent用来建立子任务。

子任务Permalink

当一个任务启动另外一个任务时,你能够经过指定TaskCreationOptions.AttachedToParent选择性地创建父子关系:

Task parent = Task.Factory.StartNew (() => { Console.WriteLine ("I am a parent"); Task.Factory.StartNew (() => // 分离的任务 { Console.WriteLine ("I am detached"); }); Task.Factory.StartNew (() => // 子任务 { Console.WriteLine ("I am a child"); }, TaskCreationOptions.AttachedToParent); }); 

子任务的特殊之处在于,当你等待父任务结束时,也一样会等待全部子任务。这对于子任务是一个延续任务时很是有用,稍后咱们会看到。

5.2等待任务Permalink

有两种方式能够显式等待任务完成:

  • 调用Wait方法(可选择指定超时时间)
  • 访问Result属性(当使用Task<TResult>时)

也能够同时等待多个任务:经过静态方法Task.WaitAll(等待全部指定任务完成)和Task.WaitAny(等待任意一个任务完成)。

WaitAll和依次等待每一个任务相似,但它更高效,由于它只须要(至多)一次上下文切换。而且,若是有一个或多个任务抛出未处理的异常,WaitAll仍然可以等待全部任务,并在以后从新抛出一个AggregateException异常,它聚合了全部出错任务的异常,功能至关于下面的代码:

// 假设 t一、t2 和 t3 是任务: var exceptions = new List<Exception>(); try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } if (exceptions.Count > 0) throw new AggregateException (exceptions); 

调用WaitAny至关于在一个ManualResetEventSlim上等待,每一个任务结束时都对它发信号。

除了使用超时时间,你也能够传递一个取消标记Wait方法:这样能够取消等待。注意这不是取消任务。

5.3异常处理Permalink

当你等待一个任务结束时(经过调用Wait方法或访问其Result属性),全部未处理的异常都会用一个AggregateException对象封装,方便从新抛给调用方。通常就无需在任务代码中处理异常,而是这么作:

int x = 0; Task<int> calc = Task.Factory.StartNew (() => 7 / x); try { Console.WriteLine (calc.Result); } catch (AggregateException aex) { Console.Write (aex.InnerException.Message); // 试图以 0 为除数 } 

你仍然须要对独立的任务(无父任务而且没有在等待它)进行异常处理,以避免当任务失去做用域被垃圾回收时(见如下注释)有未处理的异常,那会致使程序结束。若是对任务的等待指定了超时时间,那也是如此,由于全部超时时间事后抛出的异常都是未处理的。

TaskScheduler.UnobservedTaskException静态事件提供了应对未处理的任务异常的最后手段。经过挂接这个事件,你就能够拦截这些本来会致使程序结束的异常,而且使用本身的逻辑对它们进行处理。

对于有父子关系的任务,在父任务上等待也会隐式的等待子任务,全部子任务的异常也会传递出来。

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; var parent = Task.Factory.StartNew (() => { Task.Factory.StartNew (() => // 子 { Task.Factory.StartNew (() => { throw null; }, atp); // 孙 }, atp); }); // 下面的调用会抛出 NullReferenceException 异常 (封装在 // 嵌套的 AggregateExceptions 中): parent.Wait(); 

有趣的是,若是你在任务抛出异常后检查它的Exception属性,这个读取属性的动做会防止由于该异常致使程序结束。基本原则是:PFX 的设计者不但愿你忽略异常,只要采起某种方式接收异常,就不会受到结束程序的惩罚。

任务中的未处理异常不会致使程序当即结束:它会延迟直到垃圾回收器处理到这个任务,并调用它的析构方法时。这个延迟是由于在进行垃圾回收前,还没法判断是否会调用Wait,或检查ResultException属性。它有时也会误导你对错误源头的判断(Visual Studio 的调试器若是开启了在首个异常处中断,能够帮助进行判断)。

立刻咱们会看处处理异常的另外一种策略,就是使用任务延续

5.4取消任务Permalink

启动任务时能够可选的传递一个取消标记(cancellation token)。它可让你经过协做取消模式取消任务,像以前描述的那样:

var cancelSource = new CancellationTokenSource(); CancellationToken token = cancelSource.Token; Task task = Task.Factory.StartNew (() => { // 作些事情... token.ThrowIfCancellationRequested(); // 检查取消请求 // 作些事情... }, token); // ... cancelSource.Cancel(); 

若是要检测任务取消,能够用以下方式捕捉AggregateException,并检查它的内部异常:

try { task.Wait(); } catch (AggregateException ex) { if (ex.InnerException is OperationCanceledException) Console.Write ("Task canceled!"); } 

若是但愿显式的抛出OperationCanceledException异常(而不是经过调用ThrowIfCancellationRequested),那么必须把取消标记传递给OperationCanceledException的构造方法。若是不这么作,这个任务就不会以TaskStatus.Canceled状态结束,而且也不会触发使用OnlyOnCanceled条件的任务延续。

若是任务在启动前被取消,它就不会被调度,而是直接在任务中抛出OperationCanceledException

由于取消标记也能够被其它 API 识别,因此能够在其它构造中无缝使用:

var cancelSource = new CancellationTokenSource(); CancellationToken token = cancelSource.Token; Task task = Task.Factory.StartNew (() => { // 传递取消标记给 PLINQ 查询: var query = someSequence.AsParallel().WithCancellation (token)... // ... enumerate query ... }); 

调用cancelSource上的Cancel方法就能够取消该 PLINQ 查询,它会在任务中抛出OperationCanceledException异常,从而取消该任务。

也能够给WaitCancelAndWait这类方法传递取消标记,它可让你取消等待操做,而不是任务自己。

5.5任务延续Permalink

有时,在一个任务完成(或失败)后立刻启动另外一个任务会颇有用。Task类上的ContinueWith方法正是实现了这种功能:

Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant..")); Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation")); 

一旦task1(前项,antecedent)完成、失败或取消,task2(延续,continuation)会自动启动。(若是task1在运行第二行代码前已经结束,那么task2会被当即调度执行。)传递给延续的 lambda 表达式的ant参数是对前项任务的引用。

咱们的例子演示了最简单的延续,它和如下代码功能相似:

Task task = Task.Factory.StartNew (() => { Console.Write ("antecedent.."); Console.Write ("..continuation"); }); 

可是经过延续的方式能够更加灵活,好比先等待task1完成,以后再等待task2。若是task1返回数据,这样就很是有用。

另外一个(不明显的)差别是:默认状况下,前项和延续任务多是在不一样的线程上执行。你能够在调用ContinueWith时指定TaskContinuationOptions.ExecuteSynchronously来强制它们在同一个线程执行:若是延续是很是细粒度的,这样作能够经过减小开销来提高性能。

延续和 Task<TResult>Permalink

像普通任务同样,延续也能够使用Task<TResult>类型并返回数据。下面的例子中,咱们使用链状任务来计算Math.Sqrt(8*2)并打印结果:

Task.Factory.StartNew<int> (() => 8) .ContinueWith (ant => ant.Result * 2) .ContinueWith (ant => Math.Sqrt (ant.Result)) .ContinueWith (ant => Console.WriteLine (ant.Result)); // 4 

咱们的例子比较简单,实际应用中,这些 lambda 表达式可能会调用计算密集型的方法。

延续与异常Permalink

延续能够经过前项的Exception属性来获取前项抛出的异常。下面的代码会输出NullReferenceException信息:

Task task1 = Task.Factory.StartNew (() => { throw null; }); Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception)); 

若是前项抛出了异常但延续没有检查前项的Exception属性(而且也没有在等待前项),那么异常会被认为是未处理的,就会致使程序结束(除非使用TaskScheduler.UnobservedTaskException进行了处理)。

安全的模式是从新抛出前项的异常。只要延续被Wait等待,异常就可以传播并从新抛出给等待方。

Task continuation = Task.Factory.StartNew (() => { throw null; }) .ContinueWith (ant => { if (ant.Exception != null) throw ant.Exception; // 继续处理... }); continuation.Wait(); // 异常被抛回调用方 

另外一种处理异常的方法是为异常和正常状况指定不一样的延续。须要用到TaskContinuationOptions

Task task1 = Task.Factory.StartNew (() => { throw null; }); Task error = task1.ContinueWith (ant => Console.Write (ant.Exception), TaskContinuationOptions.OnlyOnFaulted); Task ok = task1.ContinueWith (ant => Console.Write ("Success!"), TaskContinuationOptions.NotOnFaulted); 

这种模式在结合子任务使用时很是有用,咱们立刻会看到

下面的扩展方法会“吞掉”任务的未处理异常:

public static void IgnoreExceptions (this Task task) { task.ContinueWith (t => { var ignore = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted); } 

(能够添加对异常的日志记录来进一步改进它。)如下是用法:

Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions(); 

延续与子任务Permalink

延续的一个强大功能是它仅在全部子任务都完成时才会启动。这时,全部子任务抛出的异常都会被封送给延续。

接下来的例子中,咱们启动三个子任务,每一个都抛出NullReferenceException。而后使用父任务的延续来一次性捕捉这些异常:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; Task.Factory.StartNew (() => { Task.Factory.StartNew (() => { throw null; }, atp); Task.Factory.StartNew (() => { throw null; }, atp); Task.Factory.StartNew (() => { throw null; }, atp); }) .ContinueWith (p => Console.WriteLine (p.Exception), TaskContinuationOptions.OnlyOnFaulted); 

延续

条件延续Permalink

默认状况下,延续是被无条件调度的,也就是说不管前项是完成、抛出异常仍是取消,延续都会执行。你能够经过设置TaskContinuationOptions枚举中的标识(可组合)来改变这种行为。三种控制条件延续的核心标识是:

NotOnRanToCompletion = 0x10000, NotOnFaulted = 0x20000, NotOnCanceled = 0x40000, 

这些标识是作减法的,也就是组合的越多,延续越不可能被执行。为了方便使用,也提供了如下预先组合好的值:

OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled, OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled, OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted 

(组合全部Not*标识[NotOnRanToCompletion, NotOnFaulted, NotOnCanceled]没有意义,这会致使延续始终被取消。)

RanToCompletion表明前项成功完成,没有被取消,也没有未处理的异常。

Faulted表明前项中有未处理的异常抛出。

Canceled表明如下两种状况之一:

  • 前项经过其取消标记被取消。换句话说,OperationCanceledException在前项中抛出,它的CancellationToken属性与启动时传递给前项的标记取消匹配。

  • 前项被隐式的取消,由于没法知足指定的延续条件。

特别须要注意的是,若是这些标识致使延续没法执行,延续并非被忘记或抛弃,而是被取消。这意味着全部延续任务上的延续就会开始运行,除非你指定了NotOnCanceled。例如:

Task t1 = Task.Factory.StartNew (...); Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"), TaskContinuationOptions.OnlyOnFaulted); Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3")); 

像以前说的同样,t3始终会被调度,即便是t1没有抛出异常也是如此。由于t1成功完成,fault任务会被取消,而t3上并无定义任何限制延续的条件,因此t3就会被无条件执行。

条件延续

若是但愿仅在fault真正运行的状况下执行t3,须要把代码改为:

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"), TaskContinuationOptions.NotOnCanceled); 

(此外,也能够指定OnlyOnRanToCompletion,不一样之处就是t3fault抛出异常的状况下不会执行。)

多前项的延续Permalink

延续的另外一个有用的功能是它能够在多个前项完成后调度执行。ContinueWhenAll是在多个前项都完成后调度,而ContinueWhenAny是在任意一个前项完成后调度。这两个方法都定义在TaskFactory类上:

var task1 = Task.Factory.StartNew (() => Console.Write ("X")); var task2 = Task.Factory.StartNew (() => Console.Write ("Y")); var continuation = Task.Factory.ContinueWhenAll ( new[] { task1, task2 }, tasks => Console.WriteLine ("Done")); 

上面的例子会在打印 “ XY “ 或 “ YX “ 以后打印 “ Done “。Lambda 表达式中的tasks参数能够用来访问完成的任务数组,当前项返回数据时能够用到。下面的例子对两个前项返回的数字求和:

// 真实场景中 task1 和 task2 可能调用复杂的功能: Task<int> task1 = Task.Factory.StartNew (() => 123); Task<int> task2 = Task.Factory.StartNew (() => 456); Task<int> task3 = Task<int>.Factory.ContinueWhenAll ( new[] { task1, task2 }, tasks => tasks.Sum (t => t.Result)); Console.WriteLine (task3.Result); // 579 

在这个例子中,咱们使用了<int>类型参数来调用Task.Factory是为了演示得到了一个泛型的任务工厂。这个类型参数不是必须的,它能够被编译器推断。

单前项的多个延续Permalink

对一个任务调用一次以上的ContinueWith会建立单前项的多个延续。当该前项完成时,全部延续会一块儿启动(除非指定了TaskContinuationOptions.ExecuteSynchronously,这会致使延续顺序执行)。

下面的代码会等待一秒,而后打印 “ XY “ 或者 “ YX “:

var t = Task.Factory.StartNew (() => Thread.Sleep (1000)); t.ContinueWith (ant => Console.Write ("X")); t.ContinueWith (ant => Console.Write ("Y")); 

5.6任务调度器与 UIPermalink

任务调度器(task scheduler)为任务分配线程,其由抽象类TaskScheduler类表明,全部任务都会和一个任务调度器关联。Framework 提供了两种具体实现:默认调度器(default scheduler)是使用 CLR 线程池工做,还有同步上下文调度器(synchronization context scheduler),它(主要)是为了对于使用 WPF 和 Windows Forms 的场景提供帮助,这里的线程模型须要 UI 控件只能在建立它们的线程上访问。例如,假设咱们须要在后台从一个 web 服务获取数据,而后使用它更新一个叫作lblResult的 WPF 标签。这能够分解为两个任务:

  1. 调用方法从 web 服务获取数据(前项任务)。
  2. 使用结果更新lblResult延续任务)。

若是对延续任务指定了窗口建立时获取的同步上下文调度器,那么就能够安全的更新lblResult

public partial class MyWindow : Window { TaskScheduler _uiScheduler; // 定义一个字段以便于 // 在类中使用 public MyWindow() { InitializeComponent(); // 从建立窗口的线程获取 UI 调度器: _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext(); Task.Factory.StartNew<string> (SomeComplexWebService) .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler); } string SomeComplexWebService() { ... } } 

也能够实现本身的任务调度器(经过继承TaskScheduler),可是通常只会在很是特殊的场景下才会这么作。对于自定义调度,须要常用TaskCompletionSource,咱们立刻会讲到。

5.7TaskFactoryPermalink

当调用Task.Factory时,就是经过Task上的静态属性获取了默认的TaskFactory对象。这个任务工厂的做用就是建立任务,具体的说,有三种任务:

  • 普通任务(经过StartNew
  • 多前项的延续(经过ContinueWhenAllContinueWhenAny
  • 封装了异步编程模型(APM)的任务(经过FromAsync

有趣的是,TaskFactory是建立后两种任务的惟一方法。而对于StartNewTaskFactory纯粹是为了方便,技术上说是多余的,这彻底等同于建立Task对象而后调用其Start方法。

建立本身的任务工厂Permalink

TaskFactory不是抽象工厂:你能够实例化这个类,在但愿重复使用一样的(非默认的)TaskCreationOptions值、TaskContinuationOptions值或者TaskScheduler时有用。例如,若是但愿重复建立长时间运行的子任务,咱们能够这样建立一个自定义工厂:

var factory = new TaskFactory ( TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None); 

而后建立任务就能够仅调用这个工厂上的StartNew

Task task1 = factory.StartNew (Method1); Task task2 = factory.StartNew (Method2); // ... 

在调用ContinueWhenAllContinueWhenAny时,自定义的延续选项会被应用。

5.8TaskCompletionSourcePermalink

Task类作了两件事情:

  • 它能够调度一个委托到线程池线程上运行。
  • 它提供了管理工做项的丰富功能(延续、子任务、异常封送等等)。

有趣的是,这两件事能够是分离的:能够只利用任务的管理工做项的功能而不让它调度到线程池上运行。TaskCompletionSource类开启了这个模式。

使用TaskCompletionSource时,就建立它的实例。它暴露一个Task属性来返回一个任务,你能够对其等待或附加延续,就和对通常的任务同样。然而这个任务能够经过TaskCompletionSource对象的下列方法进行彻底控制:

public class TaskCompletionSource<TResult> { public void SetResult (TResult result); public void SetException (Exception exception); public void SetCanceled(); public bool TrySetResult (TResult result); public bool TrySetException (Exception exception); public bool TrySetCanceled(); // ... } 

若是调用屡次,SetResultSetExceptionSetCanceled会抛出异常,而Try*方法会返回false

TResult对应任务的返回类型,因此TaskCompletionSource<int>会给你一个Task<int>。若是须要不返回结果的任务,能够使用object类型来建立TaskCompletionSource,并在调用SetResult时传递null。能够把Task<object>转换为Task类型来使用。

下面的代码在等待五秒以后打印 “ 123 “:

var source = new TaskCompletionSource<int>(); new Thread (() => { Thread.Sleep (5000); source.SetResult (123); }) .Start(); Task<int> task = source.Task; // 咱们的“奴隶”任务 Console.WriteLine (task.Result); // 123 

稍后,咱们会展现使用如何BlockingCollection写一个生产者 / 消费者队列。而后会演示使用TaskCompletionSource来改进这个方案,它能够使队列中的工做项能够被等待和取消。

6使用 AggregateExceptionPermalink

如前所属,PLINQParallel类和Task都会自动封送异常给使用者。为了明白这么作的重要性,考虑如下 LINQ 查询,它在第一次迭代时会抛出DivideByZeroException

try { var query = from i in Enumerable.Range (0, 1000000) select 100 / i; // ... } catch (DivideByZeroException) { // ... } 

若是咱们使用 PLINQ 来并行化查询而假设它并无进行异常处理,那么DivideByZeroException可能会在一个线程中被抛出,就会无视catch块从而致使程序结束。

所以,异常会被自动捕捉并从新抛给调用方。然而不幸的是,状况并非就像捕捉一个DivideByZeroException那般简单。由于这些类库会利用不少线程,极可能有两个或更多的异常被同时抛出。为了确保可以报告全部异常,就使用了AggregateException做为容器来封装它们,并经过InnerExceptions属性来暴露:

try { var query = from i in ParallelEnumerable.Range (0, 1000000) select 100 / i; // 对查询进行枚举 // ... } catch (AggregateException aex) { foreach (Exception ex in aex.InnerExceptions) Console.WriteLine (ex.Message); } 

PLINQ 和Parallel类都会在遇到第一个异常时中止查询或循环执行,它使用的方式是不处理以后的元素或循环体。而在本轮循环结束前,还有可能抛出更多的异常。第一个异常能够经过AggregateException上的InnerException属性获取。

6.1Flatten 和 HandlePermalink

AggregateException类提供了一对方法来简化异常处理:FlattenHandle

FlattenPermalink

AggregateException常常会包含其它的AggregateException。好比在子任务抛出异常时就可能如此。你能够经过调用Flatten来消除任意层级的嵌套以简化处理。这个方法会返回一个新的AggregateException,它的InnerExceptions就是展平以后的结果:

catch (AggregateException aex) { foreach (Exception ex in aex.Flatten().InnerExceptions) myLogWriter.LogException (ex); } 

HandlePermalink

有时只须要捕捉特定类型的异常,并从新抛出其它类型的异常。AggregateException上的Handle方法提供了一个快捷方案。它接受一个异常断定器,来对全部封装的异常进行断定:

public void Handle (Func<Exception, bool> predicate) 

若是断定器返回true,则该异常被认为是“已处理”。对于全部异常都运行断定以后,接下来会发生:

  • 若是全部异常都“已处理”(断定器返回true),则不会从新抛出异常。
  • 若是有异常被断定为false(“未处理”),则会生成一个新的AggregateException来封装这些异常,并从新抛出。

例如,下面的代码最后会从新抛出一个AggregateException,而且其中仅包含一个NullReferenceException

var parent = Task.Factory.StartNew (() => { // 咱们使用 3 个子任务同时抛出 3 个异常: int[] numbers = { 0 }; var childFactory = new TaskFactory (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None); childFactory.StartNew (() => 5 / numbers[0]); // 除数为零 childFactory.StartNew (() => numbers [1]); // 索引越界 childFactory.StartNew (() => { throw null; }); // 空引用 }); try { parent.Wait(); } catch (AggregateException aex) { aex.Flatten().Handle (ex => // 注意这里仍是须要调用 Flatten { if (ex is DivideByZeroException) { Console.WriteLine ("Divide by zero"); return true; // 该异常“已处理” } if (ex is IndexOutOfRangeException) { Console.WriteLine ("Index out of range"); return true; // 该异常“已处理” } return false; // 其它全部异常会被从新抛出 }); } 

7并发集合Permalink

Framework 4.0 在System.Collections.Concurrent命名空间中提供了一组新的集合。它们都是彻底线程安全的:

并发集合 对应的非并发集合
ConcurrentStack<T> Stack<T>
ConcurrentQueue<T> Queue<T>
ConcurrentBag<T> ( none )
BlockingCollection<T> ( none )
ConcurrentDictionary<TKey,TValue> Dictionary<TKey,TValue>

在通常的多线程场景中,须要线程安全的集合时可能会用到这些并发集合。可是,有些注意事项:

  • 并发集合是为了并行编程而调整的。除了高并发场景,传统的集合都比它们更高效。
  • 线程安全的集合并不能确保使用它的代码也是线程安全的
  • 若是在对并发集合进行枚举的同时有其它线程修改了集合,并不会产生异常,而是会获得一个新旧内容的混合结果。
  • 没有List<T>的并发版本。
  • 并发的栈、队列和包(bag)类内部都是使用链表实现的。这使得它们的空间效率不如非并发的StackQueue类,可是这对于并发访问更好,由于链表有助于实现无锁或更少的锁。(这是由于向链表中插入一个节点只须要更新两个引用,而对于List<T>这种结构插入一个元素可能须要移动几千个已存在的元素。)

换句话说,这些集合并非提供了加锁使用普通集合的快捷办法。为了演示这一点,若是咱们在单一线程上执行如下代码:

var d = new ConcurrentDictionary<int,int>(); for (int i = 0; i < 1000000; i++) d[i] = 123; 

它会比下面的代码慢三倍:

var d = new Dictionary<int,int>(); for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123; 

(可是对ConcurrentDictionary读取会更快,由于读是无锁的。)

并发集合与普通集合的另外一个不一样之处是它们暴露了一些特殊的方法,来进行原子的检查并行动(test-and-act)的操做,例如TryPop。这些方法中的大部分都是由IProducerConsumerCollection<T>接口统一的。

7.1IProducerConsumerCollection<T>Permalink

生产者 / 消费者集合有两个主要用例:

  • 添加一个元素(“生产”)
  • 获取一个元素并移除它(“消费”)

典型的例子是栈和队列。生产者 / 消费者集合在并行编程中很是重要,由于它有助于高效的无锁实现。

IProducerConsumerCollection<T>接口表明了线程安全的生产者 / 消费者集合。如下类实现了该接口:ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>

IProducerConsumerCollection<T>扩展自ICollection,并加入了如下方法:

void CopyTo (T[] array, int index); T[] ToArray(); bool TryAdd (T item); bool TryTake (out T item); 

TryAddTryTake方法检查是否能进行添加 / 移除操做,若是能够,就进行添加 / 移除。检查和操做是原子的,因此无需像普通集合那样使用锁:

int result; lock (myStack) if (myStack.Count > 0) result = myStack.Pop(); 

TryTake在集合为空时返回falseTryAdd在三种实现中都总会成功并返回true。而若是你要写本身的不容许重复元素的并发集合,就能够在元素已存在时让TryAdd返回false(好比本身写并发集(set))。

TryTake移除的具体元素是在子类中定义的:

  • 对于栈,TryTake移除最新添加的元素。
  • 对于队列,TryTake移除最先添加的元素。
  • 对于包,TryTake移除能够最快移除的元素。

这三个具体类基本都是显式实现了TryTakeTryAdd方法,也经过更具体的的名字暴露了一样的功能,好比TryDequeueTryPop

7.2ConcurrentBag<T>Permalink

ConcurrentBag<T>用来存储一组无序的对象(容许重复)。它适用于你不关心调用TakeTryTake会返回哪一个元素的场景。

ConcurrentBag<T>相比并发队列和栈的好处是它的Add方法被不少线程同时调用时几乎没有竞争冲突。而对于并发队列和栈,并行调用Add会有一些竞争冲突(可是比对非并发集合加锁的方式要小得多)。并发包的Take方法也很是高效,只要每一个线程不要拿出比它添加的数量更多的元素。

在并发包的内部,每个线程都有其私有的链表。元素会加入到调用Add的线程对应的私有链表中,就消除了竞争冲突。在对包进行枚举时,枚举器会遍历全部线程的私有链表,返回其中的每个元素。

调用Take时,包会首先检查当前线程的私有链表。若是其中有至少一个元素,就能够没有冲突的轻松完成任务(大多数状况都是如此)。可是若是链表没有元素,它就必须从其它线程的私有链表中“偷”一个元素,就可能致使竞争冲突。

因此,准确的说,调用Take会返回当前线程最新添加的元素,若是当前线程没有对应的元素,就会随机取一个其它线程,返回它最新添加的元素。

若是你的并行操做基本都是在添加元素,或者每一个线程的AddTake是平衡的,那么使用并发包就很理想。咱们来看前面的一个例子,是使用Parallel.ForEach来实现并行拼写检查:

var misspellings = new ConcurrentBag<Tuple<int,string>>(); Parallel.ForEach (wordsToTest, (word, state, i) => { if (!wordLookup.Contains (word)) misspellings.Add (Tuple.Create ((int) i, word)); }); 

对于实现生产者 / 消费者队列,并发包就不是一个好的选择,由于元素是在不一样的线程进行添加和移除的。

7.3BlockingCollection<T>Permalink

若是在ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>这些生产者 / 消费者集合上调用TryTake时,集合为空,该方法会返回false。这种场景下,有时可能等待一个元素被添加会更有用。

与其重载TryTake方法来实现这个功能(若是还要容许取消和超时就可能须要大量成员),不如使用 PFX 的设计者已经实现好的BlockingCollection<T>类。阻塞集合能够封装任意实现了IProducerConsumerCollection<T>接口的对象,就能够调用这个封装上面的Take方法,它在没有元素时会阻塞。

阻塞集合也可让你限制集合的大小,若是超过限制就阻塞生产者。这样限制了大小的集合被称为有界阻塞集合(bounded blocking collection)。

使用BlockingCollection<T>时:

  1. 建立其实例,可选的指定一个IProducerConsumerCollection<T>来封装,还有集合的最大大小(上界)。
  2. 调用AddTryAdd来对底层集合添加元素。
  3. 调用TakeTryTake来移除(消费)底层集合中的元素。

若是调用构造方法的时候没有指定目标集合,就会自动使用一个ConcurrentQueue<T>的实例。进行生成和消费的方法均可以指定取消标记和超时时间。AddTryAdd在集合有界时可能会阻塞,TakeTryTake在集合为空时会阻塞。

另外一种消费元素的方式是调用GetConsumingEnumerable。它会返回一个(可能)无限的序列,当有元素时就能够返回它。你能够调用CompleteAdding来强行结束这个序列,它也会阻止以后再添加元素。

前面咱们写过一个使用 WaitPulse的生产者 / 消费者队列。这里使用BlockingCollection<T>来重构同一个类(不考虑异常处理):

public class PCQueue : IDisposable { BlockingCollection<Action> _taskQ = new BlockingCollection<Action>(); public PCQueue (int workerCount) { // 为每一个消费者建立并启动单独的任务: for (int i = 0; i < workerCount; i++) Task.Factory.StartNew (Consume); } public void Dispose() { _taskQ.CompleteAdding(); } public void EnqueueTask (Action action) { _taskQ.Add (action); } void Consume() { // 没有元素时,对序列的枚举就会被阻塞, // 而调用 CompleteAdding 能够结束枚举。 foreach (Action action in _taskQ.GetConsumingEnumerable()) action(); // 进行任务 } } 

由于没有给BlockingCollection的构造方法传递任何参数,因此会自动建立一个并发队列。而若是传递一个ConcurrentStack,咱们就会获得生产者 / 消费者栈。

BlockingCollection还提供了AddToAnyTakeFromAny这些静态方法,它们可让你对指定的多个阻塞集合进行添加或移除元素。操做会对第一个可以进行操做的集合进行。

利用 TaskCompletionSourcePermalink

咱们以前实现的生产者 / 消费者模式还不够灵活,由于工做项添加后没法追踪它们。若是可以实现如下功能会更好:

  • 可以获知工做项的完成。
  • 取消未启动的工做项。
  • 优雅的处理工做项抛出的异常。

理想的解决方案是让EnqueueTask方法返回一个对象,来提供咱们上面描述的功能。好消息是这个类已经存在,正是Task类。咱们须要作的只是经过TaskCompletionSource来操控它:

public class PCQueue : IDisposable { class WorkItem { public readonly TaskCompletionSource<object> TaskSource; public readonly Action Action; public readonly CancellationToken? CancelToken; public WorkItem ( TaskCompletionSource<object> taskSource, Action action, CancellationToken? cancelToken) { TaskSource = taskSource; Action = action; CancelToken = cancelToken; } } BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>(); public PCQueue (int workerCount) { // 为每一个消费者建立并启动单独的任务: for (int i = 0; i < workerCount; i++) Task.Factory.StartNew (Consume); } public void Dispose() { _taskQ.CompleteAdding(); } public Task EnqueueTask (Action action) { return EnqueueTask (action, null); } public Task EnqueueTask (Action action, CancellationToken? cancelToken) { var tcs = new TaskCompletionSource<object>(); _taskQ.Add (new WorkItem (tcs, action, cancelToken)); return tcs.Task; } void Consume() { foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable()) if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested) { workItem.TaskSource.SetCanceled(); } else try { workItem.Action(); workItem.TaskSource.SetResult (null); // 表示完成 } catch (OperationCanceledException ex) { if (ex.CancellationToken == workItem.CancelToken) workItem.TaskSource.SetCanceled(); else workItem.TaskSource.SetException (ex); } catch (Exception ex) { workItem.TaskSource.SetException (ex); } } } 

EnqueueTask中,咱们入队一个工做项,它封装了目标委托和任务完成源,从而让咱们以后能够控制返回给消费者的任务。

Consume中,咱们在出队一个工做项后先检查任务是否被取消。若是没有,就运行委托而后调用任务完成源上的SetResult来表示任务完成。

下面是如何使用这个类:

var pcQ = new PCQueue (1); Task task = pcQ.EnqueueTask (() => Console.WriteLine ("Easy!")); // ... 

咱们如今能够对task等待、附加延续、让延续中的异常传播给父任务等等。换句话说,咱们得到了任务模型的丰富功能,同时也至关于自行实现了一个调度器。

8SpinLock 和 SpinWaitPermalink

在并行编程中,短暂的自旋常常比阻塞更好,由于它避免了上下文切换和内核模式转换的开销。SpinLockSpinWait被设计用来在这种场景下提供帮助。它们的主要用途是实现自定义的同步构造。

SpinLockSpinWait是结构体而不是类!这个设计是一种避免间址和垃圾回收的极限优化技术。它意味着你必须当心,不能不经意地复制了实例,好比不使用ref修饰符把它们传递给另外一个方法,或者把它们定义成了readonly的字段。这在使用SpinLock时十分重要。

8.1SpinLockPermalink

SpinLock结构体可让你进行锁定,而无需上下文切换的开销,它的代价是保持一个线程自旋(空忙)。这种方式适用于高竞争的场景下锁定很是短暂的状况(好比,从头写一个线程安全的链表)。

若是让自旋锁等待的过久(最可能是几毫秒),它会和普通的锁同样出让其时间片,致使上下文切换。再被从新调度后,它会继续出让,就这样不断的“自旋出让”。这比彻底使用自旋消耗的 CPU 资源要少得多,可是比阻塞要高。

在单核的机器上,自旋锁在遇到竞争时会当即开始“自旋出让”。

使用SpinLock和普通的锁差很少,除了如下几点:

  • 自旋锁是结构体(前面有提到)。
  • 自旋锁不可重入,意味着不能在一个线程上连续两次调用同一个SpinLock上的Enter方法。若是违反了这条规则,要否则会抛出异常(启用全部者追踪(owner tracking)时),要否则会死锁(禁用全部者追踪时)。在构造自旋锁时,能够指定是否启用全部者追踪,启用会影响性能。
  • SpinLock可让你经过IsHeld属性查询锁是否已被获取,若是启用了全部者追踪,那么使用IsHeldByCurrentThread属性。
  • 没有像 C# 的lock语句那样的语法糖来简化SpinLock的使用。

另外一个不一样之处是当调用Enter时,你必须遵循提供 lockTaken 参数的健壮模式(几乎老是使用try / finally一块儿实现)。

下面是个例子:

var spinLock = new SpinLock (true); // 启用全部者追踪 bool lockTaken = false; try { spinLock.Enter (ref lockTaken); // 作些事情... } finally { if (lockTaken) spinLock.Exit(); } 

和普通的锁同样,当(且仅当)Enter方法抛出异常而且锁没有被获取时,lockTaken会为false。这种场景很是罕见(当调用该线程的Abort,或者OutOfMemoryException异常被抛出时),但可让你肯定以后是否须要调用Exit

SpinLock也提供了接受超时时间的TryEnter方法。

因为SpinLock笨拙的值类型语义和缺少语法支持,几乎每次想用它都是受罪!在替换掉普通的锁前请三思。

SpinLock在须要写本身的可重用同步构造时最有意义。即使如此,自旋锁也不像看上去那么有用。它仍然限制了并发。而且会什么都不作的浪费 CPU 时间。常常更好的选择都是把时间花在一些“投机”的事情上,并使用SpinWait来辅助。(译者注:这里“投机”是指先进行操做并检测抢占,若是发现被抢占就重试,详见SpinWait

8.2SpinWaitPermalink

SpinWait能够帮助实现无锁的代码,使用自旋而非阻塞。它实现了安全措施来避免普通自旋可能会形成的资源饥饿和优先级倒置。

使用SpinWait的无锁编程是多线程中最难的,它是为了应对没有其它高层构造能够使用的场景。先决条件是理解非阻塞同步

为何须要 SpinWaitPermalink

假设咱们写了一个纯粹基于一个简单标识的自旋信号系统:

bool _proceed; void Test() { // 自旋直到其它线程把 _proceed 设置为 true: while (!_proceed) Thread.MemoryBarrier(); // ... } 

若是Test运行时_proceed已经为true,或者几回循环内就能变为true,那么这个实现就会很高效。可是如今假设_proceed在几秒内保持false,而且有四个线程同时调用Test。这个自旋就会彻底占用一个四核的 CPU!这会致使其它线程运行缓慢(资源饥饿),包括那个会把_proceed设置为true的线程(优先级倒置)。在单核机器上,情况会进一步恶化,由于自旋几乎老是致使优先级倒置。(虽然如今单核机器已经不多见了,但是单核的虚拟机并很多。)

SpinWait使用两种方法解决这个问题。首先,它会限制消耗 CPU 的自旋,在通过必定次数的自旋后,就会每次循环都出让其时间片(经过调用Thread.YieldThread.Sleep),从而减小资源消耗。其次,它会检测是不是在单核机器上运行,若是是,就会每次循环都出让其时间片。

如何使用 SpinWaitPermalink

有两种方式使用SpinWait。第一种是调用静态方法SpinUntil。这个方法接受一个断定器(和一个可选的超时时间):

bool _proceed; void Test() { SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; }); // ... } 

另外一种(更灵活)的方式是建立SpinWait结构体的实例,并在循环中调用SpinOnce

bool _proceed; void Test() { var spinWait = new SpinWait(); while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); } // ... } 

前者就是使用后者提供的快捷方式。

SpinWait 如何工做Permalink

在当前的实现中,SpinWait会在出让以前进行 10 次消耗 CPU 的迭代。但它并不会在每次迭代后当即返回调用方,而是调用Thread.SpinWait来 经过 CLR(最终是经过操做系统)再自旋必定时间。这个时间最初是几十纳秒,每次迭代都会加倍,直到 10 次迭代结束。这在必定程度上保证了消耗 CPU 的自旋阶段的总时间的可预测性,CLR 和操做系统能够根据状况来调节。通常来讲,这会在几十微秒的区间,很小,可是要大于上下文切换的开销。

在单核机器上,SpinWait每次迭代都会出让。能够经过NextSpinWillYield属性来检查SpinWait在下一次自旋时会不会出让。

若是SpinWait在自旋出让模式保持了好久(大概 20 次),就会按期Sleep几微秒来进一步节约资源给其它线程使用。

使用 SpinWait 和 Interlocked.CompareExchange 进行无锁更新Permalink

结合SpinWaitInterlocked.CompareExchange能够原子的更新一个经过本身的值进行计算的字段(读 - 改 - 写)。例如,假设咱们要把字段 x 乘 10。非线程安全的简单代码就是:

x = x * 10; 

它不是线程同步的缘由就和咱们在非阻塞同步中看到的对字段自增不是线程同步的缘由同样。

正确的无锁方式以下:

  1. 使用局部变量获取 x 的一个“快照”。
  2. 计算新值(这里就是将快照乘 10)。
  3. 若是快照仍是最新的,就将计算后的值写回(这一步必须是原子的,经过调用Interlocked.CompareExchange实现)。
  4. 若是快照过时了,自旋并返回第 1 步。

例如:

int x; void MultiplyXBy (int factor) { var spinWait = new SpinWait(); while (true) { int snapshot1 = x; Thread.MemoryBarrier(); int calc = snapshot1 * factor; int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1); if (snapshot1 == snapshot2) return; // 没有被抢占 spinWait.SpinOnce(); } } 

咱们能够去掉对Thread.MemoryBarrier的调用来略微提升性能。这是由于CompareExchange也会生成内存屏障。最坏的状况就是若是snapshot1在第一次迭代时就读取了一个过时的值,那么会多进行一次自旋。

Interlocked.CompareExchange是在字段的当前值与第三个参数相等时使用指定的值来更新字段。它会返回字段的旧值,就能够用来与原快照比较,检查是否过时。若是值不相等,意味着被另外一个线程抢占,就须要自旋重试。

CompareExchange也有重载能够对于object类型使用。咱们能够利用这个重载来实现对全部引用类型的无锁更新方法:

static void LockFreeUpdate<T> (ref T field, Func <T, T> updateFunction) where T : class { var spinWait = new SpinWait(); while (true) { T snapshot1 = field; T calc = updateFunction (snapshot1); T snapshot2 = Interlocked.CompareExchange (ref field, calc, snapshot1); if (snapshot1 == snapshot2) return; spinWait.SpinOnce(); } } 

下面是如何使用这个方法来写一个无锁的线程安全的事件(实际上,这是 C# 4.0 的编译器对于事件默认的处理):

EventHandler _someDelegate; public event EventHandler SomeEvent { add { LockFreeUpdate (ref _someDelegate, d => d + value); } remove { LockFreeUpdate (ref _someDelegate, d => d - value); } } 

SpinWait vs SpinLockPermalink

咱们也能够经过把对共享的字段的访问放进SpinLock里来解决上面的问题。问题是自旋锁同一时间只容许一个线程进入,尽管它(一般)可以消除上下文切换的开销。而使用SpinWait时,咱们能够假设没有竞争,投机的运行。若是被抢占就重试。花费 CPU 时间作事情也许比在自旋锁中浪费 CPU 时间好!

最后,考虑下面的类:

class Test { ProgressStatus _status = new ProgressStatus (0, "Starting"); class ProgressStatus // 不可变类 { public readonly int PercentComplete; public readonly string StatusMessage; public ProgressStatus (int percentComplete, string statusMessage) { PercentComplete = percentComplete; StatusMessage = statusMessage; } } } 

咱们能够使用LockFreeUpdate方法来增长_statusPercentComplete字段的值:

LockFreeUpdate (ref _status, s => new ProgressStatus (s.PercentComplete + 1, s.StatusMessage)); 

注意咱们基于现有值建立了新的ProgressStatus对象。要感谢LockFreeUpdate方法,读取PercentComplete的值、增长它并写回的操做不会被不安全的抢占:任何抢占均可以被可靠的检测到,触发自旋重试。

相关文章
相关标签/搜索