并发集合数组
1 为何使用并发集合?安全
缘由主要有如下几点:并发
- System.Collections和System.Collections.Generic名称空间中所提供的经典列表、集合和数组都不是线程安全的,若无同步机制,他们不适合于接受并发的指令来添加和删除元素。
- 在并发代码中使用上述经典集合须要复杂的同步管理,使用起来很不方便。
- 使用复杂的同步机制会大大下降性能。
- NET Framework 4所提供的新的集合尽量地减小须要使用锁的次数。这些新的集合经过使用比较并交换(compare-and-swap,CAS)指令和内存屏障,避免使用互斥的重量级锁。这对性能有保障。
注意:函数
与经典集合相比,并发集合会有更大的开销,所以在串行代码中使用并发集合无心义,只会增长额外的开销且运行速度比访问经典集合慢。性能
2 并发集合spa
1)ConcurrentQueue:线程安全的先进先出 (FIFO) 集合线程
主要方法:3d
- Enqueue(T item);将对象添加到集合结尾。
- TryDequeue(out T result); 尝试移除并返回位于集合开始处的对象,返回值表示操做是否成功。
- TryPeek(out T result);尝试返回集合开始处的对象,但不将其移除,返回值表示操做是否成功。
说明:code
- ConcurrentQueue是彻底无锁的,但当CAS操做失败且面临资源争用时,它可能会自旋而且重试操做。
- ConcurrentQueue是FIFO集合,某些和出入顺序无关的场合,尽可能不要用ConcurrentQueue。
2)ConcurrentStack:线程安全的后进先出 (LIFO) 集合orm
主要方法及属性:
- Push(T item);将对象插入集合的顶部。
- TryPop(out T result);尝试弹出并返回集合顶部的对象,返回值表示操做是否成功。
- TryPeek(out T result);尝试返回集合开始处的对象,但不将其移除,返回值表示操做是否成功。
- IsEmpty { get; }指示集合是否为空。
- PushRange(T[] items);将多个对象插入集合的顶部。
- TryPopRange(T[] items);弹出顶部多个元素,返回结果为弹出元素个数。
说明:
- 与ConcurrentQueue类似地,ConcurrentStack彻底无锁的,但当CAS操做失败且面临资源争用时,它可能会自旋而且重试操做。
- 获取集合是否包含元素使用IsEmpty属性,而不是经过判断Count属性是否大于零。调用Count比调用IsEmpty开销大。
- 使用PushRange(T[] items)和TryPopRange(T[] items)时注意缓冲引发的额外开销和额外的内存消耗。
3) ConcurrentBag:元素可重复的无序集合
主要方法及属性:
- TryPeek(out T result);尝试从集合返回一个对象,但不移除该对象,返回值表示是否成功得到该对象。
- TryTake(out T result);尝试从集合返回一个对象并移除该对象,返回值表示是否成功得到该对象。
- Add(T item);将对象添加到集合中。
- IsEmpty { get; }解释同ConcurrentStack
说明:
- ConcurrentBag为每个访问集合的线程维护了一个本地队列,在可能的状况下,它会以无锁的方式访问本地队列。
- ConcurrentBag在同一个线程添加和删除元素的场合下效率很是高。
- 由于ConcurrentBag有时会须要锁,在生产者线程和消费者线程彻底分开的场景下效率很是低。
- ConcurrentBag调用IsEmpty的开销很是大,由于这须要临时得到这个无序组的全部锁。
4)BlockingCollection:实现
System.Collections.Concurrent.IProducerConsumerCollection<T> 的线程安全集合,提供阻塞和限制功能
主要方法及属性:
- BlockingCollection(int boundedCapacity);boundedCapacity表示集合限制大小。
- CompleteAdding();将BlockingCollection实例标记为再也不接受任何添加。
- IsCompleted { get; }此集合是否已标记为已完成添加而且为空。
- GetConsumingEnumerable();从集合中移除并返回移除的元素
- Add(T item);添加元素到集合。
- TryTake(T item, int millisecondsTimeout, CancellationToken cancellationToken);
说明:
- 使用BlockingCollection()构造函数实例化BlockingCollection,意味着不设置boundedCapacity,那么boundedCapacity为默认值: int.MaxValue。
- 限界:使用BlockingCollection(int boundedCapacity),设置boundedCapacity的值,当集合容量达到这个值得时候,向BlockingCollection添加元素的线程将会被阻塞,直到有元素被删除。
限界功能可控制内存中集合最大大小,这对于须要处理大量元素的时候很是有用。
- 默认状况下,BlockingCollection封装了一个ConcurrentQueue。能够在构造函数中指定一个实现了IProducerConsumerCollection接口的并发集合,包括:ConcurrentStack、ConcurrentBag。
- 使用此集合包含易于无限制等待的风险,因此使用TryTake更加,由于TryTake提供了超时控制,指定的时间内能够从集合中移除某个项,则为 true;不然为 false。
5)ConcurrentDictionary:可由多个线程同时访问的键值对的线程安全集合。
主要方法
- AddOrUpdate(TKey key, TValue addValue, Func<TKey, TValue, TValue> updateValueFactory);若是指定的键尚不存在,则将键/值对添加到 字典中;若是指定的键已存在,则更新字典中的键/值对。
- GetOrAdd(TKey key, TValue value);若是指定的键尚不存在,则将键/值对添加到字典中。
- TryRemove(TKey key, out TValue value);尝试从字典中移除并返回具备指定键的值。
- TryUpdate(TKey key, TValue newValue, TValue comparisonValue);将指定键的现有值与指定值进行比较,若是相等,则用第三个值更新该键。
说明:
- ConcurrentDictionary对于读操做是彻底无锁的。当多个任务或线程向其中添加元素或修改数据的时候,ConcurrentDictionary使用细粒度的锁。使用细粒度的锁只会锁定真正须要锁定的部分,而不是整个字典。
6)IProducerConsumerCollection:定义供生产者/消费者用来操做线程安全集合的方法。 此接口提供一个统一的表示(为生产者/消费者集合),从而更高级别抽象如 System.Collections.Concurrent.BlockingCollection<T>可使用集合做为基础的存储机制。
3.经常使用模式
1)并行的生产者-消费者模式
定义:
生成者和消费者是此模式中的两类对象模型,消费者依赖于生产者的结果,生产者生成结果的同时,消费者使用结果。
图1 并行的生产者-消费者模式
说明:
- 并发集合用在此模式下很是合适,由于并发集合支持此模式中对象的并行操做。
- 若不使用并发集合,那么就要加入同步机制,从而使程序变得比较复杂,难于维护和理解,同时大大下降性能。
- 上图为生产者消费者模式示意图,纵轴为时间轴,生成者与消费者的并不在一条时间线上,但两者有交叉,意在代表生成者先产生结果,然后消费者才真正使用了生成者产生的数据。
2)流水线模式
定义:
流水线由多个阶段构成,每一个阶段由一系列的生产者和消费者构成。通常来说前一个阶段是后一个阶段的生成者;依靠相邻两个阶段之间的缓冲区队列,每一个阶段能够并发执行。
图2 并行的流水线模式
说明:
- 常使用BlockingCollection<T>做为缓冲罐区队列。
- 流水线的速度近似等于流水线最慢阶段的速度。
- 上图为流水线模式示意图,前一阶段为后一阶段的生成者,这里展现了最为简单和基本的流水线模式,更复杂的模式能够认为是每一个阶段都包括了对数据更多的处理过程。
4 使用方式
仅以ConcurrentBag和BlockingCollection为例,其余的并发集合与之类似。
ConcurrentBag
1 List<string> list = ......
2 ConcurrentBag<string> bags = new ConcurrentBag<string>();
3 Parallel.ForEach(list, (item) =>
4 {
5 //对list中的每一个元素进行处理而后,加入bags中
6 bags.Add(itemAfter);
7 });
BlockingCollection—生产者消费者模式
1 public static void Execute()
2 {
3 //调用Invoke,使得生产者任务和消费者任务并行执行
4 //Producer方法和Customer方法在Invoke中的参数顺序任意,不论何种顺序都会得到正确的结果
5 Parallel.Invoke(()=>Customer(),()=>Producer());
6 Console.WriteLine(string.Join(",",customerColl));
7 }
8
9 //生产者集合
10 private static BlockingCollection<int> producerColl = new BlockingCollection<int>();
11 //消费者集合
12 private static BlockingCollection<string> customerColl = new BlockingCollection<string>();
13
14 public static void Producer()
15 {
16 //循环将数据加入生成者集合
17 for (int i = 0; i < 100; i++)
18 {
19 producerColl.Add(i);
20 }
21
22 //设置信号,代表不在向生产者集合中加入新数据
23 //能够设置更加复杂的通知形式,好比数据量达到必定值且其中的数据知足某一条件时就设置完成添加
24 producerColl.CompleteAdding();
25 }
26
27 public static void Customer()
28 {
29 //调用IsCompleted方法,判断生产者集合是否在添加数据,是否还有未"消费"的数据
30 //注意不要使用IsAddingCompleted,IsAddingCompleted只代表集合标记为已完成添加,而不能说明其为空
31 //而IsCompleted为ture时,那么IsAddingCompleted为ture且集合为空
32 while (!producerColl.IsCompleted)
33 {
34 //调用Take或TryTake "消费"数据,消费一个,移除一个
35 //TryAdd的好处是提供超时机制
36 customerColl.Add(string.Format("消费:{0}", producerColl.Take()));
37 }
38 }
-----------------------------------------------------------------------------------------
转载与引用请注明出处。
时间仓促,水平有限,若有不当之处,欢迎指正。