分布式锁解决方案
方案一:使用redis实现分布式锁
步骤如下:
-
通过setnx命令设置锁
-
Nil,获取失败,结束或重试
-
ok,获取锁成功
执行业务
释放锁,del删除key即可 -
异常情况,服务宕机,超时间ex结束,会自动释放锁
SET lock 1 NX 10 EX
通过执行redis SET lock 1 NX 10 EX 这条命令我们获取到这把锁,只有它不存在的时候才可以获取到;在高并发环境中,只让获取到锁的请求进入业务处理逻辑,没有获取到锁的请求排队等待,不进行业务处理,当业务处理完毕后释放锁del lock_num;为了防止断电以防分布式锁造车死锁,特意给这把锁加上了有效生存时间。
缺陷:多个线程运行之间,释放锁的时候可能存在误删的情况
比如:
- 3个进程A和B和C在执行任务,并争夺抢锁,此时A获取了锁,并设置自动过期时间为10s
- A开始执行任务,因为某种原因,业务阻塞,耗时超过了10s,此时锁自动释放了
- B恰好此时开始尝试获取锁,因为锁已经自动释放,成功获取到了锁
- A此时业务执行完毕,执行释放锁的能力(删除key),于是B的锁被释放了,而B其实还在执行业务。
- 此时进程C尝试获取锁,也成功了,因为A把B的锁哥删除掉了。
问题就出现了:进程B和进程C同时获取到了锁,这就违反了原子性。
那么如何解决这种问题呢?
方案二:
解决方案:我们可以在set锁时,存入当前线程的唯一标识!删除锁前,判断获取锁的值是不是与自己的唯一标识是否一致,如果不一致就不删除锁,只删除自己的锁。
假设我们由一个如下一个方法,采用方案二的方法:
function test(){
.....获取分布式锁,获取当前线程唯一标识,当作分布式锁的值
处理业务中调用了一下代码块
{
....获取分布式锁,获取当前线程唯一标识,当作分布式锁的值
...处理逻辑
...释放锁,删除lock_num
}
...释放锁,删除lock_num
}
我们是使用setnx的方式来实现分布式锁的,第一次程序执行会获取到锁,当时如果程序再次尝试执行setnx指令肯定会失效,程序就无法获取到锁,就会一直处于获取不到锁的状态,就由可能产生死锁。
那怎么解决呢?这就需要想办法改造成可重入锁。
方案三:重入锁
1.什么叫做重入锁呢?
所谓重入锁也叫做递归锁。指的是在同一线程内,外层函数获得锁之后,内层函数仍然可以获取到该锁。换一种说法,同一个线程再次进入同步代码块时,课可以使用自己获取的锁。
作用:可重入锁避免因同一线程中多次获取锁而导致死锁发生。 那么如果实现呢? 获取锁:首先尝试获取锁,如果获取失败,判断这个锁是否是自己的,如果是则允许再次获取。而且必须记录重复获取锁的次数。
释放锁:释放锁不能直接删除了,因为锁是可重入的,如果进入了多次,在最内层直接删除锁。导致外部的业务在没有锁的情况下执行,会有安全的问题。因此必选获取锁累计重入的次数,释放时减去可重入的次数,如果减少到0,则可以删除锁。因此,存储锁中的信息就必循包含key、重入次数。不能使用简单的key-value结构,这里推荐使用hash结构
- key:lock
- hashKey:线程信息
- hashValue:重入次数,默认是1
流程图
需要用到一些redis命令包括
- exists key :判断一个key是否存在
- hexists key filed :判断一个key是否存在
- hset key field value:给hash的field设置一个值
- hincrby key field increment:给一个hash的filed的值增加指定数据
- expire key senconds 设置一个key过期时间
- del key 删除指定的key
下面我们假设锁的key为lock,hashKey是当前线程的id:threadId ,该锁自动释放的时间为20
获取锁的步骤
1.判断lock是否存在 EXISTS lock
如果存在
判断当前线程id作为hashKey是否存在 : HEXISTS lock threadId
不存在,说明说已经有了,且不是自己获取的,获取锁失败,end
存在,说明是自己获取的锁,重入次数+1:HINCRBY lock threadId 1,去到步骤3
如果不存在,说明可以获取锁,hset key threadId 1
2.设置锁自动释放时间,expire lock 20
释放锁的步骤
1.判断当前线程id作为hashKey是否存在 hexists lock threadId
不存在,说明锁已经失效了,不用管了
存在,说明锁还在,重入次数减1 HINCRBY lock threadId -1
2.判断重入次数是否为0
为0,说明锁已经全部释放,删除key ,del lock
不为0,说明锁还在用,重置有效时间 expire 20
上面讨论的Reids锁实现方案都忽略了一个非常重要的问题:原子性问题。无论是获取锁,还是锁的过程,都是有多行Redis指令来完成的,如果不能保证这些命令执行的原子性,则整个过程都是不安全的。而Redis中支持以Lua脚本来运行多行命令,并且保证整个脚本运行的原子性。
分布式锁之Lua脚本
获取锁Lua脚本
local key=KEYS[1];
local threadId=ARGV[1];
local expireTime=ARGV[2];
if (redis.call('exists',key)==0) then
redis.call('hset',key,threadId,'1');
redis.call('expire',key,threadId,expireTime);
return 1;
end;
if (redis.call('hexists',key,threadId) ==1) then
redis.call('hincrby',key,threadId,'1');
redis.call('expire',key,expireTime);
return 1;
end;
return 0;
释放分布式锁
local key=KEYS[1];
local threadId=ARGV[1];
local expireTime=ARGV[2];
if(redis.call('hexists',key,threadId)==0) then
return nil;
end
local count=redis.call('hincrby',key,threadId,-1);
if(count>0) then
redis.call('expire',key,expireTime);
return nil;
else
redis.call('DEL',key);
return nil
end;