Curator zookeeper 分布式锁实现
程序员文章站
2022-07-12 18:32:48
...
curator版本:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.5.0</version> </dependency>
代码实现
private boolean internalLock(long time, TimeUnit unit) throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null )//当前线程已经获得锁(重入),对线程数据的锁+1 { // re-entering lockData.lockCount.incrementAndGet(); return true; } //尝试获取锁,获取锁成功返回zk路径 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }
代码调用
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { //创建节点,模式:EPHEMERAL_SEQUENTIAL 直译:瞬态的-有序的,不会持久化,client与zk服务断掉链接的时候,此节点会自动删除,这样在服务停机时再重启时,不会出现锁无法释放 if ( localLockNodeBytes != null ) { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes); } else { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } //循环尝试获取锁 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // gets thrown by StandardLockInternalsDriver when it can't find the lock node // this can happen when the session expires, etc. So, if the retry allows, just try it all again if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } if ( hasTheLock ) { return ourPath; } return null; }
代码调用
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash //验证当前线程持有的node是不是路径下最靠前的node,如果是,获取到锁,否则没有获取到 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { //获取到路径下排在当前节点前面的节点,watch此节点,防止大规模的watch String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } //获取不到,等待指定时间,等待watcher叫醒 wait(millisToWait); } else { //获取不到锁,等待watcher叫醒 wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { doDelete = true; throw e; } finally { if ( doDelete ) { deleteOurPath(ourPath); } } return haveTheLock; } @Override public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { int ourIndex = children.indexOf(sequenceNodeName); validateOurIndex(sequenceNodeName, ourIndex); //判断是否是第一个节点 boolean getsTheLock = ourIndex < maxLeases; String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); }
zookeeper四种模式的节点:
/** * The znode will not be automatically deleted upon client's disconnect. */ PERSISTENT (0, false, false), /** * The znode will not be automatically deleted upon client's disconnect, * and its name will be appended with a monotonically increasing number. */ PERSISTENT_SEQUENTIAL (2, false, true), /** * The znode will be deleted upon the client's disconnect. */ EPHEMERAL (1, true, false), /** * The znode will be deleted upon the client's disconnect, and its name * will be appended with a monotonically increasing number. */ EPHEMERAL_SEQUENTIAL (3, true, true);
curator分布式锁实现创建的是EPHEMERAL_SEQUENTIAL,使用时不用担心服务停掉,锁无法释放的问题
上一篇: spring 事件机制 异步操作