在并行计算中,不可避免的会碰到多个任务共享变量,实例,集合。虽然task自带了两个方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()来实现任务串行化,可是这些简单的方法远远不能知足咱们实际的开发须要,从.net 4.0开始,类库给咱们提供了不少的类来帮助咱们简化并行计算中复杂的数据同步问题。html
对数据共享问题处理的方式是“分离执行”,咱们经过把每一个Task执行完成后的各自计算的值进行最后的汇总,也就是说多个Task之间不存在数据共享了,各自作各自的事,彻底分离开来。spring
每一个Task执行时不存在数据共享了,每一个Task中计算本身值,最后咱们汇总每一个Task的Result。咱们能够经过Task中传递的state参数来进行隔离执行:数据库
int Sum = 0; Task<int>[] tasks = new Task<int>[10]; for (int i = 0; i < 10; i++) { tasks[i] = new Task<int>((obj) => { var start = (int)obj; for (int j = 0; j < 1000; j++) { start = start + 1; } return start; }, Sum); tasks[i].Start(); } Task.WaitAll(tasks); for (var i = 0; i < 10; i++) { Sum += tasks[i].Result; } Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
在.Net中提供了System.Threading.ThreadLocal来建立分离。编程
ThreadLocal是一种提供线程本地存储的类型,它能够给每一个线程一个分离的实例,来提供每一个线程单独的数据结果。上面的程序咱们可使用TreadLocal:安全
int Sum = 0; Task<int>[] tasks = new Task<int>[10]; var tl = new ThreadLocal<int>(); for (int i = 0; i < 10; i++) { tasks[i] = new Task<int>((obj) => { tl.Value = (int)obj; for (int j = 0; j < 1000; j++) { tl.Value++; } returntl.Value; }, Sum); tasks[i].Start(); } Task.WaitAll(tasks); for (var i = 0; i < 10; i++) { Sum += tasks[i].Result; } Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
可是咱们要注意的一点TreadLocal是针对每一个线程的,不是针对每一个Task的。一个Tread中可能有多个Task。并发
ThreadLocal类举例:ui
static ThreadLocal<string> local; static void Main() { //建立ThreadLocal并提供默认值 local = new ThreadLocal<string>(() => "hehe"); //修改TLS的线程 Thread th = new Thread(() => { local.Value = "Mgen"; Display(); }); th.Start(); th.Join(); Display(); } //显示TLS中数据值 static void Display() { Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value); }
同步类型是一种用来调度Task访问临界区域的一种特殊类型。在.Net 4.0中提供了多种同步类型给咱们使用,主要分为:轻量级的、重量级的和等待处理型的,在下面咱们会介绍经常使用的同步处理类型。spa
首先来看看.Net 4.0中常见的几种同步类型以及处理的相关问题:操作系统
同步类型以及解决问题.net
其实最简单同步类型的使用办法就是使用lock关键字。在使用lock关键字时,首先咱们须要建立一个锁定的object,并且这个object须要全部的task都能访问,其次能咱们须要将咱们的临界区域包含在lock块中。咱们以前例子中代码能够这样加上lock:
int Sum = 0; Task[] tasks = new Task[10]; var obj = new Object(); for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { lock (obj) { Sum = Sum + 1; } } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
其实lock关键字是使用Monitor的一种简短的方式,lock关键字自动经过调用Monitor.Enter\Monitor.Exit方法来处理得到锁以及释放锁。
Interlocked经过使用操做系统或则硬件的一些特性提供了一些列高效的静态的同步方法。其中主要提供了这些方法:Exchange、Add、Increment、CompareExchange四种类型的多个方法的重载。咱们将上面的例子中使用Interlocked:
int Sum = 0; Task[] tasks = new Task[10]; for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { Interlocked.Increment(ref Sum); } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
Mutex也是一个同步类型,在多个线程进行访问的时候,它只向一个线程受权共享数据的独立访问。咱们能够经过Mutex中的WaitOne方法来获取Mutex的全部权,可是同时咱们要注意的是,咱们在一个线程中多少次调用过WaitOne方法,就须要调用多少次ReleaseMutex方法来释放Mutex的占有。上面的例子咱们经过Mutex这样实现:
int Sum = 0; Task[] tasks = new Task[10]; var mutex = new Mutex(); for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { bool lockAcquired = mutex.WaitOne(); try { Sum++; } finally { if (lockAcquired) mutex.ReleaseMutex(); } } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
咱们能够经过使用Synchronization 特性来标识一个类,从而使一个类型的字段以及方法都实现同步化。在使用Synchronization 时,咱们须要将咱们的目标同步的类继承于System.ContextBoundObject类型。咱们来看看以前的例子咱们同步标识Synchronization 的实现:
static void Main(string[] args) { var sum = new SumClass(); Task[] tasks = new Task[10]; for (int i = 0; i < 10; i++) { tasks[i] = new Task(() => { for (int j = 0; j < 1000; j++) { sum.Increment(); } }); tasks[i].Start(); } Task.WaitAll(tasks); Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, sum.GetSum()); } [Synchronization] class SumClass : ContextBoundObject { private int _Sum; public void Increment() { _Sum++; } public int GetSum() { return _Sum; } }
当多个线程对某个非线程安全容器并发地进行读写操做时,这些操做将致使不可预估的后果或者会致使报错。为了解决这个问题咱们可使用lock关键字或者Monitor类来给容器上锁。但锁的引入使得咱们的代码更加复杂,同时也带来了更多的同步消耗。而.NET Framework 4提供的线程安全且可拓展的并发集合可以使得咱们的并行代码更加容易编写,此外,锁的使用次数的减小也减小了麻烦的死锁与竞争条件的问题。.NET Framework 4主要提供了以下几种并发集合:BlockingCollection,ConcurrentBag,ConcurrentDictionary,ConcurrentQueue,ConcurrentStack。这些集合经过使用一种叫作比较并交换(compare and swap, CAS)指令和内存屏障的技术来避免使用重量级的锁。
在.Net 4.0中提供了不少并发的集合类型来让咱们处理数据同步的集合的问题,这里面包括:
1.ConcurrentQueue:提供并发安全的队列集合,以先进先出的方式进行操做;
2.ConcurrentStack:提供并发安全的堆栈集合,以先进后出的方式进行操做;
3.ConcurrentBag:提供并发安全的一种无序集合;
4.ConcurrentDictionary:提供并发安全的一种key-value类型的集合。
咱们在这里只作ConcurrentQueue的一个尝试,并发队列是一种线程安全的队列集合,咱们能够经过Enqueue()进行排队、TryDequeue()进行出队列操做:
for (var j = 0; j < 10; j++) { var queue = new ConcurrentQueue<int>(); var count = 0; for (var i = 0; i < 1000; i++) { queue.Enqueue(i); } var tasks = new Task[10]; for (var i = 0; i < tasks.Length; i++) { tasks[i] = new Task(() => { while (queue.Count > 0) { int item; var isDequeue = queue.TryDequeue(out item); if (isDequeue) Interlocked.Increment(ref count); } }); tasks[i].Start(); } try { Task.WaitAll(tasks); } catch (AggregateException e) { e.Handle((ex) => { Console.WriteLine("Exception Message:{0}", ex.Message); return true; }); } Console.WriteLine("Dequeue items count :{0}", count); }
barrier叫作屏障,就像下图中的“红色线”,若是咱们的屏障设为4个task就认为已经满了的话,那么执行中先到的task必须等待后到的task,通知方式也就是barrier.SignalAndWait(),屏障中线程设置操做为new Barrier(4,(i)=>{})。SignalAndWait给咱们提供了超时的重载,为了可以取消后续执行
//四个task执行 static Task[] tasks = new Task[4]; static Barrier barrier = null; static void Main(string[] args) { barrier = new Barrier(tasks.Length, (i) => { Console.WriteLine("**********************************************************"); Console.WriteLine("\n屏障中当前阶段编号:{0}\n", i.CurrentPhaseNumber); Console.WriteLine("**********************************************************"); }); for (int j = 0; j < tasks.Length; j++) { tasks[j] = Task.Factory.StartNew((obj) => { var single = Convert.ToInt32(obj); LoadUser(single); barrier.SignalAndWait(); LoadProduct(single); barrier.SignalAndWait(); LoadOrder(single); barrier.SignalAndWait(); }, j); } Task.WaitAll(tasks); Console.WriteLine("指定数据库中全部数据已经加载完毕!"); Console.Read(); } static void LoadUser(int num) { Console.WriteLine("当前任务:{0}正在加载User部分数据!", num); } static void LoadProduct(int num) { Console.WriteLine("当前任务:{0}正在加载Product部分数据!", num); } static void LoadOrder(int num) { Console.WriteLine("当前任务:{0}正在加载Order部分数据!", num); }