缓存分布式锁探索
先描述一下业务场景,
场景1:mq消息处理各个来源的数据,进行用户初始化,写入detail表,由于mq的重试机制以及高并发环境下,用户记录可能存在写入多次的情况,需要保证初始化时,每个用户(以下为pin)只能允许一个事务进行操作,同时每个唯一key也只能允许一个线程进行插入,因此需要使用锁。
场景2:后台计算worker,根据每个时间段对detail表中新插入的数据进行计算,也是只允许同一时间只有一个worker在执行,理论上最优方案是通过worker调度平台+dns负载均衡来实现,但为了简单,还是使用锁来实现(使用锁的方案有一个很严重的问题,文末会提到)。
多说无用,直接看第一个版本
//版本1
public Boolean tryLock(String key) {
Boolean isLock = true;
try{
if(cluster.incr(key) > 1){// 1
return false;
}
}catch (Exception e){
cluster.del(key);
isLock = false;
}
cluster.expire(key,1, TimeUnit.MINUTES);
return isLock;
}
一开始想到的就是通过incr的原子操作进行判断,当累加数大于1之后,就获取锁失败,在获取锁成功之后,设置过期时间,防止死锁。这种写法咋一看没啥问题,不过组内review之后,发现由于incr和expire的操作是分开的,并不是一个原子操作,因此在incr之后,expire之前,出现错误,比如实例出错导致未执行expire,将导致这个key永远死锁,因此迭代出下一个版本。
//版本2
public Boolean tryLock(String key) {
Boolean isLock = true;
try{
if(cluster.incr(key) > 1){
if(cluster.incr(key) > 2){//只锁一次,防止实例出错造成死锁
cluster.del(key);
return false;
}
return false;
}
}catch (Exception e){
cluster.del(key);
isLock = false;
}
cluster.expire(key,1, TimeUnit.MINUTES);
return isLock;
}
既然出错会死锁,就判断当再次被锁时,就删除这个锁,想法是很好,但仔细一想就会发现问题,此时的判断死锁的数值为2,如果当有4个实例同时请求锁时,第二个请求会被锁,第三个也会被锁,但是第四个就可以获取到锁,导致锁功能失效。
使用incr根本问题在于操作和设置过期时间的非原子型,需要其他的办法解决,经过参考网上的其他方案之后,有了版本3。
//版本3
public Boolean tryLock(String key) {
try{
long timeout = TimeUnit.MINUTES.toMillis(15);
long timestamp = System.currentTimeMillis() + timeout + 1;
if (cluster.setNX(key, String.valueOf(timestamp))) {
return true;
}
long lockTimestamp = Long.valueOf(cluster.get(key));
if (System.currentTimeMillis() > lockTimestamp) {
lockTimestamp = Long.valueOf(cluster.getSet(key, String.valueOf(timestamp)));
if (System.currentTimeMillis() > lockTimestamp) {
return true;
}
}
return false;
}catch (Exception e){
cluster.del(key);
return false;
}
}
通过把过期时间写入value,由于setNx是原子操作,可以保证只要获取到锁,过期时间一定写入,并且保证过期之后,也能立即失效,是一个理想的解决方案。
以为这就结束了吗?太天真了,有加锁就必然会有解锁,此时的解锁代码如下:
public void unlock(String key) {
if(cluster.get(key) == null){
return;
}
long lockTimestamp = Long.valueOf(cluster.get(key));
if (System.currentTimeMillis() > lockTimestamp) {
cluster.del(key);
}
}
当判断这个key过期之后,就删除这个锁,也是很合理的逻辑,但是也经不起推敲,当业务处理时间超过过期时间,会导致锁失效,造成数据不一致;
//最终版
private long timeout = TimeUnit.MINUTES.toMillis(30);
private ThreadLocal<Boolean> threadOwner = new ThreadLocal<Boolean>();
public Boolean tryLock(String key) {
try {
long timestamp = System.currentTimeMillis() + timeout + 1;
if (cluster.setNX(key, String.valueOf(timestamp))) {
threadOwner.set(true);
return true;
}
long lockTimestamp = Long.valueOf(cluster.get(key));
if (System.currentTimeMillis() > lockTimestamp) {
lockTimestamp = Long.valueOf(cluster.getSet(key, String.valueOf(timestamp)));
if (System.currentTimeMillis() > lockTimestamp) {
return true;
}
}
return false;
} catch (Exception e) {
cluster.del(key);
return false;
}
}
public void unlock(String key) {
if(threadOwner.get() != null && threadOwner.get()){
threadOwner.remove();
cluster.del(key);
}else {
if(cluster.get(key) == null){
return;
}
long lockTimestamp = Long.valueOf(cluster.get(key));
if (System.currentTimeMillis() > lockTimestamp) {
cluster.del(key);
}
}
}
这里使用线程缓存来保证只有获取锁的实例才能进行删除锁操作,同时把过期时间调长,只要保证能超过业务处理时间即可。
最后就是前文提到的使用锁方案的问题,在多实例的环境下,由于后台计算worker是周期性运行,如果某一个实例的时间比其他实例时间早,那么所有的worker都将由一台机器执行,造成性能瓶颈,因此后续将使用调度平台来进行worker的分配。
上一篇: JAVA Lab1
下一篇: ucore lab2