分布式锁
程序员文章站
2022-07-05 11:46:44
...
一、什么是分布式锁
1. 当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数
2. 与单机模式下的锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。(分布式情况下之所以问题变得复杂,主要就是需要考虑到网络的延时和不可靠)
3. 分布式锁还是可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如 Redis、Memcache。至于利用数据库、文件等做锁与单机的实现是一样的,只要保证标记能互斥就行。
二、我们需要怎样的分布式锁
1. 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器-上的一个线程执行
2. 这把锁要是一把可重入锁(具备锁失效机制,防止死锁)
3. 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
4. 这把锁最好是一把公平锁(根据业务需求考虑要不要这条)
5. 有高可用的获取锁和释放锁功能
6. 高性能的获取锁与释放锁
三、分布式锁的实现方式
1. 基于数据库实现分布式锁
1)缺点
(1)这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用
(2)这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁
(3)这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作
(4)这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了
2)解决方案
(1)数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上
(2)没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍
(3)非阻塞的?搞一个while循环,直到insert成功再返回成功
(4)非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了
2. 基于缓存实现分布式锁(redis,见下)
3. 基于Zookeeper实现分布式锁
4. Redisson分布式锁
四、基于Redis的实现方式
1. 选用Redis实现分布式锁原因:
1)Redis有很高的性能
2)Redis命令对此支持较好,实现起来比较方便
2. 使用命令介绍
1)SETNX
SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0
2)expire
expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁
3)delete
delete key:删除key
3. 实现思想
1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断
2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁
3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放
五、代码实现(redis)
1. 分布式锁代码实现
/**
* 分布式锁的简单实现代码
* Created by liuyang on 2017/4/20.
*/
public class Lock{
private final JedisPool jedisPool;
public Lock(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
/**
* 加锁
* @param lockName 锁的key
* @param acquireTimeout 获取超时时间
* @param timeout 锁的超时时间
* @return 锁标识
*/
public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
Jedis conn = null;
String retIdentifier = null;
try {
// 获取连接
conn = jedisPool.getResource();
// 随机生成一个value
String identifier = UUID.randomUUID().toString();
// 锁名,即key值
String lockKey = "lock:" + lockName;
// 超时时间,上锁后超过此时间则自动释放锁
int lockExpire = (int) (timeout / 1000);
// 获取锁的超时时间,超过这个时间则放弃获取锁
long end = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < end) {
if (conn.setnx(lockKey, identifier) == 1) {
conn.expire(lockKey, lockExpire);
// 返回value值,用于释放锁时间确认
retIdentifier = identifier;
return retIdentifier;
}
// 返回-1代表key没有设置超时时间,为key设置一个超时时间
if (conn.ttl(lockKey) == -1) {
conn.expire(lockKey, lockExpire);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (conn != null) {
conn.close();
}
}
return retIdentifier;
}
/**
* 释放锁
* @param lockName 锁的key
* @param identifier 释放锁的标识
* @return
*/
public boolean releaseLock(String lockName, String identifier) {
Jedis conn = null;
String lockKey = "lock:" + lockName;
boolean retFlag = false;
try {
conn = jedisPool.getResource();
while (true) {
// 监视lock,准备开始事务
conn.watch(lockKey);
// 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
if (identifier.equals(conn.get(lockKey))) {
Transaction transaction = conn.multi();
transaction.del(lockKey);
List<Object> results = transaction.exec();
if (results == null) {
continue;
}
retFlag = true;
}
conn.unwatch();
break;
}
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (conn != null) {
conn.close();
}
}
return retFlag;
}
}
2. 线程执行
/**
* TODO
*
* @author huhui
* @since 2018/12/27 17:35
*/
public class Service {
private static JedisPool pool = null;
private Lock lock = new Lock(pool);
int n = 500;
static {
JedisPoolConfig config = new JedisPoolConfig();
// 设置最大连接数
config.setMaxTotal(200);
// 设置最大空闲数
config.setMaxIdle(8);
// 设置最大等待时间
config.setMaxWaitMillis(1000 * 100);
// 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的
config.setTestOnBorrow(true);
pool = new JedisPool(config, "127.0.0.1", 6379, 3000);
}
public void seckill() {
// 返回锁的value值,供释放锁时候进行判断
String identifier = lock.lockWithTimeout("resource", 5000, 1000);
System.out.println(Thread.currentThread().getName() + "获得了锁");
System.out.println(--n);
lock.releaseLock("resource", identifier);
}
}
3. 线程类
public class ThreadA extends Thread {
private Service service;
public ThreadA(Service service) {
this.service = service;
}
@Override
public void run() {
service.seckill();
}
}
4. 测试类(模拟秒杀服务)
public class Test {
public static void main(String[] args) {
Service service = new Service();
for (int i = 0; i < 50; i++) {
ThreadA threadA = new ThreadA(service);
threadA.start();
}
}
}
参考网址
注:文章是经过参考其他的文章然后自己整理出来的,有可能是小部分参考,也有可能是大部分参考,但绝对不是直接转载,觉得侵权了我会删,我只是把这个用于自己的笔记,顺便整理下知识的同时,能帮到一部分人。
ps : 有错误的还望各位大佬指正,小弟不胜感激