8天玩转并行开发——第一天 Parallel的使用

      随着多核时代的到来,并行开发愈来愈展现出它的强大威力,像咱们这样的码农不再用过多的关注底层线程的实现和手工控制,安全

要了解并行开发,须要先了解下两个概念:“硬件线程”和“软件线程”。服务器

 

1. 硬件线程oop

    相信你们手头的电脑都是双核以上的,像我这样古董的电脑都是双核的,这样的双核叫作物理内核。性能

 

硬件线程又叫作逻辑内核,咱们能够在”任务管理器“中查看”性能“标签页,以下图,咱们知道有2个硬件线程。spa

 

 

通常状况下,一个物理内核对应一个逻辑内核,好比我这里的2对2。固然若是你的cpu采用的是超线程技术,那么可能就会有4个物理内核对应.net

8个硬件线程,如今有不少服务器都有8个硬件线程,上午在公司的服务器上截了个图。pwa

咱们要知道并行开发要作的事情就是将任务分摊给这些硬件线程去并行执行来达到负载和加速。线程

 

2. 软件线程3d

    相信这个你们最熟悉了,咱们知道传统的代码都是串行的,就一个主线程,当咱们为了实现加速而开了不少工做线程,这些工做线程code

也就是软件线程。

 

好,咱们知道了基本概念就ok了,在.net 4.0中,微软给咱们提供了一个新的命名空间:System.Threading.Tasks。这里面有不少好玩

的东西,做为第一篇就介绍下最基础,最简单的Parallel的使用。

 

 

一: Parallel的使用

在Parallel下面有三个经常使用的方法invoke,for和forEach。

1:  Parallel.Invoke

    这是最简单,最简洁的将串行的代码并行化。

复制代码
 1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var watch = Stopwatch.StartNew();
6
7 watch.Start();
8
9 Run1();
10
11 Run2();
12
13 Console.WriteLine("我是串行开发,总共耗时:{0}\n", watch.ElapsedMilliseconds);
14
15 watch.Restart();
16
17 Parallel.Invoke(Run1, Run2);
18
19 watch.Stop();
20
21 Console.WriteLine("我是并行开发,总共耗时:{0}", watch.ElapsedMilliseconds);
22
23 Console.Read();
24 }
25
26 static void Run1()
27 {
28 Console.WriteLine("我是任务一,我跑了3s");
29 Thread.Sleep(3000);
30 }
31
32 static void Run2()
33 {
34 Console.WriteLine("我是任务二,我跑了5s");
35 Thread.Sleep(5000);
36 }
37 }
复制代码

在这个例子中能够获取二点信息:

第一:一个任务是能够分解成多个任务,采用分而治之的思想。

第二:尽量的避免子任务之间的依赖性,由于子任务是并行执行,因此就没有谁必定在前,谁必定在后的规定了。

 

2:Parallel.for

 咱们知道串行代码中也有一个for,可是那个for并无用到多核,而Paraller.for它会在底层根据硬件线程的运行情况来充分的使用全部的可

利用的硬件线程,注意这里的Parallel.for的步行是1。

这里咱们来演示一下,向一个线程安全的集合插入数据,固然这个集合采用原子性来实现线程同步,比那些重量级的锁机制更加的节省消耗。

复制代码
 1  class Program
2 {
3 static void Main(string[] args)
4 {
5 for (int j = 1; j < 4; j++)
6 {
7 Console.WriteLine("\n第{0}次比较", j);
8
9 ConcurrentBag<int> bag = new ConcurrentBag<int>();
10
11 var watch = Stopwatch.StartNew();
12
13 watch.Start();
14
15 for (int i = 0; i < 20000000; i++)
16 {
17 bag.Add(i);
18 }
19
20 Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
21
22 GC.Collect();
23
24 bag = new ConcurrentBag<int>();
25
26 watch = Stopwatch.StartNew();
27
28 watch.Start();
29
30 Parallel.For(0, 20000000, i =>
31 {
32 bag.Add(i);
33 });
34
35 Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);
36
37 GC.Collect();
38
39 }
40 }
41 }
复制代码

 

能够看的出,加速的效果仍是比较明显的。

 

3:Parallel.forEach
    forEach的独到之处就是能够将数据进行分区,每个小区内实现串行计算,分区采用Partitioner.Create实现。

复制代码
 class Program
{
static void Main(string[] args)
{
for (int j = 1; j < 4; j++)
{
Console.WriteLine("\n第{0}次比较", j);

ConcurrentBag<int> bag = new ConcurrentBag<int>();

var watch = Stopwatch.StartNew();

watch.Start();

for (int i = 0; i < 3000000; i++)
{
bag.Add(i);
}

Console.WriteLine("串行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);

GC.Collect();

bag = new ConcurrentBag<int>();

watch = Stopwatch.StartNew();

watch.Start();

Parallel.ForEach(Partitioner.Create(0, 3000000), i =>
{
for (int m = i.Item1; m < i.Item2; m++)
{
bag.Add(m);
}
});

Console.WriteLine("并行计算:集合有:{0},总共耗时:{1}", bag.Count, watch.ElapsedMilliseconds);

GC.Collect();

}
}
}
复制代码

这里仍是要说一下:Partitioner.Create(0, 3000000)。

第一:咱们要分区的范围是0-3000000。

第二:咱们确定想知道系统给咱们分了几个区? 很遗憾,这是系统内部协调的,无权告诉咱们,固然系统也不反对咱们本身指定分区个数,

        这里可使用Partitioner.Create的第六个重载,好比这样:Partitioner.Create(0, 3000000, Environment.ProcessorCount),

        由于 Environment.ProcessorCount可以获取到当前的硬件线程数,因此这里也就开了2个区。

 

下面分享下并行计算中咱们可能有的疑惑?

<1> 如何中途退出并行循环?

      是的,在串行代码中咱们break一下就搞定了,可是并行就不是这么简单了,不过不要紧,在并行循环的委托参数中提供了一个

ParallelLoopState,该实例提供了Break和Stop方法来帮咱们实现。

Break: 固然这个是通知并行计算尽快的退出循环,好比并行计算正在迭代100,那么break后程序还会迭代全部小于100的。

Stop:这个就不同了,好比正在迭代100忽然遇到stop,那它啥也无论了,直接退出。

 

下面举个例子,当迭代到1000的时候退出循环

复制代码
 1   class Program
2 {
3 static void Main(string[] args)
4 {
5 var watch = Stopwatch.StartNew();
6
7 watch.Start();
8
9 ConcurrentBag<int> bag = new ConcurrentBag<int>();
10
11 Parallel.For(0, 20000000, (i, state) =>
12 {
13 if (bag.Count == 1000)
14 {
15 state.Break();
16 return;
17 }
18 bag.Add(i);
19 });
20
21 Console.WriteLine("当前集合有{0}个元素。", bag.Count);
22
23 }
24 }
复制代码

 

<2> 并行计算中抛出异常怎么处理?

 首先任务是并行计算的,处理过程当中可能会产生n多的异常,那么如何来获取到这些异常呢?普通的Exception并不能获取到异常,然而为并行诞生的AggregateExcepation就能够获取到一组异常。

复制代码
class Program
{
static void Main(string[] args)
{
try
{
Parallel.Invoke(Run1, Run2);
}
catch (AggregateException ex)
{
foreach (var single in ex.InnerExceptions)
{
Console.WriteLine(single.Message);
}
}

Console.Read();
}

static void Run1()
{
Thread.Sleep(3000);
throw new Exception("我是任务1抛出的异常");
}

static void Run2()
{
Thread.Sleep(5000);

throw new Exception("我是任务2抛出的异常");
}
}
复制代码

 

<3> 并行计算中我能够留一个硬件线程出来吗?

  默认的状况下,底层机制会尽量多的使用硬件线程,然而咱们使用手动指定的好处是咱们能够在2,4,8个硬件线程的状况下来进行测量加速比。

复制代码
 class Program
{
static void Main(string[] args)
{
var bag = new ConcurrentBag<int>();

ParallelOptions options = new ParallelOptions();

//指定使用的硬件线程数为1
options.MaxDegreeOfParallelism = 1;

Parallel.For(0, 300000, options, i =>
{
bag.Add(i);
});

Console.WriteLine("并行计算:集合有:{0}", bag.Count);

}
}
复制代码

 

————————————————————————————————————————————————————————————

————————————————————————————————————————————————————————————

友情提示:若是不喜欢看文章,能够移步本系列的  C#IL解读完整视频 【一把伞的钱哦

相关文章
相关标签/搜索