继上篇Dictionary源码分析,上篇讲过的在这里不会再重复html
ConcurrentDictionary源码地址:node
ConcurrentDictionary一大特色是线程安全,在没有ConcurrentDictionary以前在多线程下用Dictionary,无论读写都要加个锁,不但麻烦,性能上也不是很好,由于在上篇分析中咱们知道Dictionary内部是由多个bucket组成,不一样bucket的操做即便在多线程下也能够不互相影响,若是一个锁把整个Dictionary都锁住实在有点浪费。github
不过凡事都有两面性,给每一个Bucket都加一个锁也不可取,Bucket的数量和Dictionary元素数量是同样的,而Bucket可能会有一部分是空的,并且访问Dictionary的线程若是数量不是太多也根本用上不这么多锁,想一想即便有10个线程在不停的操做这个Dictionary,同时操做的最多也就10个,即便两两冲突访问同一个Bucket,5个锁就够了,固然这是最好的状况,最坏状况是这5个bucket用同一个锁。因此,要获得最好的结果须要尝试取一个最优解,而影响因素则是bucket数量和线程数量。咱们想要的结果是锁够用但又不浪费。算法
微软得出的结果是默认的锁的数量是CPU核的个数,这个线程池默认的线程数量同样。随着Dictionary的扩容,锁的个数也能够跟着增长,这个能够在构造函数中本身指定。c#
下面看看ConcurrentDictionary里元素是作了怎样的封装。数组
private volatile Tables _tables; // 这不一样于Dictionary的bucket 数组,而是整个封装起来,并且用volatile来保证读写时的原子性 private sealed class Tables { internal readonly Node[] _buckets; // bucket成了这样,也就是ConcurrentDictionary能够认为是一个bucket数组,每一个Bucket里又由next来造成链表 internal readonly object[] _locks; // 这个就是锁的数组了 internal volatile int[] _countPerLock; // 这个是每一个锁罩的元素个数 internal Tables(Node[] buckets, object[] locks, int[] countPerLock) { _buckets = buckets; _locks = locks; _countPerLock = countPerLock; } } //由Dictionary里的Entry改为Node,而且把next放到Node里 private sealed class Node { internal readonly TKey _key; internal TValue _value; internal volatile Node _next; //next由volatile修饰,确保不被优化且读写原子性 internal readonly int _hashcode; internal Node(TKey key, TValue value, int hashcode, Node next) { _key = key; _value = value; _next = next; _hashcode = hashcode; } }
private readonly bool _growLockArray; // 是否在Dictionary扩容时也增长锁的数量 private int _budget; // 单个锁罩的元素的最大个数 private const int DefaultCapacity = 31; //ConcurrentDictionary默认大小,和List,Dictionary不同 private const int MaxLockNumber = 1024; //最大锁的个数,不过也能够在构造函数中弄个更大的,不般不必
internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer) { if (concurrencyLevel < 1) { throw new ArgumentOutOfRangeException(nameof(concurrencyLevel), SR.ConcurrentDictionary_ConcurrencyLevelMustBePositive); } if (capacity < 0) { throw new ArgumentOutOfRangeException(nameof(capacity), SR.ConcurrentDictionary_CapacityMustNotBeNegative); } if (comparer == null) throw new ArgumentNullException(nameof(comparer)); // The capacity should be at least as large as the concurrency level. Otherwise, we would have locks that don't guard // any buckets. if (capacity < concurrencyLevel) //concurrencyLevel就是锁的个数,容量小于锁的个数时,一部分锁就真是彻底没用了 { capacity = concurrencyLevel; //因此容量至少要和锁的个数同样 } object[] locks = new object[concurrencyLevel]; //初始化锁数组 for (int i = 0; i < locks.Length; i++) { locks[i] = new object(); } int[] countPerLock = new int[locks.Length]; //初始化锁罩元素个数的数组 Node[] buckets = new Node[capacity]; //初始化Node _tables = new Tables(buckets, locks, countPerLock); //初始化table _comparer = comparer; _growLockArray = growLockArray; //这就是指定锁是否增加 _budget = buckets.Length / locks.Length; //锁最大罩【容量/锁个数】这么多元素(毕竟不是扛把子,罩不了太多),多了怎么办,扩地盘。。 }
public bool TryGetValue(TKey key, out TValue value) { if (key == null) ThrowKeyNullException(); return TryGetValueInternal(key, _comparer.GetHashCode(key), out value); } private bool TryGetValueInternal(TKey key, int hashcode, out TValue value) { Debug.Assert(_comparer.GetHashCode(key) == hashcode); //先用本地变量存一下,省得在另一个线程扩容时变了 Tables tables = _tables; //又是hashcode取余哈,很少说 //int bucketNo = (hashcode & 0x7fffffff) % bucketCount; int bucketNo = GetBucket(hashcode, tables._buckets.Length); //这个用Valatile确保读了最新的Node Node n = Volatile.Read<Node>(ref tables._buckets[bucketNo]); //遍历bucket,真怀疑这些代码是几我的写的,风格都不同 while (n != null) { //找到了 if (hashcode == n._hashcode && _comparer.Equals(n._key, key)) { //返回true和value value = n._value; return true; } n = n._next; } value = default(TValue); return false; }
public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory) { if (key == null) ThrowKeyNullException(); if (valueFactory == null) throw new ArgumentNullException(nameof(valueFactory)); int hashcode = _comparer.GetHashCode(key); TValue resultingValue; //先TryGet,没有的再TryAdd if (!TryGetValueInternal(key, hashcode, out resultingValue)) { TryAddInternal(key, hashcode, valueFactory(key), false, true, out resultingValue); } return resultingValue; } private bool TryAddInternal(TKey key, int hashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue) { Debug.Assert(_comparer.GetHashCode(key) == hashcode); while (true) { int bucketNo, lockNo; Tables tables = _tables; //GetBucketAndLockNo函数里面就是下面两句 //bucketNo = (hashcode & 0x7fffffff) % bucketCount; 取余得bucket No.,和Dictionary同样 //lockNo = bucketNo % lockCount; 也是取余得锁No. 也就是一个锁也是可能给多个Bucket用的 GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length); bool resizeDesired = false; bool lockTaken = false; try { if (acquireLock) //参数指定须要锁的话就锁上这个bucket的锁,也就在构造函数初始化时不须要锁 Monitor.Enter(tables._locks[lockNo], ref lockTaken); //这里是作个校验,判断tables是否在这边取完锁后其余线程把元素给扩容了,扩容会生成一个新的tables,tables变了的话上面的锁就没意义了,须要重来,因此这整个是在while(true)里面 if (tables != _tables) { continue; } Node prev = null; //这里就遍历bucket里的链表了,和Dictionary差很少 for (Node node = tables._buckets[bucketNo]; node != null; node = node._next) { Debug.Assert((prev == null && node == tables._buckets[bucketNo]) || prev._next == node); if (hashcode == node._hashcode && _comparer.Equals(node._key, key))//看是否找到 { //看是否须要更新node if (updateIfExists) { if (s_isValueWriteAtomic) //这个是判断是不是支持原子操做的值类型,好比32位上byte,int,byte,short都是原子的,而long,double就不是了,支持原子操做的直接赋值就能够了,得注意是值类型,引用类型可不能这么搞 { node._value = value; } else //不是原子操做的值类型就new一个node { Node newNode = new Node(node._key, value, hashcode, node._next); if (prev == null) { tables._buckets[bucketNo] = newNode; } else { prev._next = newNode; } } resultingValue = value; } else//不更新就直接取值 { resultingValue = node._value; } return false; //找到了返回false,表示不用Add就Get了 } prev = node; } // 找了一圈没找着,就Add吧,new一个node用Volatile的写操做写到bucket里 Volatile.Write<Node>(ref tables._buckets[bucketNo], new Node(key, value, hashcode, tables._buckets[bucketNo])); checked//这里若是超出int大小,抛overflow exception, 能进这里表示一个锁罩int.MaxValue大小的Node,真成扛把子了,极端状况下只有一个锁并且Node的大小已是Int.MaxValue才可能会出现(还要看budget同不一样意) { tables._countPerLock[lockNo]++; } //若是锁罩的Node个数大于budget就表示差很少须要扩容了,黑社会表示地盘不够用了 if (tables._countPerLock[lockNo] > _budget) { resizeDesired = true; } } finally { if (lockTaken) //出现异常要把锁释放掉 Monitor.Exit(tables._locks[lockNo]); } if (resizeDesired) { GrowTable(tables); //扩容 } resultingValue = value; //result值 return true; } }
private void GrowTable(Tables tables) { const int MaxArrayLength = 0X7FEFFFFF; int locksAcquired = 0; try { // 先把第一个锁锁住,省得其余线程也要扩容走进来 AcquireLocks(0, 1, ref locksAcquired); //若是table已经变了,也就是那些等着上面锁的线程进来发现已经扩容完了直接返回就行了 if (tables != _tables) { return; } // 计算每一个锁罩的元素的个数总和,也就是当前元素的个数 long approxCount = 0; for (int i = 0; i < tables._countPerLock.Length; i++) { approxCount += tables._countPerLock[i]; } //若是元素总和不到Bucket大小的1/4,说明扩容扩得不是时候,归根结底是budget小了 if (approxCount < tables._buckets.Length / 4) { _budget = 2 * _budget;//2倍增长budget if (_budget < 0) //小于0说明overflow了,看看,前面用check,这里又用小于0。。 { _budget = int.MaxValue; //直接最大值吧 } return; } int newLength = 0; bool maximizeTableSize = false; try { checked { //2倍+1取得一个奇数做了新的容量 newLength = tables._buckets.Length * 2 + 1; //看是否能整除3/5/7,能就+2,直到不能整除为止,也挺奇怪这算法,List是2倍,Dictionary是比2倍大的一个质数,这里又是另一种,只能说各人有各人的算法 while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0) { newLength += 2; } Debug.Assert(newLength % 2 != 0); if (newLength > MaxArrayLength) { maximizeTableSize = true; } } } catch (OverflowException) { maximizeTableSize = true; } if (maximizeTableSize)//进这里表示溢出了 { newLength = MaxArrayLength; //直接给最大值 _budget = int.MaxValue; //budget也给最大值,由于无法再扩容了,给小了进来也没意义 } //扩容以后又是熟悉的从新分配元素,和Dictionary基本一致,这里要先把全部锁锁住,前面已经锁了第一个,这里锁其余的 AcquireLocks(1, tables._locks.Length, ref locksAcquired); object[] newLocks = tables._locks; //若是容许增长锁并则锁的个数还不到1024,就增长锁 if (_growLockArray && tables._locks.Length < MaxLockNumber) { newLocks = new object[tables._locks.Length * 2]; //也是2倍增长 Array.Copy(tables._locks, 0, newLocks, 0, tables._locks.Length); //旧锁复制到新数组里 for (int i = tables._locks.Length; i < newLocks.Length; i++) //再初始化增的锁 { newLocks[i] = new object(); } } //新的Node数组 Node[] newBuckets = new Node[newLength]; int[] newCountPerLock = new int[newLocks.Length]; //遍历bucket for (int i = 0; i < tables._buckets.Length; i++) { Node current = tables._buckets[i];//当前node while (current != null) { Node next = current._next; int newBucketNo, newLockNo; //算新的bucket No.和lock No. GetBucketAndLockNo(current._hashcode, out newBucketNo, out newLockNo, newBuckets.Length, newLocks.Length); //重建个新的node,注意next指到了上一个node,和Dictionary里同样 newBuckets[newBucketNo] = new Node(current._key, current._value, current._hashcode, newBuckets[newBucketNo]); checked { newCountPerLock[newLockNo]++; //这个锁又罩了一个小弟,加一个 } current = next; } } //调整下budget _budget = Math.Max(1, newBuckets.Length / newLocks.Length); //获得新的table _tables = new Tables(newBuckets, newLocks, newCountPerLock); } finally { // 释放锁 ReleaseLocks(0, locksAcquired); } }
经过这几个函数差很少也就清楚了ConcurrentDictionary整个的原理,其余函数有兴趣的能够去看看,都差很少这个意思。安全
说完了,总结下,ConcurrentDictionary能够说是为了不一个大锁锁住整个Dictionary带来的性能损失而出来的,固然也是采用空间换时间,不过这空间换得仍是很值得的,一些object而已。
原理在于Dictionary本质是是一个链表数组,只有在多线程同时操做到数组里同一个链表时才须要锁,因此就用到一个锁数组,每一个锁罩着几个小弟(bucket及bucket内的链表元素),这样多线程读写不一样锁罩的区域的时候能够同时进行而不会等待,进而提升多线程性能。
不过也凡事无绝对,不一样业务场景的需求不同,可能Dictionary配合ReaderWriterLockSlim在某些场景(好比读的机会远大于写的)可能会有更好的表现。多线程