Redis Template实现分布式锁的实例代码
前言
分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于redis的分布式锁;3. 基于zookeeper的分布式锁。本篇博客将介绍第二种方式,基于redis实现分布式锁。虽然网上已经有各种介绍redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现redis分布式锁。
可靠性
首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
1.互斥性。在任意时刻,只有一个客户端能持有锁。
2.不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
3.具有容错性。只要大部分的redis节点正常运行,客户端就可以加锁和解锁。
4.解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。
使用redis的setnx命令获取分布式锁的步骤:
•c1和c2线程同时检查时间戳获取锁,执行setnx命令并都返回0,此时锁仍被c3持有,并且c3已经崩溃
•c1 del锁
•c1 使用setnx命令获取锁,并且成功
•c2 del锁
•c2 使用setnx命令获取锁,并且成功
•error : 由于竞态条件,c1和c2都获取到了锁
幸运的是,以下面的步骤完全可以避免这种情况发生,看看c4线程如何操作
•c4使用setnx命令获取锁
•c3已经崩溃但是仍然持有锁,所以redis返回0给c4
•c4使用get命令获取锁并检查锁是否已经过期,如果没有过期,则继续等待一段时间并重新重试
•如果锁已经过期,c4尝试 getset lock.foo <current unix timestamp + lock timeout + 1>
•利用getset语法,c4可以检查旧时间是否仍然是过期时间,如果是,则获取锁
•如果另一个客户端c5率先获取到锁,c4执行getset命令后将返回非过期时间,然后c4继续从头开始重新尝试获取锁。此操作c4将延长一点c5获取到的锁的过期时间,不过这不是什么大问题。
接下来我们用代码的形式展现:
package com.shuige.components.cache.redis; import org.springframework.beans.factory.annotation.autowired; import org.springframework.data.redis.core.rediscallback; import org.springframework.data.redis.core.redistemplate; import org.springframework.data.redis.core.valueoperations; import org.springframework.stereotype.component; import java.util.objects; import java.util.concurrent.timeunit; /** * description: 通用redis帮助类 * user: zhouzhou * date: 2018-09-05 * time: 15:39 */ @component public class commonredishelper { public static final string lock_prefix = "redis_lock"; public static final int lock_expire = 300; // ms @autowired redistemplate redistemplate; /** * 最终加强分布式锁 * * @param key key值 * @return 是否获取到 */ public boolean lock(string key){ string lock = lock_prefix + key; // 利用lambda表达式 return (boolean) redistemplate.execute((rediscallback) connection -> { long expireat = system.currenttimemillis() + lock_expire + 1; boolean acquire = connection.setnx(lock.getbytes(), string.valueof(expireat).getbytes()); if (acquire) { return true; } else { byte[] value = connection.get(lock.getbytes()); if (objects.nonnull(value) && value.length > 0) { long expiretime = long.parselong(new string(value)); if (expiretime < system.currenttimemillis()) { // 如果锁已经过期 byte[] oldvalue = connection.getset(lock.getbytes(), string.valueof(system.currenttimemillis() + lock_expire + 1).getbytes()); // 防止死锁 return long.parselong(new string(oldvalue)) < system.currenttimemillis(); } } } return false; }); } /** * 删除锁 * * @param key */ public void delete(string key) { redistemplate.delete(key); } }
如何使用呢,导入工具类后:
boolean lock = redishelper.lock(key); if (lock) { // 执行逻辑操作 redishelper.delete(key); } else { // 设置失败次数计数器, 当到达5次时, 返回失败 int failcount = 1; while(failcount <= 5){ // 等待100ms重试 try { thread.sleep(100l); } catch (interruptedexception e) { e.printstacktrace(); } if (redishelper.lock(key)){ // 执行逻辑操作 redishelper.delete(key); }else{ failcount ++; } } throw new runtimeexception("现在创建的人太多了, 请稍等再试"); }
加锁成功执行完逻辑后, 必须解锁, 否则只能靠锁机制来解锁了不建议这么做
总结
以上所述是小编给大家介绍的redis template实现分布式锁的实例代码,希望对大家有所帮助
上一篇: Redis获取某个前缀的key脚本实例
下一篇: ProxylessNAS论文和算法解析