欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

分布式锁

程序员文章站 2022-07-05 11:46:44
...

一、什么是分布式锁

1. 当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数

2. 与单机模式下的锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。(分布式情况下之所以问题变得复杂,主要就是需要考虑到网络的延时和不可靠)

3. 分布式锁还是可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如 Redis、Memcache。至于利用数据库、文件等做锁与单机的实现是一样的,只要保证标记能互斥就行。

二、我们需要怎样的分布式锁

1. 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器-上的一个线程执行

2. 这把锁要是一把可重入锁(具备锁失效机制,防止死锁)

3. 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

4. 这把锁最好是一把公平锁(根据业务需求考虑要不要这条)

5. 有高可用的获取锁和释放锁功能

6. 高性能的获取锁与释放锁 	

三、分布式锁的实现方式

1. 基于数据库实现分布式锁
1)缺点

	(1)这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用

	(2)这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁

	(3)这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作

	(4)这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了

2)解决方案

	(1)数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上

	(2)没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍

	(3)非阻塞的?搞一个while循环,直到insert成功再返回成功

	(4)非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了	
2. 基于缓存实现分布式锁(redis,见下)
3. 基于Zookeeper实现分布式锁
4. Redisson分布式锁

四、基于Redis的实现方式

1. 选用Redis实现分布式锁原因:
1)Redis有很高的性能

2)Redis命令对此支持较好,实现起来比较方便
2. 使用命令介绍
1)SETNX

	SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0	

2)expire

	expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁

3)delete

	delete key:删除key
3. 实现思想
1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断

2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁

3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放	

五、代码实现(redis)

1. 分布式锁代码实现
/**
 * 分布式锁的简单实现代码
 * Created by liuyang on 2017/4/20.
 */
public class Lock{

    private final JedisPool jedisPool;

    public Lock(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * 加锁
     * @param lockName       锁的key
     * @param acquireTimeout 获取超时时间
     * @param timeout        锁的超时时间
     * @return 锁标识
     */
    public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
        Jedis conn = null;
        String retIdentifier = null;
        try {
            // 获取连接
            conn = jedisPool.getResource();
            // 随机生成一个value
            String identifier = UUID.randomUUID().toString();
            // 锁名,即key值
            String lockKey = "lock:" + lockName;
            // 超时时间,上锁后超过此时间则自动释放锁
            int lockExpire = (int) (timeout / 1000);

            // 获取锁的超时时间,超过这个时间则放弃获取锁
            long end = System.currentTimeMillis() + acquireTimeout;
            while (System.currentTimeMillis() < end) {
                if (conn.setnx(lockKey, identifier) == 1) {
                    conn.expire(lockKey, lockExpire);
                    // 返回value值,用于释放锁时间确认
                    retIdentifier = identifier;
                    return retIdentifier;
                }
                // 返回-1代表key没有设置超时时间,为key设置一个超时时间
                if (conn.ttl(lockKey) == -1) {
                    conn.expire(lockKey, lockExpire);
                }

                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retIdentifier;
    }

    /**
     * 释放锁
     * @param lockName   锁的key
     * @param identifier 释放锁的标识
     * @return
     */
    public boolean releaseLock(String lockName, String identifier) {
        Jedis conn = null;
        String lockKey = "lock:" + lockName;
        boolean retFlag = false;
        try {
            conn = jedisPool.getResource();
            while (true) {
                // 监视lock,准备开始事务
                conn.watch(lockKey);
                // 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
                if (identifier.equals(conn.get(lockKey))) {
                    Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    List<Object> results = transaction.exec();
                    if (results == null) {
                        continue;
                    }
                    retFlag = true;
                }
                conn.unwatch();
                break;
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
        return retFlag;
    }
}
2. 线程执行
/**
 * TODO
 *
 * @author huhui
 * @since 2018/12/27 17:35
 */
public class Service {

    private static JedisPool pool = null;

    private Lock lock = new Lock(pool);

    int n = 500;

    static {
        JedisPoolConfig config = new JedisPoolConfig();
        // 设置最大连接数
        config.setMaxTotal(200);
        // 设置最大空闲数
        config.setMaxIdle(8);
        // 设置最大等待时间
        config.setMaxWaitMillis(1000 * 100);
        // 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的
        config.setTestOnBorrow(true);
        pool = new JedisPool(config, "127.0.0.1", 6379, 3000);
    }

    public void seckill() {
        // 返回锁的value值,供释放锁时候进行判断
        String identifier = lock.lockWithTimeout("resource", 5000, 1000);
        System.out.println(Thread.currentThread().getName() + "获得了锁");
        System.out.println(--n);
        lock.releaseLock("resource", identifier);
    }

}	
3. 线程类
public class ThreadA extends Thread {
    private Service service;

    public ThreadA(Service service) {
        this.service = service;
    }

    @Override
    public void run() {
        service.seckill();
    }
}
4. 测试类(模拟秒杀服务)
public class Test {

    public static void main(String[] args) {
        Service service = new Service();
        for (int i = 0; i < 50; i++) {
            ThreadA threadA = new ThreadA(service);
            threadA.start();
        }
    }
    
}	

参考网址

Java分布式锁看这篇就够了

分布式锁简单入门以及三种实现方式介绍

注:文章是经过参考其他的文章然后自己整理出来的,有可能是小部分参考,也有可能是大部分参考,但绝对不是直接转载,觉得侵权了我会删,我只是把这个用于自己的笔记,顺便整理下知识的同时,能帮到一部分人。
ps : 有错误的还望各位大佬指正,小弟不胜感激