什么是死锁
一.简介
根据上一篇文章互斥锁
死锁实验,死锁定义:一组互相互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。
class Account {
private int balance;
// 转账
void transfer(Account target, int amt){
// 锁定转出账户
synchronized(this){ ①
// 锁定转入账户
synchronized(target){ ②
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
}
我们假设线程 T1 执行账户 A 转账户 B 的操作,账户 A.transfer(账户 B);同时线程 T2 执行账户 B 转账户 A 的操作,账户 B.transfer(账户 A)。当 T1 和 T2 同时执行完①处的代码时,T1 获得了账户 A 的锁(对于 T1,this 是账户 A),而 T2 获得了账户 B 的锁(对于 T2,this 是账户 B)。之后 T1 和 T2 在执行②处的代码时,T1 试图获取账户 B 的锁时,发现账户 B 已经被锁定(被 T2 锁定),所以 T1 开始等待;T2 则试图获取账户 A 的锁时,发现账户 A 已经被锁定(被 T1 锁定),所以 T2 也开始等待。于是 T1 和 T2 会无期限地等待下去,也就是我们所说的死锁了。
二.预防死锁
Coffman 大佬,总结发生死锁条件
- 互斥,共享资源X和Y只能被一个线程占用;
- 占用且等待,线程T1已经取得共享资源X,在等待共享资源Y的时候,不释放共享资源X;
- 不可抢占,其他线程不能强行抢占线程T1占有的资源;
- 循环等待,线程T1等待线程T2占有资源,线程T2等待线程T1占有的资源,就是循环等待。
破坏其中一个,就可以避免死锁的发生。
解决
- 对应“占用且等待”这个条件,我们可以一次性申请所有的资源,这样就不存在等待了。
- 对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。
- 对于“循环等待”这个条件,可以靠按序申请资源来预防。所谓按序申请,是指资源是有线性顺序的,申请的时候可以先申请资源序号小的,再申请资源序号大的,这样线性化后自然就不存在循环了。
2.1 破坏占用且等待条件
从理论上讲,要破坏这个条件,可以一次性申请所有资源。在现实世界里,就拿前面我们提到的转账操作来讲,它需要的资源有两个,一个是转出账户,另一个是转入账户,当这两个账户同时被申请时。
@Data
public class AccountClassLock4 {
private Allocator actr = Allocator.getInstance();
private int balance;
public AccountClassLock4(int balance) {
this.balance = balance;
}
//转账
public void transfer(AccountClassLock4 target,int amt){
//申请转出和转入账号,直到成功
while (!actr.apply(this,target)) { }
try {
synchronized (this){
synchronized (target){
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}
}
}finally {
actr.free(this,target);
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000; i++) {
AccountClassLock4 a = new AccountClassLock4(200);
AccountClassLock4 b = new AccountClassLock4(200);
AccountClassLock4 c = new AccountClassLock4(200);
Thread t1 = new Thread(()->{
a.transfer(b,100);
});
Thread t2 = new Thread(()->{
b.transfer(c,100);
});
t1.start();
t2.start();
t2.join();
System.out.println(a.getBalance());
System.out.println(b.getBalance());
System.out.println(c.getBalance());
System.out.println("----------------");
}
}
}
class Allocator{
private static Allocator instance = new Allocator();
public static Allocator getInstance() {
return instance;
}
private Allocator(){}
private List<Object> als = new ArrayList<>();
synchronized boolean apply(Object a,Object b){
if(als.contains(a) || als.contains(b)){
return false;
}else{
als.add(a);
als.add(b);
}
return true;
}
synchronized void free(Object a,Object b){
als.remove(a);
als.remove(b);
}
}
优化
// 一次性申请转出账户和转入账户,直到成功
while(!actr.apply(this, target))
;
如果 apply() 操作耗时非常短,而且并发冲突量也不大时,这个方案还挺不错的,因为这种场景下,循环上几次或者几十次就能一次性获取转出账户和转入账户了。但是如果 apply() 操作耗时长,或者并发冲突量大的时候,循环等待这种方案就不适用了,因为在这种场景下,可能要循环上万次才能获取到锁,太消耗 CPU 了。
利用等待-通知模式优化,使用线程阻塞方式就避免循环等待消耗CPU的问题。
@Data
public class AccountClassLock7 {
private Allocator actr = Allocator.getInstance();
private int balance;
public AccountClassLock7(int balance) {
this.balance = balance;
}
public void transfer(AccountClassLock7 target,int amt){
actr.apply(this,target);
try {
if (this.balance > amt){
this.balance -= amt;
target.balance += amt;
}
}finally {
actr.free(this,target);
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000; i++) {
AccountClassLock7 a = new AccountClassLock7(200);
AccountClassLock7 b = new AccountClassLock7(200);
AccountClassLock7 c = new AccountClassLock7(200);
Thread t1 = new Thread(()->{
a.transfer(b,100);
});
Thread t2 = new Thread(()->{
b.transfer(c,100);
});
t1.start();
t2.start();
t2.join();
System.out.println(a.getBalance());
System.out.println(b.getBalance());
System.out.println(c.getBalance());
System.out.println("----------------");
}
}
static class Allocator{
private static Allocator instance = new Allocator();
public static Allocator getInstance() {
return instance;
}
private Allocator(){}
private List<Object> als = new ArrayList<>();
synchronized void apply(Object a,Object b){
while(als.contains(a) || als.contains(b)){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
als.add(a);
als.add(b);
}
synchronized void free(Object a,Object b){
als.remove(a);
als.remove(b);
this.notifyAll();
}
}
}
注意
因为notify()只能保证通知时间点,条件满足,而被通知线程的执行时间点和通知的时间点基本上不会重合,所以当线程执行的时候,很可能条件已经不满足(保不齐有其他线程插队)。
2.2 破坏不可抢占条件
破坏不可抢占条件看上去很简单,核心是要能够主动释放它占有的资源,这一点 synchronized 是做不到的。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。
@Data
public class AccountClassLock6 {
private int balance;
private final Lock locks = new ReentrantLock();
public AccountClassLock6(int balance) {
this.balance = balance;
}
public void transfer(AccountClassLock6 target,int amt) throws InterruptedException {
boolean flag = true;
Random random = new Random();
while (flag){
if(this.locks.tryLock(random.nextInt(1000)+1 , TimeUnit.MILLISECONDS)){
try {
if(target.locks.tryLock()){
try {
if(this.balance > amt){
this.balance -= amt;
target.balance += amt;
flag = false;
}
}finally {
target.locks.unlock();
}
}
}finally {
this.locks.unlock();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000; i++) {
AccountClassLock6 a = new AccountClassLock6(200);
AccountClassLock6 b = new AccountClassLock6(200);
AccountClassLock6 c = new AccountClassLock6(200);
Thread t1 = new Thread(()->{
try {
a.transfer(b,100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread t2 = new Thread(()->{
try {
b.transfer(c,100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
t2.join();
System.out.println(a.getBalance());
System.out.println(b.getBalance());
System.out.println(c.getBalance());
System.out.println("----------------");
}
}
}
只在外层Lock加上阻塞时间就行,如果在内层加上,内层锁阻塞一段时间,外层锁没有释放,这段时间可能形成死锁,破坏活锁一个随机时间就够了。
2.3 破坏循环等待条件
破坏这个条件,需要对资源进行排序,然后按序申请资源。这个实现非常简单,我们假设每个账户都有不同的属性 id,这个 id 可以作为排序字段,申请的时候,我们可以按照从小到大的顺序来申请。
@Data
public class AccountClassLock5 {
private int id;
private int balance;
public AccountClassLock5(int id, int balance) {
this.id = id;
this.balance = balance;
}
public void transfer(AccountClassLock5 target,int amt){
AccountClassLock5 left = this;
AccountClassLock5 right = target;
if(left.id > right.id){
left = target;
right = this;
}
synchronized (left){
synchronized (right){
if (this.balance > amt) {
this.balance -= amt;
target.balance += amt;
}
}
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000; i++) {
AccountClassLock5 a = new AccountClassLock5(1, 200);
AccountClassLock5 b = new AccountClassLock5(2, 200);
AccountClassLock5 c = new AccountClassLock5(3, 200);
Thread t1 = new Thread(()->{
a.transfer(b,100);
});
Thread t2 = new Thread(()->{
b.transfer(c,100);
});
t1.start();
t2.start();
t2.join();
System.out.println(a.getBalance());
System.out.println(b.getBalance());
System.out.println(c.getBalance());
System.out.println("----------------");
}
}
}
2.4 小结
并发大师 Doug Lea《Java 并发编程:设计原则与模式》一书中,推荐的三个用锁的最佳实践
- 永远只在更新对象的成员变量时加锁;
- 永远只在访问可变的成员时加锁;
- 永远不在调用其他对象的方法时加锁。
参考
《Java并发编程实战》
公众号
微信公众号(bigdata_limeng)
本文地址:https://blog.csdn.net/qq_19968255/article/details/109641428