详解Java如何实现基于Redis的分布式锁
前言
单jvm内同步好办, 直接用jdk提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用redis,当然还有很多其他的实现方式。其实基于redis实现的原理还算比较简单的,在看代码之前建议大家先去看看原理,看懂了之后看代码应该就容易理解了。
我这里不实现jdk的java.util.concurrent.locks.lock
接口,而是自定义一个,因为jdk的有个newcondition
方法我这里暂时没实现。这个lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)
package cc.lixiaohui.lock; import java.util.concurrent.timeunit; public interface lock { /** * 阻塞性的获取锁, 不响应中断 */ void lock; /** * 阻塞性的获取锁, 响应中断 * * @throws interruptedexception */ void lockinterruptibly throws interruptedexception; /** * 尝试获取锁, 获取不到立即返回, 不阻塞 */ boolean trylock; /** * 超时自动返回的阻塞性的获取锁, 不响应中断 * * @param time * @param unit * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未���取到锁 * */ boolean trylock(long time, timeunit unit); /** * 超时自动返回的阻塞性的获取锁, 响应中断 * * @param time * @param unit * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁 * @throws interruptedexception 在尝试获取锁的当前线程被中断 */ boolean trylockinterruptibly(long time, timeunit unit) throws interruptedexception; /** * 释放锁 */ void unlock; }
看其抽象实现:
package cc.lixiaohui.lock; import java.util.concurrent.timeunit; /** * 锁的骨架实现, 真正的获取锁的步骤由子类去实现. * * @author lixiaohui * */ public abstract class abstractlock implements lock { /** * <pre> * 这里需不需要保证可见性值得讨论, 因为是分布式的锁, * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性 * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了. * </pre> */ protected volatile boolean locked; /** * 当前jvm内持有该锁的线程(if have one) */ private thread exclusiveownerthread; public void lock { try { lock(false, 0, null, false); } catch (interruptedexception e) { // todo ignore } } public void lockinterruptibly throws interruptedexception { lock(false, 0, null, true); } public boolean trylock(long time, timeunit unit) { try { return lock(true, time, unit, false); } catch (interruptedexception e) { // todo ignore } return false; } public boolean trylockinterruptibly(long time, timeunit unit) throws interruptedexception { return lock(true, time, unit, true); } public void unlock { // todo 检查当前线程是否持有锁 if (thread.currentthread != getexclusiveownerthread) { throw new illegalmonitorstateexception("current thread does not hold the lock"); } unlock0; setexclusiveownerthread(null); } protected void setexclusiveownerthread(thread thread) { exclusiveownerthread = thread; } protected final thread getexclusiveownerthread { return exclusiveownerthread; } protected abstract void unlock0; /** * 阻塞式获取锁的实现 * * @param usetimeout * @param time * @param unit * @param interrupt 是否响应中断 * @return * @throws interruptedexception */ protected abstract boolean lock(boolean usetimeout, long time, timeunit unit, boolean interrupt) throws interruptedexception; }
基于redis的最终实现,关键的获取锁,释放锁的代码在这个类的lock
方法和unlock0
方法里,大家可以只看这两个方法然后完全自己写一个:
package cc.lixiaohui.lock; import java.util.concurrent.timeunit; import redis.clients.jedis.jedis; /** * <pre> * 基于redis的setnx操作实现的分布式锁 * * 获取锁时最好用lock(long time, timeunit unit), 以免网路问题而导致线程一直阻塞 * * <a href="http://redis.io/commands/setnx">setnc操作参考资料</a> * </pre> * * @author lixiaohui * */ public class redisbaseddistributedlock extends abstractlock { private jedis jedis; // 锁的名字 protected string lockkey; // 锁的有效时长(毫秒) protected long lockexpires; public redisbaseddistributedlock(jedis jedis, string lockkey, long lockexpires) { this.jedis = jedis; this.lockkey = lockkey; this.lockexpires = lockexpires; } // 阻塞式获取锁的实现 protected boolean lock(boolean usetimeout, long time, timeunit unit, boolean interrupt) throws interruptedexception{ if (interrupt) { checkinterruption; } long start = system.currenttimemillis; long timeout = unit.tomillis(time); // if !usetimeout, then it's useless while (usetimeout ? istimeout(start, timeout) : true) { if (interrupt) { checkinterruption; } long lockexpiretime = system.currenttimemillis + lockexpires + 1;//锁超时时间 string stringoflockexpiretime = string.valueof(lockexpiretime); if (jedis.setnx(lockkey, stringoflockexpiretime) == 1) { // 获取到锁 // todo 成功获取到锁, 设置相关标识 locked = true; setexclusiveownerthread(thread.currentthread); return true; } string value = jedis.get(lockkey); if (value != null && istimeexpired(value)) { // lock is expired // 假设多个线程(非单jvm)同时走到这里 string oldvalue = jedis.getset(lockkey, stringoflockexpiretime); // getset is atomic // 但是走到这里时每个线程拿到的oldvalue肯定不可能一样(因为getset是原子性的) // 加入拿到的oldvalue依然是expired的,那么就说明拿到锁了 if (oldvalue != null && istimeexpired(oldvalue)) { // todo 成功获取到锁, 设置相关标识 locked = true; setexclusiveownerthread(thread.currentthread); return true; } } else { // todo lock is not expired, enter next loop retrying } } return false; } public boolean trylock { long lockexpiretime = system.currenttimemillis + lockexpires + 1;//锁超时时间 string stringoflockexpiretime = string.valueof(lockexpiretime); if (jedis.setnx(lockkey, stringoflockexpiretime) == 1) { // 获取到锁 // todo 成功获取到锁, 设置相关标识 locked = true; setexclusiveownerthread(thread.currentthread); return true; } string value = jedis.get(lockkey); if (value != null && istimeexpired(value)) { // lock is expired // 假设多个线程(非单jvm)同时走到这里 string oldvalue = jedis.getset(lockkey, stringoflockexpiretime); // getset is atomic // 但是走到这里时每个线程拿到的oldvalue肯定不可能一样(因为getset是原子性的) // 假如拿到的oldvalue依然是expired的,那么就说明拿到锁了 if (oldvalue != null && istimeexpired(oldvalue)) { // todo 成功获取到锁, 设置相关标识 locked = true; setexclusiveownerthread(thread.currentthread); return true; } } else { // todo lock is not expired, enter next loop retrying } return false; } /** * queries if this lock is held by any thread. * * @return {@code true} if any thread holds this lock and * {@code false} otherwise */ public boolean islocked { if (locked) { return true; } else { string value = jedis.get(lockkey); // todo 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了, // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断 // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就 // 不是同步控制, 它只是一种锁状态的报告. return !istimeexpired(value); } } @override protected void unlock0 { // todo 判断锁是否过期 string value = jedis.get(lockkey); if (!istimeexpired(value)) { dounlock; } } private void checkinterruption throws interruptedexception { if(thread.currentthread.isinterrupted) { throw new interruptedexception; } } private boolean istimeexpired(string value) { return long.parselong(value) < system.currenttimemillis; } private boolean istimeout(long start, long timeout) { return start + timeout > system.currenttimemillis; } private void dounlock { jedis.del(lockkey); } }
如果将来还换一种实现方式(比如zookeeper
之类的),到时直接继承abstractlock
并实现lock(boolean usetimeout, long time, timeunit unit, boolean interrupt)
, unlock0
方法即可(所谓抽象嘛)
测试
模拟全局id增长器,设计一个idgenerator
类,该类负责生成全局递增id,其代码如下:
package cc.lixiaohui.lock; import java.math.biginteger; import java.util.concurrent.timeunit; /** * 模拟id生成 * @author lixiaohui * */ public class idgenerator { private static biginteger id = biginteger.valueof(0); private final lock lock; private static final biginteger increment = biginteger.valueof(1); public idgenerator(lock lock) { this.lock = lock; } public string getandincrement { if (lock.trylock(3, timeunit.seconds)) { try { // todo 这里获取到锁, 访问临界区资源 return getandincrement0; } finally { lock.unlock; } } return null; //return getandincrement0; } private string getandincrement0 { string s = id.tostring; id = id.add(increment); return s; } }
测试主逻辑:同一个jvm内开两个线程死循环地(循环之间无间隔,有的话测试就没意义了)获取id
(我这里并不是死循环而是跑20s),获取到id存到同一个set
里面,在存之前先检查该id
在set
中是否存在,如果已存在,则让两个线程都停止。如果程序能正常跑完20s,那么说明这个分布式锁还算可以满足要求,如此测试的效果应该和不同jvm(也就是真正的分布式环境中)测试的效果是一样的,下面是测试类的代码:
package cc.lixiaohui.distributedlock.distributedlock; import java.util.hashset; import java.util.set; import org.junit.test; import redis.clients.jedis.jedis; import cc.lixiaohui.lock.idgenerator; import cc.lixiaohui.lock.lock; import cc.lixiaohui.lock.redisbaseddistributedlock; public class idgeneratortest { private static set<string> generatedids = new hashset<string>; private static final string lock_key = "lock.lock"; private static final long lock_expire = 5 * 1000; @test public void test throws interruptedexception { jedis jedis1 = new jedis("localhost", 6379); lock lock1 = new redisbaseddistributedlock(jedis1, lock_key, lock_expire); idgenerator g1 = new idgenerator(lock1); idconsumemission consume1 = new idconsumemission(g1, "consume1"); jedis jedis2 = new jedis("localhost", 6379); lock lock2 = new redisbaseddistributedlock(jedis2, lock_key, lock_expire); idgenerator g2 = new idgenerator(lock2); idconsumemission consume2 = new idconsumemission(g2, "consume2"); thread t1 = new thread(consume1); thread t2 = new thread(consume2); t1.start; t2.start; thread.sleep(20 * 1000); //让两个线程跑20秒 idconsumemission.stop; t1.join; t2.join; } static string time { return string.valueof(system.currenttimemillis / 1000); } static class idconsumemission implements runnable { private idgenerator idgenerator; private string name; private static volatile boolean stop; public idconsumemission(idgenerator idgenerator, string name) { this.idgenerator = idgenerator; this.name = name; } public static void stop { stop = true; } public void run { system.out.println(time + ": consume " + name + " start "); while (!stop) { string id = idgenerator.getandincrement; if(generatedids.contains(id)) { system.out.println(time + ": duplicate id generated, id = " + id); stop = true; continue; } generatedids.add(id); system.out.println(time + ": consume " + name + " add id = " + id); } system.out.println(time + ": consume " + name + " done "); } } }
说明一点,我这里停止两个线程的方式并不是很好,我是为了方便才这么做的,因为只是测试,最好不要这么做。
测试结果
跑20s打印的东西太多,前面打印的被clear
了,只有差不多跑完的时候才有,下面截图。说明了这个锁能正常工作:
当idgererator
没有加锁(即idgererator
的getandincrement
方法内部获取id
时不上锁)时,测试是不通过的,非常大的概率中途就会停止,下面是不加锁时的测试结果:
这个1秒都不到:
这个也1秒都不到:
结束语
好了,以上就是java实现基于redis的分布式锁的全部内容,各位如果发现问题希望能指正,希望这篇文章能对大家的学习和工作带来一定的帮助,如果有疑问可以留言交流。