Zookeeper(七)分布式锁
程序员文章站
2022-07-14 11:17:22
...
源:http://blog.csdn.net/java2000_wl/article/details/8694270
评:
获取锁实现思路:
1. 首先创建一个作为锁目录(znode),通常用它来描述锁定的实体,称为:/lock_node
2. 希望获得锁的客户端在锁目录下创建znode,作为锁/lock_node的子节点,并且节点类型为有序临时节点(EPHEMERAL_SEQUENTIAL);
例如:有两个客户端创建znode,分别为/lock_node/lock-1和/lock_node/lock-2
3. 当前客户端调用getChildren(/lock_node)得到锁目录所有子节点,不设置watch,接着获取小于自己(步骤2创建)的兄弟节点
4. 步骤3中获取小于自己的节点不存在 && 最小节点与步骤2中创建的相同,说明当前客户端顺序号最小,获得锁,结束。
5. 客户端监视(watch)相对自己次小的有序临时节点状态
6. 如果监视的次小节点状态发生变化,则跳转到步骤3,继续后续操作,直到退出锁竞争。
[java] view plain copy
print?
public synchronized boolean lock() throws KeeperException, InterruptedException {
if (isClosed()) {
return false;
}
// 如果锁目录不存在, 创建锁目录 节点类型为永久类型
ensurePathExists(dir);
// 创建锁节点,节点类型EPHEMERAL_SEQUENTIAL
// 如果不存在小于自己的节点 并且最小节点 与当前创建的节点相同 获得锁
// 未获得成功,对当前次小节点设置watch
return (Boolean) retryOperation(zop);
}
创建锁目录
[java] view plain copy
print?
protected void ensurePathExists(String path) {
ensureExists(path, null, acl, CreateMode.PERSISTENT);
}
[java] view plain copy
print?
protected void ensureExists(final String path, final byte[] data,
final List<ACL> acl, final CreateMode flags) {
try {
retryOperation(new ZooKeeperOperation() {
public boolean execute() throws KeeperException, InterruptedException {
// 创建锁目录
Stat stat = zookeeper.exists(path, false);
// 节点如果存在 直接返回
if (stat != null) {
return true;
}
// 创建节点
// data为null
// flags为持久化节点
zookeeper.create(path, data, acl, flags);
return true;
}
});
} catch (KeeperException e) {
LOG.warn("Caught: " + e, e);
} catch (InterruptedException e) {
LOG.warn("Caught: " + e, e);
}
}
创建锁节点,获得锁目录下的所有节点, 如果为最小节点 获得锁成功
[java] view plain copy
print?
/**
* the command that is run and retried for actually
* obtaining the lock
* @return if the command was successful or not
*/
public boolean execute() throws KeeperException, InterruptedException {
do {
if (id == null) {
long sessionId = zookeeper.getSessionId();
String prefix = "x-" + sessionId + "-";
// lets try look up the current ID if we failed
// in the middle of creating the znode
findPrefixInChildren(prefix, zookeeper, dir);
idName = new ZNodeName(id);
}
if (id != null) {
List<String> names = zookeeper.getChildren(dir, false);
if (names.isEmpty()) {
LOG.warn("No children in: " + dir + " when we've just " +
"created one! Lets recreate it...");
// lets force the recreation of the id
id = null;
} else {
// lets sort them explicitly (though they do seem to come back in order ususally
SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
for (String name : names) {
sortedNames.add(new ZNodeName(dir + "/" + name));
}
// 获得最小节点
ownerId = sortedNames.first().getName();
// lock_1, lock_2, lock_3 传入参数lock_2 返回lock_1
SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
if (!lessThanMe.isEmpty()) {
ZNodeName lastChildName = lessThanMe.last();
lastChildId = lastChildName.getName();
if (LOG.isDebugEnabled()) {
LOG.debug("watching less than me node: " + lastChildId);
}
// 次小节点设置watch
Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
if (stat != null) {
return Boolean.FALSE;
} else {
LOG.warn("Could not find the" +
" stats for less than me: " + lastChildName.getName());
}
} else {
// 锁目录下的最小节点 与当前客户端创建相同
if (isOwner()) {
if (callback != null) {
callback.lockAcquired();
}
// 获得锁
return Boolean.TRUE;
}
}
}
}
}
while (id == null);
return Boolean.FALSE;
}
};
[java] view plain copy
print?
private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
throws KeeperException, InterruptedException {
// 获取锁目录下的所有子节点
List<String> names = zookeeper.getChildren(dir, false);
for (String name : names) {
//x-sessionId-
if (name.startsWith(prefix)) {
id = name;
if (LOG.isDebugEnabled()) {
LOG.debug("Found id created last time: " + id);
}
break;
}
}
// 当前锁目录下 没有与当前会话对应的子节点 创建子节点 节点类型为临时顺序节点
if (id == null) {
// dir/x-sessionId-i
id = zookeeper.create(dir + "/" + prefix, data,
getAcl(), EPHEMERAL_SEQUENTIAL);
if (LOG.isDebugEnabled()) {
LOG.debug("Created id: " + id);
}
}
释放锁:
释放锁非常简单,删除步骤1中创建的有序临时节点。另外,如果客户端进程死亡或连接失效,对应的节点也会被删除。
[java] view plain copy
print?
public synchronized void unlock() throws RuntimeException {
if (!isClosed() && id != null) {
// we don't need to retry this operation in the case of failure
// as ZK will remove ephemeral files and we don't wanna hang
// this process when closing if we cannot reconnect to ZK
try {
ZooKeeperOperation zopdel = new ZooKeeperOperation() {
public boolean execute() throws KeeperException,
InterruptedException {
// 删除节点 忽略版本
zookeeper.delete(id, -1);
return Boolean.TRUE;
}
};
zopdel.execute();
} catch (InterruptedException e) {
LOG.warn("Caught: " + e, e);
//set that we have been interrupted.
Thread.currentThread().interrupt();
} catch (KeeperException.NoNodeException e) {
// do nothing
} catch (KeeperException e) {
LOG.warn("Caught: " + e, e);
throw (RuntimeException) new RuntimeException(e.getMessage()).
initCause(e);
}
finally {
if (callback != null) {
callback.lockReleased();
}
id = null;
}
}
}
评:
获取锁实现思路:
1. 首先创建一个作为锁目录(znode),通常用它来描述锁定的实体,称为:/lock_node
2. 希望获得锁的客户端在锁目录下创建znode,作为锁/lock_node的子节点,并且节点类型为有序临时节点(EPHEMERAL_SEQUENTIAL);
例如:有两个客户端创建znode,分别为/lock_node/lock-1和/lock_node/lock-2
3. 当前客户端调用getChildren(/lock_node)得到锁目录所有子节点,不设置watch,接着获取小于自己(步骤2创建)的兄弟节点
4. 步骤3中获取小于自己的节点不存在 && 最小节点与步骤2中创建的相同,说明当前客户端顺序号最小,获得锁,结束。
5. 客户端监视(watch)相对自己次小的有序临时节点状态
6. 如果监视的次小节点状态发生变化,则跳转到步骤3,继续后续操作,直到退出锁竞争。
[java] view plain copy
print?
public synchronized boolean lock() throws KeeperException, InterruptedException {
if (isClosed()) {
return false;
}
// 如果锁目录不存在, 创建锁目录 节点类型为永久类型
ensurePathExists(dir);
// 创建锁节点,节点类型EPHEMERAL_SEQUENTIAL
// 如果不存在小于自己的节点 并且最小节点 与当前创建的节点相同 获得锁
// 未获得成功,对当前次小节点设置watch
return (Boolean) retryOperation(zop);
}
创建锁目录
[java] view plain copy
print?
protected void ensurePathExists(String path) {
ensureExists(path, null, acl, CreateMode.PERSISTENT);
}
[java] view plain copy
print?
protected void ensureExists(final String path, final byte[] data,
final List<ACL> acl, final CreateMode flags) {
try {
retryOperation(new ZooKeeperOperation() {
public boolean execute() throws KeeperException, InterruptedException {
// 创建锁目录
Stat stat = zookeeper.exists(path, false);
// 节点如果存在 直接返回
if (stat != null) {
return true;
}
// 创建节点
// data为null
// flags为持久化节点
zookeeper.create(path, data, acl, flags);
return true;
}
});
} catch (KeeperException e) {
LOG.warn("Caught: " + e, e);
} catch (InterruptedException e) {
LOG.warn("Caught: " + e, e);
}
}
创建锁节点,获得锁目录下的所有节点, 如果为最小节点 获得锁成功
[java] view plain copy
print?
/**
* the command that is run and retried for actually
* obtaining the lock
* @return if the command was successful or not
*/
public boolean execute() throws KeeperException, InterruptedException {
do {
if (id == null) {
long sessionId = zookeeper.getSessionId();
String prefix = "x-" + sessionId + "-";
// lets try look up the current ID if we failed
// in the middle of creating the znode
findPrefixInChildren(prefix, zookeeper, dir);
idName = new ZNodeName(id);
}
if (id != null) {
List<String> names = zookeeper.getChildren(dir, false);
if (names.isEmpty()) {
LOG.warn("No children in: " + dir + " when we've just " +
"created one! Lets recreate it...");
// lets force the recreation of the id
id = null;
} else {
// lets sort them explicitly (though they do seem to come back in order ususally
SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
for (String name : names) {
sortedNames.add(new ZNodeName(dir + "/" + name));
}
// 获得最小节点
ownerId = sortedNames.first().getName();
// lock_1, lock_2, lock_3 传入参数lock_2 返回lock_1
SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
if (!lessThanMe.isEmpty()) {
ZNodeName lastChildName = lessThanMe.last();
lastChildId = lastChildName.getName();
if (LOG.isDebugEnabled()) {
LOG.debug("watching less than me node: " + lastChildId);
}
// 次小节点设置watch
Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
if (stat != null) {
return Boolean.FALSE;
} else {
LOG.warn("Could not find the" +
" stats for less than me: " + lastChildName.getName());
}
} else {
// 锁目录下的最小节点 与当前客户端创建相同
if (isOwner()) {
if (callback != null) {
callback.lockAcquired();
}
// 获得锁
return Boolean.TRUE;
}
}
}
}
}
while (id == null);
return Boolean.FALSE;
}
};
[java] view plain copy
print?
private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
throws KeeperException, InterruptedException {
// 获取锁目录下的所有子节点
List<String> names = zookeeper.getChildren(dir, false);
for (String name : names) {
//x-sessionId-
if (name.startsWith(prefix)) {
id = name;
if (LOG.isDebugEnabled()) {
LOG.debug("Found id created last time: " + id);
}
break;
}
}
// 当前锁目录下 没有与当前会话对应的子节点 创建子节点 节点类型为临时顺序节点
if (id == null) {
// dir/x-sessionId-i
id = zookeeper.create(dir + "/" + prefix, data,
getAcl(), EPHEMERAL_SEQUENTIAL);
if (LOG.isDebugEnabled()) {
LOG.debug("Created id: " + id);
}
}
释放锁:
释放锁非常简单,删除步骤1中创建的有序临时节点。另外,如果客户端进程死亡或连接失效,对应的节点也会被删除。
[java] view plain copy
print?
public synchronized void unlock() throws RuntimeException {
if (!isClosed() && id != null) {
// we don't need to retry this operation in the case of failure
// as ZK will remove ephemeral files and we don't wanna hang
// this process when closing if we cannot reconnect to ZK
try {
ZooKeeperOperation zopdel = new ZooKeeperOperation() {
public boolean execute() throws KeeperException,
InterruptedException {
// 删除节点 忽略版本
zookeeper.delete(id, -1);
return Boolean.TRUE;
}
};
zopdel.execute();
} catch (InterruptedException e) {
LOG.warn("Caught: " + e, e);
//set that we have been interrupted.
Thread.currentThread().interrupt();
} catch (KeeperException.NoNodeException e) {
// do nothing
} catch (KeeperException e) {
LOG.warn("Caught: " + e, e);
throw (RuntimeException) new RuntimeException(e.getMessage()).
initCause(e);
}
finally {
if (callback != null) {
callback.lockReleased();
}
id = null;
}
}
}