欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于Redis实现的分布式锁

程序员文章站 2022-06-16 08:26:03
前言Redis这种相信在各个公司都会用到,但是通过分布式的调用Redis,可能用的就不是那么多了,今天就介绍一下在分布式调用Redis的环境中,较为常见的分布式锁。目录前言分布式、微服务和集群的区别Demo背景环境搭建分布式、微服务和集群的区别有的刚接触的小伙伴,可能不太能够分清分布式和集群的区别,这两个东西经常一起谈到,有的人可能就会混为一谈,认为分布式就是集群环境,集群环境就会用到分布式。其实不然,这两个还是有本质的区别的。同时,提到分布式又会经常提到微服务,微服务和分布式有很多的共同点,又有着...

前言

锁这种功能相信只要是接触过多线程的小伙伴们都比较熟悉,但是多线程内部加锁仅能解决单体架构给我们带来的相关问题,在微服务满天飞的今天,程序内部锁已经不能满足我们的需求了,于是便有了基于各种方式实现的分布式锁,今天就介绍一下在分布式的环境中用Redis实现较为常见的分布式锁

本文将会通过迭代多个版本的代码,来由浅到深的展示:
1.无分布式锁,仅在程序内加锁的场景
2.有简单的分布式锁的场景
3.标准分布式锁场景


源码

GitHub : 基于Redis的分布式锁简单demo(一)无分布式锁,仅在程序内加锁的场景
GitHub : 基于Redis的分布式锁简单demo(二)有简单的分布式锁的场景
GitHub : 基于Redis的分布式锁简单demo(三)标准分布式锁场景(基于Redisson的可重入锁)



分布式、微服务和集群的区别

有的刚接触的小伙伴,可能不太能够分清分布式和集群的区别,这两个东西经常一起谈到,有的人可能就会混为一谈,认为分布式就是集群环境,集群环境就会用到分布式。其实不然,这两个还是有本质的区别的。同时,提到分布式又会经常提到微服务,微服务和分布式有很多的共同点,又有着一些区别,分布式强调的是服务的分散化,微服务则是更强调服务的专业化。当然,在实际应用的场景,微服务多是分布式的。
分布式:

是指将一个业务拆分不同的子业务,分布在不同的机器上执行。

集群:

是指多台服务器集中在一起,实现同一业务,可以视为一台计算机。

微服务:

是一种架构风格,一个大型复杂软件应用由一个或多个微服务组成。系统中的各个微服务可被独立部署,各个微服务之间是松耦合的。每个微服务仅关注于完成一件任务并很好地完成该任务。在所有情况下,每个任务代表着一个小的业务能力。

通俗的来说,一个业务逻辑相同的http服务,运行在不同的机器上,同时有一台或一个集群的nginx做负载均衡和转发,就可以视为是分布式了。而微服务则强调注册和发现,会有独立的注册中心,相对独立的网关,通过网关便可以实现服务的负载均衡了。
下面介绍的项目,则是基于SpringCloud的微服务架构,使用了网关进行负载均衡。


Demo背景

模拟一个购物秒杀的请求,在单体架构情况下,都用不到分布式锁,只要在服务内加锁即可实现。而在真正的业务场景中,秒杀的架构,肯定是分布式的,这就得考虑到多服务间共享了,于是Redis就很好的引入了进来。虽然Redis是单线程的,但是有可能存在多个服务同时读取到了这个被秒杀商品的数量,这样再进行修改,任然可能出现超卖,这时候光靠加锁已经解决不了问题了,于是又引入了基于Redis的分布式锁


环境搭建

环境搭建的比较简单,在服务器上用Docker运行了一个Redis(为什么用Docker?因为真的很方便),我选择将Redis的配置文件自己配置,再映射进容器,配置相关的东西,就不在这里介绍了。在下载完Docer环境后运行:

docker run -d -p 6379:6379 --name=redis -v /data/redis/:/etc/redis/ redis:5.0.5 redis-server /etc/redis/redis.conf --appendonly yes

然后通过docker ps查看一下,此时我们的redis已经启动了。
基于Redis实现的分布式锁
还需要安装一个Jmeter的压测工具,来模拟高并发的请求。这里我就安装在了我开发的windows操作系统中。先去官网下一个Jmeter,然后需要配置一下环境变量,新建一个系统变量JMETER_HOME,路径填的是你的解压路径,然后在classpath里面添加,%JMETER_HOME%\lib\ext\ApacheJMeter_core.jar;%JMETER_HOME%\lib\jorphan.jar;%JMETER_HOME%\lib/logkit-2.0.jar;然后双击bin目录中的jmeter.bat就可以启动了。
这样基础环境就搭建完了。


项目的搭建

搭建的项目比较简单,就是基于SpringCloud的Eureka做了个注册中心,然后用Zuul做了个网关,然后用一个简单的shop项目,去调Redis,进行拿库存,减库存操作。这个简单的shop项目我们启动多个,用相同的spring.application.name,这样在Erueka中就会注册成同一个服务,而后通过网关的转发,便可实现访问网关+指定后缀,转发到几个shop服务中去,实现负载均衡。
首先搭建一个父工程,详见源码中最外层的pom。
然后以最简单最快捷的方式搭建Eureka-Server和Zuul网关,在配置文件中,方便指定端口,程序名称等参数。
1、再在Eureka-Server的启动类中,添加@EnableEurekaServer注解
2、在其他服务中添加@EnableDiscoveryClient注解。
3、在Zuul中添加@EnableZuulProxy,启用网关。
Erueka-Server的配置文件:

server:
  port: 8008

eureka:
  server:
    enable-self-preservation: false
  instance:
    hostname: 192.168.0.24
  client:
    fetch-registry: false
    register-with-eureka: false
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
spring:
  application:
    name: eureka-server

Zuul的配置文件中,则多了个关于服务的路由的配置:

server:
  port: 8080

spring:
  application:
    name: gateway
eureka:
  instance:
    hostname: 127.0.0.1
  client:
    eureka-server-port: 8008
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${eureka.client.eureka-server-port}/eureka/
zuul:
  routes:
    shop-api:
      path: /shop-api/**
      serviceId: shop

这样基础的配置就完成了,接下来可以启动两个程序看一下,在Erueka的管理界面中,可以看到zuul已经注册了。
基于Redis实现的分布式锁
而后,就是shop服务的配置了。shop服务需要引入spring-boot-starter-data-redis,添加关于redis的依赖。这样,我们需要在配置文件中,指定好redis的相关配置:

server:
  port: 8888 #服务端口,在启动多个服务时,仅需修改这个参数即可
eureka:
  instance:
    hostname: 127.0.0.1 #注册中心ip
  client:
    eureka-server-port: 8008 #注册中心端口
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${eureka.client.eureka-server-port}/eureka/
spring:
  redis:
    host: 192.168.0.128 #改成你的redis的ip
    port: 6379	#改成你的Redis的端口
    password: 123456 #改成你的Redis的密码
  application:
    name: shop #给服务起的名称

然后我们声明一个Controller,在Controller中,有一个接口,接口内是实现从redis拿剩余商品数、消费一个后将剩余的商品数,重新放到redis中的操作。对Redis的操作使用了StringRedisTemplate来实现。

    @Autowired
    private StringRedisTemplate redisTemplate;

    @GetMapping("/shop")
    public String shopPhone() {
        synchronized (this) {
            int number = Integer.parseInt(String.valueOf(redisTemplate.opsForValue().get("phone")));
            if (number > 0) {
                number--;
                redisTemplate.opsForValue().set("phone", String.valueOf(number));
                System.out.println("恭喜抢到啦!"+number);
                return "恭喜抢到啦!";
            } else {
                System.out.println("没抢到...");
                return "没抢到...";
            }
        }
    }

Demo演示

版本一:无分布式锁,仅在程序内加锁的场景

一切完成,我在redis中,将phone这个商品,添加20件库存,然后我们将几个服务一起运行一下。(当然,这里你可以先只启动一个shop去测试一下)
我启动了三个shop服务,在Erueka注册中心中可以看到,分别启动在了8887、8888、8889端口上。
基于Redis实现的分布式锁
然后我们使用JMeter
1.创建一个线程组,将Number of Threads 设置为100,这个参数就是模拟并发请求的人数;Ramp-up perlod 设置为0 ,意思是同时时刻启动这100个请求,最后Loop Count设置为3,意思这100个请求执行三次,总并发量为300
基于Redis实现的分布式锁
2.然后创建一个HTTPRequest请求,在HTTPRequest中,指定好Url,url需要填写网关的url,通过网关路由到各个服务上去。
基于Redis实现的分布式锁
3.再创建一个显示结果的Aggregate Report,里面可以看到实际请求的每秒并发量。最后我们点击中上方的绿色按钮,启动它(确保你的服务已经启动)。
4.启动后,我们打开程序的控制台,可以看到剩余商品数16、12、8、6、5、3、0都出现了一次或两次的重复,说明有几次重复消费了,这样就会造成我们的商品,超量销售,就出大问题啦。
基于Redis实现的分布式锁基于Redis实现的分布式锁
基于Redis实现的分布式锁
这时候就体现出了分布式锁的必要了。
以上的代码位于:基于Redis的分布式锁简单demo(一)无分布式锁,仅在程序内加锁的场景


版本二:在代码内手动添加简单锁的实现场景

首先介绍一下思路:
1.考虑在Redis中存放一把锁,Redis中有一个setNX操作,可以在指定的 key 不存在时,为 key 设置指定的值,并返回是否设置成功的一个boolean类型。
2.依赖setNX和Redis是单线程的特性,我们可以简单的实现一个分布式锁。
3.比如A服务、B服务、C服务同时有请求进来,但是能获取到该商品的该锁的只会有一个线程。这样在宏观情况下,可以说就是实现了分布式锁。
4.然后往下再细细考虑,加锁,就需要释放锁,那么用try…finally就可以实现了。

		String lock = "phoneLock";
        /*
         * 尝试获取锁,Redis单线程,所以同时只会有一个线程获得锁
         * 未设置超时时间,在服务不出问题的情况下(如服务挂了,网络断了等),可以实现分布式锁
         */
        Boolean result = redisTemplate.opsForValue().setIfAbsent(lock, "phone");
        assert result != null;
        //假如没获取到锁
        if (!result) {
            System.out.println("活动太火爆了,请稍后再试!");
            return "活动太火爆了,请稍后再试!";
        }
        //获取到了,才需要finally去释放锁
        try {
            //取出phone的剩余数量
            String phone = redisTemplate.opsForValue().get("phone");
            if (phone == null) {
                System.out.println("没抢到...");
                return "没抢到...";
            }
            int number = Integer.parseInt(phone);
            //还有剩余
            if (number > 0) {
                //消费一个
                int newNumber = number - 1;
                //将消费完的phone的数量,重新放到redis中
                redisTemplate.opsForValue().set("phone", String.valueOf(newNumber));
                System.out.println("恭喜抢到啦!" + newNumber);
                return "恭喜抢到啦!";
            } else {
                System.out.println("没抢到...");
                return "没抢到...";
            }
        } finally {
            //释放锁
            redisTemplate.delete(lock);
        }

执行结果的“活动太火爆了,请稍后再试!”有点多,所以我把日志做了下过滤:
基于Redis实现的分布式锁基于Redis实现的分布式锁基于Redis实现的分布式锁
可以看到,的确没有再出现超售的情况了。

但是再细想,还会出现什么问题呢?

当加了锁,程序挂了,那这个锁就成了死锁了,再也没有程序能获取到它,它也会在Redis,永远得不到释放。

怎么解决这个问题呢?

很简单加一个锁超时时间就行了嘛。

但是真有这么简单吗?

加超时时间,说的很简单,但是具体加多久的超时时间呢?
为了确保服务的可靠性,总不能我还没消费完,锁就自动失效了吧?
那样其他的服务就可以拿到锁了,还是会出现超卖现象,还有一个严重的问题。

1.假如A线程先抢到了锁lock,设置了10秒的超时时间,但是10秒后,其实A并没有执行完成,但此时的锁已经失效了,B线程也可以拿到同一把lock锁了,这是第一个问题。
2.第二个问题是,别忘记,A线程还有一个释放锁的操作呀,假设B线程现在并未执行完成,但是A线程执行完成了,A线程以为是它加的锁,然后把锁释放了,这样就会造成大量的循环的问题出现。

于是再考虑,有没有办法解决上面的这个问题呢?

当然是有的,我们给每一个程序加一个唯一标识,让线程解锁的时候判断一下,是不是当前线程加的锁,是的话再将它解锁。

于是便有了以下的代码。

        String lock = "phoneLock";
		/*
         * 标识当前线程,没有该参数,可能会存在的问题:
         *  线程还没消费完,但是锁却被别的线程拿到了。
         *  而后该线程再释放锁,导致又有第三个线程可以拿到该锁,造成循环的问题。
         * 有了该标识,便可解决该问题。
         */
        String uuid = UUID.randomUUID().toString();
        //此处的时间设置多少,都是不合理的,只是为了防止比如程序挂了,网络断了等问题造成的死锁
        Boolean result = redisTemplate.opsForValue().setIfAbsent(lock, uuid, 5, TimeUnit.SECONDS);
        assert result != null;
        //假如没获取到锁
        if (!result) {
            System.out.println("活动太火爆了,请稍后再试!");
            return "活动太火爆了,请稍后再试!";
        }
        //获取到了,才需要finally去释放锁
        try {
            //取出phone的剩余数量
            String phone = redisTemplate.opsForValue().get("phone");
            if (phone == null) {
                System.out.println("没抢到...");
                return "没抢到...";
            }
            int number = Integer.parseInt(phone);
            //还有剩余
            if (number > 0) {
                //消费一个
                int newNumber = number - 1;
                //将消费完的phone的数量,重新放到redis中
                redisTemplate.opsForValue().set("phone", String.valueOf(newNumber));
                System.out.println("恭喜抢到啦!" + newNumber);
                return "恭喜抢到啦!";
            } else {
                System.out.println("没抢到...");
                return "没抢到...";
            }
        } finally {
            //获取Redis中锁存放的ID
            String id = redisTemplate.opsForValue().get(lock);
            //是当前线程设置的,才释放锁
            if (uuid.equals(id)) {
                //释放锁
                redisTemplate.delete(lock);
            }
        }

重新执行,结果还是一样,没有出现超卖的情况
思考:但是真的就没有问题了吗?
以上的代码在基于Redis的分布式锁简单demo(二)有简单的分布式锁的场景


版本三:使用成熟的第三方锁Redisson加锁的实现场景

问题还是会有的,就比如上面提出的具体要加多久的超时时间呢?单体Redis虽然能支持每秒10w+QPS,但是假如还是不够呢?这就需要用到Redis集群了,引入Redis集群就又会出现新的问题,假如Redis写库已经写入了数据,但是读库还没有同步到这个数据就又被新的线程请求了,这样同样会导致超出预期的问题发生。
那么行业内成熟的分布式锁是怎么实现的呢?
有几种方案:

1.基于数据库
2.基于Zookeeper
3.基于Redis等单线程的内存数据库

基于Redis又有Jedis,Redisson,Lettuce等三种解决方案,本文就介绍基于Redisson实现的分布式锁。
Redisson中包含了很多的功能,Redisson官网介绍的有以下的功能。
大概有:支持分布式,分布式Java锁和同步器,分布式Java服务,分布式Java对象,云上的Redis等等等等,我们这里就用到了分布式Java锁和同步器这块的功能。
基于Redis实现的分布式锁
在Redisson的分布式锁中,它实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock)、联锁(MultiLock)、红锁(RedLock)、读写锁(ReadWriteLock)、信号量(Semaphore)、可过期性信号量(PermitExpirableSemaphore)、闭锁(CountDownLatch)等常见的8种锁。
本文就以最简单的可重入锁进行演示,想了解更多的小伙伴,可以自行尝试一下各种锁。GitHub中也有相关的介绍Redisson目录
可重入锁,简单来说便是在获取到锁之后,调用一个新的线程去轮询(轮询间隔大概是锁超时时间的1/3)的查看是否已到超时时间,到了而线程还没执行完,就给这个锁续上时间以便它能拿到锁继续执行,保证不被别的线程抢走。
官方的介绍是:

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

要使用Redisson,就当然要引入依赖,在spring-boot-starter中,也有相关的依赖,直接引入如下依赖就可以了。

		<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>${redisson.version}</version>
        </dependency>

而后,是需要配置相关参数的,如果是集群的,配置的就比较多了,这里提供一个官方的配置方法,作为演示,只使用最简单的单机式Redis,所以就添加一个Redisson的配置类就可以了。注入一个RedissonClient的bean,填好相关的地址和密码就可以了。

	@Bean
    public RedissonClient redisson() throws IOException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://"+url + ":" + port).setPassword(password);
        return Redisson.create(config);
    }

最后,再对我们的服务进行一些小的修改,把原先我们自己实现的分布式锁给替换成Redisson实现的分布式锁。

		//标识锁用
        String lock = "phoneLock";
        //获取锁
        RLock rLock = redissonClient.getLock(lock);
        if (rLock.isLocked()) {
            System.out.println("活动太火爆了,请稍后再试!");
            return "活动太火爆了,请稍后再试!";
        }
        //获取到了,才需要finally去释放锁
        try {
            //加锁
            rLock.lock(5, TimeUnit.SECONDS);
            //取出phone的剩余数量
            String phone = redisTemplate.opsForValue().get("phone");
            if (phone == null) {
                System.out.println("没抢到...");
                return "没抢到...";
            }
            int number = Integer.parseInt(phone);
            //还有剩余
            if (number > 0) {
                //消费一个
                int newNumber = number - 1;
                //将消费完的phone的数量,重新放到redis中
                redisTemplate.opsForValue().set("phone", String.valueOf(newNumber));
                System.out.println("恭喜抢到啦!" + newNumber);
                return "恭喜抢到啦!";
            } else {
                System.out.println("没抢到...");
                return "没抢到...";
            }
        } finally {
            if (rLock.isLocked()) {
                //释放锁
                rLock.unlock();
            }
        }

然后同样的执行一下,输出结果如下。
基于Redis实现的分布式锁基于Redis实现的分布式锁基于Redis实现的分布式锁
通过Redisson,我们可以非常方便快捷的使用分布式锁。本项目就演示了简单的可重入锁的使用,不同场景需求Redisson也给我们提供了不同的锁,具体问题,具体分析需要什么锁。

最后再简单介绍一下各种锁吧:

  1. 可重入锁
    基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
    大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
    另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
  2. 公平锁
    它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
  3. 联锁
    基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。
  4. 红锁(RedLock)
    基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
  5. 读写锁(ReadWriteLock)
    分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
  6. 信号量(Semaphore)
    基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
  7. 可过期性信号量(PermitExpirableSemaphore)
    基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore)是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。它提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
  8. 闭锁(CountDownLatch)
    基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。

以上代码在基于Redis的分布式锁简单demo(三)标准分布式锁场景(基于Redisson的可重入锁)

本文地址:https://blog.csdn.net/mingwei_cheng/article/details/107369378