[一块儿读源码]走进C#并发队列ConcurrentQueue的内部世界 — .NET Core篇

在上一篇《走进C#并发队列ConcurrentQueue的内部世界》中解析了Framework下的ConcurrentQueue实现原理,通过抛砖引玉,获得了一众大佬的指点,找到了.NET Core版本下的ConcurrentQueue源码,位于如下地址:html

我大体看了一下,虽然二者的实现有很多类似的地方,不过在细节上新增了许多有意思的东西,仍是以为要单独拉出来讲一下。画外音:谁叫我上篇立了flag,如今跪着也要写完。。🤣git

必需要吐糟的是,代码中ConcurrentQueue类明明是包含在System.Collections.Concurrent命名空间下,可是源码结构中的文件却放在System.Private.CoreLib目录中,这是闹哪出~github


存储结构

从上面给出的源码地址能够猜想出整个结构依然是Segment+Queue的组合,经过一个Segment链表实现了Queue结构,但实际上内部又加了新的设计。抛去Queue先不看的话,Segment自己就是一个实现了多生产者多消费者的线程安全集合,甚至能够直接拿它当一个固定容量的线程安全队列使用,这点与以前Framework中差异很大。若是结合Queue总体来看,Segment再也不是固定容量,而是能够由Queue来控制每一个Segment的容量大小(最小是32,上限是1024 * 1024)。算法

在Framework中,队列会给每一个Segment分配一个索引,虽然这个索引是long类型的,但理论上说队列容量仍是存在上限。在Core中就不同了,它取消了这个索引,真正实现了一个无边界(unbounded)队列。c#

我猜想的缘由是,在Framework中因为每一个Segment是固定大小的,维护一个索引能够很方便的计算队列里的元素数量,可是Core中的Segment大小不是固定的,使用索引并不能加快计算速度,使得这个索引再也不有意义,这也意味着计算元素数量变得很是复杂。数组

一张图看清它的真实面目,这里继续沿用上一篇的结构图稍做修改:
 缓存

从图中能够看到,总体结构上基本一致,核心改动就是Segment中增长了Slot(槽)的概念,这是真正存储数据的地方,同时有一个序列号与之对应。安全

从代码来看一下Segment的核心定义:并发

internal sealed class ConcurrentQueueSegment<T>
{
    //存放数据的容器
	internal readonly Slot[] _slots;

	//这个mask用来计算槽点,能够防止查找越界
	internal readonly int _slotsMask;

	//首尾位置指针
	internal PaddedHeadAndTail _headAndTail;

	//观察保留标记,表示当前段在出队时可否删除数据
	internal bool _preservedForObservation;

	//标记当前段是否被锁住
	internal bool _frozenForEnqueues;

	//下一段的指针
	internal ConcurrentQueueSegment<T>? _nextSegment;
}

其中_preservedForObservation_frozenForEnqueues会比较难理解,后面再详细介绍。分布式

再看一下队列的核心定义:

public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
{
    //每一段的初始化长度,也是最小长度
	private const int InitialSegmentLength = 32;

    //每一段的最大长度
	private const int MaxSegmentLength = 1024 * 1024;

    //操做多个段时的锁对象
	private readonly object _crossSegmentLock;

    //尾段指针
	private volatile ConcurrentQueueSegment<T> _tail;

    //首段指针
	private volatile ConcurrentQueueSegment<T> _head;
}

常规操做

仍是按上一篇的套路为主线按部就班。

建立实例

ConcurrentQueue依然提供了2个构造函数,分别能够建立一个空队列和指定数据集的队列。

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
/// </summary>
public ConcurrentQueue()
{
    _crossSegmentLock = new object();
    _tail = _head = new ConcurrentQueueSegment<T>(InitialSegmentLength);
}

仍是熟悉的操做,建立了一个长度是32的Segment并把队列的首尾指针都指向它,同时建立了锁对象实例,仅此而已。
进一步看看Segment是怎么建立的:

internal ConcurrentQueueSegment(int boundedLength)
{
    //这里验证了长度不能小于2而且必须是2的N次幂
    Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}");
    Debug.Assert((boundedLength & (boundedLength - 1)) == 0, $"Must be a power of 2, got {boundedLength}");

    _slots = new Slot[boundedLength];
    //这个mask的做用就是用来计算数组索引的防止越界,能够用`& _slotsMask`取代`% _slots.Length`
    _slotsMask = boundedLength - 1;

    //设置初始序列号
    for (int i = 0; i < _slots.Length; i++)
    {
        _slots[i].SequenceNumber = i;
    }
}

internal struct Slot
{
    [AllowNull, MaybeNull] public T Item; 
    
    public int SequenceNumber;
}

再看看怎么用集合初始化队列,这个过程稍微麻烦点,可是颇有意思:

public ConcurrentQueue(IEnumerable<T> collection)
{
    if (collection == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.collection);
    }

    _crossSegmentLock = new object();

    //计算获得第一段的长度
    int length = InitialSegmentLength;
    if (collection is ICollection<T> c)
    {
        int count = c.Count;
        if (count > length)
        {
            length = Math.Min(ConcurrentQueueSegment<T>.RoundUpToPowerOf2(count), MaxSegmentLength);
        }
    }

    //根据前面计算出来的长度建立一个Segment,再把数据依次入队
    _tail = _head = new ConcurrentQueueSegment<T>(length);
    foreach (T item in collection)
    {
        Enqueue(item);
    }
}

能够看到,第一段的大小是根据初始集合的大小肯定的,若是集合大小count大于32就对count进行向上取2的N次幂(RoundUpToPowerOf2)获得实际大小(可是不能超过最大值),不然就按默认值32来初始化。

向上取2的N次幂究竟是啥意思??例如count是5,那获得的结果就是8(2×2×2);若是count是9,那结果就是16(2×2×2×2);若是恰好count是8那结果就是8(2×2×2),具体算法是经过位运算实现的颇有意思。至于为何必定要是2的N次幂,中间的玄机我也没搞明白。。

顺藤摸瓜,再看看进队操做如何实现。

元素进队

/// <summary>在队尾追加一个元素</summary>
public void Enqueue(T item)
{
    // 先尝试在尾段插入一个元素
    if (!_tail.TryEnqueue(item))
    {
        // 若是插入失败,就意味着尾段已经填满,须要日后扩容
        EnqueueSlow(item);
    }
}

private void EnqueueSlow(T item)
{
    while (true)
    {
        ConcurrentQueueSegment<T> tail = _tail;

        // 先尝试再队尾插入元素,若是扩容完成了就会成功
        if (tail.TryEnqueue(item))
        {
            return;
        }
        // 得到一把锁,避免多个线程同时进行扩容
        lock (_crossSegmentLock)
        {
            //检查是否扩容过了
            if (tail == _tail)
            {
                // 尾段冻结
                tail.EnsureFrozenForEnqueues();
                // 计算下一段的长度
                int nextSize = tail._preservedForObservation ? InitialSegmentLength : Math.Min(tail.Capacity * 2, MaxSegmentLength);
                var newTail = new ConcurrentQueueSegment<T>(nextSize);

                // 改变队尾指向
                tail._nextSegment = newTail;
                // 指针交换
                _tail = newTail;
            }
        }
    }
}

从以上流程能够看到,扩容的主动权再也不由Segment去控制,而是交给了队列。正由于如此,因此在跨段操做时要先加锁,在Framework版本中是在原子操做得到指针后进行的扩容因此不会有这个问题,后面的出队操做也是同样的道理。扩容过程当中有两个细节须要重点关注,那就是SegmentFrozen和下一段的长度计算。
从前面Segment的定义中咱们看到它维护了一个_frozenForEnqueues标记字段,表示当前段是否被冻结锁定,在被锁住的状况下会让其余入队操做失败,看一下实现过程:

// must only be called while queue's segment lock is held
internal void EnsureFrozenForEnqueues() 
{
    // flag used to ensure we don't increase the Tail more than once if frozen more than once
    if (!_frozenForEnqueues) 
    {
        _frozenForEnqueues = true;
        Interlocked.Add(ref _headAndTail.Tail, FreezeOffset);
    }
}

首先判断当前冻结状态,而后把它设置为true,再使用原子操做把尾指针增长了2倍段长的偏移量,这个尾指针才是真正限制当前段不可新增元素的关键点,后面讲段的元素追加再关联起来详细介绍。而为何要指定2倍段长这么一个特殊值呢,目的是为了把尾指针和mask作运算后落在同一个slot上,也就是说虽然两个指针位置不同可是都指向的是同一个槽。

再说说下一段长度的计算问题,它主要是受_preservedForObservation这个字段影响,正常状况下一段的长度是尾段的2倍,但若是尾段正好被标记为观察保留(相似于上一篇的截取快照),那么下一段的长度依然是初始值32,原做者认为入队操做不是很频繁,这样作主要是为了不浪费空间。

接着是重头戏,看一下如何给段追加元素:

public bool TryEnqueue(T item)
{
    Slot[] slots = _slots;

    // 若是发生竞争就自旋等待
    SpinWait spinner = default;
    while (true)
    {
        // 获取当前段的尾指针
        int currentTail = Volatile.Read(ref _headAndTail.Tail);
        // 计算槽点
        int slotsIndex = currentTail & _slotsMask;
        // 读取对应槽的序列号
        int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

        // 判断槽点序列号和指针是否匹配
        int diff = sequenceNumber - currentTail;
        if (diff == 0)
        {
            // 经过原子操做比较交换,保证了只有一个入队者得到可用空间
            if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail)
            {
                // 把数据存入对应的槽点,以及更新序列号
                slots[slotsIndex].Item = item;
                Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentTail + 1);
                return true;
            }
        }
        else if (diff < 0)
        {
            // 序列号小于指针就说明该段已经装满了,直接返回false
            return false;
        }

        // 此次竞争失败了,只好等下去
        spinner.SpinOnce(sleep1Threshold: -1);
    }
}

整个流程的核心就是借助槽点序列号和尾指针的匹配状况判断是否有可用空间,由于在初始化的时候序列号是从0递增,正常状况下尾指针和序列号确定是匹配的,只有在整个段被装满时尾指针才会大于序列号,由于前面的冻结操做会给尾指针追加2倍段长的偏移量。要重点提出的是,只有在数据被写入而且序列号更新完成后才表示整个位置的元素有效,才能有出队的机会,在Framework是经过维护一个状态位来实现这个功能。整个设计颇有意思,要慢慢品。

这里咱们能够总结一下序列号的核心做用:假设一个槽点N,对应序列号是Q,它能容许入队的必要条件之一就是N==Q,因为入队操做把位置N的序列号修改为N+1,那么能够猜想出在出队时的必要条件之一就是知足Q==N+1

代码中的CompareExchange在上一篇中有介绍,这里再也不重复。另外关于Volatile相关的稍微提一下,它的核心做用是避免内存与CPU之间的高速缓存带来的数据不一致问题,告诉编译器直接读写原始数据,有兴趣的能够找资料了解,限于篇幅不过多介绍。

元素出队

能够猜想到,入队的时候要根据容量大小进行扩容,那么与之对应的,出队的时候就须要对它进行压缩,也就是丢弃没有数据的段。

/// <summary>从队首移除一个元素</summary>
public bool TryDequeue([MaybeNullWhen(false)] out T result) =>
    _head.TryDequeue(out result) || 
    TryDequeueSlow(out result); 

private bool TryDequeueSlow([MaybeNullWhen(false)] out T item)
{
    // 不断循环尝试出队,直到成功或失败为止
    while (true)
    {
        ConcurrentQueueSegment<T> head = _head;

        // 尝试从队首移除,若是成功就直接返回了
        if (head.TryDequeue(out item))
        {
            return true;
        }

        // 若是首段为空而且没有下一段了,则说明整个队列都没有数据了,返回失败
        if (head._nextSegment == null)
        {
            item = default!;
            return false;
        }

        // 既然下一段不为空,那就再次确认本段是否还能出队成功,不然就要把它给移除了,等待下次循环从下一段出队
        if (head.TryDequeue(out item))
        {
            return true;
        }

        // 首段指针要日后移动,表示当前首段已丢弃,跨段操做要先加锁
        lock (_crossSegmentLock)
        {
            if (head == _head)
            {
                _head = head._nextSegment;
            }
        }
    }
}

总体流程基本和入队同样,外层经过一个死循环不断尝试操做,直到出队成功或者队列为空返回失败为止。释放空间的操做也从Segment转移到队列上,因此要加锁保证线程安全。这一步我在代码注释中写的很详细就很少解释了,再看一下核心操做Segment是如何移除元素的:

public bool TryDequeue([MaybeNullWhen(false)] out T item)
{
    Slot[] slots = _slots;

    // 遇到竞争时自旋等待
    SpinWait spinner = default;
    while (true)
    {
        // 获取头指针地址
        int currentHead = Volatile.Read(ref _headAndTail.Head);
        // 计算槽点
        int slotsIndex = currentHead & _slotsMask;

        // 获取槽点对应的序列号
        int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

        // 比较序列号是否和指望值同样,为何要加1的缘由前面入队时说过
        int diff = sequenceNumber - (currentHead + 1);
        if (diff == 0)
        {
            // 经过原子操做比较交换获得能够出队的槽点,并把头指针日后移动一位
            if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead)
            {
                // 取出数据
                item = slots[slotsIndex].Item!;
                // 此时若是该段没有被标记观察保护,要把这个槽点的数据清空
                if (!Volatile.Read(ref _preservedForObservation))
                {
                    slots[slotsIndex].Item = default;
                    Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentHead + slots.Length);
                }
                return true;
            }
        }
        else if (diff < 0)
        {
            // 这种状况说明该段已经没有有效数据了,直接返回失败。
            bool frozen = _frozenForEnqueues;
            int currentTail = Volatile.Read(ref _headAndTail.Tail);
            if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
            {
                item = default!;
                return false;
            }
        }

        // 竞争失败进入下一轮等待
        spinner.SpinOnce(sleep1Threshold: -1);
    }
}

流程和追加元素相似,大部分都写在备注里面了,这里只额外提一下为空的状况。Segment为空只有一种状况,那就是头尾指针落在了同一个槽点,但这是会出现两种可能性:

  • 第一种是都落在了非最后一个槽点,意味着该段没有被装满,拿首尾指针相减便可判断。
  • 第二种是都落在了最后一个槽点,意味着该段已经被装满了,若是此时正在进行扩容(frozen),那么必需要在尾指针的基础上减去FreezeOffset再去和头指针判断,缘由前面有说过;

是否是感受环环相扣、相辅相成、如胶似漆、balabala.....😜

统计元素数量

前面也预告过,由于队列再也不维护段索引,这样会致使计算元素数量变得很是复杂,复杂到我都不想说这一部分了😭。简单描述一下就跳过了:核心思路就是一段一段来遍历,而后计算出每段的大小最后把结果累加,若是涉及多个段还得加锁,具体到段内部就要根据首尾指针计算槽点得出实际数量等等等等,代码很长就不贴出来了。

这里也严重提醒一句,非必要状况下不要调用Count不要调用Count不要调用Count。

接下来重点说一下队列的IsEmpty。因为Segment再也不维护IsEmpty信息,因此实现方式就有点曲线救国了,经过尝试可否从队首位置获取一个元素来判断是否队列为空,也就是常说的TryPeek操做,但细节上稍有不一样。

/// <summary>
/// 判断队列是否为空,千万不要使用Count==0来判断,也不要直接TryPeek
/// </summary>
public bool IsEmpty => !TryPeek(out _, resultUsed: false);

private bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed)
{
    ConcurrentQueueSegment<T> s = _head;
    while (true)
    {
        ConcurrentQueueSegment<T>? next = Volatile.Read(ref s._nextSegment);

        // 从首段中获取头部元素,成功的话直接返回true,获取失败就意味着首段为空了
        if (s.TryPeek(out result, resultUsed))
        {
            return true;
        }

        // 若是下一段不为空那就再尝试从下一段从新获取
        if (next != null)
        {
            s = next;
        }
        //若是下一段为空就说明整个队列为空,跳出循环直接返回false了
        else if (Volatile.Read(ref s._nextSegment) == null)
        {
            break;
        }
    }
    result = default!;
    return false;
}

上面的代码能够看到有一个特殊的参数resultUsed,它具体会有什么影响呢,那就得看看Segment是如何peek的:

public bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed)
{
    // 实际上队列的TryPeek是一个观察保护操做,这时resultUsed会标记成true,若是是IsEmpty操做的话就为false,由于并不关心这个元素是否被释放了
    if (resultUsed)
    {
        _preservedForObservation = true;
        Interlocked.MemoryBarrier();
    }

    Slot[] slots = _slots;

    SpinWait spinner = default;
    while (true)
    {
        int currentHead = Volatile.Read(ref _headAndTail.Head);
        int slotsIndex = currentHead & _slotsMask;

        int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);

        int diff = sequenceNumber - (currentHead + 1);
        if (diff == 0)
        {
            result = resultUsed ? slots[slotsIndex].Item! : default!;
            return true;
        }
        else if (diff < 0)
        {
            bool frozen = _frozenForEnqueues;
            int currentTail = Volatile.Read(ref _headAndTail.Tail);
            if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
            {
                result = default!;
                return false;
            }
        }
        spinner.SpinOnce(sleep1Threshold: -1);
    }
}

除了最开始的resultUsed判断,其余的基本和出队的逻辑一致,前面说的很详细,这里很少介绍了。

枚举转换数据

前面反复的提到观察保护,这到底是个啥意思??为何要有这个操做??

其实看过上一篇文章的话就比较好理解一点,这里稍微回顾一下方便对比。在Framework中会有截取快照的操做,也就是相似ToArray\ToList\GetEnumerator这种要作数据迭代,它是经过原子操做维护一个m_numSnapshotTakers字段来实现对数据的保护,目的是为了告诉其余出队的线程我正在遍历数据,大家执行出队的时候不要把数据给删了我要用的。在Core中也是为了实现一样的功能才引入了观察保护的概念,换了一种实现方式而已。

那么就以ToArray为例是怎么和其余操做交互的:

public T[] ToArray()
{
    // 这一步能够理解为保护现场
    SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail);

    // 计算队列长度,这也是要返回的数组大小
    long count = GetCount(head, headHead, tail, tailTail);
    T[] arr = new T[count];

    // 开始迭代数据塞到目标数组中
    using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail))
    {
        int i = 0;
        while (e.MoveNext())
        {
            arr[i++] = e.Current;
        }
        Debug.Assert(count == i);
    }
    return arr;
}

上面的代码中,有一次获取队列长度的操做,还有一次获取迭代数据的操做,这两步逻辑比较类似都是对整个队列进行遍历,因此作一次数据转换的开销很是很是大,使用的时候必定要谨慎。别的很少说,重点介绍一下如何实现保护现场的过程:

private void SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail)
{
    // 要保护现场确定要先来一把锁
    lock (_crossSegmentLock) 
    {
        head = _head;
        tail = _tail;

        // 一段一段进行遍历
        for (ConcurrentQueueSegment<T> s = head; ; s = s._nextSegment!)
        {
            // 把每一段的观察保护标记设置成true
            s._preservedForObservation = true;
            // 遍历到最后一段了就结束
            if (s == tail) break;
        }
        // 尾段冻结,这样就不能新增元素
        tail.EnsureFrozenForEnqueues(); 

        // 返回两个指针地址用来对每个元素进行遍历
        headHead = Volatile.Read(ref head._headAndTail.Head);
        tailTail = Volatile.Read(ref tail._headAndTail.Tail);
    }
}

能够看到上来就是一把锁,若是此时正在进行扩容或者收容的操做会直接阻塞掉,运气好没有阻塞的话你也不能有新元素入队了,由于尾段已经冻结锁死只能自旋等待,而出队也不能释放空间了。原话是:

At this point, any dequeues from any segment won't overwrite the value, and none of the existing segments can have new items enqueued.

有人就要问,这里把尾段锁死那等ToArray()完成后岂不是也不能有新元素入队了?不用担忧,前面入队逻辑提到过若是该段被锁住队列会新建立一个段而后再尝试入队,这样就能成功了。可是问题又来了,假如前面的段还有不少空位,那岂不是有浪费空间的嫌疑?咱们知道没有观察保护的时候每段会以2倍长度递增,这样的话空间浪费率仍是挺高的。带着疑问提了个Issue问一下:
https://github.com/dotnet/runtime/issues/35094

到这里就基本把.NET Core ConcurrentQueue说完了。


总结

对比Framework下的并发队列,Core里面的改动仍是不小的,尽管保留了SpinWaitInterlocked相关操做,可是也加入了lock,逻辑上也复杂了不少,我一步步分析和写文章搞了好几天。

至于性能对比,我找到一个官方给出的测试结果,有兴趣的能够看看:

https://github.com/dotnet/runtime/issues/27458#issuecomment-423964046

最后强行打个广告,基于.NET Core平台的开源分布式任务调度系统ScheduleMaster有兴趣的star支持一下,2.0版本即将上线:

相关文章
相关标签/搜索