决定从这篇文章开始,开一个读源码系列,不限制平台语言或工具,任何本身感兴趣的都会写。前几天碰到一个小问题又读了一遍ConcurrentQueue的源码,那就拿C#中比较经常使用的并发队列ConcurrentQueue做为开篇来聊一聊它的实现原理。html
话很少说,直奔主题。node
要提早说明下的是,本文解析的源码是基于.NET Framework 4.8版本,地址是:https://referencesource.microsoft.com/#mscorlib/system/Collections/Concurrent/ConcurrentQueue.cs
原本是打算用.NET Core版本的,可是找了一下居然没找到:https://github.com/dotnet/runtime/tree/master/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent
不知道是我找错位置了仍是咋回事,有知道的大佬告知一下。不过我以为实现原理应该相似吧,后面找到了我对比一下,不一样的话再写一篇来分析。git
若是是本身实现一个简单的队列功能,咱们该如何设计它的存储结构呢?通常来讲有这两种方式:数组或者链表,先来简单分析下。github
咱们都知道,数组是固定空间的集合,意味着初始化的时候要指定数组大小,可是队列的长度是随时变化的,超出数组大小了怎么办?这时候就必需要对数组进行扩容。问题又来了,扩容要扩多少呢,少了不够用多了浪费内存空间。与之相反的,链表是动态空间类型的数据结构,元素之间经过指针相连,不须要提早分配空间,须要多少分配多少。但随之而来的问题是,大量的出队入队操做伴随着大量对象的建立销毁,GC的压力又变得很是大。
事实上,在C#的普通队列Queue
类型中选择使用数组进行实现,它实现了一套扩容机制,这里再也不详细描述,有兴趣的直接看源码,比较简单。c#
回到主题,要实现一个高性能的线程安全队列,咱们试着回答如下问题:数组
经过源码能够看到ConcurrentQueue
采用了数组+链表的组合模式,充分吸取了2种结构的优势。安全
具体来讲,它的整体结构是一个链表,链表的每一个节点是一个包含数组的特殊对象,咱们称之为Segment(段或节,原话是a queue is a linked list of small arrays, each node is called a segment.
),它里面的数组是存储真实数据的地方,容量固定大小是32,每个Segment有指向下一个Segment的的指针,以此造成链表结构。而队列中维护了2个特殊的指针,他们分别指向队列的首段(head segment)和尾段(tail segment),他们对入队和出队有着重要的做用。用一张图来解释队列的内部结构:
数据结构
嗯,画图画到这里忽然联想到,搞成双向链表的话是否是就神似B+树的叶子节点?技术就是这么奇妙~并发
段的核心定义为:app
/// <summary> /// private class for ConcurrentQueue. /// 链表节点(段) /// </summary> private class Segment { //实际存储数据的容器 internal volatile T[] m_array; //存储对应位置数据的状态,当数据的对应状态位标记为true时该数据才是有效的 internal volatile VolatileBool[] m_state; //下一段的指针 private volatile Segment m_next; //当前段在队列中的索引 internal readonly long m_index; //两个位置指针 private volatile int m_low; private volatile int m_high; //所属的队列实例 private volatile ConcurrentQueue<T> m_source; }
队列的核心定义为:
/// <summary> /// 线程安全的先进先出集合, /// </summary> public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T> { //首段 [NonSerialized] private volatile Segment m_head; //尾段 [NonSerialized] private volatile Segment m_tail; //每一段的大小 private const int SEGMENT_SIZE = 32; //截取快照的操做数量 [NonSerialized] internal volatile int m_numSnapshotTakers = 0; }
先从初始化一个队列开始看起。
与普通Queue
不一样的是,ConcurrentQueue
再也不支持初始化时指定队列大小(capacity),仅仅提供一个无参构造函数和一个IEnumerable<T>
参数的构造函数。
/// <summary> /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class. /// </summary> public ConcurrentQueue() { m_head = m_tail = new Segment(0, this); }
无参构造函数很简单,建立了一个Segment实例并把首尾指针都指向它,此时队列只包含一个Segment,它的索引是0,队列容量是32。
继续看一下Segment是如何被初始化的:
/// <summary> /// Create and initialize a segment with the specified index. /// </summary> internal Segment(long index, ConcurrentQueue<T> source) { m_array = new T[SEGMENT_SIZE]; m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false m_high = -1; Contract.Assert(index >= 0); m_index = index; m_source = source; }
Segment只提供了一个构造函数,接受的参数分别是队列索引和队列实例,它建立了一个长度为32的数组,并建立了与之对应的状态数组,而后初始化了位置指针(m_low=0,m_high=-1,此时表示一个空的Segment)。
到这里,一个并发队列就建立好了。
使用集合建立队列的过程和上面相似,只是多了两个步骤:入队和扩容,下面会重点描述这两部分因此这里再也不过多介绍。
先亮出源码:
/// <summary> /// Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>. /// </summary> /// <param name="item">The object to add to the end of the <see /// cref="ConcurrentQueue{T}"/>. The value can be a null reference /// (Nothing in Visual Basic) for reference types. /// </param> public void Enqueue(T item) { SpinWait spin = new SpinWait(); while (true) { Segment tail = m_tail; if (tail.TryAppend(item)) return; spin.SpinOnce(); } }
经过源码能够看到,入队操做是在队尾(m_tail)进行的,它尝试在最后一个Segment中追加指定的元素,若是成功了就直接返回,失败的话就自旋等待,直到成功为止。那什么状况下会失败呢?这就要继续看看是如何追加元素的:
internal bool TryAppend(T value) { //先判断一下高位指针有没有达到数组边界(也就是数组是否装满了) if (m_high >= SEGMENT_SIZE - 1) { return false; } int newhigh = SEGMENT_SIZE; try { } finally { //使用原子操做让高位指针加1 newhigh = Interlocked.Increment(ref m_high); //若是数组还有空位 if (newhigh <= SEGMENT_SIZE - 1) { //把数据放到数组中,同时更新状态 m_array[newhigh] = value; m_state[newhigh].m_value = true; } //数组满了要触发扩容 if (newhigh == SEGMENT_SIZE - 1) { Grow(); } } return newhigh <= SEGMENT_SIZE - 1; }
因此,只有当尾段m_tail装满的状况下追加元素才会失败,这时候必需要等待下一个段产生,也就是扩容(细细品一下Grow这个词真的很妙),自旋就是在等扩容完成才能有地方放数据。而在保存数据的时候,经过原子自增操做保证了同一个位置只会有一个数据被写入,从而实现了线程安全。
注意:这里的装满并非指数组每一个位置都有数据,而是指最后一个位置已被使用。
继续看一下扩容是怎么一个过程:
/// <summary> /// Create a new segment and append to the current one /// Update the m_tail pointer /// This method is called when there is no contention /// </summary> internal void Grow() { //no CAS is needed, since there is no contention (other threads are blocked, busy waiting) Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow m_next = newSegment; Contract.Assert(m_source.m_tail == this); m_source.m_tail = m_next; }
在普通队列中,扩容是经过建立一个更大的数组而后把数据拷贝过去实现扩容的,这个操做比较耗时。而在并发队列中就很是简单了,首先建立一个新Segment,而后把当前Segment的next指向它,最后挂到队列的末尾去就能够了,所有是指针操做很是高效。并且从代码注释中能够看到,这里不会出现线程竞争的状况,由于其余线程都由于位置不够被阻塞都在自旋等待中。
仍是先亮出源码:
public bool TryDequeue(out T result) { while (!IsEmpty) { Segment head = m_head; if (head.TryRemove(out result)) return true; //since method IsEmpty spins, we don't need to spin in the while loop } result = default(T); return false; }
能够看到只有在队列不为空(IsEmpty==false)的状况下才会尝试出队操做,而出队是在首段上进行操做的。关于如何判断队列是否为空总结就一句话:当首段m_head不包含任何数据且没有下一段的时候队列才为空,详细的判断过程源码注释中写的很清楚,限于篇幅不详细介绍。
出队的本质是从首段中移除低位指针所指向的元素,看一下具体实现步骤:
internal bool TryRemove(out T result) { SpinWait spin = new SpinWait(); int lowLocal = Low, highLocal = High; //判断当前段是否为空 while (lowLocal <= highLocal) { //判断低位指针位置是否能够移除 if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal) { SpinWait spinLocal = new SpinWait(); //判断元素是否有效 while (!m_state[lowLocal].m_value) { spinLocal.SpinOnce(); } //取出元素 result = m_array[lowLocal]; //释放引用关系 if (m_source.m_numSnapshotTakers <= 0) { m_array[lowLocal] = default(T); } //判断当前段的元素是否所有被移除了,要丢弃它 if (lowLocal + 1 >= SEGMENT_SIZE) { spinLocal = new SpinWait(); while (m_next == null) { spinLocal.SpinOnce(); } Contract.Assert(m_source.m_head == this); m_source.m_head = m_next; } return true; } else { //线程竞争失败,自旋等待并重置 spin.SpinOnce(); lowLocal = Low; highLocal = High; } }//end of while result = default(T); return false; }
首先,只有当前Segment不为空的状况下才尝试移除元素,不然就直接返回false。而后经过一个原子操做Interlocked.CompareExchange
判断当前低位指针上是否有其余线程同时也在移除,若是有那就进入自旋等待,没有的话就从这个位置取出元素并把低位指针往前推动一位。若是当前队列没有正在进行截取快照的操做,那取出元素后还要把这个位置给释放掉。当这个Segment的全部元素都被移除掉了,这时候要把它丢弃,简单来讲就是让队列的首段指针指向它的下一段便可,丢弃的这一段等着GC来收拾它。
这里稍微提一下Interlocked.CompareExchange,它的意思是比较和交换,也就是更为你们所熟悉的CAS(Compare-and-Swap),它主要作了如下2件事情:
整个操做是原子性的,对CPU而言就是一条指令,这样就能够保证当前位置只有一个线程执行出队操做。
还有一个
TryPeek()
方法和出队相似,它是从队首获取一个元素可是无需移除该元素,能够看作Dequeue的简化版,再也不详细介绍。
与普通Queue
不一样的是,ConcurrentQueue
并无维护一个表示队列中元素个数的计数器,那就意味着要获得这个数量必须实时去计算。咱们看一下计算过程:
public int Count { get { Segment head, tail; int headLow, tailHigh; GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); if (head == tail) { return tailHigh - headLow + 1; } int count = SEGMENT_SIZE - headLow; count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - 1)); count += tailHigh + 1; return count; } }
大体思路是,先计算(GetHeadTailPositions)出首段的低位指针和尾段的高位指针,这中间的总长度就是咱们要的数量,而后分红3节依次累加每个Segment包含的元素个数获得最终的队列长度,能够看到这是一个开销比较大的操做。
正由于如此,微软官方推荐使用IsEmpty
属性来判断队列是否为空,而不是使用队列长度Count==0
来判断,使用ConcurrentStack
也是同样。
所谓的take snapshot就是指一些格式转换的操做,例如ToArray()
、ToList()
、GetEnumerator()
这种类型的方法。在前面队列的核心定义中咱们提到有一个m_numSnapshotTakers
字段,这时候就派上用场了。下面以比较典型的ToList()
源码举例说明:
private List<T> ToList() { // Increments the number of active snapshot takers. This increment must happen before the snapshot is // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. Interlocked.Increment(ref m_numSnapshotTakers); List<T> list = new List<T>(); try { Segment head, tail; int headLow, tailHigh; GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); if (head == tail) { head.AddToList(list, headLow, tailHigh); } else { head.AddToList(list, headLow, SEGMENT_SIZE - 1); Segment curr = head.Next; while (curr != tail) { curr.AddToList(list, 0, SEGMENT_SIZE - 1); curr = curr.Next; } tail.AddToList(list, 0, tailHigh); } } finally { // This Decrement must happen after copying is over. Interlocked.Decrement(ref m_numSnapshotTakers); } return list; }
能够看到,ToList的逻辑和Count很是类似,都是先计算出两个首尾位置指针,而后把队列分为3节依次遍历处理,最大的不一样之处在于方法的开头和结尾分别对m_numSnapshotTakers
作了一个原子操做。
在方法的第一行,使用Interlocked.Increment
作了一次递增,这时候表示队列正在进行一次截取快照操做,在处理完后又在finally中用Interlocked.Decrement
作了一次递减表示当前操做已完成,这样确保了在进行快照时不被出队影响。感受这块很难描述的特别好,因此保留了原始的英文注释,你们慢慢体会。
到这里,基本把ConcurrentQueue的核心说清楚了。
回到文章开头提出的几个问题,如今应该有了很清晰的答案:
以上所述均是我的理解,若是有错误的地方还请不吝指正,以避免误导他人。
推荐相关阅读,篇篇都是干货:https://www.cnblogs.com/lucifer1982/category/126755.html