《编程机制探析》第十章 线程同步模型
程序员文章站
2022-05-17 13:44:00
...
《编程机制探析》第十章 线程同步模型
上一章讲解的同步锁模型只是最简单的同步模型。同一时刻,保证只有一个线程能够运行同步代码。
有的时候,我们希望处理更加复杂的同步模型,比如生产者/消费者模型、读写同步模型等。这种情况下,同步锁模型就不够用了。我们需要一个新的模型。这就是我们要讲述的信号量模型。
信号量模型的工作方式如下:线程在运行的过程中,可以主动停下来,等待某个信号量的通知;这时候,该线程就进入到该信号量的待召(Waiting)队列当中;等到通知之后,再继续运行。
很多语言里面,同步锁都由专门的对象表示,对象名通常叫Monitor。
同样,在很多语言中,信号量通常也有专门的对象名来表示,比如,Mutex,Semphore。
信号量模型要比同步锁模型复杂许多。一些系统中,信号量甚至可以跨进程进行同步。另外一些信号量甚至还有计数功能,能够控制同时运行的线程数。
我们没有必要考虑那么复杂的模型。所有那些复杂的模型,都是最基本的模型衍生出来的。只要掌握了最基本的信号量模型——“等待/通知”模型,复杂模型也就迎刃而解了。
我们还是以Java语言为例。Java语言里面的同步锁和信号量概念都非常模糊,没有专门的对象名词来表示同步锁和信号量,只有两个同步锁相关的关键字——volatile和synchronized。
这种模糊虽然导致概念不清,但同时也避免了Monitor、Mutex、Semphore等名词带来的种种误解。我们不必执着于名词之争,可以专注于理解实际的运行原理。
在Java语言里面,任何一个Object Reference都可以作为同步锁。同样的道理,任何一个Object Reference也可以作为信号量。
Object对象的wait()方法就是等待通知,Object对象的notify()方法就是发出通知。
具体调用方法为
(1)等待某个信号量的通知
public static final Object signal = new Object();
… f1() {
synchronized(singal) { // 首先我们要获取这个信号量。这个信号量同时也是一个同步锁
// 只有成功获取了signal这个信号量兼同步锁之后,我们才可能进入这段代码
signal.wait(); // 这里要放弃信号量。本线程要进入signal信号量的待召(Waiting)队列
// 可怜。辛辛苦苦争取到手的信号量,就这么被放弃了
// 等到通知之后,从待召(Waiting)队列转到就绪(Ready)队列里面
// 转到了就绪队列中,离CPU核心近了一步,就有机会继续执行下面的代码了。
// 仍然需要把signal同步锁竞争到手,才能够真正继续执行下面的代码。命苦啊。
…
}
}
需要注意的是,上述代码中的signal.wait()的意思。signal.wait()很容易导致误解。signal.wait()的意思并不是说,signal开始wait,而是说,运行这段代码的当前线程开始wait这个signal对象,即,把本线程加入到signal对象的待召(Waiting)队列里。
(2)发出某个信号量的通知
… f2() {
synchronized(singal) { // 首先,我们同样要获取这个信号量。同时也是一个同步锁。
// 只有成功获取了signal这个信号量兼同步锁之后,我们才可能进入这段代码
signal.notify(); // 这里,我们通知signal的待召队列中的某个线程。
// 如果某个线程等到了这个通知,那个线程就会转到就绪队列中
// 但是本线程仍然继续拥有signal这个同步锁,本线程仍然继续执行
// 嘿嘿,虽然本线程好心通知其他线程,
// 但是,本线程可没有那么高风亮节,放弃到手的同步锁
// 本线程继续执行下面的代码
…
}
}
需要注意的是,signal.notify()的意思。signal.notify()并不是通知signal这个对象本身。而是通知正在等待signal信号量的其他线程。
以上就是Object的wait()和notify()的基本用法。
实际上,wait()还可以定义等待时间,当线程在某信号量的待召队列中,等到足够长的时间,就会等无可等,无需再等,自己就从待召队列转移到就绪队列中了。
另外,还有一个notifyAll()方法,表示通知待召队列里面的所有线程。
这些细节问题,并不对大局产生影响,留给读者自己去研究了。
有了信号量这个利器,我们就可以处理比较复杂的线程同步模型了。
首先,我们来看一个比较简单的生产者/消费者模型。还是以Java代码为例。
public static final Object signal = new Object();
public static final char[] buf = new char[1024]; // 需要同步访问的共享资源
// 生产者代码
… produce() {
for(… ) { // 循环执行
synchronized(signal){
// 产生一些东西,放到 buf 共享资源中
signal.notify(); //然后通知消费者
signal.wait(); // 然后自己进入signal待召队列
}
}
}
// 消费者代码
… consume() {
for(… ) { // 循环执行
synchronized(signal){
signal.wait(); // 进入signal待召队列,等待生产者的通知
// 读取buf 共享资源里面的东西
signal.notify(); // 然后通知生产者
}
}
}
上述的生产者/消费者模型的实现非常简单,只用了一个信号量signal。这只是一段示意代码。
实际上的生产者/消费者模型的实现可能非常复杂。可以引入buf已满或者已空的判断,可以引入更多的信号量,也可以引入一个环状的buf链。但那些都是性能优化方面的工作,基本的信号量工作方式还是不变的。
生产者/消费者模型是典型的Coroutine(协程,即相互协作制约的线程)。而且,当消费者或者生产者线程进入待召队列的时候,当前的运行栈状态就暂时保存在系统当中,这种状况又是典型的Continuation。
注:Continuation是一种能够时而暂停、时而继续的模型,其重要特性是能够在暂停的时候,完整地保存包括运行栈在内的一切当前运行信息,从而保证之后的继续运行。有一段时间,这个概念模型颇为流行,还有不少实现。但我本人是不喜欢这个概念模型的。
我们完全可以用信号量机制自己实现Coroutine和Continuation。其实,那些在语法层面上支持Coroutine和Continuation的语言,内部实现原理也是采用类似的信号量同步机制。
比生产者/消费者模型稍微复杂一些的是读写模型。一份共享资源允许多个读者同时读取。但是只要有一个写者在写这份共享资源,任何其他的读者和写者都不能访问这份共享资源。
读写模型实现起来,不仅需要信号量机制,还需要额外的读者计数和写者计数。
public static final Object signal = new Object();
public static int readers = 0;
public static int writers = 0;
// 读者代码
… read() {
for(… ) { // 循环执行
synchronized(signal){
while( writers > 0 )
signal.wait(); // 如果有人在写,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写
readers ++ ; // 增加一个读者计数,表示本线程在读取
}
// 进行一些读取操作
synchronized(signal){
readers --; // 读取完成,减少一个读者计数,表示本线程不在读取
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
}
}
// 写者代码
… write() {
for(… ) { // 循环执行
synchronized(signal){
while( writers > 0 || readers > 0)
signal.wait();// 如果有人在写或读,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写,也没有人在读
writers ++ ; // 增加一个写者计数,表示本线程在写
// 进行一些写操作
writers --; // 读取完成,减少一个读者计数,表示本线程不在写
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
}
}
上述代码只是一段示意代码。实际应用中,人们通常抽取出来一个专门的读写同步锁。
interface ReadWriteLock {
… getReadLock();
… releaseReadLock();
… getWriteLock();
… releaseWriteLock();
}
具体的实现原理也是类似的信号量同步机制。
class RWLock {
… readers, writers;
… synchronized … getReadLock() { // 相当于synchronized(this)
…
while( writers > 0 )
this.wait(); // 这里我们把RWLock对象本身作为信号量
readers++;
}
…synchronized … releaseReadLock(){ //相当于synchronized(this)
readers--;
this.notifyAll(); // // 这里我们把RWLock对象本身作为信号量
}
…synchronized … getWriteLock(){// 相当于synchronized(this)
while( writers > 0 || readers > 0 )
this.wait(); // 这里我们把RWLock对象本身作为信号量
writers++;
}
…synchronized … releaseWriteLock(){// 相当于synchronized(this)
writers--;
this.notifyAll(); // // 这里我们把RWLock对象本身作为信号量
}
}
具体用法是
public static final RWLock lock = new RWLock();
… read() {
lock.getReadLock();
// 读取
lock.releaseReadLock();
}
… write() {
lock.getWriteLock();
// 读取
lock.releaseWriteLock();
}
这种用法要求在执行一些处理之前,一定要执行某项特殊操作,处理之后一定也要执行某项特殊操作。这种人为的顺序性,无疑增加了代码的耦合度,降低了代码的独立性。很有可能会成为线程死锁和资源操作冲突的根源。
当时,这点一直让我不安,可是没有找到方法避免。毕竟,死锁或者资源操作冲突,是线程的固有问题。
很巧的是,正在我惴惴不安的时候,我得知了一个消息。Sun公司根据JCR,决定在jdk1.5中引入关于concurrency(并发)的部分。(那时候,Java还属于Sun公司。现在,已经换了主人。回想当年,不胜唏嘘。)
以下这个网址是concurrency部分的util.concurrent一个实现。非常好的信息。对于处理多线程并发问题,很有帮助。
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
包括Lock, ReadWriteLock, CurrentHashMap, CurrentReaderHashMap等类。JDK1.5引入了这些类,作为java.util.concurrent Package。
里面提供了一个ReadWriteLock类,标准用法如下。
Standard usage of ReadWriteLock:
class X {
ReadWriteLock rw;
// ...
public void read() throws InterruptedException {
rw.readLock().acquire();
try {
// ... do the read
}
finally {
rw.readlock().release()
}
}
public void write() throws InterruptedException {
rw.writeLock().acquire();
try {
// ... do the write
}
finally {
rw.writelock().release()
}
}
}
我们可以看到,ReadWriteLock同样要求调用的顺序——aquire()和release()。我对自己的例子增强了一点信心。
我又查看了WriterPreferenceReadWriteLock类,看到里面成对的方法,startRead(),endRead();startWrite(),endWrite()。我的心情完全放松了下来。我的思路虽然粗糙,但大体的方向是正确的。
上一章讲解的同步锁模型只是最简单的同步模型。同一时刻,保证只有一个线程能够运行同步代码。
有的时候,我们希望处理更加复杂的同步模型,比如生产者/消费者模型、读写同步模型等。这种情况下,同步锁模型就不够用了。我们需要一个新的模型。这就是我们要讲述的信号量模型。
信号量模型的工作方式如下:线程在运行的过程中,可以主动停下来,等待某个信号量的通知;这时候,该线程就进入到该信号量的待召(Waiting)队列当中;等到通知之后,再继续运行。
很多语言里面,同步锁都由专门的对象表示,对象名通常叫Monitor。
同样,在很多语言中,信号量通常也有专门的对象名来表示,比如,Mutex,Semphore。
信号量模型要比同步锁模型复杂许多。一些系统中,信号量甚至可以跨进程进行同步。另外一些信号量甚至还有计数功能,能够控制同时运行的线程数。
我们没有必要考虑那么复杂的模型。所有那些复杂的模型,都是最基本的模型衍生出来的。只要掌握了最基本的信号量模型——“等待/通知”模型,复杂模型也就迎刃而解了。
我们还是以Java语言为例。Java语言里面的同步锁和信号量概念都非常模糊,没有专门的对象名词来表示同步锁和信号量,只有两个同步锁相关的关键字——volatile和synchronized。
这种模糊虽然导致概念不清,但同时也避免了Monitor、Mutex、Semphore等名词带来的种种误解。我们不必执着于名词之争,可以专注于理解实际的运行原理。
在Java语言里面,任何一个Object Reference都可以作为同步锁。同样的道理,任何一个Object Reference也可以作为信号量。
Object对象的wait()方法就是等待通知,Object对象的notify()方法就是发出通知。
具体调用方法为
(1)等待某个信号量的通知
public static final Object signal = new Object();
… f1() {
synchronized(singal) { // 首先我们要获取这个信号量。这个信号量同时也是一个同步锁
// 只有成功获取了signal这个信号量兼同步锁之后,我们才可能进入这段代码
signal.wait(); // 这里要放弃信号量。本线程要进入signal信号量的待召(Waiting)队列
// 可怜。辛辛苦苦争取到手的信号量,就这么被放弃了
// 等到通知之后,从待召(Waiting)队列转到就绪(Ready)队列里面
// 转到了就绪队列中,离CPU核心近了一步,就有机会继续执行下面的代码了。
// 仍然需要把signal同步锁竞争到手,才能够真正继续执行下面的代码。命苦啊。
…
}
}
需要注意的是,上述代码中的signal.wait()的意思。signal.wait()很容易导致误解。signal.wait()的意思并不是说,signal开始wait,而是说,运行这段代码的当前线程开始wait这个signal对象,即,把本线程加入到signal对象的待召(Waiting)队列里。
(2)发出某个信号量的通知
… f2() {
synchronized(singal) { // 首先,我们同样要获取这个信号量。同时也是一个同步锁。
// 只有成功获取了signal这个信号量兼同步锁之后,我们才可能进入这段代码
signal.notify(); // 这里,我们通知signal的待召队列中的某个线程。
// 如果某个线程等到了这个通知,那个线程就会转到就绪队列中
// 但是本线程仍然继续拥有signal这个同步锁,本线程仍然继续执行
// 嘿嘿,虽然本线程好心通知其他线程,
// 但是,本线程可没有那么高风亮节,放弃到手的同步锁
// 本线程继续执行下面的代码
…
}
}
需要注意的是,signal.notify()的意思。signal.notify()并不是通知signal这个对象本身。而是通知正在等待signal信号量的其他线程。
以上就是Object的wait()和notify()的基本用法。
实际上,wait()还可以定义等待时间,当线程在某信号量的待召队列中,等到足够长的时间,就会等无可等,无需再等,自己就从待召队列转移到就绪队列中了。
另外,还有一个notifyAll()方法,表示通知待召队列里面的所有线程。
这些细节问题,并不对大局产生影响,留给读者自己去研究了。
有了信号量这个利器,我们就可以处理比较复杂的线程同步模型了。
首先,我们来看一个比较简单的生产者/消费者模型。还是以Java代码为例。
public static final Object signal = new Object();
public static final char[] buf = new char[1024]; // 需要同步访问的共享资源
// 生产者代码
… produce() {
for(… ) { // 循环执行
synchronized(signal){
// 产生一些东西,放到 buf 共享资源中
signal.notify(); //然后通知消费者
signal.wait(); // 然后自己进入signal待召队列
}
}
}
// 消费者代码
… consume() {
for(… ) { // 循环执行
synchronized(signal){
signal.wait(); // 进入signal待召队列,等待生产者的通知
// 读取buf 共享资源里面的东西
signal.notify(); // 然后通知生产者
}
}
}
上述的生产者/消费者模型的实现非常简单,只用了一个信号量signal。这只是一段示意代码。
实际上的生产者/消费者模型的实现可能非常复杂。可以引入buf已满或者已空的判断,可以引入更多的信号量,也可以引入一个环状的buf链。但那些都是性能优化方面的工作,基本的信号量工作方式还是不变的。
生产者/消费者模型是典型的Coroutine(协程,即相互协作制约的线程)。而且,当消费者或者生产者线程进入待召队列的时候,当前的运行栈状态就暂时保存在系统当中,这种状况又是典型的Continuation。
注:Continuation是一种能够时而暂停、时而继续的模型,其重要特性是能够在暂停的时候,完整地保存包括运行栈在内的一切当前运行信息,从而保证之后的继续运行。有一段时间,这个概念模型颇为流行,还有不少实现。但我本人是不喜欢这个概念模型的。
我们完全可以用信号量机制自己实现Coroutine和Continuation。其实,那些在语法层面上支持Coroutine和Continuation的语言,内部实现原理也是采用类似的信号量同步机制。
比生产者/消费者模型稍微复杂一些的是读写模型。一份共享资源允许多个读者同时读取。但是只要有一个写者在写这份共享资源,任何其他的读者和写者都不能访问这份共享资源。
读写模型实现起来,不仅需要信号量机制,还需要额外的读者计数和写者计数。
public static final Object signal = new Object();
public static int readers = 0;
public static int writers = 0;
// 读者代码
… read() {
for(… ) { // 循环执行
synchronized(signal){
while( writers > 0 )
signal.wait(); // 如果有人在写,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写
readers ++ ; // 增加一个读者计数,表示本线程在读取
}
// 进行一些读取操作
synchronized(signal){
readers --; // 读取完成,减少一个读者计数,表示本线程不在读取
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
}
}
// 写者代码
… write() {
for(… ) { // 循环执行
synchronized(signal){
while( writers > 0 || readers > 0)
signal.wait();// 如果有人在写或读,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写,也没有人在读
writers ++ ; // 增加一个写者计数,表示本线程在写
// 进行一些写操作
writers --; // 读取完成,减少一个读者计数,表示本线程不在写
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
}
}
上述代码只是一段示意代码。实际应用中,人们通常抽取出来一个专门的读写同步锁。
interface ReadWriteLock {
… getReadLock();
… releaseReadLock();
… getWriteLock();
… releaseWriteLock();
}
具体的实现原理也是类似的信号量同步机制。
class RWLock {
… readers, writers;
… synchronized … getReadLock() { // 相当于synchronized(this)
…
while( writers > 0 )
this.wait(); // 这里我们把RWLock对象本身作为信号量
readers++;
}
…synchronized … releaseReadLock(){ //相当于synchronized(this)
readers--;
this.notifyAll(); // // 这里我们把RWLock对象本身作为信号量
}
…synchronized … getWriteLock(){// 相当于synchronized(this)
while( writers > 0 || readers > 0 )
this.wait(); // 这里我们把RWLock对象本身作为信号量
writers++;
}
…synchronized … releaseWriteLock(){// 相当于synchronized(this)
writers--;
this.notifyAll(); // // 这里我们把RWLock对象本身作为信号量
}
}
具体用法是
public static final RWLock lock = new RWLock();
… read() {
lock.getReadLock();
// 读取
lock.releaseReadLock();
}
… write() {
lock.getWriteLock();
// 读取
lock.releaseWriteLock();
}
这种用法要求在执行一些处理之前,一定要执行某项特殊操作,处理之后一定也要执行某项特殊操作。这种人为的顺序性,无疑增加了代码的耦合度,降低了代码的独立性。很有可能会成为线程死锁和资源操作冲突的根源。
当时,这点一直让我不安,可是没有找到方法避免。毕竟,死锁或者资源操作冲突,是线程的固有问题。
很巧的是,正在我惴惴不安的时候,我得知了一个消息。Sun公司根据JCR,决定在jdk1.5中引入关于concurrency(并发)的部分。(那时候,Java还属于Sun公司。现在,已经换了主人。回想当年,不胜唏嘘。)
以下这个网址是concurrency部分的util.concurrent一个实现。非常好的信息。对于处理多线程并发问题,很有帮助。
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
包括Lock, ReadWriteLock, CurrentHashMap, CurrentReaderHashMap等类。JDK1.5引入了这些类,作为java.util.concurrent Package。
里面提供了一个ReadWriteLock类,标准用法如下。
Standard usage of ReadWriteLock:
class X {
ReadWriteLock rw;
// ...
public void read() throws InterruptedException {
rw.readLock().acquire();
try {
// ... do the read
}
finally {
rw.readlock().release()
}
}
public void write() throws InterruptedException {
rw.writeLock().acquire();
try {
// ... do the write
}
finally {
rw.writelock().release()
}
}
}
我们可以看到,ReadWriteLock同样要求调用的顺序——aquire()和release()。我对自己的例子增强了一点信心。
我又查看了WriterPreferenceReadWriteLock类,看到里面成对的方法,startRead(),endRead();startWrite(),endWrite()。我的心情完全放松了下来。我的思路虽然粗糙,但大体的方向是正确的。