欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

分布式锁解决方案

程序员文章站 2022-06-21 17:06:48
...

方案一:使用redis实现分布式锁
分布式锁解决方案
步骤如下:

  • 通过setnx命令设置锁

  • Nil,获取失败,结束或重试

  • ok,获取锁成功
    执行业务
    释放锁,del删除key即可

  • 异常情况,服务宕机,超时间ex结束,会自动释放锁

  SET lock 1 NX 10 EX

通过执行redis SET lock 1 NX 10 EX 这条命令我们获取到这把锁,只有它不存在的时候才可以获取到;在高并发环境中,只让获取到锁的请求进入业务处理逻辑,没有获取到锁的请求排队等待,不进行业务处理,当业务处理完毕后释放锁del lock_num;为了防止断电以防分布式锁造车死锁,特意给这把锁加上了有效生存时间。

缺陷:多个线程运行之间,释放锁的时候可能存在误删的情况
比如:

  1. 3个进程A和B和C在执行任务,并争夺抢锁,此时A获取了锁,并设置自动过期时间为10s
  2. A开始执行任务,因为某种原因,业务阻塞,耗时超过了10s,此时锁自动释放了
  3. B恰好此时开始尝试获取锁,因为锁已经自动释放,成功获取到了锁
  4. A此时业务执行完毕,执行释放锁的能力(删除key),于是B的锁被释放了,而B其实还在执行业务。
  5. 此时进程C尝试获取锁,也成功了,因为A把B的锁哥删除掉了。
    问题就出现了:进程B和进程C同时获取到了锁,这就违反了原子性。

那么如何解决这种问题呢?

方案二:

解决方案:我们可以在set锁时,存入当前线程的唯一标识!删除锁前,判断获取锁的值是不是与自己的唯一标识是否一致,如果不一致就不删除锁,只删除自己的锁。
分布式锁解决方案
假设我们由一个如下一个方法,采用方案二的方法:

function test(){
   .....获取分布式锁,获取当前线程唯一标识,当作分布式锁的值
            处理业务中调用了一下代码块
               {
                    ....获取分布式锁,获取当前线程唯一标识,当作分布式锁的值
                     ...处理逻辑
                     ...释放锁,删除lock_num
               }
                    
   ...释放锁,删除lock_num
}

我们是使用setnx的方式来实现分布式锁的,第一次程序执行会获取到锁,当时如果程序再次尝试执行setnx指令肯定会失效,程序就无法获取到锁,就会一直处于获取不到锁的状态,就由可能产生死锁。
那怎么解决呢?这就需要想办法改造成可重入锁。

方案三:重入锁

1.什么叫做重入锁呢?
所谓重入锁也叫做递归锁。指的是在同一线程内,外层函数获得锁之后,内层函数仍然可以获取到该锁。换一种说法,同一个线程再次进入同步代码块时,课可以使用自己获取的锁。
作用:可重入锁避免因同一线程中多次获取锁而导致死锁发生。 那么如果实现呢? 获取锁:首先尝试获取锁,如果获取失败,判断这个锁是否是自己的,如果是则允许再次获取。而且必须记录重复获取锁的次数。
释放锁:释放锁不能直接删除了,因为锁是可重入的,如果进入了多次,在最内层直接删除锁。导致外部的业务在没有锁的情况下执行,会有安全的问题。因此必选获取锁累计重入的次数,释放时减去可重入的次数,如果减少到0,则可以删除锁。

因此,存储锁中的信息就必循包含key、重入次数。不能使用简单的key-value结构,这里推荐使用hash结构

  • key:lock
  • hashKey:线程信息
  • hashValue:重入次数,默认是1

流程图
需要用到一些redis命令包括

  • exists key :判断一个key是否存在
  • hexists key filed :判断一个key是否存在
  • hset key field value:给hash的field设置一个值
  • hincrby key field increment:给一个hash的filed的值增加指定数据
  • expire key senconds 设置一个key过期时间
  • del key 删除指定的key

分布式锁解决方案

下面我们假设锁的key为lock,hashKey是当前线程的id:threadId ,该锁自动释放的时间为20

获取锁的步骤
  1.判断lock是否存在  EXISTS lock
	         
    如果存在
          判断当前线程id作为hashKey是否存在 :  HEXISTS lock threadId
                     不存在,说明说已经有了,且不是自己获取的,获取锁失败,end
                     存在,说明是自己获取的锁,重入次数+1:HINCRBY lock threadId 1,去到步骤3
    如果不存在,说明可以获取锁,hset key threadId 1
  2.设置锁自动释放时间,expire lock 20
 释放锁的步骤
      1.判断当前线程id作为hashKey是否存在 hexists lock threadId
          不存在,说明锁已经失效了,不用管了
          存在,说明锁还在,重入次数减1 HINCRBY lock threadId -1
     2.判断重入次数是否为00,说明锁已经全部释放,删除key ,del lock
         不为0,说明锁还在用,重置有效时间  expire 20

上面讨论的Reids锁实现方案都忽略了一个非常重要的问题:原子性问题。无论是获取锁,还是锁的过程,都是有多行Redis指令来完成的,如果不能保证这些命令执行的原子性,则整个过程都是不安全的。而Redis中支持以Lua脚本来运行多行命令,并且保证整个脚本运行的原子性。

分布式锁之Lua脚本

获取锁Lua脚本

  local  key=KEYS[1];
  local threadId=ARGV[1];
  local expireTime=ARGV[2];
    if (redis.call('exists',key)==0) then
      redis.call('hset',key,threadId,'1');
      redis.call('expire',key,threadId,expireTime);
      return 1;
   end;
  if (redis.call('hexists',key,threadId) ==1) then
        redis.call('hincrby',key,threadId,'1');
        redis.call('expire',key,expireTime);
        return 1;
  end;
  return 0;

释放分布式锁

  local  key=KEYS[1];
  local threadId=ARGV[1];
  local expireTime=ARGV[2];
  if(redis.call('hexists',key,threadId)==0) then
      return nil;
   end
  local count=redis.call('hincrby',key,threadId,-1);
  if(count>0) then
  redis.call('expire',key,expireTime);
  return nil;
  else
       redis.call('DEL',key);
       return nil
  end;
相关标签: 高并发高可用