C# 线程同步详解
前言
当线程池的线程阻塞时,线程池会创建额外的线程,而创建、销毁和调度线程所需要相当昂贵的内存资源,另外,很多的开发人员看见自己程序的线程没有做任何有用的事情时习惯创建更多的线程,为了构建可伸缩、响应灵敏的程序,我们在前面介绍了c#异步编程详解
但是异步编程同样也存在着很严重的问题,如果两个不同的线程访问相同的变量和数据,按照我们异步函数的实现方式,不可能存在两个线程同时访问相同的数据,这个时候我们就需要线程同步。多个线程同时访问共享数据的时,线程同步能防止数据损坏,之所以强调同时这个概念,因为线程同步本质就是计时问题。
异步和同步是相对的,同步就是顺序执行,执行完一个再执行下一个,需要等待、协调运行。异步就是彼此独立,在等待某事件的过程中继续做自己的事,不需要等待这一事件完成后再工作。线程就是实现异步的一个方式。异步是让调用方法的主线程不需要同步等待另一线程的完成,从而可以让主线程干其它的事情。
基元用户模式和内核模式构造
基础概念
基元:可以在代码中使用的简单的构造
用户模式:通过特殊的cpu指令协调线程,操作系统永远检测不到一个线程在基元用户模式的构造上阻塞。
内核模式:由windows自身提供,在应用程序的线程中调用由内核实现的函数。
用户模式构造
易变构造
c#编译器、jit编译器和cpu都会对代码进行优化,它们尽量保证保留我们的意图,但是从多线程的角度出发,我们的意图并不一定会得到保留,下面举例说明:
static void main(string[] args) { console.writeline("让worker函数运行5s后停止"); var t = new thread(worker); t.start(); thread.sleep(5000); stop = true; console.readline(); } private static bool stop = false; private static void worker(object obj) { int x = 0; while (!stop) { x++; } console.writeline("worker函数停止x={0}",x); }
编译器如果检查到stop为false,就生成代码来进入一个无限循环,并在循环中一直递增x,所以优化循环很快完成,但是编译器只检测stop一次,并不是每次都会检测。
例子2---两个线程同时访问:
class test { private static int m_flag = 0; private static int m_value = 0; public static void thread1(object obj) { m_value = 5; m_flag = 1; } public static void thread2(object obj) { if (m_flag == 1) console.writeline("m_value = {0}", m_value); } //多核cpu机器才会出现线程同步问题 public void exec() { var thread1 = new thread(thread1); var thread2 = new thread(thread2); thread1.start(); thread2.start(); console.readline(); } }
程序在执行的时候,编译器必须将变量m_flag和m_value从ram读入cpu寄存器,ram先传递m_value的值0,thread1把值变为5,但是thread2并不知道thread2仍然认为值为0,这种问题一般来说发生在多核cpu的概率大一些,应该cpu越多,多个线程同时访问资源的几率就越大。
关键字volatile,作用禁止c#编译器、jtp编译器和cpu执行的一些优化,如果做用于变量后,将不允许字段缓存到cpu的寄存器中,确保字段的读写都在ram中进行。
互锁构造
system.threading.interlocked类中的每个方法都执行一次原子的读取以及写入操作,调用某个interlocked方法之前的任何变量写入都在这个interlocked方法调用之前执行,而调用之后的任何变量读取都在这个调用之后读取。
interlocked方法主要是对int32变量进行静态操作add、decrement、compare、exchange、comparechange等方法,也接受object、double等类型的参数。
原子操作:是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。
代码演示:
说明:通过interlocked的方法异步查询几个web服务器,并同时返回数据,且结果只执行一次。
//上报状态类型 enum coordinationstatus { cancel, timeout, alldone }
class asynccoordinator { //allbegun 内部调用justended来递减它 private int _mopcount = 1; //0=false,1=true private int _mstatusreported = 0; private action<coordinationstatus> _mcallback; private timer _mtimer; //发起一个操作之前调用 public void abouttobegin(int opstoadd = 1) { interlocked.add(ref _mopcount, opstoadd); } //处理好一个操作的结果之后调用 public void justended() { if (interlocked.decrement(ref _mopcount) == 0) { reportstatus(coordinationstatus.alldone); } } //该方法必须在发起所有操作后调用 public void allbegin(action<coordinationstatus> callback, int timeout = timeout.infinite) { _mcallback = callback; if (timeout != timeout.infinite) { _mtimer = new timer(timeexpired, null, timeout, timeout.infinite); justended(); } } private void timeexpired(object o) { reportstatus(coordinationstatus.timeout); } public void cancel() { reportstatus(coordinationstatus.cancel); } private void reportstatus(coordinationstatus status) { //如果状态从未报告过,就报告它,否则就忽略它,只调用一次 if (interlocked.exchange(ref _mstatusreported, 1) == 0) { _mcallback(status); } } }
class multiwebrequest { //辅助类 用于协调所有的异步操作 private asynccoordinator _mac = new asynccoordinator(); protected dictionary<string,object> _mservers = new dictionary<string, object> { {"http://www.baidu.com",null},{"http://www.microsoft.com",null},{"http://www.cctv.com",null}, {"http://www.souhu.com",null},{"http://www.sina.com",null},{"http://www.tencent.com",null}, {"http://www.youku.com",null} }; private stopwatch sp; public multiwebrequest(int timeout = timeout.infinite) { sp = new stopwatch(); sp.start(); //通过异步方式一次性发起请求 var httpclient = new httpclient(); foreach (var server in _mservers.keys) { _mac.abouttobegin(1); httpclient.getbytearrayasync(server).continuewith(task => computeresult(server, task)); } _mac.allbegin(alldone,timeout); console.writeline(""); } private void computeresult(string server, task<byte[]> task) { object result; if (task.exception != null) { result = task.exception.innerexception; } else { //线程池处理io result = task.result.length; } //保存返回结果的长度 _mservers[server] = result; _mac.justended(); } public void cancel() { _mac.cancel(); } private void alldone(coordinationstatus status) { sp.stop(); console.writeline("响应耗时总计{0}",sp.elapsed); switch (status) { case coordinationstatus.cancel: console.writeline("操作取消"); break; case coordinationstatus.alldone: console.writeline("操作完成,完成的结果如下"); foreach (var server in _mservers) { console.writeline("{0}",server.key); object result = server.value; if (result is exception) { console.writeline("错误原因{0}",result.gettype().name); } else { console.writeline("返回字节数为:{0}",result); } } break; case coordinationstatus.timeout: console.writeline("操作超时"); break; default: throw new argumentoutofrangeexception("status", status, null); } } }
非常建议大家参考一下以上代码,我在对服务器进行访问时,也会常常参考这个模型。
简单的自旋锁
class someresource { private simplespinlock s1 = new simplespinlock(); public void accessresource() { s1.enter(); //一次是有一个线程才能进入访问 s1.leave(); } } class simplespinlock { private int _mresourceinuse; public void enter() { while (true) { if(interlocked.exchange(ref _mresourceinuse,1)==0) return; } } public void leave() { volatile.write(ref _mresourceinuse,1); } }
这就是一个线程同步锁的简单实现,这种锁的最大问题在于,存在竞争的情况下会造成线程的“自旋”,这会浪费cpu的宝贵时间,组织cpu做更多的工作,因此,这种自旋锁应该用于保护那些执行的非常快的代码。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持!