ZooKeeper实现读写锁
在上一篇文章,我们已经实现了分布式锁。今天更进一步,在分布式锁的基础之上,实现读写锁。
完整代码在 https://github.com/seemsilly/codestory/tree/master/research-zoo-keeper
1 读写锁的概念
参考*的条目: https://zh.wikipedia.org/wiki/读写锁
读写锁是计算机程序的并发控制的一种同步机制,用于解决读写问题,读操作可并发重入,写操作是互斥的。 读写锁有多种读写权限的优先级策略,可以设计为读优先、写优先或不指定优先级。
- 读优先:允许最大并发的读操作,但可能会饿死写操作;因为写操作必须在没有任何读操作的时候才能够执行。
- 写优先:只要排队队列中有写操作,读操作就必须等待;
- 不指定优先级:对读操作和写操作不做任何优先级的假设
不指定优先级的策略,最适合使用zookeeper的子节点模式来实现,今天就来尝试这种策略。
2 锁设计
同前面介绍的普通分布式锁,也使用子节点模式实现。先用容器模式(createmode.container)创建唯一的锁节点,每个锁客户端在锁节点下使用临时循序模式(createmode. sequential)创建子节点。这些子节点会自动在名称后面追加10位数字。
2.1 如何标识读锁还是写锁?
有两种简单的方案:在子节点名中标识、在节点的值中标识。如果采用在值中标识,每次子节点列表后,还需要再分别读一下子节点的值,才能判断是读锁还是写锁,会比较耗时。如果在子节点名称中标识,会面临一个问题:在同一个节点中创建的子节点,如果给定的名称不同,追加的10位数字是否仍然是递归的?
写个测试用例验证一下。
public class sequentialtest extends testbase {
@test
public void testsequential() throws exception {
string rootnodename = "/container-" + system.currenttimemillis();
zookeeperbase zookeeper = new zookeeperbase(address);
zookeeper.createrootnode(rootnodename, createmode.container);
random random = new securerandom();
long lastnumber = -1l;
string[] prefixs = new string[] {"/a", "/b", "/c", "/d", "/e", "/f", "/g"};
for (int i = 0; i < 10; i++) {
int index = random.nextint(prefixs.length);
string childnodename = rootnodename + prefixs[index];
string fullnodename = zookeeper.getzookeeper().create(childnodename, new byte[0],
zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
long number = long.parselong(fullnodename.substring(childnodename.length()));
assert number == lastnumber + 1;
lastnumber = number;
}
}
}
测试用例通过,说明在同一个container中创建的子节点,不论提供的节点名是什么,后续追加的10位数字都是顺序递增的。这样,就可以使用节点名来区分读锁和写锁。
2.2 类设计
介绍分布式锁的时候,已经创建了阻塞锁 childrenblockinglock,读写锁正好可以基于这个类做重载。
2.3 获取锁的逻辑
写锁是一个独占锁,逻辑跟普通分布式锁相同,只要它之前有锁就必须等待。所以,完全沿用阻塞锁的逻辑即可。
读锁允许并发,它之前可以有任意读锁,但不能有写锁。所以只需要判断有没有写锁即可。
3 关键代码
3.1 childrennodelock.java
这个类,主要是增加了一个获取排序后子节点列表的方法,这样方便实现读写锁的代码。当然,这个操作会增加一些耗时,如果子节点数量太大,可能不适用。
首先定义一个函数,用来返回子节点的前缀
/** 子节点的前缀,缺省是element,子类可以重载 */ protected string getchildprefix() { return "element"; }
然后定义一个内部类,子节点排序时会用到
/** 子节点名称比较 */ private class stringcompare implements comparator<string> { @override public int compare(string string1, string string2) { return string1.substring(string1.length() - 10) .compareto(string2.substring(string2.length() - 10)); } }
最后实现子节点排序方法,用于代替 getchildren 函数
/** 获取排好序的子节点列表 */ final public list<string> getorderedchildren(string path, boolean watch) throws keeperexception, interruptedexception { list<string> children = getzookeeper().getchildren(path, watch); collections.sort(children, new stringcompare()); return children; }
3.2 childrenblockinglock.java
在多客户端随机测试时,经常出现程序卡死的情况,无法正常退出。经过添加日志跟踪,发现watchedevent可能会丢失,也可能会发送给并不是注册事件的zookeeper客户端。在网上搜索,发现很多人也碰到类似问题。
简单修改了一下childrenblockinglock#islocksuccess等待信号的代码,从无参数的死等变成设置一定超时时间等待。关键代码如下
protected boolean islocksuccess() { boolean locksuccess; try { while (true) { string prevelementname = getprevelementname(); if (prevelementname == null) { log.trace("{} 没有更靠前的子节点,加锁成功", elementnodename); locksuccess = true; break; } else { // 有更小的节点,说明当前节点没抢到锁,注册前一个节点的监听。 log.trace("{} 监控 {} 的事件", elementnodename, prevelementname); getzookeeper().exists(this.guidnodename + "/" + prevelementname, true); synchronized (mutex) { // 等待最多一秒 mutex.wait(1000); log.trace("{} 监控的 {} 有子节点变化", elementnodename, guidnodename); } } } } catch (keeperexception e) { locksuccess = false; } catch (interruptedexception e) { locksuccess = false; } return locksuccess; }
3.3 写锁 zookeeperwritelock.java
代码基本是沿用父类,只需要重载getchildprefix()方法,
/** 返回写锁的前缀 */ protected string getchildprefix() { return "w-lock-"; }
3.4 读锁 zookeeperreadlock.java
同写锁相比,除了重载getchildprefix()方法,还重载了getprevelementname()用来查找最近一个写锁。
/** 返回读锁的前缀 */ protected string getchildprefix() { return "r-lock-"; } /** 是写锁 */ private boolean iswritelock(string elementname) { return elementname.startswith(zookeeperwritelock.flag); } /** 读取前一个写锁 */ protected string getprevelementname() throws keeperexception, interruptedexception { list<string> elementnames = super.getorderedchildren(this.guidnodename, false); super.traceorderedchildren(this.guidnodename, elementnames); string prevwriteelementname = null; for (string oneelementname : elementnames) { if (this.elementnodefullname.endswith(oneelementname)) { // 已经到了当前节点 break; } if (iswritelock(oneelementname)) { prevwriteelementname = oneelementname; } } return prevwriteelementname; }
4 测试用例
测试用例没想到好的判断方法,很难使用assert判断结果,因此做了简化,根据日志输出,靠人眼判断是否正确。
4.1 测试线程类
分别为都锁和写锁构建了两个内部类
/** 写锁线程 */ class writelockclient extends thread { zookeeperwritelock writelock; public writelockclient() { try { this.writelock = new zookeeperwritelock(address); } catch (ioexception e) { } } public void run() { writelock.lock(guidnodename, this.getname()); try { thread.sleep(1000 + random.nextint(20) * 100); } catch (interruptedexception e) { } writelock.release(guidnodename, this.getname()); } } /** 读锁线程 */ class readlockclient extends thread { zookeeperreadlock readlock; public readlockclient() { try { this.readlock = new zookeeperreadlock(address); } catch (ioexception e) { } }
public void run() { readlock.lock(guidnodename, this.getname()); try { thread.sleep(1000 + random.nextint(20) * 100); } catch (interruptedexception e) { } readlock.release(guidnodename, this.getname()); try { readlock.getzookeeper().close(); } catch (interruptedexception e) { } } }
4.2 读-读锁测试
代码
@test public void testreadread() throws ioexception, interruptedexception { readlockclient readlock1 = new readlockclient(); readlockclient readlock2 = new readlockclient(); readlock1.start(); readlock2.start(); readlock1.join(); readlock2.join(); }
测试结果可以看到,两个读锁并发执行
22:18.861 [thread-2 info] r-lock-0000000000 get read lock : true 22:18.865 [thread-1 info] r-lock-0000000001 get read lock : true 22:20.065 [thread-2 info] r-lock-0000000000 release read lock 22:21.366 [thread-1 info] r-lock-0000000001 release read lock
4.3 读-写锁测试
代码
@test public void testreadwrite() throws ioexception, interruptedexception { readlockclient readlock1 = new readlockclient(); writelockclient writelock1 = new writelockclient(); readlock1.start(); thread.sleep(50); writelock1.start(); readlock1.join(); writelock1.join(); }
测试结果可以看到,首先获取读锁,释放之后才获取到写锁。
27:40.800 [thread-1 info] r-lock-0000000000 get read lock : true 27:43.310 [thread-1 info] r-lock-0000000000 release read lock 27:43.423 [thread-2 info] w-lock-0000000001 get write lock : true 27:44.423 [thread-2 info] w-lock-0000000001 release write lock
4.4 写-读锁测试
代码
@test public void testwriteread() throws ioexception, interruptedexception { readlockclient readlock1 = new readlockclient(); writelockclient writelock1 = new writelockclient(); writelock1.start(); thread.sleep(50); readlock1.start(); writelock1.join(); readlock1.join(); }
测试结果可以看到,首先获取写锁,释放之后才获取到读锁。
29:17.661 [thread-2 info] w-lock-0000000000 get write lock : true 29:19.966 [thread-2 info] w-lock-0000000000 release write lock 29:19.976 [thread-1 info] r-lock-0000000001 get read lock : true 29:22.476 [thread-1 info] r-lock-0000000001 release read lock
4.5 多客户端随机读写锁测试
测试代码
@test public void testrandomreadwritelock() throws ioexception, interruptedexception { int threadcount = 20; thread[] lockthreads = new thread[threadcount]; for (int i = 0; i < threadcount; i++) { // 一定概率是写锁 boolean writelock = random.nextint(5) == 0; if (writelock) { lockthreads[i] = new writelockclient(); } else { lockthreads[i] = new readlockclient(); } lockthreads[i].start(); } for (int i = 0; i < threadcount; i++) { lockthreads[i].join(); } }
测试结果可以看出,如果连续多个读锁会并发执行。为了方便查看,我添加了一些横线分隔。
30:31.317 [thread-1 info] w-lock-0000000000 get write lock : true 30:32.824 [thread-1 info] w-lock-0000000000 release write lock ------------------------------------------------------------------ 30:32.834 [thread-17 info] r-lock-0000000004 get read lock : true 30:32.835 [thread-19 info] r-lock-0000000002 get read lock : true 30:32.835 [thread-20 info] r-lock-0000000001 get read lock : true 30:32.836 [thread-18 info] r-lock-0000000003 get read lock : true 30:34.135 [thread-20 info] r-lock-0000000001 release read lock 30:34.634 [thread-17 info] r-lock-0000000004 release read lock 30:34.935 [thread-19 info] r-lock-0000000002 release read lock 30:35.036 [thread-18 info] r-lock-0000000003 release read lock ------------------------------------------------------------------ 30:35.053 [thread-16 info] w-lock-0000000005 get write lock : true 30:36.154 [thread-16 info] w-lock-0000000005 release write lock ------------------------------------------------------------------ 30:36.160 [thread-14 info] r-lock-0000000007 get read lock : true 30:36.160 [thread-15 info] r-lock-0000000006 get read lock : true 30:38.160 [thread-14 info] r-lock-0000000007 release read lock 30:38.661 [thread-15 info] r-lock-0000000006 release read lock ------------------------------------------------------------------ 30:38.669 [thread-13 info] w-lock-0000000008 get write lock : true 30:39.969 [thread-13 info] w-lock-0000000008 release write lock ------------------------------------------------------------------ 30:39.976 [thread-12 info] r-lock-0000000009 get read lock : true 30:39.977 [thread-8 info] r-lock-0000000014 get read lock : true 30:39.977 [thread-6 info] r-lock-0000000015 get read lock : true 30:39.984 [thread-10 info] r-lock-0000000011 get read lock : true 30:39.985 [thread-3 info] r-lock-0000000018 get read lock : true 30:39.984 [thread-7 info] r-lock-0000000013 get read lock : true 30:39.984 [thread-11 info] r-lock-0000000010 get read lock : true 30:39.983 [thread-9 info] r-lock-0000000012 get read lock : true 30:39.983 [thread-2 info] r-lock-0000000019 get read lock : true 30:39.982 [thread-5 info] r-lock-0000000016 get read lock : true 30:39.986 [thread-4 info] r-lock-0000000017 get read lock : true 30:40.986 [thread-3 info] r-lock-0000000018 release read lock 30:41.086 [thread-2 info] r-lock-0000000019 release read lock 30:41.285 [thread-6 info] r-lock-0000000015 release read lock 30:41.576 [thread-12 info] r-lock-0000000009 release read lock 30:42.185 [thread-10 info] r-lock-0000000011 release read lock 30:42.186 [thread-5 info] r-lock-0000000016 release read lock 30:42.187 [thread-11 info] r-lock-0000000010 release read lock 30:42.286 [thread-9 info] r-lock-0000000012 release read lock 30:42.586 [thread-7 info] r-lock-0000000013 release read lock 30:42.677 [thread-8 info] r-lock-0000000014 release read lock 30:42.887 [thread-4 info] r-lock-0000000017 release read lock