目录node
笔者最近在作一个项目,项目中为了提高吞吐量,使用了消息队列,中间实现了生产消费模式,在生产消费者模式中须要有一个集合,来存储生产者所生产的物品,笔者使用了最多见的List<T>
集合类型。c#
因为生产者线程有不少个,消费者线程也有不少个,因此不可避免的就产生了线程同步的问题。开始笔者是使用lock
关键字,进行线程同步,可是性能并非特别理想,而后有网友说可使用SynchronizedList<T>
来代替使用List<T>
达到线程安全的目的。因而笔者就替换成了SynchronizedList<T>
,可是发现性能依旧糟糕,因而查看了SynchronizedList<T>
的源代码,发现它就是简单的在List<T>
提供的API的基础上加了lock
,因此性能基本与笔者实现方式相差无几。数组
最后笔者找到了解决的方案,使用ConcurrentBag<T>
类来实现,性能有很大的改观,因而笔者查看了ConcurrentBag<T>
的源代码,实现很是精妙,特此在这记录一下。缓存
ConcurrentBag<T>
实现了IProducerConsumerCollection<T>
接口,该接口主要用于生产者消费者模式下,可见该类基本就是为生产消费者模式定制的。而后还实现了常规的IReadOnlyCollection<T>
类,实现了该类就须要实现IEnumerable<T>、IEnumerable、 ICollection
类。安全
ConcurrentBag<T>
对外提供的方法没有List<T>
那么多,可是一样有Enumerable
实现的扩展方法。类自己提供的方法以下所示。多线程
名称 | 说明 |
---|---|
Add | 将对象添加到 ConcurrentBag
|
CopyTo | 从指定数组索引开始,将 ConcurrentBag
|
Equals(Object) | 肯定指定的 Object 是否等于当前的 Object。 (继承自 Object。) |
Finalize | 容许对象在“垃圾回收”回收以前尝试释放资源并执行其余清理操做。 (继承自 Object。) |
GetEnumerator | 返回循环访问 ConcurrentBag
|
GetHashCode | 用做特定类型的哈希函数。 (继承自 Object。) |
GetType | 获取当前实例的 Type。 (继承自 Object。) |
MemberwiseClone | 建立当前 Object 的浅表副本。 (继承自 Object。) |
ToArray | 将 ConcurrentBag
|
ToString | 返回表示当前对象的字符串。 (继承自 Object。) |
TryPeek | 尝试从 ConcurrentBag
|
TryTake | 尝试从 ConcurrentBag
|
ConcurrentBag
线程安全实现主要是经过它的数据存储的结构和细颗粒度的锁。函数
public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T> { // ThreadLocalList对象包含每一个线程的数据 ThreadLocal<ThreadLocalList> m_locals; // 这个头指针和尾指针指向中的第一个和最后一个本地列表,这些本地列表分散在不一样线程中 // 容许在线程局部对象上枚举 volatile ThreadLocalList m_headList, m_tailList; // 这个标志是告知操做线程必须同步操做 // 在GlobalListsLock 锁中 设置 bool m_needSync; }
首选咱们来看它声明的私有字段,其中须要注意的是集合的数据是存放在ThreadLocal
线程本地存储中的。也就是说访问它的每一个线程会维护一个本身的集合数据列表,一个集合中的数据可能会存放在不一样线程的本地存储空间中,因此若是线程访问本身本地存储的对象,那么是没有问题的,这就是实现线程安全的第一层,使用线程本地存储数据。性能
而后能够看到ThreadLocalList m_headList, m_tailList;
这个是存放着本地列表对象的头指针和尾指针,经过这两个指针,咱们就能够经过遍历的方式来访问全部本地列表。它使用volatile
修饰,不容许线程进行本地缓存,每一个线程的读写都是直接操做在共享内存上,这就保证了变量始终具备一致性。任何线程在任什么时候间进行读写操做均是最新值。对于volatile
修饰符,感谢我是攻城狮指出描述错误。ui
最后又定义了一个标志,这个标志告知操做线程必须进行同步操做,这是实现了一个细颗粒度的锁,由于只有在几个条件知足的状况下才须要进行线程同步。this
接下来咱们来看一下ThreadLocalList
类的构造,该类就是实际存储了数据的位置。实际上它是使用双向链表这种结构进行数据存储。
[Serializable] // 构造了双向链表的节点 internal class Node { public Node(T value) { m_value = value; } public readonly T m_value; public Node m_next; public Node m_prev; } /// <summary> /// 集合操做类型 /// </summary> internal enum ListOperation { None, Add, Take }; /// <summary> /// 线程锁定的类 /// </summary> internal class ThreadLocalList { // 双向链表的头结点 若是为null那么表示链表为空 internal volatile Node m_head; // 双向链表的尾节点 private volatile Node m_tail; // 定义当前对List进行操做的种类 // 与前面的 ListOperation 相对应 internal volatile int m_currentOp; // 这个列表元素的计数 private int m_count; // The stealing count // 这个不是特别理解 好像是在本地列表中 删除某个Node 之后的计数 internal int m_stealCount; // 下一个列表 可能会在其它线程中 internal volatile ThreadLocalList m_nextList; // 设定锁定是否已进行 internal bool m_lockTaken; // The owner thread for this list internal Thread m_ownerThread; // 列表的版本,只有当列表从空变为非空统计是底层 internal volatile int m_version; /// <summary> /// ThreadLocalList 构造器 /// </summary> /// <param name="ownerThread">拥有这个集合的线程</param> internal ThreadLocalList(Thread ownerThread) { m_ownerThread = ownerThread; } /// <summary> /// 添加一个新的item到链表首部 /// </summary> /// <param name="item">The item to add.</param> /// <param name="updateCount">是否更新计数.</param> internal void Add(T item, bool updateCount) { checked { m_count++; } Node node = new Node(item); if (m_head == null) { Debug.Assert(m_tail == null); m_head = node; m_tail = node; m_version++; // 由于进行初始化了,因此将空状态改成非空状态 } else { // 使用头插法 将新的元素插入链表 node.m_next = m_head; m_head.m_prev = node; m_head = node; } if (updateCount) // 更新计数以免此添加同步时溢出 { m_count = m_count - m_stealCount; m_stealCount = 0; } } /// <summary> /// 从列表的头部删除一个item /// </summary> /// <param name="result">The removed item</param> internal void Remove(out T result) { // 双向链表删除头结点数据的流程 Debug.Assert(m_head != null); Node head = m_head; m_head = m_head.m_next; if (m_head != null) { m_head.m_prev = null; } else { m_tail = null; } m_count--; result = head.m_value; } /// <summary> /// 返回列表头部的元素 /// </summary> /// <param name="result">the peeked item</param> /// <returns>True if succeeded, false otherwise</returns> internal bool Peek(out T result) { Node head = m_head; if (head != null) { result = head.m_value; return true; } result = default(T); return false; } /// <summary> /// 从列表的尾部获取一个item /// </summary> /// <param name="result">the removed item</param> /// <param name="remove">remove or peek flag</param> internal void Steal(out T result, bool remove) { Node tail = m_tail; Debug.Assert(tail != null); if (remove) // Take operation { m_tail = m_tail.m_prev; if (m_tail != null) { m_tail.m_next = null; } else { m_head = null; } // Increment the steal count m_stealCount++; } result = tail.m_value; } /// <summary> /// 获取总计列表计数, 它不是线程安全的, 若是同时调用它, 则可能提供不正确的计数 /// </summary> internal int Count { get { return m_count - m_stealCount; } } }
从上面的代码中咱们能够更加验证以前的观点,就是ConcurentBag<T>
在一个线程中存储数据时,使用的是双向链表,ThreadLocalList
实现了一组对链表增删改查的方法。
接下来咱们看一看ConcurentBag<T>
是如何新增元素的。
/// <summary> /// 尝试获取无主列表,无主列表是指线程已经被暂停或者终止,可是集合中的部分数据还存储在那里 /// 这是避免内存泄漏的方法 /// </summary> /// <returns></returns> private ThreadLocalList GetUnownedList() { //此时必须持有全局锁 Contract.Assert(Monitor.IsEntered(GlobalListsLock)); // 从头线程列表开始枚举 找到那些已经被关闭的线程 // 将它所在的列表对象 返回 ThreadLocalList currentList = m_headList; while (currentList != null) { if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped) { currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe return currentList; } currentList = currentList.m_nextList; } return null; } /// <summary> /// 本地帮助方法,经过线程对象检索线程线程本地列表 /// </summary> /// <param name="forceCreate">若是列表不存在,那么建立新列表</param> /// <returns>The local list object</returns> private ThreadLocalList GetThreadList(bool forceCreate) { ThreadLocalList list = m_locals.Value; if (list != null) { return list; } else if (forceCreate) { // 获取用于更新操做的 m_tailList 锁 lock (GlobalListsLock) { // 若是头列表等于空,那么说明集合中尚未元素 // 直接建立一个新的 if (m_headList == null) { list = new ThreadLocalList(Thread.CurrentThread); m_headList = list; m_tailList = list; } else { // ConcurrentBag内的数据是以双向链表的形式分散存储在各个线程的本地区域中 // 经过下面这个方法 能够找到那些存储有数据 可是已经被中止的线程 // 而后将已中止线程的数据 移交到当前线程管理 list = GetUnownedList(); // 若是没有 那么就新建一个列表 而后更新尾指针的位置 if (list == null) { list = new ThreadLocalList(Thread.CurrentThread); m_tailList.m_nextList = list; m_tailList = list; } } m_locals.Value = list; } } else { return null; } Debug.Assert(list != null); return list; } /// <summary> /// Adds an object to the <see cref="ConcurrentBag{T}"/>. /// </summary> /// <param name="item">The object to be added to the /// <see cref="ConcurrentBag{T}"/>. The value can be a null reference /// (Nothing in Visual Basic) for reference types.</param> public void Add(T item) { // 获取该线程的本地列表, 若是此线程不存在, 则建立一个新列表 (第一次调用 add) ThreadLocalList list = GetThreadList(true); // 实际的数据添加操做 在AddInternal中执行 AddInternal(list, item); } /// <summary> /// </summary> /// <param name="list"></param> /// <param name="item"></param> private void AddInternal(ThreadLocalList list, T item) { bool lockTaken = false; try { #pragma warning disable 0420 Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add); #pragma warning restore 0420 // 同步案例: // 若是列表计数小于两个, 由于是双向链表的关系 为了不与任何窃取线程发生冲突 必须获取锁 // 若是设置了 m_needSync, 这意味着有一个线程须要冻结包 也必须获取锁 if (list.Count < 2 || m_needSync) { // 将其重置为None 以免与窃取线程的死锁 list.m_currentOp = (int)ListOperation.None; // 锁定当前对象 Monitor.Enter(list, ref lockTaken); } // 调用 ThreadLocalList.Add方法 将数据添加到双向链表中 // 若是已经锁定 那么说明线程安全 能够更新Count 计数 list.Add(item, lockTaken); } finally { list.m_currentOp = (int)ListOperation.None; if (lockTaken) { Monitor.Exit(list); } } }
从上面代码中,咱们能够很清楚的知道Add()
方法是如何运行的,其中的关键就是GetThreadList()
方法,经过该方法能够获取当前线程的数据存储列表对象,假如不存在数据存储列表,它会自动建立或者经过GetUnownedList()
方法来寻找那些被中止可是还存储有数据列表的线程,而后将数据列表返回给当前线程中,防止了内存泄漏。
在数据添加的过程当中,实现了细颗粒度的lock
同步锁,因此性能会很高。删除和其它操做与新增相似,本文再也不赘述。
看完上面的代码后,我很好奇ConcurrentBag<T>
是如何实现IEnumerator
来实现迭代访问的,由于ConcurrentBag<T>
是经过分散在不一样线程中的ThreadLocalList
来存储数据的,那么在实现迭代器模式时,过程会比较复杂。
后面再查看了源码以后,发现ConcurrentBag<T>
为了实现迭代器模式,将分在不一样线程中的数据全都存到一个List<T>
集合中,而后返回了该副本的迭代器。因此每次访问迭代器,它都会新建一个List<T>
的副本,这样虽然浪费了必定的存储空间,可是逻辑上更加简单了。
/// <summary> /// 本地帮助器方法释放全部本地列表锁 /// </summary> private void ReleaseAllLocks() { // 该方法用于在执行线程同步之后 释放掉全部本地锁 // 经过遍历每一个线程中存储的 ThreadLocalList对象 释放所占用的锁 ThreadLocalList currentList = m_headList; while (currentList != null) { if (currentList.m_lockTaken) { currentList.m_lockTaken = false; Monitor.Exit(currentList); } currentList = currentList.m_nextList; } } /// <summary> /// 从冻结状态解冻包的本地帮助器方法 /// </summary> /// <param name="lockTaken">The lock taken result from the Freeze method</param> private void UnfreezeBag(bool lockTaken) { // 首先释放掉 每一个线程中 本地变量的锁 // 而后释放全局锁 ReleaseAllLocks(); m_needSync = false; if (lockTaken) { Monitor.Exit(GlobalListsLock); } } /// <summary> /// 本地帮助器函数等待全部未同步的操做 /// </summary> private void WaitAllOperations() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); ThreadLocalList currentList = m_headList; // 自旋等待 等待其它操做完成 while (currentList != null) { if (currentList.m_currentOp != (int)ListOperation.None) { SpinWait spinner = new SpinWait(); // 有其它线程进行操做时,会将cuurentOp 设置成 正在操做的枚举 while (currentList.m_currentOp != (int)ListOperation.None) { spinner.SpinOnce(); } } currentList = currentList.m_nextList; } } /// <summary> /// 本地帮助器方法获取全部本地列表锁 /// </summary> private void AcquireAllLocks() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); bool lockTaken = false; ThreadLocalList currentList = m_headList; // 遍历每一个线程的ThreadLocalList 而后获取对应ThreadLocalList的锁 while (currentList != null) { // 尝试/最后 bllock 以免在获取锁和设置所采起的标志之间的线程港口 try { Monitor.Enter(currentList, ref lockTaken); } finally { if (lockTaken) { currentList.m_lockTaken = true; lockTaken = false; } } currentList = currentList.m_nextList; } } /// <summary> /// Local helper method to freeze all bag operations, it /// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added /// to the dictionary /// 2- Then Acquire all local lists locks to prevent steal and synchronized operations /// 3- Wait for all un-synchronized operations to be done /// </summary> /// <param name="lockTaken">Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param> private void FreezeBag(ref bool lockTaken) { Contract.Assert(!Monitor.IsEntered(GlobalListsLock)); // 全局锁定可安全地防止多线程调用计数和损坏 m_needSync Monitor.Enter(GlobalListsLock, ref lockTaken); // 这将强制同步任何未来的添加/执行操做 m_needSync = true; // 获取全部列表的锁 AcquireAllLocks(); // 等待全部操做完成 WaitAllOperations(); } /// <summary> /// 本地帮助器函数返回列表中的包项, 这主要由 CopyTo 和 ToArray 使用。 /// 这不是线程安全, 应该被称为冻结/解冻袋块 /// 本方法是私有的 只有使用 Freeze/UnFreeze以后才是安全的 /// </summary> /// <returns>List the contains the bag items</returns> private List<T> ToList() { Contract.Assert(Monitor.IsEntered(GlobalListsLock)); // 建立一个新的List List<T> list = new List<T>(); ThreadLocalList currentList = m_headList; // 遍历每一个线程中的ThreadLocalList 将里面的Node的数据 添加到list中 while (currentList != null) { Node currentNode = currentList.m_head; while (currentNode != null) { list.Add(currentNode.m_value); currentNode = currentNode.m_next; } currentList = currentList.m_nextList; } return list; } /// <summary> /// Returns an enumerator that iterates through the <see /// cref="ConcurrentBag{T}"/>. /// </summary> /// <returns>An enumerator for the contents of the <see /// cref="ConcurrentBag{T}"/>.</returns> /// <remarks> /// The enumeration represents a moment-in-time snapshot of the contents /// of the bag. It does not reflect any updates to the collection after /// <see cref="GetEnumerator"/> was called. The enumerator is safe to use /// concurrently with reads from and writes to the bag. /// </remarks> public IEnumerator<T> GetEnumerator() { // Short path if the bag is empty if (m_headList == null) return new List<T>().GetEnumerator(); // empty list bool lockTaken = false; try { // 首先冻结整个 ConcurrentBag集合 FreezeBag(ref lockTaken); // 而后ToList 再拿到 List的 IEnumerator return ToList().GetEnumerator(); } finally { UnfreezeBag(lockTaken); } }
由上面的代码可知道,为了获取迭代器对象,总共进行了三步主要的操做。
- 使用
FreezeBag()
方法,冻结整个ConcurrentBag<T>
集合。由于须要生成集合的List<T>
副本,生成副本期间不能有其它线程更改损坏数据。- 将
ConcurrrentBag<T>
生成List<T>
副本。由于ConcurrentBag<T>
存储数据的方式比较特殊,直接实现迭代器模式困难,考虑到线程安全和逻辑,最佳的办法是生成一个副本。- 完成以上操做之后,就可使用
UnfreezeBag()
方法解冻整个集合。
那么FreezeBag()
方法是如何来冻结整个集合的呢?也是分为三步走。
- 首先获取全局锁,经过
Monitor.Enter(GlobalListsLock, ref lockTaken);
这样一条语句,这样其它线程就不能冻结集合。- 而后获取全部线程中
ThreadLocalList
的锁,经过`AcquireAllLocks()方法来遍历获取。这样其它线程就不能对它进行操做损坏数据。- 等待已经进入了操做流程线程结束,经过
WaitAllOperations()
方法来实现,该方法会遍历每个ThreadLocalList
对象的m_currentOp
属性,确保所有处于None
操做。
完成以上流程后,那么就是真正的冻结了整个ConcurrentBag<T>
集合,要解冻的话也相似。在此再也不赘述。
下面给出一张图,描述了ConcurrentBag<T>
是如何存储数据的。经过每一个线程中的ThreadLocal
来实现线程本地存储,每一个线程中都有这样的结构,互不干扰。而后每一个线程中的m_headList
老是指向ConcurrentBag<T>
的第一个列表,m_tailList
指向最后一个列表。列表与列表之间经过m_locals
下的 m_nextList
相连,构成一个单链表。
数据存储在每一个线程的m_locals
中,经过Node
类构成一个双向链表。
PS: 要注意m_tailList
和m_headList
并非存储在ThreadLocal
中,而是全部的线程共享一份。
以上就是有关ConcurrentBag<T>
类的实现,笔者的一些记录和解析。
附上ConcurrentBag<T>
源码地址:戳一戳