C# ConcurrentBag的实现原理
目录
一、前言
笔者最近在做一个项目,项目中为了提升吞吐量,使用了消息队列,中间实现了生产消费模式,在生产消费者模式中需要有一个集合,来存储生产者所生产的物品,笔者使用了最常见的list<t>
集合类型。
由于生产者线程有很多个,消费者线程也有很多个,所以不可避免的就产生了线程同步的问题。开始笔者是使用lock
关键字,进行线程同步,但是性能并不是特别理想,然后有网友说可以使用synchronizedlist<t>
来代替使用list<t>
达到线程安全的目的。于是笔者就替换成了synchronizedlist<t>
,但是发现性能依旧糟糕,于是查看了synchronizedlist<t>
的源代码,发现它就是简单的在list<t>
提供的api的基础上加了lock
,所以性能基本与笔者实现方式相差无几。
最后笔者找到了解决的方案,使用concurrentbag<t>
类来实现,性能有很大的改观,于是笔者查看了concurrentbag<t>
的源代码,实现非常精妙,特此在这记录一下。
二、concurrentbag类
concurrentbag<t>
实现了iproducerconsumercollection<t>
接口,该接口主要用于生产者消费者模式下,可见该类基本就是为生产消费者模式定制的。然后还实现了常规的ireadonlycollection<t>
类,实现了该类就需要实现ienumerable<t>、ienumerable、 icollection
类。
concurrentbag<t>
对外提供的方法没有list<t>
那么多,但是同样有enumerable
实现的扩展方法。类本身提供的方法如下所示。
名称 | 说明 |
---|---|
add | 将对象添加到 concurrentbag |
copyto | 从指定数组索引开始,将 concurrentbag |
equals(object) | 确定指定的 object 是否等于当前的 object。 (继承自 object。) |
finalize | 允许对象在“垃圾回收”回收之前尝试释放资源并执行其他清理操作。 (继承自 object。) |
getenumerator | 返回循环访问 concurrentbag |
gethashcode | 用作特定类型的哈希函数。 (继承自 object。) |
gettype | 获取当前实例的 type。 (继承自 object。) |
memberwiseclone | 创建当前 object 的浅表副本。 (继承自 object。) |
toarray | 将 concurrentbag |
tostring | 返回表示当前对象的字符串。 (继承自 object。) |
trypeek | 尝试从 concurrentbag |
trytake | 尝试从 concurrentbag |
三、 concurrentbag线程安全实现原理
1. concurrentbag的私有字段
concurrentbag
线程安全实现主要是通过它的数据存储的结构和细颗粒度的锁。
public class concurrentbag<t> : iproducerconsumercollection<t>, ireadonlycollection<t> { // threadlocallist对象包含每个线程的数据 threadlocal<threadlocallist> m_locals; // 这个头指针和尾指针指向中的第一个和最后一个本地列表,这些本地列表分散在不同线程中 // 允许在线程局部对象上枚举 volatile threadlocallist m_headlist, m_taillist; // 这个标志是告知操作线程必须同步操作 // 在globallistslock 锁中 设置 bool m_needsync; }
首选我们来看它声明的私有字段,其中需要注意的是集合的数据是存放在threadlocal
线程本地存储中的。也就是说访问它的每个线程会维护一个自己的集合数据列表,一个集合中的数据可能会存放在不同线程的本地存储空间中,所以如果线程访问自己本地存储的对象,那么是没有问题的,这就是实现线程安全的第一层,使用线程本地存储数据。
然后可以看到threadlocallist m_headlist, m_taillist;
这个是存放着本地列表对象的头指针和尾指针,通过这两个指针,我们就可以通过遍历的方式来访问所有本地列表。它使用volatile
修饰,所以它是线程安全的。
最后又定义了一个标志,这个标志告知操作线程必须进行同步操作,这是实现了一个细颗粒度的锁,因为只有在几个条件满足的情况下才需要进行线程同步。
2. 用于数据存储的trehadlocallist类
接下来我们来看一下threadlocallist
类的构造,该类就是实际存储了数据的位置。实际上它是使用双向链表这种结构进行数据存储。
[serializable] // 构造了双向链表的节点 internal class node { public node(t value) { m_value = value; } public readonly t m_value; public node m_next; public node m_prev; } /// <summary> /// 集合操作类型 /// </summary> internal enum listoperation { none, add, take }; /// <summary> /// 线程锁定的类 /// </summary> internal class threadlocallist { // 双向链表的头结点 如果为null那么表示链表为空 internal volatile node m_head; // 双向链表的尾节点 private volatile node m_tail; // 定义当前对list进行操作的种类 // 与前面的 listoperation 相对应 internal volatile int m_currentop; // 这个列表元素的计数 private int m_count; // the stealing count // 这个不是特别理解 好像是在本地列表中 删除某个node 以后的计数 internal int m_stealcount; // 下一个列表 可能会在其它线程中 internal volatile threadlocallist m_nextlist; // 设定锁定是否已进行 internal bool m_locktaken; // the owner thread for this list internal thread m_ownerthread; // 列表的版本,只有当列表从空变为非空统计是底层 internal volatile int m_version; /// <summary> /// threadlocallist 构造器 /// </summary> /// <param name="ownerthread">拥有这个集合的线程</param> internal threadlocallist(thread ownerthread) { m_ownerthread = ownerthread; } /// <summary> /// 添加一个新的item到链表首部 /// </summary> /// <param name="item">the item to add.</param> /// <param name="updatecount">是否更新计数.</param> internal void add(t item, bool updatecount) { checked { m_count++; } node node = new node(item); if (m_head == null) { debug.assert(m_tail == null); m_head = node; m_tail = node; m_version++; // 因为进行初始化了,所以将空状态改为非空状态 } else { // 使用头插法 将新的元素插入链表 node.m_next = m_head; m_head.m_prev = node; m_head = node; } if (updatecount) // 更新计数以避免此添加同步时溢出 { m_count = m_count - m_stealcount; m_stealcount = 0; } } /// <summary> /// 从列表的头部删除一个item /// </summary> /// <param name="result">the removed item</param> internal void remove(out t result) { // 双向链表删除头结点数据的流程 debug.assert(m_head != null); node head = m_head; m_head = m_head.m_next; if (m_head != null) { m_head.m_prev = null; } else { m_tail = null; } m_count--; result = head.m_value; } /// <summary> /// 返回列表头部的元素 /// </summary> /// <param name="result">the peeked item</param> /// <returns>true if succeeded, false otherwise</returns> internal bool peek(out t result) { node head = m_head; if (head != null) { result = head.m_value; return true; } result = default(t); return false; } /// <summary> /// 从列表的尾部获取一个item /// </summary> /// <param name="result">the removed item</param> /// <param name="remove">remove or peek flag</param> internal void steal(out t result, bool remove) { node tail = m_tail; debug.assert(tail != null); if (remove) // take operation { m_tail = m_tail.m_prev; if (m_tail != null) { m_tail.m_next = null; } else { m_head = null; } // increment the steal count m_stealcount++; } result = tail.m_value; } /// <summary> /// 获取总计列表计数, 它不是线程安全的, 如果同时调用它, 则可能提供不正确的计数 /// </summary> internal int count { get { return m_count - m_stealcount; } } }
从上面的代码中我们可以更加验证之前的观点,就是concurentbag<t>
在一个线程中存储数据时,使用的是双向链表,threadlocallist
实现了一组对链表增删改查的方法。
3. concurrentbag实现新增元素
接下来我们看一看concurentbag<t>
是如何新增元素的。
/// <summary> /// 尝试获取无主列表,无主列表是指线程已经被暂停或者终止,但是集合中的部分数据还存储在那里 /// 这是避免内存泄漏的方法 /// </summary> /// <returns></returns> private threadlocallist getunownedlist() { //此时必须持有全局锁 contract.assert(monitor.isentered(globallistslock)); // 从头线程列表开始枚举 找到那些已经被关闭的线程 // 将它所在的列表对象 返回 threadlocallist currentlist = m_headlist; while (currentlist != null) { if (currentlist.m_ownerthread.threadstate == system.threading.threadstate.stopped) { currentlist.m_ownerthread = thread.currentthread; // the caller should acquire a lock to make this line thread safe return currentlist; } currentlist = currentlist.m_nextlist; } return null; } /// <summary> /// 本地帮助方法,通过线程对象检索线程线程本地列表 /// </summary> /// <param name="forcecreate">如果列表不存在,那么创建新列表</param> /// <returns>the local list object</returns> private threadlocallist getthreadlist(bool forcecreate) { threadlocallist list = m_locals.value; if (list != null) { return list; } else if (forcecreate) { // 获取用于更新操作的 m_taillist 锁 lock (globallistslock) { // 如果头列表等于空,那么说明集合中还没有元素 // 直接创建一个新的 if (m_headlist == null) { list = new threadlocallist(thread.currentthread); m_headlist = list; m_taillist = list; } else { // concurrentbag内的数据是以双向链表的形式分散存储在各个线程的本地区域中 // 通过下面这个方法 可以找到那些存储有数据 但是已经被停止的线程 // 然后将已停止线程的数据 移交到当前线程管理 list = getunownedlist(); // 如果没有 那么就新建一个列表 然后更新尾指针的位置 if (list == null) { list = new threadlocallist(thread.currentthread); m_taillist.m_nextlist = list; m_taillist = list; } } m_locals.value = list; } } else { return null; } debug.assert(list != null); return list; } /// <summary> /// adds an object to the <see cref="concurrentbag{t}"/>. /// </summary> /// <param name="item">the object to be added to the /// <see cref="concurrentbag{t}"/>. the value can be a null reference /// (nothing in visual basic) for reference types.</param> public void add(t item) { // 获取该线程的本地列表, 如果此线程不存在, 则创建一个新列表 (第一次调用 add) threadlocallist list = getthreadlist(true); // 实际的数据添加操作 在addinternal中执行 addinternal(list, item); } /// <summary> /// </summary> /// <param name="list"></param> /// <param name="item"></param> private void addinternal(threadlocallist list, t item) { bool locktaken = false; try { #pragma warning disable 0420 interlocked.exchange(ref list.m_currentop, (int)listoperation.add); #pragma warning restore 0420 // 同步案例: // 如果列表计数小于两个, 因为是双向链表的关系 为了避免与任何窃取线程发生冲突 必须获取锁 // 如果设置了 m_needsync, 这意味着有一个线程需要冻结包 也必须获取锁 if (list.count < 2 || m_needsync) { // 将其重置为none 以避免与窃取线程的死锁 list.m_currentop = (int)listoperation.none; // 锁定当前对象 monitor.enter(list, ref locktaken); } // 调用 threadlocallist.add方法 将数据添加到双向链表中 // 如果已经锁定 那么说明线程安全 可以更新count 计数 list.add(item, locktaken); } finally { list.m_currentop = (int)listoperation.none; if (locktaken) { monitor.exit(list); } } }
从上面代码中,我们可以很清楚的知道add()
方法是如何运行的,其中的关键就是getthreadlist()
方法,通过该方法可以获取当前线程的数据存储列表对象,假如不存在数据存储列表,它会自动创建或者通过getunownedlist()
方法来寻找那些被停止但是还存储有数据列表的线程,然后将数据列表返回给当前线程中,防止了内存泄漏。
在数据添加的过程中,实现了细颗粒度的lock
同步锁,所以性能会很高。删除和其它操作与新增类似,本文不再赘述。
4. concurrentbag 如何实现迭代器模式
看完上面的代码后,我很好奇concurrentbag<t>
是如何实现ienumerator
来实现迭代访问的,因为concurrentbag<t>
是通过分散在不同线程中的threadlocallist
来存储数据的,那么在实现迭代器模式时,过程会比较复杂。
后面再查看了源码之后,发现concurrentbag<t>
为了实现迭代器模式,将分在不同线程中的数据全都存到一个list<t>
集合中,然后返回了该副本的迭代器。所以每次访问迭代器,它都会新建一个list<t>
的副本,这样虽然浪费了一定的存储空间,但是逻辑上更加简单了。
/// <summary> /// 本地帮助器方法释放所有本地列表锁 /// </summary> private void releasealllocks() { // 该方法用于在执行线程同步以后 释放掉所有本地锁 // 通过遍历每个线程中存储的 threadlocallist对象 释放所占用的锁 threadlocallist currentlist = m_headlist; while (currentlist != null) { if (currentlist.m_locktaken) { currentlist.m_locktaken = false; monitor.exit(currentlist); } currentlist = currentlist.m_nextlist; } } /// <summary> /// 从冻结状态解冻包的本地帮助器方法 /// </summary> /// <param name="locktaken">the lock taken result from the freeze method</param> private void unfreezebag(bool locktaken) { // 首先释放掉 每个线程中 本地变量的锁 // 然后释放全局锁 releasealllocks(); m_needsync = false; if (locktaken) { monitor.exit(globallistslock); } } /// <summary> /// 本地帮助器函数等待所有未同步的操作 /// </summary> private void waitalloperations() { contract.assert(monitor.isentered(globallistslock)); threadlocallist currentlist = m_headlist; // 自旋等待 等待其它操作完成 while (currentlist != null) { if (currentlist.m_currentop != (int)listoperation.none) { spinwait spinner = new spinwait(); // 有其它线程进行操作时,会将cuurentop 设置成 正在操作的枚举 while (currentlist.m_currentop != (int)listoperation.none) { spinner.spinonce(); } } currentlist = currentlist.m_nextlist; } } /// <summary> /// 本地帮助器方法获取所有本地列表锁 /// </summary> private void acquirealllocks() { contract.assert(monitor.isentered(globallistslock)); bool locktaken = false; threadlocallist currentlist = m_headlist; // 遍历每个线程的threadlocallist 然后获取对应threadlocallist的锁 while (currentlist != null) { // 尝试/最后 bllock 以避免在获取锁和设置所采取的标志之间的线程港口 try { monitor.enter(currentlist, ref locktaken); } finally { if (locktaken) { currentlist.m_locktaken = true; locktaken = false; } } currentlist = currentlist.m_nextlist; } } /// <summary> /// local helper method to freeze all bag operations, it /// 1- acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added /// to the dictionary /// 2- then acquire all local lists locks to prevent steal and synchronized operations /// 3- wait for all un-synchronized operations to be done /// </summary> /// <param name="locktaken">retrieve the lock taken result for the global lock, to be passed to unfreeze method</param> private void freezebag(ref bool locktaken) { contract.assert(!monitor.isentered(globallistslock)); // 全局锁定可安全地防止多线程调用计数和损坏 m_needsync monitor.enter(globallistslock, ref locktaken); // 这将强制同步任何将来的添加/执行操作 m_needsync = true; // 获取所有列表的锁 acquirealllocks(); // 等待所有操作完成 waitalloperations(); } /// <summary> /// 本地帮助器函数返回列表中的包项, 这主要由 copyto 和 toarray 使用。 /// 这不是线程安全, 应该被称为冻结/解冻袋块 /// 本方法是私有的 只有使用 freeze/unfreeze之后才是安全的 /// </summary> /// <returns>list the contains the bag items</returns> private list<t> tolist() { contract.assert(monitor.isentered(globallistslock)); // 创建一个新的list list<t> list = new list<t>(); threadlocallist currentlist = m_headlist; // 遍历每个线程中的threadlocallist 将里面的node的数据 添加到list中 while (currentlist != null) { node currentnode = currentlist.m_head; while (currentnode != null) { list.add(currentnode.m_value); currentnode = currentnode.m_next; } currentlist = currentlist.m_nextlist; } return list; } /// <summary> /// returns an enumerator that iterates through the <see /// cref="concurrentbag{t}"/>. /// </summary> /// <returns>an enumerator for the contents of the <see /// cref="concurrentbag{t}"/>.</returns> /// <remarks> /// the enumeration represents a moment-in-time snapshot of the contents /// of the bag. it does not reflect any updates to the collection after /// <see cref="getenumerator"/> was called. the enumerator is safe to use /// concurrently with reads from and writes to the bag. /// </remarks> public ienumerator<t> getenumerator() { // short path if the bag is empty if (m_headlist == null) return new list<t>().getenumerator(); // empty list bool locktaken = false; try { // 首先冻结整个 concurrentbag集合 freezebag(ref locktaken); // 然后tolist 再拿到 list的 ienumerator return tolist().getenumerator(); } finally { unfreezebag(locktaken); } }
由上面的代码可知道,为了获取迭代器对象,总共进行了三步主要的操作。
- 使用
freezebag()
方法,冻结整个concurrentbag<t>
集合。因为需要生成集合的list<t>
副本,生成副本期间不能有其它线程更改损坏数据。- 将
concurrrentbag<t>
生成list<t>
副本。因为concurrentbag<t>
存储数据的方式比较特殊,直接实现迭代器模式困难,考虑到线程安全和逻辑,最佳的办法是生成一个副本。- 完成以上操作以后,就可以使用
unfreezebag()
方法解冻整个集合。
那么freezebag()
方法是如何来冻结整个集合的呢?也是分为三步走。
- 首先获取全局锁,通过
monitor.enter(globallistslock, ref locktaken);
这样一条语句,这样其它线程就不能冻结集合。- 然后获取所有线程中
threadlocallist
的锁,通过`acquirealllocks()方法来遍历获取。这样其它线程就不能对它进行操作损坏数据。- 等待已经进入了操作流程线程结束,通过
waitalloperations()
方法来实现,该方法会遍历每一个threadlocallist
对象的m_currentop
属性,确保全部处于none
操作。
完成以上流程后,那么就是真正的冻结了整个concurrentbag<t>
集合,要解冻的话也类似。在此不再赘述。
四、总结
下面给出一张图,描述了concurrentbag<t>
是如何存储数据的。通过每个线程中的threadlocal
来实现线程本地存储,每个线程中都有这样的结构,互不干扰。然后每个线程中的m_headlist
总是指向concurrentbag<t>
的第一个列表,m_taillist
指向最后一个列表。列表与列表之间通过m_locals
下的 m_nextlist
相连,构成一个单链表。
数据存储在每个线程的m_locals
中,通过node
类构成一个双向链表。
ps: 要注意m_taillist
和m_headlist
并不是存储在threadlocal
中,而是所有的线程共享一份。
以上就是有关concurrentbag<t>
类的实现,笔者的一些记录和解析。
笔者水平有限,如果错误欢迎各位批评指正!
附上concurrentbag<t>
源码地址:戳一戳