原做者: Pamela Vagata, Parallel Computing Platform Group, Microsoft Corporationweb
原文pdf:http://download.csdn.NET/detail/sqlchen/7509513算法
====================================================================sql
当须要为多核机器进行优化的时候,最好先检查下你的程序是否有处理可以分割开来进行并行处理。(例如,有一个巨大的数据集合,其中的元素须要一个一个进行彼此独立的耗时计算)。编程
.net framework 4 中提供了 Parallel.ForEach 和 PLINQ 来帮助咱们进行并行处理,本文探讨这二者的差异及适用的场景。数组
Parallel.ForEach 是 foreach 的多线程实现,他们都能对 IEnumerable<T> 类型对象进行遍历,Parallel.ForEach 的特殊之处在于它使用多线程来执行循环体内的代码段。缓存
Parallel.ForEach 最经常使用的形式以下:服务器
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource> body)
PLINQ 也是一种对数据进行并行处理的编程模型,它经过 LINQ 的语法来实现相似 Parallel.ForEach 的多线程并行处理。
数据结构
示例代码:多线程
public static void IndependentAction(IEnumerable<T> source, Action<T> action) { Parallel.ForEach(source, element => action(element)); }
理由:oop
1. 虽然 PLINQ 也提供了一个相似的 ForAll 接口,但它对于简单的独立操做过重量化了。
2. 使用 Parallel.ForEach 你还可以设定 ParallelOptions.MaxDegreeOfParalelism 参数(指定最多须要多少个线程),这样当 ThreadPool 资源匮乏(甚至当可用线程数<MaxDegreeOfParalelism)的时候, Parallel.ForEach 依然可以顺利运行,而且当后续有更多可用线程出现时,Parallel.ForEach 也能及时地利用这些线程。PLINQ 只能经过WithDegreeOfParallelism 方法来要求固定的线程数,即:要求了几个就是几个,不会多也不会少。
当输出的数据序列须要保持原始的顺序时采用 PLINQ 的 AsOrdered 方法很是简单高效。
示例代码:
public static void GrayscaleTransformation(IEnumerable<Frame> Movie) { var ProcessedMovie = Movie .AsParallel() .AsOrdered() .Select(frame => ConvertToGrayscale(frame)); foreach (var grayscaleFrame in ProcessedMovie) { // Movie frames will be evaluated lazily } }
理由:
1. Parallel.ForEach 实现起来须要绕一些弯路,首先你须要使用如下的重载在方法:
public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState, Int64> body)
这个重载的 Action 多包含了 index 参数,这样你在输出的时候就能利用这个值来维持原先的序列顺序。请看下面的例子:
public static double [] PairwiseMultiply(double[] v1, double[] v2) { var length = Math.Min(v1.Length, v2.Lenth); double[] result = new double[length]; Parallel.ForEach(v1, (element, loopstate, elementIndex) => result[elementIndex] = element * v2[elementIndex]); return result; }
你可能已经意识到这里有个明显的问题:咱们使用了固定长度的数组。若是传入的是 IEnumerable 那么你有4个解决方案:
(1) 调用 IEnumerable.Count() 来获取数据长度,而后用这个值实例化一个固定长度的数组,而后使用上例的代码。
(2) The second option would be to materialize the original collection before using it; in the event that your input data set is prohibitively large, neither of the first two options will be feasible.(没看懂贴原文)
(3) 第三种方式是采用返回一个哈希集合的方式,这种方式下一般须要至少2倍于传入数据的内存,因此处理大数据时请慎用。
(4) 本身实现排序算法(保证传入数据与传出数据通过排序后次序一致)
2. 相比之下 PLINQ 的 AsOrdered 方法如此简单,并且该方法能处理流式的数据,从而容许传入数据是延迟实现的(lazy materialized)
PLINQ 能输出流数据,这个特性在一下场合很是有用:
1. 结果集不须要是一个完整的处理完毕的数组,即:任什么时候间点下内存中仅保持数组中的部分信息
2. 你可以在一个单线程上遍历输出结果(就好像他们已经存在/处理完了)
示例:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks) { var StockRiskPortfolio = Stocks .AsParallel() .AsOrdered() .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)}) .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk)); foreach (var stockRisk in StockRiskPortfolio) { SomeStockComputation(stockRisk.Risk); // StockRiskPortfolio will be a stream of results } }
这里使用一个单线程的 foreach 来对 PLINQ 的输出进行后续处理,一般状况下 foreach 不须要等待 PLINQ 处理完全部数据就能开始运做。
PLINQ 也容许指定输出缓存的方式,具体可参照 PLINQ 的 WithMergeOptions 方法,及 ParallelMergeOptions 枚举
PLINQ 的 Zip 方法提供了同时遍历两个集合并进行结合元算的方法,而且它能够与其余查询处理操做结合,实现很是复杂的机能。
示例:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) { return a .AsParallel() .AsOrdered() .Select(element => ExpensiveComputation(element)) .Zip( b .AsParallel() .AsOrdered() .Select(element => DifferentExpensiveComputation(element)), (a_element, b_element) => Combine(a_element,b_element)); }
示例中的两个数据源可以并行处理,当双方都有一个可用元素时提供给 Zip 进行后续处理(Combine)。
Parallel.ForEach 也能实现相似的 Zip 处理:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) { var numElements = Math.Min(a.Count(), b.Count()); var result = new T[numElements]; Parallel.ForEach(a, (element, loopstate, index) => { var a_element = ExpensiveComputation(element); var b_element = DifferentExpensiveComputation(b.ElementAt(index)); result[index] = Combine(a_element, b_element); }); return result; }
固然使用 Parallel.ForEach 后你就得本身确认是否要维持原始序列,而且要注意数组越界访问的问题。
Parallel.ForEach 提供了一个线程局部变量的重载,定义以下:
public static ParallelLoopResult ForEach<TSource, TLocal>( IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal,TLocal> body, Action<TLocal> localFinally)
使用的示例:
public static List<R> Filtering<T,R>(IEnumerable<T> source) { var results = new List<R>(); using (SemaphoreSlim sem = new SemaphoreSlim(1)) { Parallel.ForEach(source, () => new List<R>(), (element, loopstate, localStorage) => { bool filter = filterFunction(element); if (filter) localStorage.Add(element); return localStorage; }, (finalStorage) => { lock(myLock) { results.AddRange(finalStorage) }; }); } return results; }
线程局部变量有什么优点呢?请看下面的例子(一个网页抓取程序):
public static void UnsafeDownloadUrls () { WebClient webclient = new WebClient(); Parallel.ForEach(urls, (url,loopstate,index) => { webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
一般初版代码是这么写的,可是运行时会报错“System.NotSupportedException -> WebClient does not support concurrent I/O operations.”。这是由于多个线程没法同时访问同一个 WebClient 对象。因此咱们会把 WebClient 对象定义到线程中来:
public static void BAD_DownloadUrls () { Parallel.ForEach(urls, (url,loopstate,index) => { WebClient webclient = new WebClient(); webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
修改以后依然有问题,由于你的机器不是服务器,大量实例化的 WebClient 迅速达到你机器容许的虚拟链接上限数。线程局部变量能够解决这个问题:
public static void downloadUrlsSafe() { Parallel.ForEach(urls, () => new WebClient(), (url, loopstate, index, webclient) => { webclient.DownloadFile(url, filenames[index]+".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); return webclient; }, (webclient) => { }); }
这样的写法保证了咱们能得到足够的 WebClient 实例,同时这些 WebClient 实例彼此隔离仅仅属于各自关联的线程。
虽然 PLINQ 提供了 ThreadLocal<T> 对象来实现相似的功能:
public static void downloadUrl() { var webclient = new ThreadLocal<WebClient>(()=> new WebClient ()); var res = urls .AsParallel() .ForAll( url => { webclient.Value.DownloadFile(url, host[url] +".dat")); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
可是请注意:ThreadLocal<T> 相对而言开销更大!
Parallel.ForEach 有个重载声明以下,其中包含一个 ParallelLoopState 对象:
public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
ParallelLoopState.Stop() 提供了退出循环的方法,这种方式要比其余两种方法更快。这个方法通知循环不要再启动执行新的迭代,并尽量快的推出循环。
ParallelLoopState.IsStopped 属性可用来断定其余迭代是否调用了 Stop 方法。
示例:
public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var matchFound = false; Parallel.ForEach(TSpace, (curValue, loopstate) => { if (curValue.Equals(match) ) { matchFound = true; loopstate.Stop(); } }); return matchFound; }
ParallelLoopState.Break() 通知循环继续执行本元素前的迭代,但不执行本元素以后的迭代。最前调用 Break 的起做用,并被记录到 ParallelLoopState.LowestBreakIteration 属性中。这种处理方式一般被应用在一个有序的查找处理中,好比你有一个排序过的数组,你想在其中查找匹配元素的最小 index,那么可使用如下的代码:
public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var loopResult = Parallel.ForEach(source, (curValue, loopState, curIndex) => { if (curValue.Equals(match)) { loopState.Break(); } }); var matchedIndex = loopResult.LowestBreakIteration; return matchedIndex.HasValue ? matchedIndex : -1; }
虽然 PLINQ 也提供了退出的机制(cancellation token),但相对来讲退出的时机并无 Parallel.ForEach 那么及时。