欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Redis—Springboot整合Redisson超卖问题解决

程序员文章站 2022-03-26 22:00:47
前言上一篇博客中,根据超卖问题,逐步分析了在不同环境下,多种思路的局限性。这篇博客具体针对上一篇博客中存在的问题做合理解决。Redisson查考资料redisson 官方github文档Redisson和jedis都是java实现的Redis的客户端。但Redisson比jedis具有更多的功能。依赖引入org.redissonredisson

前言

上一篇博客中,根据超卖问题,逐步分析了在不同环境下,多种思路的局限性。
这篇博客具体针对上一篇博客中存在的问题做合理解决。

Redisson查考资料

redisson 官方github文档
Redisson和jedis都是java实现的Redis的客户端。但Redisson比jedis具有更多的功能。

依赖引入

<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson</artifactId>
	<version>3.6.5</version>
</dependency>

配置文件编写

在Redis官方文档中,配置采取java代码的方式。由于本次使用的是Springboot+Redisson整合的方式,所以需要构建操作的bean。

yml中关于redis数据库的连接配置:

spring:
  redis:
    host: 192.168.99.100
    port: 10000
    password: linkpower
    timeout: 10000 #连接超时时间
    jedis: ## jedis配置
      pool: ## 连接池配置
        max-idle: 8 ## 最大空闲连接数
        max-active: 8 ## 最大连接数
        max-wait: 3000 ## 最大阻塞等待时间
        min-idle: 0 ## 最小空闲连接数

创建操作的bean配置类(参考资料Redisson官方单节点配置):

import org.redisson.Redisson;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Redisson配置类
 * @author 
 *
 */
@Configuration
public class RedissonConfig {
	
	Logger log = LoggerFactory.getLogger(RedissonConfig.class);
	
	@Value("${spring.redis.host}")
	private String redis_host;
	
	@Value("${spring.redis.port}")
	private String redis_port;
	
	/**
	 * 单机模式 
	 * @return
	 */
	@Bean
	public Redisson createRedisson() {
		log.info("host:{},port:{}",redis_host,redis_port);
		Config config = new Config();
		config.useSingleServer().setAddress("redis://"+redis_host+":"+redis_port).setDatabase(0);
		return (Redisson) Redisson.create(config);
	}
}

超卖问题解决

使用Redisson解决前一篇博客最后出现的问题。

	/**
	 * 模拟商品超卖代码 <br>
	 * 使用Redisson完成加锁、逻辑超时续期、释放锁等操作
	 * @return
	 */
	@RequestMapping("/deductStock9")
	public String deductStock9() {
		// 创建一个key,保存至redis
		String key = "lock";
		//生成、获取锁
		RLock lock = redisson.getLock(key);
		try {
			// 加锁
			lock.lock();
			// 获取Redis数据库中的商品数量
			Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
			// 减库存
			if (stock > 0) {
				int realStock = stock - 1;
				stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));
				System.out.println("商品扣减成功,剩余商品:" + realStock);
			} else {
				System.out.println("库存不足.....");
			}
		} finally {
			//释放锁
			lock.unlock();
		}
		return "end";
	}

Redisson是如何保证的

查看Redisson源码。

String key = "lock";
RLock lock = redisson.getLock(key);

org.redisson.RedissonLock.RedissonLock(CommandAsyncExecutor, String)中,根据给定的key设置类属性。

 public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        //超时时间
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }

超时间数为(org.redisson.config.Config.lockWatchdogTimeout):默认30s

private long lockWatchdogTimeout = 30 * 1000;

其中加锁、锁续命逻辑在以下代码中实现:

lock.lock();

查看源码(org.redisson.RedissonLock.lock()),逐个判断分析得到核心逻辑代码如下所示:

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

redis.call("",xx) 查看xx在redis中的状态;
其中状态代码有:
exists 存在
hset hset命令
pexpire 设置有效时间

通过上述lua语言,再次分析逻辑代码:

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
        			// 如果存在 KEYS[1],这个KEYS[1]就是最初设置的redisson.getLock(key)
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      //上述代码执行逻辑为0,表示不存在
                      // 不存在则将 锁key+线程id设置为hash类型数据保存redis(ARGV[2]为当前执行线程id)
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      // 设置这个 hash数据类型 的有效时间
                       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  		// 如果这个 锁key 在redis中存在,返回1表示数据存在
                  		//hincrby 自增1 
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      // 重新设定有效时间
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

上述源码中,存在设置超时间参数,默认30s。
但是,按照上一篇博客的说明,设置时间如果到了,会自动执行请求key操作,不保险!
Redisson是如何处理时间这个参数信息的。

在源码org.redisson.RedissonLock.tryAcquireAsync(long, TimeUnit, long)中,针对时间处理参数做了如下操作:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        // 设置监听线程,当异步方法tryLockInnerAsync执行完触发
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
        	// 重写 operationComplete 方法
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    // 开启定时任务
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

查看定时任务源码(org.redisson.RedissonLock.scheduleExpirationRenewal(long)):

private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }
		// 定时任务的创建
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                //又是一个lua脚本,重新设置锁
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        // 获取redis的hash数据类型中,指定的key-线程id 信息。
                        // 如果 == 1 表示存在这个锁
                        // 重新设置key的失效时间
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                // 设置失效时间后(evalWriteAsync执行后),开启监听
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        // 如果future 未执行成功
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        // future 执行完成
                        if (future.getNow()) {
                        	// 调取自身,此时并不会造成死循环
                        	// 调用自身,继续执行 TimerTask中的逻辑,包括定时操作
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
           // 每 30/3 也就是10秒
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }

这里简单说明下源码中的org.redisson.connection.ConnectionManager.newTimeout(TimerTask, long, TimeUnit)
这个和js中的timeout类似,设置多长时间后执行一次指定的程序。只会执行一次,不会循环。

所以上述续命流程如下所示:
Redis—Springboot整合Redisson超卖问题解决

Redisson中的各种操作方法,都是基于lua脚本底层原子性实现的。

Redisson思想

Redis—Springboot整合Redisson超卖问题解决

本文地址:https://blog.csdn.net/qq_38322527/article/details/112616449