欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

高并发下金融账号控制案例

程序员文章站 2023-12-30 19:33:16
...

目录
什么是高并发,高并发下为什么会有数据一致性的问题
基于资金账号体系数据库设计及开发相关的代码,测试发现问题
解决方案有哪些,不同数据一致性解决方案的特点比较,不同应用场景使用不同解决方案
总结不同的解决方案对应不同的业务场景
详解
什么是高并发,高并发下会出现什么问题,如何确保数据一致性
例子:往缸里面倒水,一个人一瓢,如果成千上百的就是并发。这个时候有一个人计数,如果所有人同时向缸里面倒水,计数的人就无法控制准确记录,这就会导致缸里面的具体有多少瓢水无法准确确定。

资金账号体系的介绍,及数据库设计
涉及到支付就会有一套完整的资金账号体系,保证用户在产品中的一个资金变动的记录。
数据脚本:

CREATE TABLE `account` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `account_type` int(2) DEFAULT NULL COMMENT '账户类型 1余额账户  2冻结账户  3消费账户 4返佣账户 5信用账户',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `begin_money` decimal(12,2) DEFAULT NULL COMMENT '期初余额',
  `cur_money` decimal(12,2) DEFAULT NULL COMMENT '当期发生额',
  `final_money` decimal(12,2) DEFAULT NULL COMMENT '期末余额',
  `accoun_status` int(2) DEFAULT NULL COMMENT '1正常 2冻结状态',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `flag` int(1) DEFAULT NULL COMMENT '逻辑删除字段',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=124 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

CREATE TABLE `account_flew` (
  `id` bigint(20) NOT NULL,
  `flew_type` int(1) DEFAULT NULL COMMENT '1充值 2消费 3分账',
  `business_type` int(2) DEFAULT NULL COMMENT '二级业务类型 1下单 2微信充值 3支付宝充值 ',
  `business_type_msg` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '二级业务类型的注解',
  `pre_order_id` varchar(50) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '业务关联id',
  `account_id` bigint(20) DEFAULT NULL COMMENT '账户表的主键id',
  `account_type` int(2) DEFAULT NULL COMMENT '账户类型表',
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `begin_money` decimal(12,2) DEFAULT NULL COMMENT '期初余额',
  `cur_money` decimal(12,2) DEFAULT NULL COMMENT '当期发生额',
  `final_money` decimal(12,2) DEFAULT NULL COMMENT '期末余额',
  `pay_channel` int(2) DEFAULT NULL COMMENT '支付渠道',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `flag` int(2) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;

操作账号相关代码编写
entity对象

@Data
public class Account {
    private Long id;
    /** 账户类型 1余额账户  2冻结账户  3消费账户 4返佣账户 5信用账户 */
    private Integer accountType;
        /** 用户id */
    private Long userId;
        /** 期初余额 */
    private Double beginMoney;
        /** 当期发生额 */
    private Double curMoney;
        /** 期末余额 */
    private Double finalMoney;
        /** 1正常 2冻结状态 */
    private Integer accounStatus;
        /**  */
    private Date createTime;
        /**  */
    private Date updateTime;
        /** 逻辑删除字段 */
    private Integer flag;
}

@Data
public class AccountFlew  {
    private Long id;

    /** 1充值 2消费 3分账 */
    private Integer flewType;
        /** 二级业务类型 1下单 2微信充值 3支付宝充值  */
    private Integer businessType;
        /** 二级业务类型的注解 */
    private String businessTypeMsg;
        /** 业务关联id */
    private String preOrderId;
        /** 账户表的主键id */
    private Long accountId;
        /** 账户类型表 */
    private Integer accountType;
        /** 用户id */
    private Long userId;
        /** 期初余额 */
    private Double beginMoney;
        /** 当期发生额 */
    private Double curMoney;
        /** 期末余额 */
    private Double finalMoney;
        /** 支付渠道 */
    private Integer payChannel;
        /**  */
    private Date createTime;
        /**  */
    private Date updateTime;
        /**  */
    private Integer flag;
}

pom文件相关的依赖
application.properties配置
基于java自带的锁机制解决数据一致性问题
Lock锁

private final static Lock lock = new ReentrantLock();
lock.lock();
lock.unlock();

synchronized
private Integer lock = new Integer(0);
public void locktest(){
    synchronized (lock){
    }
}

基于java自带的锁,控制并发的问题
多节点下无法控制
-Dfile.encoding=UTF-8 -Dserver.port=201

基于mysql自带的锁进行控制
mysql排它锁的介绍
Inodb行级锁,通过锁定行数据来保证数据一致性。登录linux的mysql,设置事务部自动提交set autocommit = 0,开启事务 begin,执行for update的sql,commit提交。select
id
from account
where id = #{id,jdbcType=BIGINT} for update

xml配置

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.course.account.infrastructure.mapper.AccountMapper">

    <!-- 所有字段 -->
    <sql id="Base_Column_List">
   id
   ,account_type
   ,user_id
   ,begin_money
   ,cur_money
   ,final_money
   ,account_status
   ,create_time
   ,update_time
   ,del_status
   </sql>

    <!-- 字段映射 -->
    <resultMap id="BaseResultMap" type="com.course.account.infrastructure.entity.Account">
        <id column="id" property="id" jdbcType="BIGINT"/>
        <result column="account_type" jdbcType="VARCHAR" property="accountType"/>
        <result column="user_id" jdbcType="VARCHAR" property="userId"/>
        <result column="begin_money" jdbcType="DECIMAL" property="beginMoney"/>
        <result column="cur_money" jdbcType="DECIMAL" property="curMoney"/>
        <result column="final_money" jdbcType="DECIMAL" property="finalMoney"/>
        <result column="account_status" jdbcType="INTEGER" property="accountStatus"/>
        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
        <result column="update_time" jdbcType="TIMESTAMP" property="updateTime"/>
        <result column="del_status" jdbcType="INTEGER" property="delStatus"/>
    </resultMap>

    <!-- 主键查询 -->
    <select id="lockAccountById" resultMap="BaseResultMap" parameterType="java.lang.Long">
        select
        <include refid="Base_Column_List"/>
        from account
        where id = #{id,jdbcType=BIGINT} for update
    </select>
</mapper>

死锁
client1:
select * from account where id = 1 for update;
select * from account where id = 2 for update;
client2:
select * from account where id = 2 for update;
select * from account where id = 1 for update;

mysql返回异常:
ERROR 1213 (40001): Deadlock found when trying to get lock; try restarting transaction

基于redis做并发控制
redis的分布式锁原理
reids是基于单线程的队列实现的,可以保证并发的安全;
基于redis命令:setnx命令来实现
java代码
@Autowired
private StringRedisTemplate stringRedisTemplate;

stringRedisTemplate.opsForValue().setIfAbsent();
stringRedisTemplate.expire(key,5, TimeUnit.SECONDS);
stringRedisTemplate.delete(key);

String key = “account_lock_”;
Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(key,“1”);
if(!b){
log.warn(“当前处于锁状态不能操作”);
return;
}

redis分布式锁存在的问题
问题:1,长生key问题;2,原子性问题;3,key超时问题;4,主从切换key失效问题
解决:
1:通过给key设置合理失效时间;
2:通过使用lua脚本;
3:(1)通过业务层中校验key是否有效决定是否提交,此方法可以大大降低key超时带来的问题,不能全部解决;(2)redission中的狗链会定期检查key是否持有,持有则延长key失效时间;
4:redisson中的redlock可以解决主从key失效问题
基于redission解决redis的分布式锁的原子性问题
加锁
waitTime和leaseTime分别是等待时间和过期时间
redissonService.getRLock(“1”).tryLock(waitTime,leaseTime,TimeUnit.SECONDS);

源码

/**
* 尝试获取锁
*
* @return
*/
public boolean tryLock() {
return (Boolean)this.get(this.tryLockAsync());
}

/**
 * 一看就是为了获取异步执行的结果,所以重点应该看tryLockAsync()
 *
 * @return
 */
protected final <V> V get(RFuture<V> future) {
    return this.commandExecutor.get(future);
}

/**
 * 异步获取锁
 *
 * @return
 */
public RFuture<Boolean> tryLockAsync() {
    return this.tryLockAsync(Thread.currentThread().getId());
}

/**
 * 尝试获取锁
 *
 * @param threadId
 * @return
 */
public RFuture<Boolean> tryLockAsync(long threadId) {
    return this.tryAcquireOnceAsync(-1L, (TimeUnit) null, threadId);
}


/**
 * 尝试获取锁
 *
 * @param leaseTime
 * @param unit
 * @param threadId
 * @return
 */
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
    /**
     * 如果自定义过期时间
     */
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    }
    /**
     * 如果是默认的过期时间
     */
    else {
        RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            //没有异常
            if (e == null) {
                //成功获取锁
                if (ttlRemaining) {
                    // 更新过期时间
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

/**
 * 真正的获取锁的代码
 * @param leaseTime
 * @param unit
 * @param threadId
 * @param command
 * @param <T>
 * @return
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    //这个字段后面用作续过期时间
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    /**
     * 利用lua脚本执行相关逻辑
     */
    return this.commandExecutor.evalWriteAsync(this.getName(),
            LongCodec.INSTANCE,
            command,
            "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); 
			redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; 
			if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); 
			redis.call('pexpire', KEYS[1], ARGV[1]); 
			return nil; end; 
			return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(this.getName()),
            new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}

//如果key不存在
if (redis.call('exists', KEYS[1]) == 0)
    //设置这个key,并且设置超时时间,获取锁成功
   then redis.call('hset', KEYS[1], ARGV[2], 1);
     redis.call('pexpire', KEYS[1], ARGV[1]);
     return nil; end;
//如果key存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
    //就对key自增,并且重置过期时间(重入锁)
   then redis.call('hincrby', KEYS[1], ARGV[2], 1);
     redis.call('pexpire', KEYS[1], ARGV[1]);
     return nil; end;
//获取key剩下的时间
return redis.call('pttl', KEYS[1]);


/**
 * 重置过期时间
 * @param threadId
 */
private void scheduleExpirationRenewal(long threadId) {
    RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
    //检查是否存在指定的定时任务
    RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    //如果已经存在指定的定时任务
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    }
    //如果是第一次创建这个定时任务
    else {
        entry.addThreadId(threadId);
        this.renewExpiration();
    }

}

/**
 * 定时任务重置过期时间
 */
private void renewExpiration() {
    RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        //每三分之一的过期时间续一次,直至解锁
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    //拿到第一个线程id
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        //续时间
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            // 如果有异常,打印日志
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            }
                            // 没有异常就继续续时间
                            else {
                                RedissonLock.this.renewExpiration();
                            }
                        });
                    }
                }
            }
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
}

/**
 * 重置指定线程id的过期时间
 * @param threadId
 * @return
 */
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return this.commandExecutor.evalWriteAsync(this.getName(),
            LongCodec.INSTANCE,
            RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;",
            Collections.singletonList(this.getName()),
            new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}

//如果已经存在这个key,那就给它续时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
  then redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1; end;
return 0;

图解

基于redis实现调用次数及并发的控制
redis是单线程的本身就可以保证并发控制,基于命令incrby命令来实现。
Double account_money = stringRedisTemplate.opsForValue().increment(“account_money”, -Double.valueOf(updateAccountDto.getMoney()));

redis中如果在操作数据的时候出现了异常情况下,如何保证数据和mysql一致?通过incrby做加法。
stringRedisTemplate.opsForValue().increment(“account_money”, Double.valueOf(updateAccountDto.getMoney()));

总结
1,单节点多节点下的解决方案:
单节点:可以通过java自带的锁机制,多节点不可以;
多节点:通过mysql锁redis分布式锁;
2,业务场景:
每次都需要成功的业务可以通过mysql锁;redis的incrby命令;
不要求每次都成功可以通过redis的分布式锁;

相关标签: 公开课

上一篇:

下一篇: