高并发下金融账号控制案例
目录
什么是高并发,高并发下为什么会有数据一致性的问题
基于资金账号体系数据库设计及开发相关的代码,测试发现问题
解决方案有哪些,不同数据一致性解决方案的特点比较,不同应用场景使用不同解决方案
总结不同的解决方案对应不同的业务场景
详解
什么是高并发,高并发下会出现什么问题,如何确保数据一致性
例子:往缸里面倒水,一个人一瓢,如果成千上百的就是并发。这个时候有一个人计数,如果所有人同时向缸里面倒水,计数的人就无法控制准确记录,这就会导致缸里面的具体有多少瓢水无法准确确定。
资金账号体系的介绍,及数据库设计
涉及到支付就会有一套完整的资金账号体系,保证用户在产品中的一个资金变动的记录。
数据脚本:
CREATE TABLE `account` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`account_type` int(2) DEFAULT NULL COMMENT '账户类型 1余额账户 2冻结账户 3消费账户 4返佣账户 5信用账户',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`begin_money` decimal(12,2) DEFAULT NULL COMMENT '期初余额',
`cur_money` decimal(12,2) DEFAULT NULL COMMENT '当期发生额',
`final_money` decimal(12,2) DEFAULT NULL COMMENT '期末余额',
`accoun_status` int(2) DEFAULT NULL COMMENT '1正常 2冻结状态',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`flag` int(1) DEFAULT NULL COMMENT '逻辑删除字段',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=124 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
CREATE TABLE `account_flew` (
`id` bigint(20) NOT NULL,
`flew_type` int(1) DEFAULT NULL COMMENT '1充值 2消费 3分账',
`business_type` int(2) DEFAULT NULL COMMENT '二级业务类型 1下单 2微信充值 3支付宝充值 ',
`business_type_msg` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '二级业务类型的注解',
`pre_order_id` varchar(50) COLLATE utf8mb4_bin DEFAULT NULL COMMENT '业务关联id',
`account_id` bigint(20) DEFAULT NULL COMMENT '账户表的主键id',
`account_type` int(2) DEFAULT NULL COMMENT '账户类型表',
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`begin_money` decimal(12,2) DEFAULT NULL COMMENT '期初余额',
`cur_money` decimal(12,2) DEFAULT NULL COMMENT '当期发生额',
`final_money` decimal(12,2) DEFAULT NULL COMMENT '期末余额',
`pay_channel` int(2) DEFAULT NULL COMMENT '支付渠道',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`flag` int(2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
操作账号相关代码编写
entity对象
@Data
public class Account {
private Long id;
/** 账户类型 1余额账户 2冻结账户 3消费账户 4返佣账户 5信用账户 */
private Integer accountType;
/** 用户id */
private Long userId;
/** 期初余额 */
private Double beginMoney;
/** 当期发生额 */
private Double curMoney;
/** 期末余额 */
private Double finalMoney;
/** 1正常 2冻结状态 */
private Integer accounStatus;
/** */
private Date createTime;
/** */
private Date updateTime;
/** 逻辑删除字段 */
private Integer flag;
}
@Data
public class AccountFlew {
private Long id;
/** 1充值 2消费 3分账 */
private Integer flewType;
/** 二级业务类型 1下单 2微信充值 3支付宝充值 */
private Integer businessType;
/** 二级业务类型的注解 */
private String businessTypeMsg;
/** 业务关联id */
private String preOrderId;
/** 账户表的主键id */
private Long accountId;
/** 账户类型表 */
private Integer accountType;
/** 用户id */
private Long userId;
/** 期初余额 */
private Double beginMoney;
/** 当期发生额 */
private Double curMoney;
/** 期末余额 */
private Double finalMoney;
/** 支付渠道 */
private Integer payChannel;
/** */
private Date createTime;
/** */
private Date updateTime;
/** */
private Integer flag;
}
pom文件相关的依赖
application.properties配置
基于java自带的锁机制解决数据一致性问题
Lock锁
private final static Lock lock = new ReentrantLock();
lock.lock();
lock.unlock();
synchronized
private Integer lock = new Integer(0);
public void locktest(){
synchronized (lock){
}
}
基于java自带的锁,控制并发的问题
多节点下无法控制
-Dfile.encoding=UTF-8 -Dserver.port=201
基于mysql自带的锁进行控制
mysql排它锁的介绍
Inodb行级锁,通过锁定行数据来保证数据一致性。登录linux的mysql,设置事务部自动提交set autocommit = 0,开启事务 begin,执行for update的sql,commit提交。select
id
from account
where id = #{id,jdbcType=BIGINT} for update
xml配置
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.course.account.infrastructure.mapper.AccountMapper">
<!-- 所有字段 -->
<sql id="Base_Column_List">
id
,account_type
,user_id
,begin_money
,cur_money
,final_money
,account_status
,create_time
,update_time
,del_status
</sql>
<!-- 字段映射 -->
<resultMap id="BaseResultMap" type="com.course.account.infrastructure.entity.Account">
<id column="id" property="id" jdbcType="BIGINT"/>
<result column="account_type" jdbcType="VARCHAR" property="accountType"/>
<result column="user_id" jdbcType="VARCHAR" property="userId"/>
<result column="begin_money" jdbcType="DECIMAL" property="beginMoney"/>
<result column="cur_money" jdbcType="DECIMAL" property="curMoney"/>
<result column="final_money" jdbcType="DECIMAL" property="finalMoney"/>
<result column="account_status" jdbcType="INTEGER" property="accountStatus"/>
<result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
<result column="update_time" jdbcType="TIMESTAMP" property="updateTime"/>
<result column="del_status" jdbcType="INTEGER" property="delStatus"/>
</resultMap>
<!-- 主键查询 -->
<select id="lockAccountById" resultMap="BaseResultMap" parameterType="java.lang.Long">
select
<include refid="Base_Column_List"/>
from account
where id = #{id,jdbcType=BIGINT} for update
</select>
</mapper>
死锁
client1:
select * from account where id = 1 for update;
select * from account where id = 2 for update;
client2:
select * from account where id = 2 for update;
select * from account where id = 1 for update;
mysql返回异常:
ERROR 1213 (40001): Deadlock found when trying to get lock; try restarting transaction
基于redis做并发控制
redis的分布式锁原理
reids是基于单线程的队列实现的,可以保证并发的安全;
基于redis命令:setnx命令来实现
java代码
@Autowired
private StringRedisTemplate stringRedisTemplate;
stringRedisTemplate.opsForValue().setIfAbsent();
stringRedisTemplate.expire(key,5, TimeUnit.SECONDS);
stringRedisTemplate.delete(key);
String key = “account_lock_”;
Boolean b = stringRedisTemplate.opsForValue().setIfAbsent(key,“1”);
if(!b){
log.warn(“当前处于锁状态不能操作”);
return;
}
redis分布式锁存在的问题
问题:1,长生key问题;2,原子性问题;3,key超时问题;4,主从切换key失效问题
解决:
1:通过给key设置合理失效时间;
2:通过使用lua脚本;
3:(1)通过业务层中校验key是否有效决定是否提交,此方法可以大大降低key超时带来的问题,不能全部解决;(2)redission中的狗链会定期检查key是否持有,持有则延长key失效时间;
4:redisson中的redlock可以解决主从key失效问题
基于redission解决redis的分布式锁的原子性问题
加锁
waitTime和leaseTime分别是等待时间和过期时间
redissonService.getRLock(“1”).tryLock(waitTime,leaseTime,TimeUnit.SECONDS);
源码
/**
* 尝试获取锁
*
* @return
*/
public boolean tryLock() {
return (Boolean)this.get(this.tryLockAsync());
}
/**
* 一看就是为了获取异步执行的结果,所以重点应该看tryLockAsync()
*
* @return
*/
protected final <V> V get(RFuture<V> future) {
return this.commandExecutor.get(future);
}
/**
* 异步获取锁
*
* @return
*/
public RFuture<Boolean> tryLockAsync() {
return this.tryLockAsync(Thread.currentThread().getId());
}
/**
* 尝试获取锁
*
* @param threadId
* @return
*/
public RFuture<Boolean> tryLockAsync(long threadId) {
return this.tryAcquireOnceAsync(-1L, (TimeUnit) null, threadId);
}
/**
* 尝试获取锁
*
* @param leaseTime
* @param unit
* @param threadId
* @return
*/
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
/**
* 如果自定义过期时间
*/
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
/**
* 如果是默认的过期时间
*/
else {
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
//没有异常
if (e == null) {
//成功获取锁
if (ttlRemaining) {
// 更新过期时间
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
/**
* 真正的获取锁的代码
* @param leaseTime
* @param unit
* @param threadId
* @param command
* @param <T>
* @return
*/
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//这个字段后面用作续过期时间
this.internalLockLeaseTime = unit.toMillis(leaseTime);
/**
* 利用lua脚本执行相关逻辑
*/
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
command,
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
//如果key不存在
if (redis.call('exists', KEYS[1]) == 0)
//设置这个key,并且设置超时时间,获取锁成功
then redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
//如果key存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
//就对key自增,并且重置过期时间(重入锁)
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
//获取key剩下的时间
return redis.call('pttl', KEYS[1]);
/**
* 重置过期时间
* @param threadId
*/
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
//检查是否存在指定的定时任务
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
//如果已经存在指定的定时任务
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
}
//如果是第一次创建这个定时任务
else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
/**
* 定时任务重置过期时间
*/
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
//每三分之一的过期时间续一次,直至解锁
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
//拿到第一个线程id
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
//续时间
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
// 如果有异常,打印日志
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
}
// 没有异常就继续续时间
else {
RedissonLock.this.renewExpiration();
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
/**
* 重置指定线程id的过期时间
* @param threadId
* @return
*/
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;",
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
//如果已经存在这个key,那就给它续时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then redis.call('pexpire', KEYS[1], ARGV[1]);
return 1; end;
return 0;
图解
基于redis实现调用次数及并发的控制
redis是单线程的本身就可以保证并发控制,基于命令incrby命令来实现。
Double account_money = stringRedisTemplate.opsForValue().increment(“account_money”, -Double.valueOf(updateAccountDto.getMoney()));
redis中如果在操作数据的时候出现了异常情况下,如何保证数据和mysql一致?通过incrby做加法。
stringRedisTemplate.opsForValue().increment(“account_money”, Double.valueOf(updateAccountDto.getMoney()));
总结
1,单节点多节点下的解决方案:
单节点:可以通过java自带的锁机制,多节点不可以;
多节点:通过mysql锁redis分布式锁;
2,业务场景:
每次都需要成功的业务可以通过mysql锁;redis的incrby命令;
不要求每次都成功可以通过redis的分布式锁;
推荐阅读