目录git
System.Threading.Tasks
中的类型被称为任务并行库(Task Parallel Library,TPL)。github
System.Thread.Tasks
命名空间是.NET Framework4.0所提供,编程
“TPL使用CLR线程池自动将应用程序的工做动态分配到可用的CPU中。TPL还处理工做分区、线程调度、状态管理和其余低级别的细节操做。最终结果是,你能够最大限度地提高.NET应用程序的性能,而且避免直接操做线程所带来的复杂性” --《精通C#》api
在System.Threading.Tasks
命名空间下有一个静态类:Parallel类数组
Parallel能够实现对实现了IEnumerable接口的数据集合的每个元素并行操做浏览器
有一点要说明的:并行操做会带来必定的成本,若是任务自己能很快完成,或是循环次数不多,那么并行处理的速度也许会比非并行处理还慢。安全
Parallel类就只有有三个方法:Parallel.For()
、Parallel.ForEach()
和Parallel.Invoke()
多线程
可是呢,这每一个方法都有大量的重载(F12-->自行查看Parallel定义)并发
使用Parallel.For()
能够对数组中的每个元素进行并行操做异步
正常的遍历数组是按照索引的顺序执行的,可是并行操做,对数组的每个元素的操做不必定按照索引顺序操做
Parallel.For(),第一个参数是循环开始的索引(包含),第二个参数是循环结束的索引(不含)
Parallel.For()的第三个参数是一个有参数无返回值的委托,其参数是数组的索引
其实就至关于:for (int i = 0; i < length; i++)
的异步版本,只是在这里是并行操做,因此并不按照数组中元素的顺序执行,具体的执行顺序是不可控的。
示例
static void Main(string[] args) { int[] intArray = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; Console.WriteLine("------------常规,对数组进行循环遍历------------"); Array.ForEach(intArray, n => Console.WriteLine($"当前操做的数组元素是{n}"));//注意这里的参数n是元素而不是索引 Console.WriteLine("------------并行操做 对数组进行循环遍历------------"); Parallel.For(0, intArray.Length, (i) => Console.WriteLine($"当前循环次数{i},当前操做的数组元素是{intArray[i]}")); Console.ReadKey(); }
运行结果:能够看出,对数组的元素的操做顺序并非按照索引的顺序,而是不肯定的。
Parallel.ForEach()
用于对泛型可枚举对象的元素进行并行操做
其实就至关于:foreach (var item in collection)
的异步版本
Parallel.ForEach()有大量的重载,这里展现一个简单的操做
Parallel.ForEach()的第一个参数是待操做的可枚举对象,第二个参数是一个有参数无返回值的委托,该委托参数是集合的元素(而不是索引)
示例
List<int> intList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; Parallel.ForEach(intList, n => Console.WriteLine(n+100)); Console.ReadKey();
Parallel.Invoke()
对指定一系列操做并行运算
参数是一个Action委托数组(注意只能是Action[],即只能是无返回值的委托数组)
Parallel.Invoke()最多见用于并发请求接口
示例:
static void Main(string[] args) { Action action1=() => { for (int i = 0; i < 5; i++) { Console.WriteLine($"action-1-操做"); } }; Action action2 = () => { for (int i = 0; i < 5; i++) { Console.WriteLine($"action-2-操做"); } }; //Parallel.Invoke(action1, action2); Action[] actions = { action1, action2 }; Parallel.Invoke(actions); Console.ReadKey(); }
运行结果:
详细能够参考微软的在线文档
多线程对同一个数据集合同时读写操做,可能会形成数据的混乱
.NET4 引入了System.Collections.Concurrent
命名空间,其中包含多个线程安全的数据集合类型。
如今的新项目中,只要是对数据集合进行多线程的增删操做,就应该使用并发集合类。
可是,若是仅从集合进行多线程的读取,则可以使用通常的数据集合,即 System.Collections.Generic 命名空间中的类。
.net 中线程安全的数据集合有一下一些:
类型 | 描述 |
---|---|
BlockingCollection | 为实现 IProducerConsumerCollection 的全部类型提供限制和阻止功能。 有关详细信息,请参阅 BlockingCollection 概述。 |
ConcurrentDictionary | 键值对字典的线程安全实现。 |
ConcurrentQueue | FIFO(先进先出)队列的线程安全实现。 |
ConcurrentStack | LIFO(后进先出)堆栈的线程安全实现。 |
ConcurrentBag | 无序元素集合的线程安全实现。 |
IProducerConsumerCollection | 类型必须实现以在 BlockingCollection 中使用的接口。 |
一个简单的示例:给一个数据集合添加大批量的数据
List<int> list = new List<int>(); Parallel.For(0, 1000000, t => list.Add(t));
如果按照上面使用Parallel.For()
的并行方式给List添加数据,
则会报错:“索引超出了数组界限。”或“ 源数组长度不足。请检查 srcIndex 和长度以及数组的下限。”
即便没有报错,list中的数据也是有问题的(比可能数量不足)
固然能够经过加锁的方式进行弥补:
List<int> list = new List<int>(); object locker = new object(); Parallel.For(0, 1000000, t => { lock(locker) { list.Add(t); } });
这样经过对操做的线程枷锁,彻底是没有必要的,你可使用线程安全的集合类型,好比在这里使用ConcurrentBag
ConcurrentBag<int> cBag = new ConcurrentBag<int>(); Parallel.For(0, 100000, t => cBag.Add(t));
固然由于是并行操做,因此插入集合中的数据并非按照0-100000的顺序(仅仅是成段的有序)。
System.Threading.Tasks
命名空间中Task类,表示异步操做。
Task类能够轻松地在次线程中调用方法,能够做为异步委托的简单替代品。
同时在该命名空间还有一个泛型Task<TResul>
类,TResult 表示异步操做执行完成后返回值的类型。
建立一个Task操做,只须要使用静态函数Task.Run()
便可,
Task.Run()是一个.net framework4.5及以上定义的一个默认异步操做,
Task.Run()参数是委托,即须要异步执行的方法,
注意做为Task.Run()的参数的委托都是无参委托,
若Task.Run()参数是无返回值的委托Action
,则Task.Run()返回值是Task
类型
若Task.Run()参数是有返回值的委托Func<TResult>
,则Task.Run()返回值是Task<TResult>
泛型
注意:如果低于.net4.5,则可使用Task.Factory.StartNew()
,和Task.Run()静态方法做用同样
总而言之,言而总之,show you code ,一切皆明了!
示例:无返回值的Task
static void Main(string[] args) { //1.使用Task构造函数建立,必须显式的使用.Start()才能开始执行 //Task task = new Task(() => { Thread.Sleep(10); Console.WriteLine("我是Task ,我结束了"); }); //task.Start(); //2.使用TaskFactory.StartNew(工厂建立) 方法 //Task task = Task.Factory.StartNew(() => { Thread.Sleep(10); Console.WriteLine("我是Task ,我结束了"); }); //3.使用Task.Run() Task task = Task.Run(() => { Thread.Sleep(10); Console.WriteLine("我是Task.Run ,我结束了"); }); if (!task.IsCompleted)//task.IsCompleted判断当前的任务是否已完成 { Console.WriteLine("当前的Task.Run()还没有执行完,可是由于异步,返回到调用函数,因此能够先执行后续的代码"); } Console.WriteLine("当前Task.Run尚未完成,咱们是在他以后的代码可是先执行了"); task.Wait();//强行锁定线程,等待task完成 Console.WriteLine("终于Task.Run完成了工做"); Console.ReadKey(); }
如果Task任务有返回值,返回值类型为Task<T>
,使用返回值的Result
属性查询具体值
调试时注意查看,运行到 Console.WriteLine(task.Result)
的时候,其中Task任务仍是在执行Thread.Sleep(1000)
尚未出结果,咱们但愿的异步执行也没有发生,而是程序是在一直在等待,这是为何呢?
是由于一但执行了task.Result,即便task任务尚未完成,主线程则停下等待,直到等待task.Result出结果
这种状况和异步委托中调用EndInvoke()是同样的:一旦运行EndInvoke,如果引用方法尚未完成,主线程则中止,直到引用函数运行结束。
因此能够这样理解:task.Result能够看做是一个将来结果(必定有结果但还在运算中)
示例:有返回值的Task
static void Main(string[] args) { Console.WriteLine("SomeDoBeforeTask"); Func<int> Do = () => { Thread.Sleep(1000); Console.WriteLine("Task.Run结束"); return 2; }; Task<int> task = Task.Run(Do); Console.WriteLine(task.Status);//使用task.Status查看当前的Task的状态:当前的状态:WaitingToRun Console.WriteLine(task.Result);//使用task.result操做Task任务的返回值:返回值是:2 Console.WriteLine(task.Status);//使用task.Status查看当前的Task的状态:当前的状态:RanToComplation Console.WriteLine("SomeDoAfterTask"); Console.ReadKey(); }
运行结果:
说明:
其中咱们使用task.Result查看当前的task的状态,其中Task的状态(即其生命周期)只有三种:
Task任务是在后台执行的同时,主线程的继续执行后续程序
因此有时候须要在Task结束后,继续执行某个特定的任务,即为Task添加延续任务(也称接续工做)
举一个简单的例子,
求解1-5000能求被3整除的个数,这个过程须要许多时间,我把它定义为一个Task.Run()
咱们须要在求出结果后打印出结果,这里怎么操做呢?
如果直接使用task.Result
则会阻塞主线程,一直等待运算出结果,这显然不是咱们想要的
如果使用while(!task.IsComplation){//后续操做}
,你没法判断Task什么时候结束,并且一旦Task结束则会中断后续操做
这里就是须要为Task加上接续工做
这里你能够明白,接续本质和异步委托中的回调模式是同样的,回调方法就是接续工做
task1.ContinueWith(...task2..)
表示当task1结束后接着运行task2任务
注意这里咱们使用Lambda表达式编写接续工做,接续工做是有一个参数的,参数是Task类型,即上一个Task
即第一个Task完成后自动启动下一个Task,实现Task的延续
注意:ContinueWith()的返回值亦是Task类型对象,即新建立的任务
能够为接续工做task2继续添加接续工做task3
示例5 :
static void Main(string[] args) { Console.WriteLine("task执行前..."); Task<int> task1 = Task.Run(() => Enumerable.Range(1, 5000).Count(n => (n % 3) == 0)); Task task2 = task1.ContinueWith(t => Console.WriteLine($"当你看到这句话则task1结束了,1-5000中能被3整除的个数{t.Result}"));//这里的t就是task1 Task task3 = task2.ContinueWith(t => Console.WriteLine($"当你看到这句话则task2也结束了")); Console.WriteLine($"task1及其接续工做正在执行中," + "\t\n" + "咱们如今正在执行其余的后续代码"); Console.ReadKey(); }
运行结果:
使用task.GetAwaiter()
为相关的task建立一个等待者
示例:
static void Main(string[] args) { Console.WriteLine("task执行前..."); Task<int> task1 = Task.Run(() => Enumerable.Range(1, 5000).Count(n => (n % 3) == 0)); var awaiter = task1.GetAwaiter();//建立一个awaiter对象 //awaiter.OnCompleted(() => Console.WriteLine($"当你看到这句话则task1结束了,1-5000中能被3整除的个{task1.Result}")); awaiter.OnCompleted(() => Console.WriteLine($"当你看到这句话则task1结束了,1-5000中能被3整除的个{awaiter.GetResult()}")); Console.WriteLine($"task1及其接续工做正在执行中," + "\t\n" + "咱们如今正在执行其余的后续代码"); Console.ReadKey(); }
运行效果同上。
ContinueWith会返回Task对象,它很是适合用于增长更多的接续工做,不过,若是Task出错,必须直接处理AggregateException。
使用task.GetAwaiter建立awaiter对象,是在.net4.5以后,其中C#5.0的异步功能就是使用这种方式。
使用awaiter也是可使用task.Result直接的查看任务的结果,可是使用awaiter.GetResult()能够在Task出现异常的时候直接抛出,不会封装在AggregateException中。
延时执行Task
其实就至关于实现Thread.Sleep()的异步版本
如果你使用Thread.Sleep(),则会程序一直在等待(即阻塞线程),直到等待结束才会运行后续的代码
而这里就至关于给给Thread.Sleep()一个加了接续工做,且这个接续工做是异步的。
即便用Task.Delay()不会阻塞主线程,主线程能够继续执行后续代码
示例:
//新建异步任务,30毫秒秒后执行 Task.Delay(30).ContinueWith(c => { for (int i = 0; i < 50; i++) { Console.WriteLine(i + "这是Task在运行"); } }); for (int i = 0; i < 100; i++) { Console.WriteLine(i + "这是Task以后的程序在运行"); }
调试的时候你会发现,刚开始的时候的时候是先显示的"i这是Task以后的程序在运行"
以后在等带了30毫秒,后就会开始显示"i这是Task在运行"和"i这是Task以后的程序在运行"交叉显示
运行结果以下:
示例:运行效果同上
Task.Delay(30).GetAwaiter().OnCompleted(() => { for (int i = 0; i < 50; i++) { Console.WriteLine(i + "这是Awaiter在运行行"); } }); for (int i = 0; i < 100; i++) { Console.WriteLine(i + "这是Awaiter以后的程序在运行行"); } Console.ReadKey();
方法名 | 说明 |
---|---|
Task.Wait | task1.Wait();就是等待任务执行(task1)完成,task1的状态变为Completed |
Task.WaitAll | 待全部的任务都执行完成 |
Task.WaitAny | 发同Task.WaitAll,就是等待任何一个任务完成就继续向下执行 |
CancellationTokenSource | 经过cancellation的tokens来取消一个Task |
异步方法是能够请求终止运行的,
System.Threading.Tasks命名空间中有两个类是为此目的而设计的:Cance1lationToken和CancellationTokenSource。
下面看使用CancellationTokenSource和CancellationToken来实现取消某个异步操做。
这里使用Task.Run()为例,其第一个参数是一个Action委托,第二个参数就是CancellationToken对象
static void Main(string[] args) { CancellationTokenSource cts = new CancellationTokenSource();//生成一个CancellationTokenSource对象,该对象能够建立CancellationToken CancellationToken ct = cts.Token;//获取一个令牌(token) Task.Run(() => { for (int i = 0; i < 20; i++) { if (ct.IsCancellationRequested) { return; } Thread.Sleep(1000); Console.WriteLine($"异步程序的的循环:{i}"); } }, ct);//注意Run()的第二个参数就是终止令牌token for (int i = 0; i < 4; i++) { Thread.Sleep(1000); Console.WriteLine($"主线程中循环:{i}"); } Console.WriteLine("立刻sts.Cancel(),即将要终止异步程序"); cts.Cancel();//含有该CancellationTokenSource的token的异步程序,终止! Console.ReadKey(); }
运行结果:能够发现异步任务Task.Run()尚未完成,可是由于cst.Cancel()运行,token的属性IsCancellationRequested变为true,异步循环结束。
说明:取消一个异步操做的过程,注意,该过程是协同的。
即:调用CancellationTokenSource的Cancel时,它自己并不会执行取消操做。
而是会将CancellationToken的IsCancellationRequested属性设置为true。
包含CancellationToken的代码负责检查该属性,并判断是否须要中止执行并返回。
System.Linq名称空间中有一个ParallelEnumerable
类,该类中的方法能够分解Linq查询的工做,使其分布在多个线程上,即实现并行查询。
为并行运行而设计的LINQ查询称为PLINQ查询。
下面让咱们先简单的理一理:
首先咱们都知道Enumerable
类为IEnumberable<T>
接口扩展了一系列的静态方法。(就是咱们使用Linq方法语法的中用的哪些经常使用的静态方法,自行F12)
正如MSDN中所说:“ParallelEnumberable是Enumberable的并行等效项”,ParallelEnumberable
类则是Enumerable
类的并行版本,
F12查看定义能够看到ParallelEnumerable
类中几乎全部的方法都是对ParallelQuery<TSource>
接口的扩展,
可是,在ParallelEnumberable
类有一个重要的例外,AsParallel()
方法还对IEnumerable<T>
接口的扩展,而且返回的是一个ParallelQuery<TSource>
类型的对象,
因此呢?凡是实现类IEnumberable<T>集合能够经过调用静态方法AsParallel()
,返回一个ParallelQuery
注意在运行PLinq的时候,PLinq会自动的判断若是查询能从并行化中受益,则将同时运行。而若是并行执行查询会损害性能,PLINQ将按顺序运行查询。
示例:求1到50000000中能够整除3的数,将所求的结果倒序存放在modThreeIsZero[]中
这是须要很是多的重复运算,因此咱们能够对比按照通常Linq查询下方式和PLinq查询,对比一些须要的时间。
static void Main(string[] args) { int[] intArray = Enumerable.Range(1, 50000000).ToArray(); Stopwatch sw = new Stopwatch(); //顺序查询 sw.Start(); int[] modThreeIsZero1 = intArray.Select(n => n).Where(n => n % 3 == 0).OrderByDescending(n => n).ToArray(); sw.Stop(); Console.WriteLine($"顺序查询,运行时间:{sw.ElapsedMilliseconds}毫秒,能够整除3的个数:{modThreeIsZero1.Count()}"); //使用AsParallel()实现并行查询 //AsParallel()方法返回ParallelQuery<TSourc>类型对象。由于返回的类型,因此编译器选择的Select()、Where()等方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。 sw.Restart(); int[] modThreeIsZero2 = intArray.AsParallel().Select(n => n).Where(n => n % 3 == 0).OrderByDescending(n => n).ToArray(); sw.Stop(); Console.WriteLine($"并行查询,运行时间:{sw.ElapsedMilliseconds}毫秒,能够整除3的个数:{modThreeIsZero2.Count()}"); Console.ReadKey(); }
说明:AsParallel()方法返回ParallelQuery<TSourc>类型对象。由于返回的类型,因此编译器选择的Select()、Where()等方法是ParallelEnumerable.Where(),而不是Enumerable.Where()。
运行结果:
能够对比结果,在大规模的Linq查询中,同步查询和并行查询二者的运行时间的差距仍是很大的!
可是小规模的Linq查询两者的效果其实并无很明显。
在3.6取消异步操做中解释了如何取消一个长时间的任务,
那么对于长时间运行的PLinq也是能够取消的
一样是使用CancellationTokenSource
生成一个CancellationToken
对象做为token
怎么把token给PLinq呢?使用ParallelQuery<TSource>
中静态方法WithCancellation(token)
在PLinq中,如果取消了并行操做,则会抛出OperationCanceledException
示例:
static void Main(string[] args) { //具体的做用和含义能够看0030取消一个异步操做 CancellationTokenSource cts = new CancellationTokenSource(); CancellationToken ct = cts.Token; int[] intArray = Enumerable.Range(1, 50000000).ToArray(); Task<int[]> task = Task.Run(() => { try { int[] modThreeIsZero = intArray.AsParallel().WithCancellation(ct).Select(n => n).Where(n=> n% 3 == 0).OrderByDescending(n => n).ToArray(); return modThreeIsZero; } catch (OperationCanceledException ex)//一旦PLinq中取消查询就会触发OperationCanceledException异常 { Console.WriteLine(ex.Message);//注意:Message的内容就是:已取消该操做 return null; } }); Console.WriteLine("取消PLinq?Y/N"); string input = Console.ReadLine(); if (input.ToLower().Equals("y")) { cts.Cancel();//取消并行查询 Console.WriteLine("取消了PLinq!");//undone:怎么验证已经真的取消了 } else { Console.WriteLine("Loading……"); Console.WriteLine(task.Result.Count()); } Console.ReadKey(); }
唉,书真是越看越厚,皆是浅尝辄止,先到这里吧!