浅谈Java(SpringBoot)基于zookeeper的分布式锁实现
通过zookeeper实现分布式锁
1、创建zookeeper的client
首先通过curatorframeworkfactory创建一个连接zookeeper的连接curatorframework client
public class curatorfactorybean implements factorybean<curatorframework>, initializingbean, disposablebean { private static final logger logger = loggerfactory.getlogger(contractfileinfocontroller.class); private string connectionstring; private int sessiontimeoutms; private int connectiontimeoutms; private retrypolicy retrypolicy; private curatorframework client; public curatorfactorybean(string connectionstring) { this(connectionstring, 500, 500); } public curatorfactorybean(string connectionstring, int sessiontimeoutms, int connectiontimeoutms) { this.connectionstring = connectionstring; this.sessiontimeoutms = sessiontimeoutms; this.connectiontimeoutms = connectiontimeoutms; } @override public void destroy() throws exception { logger.info("closing curator framework..."); this.client.close(); logger.info("closed curator framework."); } @override public curatorframework getobject() throws exception { return this.client; } @override public class<?> getobjecttype() { return this.client != null ? this.client.getclass() : curatorframework.class; } @override public boolean issingleton() { return true; } @override public void afterpropertiesset() throws exception { if (stringutils.isempty(this.connectionstring)) { throw new illegalstateexception("connectionstring can not be empty."); } else { if (this.retrypolicy == null) { this.retrypolicy = new exponentialbackoffretry(1000, 2147483647, 180000); } this.client = curatorframeworkfactory.newclient(this.connectionstring, this.sessiontimeoutms, this.connectiontimeoutms, this.retrypolicy); this.client.start(); this.client.blockuntilconnected(30, timeunit.milliseconds); } } public void setconnectionstring(string connectionstring) { this.connectionstring = connectionstring; } public void setsessiontimeoutms(int sessiontimeoutms) { this.sessiontimeoutms = sessiontimeoutms; } public void setconnectiontimeoutms(int connectiontimeoutms) { this.connectiontimeoutms = connectiontimeoutms; } public void setretrypolicy(retrypolicy retrypolicy) { this.retrypolicy = retrypolicy; } public void setclient(curatorframework client) { this.client = client; } }
2、封装分布式锁
根据curatorframework创建interprocessmutex(分布式可重入排它锁)对一行数据进行上锁
public interprocessmutex(curatorframework client, string path) { this(client, path, new standardlockinternalsdriver()); }
使用 acquire方法
1、acquire() :入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。
2、acquire(long time, timeunit unit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。
public void acquire() throws exception { if (!this.internallock(-1l, (timeunit)null)) { throw new ioexception("lost connection while trying to acquire lock: " + this.basepath); } } public boolean acquire(long time, timeunit unit) throws exception { return this.internallock(time, unit); }
释放锁 mutex.release();
public void release() throws exception { thread currentthread = thread.currentthread(); interprocessmutex.lockdata lockdata = (interprocessmutex.lockdata)this.threaddata.get(currentthread); if (lockdata == null) { throw new illegalmonitorstateexception("you do not own the lock: " + this.basepath); } else { int newlockcount = lockdata.lockcount.decrementandget(); if (newlockcount <= 0) { if (newlockcount < 0) { throw new illegalmonitorstateexception("lock count has gone negative for lock: " + this.basepath); } else { try { this.internals.releaselock(lockdata.lockpath); } finally { this.threaddata.remove(currentthread); } } } } }
封装后的dlock代码
1、调用interprocessmutex processmutex = dlock.mutex(path);
2、手动释放锁processmutex.release();
3、需要手动删除路径dlock.del(path);
推荐 使用:
都是 函数式编程
在业务代码执行完毕后 会释放锁和删除path
1、这个有返回结果
public t mutex(string path, zklockcallback zklockcallback, long time, timeunit timeunit)
2、这个无返回结果
public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit)
public class dlock { private final logger logger; private static final long timeout_d = 100l; private static final string root_path_d = "/dlock"; private string lockrootpath; private curatorframework client; public dlock(curatorframework client) { this("/dlock", client); } public dlock(string lockrootpath, curatorframework client) { this.logger = loggerfactory.getlogger(dlock.class); this.lockrootpath = lockrootpath; this.client = client; } public interprocessmutex mutex(string path) { if (!stringutils.startswith(path, "/")) { path = constant.keybuilder(new object[]{"/", path}); } return new interprocessmutex(this.client, constant.keybuilder(new object[]{this.lockrootpath, "", path})); } public <t> t mutex(string path, zklockcallback<t> zklockcallback) throws zklockexception { return this.mutex(path, zklockcallback, 100l, timeunit.milliseconds); } public <t> t mutex(string path, zklockcallback<t> zklockcallback, long time, timeunit timeunit) throws zklockexception { string finalpath = this.getlockpath(path); interprocessmutex mutex = new interprocessmutex(this.client, finalpath); try { if (!mutex.acquire(time, timeunit)) { throw new zklockexception("acquire zk lock return false"); } } catch (exception var13) { throw new zklockexception("acquire zk lock failed.", var13); } t var8; try { var8 = zklockcallback.doinlock(); } finally { this.releaselock(finalpath, mutex); } return var8; } private void releaselock(string finalpath, interprocessmutex mutex) { try { mutex.release(); this.logger.info("delete zk node path:{}", finalpath); this.deleteinternal(finalpath); } catch (exception var4) { this.logger.error("dlock", "release lock failed, path:{}", finalpath, var4); // logutil.error(this.logger, "dlock", "release lock failed, path:{}", new object[]{finalpath, var4}); } } public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit) throws zklockexception { string finalpath = this.getlockpath(path); interprocessmutex mutex = new interprocessmutex(this.client, finalpath); try { if (!mutex.acquire(time, timeunit)) { throw new zklockexception("acquire zk lock return false"); } } catch (exception var13) { throw new zklockexception("acquire zk lock failed.", var13); } try { zklockcallback.response(); } finally { this.releaselock(finalpath, mutex); } } public string getlockpath(string custompath) { if (!stringutils.startswith(custompath, "/")) { custompath = constant.keybuilder(new object[]{"/", custompath}); } string finalpath = constant.keybuilder(new object[]{this.lockrootpath, "", custompath}); return finalpath; } private void deleteinternal(string finalpath) { try { ((errorlistenerpathable)this.client.delete().inbackground()).forpath(finalpath); } catch (exception var3) { this.logger.info("delete zk node path:{} failed", finalpath); } } public void del(string custompath) { string lockpath = ""; try { lockpath = this.getlockpath(custompath); ((errorlistenerpathable)this.client.delete().inbackground()).forpath(lockpath); } catch (exception var4) { this.logger.info("delete zk node path:{} failed", lockpath); } } }
@functionalinterface public interface zklockcallback<t> { t doinlock(); } @functionalinterface public interface zkvoidcallback { void response(); } public class zklockexception extends exception { public zklockexception() { } public zklockexception(string message) { super(message); } public zklockexception(string message, throwable cause) { super(message, cause); } }
配置curatorconfig
@configuration public class curatorconfig { @value("${zk.connectionstring}") private string connectionstring; @value("${zk.sessiontimeoutms:500}") private int sessiontimeoutms; @value("${zk.connectiontimeoutms:500}") private int connectiontimeoutms; @value("${zk.dlockroot:/dlock}") private string dlockroot; @bean public curatorfactorybean curatorfactorybean() { return new curatorfactorybean(connectionstring, sessiontimeoutms, connectiontimeoutms); } @bean @autowired public dlock dlock(curatorframework client) { return new dlock(dlockroot, client); } }
测试代码
@restcontroller @requestmapping("/dlock") public class lockcontroller { @autowired private dlock dlock; @requestmapping("/lock") public map testdlock(string no){ final string path = constant.keybuilder("/test/no/", no); long mutex=0l; try { system.out.println("在拿锁:"+path+system.currenttimemillis()); mutex = dlock.mutex(path, () -> { try { system.out.println("拿到锁了" + system.currenttimemillis()); thread.sleep(10000); system.out.println("操作完成了" + system.currenttimemillis()); } finally { return system.currenttimemillis(); } }, 1000, timeunit.milliseconds); } catch (zklockexception e) { system.out.println("拿不到锁呀"+system.currenttimemillis()); } return collections.singletonmap("ret",mutex); } @requestmapping("/dlock") public map testdlock1(string no){ final string path = constant.keybuilder("/test/no/", no); long mutex=0l; try { system.out.println("在拿锁:"+path+system.currenttimemillis()); interprocessmutex processmutex = dlock.mutex(path); processmutex.acquire(); system.out.println("拿到锁了" + system.currenttimemillis()); thread.sleep(10000); processmutex.release(); system.out.println("操作完成了" + system.currenttimemillis()); } catch (zklockexception e) { system.out.println("拿不到锁呀"+system.currenttimemillis()); e.printstacktrace(); }catch (exception e){ e.printstacktrace(); } return collections.singletonmap("ret",mutex); } @requestmapping("/del") public map deldlock(string no){ final string path = constant.keybuilder("/test/no/", no); dlock.del(path); return collections.singletonmap("ret",1); } }
以上所述是小编给大家介绍的java(springboot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助