菜鸟学习并行编程,参考《C#并行编程高级教程.PDF》,若有错误,欢迎指正。html
C#并行编程-Task负载均衡
任务简介学习
TPL引入新的基于任务的编程模型,经过这种编程模型能够发挥多核的功效,提高应用程序的性能,不须要编写底层复杂且重量级的线程代码。编码
但须要注意:任务并非线程(任务运行的时候须要使用线程,但并非说任务取代了线程,任务代码是使用底层的线程(软件线程,调度在特定的硬件线程或逻辑内核上)运行的,任务与线程之间并无一对一的关系。)spa
建立一个新的任务时,调度器(调度器依赖于底层的线程池引擎)会使用工做窃取队列找到一个最合适的线程,而后将任务加入队列,任务所包含的代码会在一个线程中运行。如图:
System.Threading.Tasks.Task
一个Task表示一个异步操做,Task提供了不少方法和属性,经过这些方法和属性可以对Task的执行进行控制,而且可以得到其状态信息。
Task的建立和执行都是独立的,所以能够对关联操做的执行拥有彻底的控制权。
使用Parallel.For、Parallel.ForEach的循环迭代的并行执行,TPL会在后台建立System.Threading.Tasks.Task的实例。
使用Parallel.Invoke时,TPL也会建立与调用的委托数目一致的System.Threading.Tasks.Task的实例。
注意项
程序中添加不少异步的操做做为Task实例加载的时候,为了充分利用运行时全部可用的逻辑内核,任务调度器会尝试的并行的运行这些任务,也会尝试在全部的可用内核上对工做进行负载均衡。
但在实际的编码过程中,并非全部的代码片断都可以方便的用任务来运行,由于任务会带来额外的开销,尽管这种开销比添加线程所带来的开销要小,可是仍然须要将这个开销考虑在内。
Task状态与生命周期
一个Task实例只会完成其生命周期一次,当Task到达它的3种肯呢过的最终状态之一是,就没法回到以前的任何状态
下面贴代码,详解见注释,方便你们理解Task的状态:
class Program { /* coder:释迦苦僧 */ static void Main(string[] args) { /* 建立一个任务 不调用 不执行 状态为Created */ Task tk = new Task(() => { }); Console.WriteLine(tk.Status.ToString()); /* 建立一个任务 执行 状态为 WaitingToRun */ Task tk1 = new Task(() => { }); tk1.Start();/*对于安排好的任务,就算调用Start方法也不会立马启动 此时任务的状态为WaitingToRun*/ Console.WriteLine(tk1.Status.ToString()); /* 建立一个主任务 */ Task mainTask = new Task(() => { SpinWait.SpinUntil(() => { return false; }, 30000); }); /* 将子任务加入到主任务完成以后执行 */ Task subTask = mainTask.ContinueWith((t1) => { }); /* 启动主任务 */ mainTask.Start(); /* 此时子任务状态为 WaitingForActivation */ Console.WriteLine(subTask.Status.ToString()); /* 建立一个任务 执行 后 等待一段时间 并行未结束的状况下 状态为 Running */ Task tk2 = new Task(() => { SpinWait.SpinUntil(() => false, 30000); }); tk2.Start();/*对于安排好的任务,就算调用Start方法也不会立马启动*/ SpinWait.SpinUntil(() => false, 300); Console.WriteLine(tk2.Status.ToString()); /* 建立一个任务 而后取消该任务 状态为Canceled */ CancellationTokenSource cts = new CancellationTokenSource(); Task tk3 = new Task(() => { for (int i = 0; i < int.MaxValue; i++) { if (!cts.Token.IsCancellationRequested) { cts.Token.ThrowIfCancellationRequested(); } } }, cts.Token); tk3.Start();/*启动任务*/ SpinWait.SpinUntil(() => false, 100); cts.Cancel();/*取消该任务执行 但并不是立马取消 因此对于Canceled状态也不会立马生效*/ SpinWait.SpinUntil(() => false, 1000); Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled); SpinWait.SpinUntil(() => false, 1000); Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled); SpinWait.SpinUntil(() => false, 1000); Console.WriteLine(tk3.Status.ToString() + " " + tk3.IsCanceled); /*建立一个任务 让它成功的运行完成 会获得 RanToCompletion 状态*/ Task tk4 = new Task(() => { SpinWait.SpinUntil(() => false, 10); }); tk4.Start(); SpinWait.SpinUntil(() => false, 300); Console.WriteLine(tk4.Status.ToString()); /*建立一个任务 让它运行失败 会获得 Faulted 状态*/ Task tk5 = new Task(() => { throw new Exception(); }); tk5.Start(); SpinWait.SpinUntil(() => false, 300); Console.WriteLine(tk5.Status.ToString()); Console.ReadLine(); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
使用任务来对代码进行并行化
使用Parallel.Invoke能够并行加载多个方法,使用Task实例也能完成一样的工做,下面贴代码:
class Program { private static ConcurrentQueue<Product> queue = null; /* coder:释迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);}); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); Console.ReadLine(); } static void SetProduct(int index) { Parallel.For(0, 10000, (i) => { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; queue.Enqueue(model); }); Console.WriteLine("SetProduct {0} 执行完成", index); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
等待任务完成Task.WaitAll
Task.WaitAll 方法,这个方法是同步执行的,在Task做为参数被接受,全部Task结束其执行前,主线程不会继续执行下一条指令,下面贴代码
class Program { private static ConcurrentQueue<Product> queue = null; /* coder:释迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); Task tk1 = new Task(() => { SetProduct(1); SetProduct(3); }); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); /*等待任务执行完成后再输出 ====== */ Task.WaitAll(tk1, tk2); Console.WriteLine("等待任务执行完成后再输出 ======"); Task tk3 = new Task(() => { SetProduct(1); SetProduct(3); }); Task tk4 = new Task(() => SetProduct(2)); tk3.Start(); tk4.Start(); /*等待任务执行前输出 ====== */ Console.WriteLine("等待任务执行前输出 ======"); Task.WaitAll(tk3, tk4); Console.ReadLine(); } static void SetProduct(int index) { Parallel.For(0, 10000, (i) => { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; queue.Enqueue(model); }); Console.WriteLine("SetProduct {0} 执行完成", index); } }
Task.WaitAll 限定等待时长
queue = new ConcurrentQueue<Product>(); Task tk1 = new Task(() => { SetProduct(1); SetProduct(3);}); Task tk2 = new Task(() => SetProduct(2)); tk1.Start(); tk2.Start(); /*若是tk1 tk2 没能在10毫秒内完成 则输出 ***** */ if (!Task.WaitAll(new Task[] { tk1, tk2 }, 10)) { Console.WriteLine("******"); } Console.ReadLine();
如图10毫秒没有完成任务,则输出了****
经过取消标记取消任务
经过取消标记来中断Task实例的执行。 CancellationTokenSource,CancellationToken下的IsCanceled属性标志当前是否已经被取消,取消任务,任务也不必定会立刻取消,下面贴代码:
class Program { private static ConcurrentQueue<Product> queue = null; /* coder:释迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); System.Threading.CancellationTokenSource token = new CancellationTokenSource(); Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token)); Task tk2 = Task.Factory.StartNew(() => SetProduct(token.Token)); Thread.Sleep(10); /*取消任务操做*/ token.Cancel(); try { /*等待完成*/ Task.WaitAll(new Task[] { tk1, tk2 }); } catch (AggregateException ex) { /*若是当前的任务正在被取消,那么还会抛出一个TaskCanceledException异常,这个异常包含在AggregateException异常中*/ Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled); Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled); } Thread.Sleep(2000); Console.WriteLine("tk1 Canceled:{0}", tk1.IsCanceled); Console.WriteLine("tk1 Canceled:{0}", tk2.IsCanceled); Console.ReadLine(); } static void SetProduct(System.Threading.CancellationToken ct) { /* 每一次循环迭代,都会有新的代码调用 ThrowIfCancellationRequested * 这行代码可以对 OpreationCanceledException 异常进行观察 * 而且这个异常的标记与Task实例关联的那个标记进行比较,若是二者相同 ,并且IsCancelled属性为True,那么Task实例就知道存在一个要求取消的请求,而且会将状态转变为Canceled状态,中断任务执行。 * 若是当前的任务正在被取消,那么还会抛出一个TaskCanceledException异常,这个异常包含在AggregateException异常中 /*检查取消标记*/ ct.ThrowIfCancellationRequested(); for (int i = 0; i < 50000; i++) { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; queue.Enqueue(model); ct.ThrowIfCancellationRequested(); } Console.WriteLine("SetProduct 执行完成"); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
Task异常处理 当不少任务并行运行的时候,可能会并行发生不少异常。Task实例可以处理一组一组的异常,这些异常有System.AggregateException类处理
class Program { private static ConcurrentQueue<Product> queue = null; /* coder:释迦苦僧 */ static void Main(string[] args) { queue = new ConcurrentQueue<Product>(); System.Threading.CancellationTokenSource token = new CancellationTokenSource(); Task tk1 = Task.Factory.StartNew(() => SetProduct(token.Token)); Thread.Sleep(2000); if (tk1.IsFaulted) { /* 循环输出异常 */ foreach (Exception ex in tk1.Exception.InnerExceptions) { Console.WriteLine("tk1 Exception:{0}", ex.Message); } } Console.ReadLine(); } static void SetProduct(System.Threading.CancellationToken ct) { for (int i = 0; i < 5; i++) { throw new Exception(string.Format("Exception Index {0}", i)); } Console.WriteLine("SetProduct 执行完成"); } }
Task返回值 Task<TResult>
class Program { /* coder:释迦苦僧 */ static void Main(string[] args) { Task<List<Product>> tk1 = Task<List<Product>>.Factory.StartNew(() => SetProduct()); Task.WaitAll(tk1); Console.WriteLine(tk1.Result.Count); Console.WriteLine(tk1.Result[0].Name); Console.ReadLine(); } static List<Product> SetProduct() { List<Product> result = new List<Product>(); for (int i = 0; i < 500; i++) { Product model = new Product(); model.Name = "Name" + i; model.SellPrice = i; model.Category = "Category" + i; result.Add(model); } Console.WriteLine("SetProduct 执行完成"); return result; } }
经过延续串联多个任务
ContinueWith:建立一个目标Task完成时,异步执行的延续程序,await,如代码所示:
class Program { /* coder:释迦苦僧 */ static void Main(string[] args) { /*建立任务t1*/ Task t1 = Task.Factory.StartNew(() => { Console.WriteLine("执行 t1 任务"); SpinWait.SpinUntil(() => { return false; }, 2000); }); /*建立任务t2 t2任务的执行 依赖与t1任务的执行完成*/ Task t2 = t1.ContinueWith((t) => { Console.WriteLine("执行 t2 任务"); SpinWait.SpinUntil(() => { return false; }, 2000); }); /*建立任务t3 t3任务的执行 依赖与t2任务的执行完成*/ Task t3 = t2.ContinueWith((t) => { Console.WriteLine("执行 t3 任务"); }); Console.ReadLine(); } }
TaskContinuationOptions
TaskContinuationOptions参数,能够控制延续另外一个任的任务调度和执行的可选行为。下面看代码:
class Program { /* coder:释迦苦僧 */ static void Main(string[] args) { /*建立任务t1*/ Task t1 = Task.Factory.StartNew(() => { Console.WriteLine("执行 t1 任务"); SpinWait.SpinUntil(() => { return false; }, 2000); throw new Exception("异常"); }); /*建立任务t2 t2任务的执行 依赖与t1任务的执行完成*/ Task t2 = t1.ContinueWith((t) => { Console.WriteLine(t.Status); Console.WriteLine("执行 t2 任务"); SpinWait.SpinUntil(() => { return false; }, 2000); /*定义 TaskContinuationOptions 行为为 NotOnFaulted 在 t1 任务抛出异常后,t1 的任务状态为 Faulted , 则t2 不会执行里面的方法 可是须要注意的是t3任务*/ /*t2在不符合条件时 返回Canceled状态状态让t3任务执行*/ }, TaskContinuationOptions.NotOnFaulted); /*建立任务t3 t3任务的执行 依赖与t2任务的执行完成*/ /*t2在不符合条件时 返回Canceled状态状态让t3任务执行*/ Task t3 = t2.ContinueWith((t) => { Console.WriteLine(t.Status); Console.WriteLine("执行 t3 任务"); }); Console.ReadLine(); } }
TaskContinuationOptions 属性有不少,以下所示
关于并行编程中的Task就写到这,若有问题,请指正。
做者:释迦苦僧 出处:http://www.cnblogs.com/woxpp/p/3928788.html本文版权归做者和博客园共有,欢迎转载,但未经做者赞成必须保留此段声明,且在文章页面明显位置给出原文链接。