C#的变迁史 - C# 4.0 之线程安全集合篇

  做为多线程和并行计算不得不考虑的问题就是临界资源的访问问题,解决临界资源的访问一般是加锁或者是使用信号量,这个你们应该很熟悉了。算法

  而集合做为一种重要的临界资源,通用性更广,为了让你们更安全的使用它们,微软为咱们带来了强大的并行集合:System.Collections.Concurrent里面的各位仁兄们。数组

  首先,我们从一个经典的问题谈起。安全

生产者消费者问题数据结构

  这个问题是最为经典的多线程应用问题,简单的表述这个问题就是:有一个或多个线程(生产者线程)产生一些数据,同时,还有一个或者多个线程(消费者线程)要取出这些数据并执行一些相应的工做。以下图所示:多线程

  下面就是使用程序去描述这个问题了。函数

  最直接的想法多是这样:spa

static void Main(string[] args)
{
    int count = 0;
    // 临界资源区
    var queue = new Queue<string>();
    // 生产者线程
    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            queue.Enqueue("value" + count);
            count++;
        }
    });

    // 消费者线程1
    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            if (queue.Count > 0)
            {
                string value = queue.Dequeue();
                Console.WriteLine("Worker 1: " + value);
            }
        }

    });
    // 消费者线程2
    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            if (queue.Count > 0)
            {
                string value = queue.Dequeue();
                Console.WriteLine("Worker 2: " + value);
            }
        }

    });

    Thread.Sleep(50000);
}

  使用Queue<string>模拟了一个简单的资源池,一个生产者放数据,两个消费者消费数据。上面这个程序运行之后会产生异常,异常的缘由很简单,当某个时刻,第一个消费者判断queue.Count > 0为true时,就会到Queue中取数据,可是这个时候数据可能会被第二个消费者拿走了,由于第二个消费者也判断出此时有数据可取。这是一个简单的临界资源线程安全问题。线程

  知道问题了,那么如何解决呢?
  第一种方案是加锁,这个方案是可行的,不少时候咱们也是这么作的,包括微软早期实现线程安全的ArrayList和Hashtable内部(Synchronized方法)也是这么实现的。这个方案适用于只有少许的消费者,而且每一个消费者都会执行大量操做的时候,这时lock并没什么太大问题,可是,若是是大批量短小精悍的消费者存在的话,lock会严重影响代码的执行效率。
  第二种方案就是咱们直接用新的线程安全的集合区解决这个问题。新的线程安全的这些集合内部再也不使用lock机制这种比较低效的方式去实现线程安全,而是转而使用SpinWait和Interlocked等机制,间接实现了线程安全,这种方式的效率要高于使用lock的方式。看一下实现代码:3d

var queue = new ConcurrentQueue<string>();
Task.Factory.StartNew(() =>
{
    while (true)
    {
        queue.Enqueue("value" + count);
        count++;
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        string value;
        if (queue.TryDequeue(out value))
        {
            Console.WriteLine("Worker 1: " + value);
        }
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        string value;
        if (queue.TryDequeue(out value))
        {
            Console.WriteLine("Worker 2: " + value);
        }

    }
});

  执行这段代码,能够工做,可是有点不太优雅,能不能不要去判断集合是否为空?集合当本身没有元素的时候本身Block一下能够吗?答案固然是能够的,使用BlockingCollection便可:code

var blockingCollection = new BlockingCollection<string>();
Task.Factory.StartNew(() =>
{
    while (true)
    {
        blockingCollection.Add("value" + count);
        count++;
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Console.WriteLine("Worker 1: " + blockingCollection.Take());
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Console.WriteLine("Worker 2: " + blockingCollection.Take());
    }
});

  BlockingCollection集合是一个拥有阻塞功能的集合,它就是完成了经典生产者消费者的算法功能。它没有实现底层的存储结构,而是使用了实现IProducerConsumerCollection接口的几个集合做为底层的数据结构,例如ConcurrentBag, ConcurrentStack或者是ConcurrentQueue。你能够在构造BlockingCollection实例的时候传入这个参数,若是不指定的话,则默认使用ConcurrentQueue做为存储结构。

  而对于生产者来讲,只须要经过调用其Add方法放数据,消费者只须要调用Take方法来取数据就能够了。
  固然了上面的消费者代码中还有一点是让人不爽的,那就是while语句,能够更优雅一点吗?答案仍是确定的:

Task.Factory.StartNew(() =>
{
        foreach (string value in blockingCollection.GetConsumingEnumerable())
        {
            Console.WriteLine("Worker 1: " + value);
        }
});

  GetConsumingEnumerable()方法是关键,这个方法会遍历集合取出数据,一旦发现集合空了,则阻塞本身,直到集合中又有元素了再开始遍历,神奇吧。

  好了,到此完美了解决了生产者消费者问题。然而一般来讲,还有两个问题咱们有时须要去控制:

第一个问题:控制集合中数据的最大数量。

  这个问题由BlockingCollection构造函数解决,构造该对象实例的时候,构造函数中的BoundedCapacity决定了集合最大的可容纳数据数量,这个比较简单,很少说了。

第二个问题:什么时候中止的问题。

  这个问题由CompleteAdding和IsCompleted两个配合解决。
  CompleteAdding方法是直接不容许任何元素被加入集合;当使用了CompleteAdding方法后且集合内没有元素的时候,另外一个属性IsCompleted此时会为True,这个属性能够用来判断是否当前集合内的全部元素都被处理完。看一下生产者修改后的代码:

Task.Factory.StartNew(() =>
{
    for (int count = 0; count < 10; count++)
    {
        blockingCollection.Add("value" + count);
    }

    blockingCollection.CompleteAdding();
});

  当使用了CompleteAdding方法后,对象中止往集合中添加数据,这时若是是使用GetConsumingEnumerable枚举的,那么这种枚举会天然结束,不会再Block住集合,这种方式最优雅,也是推荐的写法。可是若是是使用TryTake访问元素的,则须要使用IsCompleted判断一下,由于这个时候使用TryTake会抛InvalidOperationException异常。

看一下最终的代码形式:

static void Main(string[] args)
{
    var blockingCollection = new BlockingCollection<string>();
    var producer = Task.Factory.StartNew(() =>
    {
        for (int count = 0; count < 10; count++)
        {
            blockingCollection.Add("value" + count);
            Thread.Sleep(300);
        }

        blockingCollection.CompleteAdding();
    });

    var consumer1 = Task.Factory.StartNew(() =>
    {
        foreach (string value in blockingCollection.GetConsumingEnumerable())
        {
            Console.WriteLine("Worker 1: " + value);
        }
    });

    var consumer2 = Task.Factory.StartNew(() =>
    {
        foreach (string value in blockingCollection.GetConsumingEnumerable())
        {
            Console.WriteLine("Worker 2: " + value);
        }
    });

    Task.WaitAll(producer, consumer1, consumer2);
}

BlockingCollection的枚举

  此外,须要注意BlockingCollection有两种枚举方法,首先BlockingCollection自己继承自IEnumerable<T>,因此它本身就能够被foreach枚举,首先BlockingCollection包装了一个线程安全集合,那么它本身也是线程安全的,而当多个线程在同时修改或访问线程安全容器时,BlockingCollection本身做为IEnumerable会返回一个必定时间内的集合片断,也就是只会枚举在那个时间点上内部集合的元素。使用这种方式枚举的时候,不会有Block效果。
  另一种方式就是咱们上面使用的GetConsumingEnumerable方式的枚举,这种方式会有Block效果,直到CompleteAdding被调用为止。

  最后提一下实现IProducerConsumerCollection接口的几个集合:ConcurrentBag(线程安全的无序的元素集合), ConcurrentStack(线程安全的堆栈)和ConcurrentQueue(线程安全的队列)。这些都很简单,功能与非线程安全的那些集合都同样,只很少是多了TryXXX方法,多线程环境下使用这些方法就行了,其余就很少说了。

  到今生产者和消费者这个经典的问题告一段落了。

  System.Collections.Concurrent下面的集合除了解决生产者消费者问题外,还有一些与多线程相关的集合,例如:

1. ConcurrentDictionary,这个是键/值对字典的线程安全实现,这个类在原来的基础上也添加了一下新的方法,例如:AddOrUpdate,GetOrAdd,TryXXX等等,都很容易理解。

2. 各类Partitioner 类,提供针对数组、列表和可枚举项的常见分区策略。

  若要对数据源操做进行并行化,其中一个必要步骤是将源分区为可由多个线程同时访问的多个部分。 PLINQ 和任务并行库 (TPL) 提供了默认的分区程序,当编写并行查询或ForEach循环时,默认的分区程序以透明方式工做。 可是毫无疑问,对于一些复杂的状况,咱们是能够插入本身的分区程序的,这就是微软为咱们提供的各类Partitioner类,这个很少说了,感兴趣的同窗请本身参考一下MSDN。

相关文章
相关标签/搜索