PV操作与信号量
今天面试,面试官,问道信号量的问题,当时有点蒙了,实在是太久了,不知道该怎么答,就胡乱说了两句,哈哈哈
所以呢准备在复习一下信号量PV操作,在计算机操作系统中,PV操作是进程管理中的难点,用于控制进程之间的同步,是一种低级、最基础的一种同步机制,同时PV不仅可以使用在进程中还可用使用在线程中
什么是信号量
信号量(semaphore)的数据结构为一个值和一个指针,指针指向等待该信号量的下一个进程。信号量的值与相应资源的使用情况有关。当它的值大于0时,表示当前可用资源的数量;当它的值小于0时,其绝对值表示等待使用该资源的进程个数。注意,信号量的值仅能由PV操作来改变。
一般来说,信号量S>=0时,S表示可用资源的数量。执行一次P操作意味着请求分配一个单位资源,因此S的值减1;当S<0时,表示已经没有可用资源,请求者必须等待别的进程释放该类资源,它才能运行下去。而执行一个V操作意味着释放一个单位资源,因此S的值加1;若S<0,表示有某些进程正在等待该资源,因此要唤醒一个等待状态的进程,使之运行下去。
PV操作
PV操作是典型的同步机制之一, PV操作与信号量的处理相关,P表示通过的意思,V表示释放的意思。PV操作即可实现同步,也可以实现互斥。
什么是互斥
互斥是进程(线程)之间的间接制约关系。当一个进程(线程)进入临界区使用临界资源时,另一个进程(线程)必须等待。只有当使用临界资源的进程退出临界区后,这个进程才会解除阻塞状态。简单的来说,就是某一资源同一时间只能由一个进程(线程)访问。
类似的场景:
- 比如进程B需要访问打印机,但此时进程A占有了打印机,进程B会被阻塞,直到进程A释放了打印机资源,进程B才可以继续执行
- 比如某一个共享代码片段,同一个时间,只能由一个线程执行,当有一个线程执行时,其它线程将会等待
什么是同步
同步是进程(线程)之间直接的制约关系,是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系,进程(线程)间的直接制约关系来源于他们之间的合作关系(依赖关系)。所以同步是一种更为复杂的互斥。简单的来说,就是进程(线程)的运行必须严格按照某种先后次序来运行,从而完成的特定的任务。
类似的场景:
- 比如线程A、B同时允许某计算任务,线程A的计算任务在某一时刻的依赖于线程B任务产生的数据,这时候如果线程B 还未产生相应的数据,线程A只好等待线程B,只有线程B计算出相应的结果后线程A在接着往下运行。
- 比如进程A需要从缓冲区读取进程B产生的信息,当缓冲区为空时,进程B因为读取不到信息而被阻塞。而当进程A产生信息放入缓冲区时,进程B才会被唤醒。
使用PV操作实现同步操作
用一个信号量S与一个消息(或者资源)联系起来,当信号量的值为0时,表示期望的消息尚未产生(或者资源不可用);当信号量的值S>0时,表示期望的消息已经存在(有资源可用)。用P V操作实现进程同步时,调用P操作测试消息是否到达(是否有资源可用),调用V操作发送消息(添加资源)。
使用PV操作实现进程(线程)同步时应该注意的是:
- 分析进程(线程)间的制约关系,确定信号量种类。在保持进程(线程)间有正确的同步关系情况下,哪个进程先执行,哪些进程后执行,彼此间通过什么资源(信号量)进行协调,从而明确要设置哪些信号量。
- 信号量的初值与相应资源的数量有关,也与P、V操作在程序代码中出现的位置有关。
- 同一信号量的P、V操作一般必须要成对出现,但它们分别在不同的进程(线程)代码中。
使用PV操作实现互斥操作
使用PV操作实现进程互斥时应该注意的是:
- 同一信号量的P、V操作必须要成对出现,且出现在同一的进程(线程)代码中。
- 每个互斥的P、V操作必须成对出现,先做P操作,进临界区,后做V操作,出临界区。若有多个分支,要认真检查其成对性, 否则会造成死锁。
- P、V操作应分别紧靠临界区的头尾部,临界区的代码应尽可能短,不能有死循环。
- 互斥信号量的初值一般为1
JAVA PV 同步DOME
需求是这样的,线程A、B同时进行计算任务,当线程A执行到第50次是需要线程B第50次的执行结果,且必须线程A拿到B的执行结果后线程B才接着往下执行,当线程B执行完计算任务后,需要线程A的执行结果,看代码
PV实现
package dome;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* 类PV.java的实现描述:PV信号量操作类
* @author wangbo 2018年9月13日 下午3:57:27
*/
public class PV {
private AtomicInteger s;
public PV() {
this.s = new AtomicInteger(1);
}
public PV(int s) {
this.s = new AtomicInteger(s);
}
/**
* 获取资源, 获取到资源返回 true, 反之返回 false
* @return
*/
public boolean P() {
int p = s.get();
//判断信号量是否 == 0; = 0 说明无资源可用
if (p > 0 && s.compareAndSet(p, p - 1)) {
return true;
}
return false;
}
/**
* 释放资源
*/
public void V(){
s.getAndIncrement();
}
public int get(){
return s.get();
}
}
PV 同步操作DOME
package dome;
import java.io.IOException;
import java.util.Random;
/**
*
* 类SyncTask.java的实现描述:PV信号量操作实例
* @author wangbo 2018年9月13日 下午3:58:30
*/
public class SyncTask {
private static int a = 0, b = 0;
private static Random random = new Random();
public static void main(String[] args) throws IOException {
//初始化为无资源可用,分别拥有线程A、线程B
PV pv1 = new PV(0);
PV pv2 = new PV(0);
//用于主线程等待子线程完成
PV pv3 = new PV(2);
SyncTask task = new SyncTask();
Thread threadA = task.getThreadA(pv1, pv2, pv3);
Thread threadB = task.getThreadB(pv1, pv2, pv3);
threadA.start();
threadB.start();
while(pv3.get() != 0){
}
System.out.println(a);
System.out.println(b);
}
public Thread getThreadA(PV pv1, PV pv2, PV pv3) {
return new Thread(() -> {
int i = 100;
while ((i--) > 0) {
/**
* 停顿一定的时间,模拟不同的执行时间
*/
try {
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
a++;
// 当计算50次后,需要线程B的计算结果
if (i == 50) {
//获取资源
while(!pv1.P()){
System.out.println("等待线程B的50次执行结果");
}
a += b;
//添加资源
pv2.V();
}
}
System.out.println("线程A执行完成");
//执行完成
pv2.V();
pv3.P();
});
}
public Thread getThreadB(PV pv1, PV pv2, PV pv3) {
return new Thread(() -> {
int i = 100;
while ((i--) > 0) {
/**
* 停顿一定的时间,模拟不同的执行时间
*/
try {
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
b++;
// 执行 50 次后将结果给线程A
if (i == 50) {
System.out.println("线程B执行50次结果: b = " + b);
//添加资源
pv1.V();
//确认线程A已经拿到了值
while (!pv2.P()) {
System.out.println("线程A还未获取值");
}
}
}
//需要线程A的计算结果
while (!pv2.P()) {
System.out.println("等待线程A的执行结果");
}
b += a;
System.out.println("线程B执行完成");
pv3.P();
});
}
}
JAVA PV 互斥DOME
在上面的基础上,添加了统计线程总共执行了多少次计算
package dome;
import java.io.IOException;
import java.util.Random;
public class MutexTask {
private static int a = 0, b = 0, y;
private static Random random = new Random();
//信号量四用于线程互斥操作
private PV pv4 = new PV();
public static void main(String[] args) throws IOException {
//初始化为无资源可用,分别拥有线程A、线程B
PV pv1 = new PV(0);
PV pv2 = new PV(0);
//用于主线程等待子线程完成
PV pv3 = new PV(2);
MutexTask task = new MutexTask();
Thread threadA = task.getThreadA(pv1, pv2, pv3);
Thread threadB = task.getThreadB(pv1, pv2, pv3);
threadA.start();
threadB.start();
while (pv3.get() != 0) {
}
//System.out.println(a);
//System.out.println(b);
System.out.println(y);
}
private Thread getThreadA(PV pv1, PV pv2, PV pv3) {
return new Thread(() -> {
int i = 200;
while ((i--) > 0) {
count();
/**
*
* 停顿一定的时间,模拟不同的执行时间
*/
try {
Thread.sleep(random.nextInt(20));
} catch (InterruptedException e) {
e.printStackTrace();
}
a++;
// 当计算50次后,需要线程B的计算结果
if (i == 50) {
//获取资源
while (!pv1.P()) {
// System.out.println("等待线程B的50次执行结果");
}
a += b;
//添加资源
pv2.V();
}
}
//System.out.println("线程A执行完成");
//执行完成
pv2.V();
pv3.P();
});
}
private Thread getThreadB(PV pv1, PV pv2, PV pv3) {
return new Thread(() -> {
int i = 200;
while ((i--) > 0) {
count();
/**
* 停顿一定的时间,模拟不同的执行时间
*/
try {
Thread.sleep(random.nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
b++;
// 执行 50 次后将结果给线程A
if (i == 50) {
System.out.println("线程B执行50次结果: b = " + b);
//添加资源
pv1.V();
//确认线程A已经拿到了值
while (!pv2.P()) {
// System.out.println("线程A还未获取值");
}
}
}
//需要线程A的计算结果
while (!pv2.P()) {
//System.out.println("等待线程A的执行结果");
}
b += a;
//System.out.println("线程B执行完成");
pv3.P();
});
}
/**
* 统计总共计算的次数
*/
public void count() {
//同步互斥
while(!this.pv4.P()){
}
/**
* 这里为了模拟没有互斥情况下统计不一致,分开写并暂时一定时间
*/
int i = y + 1;
try {
Thread.sleep(random.nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
y = i;
System.out.println(y);
this.pv4.V();
}
}
上一篇: 异常
下一篇: 线程安全问题和解决方案