C#线程安全类型
1、IProducerConsumerCollection<T>(线程安全接口)
此接口的所有实现都必须保证:接口的所有成员可以被多个线程同时安全地调用。
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace ConsoleApp1
{
    /// <summary>
    /// A minimal thread-safe LIFO stack: a plain <see cref="Stack{T}"/> guarded by a single lock,
    /// exposed through <see cref="IProducerConsumerCollection{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the stored items.</typeparam>
    public class SafeStack<T> : IProducerConsumerCollection<T>
    {
        // Every access to m_sequentialStack happens while holding this lock.
        private readonly object m_lockObject = new object();

        // The ordinary, non-thread-safe stack that does the real work.
        private readonly Stack<T> m_sequentialStack;

        /// <summary>Creates an empty stack.</summary>
        public SafeStack()
        {
            m_sequentialStack = new Stack<T>();
        }

        /// <summary>Creates a stack and pushes the items of <paramref name="collection"/> in order.</summary>
        /// <param name="collection">Initial items; the last one ends up on top.</param>
        public SafeStack(IEnumerable<T> collection)
        {
            m_sequentialStack = new Stack<T>(collection);
        }

        /// <summary>Pushes <paramref name="item"/> onto the top of the stack.</summary>
        public void Push(T item)
        {
            lock (m_lockObject)
            {
                m_sequentialStack.Push(item);
            }
        }

        /// <summary>Removes the top item if there is one.</summary>
        /// <param name="item">The popped item, or <c>default(T)</c> when the stack is empty.</param>
        /// <returns><c>true</c> if an item was popped; <c>false</c> if the stack was empty.</returns>
        public bool TryPop(out T item)
        {
            lock (m_lockObject)
            {
                if (m_sequentialStack.Count == 0)
                {
                    item = default(T);
                    return false;
                }

                item = m_sequentialStack.Pop();
                return true;
            }
        }

        /// <summary>IProducerConsumerCollection support: same as <see cref="TryPop"/>.</summary>
        public bool TryTake(out T item)
        {
            return TryPop(out item);
        }

        /// <summary>IProducerConsumerCollection support: pushes the item; always returns <c>true</c>.</summary>
        public bool TryAdd(T item)
        {
            Push(item);
            return true; // Push doesn't fail
        }

        /// <summary>Returns a snapshot of the items, top of the stack first.</summary>
        public T[] ToArray()
        {
            lock (m_lockObject)
            {
                return m_sequentialStack.ToArray();
            }
        }

        /// <summary>Copies the items (top first) into <paramref name="array"/> starting at <paramref name="index"/>.</summary>
        public void CopyTo(T[] array, int index)
        {
            lock (m_lockObject)
            {
                m_sequentialStack.CopyTo(array, index);
            }
        }

        /// <summary>
        /// Enumerates a snapshot taken under the lock, top of the stack first.
        /// </summary>
        /// <remarks>
        /// The snapshot is taken with ToArray rather than <c>new Stack&lt;T&gt;(stack)</c>: copying a
        /// stack through its enumerator re-pushes the items and therefore reverses their order, which
        /// made enumeration disagree with <see cref="ToArray"/> and <see cref="CopyTo(T[], int)"/>.
        /// Copying is O(n), which is the price of a stable view for large stacks.
        /// </remarks>
        public IEnumerator<T> GetEnumerator()
        {
            T[] snapshot;
            lock (m_lockObject)
            {
                snapshot = m_sequentialStack.ToArray();
            }

            return ((IEnumerable<T>)snapshot).GetEnumerator();
        }

        /// <summary>Non-generic enumeration; forwards to the generic enumerator.</summary>
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }

        /// <summary>ICollection support: access is synchronized.</summary>
        public bool IsSynchronized
        {
            get { return true; }
        }

        /// <summary>ICollection support: the object this collection locks on.</summary>
        public object SyncRoot
        {
            get { return m_lockObject; }
        }

        /// <summary>Number of items currently on the stack (read under the lock).</summary>
        public int Count
        {
            get
            {
                lock (m_lockObject)
                {
                    return m_sequentialStack.Count;
                }
            }
        }

        /// <summary>ICollection support: copies the items (top first) into a non-generic array.</summary>
        public void CopyTo(Array array, int index)
        {
            lock (m_lockObject)
            {
                ((ICollection)m_sequentialStack).CopyTo(array, index);
            }
        }
    }
}
using System;
using System.Collections.Concurrent;

namespace ConsoleApp1
{
    /// <summary>Console driver that exercises SafeStack through its own API and through IProducerConsumerCollection.</summary>
    class Program
    {
        static void Main()
        {
            TestSafeStack();

            // Keep the console window open in debug mode.
            Console.WriteLine("press any key to exit.");
            Console.ReadKey();
        }

        /// <summary>
        /// Walks through Push/TryAdd, CopyTo (inside and past the array bounds), enumeration,
        /// TryPop, Count and TryTake, printing the outcome of each step.
        /// </summary>
        static void TestSafeStack()
        {
            SafeStack<int> stack = new SafeStack<int>();
            // Same instance viewed through the interface; the conversion is implicit.
            IProducerConsumerCollection<int> ipcc = stack;

            // Push()/TryAdd()
            stack.Push(10);
            Console.WriteLine("pushed 10");
            ipcc.TryAdd(20);
            Console.WriteLine("ipcc.tryadded 20");
            stack.Push(15);
            Console.WriteLine("pushed 15");

            int[] testArray = new int[3];

            // CopyTo() that fits exactly into the destination array.
            try
            {
                ipcc.CopyTo(testArray, 0);
                Console.WriteLine("copyto() within boundaries worked, as expected");
            }
            catch (Exception e)
            {
                Console.WriteLine("copyto() within boundaries unexpectedly threw an exception: {0}", e.Message);
            }

            // CopyTo() starting at index 1: three items no longer fit, so this is expected to throw.
            try
            {
                ipcc.CopyTo(testArray, 1);
                Console.WriteLine("copyto() with index overflow worked, and it should not have");
            }
            catch (Exception e)
            {
                Console.WriteLine("copyto() with index overflow threw an exception, as expected: {0}", e.Message);
            }

            // Enumeration
            Console.Write("enumeration (should be three items): ");
            foreach (int item in stack)
            {
                Console.Write("{0} ", item);
            }
            Console.WriteLine("");

            // TryPop()
            int popped;
            if (stack.TryPop(out popped))
            {
                Console.WriteLine("successfully popped {0}", popped);
            }
            else
            {
                Console.WriteLine("failed to pop!!");
            }

            // Count
            Console.WriteLine("stack count is {0}, should be 2", stack.Count);

            // TryTake()
            if (ipcc.TryTake(out popped))
            {
                Console.WriteLine("successfully ipcc-trytaked {0}", popped);
            }
            else
            {
                Console.WriteLine("failed to ipcc.trytake!!");
            }
        }
    }
}
2、ConcurrentStack<T>类:线程安全的堆栈
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Demo: one producer pushes 20 work items onto a ConcurrentStack while four processors
    /// pop them concurrently until they are cancelled.
    /// </summary>
    class Program
    {
        // One shared generator. Creating a Random seeded with the current millisecond on every
        // call gave identical delays to callers that ran within the same millisecond.
        private static readonly Random s_random = new Random();

        // Random is not thread-safe, so access is serialized.
        private static readonly object s_randomLock = new object();

        static void Main(string[] args)
        {
            Task t = RunProgram();
            t.Wait();
        }

        /// <summary>
        /// Starts the producer and four processors, waits for the producer to finish, then
        /// cancels the processors two seconds later and waits for them to stop.
        /// </summary>
        static async Task RunProgram()
        {
            var taskStack = new ConcurrentStack<CustomTask>();

            // Disposed only after every processor has completed.
            using (var cts = new CancellationTokenSource())
            {
                var taskSource = Task.Run(() => TaskProducer(taskStack));

                Task[] processors = new Task[4];
                for (int i = 1; i <= 4; i++)
                {
                    // Copy to a per-iteration local so each lambda captures its own id.
                    string processorId = i.ToString();
                    processors[i - 1] = Task.Run(
                        () => TaskProcessor(taskStack, "processor " + processorId, cts.Token));
                }

                await taskSource;
                cts.CancelAfter(TimeSpan.FromSeconds(2));

                await Task.WhenAll(processors);
            }
        }

        /// <summary>Pushes work items 1..20 onto <paramref name="stack"/>, one every 50 ms.</summary>
        static async Task TaskProducer(ConcurrentStack<CustomTask> stack)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomTask { Id = i };
                stack.Push(workItem);
                Console.WriteLine("task {0} has been posted", workItem.Id);
            }
        }

        /// <summary>
        /// Repeatedly tries to pop a work item and reports it, pausing a random time between
        /// attempts, until <paramref name="token"/> is cancelled.
        /// </summary>
        /// <param name="stack">Shared stack to pop from.</param>
        /// <param name="name">Display name of this processor.</param>
        /// <param name="token">Signals the processor to stop after its current iteration.</param>
        static async Task TaskProcessor(
            ConcurrentStack<CustomTask> stack, string name, CancellationToken token)
        {
            await GetRandomDelay();
            do
            {
                CustomTask workItem;
                if (stack.TryPop(out workItem))
                {
                    Console.WriteLine("task {0} has been processed by {1}", workItem.Id, name);
                }

                await GetRandomDelay();
            }
            while (!token.IsCancellationRequested);
        }

        /// <summary>Returns a task that completes after a random delay of 1 to 499 ms.</summary>
        static Task GetRandomDelay()
        {
            int delay;
            lock (s_randomLock)
            {
                delay = s_random.Next(1, 500);
            }

            return Task.Delay(delay);
        }

        /// <summary>A work item identified by its sequence number.</summary>
        class CustomTask
        {
            public int Id { get; set; }
        }
    }
}
3、ConcurrentQueue<T>类:线程安全的队列
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Demo: one producer enqueues 20 work items into a ConcurrentQueue while four processors
    /// dequeue them concurrently until they are cancelled.
    /// </summary>
    class Program
    {
        // One shared generator. Creating a Random seeded with the current millisecond on every
        // call gave identical delays to callers that ran within the same millisecond.
        private static readonly Random s_random = new Random();

        // Random is not thread-safe, so access is serialized.
        private static readonly object s_randomLock = new object();

        static void Main(string[] args)
        {
            Task t = RunProgram();
            t.Wait();
        }

        /// <summary>
        /// Starts the producer and four processors, waits for the producer to finish, then
        /// cancels the processors two seconds later and waits for them to stop.
        /// </summary>
        static async Task RunProgram()
        {
            var taskQueue = new ConcurrentQueue<CustomTask>();

            // Disposed only after every processor has completed.
            using (var cts = new CancellationTokenSource())
            {
                var taskSource = Task.Run(() => TaskProducer(taskQueue));

                Task[] processors = new Task[4];
                for (int i = 1; i <= 4; i++)
                {
                    // Copy to a per-iteration local so each lambda captures its own id.
                    string processorId = i.ToString();
                    processors[i - 1] = Task.Run(
                        () => TaskProcessor(taskQueue, "processor " + processorId, cts.Token));
                }

                await taskSource;
                cts.CancelAfter(TimeSpan.FromSeconds(2));

                await Task.WhenAll(processors);
            }
        }

        /// <summary>Enqueues work items 1..20 into <paramref name="queue"/>, one every 50 ms.</summary>
        static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomTask { Id = i };
                queue.Enqueue(workItem);
                Console.WriteLine("插入task {0} has been posted threadid={1}",
                    workItem.Id, Thread.CurrentThread.ManagedThreadId);
            }
        }

        /// <summary>
        /// Repeatedly tries to dequeue a work item and reports it, pausing a random time between
        /// attempts, until <paramref name="token"/> is cancelled.
        /// </summary>
        /// <param name="queue">Shared queue to dequeue from.</param>
        /// <param name="name">Display name of this processor.</param>
        /// <param name="token">Signals the processor to stop after its current iteration.</param>
        static async Task TaskProcessor(
            ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
        {
            await GetRandomDelay();
            do
            {
                CustomTask workItem;
                if (queue.TryDequeue(out workItem))
                {
                    Console.WriteLine("读取task {0} has been processed by {1} threadid={2}",
                        workItem.Id, name, Thread.CurrentThread.ManagedThreadId);
                }

                await GetRandomDelay();
            }
            while (!token.IsCancellationRequested);
        }

        /// <summary>Returns a task that completes after a random delay of 1 to 499 ms.</summary>
        static Task GetRandomDelay()
        {
            int delay;
            lock (s_randomLock)
            {
                delay = s_random.Next(1, 500);
            }

            return Task.Delay(delay);
        }

        /// <summary>A work item identified by its sequence number.</summary>
        class CustomTask
        {
            public int Id { get; set; }
        }
    }
}
4、ConcurrentDictionary<TKey, TValue>类
ConcurrentDictionary类的写操作比使用锁的普通字典(Dictionary)要慢得多,而读操作则要快一些。因此,如果需要对字典进行大量线程安全的读操作,ConcurrentDictionary类是最好的选择。
ConcurrentDictionary类的实现使用了细粒度锁(fine-grained locking)技术,这使它在多线程并发写入时比使用锁的普通字典(后者也被称为粗粒度锁)具有更好的可伸缩性。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

namespace ConsoleApp1
{
    /// <summary>
    /// Single-threaded micro-benchmark: times one million writes and one million reads against a
    /// lock-protected Dictionary and against a ConcurrentDictionary, printing each elapsed time.
    /// </summary>
    class Program
    {
        // Number of writes, and then reads, performed against each dictionary.
        const int Iterations = 1000000;

        const string Item = "dictionary item";

        // Sink for the read loops so the reads have an observable effect.
        public static string CurrentItem;

        static void Main(string[] args)
        {
            var concurrentDictionary = new ConcurrentDictionary<int, string>();
            var dictionary = new Dictionary<int, string>();

            var sw = new Stopwatch();

            sw.Start();
            for (int i = 0; i < Iterations; i++)
            {
                lock (dictionary)
                {
                    dictionary[i] = Item;
                }
            }
            sw.Stop();
            Console.WriteLine("writing to dictionary with a lock: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < Iterations; i++)
            {
                concurrentDictionary[i] = Item;
            }
            sw.Stop();
            Console.WriteLine("writing to a concurrent dictionary: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < Iterations; i++)
            {
                lock (dictionary)
                {
                    CurrentItem = dictionary[i];
                }
            }
            sw.Stop();
            Console.WriteLine("reading from dictionary with a lock: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < Iterations; i++)
            {
                CurrentItem = concurrentDictionary[i];
            }
            sw.Stop();
            Console.WriteLine("reading from a concurrent dictionary: {0}", sw.Elapsed);
        }
    }
}
5、ConcurrentBag<T>类
namespace ConsoleApp1
{
    /// <summary>A unit of work for the crawler demo: a URL plus the name of whoever queued it.</summary>
    class CrawlingTask
    {
        /// <summary>URL to be crawled.</summary>
        public string UrlToCrawl { get; set; }

        /// <summary>Name of the producer that posted this task.</summary>
        public string ProducerName { get; set; }
    }
}
using System.Collections.Generic;

namespace ConsoleApp1
{
    /// <summary>Holds the fake "web": a table mapping each page URL to the links found on it.</summary>
    static class Module
    {
        /// <summary>Page URL to outgoing links. Empty until <see cref="CreateLinks"/> is called.</summary>
        public static readonly Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>();

        /// <summary>
        /// Fills <see cref="_contentEmulation"/> with the emulated link structure of four sites.
        /// Entries are written with the indexer, so calling this more than once is harmless.
        /// </summary>
        public static void CreateLinks()
        {
            Link("http://microsoft.com/", "http://microsoft.com/a.html", "http://microsoft.com/b.html");
            Link("http://microsoft.com/a.html", "http://microsoft.com/c.html", "http://microsoft.com/d.html");
            Link("http://microsoft.com/b.html", "http://microsoft.com/e.html");

            Link("http://google.com/", "http://google.com/a.html", "http://google.com/b.html");
            Link("http://google.com/a.html", "http://google.com/c.html", "http://google.com/d.html");
            Link("http://google.com/b.html", "http://google.com/e.html", "http://google.com/f.html");
            Link("http://google.com/c.html", "http://google.com/h.html", "http://google.com/i.html");

            Link("http://facebook.com/", "http://facebook.com/a.html", "http://facebook.com/b.html");
            Link("http://facebook.com/a.html", "http://facebook.com/c.html", "http://facebook.com/d.html");
            Link("http://facebook.com/b.html", "http://facebook.com/e.html");

            Link("http://twitter.com/", "http://twitter.com/a.html", "http://twitter.com/b.html");
            Link("http://twitter.com/a.html", "http://twitter.com/c.html", "http://twitter.com/d.html");
            Link("http://twitter.com/b.html", "http://twitter.com/e.html");
            Link("http://twitter.com/c.html", "http://twitter.com/f.html", "http://twitter.com/g.html");
            Link("http://twitter.com/d.html", "http://twitter.com/h.html");
            Link("http://twitter.com/e.html", "http://twitter.com/i.html");
        }

        // Registers (or overwrites) the outgoing links of a single page.
        private static void Link(string page, params string[] links)
        {
            _contentEmulation[page] = links;
        }
    }
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Demo: four crawlers share one ConcurrentBag of crawling tasks. Each crawler takes a task,
    /// looks up the links on that page and adds them back to the bag as new tasks.
    /// </summary>
    class Program
    {
        // One shared generator. Creating a Random seeded with the current millisecond on every
        // call gave identical delays to callers that ran within the same millisecond.
        private static readonly Random s_random = new Random();

        // Random is not thread-safe, so access is serialized.
        private static readonly object s_randomLock = new object();

        static void Main(string[] args)
        {
            Module.CreateLinks();
            Task t = RunProgram();
            t.Wait();
        }

        /// <summary>Seeds the bag with one root URL per crawler, starts the crawlers and waits for all of them.</summary>
        static async Task RunProgram()
        {
            var bag = new ConcurrentBag<CrawlingTask>();

            string[] urls = new[]
            {
                "http://microsoft.com/",
                "http://google.com/",
                "http://facebook.com/",
                "http://twitter.com/"
            };

            var crawlers = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                // Per-iteration local so each lambda captures its own name.
                string crawlerName = "crawler " + i;
                bag.Add(new CrawlingTask { UrlToCrawl = urls[i - 1], ProducerName = "root" });
                crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName));
            }

            await Task.WhenAll(crawlers);
        }

        /// <summary>
        /// Processes tasks from <paramref name="bag"/> until a take attempt finds it empty,
        /// posting every discovered link back into the bag as a new task.
        /// </summary>
        /// <param name="bag">Shared bag of pending tasks.</param>
        /// <param name="crawlerName">Display name of this crawler; recorded as producer of the tasks it posts.</param>
        static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName)
        {
            CrawlingTask task;
            // Try to take a task out of the bag; the crawler stops at the first failed attempt.
            while (bag.TryTake(out task))
            {
                IEnumerable<string> urls = await GetLinksFromContent(task);
                if (urls != null)
                {
                    foreach (var url in urls)
                    {
                        // Queue each child link as a new task.
                        bag.Add(new CrawlingTask { UrlToCrawl = url, ProducerName = crawlerName });
                    }
                }

                Console.WriteLine("indexing url {0} posted by {1} is completed by {2}!",
                    task.UrlToCrawl, task.ProducerName, crawlerName);
            }
        }

        /// <summary>
        /// Simulates downloading a page: waits a random delay, then returns the links registered
        /// for the task's URL, or <c>null</c> when the URL has no entry.
        /// </summary>
        static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task)
        {
            await GetRandomDelay();

            // Single lookup instead of ContainsKey followed by the indexer.
            string[] links;
            return Module._contentEmulation.TryGetValue(task.UrlToCrawl, out links) ? links : null;
        }

        /// <summary>Returns a task that completes after a random delay of 150 to 199 ms.</summary>
        static Task GetRandomDelay()
        {
            int delay;
            lock (s_randomLock)
            {
                delay = s_random.Next(150, 200);
            }

            return Task.Delay(delay);
        }
    }
}
6、BlockingCollection<T>类
BlockingCollection类:我们能够改变任务存储在阻塞集合中的方式。默认情况下它使用的是ConcurrentQueue容器,但是我们也可以使用任何实现了IProducerConsumerCollection<T>泛型接口的集合。
namespace ConsoleApp1
{
    /// <summary>A work item identified by its sequence number.</summary>
    class CustomTask
    {
        /// <summary>Sequence number of the work item.</summary>
        public int Id { get; set; }
    }
}
using System;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>Shared helper for the BlockingCollection demo.</summary>
    static class Module
    {
        // One shared generator. Creating a Random seeded with the current millisecond on every
        // call gave identical delays to callers that ran within the same millisecond.
        private static readonly Random s_random = new Random();

        // Random is not thread-safe, so access is serialized.
        private static readonly object s_randomLock = new object();

        /// <summary>Returns a task that completes after a random delay of 1 to 499 ms.</summary>
        public static Task GetRandomDelay()
        {
            int delay;
            lock (s_randomLock)
            {
                delay = s_random.Next(1, 500);
            }

            return Task.Delay(delay);
        }
    }
}
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    /// <summary>
    /// Demo: one producer adds 20 work items to a BlockingCollection and then marks it complete;
    /// four processors consume items until the collection is drained.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("using a queue inside of blockingcollection");
            Console.WriteLine();

            Task t = RunProgram();
            t.Wait();

            // To see LIFO ordering instead, pass a stack as the backing store:
            //   RunProgram(new ConcurrentStack<CustomTask>()).Wait();
        }

        /// <summary>Runs the producer and four processors against one blocking collection.</summary>
        /// <param name="collection">
        /// Optional backing store. When <c>null</c>, BlockingCollection uses its default
        /// (a ConcurrentQueue, i.e. FIFO order).
        /// </param>
        static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null)
        {
            // Create exactly one collection (the original allocated a default one and then threw
            // it away when a backing store was supplied) and dispose it when everything is done.
            using (var taskCollection = collection == null
                ? new BlockingCollection<CustomTask>()
                : new BlockingCollection<CustomTask>(collection))
            {
                // Start filling the collection.
                var taskSource = Task.Run(() => TaskProducer(taskCollection));

                Task[] processors = new Task[4];
                for (int i = 1; i <= 4; i++)
                {
                    // Per-iteration local so each lambda captures its own id.
                    string processorId = "processor " + i;
                    processors[i - 1] = Task.Run(
                        () => TaskProcessor(taskCollection, processorId));
                }

                await taskSource;

                await Task.WhenAll(processors);
            }
        }

        /// <summary>
        /// Adds work items 1..20 to <paramref name="collection"/>, one every 20 ms, then calls
        /// CompleteAdding so that consumers stop once the collection is empty.
        /// </summary>
        /// <param name="collection">Collection to fill.</param>
        static async Task TaskProducer(BlockingCollection<CustomTask> collection)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(20);
                var workItem = new CustomTask { Id = i };
                collection.Add(workItem);
                Console.WriteLine("task {0} has been posted", workItem.Id);
            }

            collection.CompleteAdding();
        }

        /// <summary>
        /// Consumes items from <paramref name="collection"/> and prints each one, pausing a random
        /// time after every item, until the consuming enumeration ends.
        /// </summary>
        /// <param name="collection">Collection to consume from.</param>
        /// <param name="name">Display name of this processor.</param>
        static async Task TaskProcessor(
            BlockingCollection<CustomTask> collection, string name)
        {
            await Module.GetRandomDelay();
            foreach (CustomTask item in collection.GetConsumingEnumerable())
            {
                Console.WriteLine("task {0} has been processed by {1}", item.Id, name);
                await Module.GetRandomDelay();
            }
        }
    }
}
7、使用ThreadStatic特性
ThreadStatic特性是线程本地存储(TLS)最简单的用法,且只支持静态字段,只需要在字段上标记这个特性就可以了。注意:字段的初始化表达式只会在执行类型初始化的那个线程上运行一次,其他线程看到的是该类型的默认值。
using System;
using System.Threading;

namespace ConsoleApp1
{
    /// <summary>Demo: a [ThreadStatic] field holds an independent value on each thread.</summary>
    class Program
    {
        // Thread-local value: every thread has its own copy of str.
        // No inline initializer on purpose. An initializer on a [ThreadStatic] field runs only
        // once, on whichever thread triggers type initialization, so it cannot be relied on to
        // give the main thread (or any other thread) its starting value.
        [ThreadStatic]
        static string str;

        static void Main(string[] args)
        {
            // Set the main thread's copy explicitly.
            str = "hehe";

            // The other thread only changes its own copy.
            Thread th = new Thread(() =>
            {
                str = "mgen";
                Display();
            });
            th.Start();
            th.Join();

            // The main thread's copy is unaffected by the write above.
            Display();
        }

        /// <summary>Prints the current thread's id and its own value of <c>str</c>.</summary>
        static void Display()
        {
            Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, str);
        }
    }
}
8、使用命名的LocalDataStoreSlot类型
显然ThreadStatic特性只支持静态字段,太受限制了。.NET线程类型中的LocalDataStoreSlot提供了更好的TLS支持。我们先来看看命名的LocalDataStoreSlot类型:可以通过Thread.AllocateNamedDataSlot来分配一个命名的空间,通过Thread.FreeNamedDataSlot来销毁一个命名的空间。空间数据的获取和设置则通过Thread类型的GetData方法和SetData方法。
using System;
using System.Threading;

namespace ConsoleApp1
{
    /// <summary>Demo: per-thread data stored in a named LocalDataStoreSlot.</summary>
    class Program
    {
        // Name under which the slot is allocated, looked up and freed.
        const string SlotName = "slot";

        static void Main(string[] args)
        {
            // Create the named slot.
            LocalDataStoreSlot slot = Thread.AllocateNamedDataSlot(SlotName);
            try
            {
                // Set the main thread's value.
                Thread.SetData(slot, "hehe");

                // This thread writes to the same slot, but only its own copy changes.
                Thread th = new Thread(() =>
                {
                    Thread.SetData(slot, "mgen");
                    Display();
                });
                th.Start();
                th.Join();

                Display();
            }
            finally
            {
                // Release the name even if something above throws.
                Thread.FreeNamedDataSlot(SlotName);
            }
        }

        /// <summary>Prints the current thread's id and the value it has stored in the named slot.</summary>
        static void Display()
        {
            LocalDataStoreSlot dataSlot = Thread.GetNamedDataSlot(SlotName);
            Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, Thread.GetData(dataSlot));
        }
    }
}
9、使用未命名的LocalDataStoreSlot类型
线程同样支持未命名的LocalDataStoreSlot。未命名的LocalDataStoreSlot不需要手动清除,分配时使用Thread.AllocateDataSlot方法。注意:由于未命名的LocalDataStoreSlot没有名称,因此无法使用Thread.GetNamedDataSlot方法,只有在多个线程中引用同一个LocalDataStoreSlot对象才可以对TLS空间进行操作。下面将上面命名的LocalDataStoreSlot代码改成未命名的LocalDataStoreSlot来执行:
using System;
using System.Threading;

namespace ConsoleApp1
{
    /// <summary>Demo: per-thread data stored in an unnamed LocalDataStoreSlot shared through a static field.</summary>
    class Program
    {
        // The unnamed slot. It has no name to look it up by, so every thread must use this reference.
        static readonly LocalDataStoreSlot slot = Thread.AllocateDataSlot();

        static void Main(string[] args)
        {
            // Set the main thread's value.
            Thread.SetData(slot, "hehe");

            // This thread writes to the same slot, but only its own copy changes.
            Thread th = new Thread(() =>
            {
                Thread.SetData(slot, "mgen");
                Display();
            });
            th.Start();
            th.Join();

            Display();
        }

        /// <summary>Prints the current thread's id and the value it has stored in the slot.</summary>
        static void Display()
        {
            Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, Thread.GetData(slot));
        }
    }
}
10、使用.NET 4.0的ThreadLocal<T>类型
.NET 4.0在线程方面加入了很多东西,其中就包括ThreadLocal<T>类型,它的出现大大简化了TLS的操作。ThreadLocal<T>类型和Lazy<T>惊人地相似:构造函数参数是Func<T>,用来创建对象(当然也可以理解成对象的默认值),然后用Value属性来得到或者设置这个对象。
ThreadLocal<T>的操作或多或少有点像上面未命名的LocalDataStoreSlot,但ThreadLocal<T>更简洁、更好理解。
using System;
using System.Threading;

namespace ConsoleApp1
{
    /// <summary>Demo: per-thread data stored in a ThreadLocal&lt;string&gt;.</summary>
    class Program
    {
        static ThreadLocal<string> local;

        static void Main(string[] args)
        {
            // Create the ThreadLocal with a factory that supplies each thread's initial value.
            // ThreadLocal<T> is IDisposable, so release it once the demo is finished.
            using (local = new ThreadLocal<string>(() => "hehe"))
            {
                // This thread overwrites only its own value.
                Thread th = new Thread(() =>
                {
                    local.Value = "mgen";
                    Display();
                });
                th.Start();
                th.Join();

                // The main thread never assigned a value, so it gets the factory's value.
                Display();
            }
        }

        /// <summary>Prints the current thread's id and its own value of <c>local</c>.</summary>
        static void Display()
        {
            Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value);
        }
    }
}