C#线程同步--限量使用
问题抽象:当某一资源同一时刻允许一定数量的线程使用的时候,需要有个机制来阻塞多余的线程,直到资源再次变得可用。
线程同步方案:semaphore、semaphoreslim、countdownevent
方案特性:限量供应;除所有者外,其他人无条件等待;先到先得,没有先后顺序
1、semaphore类
用于控制线程的访问数量,默认的构造函数为initialcount和maximumcount,表示默认设置的信号量个数和最大信号量个数。当你waitone的时候,信号量自减,当release的时候,信号量自增,然而当信号量为0的时候,后续的线程就不能拿到waitone了,所以必须等待先前的线程通过release来释放。
using system; using system.threading; namespace consoleapp1 { class program { static void main(string[] args) { thread t1 = new thread(run1); t1.start(); thread t2 = new thread(run2); t2.start(); thread t3 = new thread(run3); t3.start(); console.readkey(); } //初始可以授予2个线程信号,因为第3个要等待前面的release才能得到信号 static semaphore sem = new semaphore(2, 10); static void run1() { sem.waitone(); console.writeline("大家好,我是run1;" + datetime.now.tostring("mm:ss")); //两秒后 thread.sleep(2000); sem.release(); } static void run2() { sem.waitone(); console.writeline("大家好,我是run2;" + datetime.now.tostring("mm:ss")); //两秒后 thread.sleep(2000); sem.release(); } static void run3() { sem.waitone(); console.writeline("大家好,我是run3;" + datetime.now.tostring("mm:ss")); //两秒后 thread.sleep(2000); sem.release(); } } }
在以上的方法中release()方法相当于自增一个信号量,release(5)自增5个信号量。但是,release()到构造函数的第二个参数maximumcount的值就不能再自增了。
semaphore可用于进程级交互。
using system; using system.diagnostics; using system.threading; namespace consoleapp1 { class program { static void main(string[] args) { thread t1 = new thread(run1); t1.start(); thread t2 = new thread(run2); t2.start(); console.read(); } //初始可以授予2个线程信号,因为第3个要等待前面的release才能得到信号 static semaphore sem = new semaphore(3, 10, "命名semaphore"); static void run1() { sem.waitone(); console.writeline("进程:" + process.getcurrentprocess().id + " 我是run1" + datetime.now.timeofday); } static void run2() { sem.waitone(); console.writeline("进程:" + process.getcurrentprocess().id + " 我是run2" + datetime.now.timeofday); } } }
直接运行两次bin目录的exe文件,就能发现最多只能输出3个。
图书馆都配备有若干台公用计算机供读者查询信息,当某日读者比较多时,必须排队等候。uselibrarycomputer实例用多线程模拟了多人使用多台计算机的过程
using system; using system.threading; namespace consoleapp1 { class program { //图书馆拥有的公用计算机 private const int computernum = 3; private static computer[] librarycomputers; //同步信号量 public static semaphore sp = new semaphore(computernum, computernum); static void main(string[] args) { //图书馆拥有computernum台电脑 librarycomputers = new computer[computernum]; for (int i = 0; i < computernum; i++) librarycomputers[i] = new computer("computer" + (i + 1).tostring()); int peoplenum = 0; random ran = new random(); thread user; system.console.writeline("敲任意键模拟一批批的人排队使用{0}台计算机,esc键结束模拟……", computernum); //每次创建若干个线程,模拟人排队使用计算机 while (system.console.readkey().key != consolekey.escape) { peoplenum = ran.next(0, 10); system.console.writeline("\n有{0}人在等待使用计算机。", peoplenum); for (int i = 1; i <= peoplenum; i++) { user = new thread(usecomputer); user.start("user" + i.tostring()); } } } //线程函数 static void usecomputer(object username) { sp.waitone();//等待计算机可用 //查找可用的计算机 computer cp = null; for (int i = 0; i < computernum; i++) if (librarycomputers[i].isoccupied == false) { cp = librarycomputers[i]; break; } //使用计算机工作 cp.use(username.tostring()); //不再使用计算机,让出来给其他人使用 sp.release(); } } class computer { public readonly string computername = ""; public computer(string name) { computername = name; } //是否被占用 public bool isoccupied = false; //人在使用计算机 public void use(string username) { system.console.writeline("{0}开始使用计算机{1}", username, computername); isoccupied = true; thread.sleep(new random().next(1, 2000)); //随机休眠,以模拟人使用计算机 system.console.writeline("{0}结束使用计算机{1}", username, computername); isoccupied = false; } } }
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { static semaphoreslim slim = new semaphoreslim(environment.processorcount, 12); static void main(string[] args) { for (int i = 0; i < 12; i++) { task.factory.startnew((obj) => { run(obj); }, i); } console.read(); } static void run(object obj) { slim.wait(); console.writeline("当前时间:{0}任务 {1}已经进入。", datetime.now, obj); //这里busy3s中 thread.sleep(3000); slim.release(); } } }
同样,防止死锁的情况,我们需要知道”超时和取消标记“的解决方案,像semaphoreslim这种定死的”线程请求范围“,其实是降低了扩展性,使用需谨慎,在觉得有必要的时候使用它
using system; using system.threading; using system.threading.tasks; namespace consoleapp1 { class program { //默认的容纳大小为“硬件线程“数 static countdownevent cde = new countdownevent(environment.processorcount); static void loaduser(object obj) { try { console.writeline("threadid={0};当前任务:{1}正在加载user部分数据!", thread.currentthread.managedthreadid, obj); } finally { cde.signal(); } } static void loadproduct(object obj) { try { console.writeline("threadid={0};当前任务:{1}正在加载product部分数据!", thread.currentthread.managedthreadid, obj); } finally { cde.signal(); } } static void loadorder(object obj) { try { console.writeline("threadid={0};当前任务:{1}正在加载order部分数据!", thread.currentthread.managedthreadid, obj); } finally { cde.signal(); } } static void main(string[] args) { //加载user表需要5个任务 var usertaskcount = 5; //重置信号 cde.reset(usertaskcount); for (int i = 0; i < usertaskcount; i++) { task.factory.startnew((obj) => { loaduser(obj); }, i); } //等待所有任务执行完毕 cde.wait(); console.writeline("\nuser表数据全部加载完毕!\n"); //加载product需要8个任务 var producttaskcount = 8; //重置信号 cde.reset(producttaskcount); for (int i = 0; i < producttaskcount; i++) { task.factory.startnew((obj) => { loadproduct(obj); }, i); } cde.wait(); console.writeline("\nproduct表数据全部加载完毕!\n"); //加载order需要12个任务 var ordertaskcount = 12; //重置信号 cde.reset(ordertaskcount); for (int i = 0; i < ordertaskcount; i++) { task.factory.startnew((obj) => { loadorder(obj); }, i); } cde.wait(); console.writeline("\norder表数据全部加载完毕!\n"); console.writeline("\n(*^__^*) 嘻嘻,恭喜你,数据全部加载完毕\n"); console.read(); } } }
我们看到有两个主要方法:wait和signal。每调用一次signal相当于麻将桌上走了一个人,直到所有人都搓过麻将wait才给放行,这里同样要注意也就是“超时“问题的存在性,尤其是在并行计算中,轻量级别给我们提供了”取消标记“的机制,这是在重量级别中不存在的
注:如果调用signal()没有到达指定的次数,那么wait()将一直等待,请确保使用每个线程完成后都要调用signal方法。
上一篇: 【2019雅礼集训】【最大费用流】【模型转换】D2T3 sum
下一篇: Java8 默认方法