并发集合 System.Collections.Concurrent 命名空间

System.Collections.Concurrent 命名空间提供多个线程安全集合类。算法

当有多个线程并发访问集合时,应使用这些类代替 System.CollectionsSystem.Collections.Generic 命名空间中的对应类型。数组

为了对集合进行线程安全的访问,定义了 IProducerConsumerCollection<T>接口。这个接口中最重 要的方法是TryAdd()和TryTake()。
TryAdd()方法尝试给集合添加一项,但若是集合禁止添加项,这个操做就可能失败。为了给出相关信息,TryAdd()方法返回一个布尔值,以说明操做是成功仍是失败。安全

TryTake()方法也以这种方式工做,以通知调用者操做是成功仍是失败,并在操做成功时返回集合中的项。并发

下面列出了 SystemCollections.Concurrent名称空间中的类及其功能:函数

ConcurrentQueue<T>

这个集合类用一种免锁定的算法实现,使用在内部合并到一个链表中的32项数组。
访问队列元素的方法有Enqueue()、TryDequeue()和TryPeek()。这些方法的命名很是相似于前面Queue<T>类的方法,只是给可能调用失败的方法加上了前缀Try。 由于这个类实现了IProducerConsumerCollection<T>接口,因此 TryAdd()和 TryTake()方法仅调用 Enqueue()和 TryDequeue()方法。spa

ConcurrentStack<T>

很是相似于ConcurrentQueue<T>类,只是带有另外的元素访问方法。
ConcurrcntStack<T>类定义了 Push()、PushRange()、TryPeek()、TiyPop()和 TryPopRange() 方法。在内部这个类使用其元素的链表。线程

ConcurrentBag<T>

该类没有定义添加或提取项的任何顺序。这个类使用一个把线程映射到内部使用的数组上的概念,所以尝试减小锁定。访问元素的方法有Add()、TryPeek()和 TryTake()。code

ConcurrentDictionary<TKey, TValue>

——这是一个线程安全的键值集合。
TryAdd()、TryGetValue()、TryRemove()和TryUpdate()方法以非阻塞的方式访问成员。
由于元素基于键和值, 因此 ConcurrentDictionary<TKey,TValue>没有实现 IProducerConsumerCollection<T>。blog

ConcurrentXXX

这些集合是线程安全的,若是某个动做不适用于线程的当前状态,它们就返回fclse。在继续以前,老是霈要确认添加或提取元素是否成功。不能相信集合会完成任务。接口

BlockingCollection<T>

这个集合在能够添加或提取元素以前,会阻塞线程并一直等待。 BlockingCollection<T>集合提供了一个接口,以使用Add()和Take()方法来添加和删除元素。 这些方法会阻寒线程,一直等到任务能够执行为止。
Add()方法有一个重载版本,其中能够给该重载版本传递一个CancellationToken令牌。这个令牌容许取消被阻塞的调用。若是不但愿线程无限期地等待下去,且不但愿从外部取消调用,就能够使用TryAdd()和 TryTake()方法,在这些方法中,也能够指定一个超时值,它表示在调用失败以前应阻塞线程和等待的最长时间。

BlockingCollection<T>

这是对实现了 IProducerConsumerCollection<T>接口的任意类的修饰器,它默认使用ConcurrentQueue<T>类。还能够给构造函数传递任何其余实现了 IProducerConsumerCollection<T> 接口的类。

下面的代码示例简单演示了使用BlockingCollection<T>类和多个线程的过程。一个线程是生成器,它使用Add()方法给集合写入元素,另外一个线程是使用者,它使用Take()方法从集合中提取元素:

internal static BlockingCollection<int> _TestBCollection;
class ThreadWork1  // producer
{
    public ThreadWork1()
    { }
    public void run()
    {
        System.Console.WriteLine("ThreadWork1 run { ");
        for (int i = 0; i < 100; i++)
        {
            System.Console.WriteLine("ThreadWork1 producer: " + i);
            _TestBCollection.Add(i);
        }
        _TestBCollection.CompleteAdding();
        System.Console.WriteLine("ThreadWork1 run } ");
    }
}
class ThreadWork2  // consumer
{
    public ThreadWork2()
    { }
    public void run()
    {
        int i = 0;
        int nCnt = 0;
        bool IsDequeuue = false;
        System.Console.WriteLine("ThreadWork2 run { ");
        while (!_TestBCollection.IsCompleted)
        {
            IsDequeuue = _TestBCollection.TryTake(out i);
            if (IsDequeuue)
            {
                System.Console.WriteLine("ThreadWork2 consumer: " + i * i + "   =====" + i);
                nCnt++;
            }
        }
        System.Console.WriteLine("ThreadWork2 run } ");
    }
}
static void StartT1()
{
    ThreadWork1 work1 = new ThreadWork1();
    work1.run();
}
static void StartT2()
{
    ThreadWork2 work2 = new ThreadWork2();
    work2.run();
}
static void Main(string[] args)
{
    Task t1 = new Task(() => StartT1());
    Task t2 = new Task(() => StartT2());
    _TestBCollection = new BlockingCollection<int>();//能够跟容量
    Console.WriteLine("Sample 4-4 Main {");
    Console.WriteLine("Main t1 t2 started {");
    t1.Start();
    t2.Start();
    Console.WriteLine("Main t1 t2 started }");
    Console.WriteLine("Main wait t1 t2 end {");
    Task.WaitAll(t1, t2);
    Console.WriteLine("Main wait t1 t2 end }");
    Console.WriteLine("Sample 4-4 Main }");
}
相关文章
相关标签/搜索