在上一篇文章你真的了解字典吗?一文中我介绍了Hash Function和字典的工做的基本原理.
有网友在文章底部评论,说个人Remove和Add方法没有考虑线程安全问题.
https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.dictionary-2?redirectedfrom=MSDN&view=netframework-4.7.2
查阅相关资料后,发现字典.net中Dictionary自己时不支持线程安全的,若是要想使用支持线程安全的字典,那么咱们就要使用ConcurrentDictionary了.
在研究ConcurrentDictionary的源码后,我以为在ConcurrentDictionary的线程安全的解决思路颇有意思,其对线程安全的处理对对咱们项目中的其余高并发场景也有必定的参考价值,在这里再次分享个人一些学习心得和体会,但愿对你们有所帮助.html
ConcurrentDictionary是Dictionary的线程安全版本,位于System.Collections.Concurrent的命名空间下,该命名空间下除了有ConcurrentDictionary,还有如下Class都是咱们经常使用的那些类库的线程安全版本.node
BlockingCollection
ConcurrentBag
ConcurrentQueue
若是读过我上一篇文章你真的了解字典吗?的小伙伴,对这个ConcurrentDictionary
的工做原理应该也不难理解,它是简简单单地在读写方法加个lock
吗?c#
以下图所示,在字典中,数组entries用来存储数据,buckets做为桥梁,每次经过hash function获取了key的哈希值后,对这个哈希值进行取余,即hashResult%bucketsLength=bucketIndex
,余数做为buckets的index,而buckets的value就是这个key对应的entry所在entries中的索引,因此最终咱们就能够经过这个索引在entries中拿到咱们想要的数据,整个过程不须要对全部数据进行遍历,的时间复杂度为1.api
ConcurrentDictionary的数据存储相似,只是buckets有个更多的职责,它除了有dictionary中的buckets的桥梁的做用外,负责了数据存储.数组
key的哈希值与buckets的length取余后hashResult%bucketsLength=bucketIndex
,余数做为buckets的索引就能找到咱们要的数据所存储的块,当出现两个key指向同一个块时,即上图中的John Smith和Sandra Dee他同时指向152怎么办呢?存储节点Node具备Next属性执行下个Node,上图中,node 152的Next为154,即咱们从152开始找Sandra Dee,发现不是咱们想要的,再到154找,便可取到所需数据.安全
因为官方原版的源码较为复杂,理解起来有所难度,我对官方源码作了一些精简,下文将围绕这个精简版的ConcurrentDictionary展开叙述.
https://github.com/liuzhenyulive/DictionaryMini数据结构
ConcurrentDictionary中的每一个数据存储在一个Node中,它除了存储value信息,还存储key信息,以及key对应的hashcode
private class Node { internal TKey m_key; //数据的key internal TValue m_value; //数据值 internal volatile Node m_next; //当前Node的下级节点 internal int m_hashcode; //key的hashcode //构造函数 internal Node(TKey key, TValue value, int hashcode, Node next) { m_key = key; m_value = value; m_next = next; m_hashcode = hashcode; } }
而整个ConcurrentDictionary的数据存储在这样的一个Table中,其中m_buckets的Index负责映射key,m_locks是线程锁,下文中会有详细介绍,m_countPerLock存储每一个lock锁负责的node数量.
private class Tables { internal readonly Node[] m_buckets; //上文中提到的buckets internal readonly object[] m_locks; //线程锁 internal volatile int[] m_countPerLock; //索格锁所管理的数据数量 internal readonly IEqualityComparer<TKey> m_comparer; //当前key对应的type的比较器 //构造函数 internal Tables(Node[] buckets, object[] locks, int[] countPerlock, IEqualityComparer<TKey> comparer) { m_buckets = buckets; m_locks = locks; m_countPerLock = countPerlock; m_comparer = comparer; } }
ConcurrentDictionary会在构造函数中建立Table,这里我对原有的构造函数进行了简化,经过默认值进行建立,其中DefaultConcurrencyLevel默认并发级别为当前计算机处理器的线程数.
//构造函数 public ConcurrentDictionaryMini() : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, EqualityComparer<TKey>.Default) { } /// <summary> /// /// </summary> /// <param name="concurrencyLevel">并发等级,默认为CPU的线程数</param> /// <param name="capacity">默认容量,31,超过31后会自动扩容</param> /// <param name="growLockArray">时否动态扩充锁的数量</param> /// <param name="comparer">key的比较器</param> internal ConcurrentDictionaryMini(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer) { if (concurrencyLevel < 1) { throw new Exception("concurrencyLevel 必须为正数"); } if (capacity < 0) { throw new Exception("capacity 不能为负数."); } if (capacity < concurrencyLevel) { capacity = concurrencyLevel; } object[] locks = new object[concurrencyLevel]; for (int i = 0; i < locks.Length; i++) { locks[i] = new object(); } int[] countPerLock = new int[locks.Length]; Node[] buckets = new Node[capacity]; m_tables = new Tables(buckets, locks, countPerLock, comparer); m_growLockArray = growLockArray; m_budget = buckets.Length / locks.Length; }
ConcurrentDictionary中较为基础重点的方法分别位Add,Get,Remove,Grow Table方法,其余方法基本上是创建在这四个方法的基础上进行的扩充.
向Table中添加元素有如下亮点值得咱们关注.
开始操做前会声明一个tables变量来存储操做开始前的m_tables,在正式开始操做后(进入lock)的时候,会检查tables在准备工做阶段是否别的线程改变,若是改变了,则从新开始准备工做并重新开始.
经过GetBucketAndLockNo方法获取bucket索引以及lock索引,其内部就是取余操做.
private void GetBucketAndLockNo( int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount) { //0x7FFFFFFF 是long int的最大值 与它按位与数据小于等于这个最大值 bucketNo = (hashcode & 0x7fffffff) % bucketCount; lockNo = bucketNo % lockCount; }
对数据进行操做前会从m_locks取出第lockNo个对象最为lock,操做完成后释放该lock.多个lock必定程度上减小了阻塞的可能性.
在对数据进行更新时,若是该Value的Type为容许原子性写入的,则直接更新该Value,不然建立一个新的node进行覆盖.
/// <summary> /// Determines whether type TValue can be written atomically /// </summary> private static bool IsValueWriteAtomic() { Type valueType = typeof(TValue); // // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without // the risk of tearing. // // See http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf // if (valueType.IsClass) { return true; } switch (Type.GetTypeCode(valueType)) { case TypeCode.Boolean: case TypeCode.Byte: case TypeCode.Char: case TypeCode.Int16: case TypeCode.Int32: case TypeCode.SByte: case TypeCode.Single: case TypeCode.UInt16: case TypeCode.UInt32: return true; case TypeCode.Int64: case TypeCode.Double: case TypeCode.UInt64: return IntPtr.Size == 8; default: return false; } }
该方法依据CLI规范进行编写,简单来讲,32位的计算机,对32字节如下的数据类型写入时能够一次写入的而不须要移动内存指针,64位计算机对64位如下的数据可一次性写入,不须要移动内存指针.保证了写入的安全.
详见12.6.6 http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf
private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue) { while (true) { int bucketNo, lockNo; int hashcode; //https://www.cnblogs.com/blurhkh/p/10357576.html //须要了解一下值传递和引用传递 Tables tables = m_tables; IEqualityComparer<TKey> comparer = tables.m_comparer; hashcode = comparer.GetHashCode(key); GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); bool resizeDesired = false; bool lockTaken = false; try { if (acquireLock) Monitor.Enter(tables.m_locks[lockNo], ref lockTaken); //若是表刚刚调整了大小,咱们可能没有持有正确的锁,必须重试。 //固然这种状况不多见 if (tables != m_tables) continue; Node prev = null; for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next) { if (comparer.Equals(node.m_key, key)) { //key在字典里找到了。若是容许更新,则更新该key的值。 //咱们须要为更新建立一个node,以支持不能以原子方式写入的TValue类型,由于free-lock 读取可能同时发生。 if (updateIfExists) { if (s_isValueWriteAtomic) { node.m_value = value; } else { Node newNode = new Node(node.m_key, value, hashcode, node.m_next); if (prev == null) { tables.m_buckets[bucketNo] = newNode; } else { prev.m_next = newNode; } } resultingValue = value; } else { resultingValue = node.m_value; } return false; } prev = node; } //key没有在bucket中找到,则插入该数据 Volatile.Write(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo])); //当m_countPerLock超过Int Max时会抛出OverflowException checked { tables.m_countPerLock[lockNo]++; } // // 若是m_countPerLock[lockNo] > m_budget,则须要调整buckets的大小。 // GrowTable也可能会增长m_budget,但不会调整bucket table的大小。. // 若是发现bucket table利用率很低,也会发生这种状况。 // if (tables.m_countPerLock[lockNo] > m_budget) { resizeDesired = true; } } finally { if (lockTaken) Monitor.Exit(tables.m_locks[lockNo]); } if (resizeDesired) { GrowTable(tables, tables.m_comparer, false, m_keyRehashCount); } resultingValue = value; return true; } }
从Table中获取元素的的流程与前文介绍ConcurrentDictionary工做原理时一致,但有如下亮点值得关注.
public bool TryGetValue(TKey key, out TValue value) { if (key == null) throw new ArgumentNullException("key"); // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize. Tables tables = m_tables; IEqualityComparer<TKey> comparer = tables.m_comparer; GetBucketAndLockNo(comparer.GetHashCode(key), out var bucketNo, out _, tables.m_buckets.Length, tables.m_locks.Length); // We can get away w/out a lock here. // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i]. Node n = Volatile.Read(ref tables.m_buckets[bucketNo]); while (n != null) { if (comparer.Equals(n.m_key, key)) { value = n.m_value; return true; } n = n.m_next; } value = default(TValue); return false; }
Remove方法实现其实也并不复杂,相似咱们链表操做中移除某个Node.移除节点的同时,还要对先后节点进行连接,相信一块小伙伴们确定很好理解.
private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue) { while (true) { Tables tables = m_tables; IEqualityComparer<TKey> comparer = tables.m_comparer; int bucketNo, lockNo; GetBucketAndLockNo(comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length); lock (tables.m_locks[lockNo]) { if (tables != m_tables) continue; Node prev = null; for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next) { if (comparer.Equals(curr.m_key, key)) { if (matchValue) { bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value); if (!valuesMatch) { value = default(TValue); return false; } } if (prev == null) Volatile.Write(ref tables.m_buckets[bucketNo], curr.m_next); else { prev.m_next = curr.m_next; } value = curr.m_value; tables.m_countPerLock[lockNo]--; return true; } prev = curr; } } value = default(TValue); return false; } }
当table中任何一个m_countPerLock的数量超过了设定的阈值后,会触发此操做对Table进行扩容.
private void GrowTable(Tables tables, IEqualityComparer<TKey> newComparer, bool regenerateHashKeys, int rehashCount) { int locksAcquired = 0; try { //首先锁住第一个lock进行resize操做. AcquireLocks(0, 1, ref locksAcquired); if (regenerateHashKeys && rehashCount == m_keyRehashCount) { tables = m_tables; } else { if (tables != m_tables) return; long approxCount = 0; for (int i = 0; i < tables.m_countPerLock.Length; i++) { approxCount += tables.m_countPerLock[i]; } //若是bucket数组太空,则将预算加倍,而不是调整表的大小 if (approxCount < tables.m_buckets.Length / 4) { m_budget = 2 * m_budget; if (m_budget < 0) { m_budget = int.MaxValue; } return; } } int newLength = 0; bool maximizeTableSize = false; try { checked { newLength = tables.m_buckets.Length * 2 + 1; while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0) { newLength += 2; } } } catch (OverflowException) { maximizeTableSize = true; } if (maximizeTableSize) { newLength = int.MaxValue; m_budget = int.MaxValue; } AcquireLocks(1, tables.m_locks.Length, ref locksAcquired); object[] newLocks = tables.m_locks; //Add more locks if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER) { newLocks = new object[tables.m_locks.Length * 2]; Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length); for (int i = tables.m_locks.Length; i < newLocks.Length; i++) { newLocks[i] = new object(); } } Node[] newBuckets = new Node[newLength]; int[] newCountPerLock = new int[newLocks.Length]; for (int i = 0; i < tables.m_buckets.Length; i++) { Node current = tables.m_buckets[i]; while (current != null) { Node next = current.m_next; int newBucketNo, newLockNo; int nodeHashCode = current.m_hashcode; if (regenerateHashKeys) { //Recompute the hash from the key nodeHashCode = newComparer.GetHashCode(current.m_key); } GetBucketAndLockNo(nodeHashCode, out newBucketNo, out newLockNo, newBuckets.Length, newLocks.Length); newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, nodeHashCode, newBuckets[newBucketNo]); checked { newCountPerLock[newLockNo]++; } current = next; } } if (regenerateHashKeys) { unchecked { m_keyRehashCount++; } } m_budget = Math.Max(1, newBuckets.Length / newLocks.Length); m_tables = new Tables(newBuckets, newLocks, newCountPerLock, newComparer); } finally { ReleaseLocks(0, locksAcquired); } }
lock[]
:在以往的线程安全上,咱们对数据的保护每每是对数据的修改写入等地方加上lock,这个lock常常上整个上下文中惟一的,这样的设计下就可能会出现多个线程,写入的根本不是一块数据,却要等待前一个线程写入完成下一个线程才能继续操做.在ConcurrentDictionary中,经过哈希算法,从数组lock[]
中找出key的准确lock,若是不一样的key,使用的不是同一个lock,那么这多个线程的写入时互不影响的.
写入要考虑线程安全,读取呢?不能否认,在大部分场景下,读取没必要去考虑线程安全,可是在咱们这样的链式读取中,须要自上而下地查找,是否是有种可能在查找个过程当中,链路被修改了呢?因此ConcurrentDictionary中使用Volatile.Read来读取出数据,该方法从指定字段读取对象引用,在须要它的系统上,插入一个内存屏障,阻止处理器从新排序内存操做,若是在代码中此方法以后出现读取或写入,则处理器没法在此方法以前移动它。
在ConcurrentDictionary的更新方法中,对数据进行更新时,会判断该数据是否能够原子写入,若是时能够原子写入的,那么就直接更新数据,若是不是,那么会建立一个新的node覆盖原有node,起初看到这里时候,我百思不得其解,不知道这么操做的目的,后面在jeo duffy的博客中Thread-safety, torn reads, and the like中找到了答案,这样操做时为了防止torn reads(撕裂读取),什么叫撕裂读取呢?通俗地说,就是有的数据类型写入时,要分屡次写入,写一次,移动一次指针,那么就有可能写了一半,这个结果被另一个线程读取走了.好比说我把 刘振宇
三个字改为周杰伦
的过程当中,我先把刘改为周了,正在我准备去把振改为杰的时候,另一个线程过来读取结果了,读到的数据是周振宇
,这显然是不对的.因此对这种,更安全的作法是先把周杰伦
三个字写好在一张纸条上,而后直接替换掉刘振宇
.更多信息在CLI规范12.6.6有详细介绍.
checked
和unckecked
关键字.很是量的运算(non-constant)运算在编译阶段和运行时下不会作溢出检查,以下这样的代码时不会抛出异常的,算错了也不会报错。
int ten = 10; int i2 = 2147483647 + ten;
可是咱们知道,int的最大值是2147483647,若是咱们将上面这样的代码嵌套在checked
就会作溢出检查了.
checked { int ten = 10; int i2 = 2147483647 + ten; }
相反,对于常量,编译时是会作溢出检查的,下面这样的代码在编译时就会报错的,若是咱们使用unckeck
标签进行标记,则在编译阶段不会作移除检查.
int a = int.MaxValue * 2;
那么问题来了,咱们固然知道checked颇有用,那么uncheck呢?若是咱们只是须要那么一个数而已,至于溢出不溢出的关系不大,好比说生成一个对象的HashCode,好比说根据一个算法计算出一个相对随机数,这都是不须要准确结果的,ConcurrentDictionary中对于m_keyRehashCount++
这个运算就使用了unchecked,就是由于m_keyRehashCount是用来生成哈希值的,咱们并不关心它有没有溢出.
volatile
关键字,表示一个字段多是由在同一时间执行多个线程进行修改。出于性能缘由,编译器\运行时系统甚至硬件能够从新排列对存储器位置的读取和写入。声明的字段volatile不受这些优化的约束。添加volatile修饰符可确保全部线程都能按照执行顺序由任何其余线程执行的易失性写入,易失性写入是一件疯狂的事情的事情:普通玩家慎用.本博客所涉及的代码都保存在github中,Take it easy to enjoy it!
https://github.com/liuzhenyulive/DictionaryMini/blob/master/DictionaryMini/DictionaryMini/ConcurrentDictionaryMini.cs