Java使用ZooKeeper实现的分布式锁示例
使用ZooKeeper实现的分布式锁
分布式锁,实现了Lock接口
package com.concurrent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
distributedlock lock = null;
try {
lock = new distributedlock("127.0.0.1:2182","test");
lock.lock();
//do something...
} catch (exception e) {
e.printstacktrace();
}
finally {
if(lock != null)
lock.unlock();
}
* @author xueliang
*
*/
public class distributedlock implements lock, watcher{
private zookeeper zk;
private string root = "/locks";//根
private string lockname;//竞争资源的标志
private string waitnode;//等待前一个锁
private string myznode;//当前锁
private countdownlatch latch;//计数器
private int sessiontimeout = 30000;
private list<exception> exception = new arraylist<exception>();
/**
* 创建分布式锁,使用前请确认config配置的zookeeper服务可用
* @param config 127.0.0.1:2181
* @param lockname 竞争资源标志,lockname中不能包含单词lock
*/
public distributedlock(string config, string lockname){
this.lockname = lockname;
// 创建一个与服务器的连接
try {
zk = new zookeeper(config, sessiontimeout, this);
stat stat = zk.exists(root, false);
if(stat == null){
// 创建根节点
zk.create(root, new byte[0], zoodefs.ids.open_acl_unsafe,createmode.persistent);
}
} catch (ioexception e) {
exception.add(e);
} catch (keeperexception e) {
exception.add(e);
} catch (interruptedexception e) {
exception.add(e);
}
}
/**
* zookeeper节点的监视器
*/
public void process(watchedevent event) {
if(this.latch != null) {
this.latch.countdown();
}
}
public void lock() {
if(exception.size() > 0){
throw new lockexception(exception.get(0));
}
try {
if(this.trylock()){
system.out.println("thread " + thread.currentthread().getid() + " " +myznode + " get lock true");
return;
}
else{
waitforlock(waitnode, sessiontimeout);//等待锁
}
} catch (keeperexception e) {
throw new lockexception(e);
} catch (interruptedexception e) {
throw new lockexception(e);
}
}
public boolean trylock() {
try {
string splitstr = "_lock_";
if(lockname.contains(splitstr))
throw new lockexception("lockname can not contains \\u000b");
//创建临时子节点
myznode = zk.create(root + "/" + lockname + splitstr, new byte[0], zoodefs.ids.open_acl_unsafe,createmode.ephemeral_sequential);
system.out.println(myznode + " is created ");
//取出所有子节点
list<string> subnodes = zk.getchildren(root, false);
//取出所有lockname的锁
list<string> lockobjnodes = new arraylist<string>();
for (string node : subnodes) {
string _node = node.split(splitstr)[0];
if(_node.equals(lockname)){
lockobjnodes.add(node);
}
}
collections.sort(lockobjnodes);
system.out.println(myznode + "==" + lockobjnodes.get(0));
if(myznode.equals(root+"/"+lockobjnodes.get(0))){
//如果是最小的节点,则表示取得锁
return true;
}
//如果不是最小的节点,找到比自己小1的节点
string submyznode = myznode.substring(myznode.lastindexof("/") + 1);
waitnode = lockobjnodes.get(collections.binarysearch(lockobjnodes, submyznode) - 1);
} catch (keeperexception e) {
throw new lockexception(e);
} catch (interruptedexception e) {
throw new lockexception(e);
}
return false;
}
public boolean trylock(long time, timeunit unit) {
try {
if(this.trylock()){
return true;
}
return waitforlock(waitnode,time);
} catch (exception e) {
e.printstacktrace();
}
return false;
}
private boolean waitforlock(string lower, long waittime) throws interruptedexception, keeperexception {
stat stat = zk.exists(root + "/" + lower,true);
//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
if(stat != null){
system.out.println("thread " + thread.currentthread().getid() + " waiting for " + root + "/" + lower);
this.latch = new countdownlatch(1);
this.latch.await(waittime, timeunit.milliseconds);
this.latch = null;
}
return true;
}
public void unlock() {
try {
system.out.println("unlock " + myznode);
zk.delete(myznode,-1);
myznode = null;
zk.close();
} catch (interruptedexception e) {
e.printstacktrace();
} catch (keeperexception e) {
e.printstacktrace();
}
}
public void lockinterruptibly() throws interruptedexception {
this.lock();
}
public condition newcondition() {
return null;
}
public class lockexception extends runtimeexception {
private static final long serialversionuid = 1l;
public lockexception(string e){
super(e);
}
public lockexception(exception e){
super(e);
}
}
}
并发测试工具
package com.concurrent;
import java.util.arraylist;
import java.util.collections;
import java.util.list;
import java.util.concurrent.copyonwritearraylist;
import java.util.concurrent.countdownlatch;
import java.util.concurrent.atomic.atomicinteger;
/**
 * Small concurrency test harness: runs every given task on its own thread, releases all
 * threads at the same moment, waits for them to finish and prints the minimum, maximum and
 * average task duration (milliseconds) plus the number of failed tasks.
 *
 * <p>The run is started by the constructor.
 *
 * <pre>
 * concurrenttask[] task = new concurrenttask[5];
 * for (int i = 0; i &lt; task.length; i++) {
 *     task[i] = new concurrenttask() {
 *         public void run() {
 *             System.out.println("==============");
 *         }
 *     };
 * }
 * new concurrenttest(task);
 * </pre>
 *
 * @author xueliang
 */
public class concurrenttest {

    /** Start gate: worker threads wait here until all of them have been created. */
    private final CountDownLatch startSignal = new CountDownLatch(1);

    /** End gate: counted down once per worker, whether its task succeeded or not. */
    private final CountDownLatch doneSignal;

    /** Durations (ms) of the tasks that completed without throwing. */
    private final CopyOnWriteArrayList<Long> list = new CopyOnWriteArrayList<Long>();

    /** Number of tasks that did not complete normally. */
    private final AtomicInteger err = new AtomicInteger();

    private final concurrenttask[] task;

    /**
     * Runs all tasks concurrently and prints the timing summary.
     *
     * @param task the tasks to run, one thread each; may be empty
     * @throws IllegalArgumentException if {@code task} is {@code null}
     */
    public concurrenttest(concurrenttask... task) {
        if (task == null) {
            // Report the misuse to the caller instead of terminating the whole JVM.
            throw new IllegalArgumentException("task must not be null");
        }
        this.task = task;
        doneSignal = new CountDownLatch(task.length);
        start();
    }

    /** Creates the workers, opens the start gate, waits for completion and reports. */
    private void start() {
        createThread();
        // Open the gate: every worker blocked on startSignal proceeds now.
        startSignal.countDown();
        try {
            doneSignal.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        getExeTime();
    }

    /** Starts one thread per task; each thread parks at the start gate first. */
    private void createThread() {
        for (int i = 0; i < task.length; i++) {
            final int j = i;
            new Thread(new Runnable() {
                public void run() {
                    boolean succeeded = false;
                    try {
                        startSignal.await();
                        long begin = System.currentTimeMillis();
                        task[j].run();
                        list.add(System.currentTimeMillis() - begin);
                        succeeded = true;
                    } catch (Exception ignored) {
                        // The failure is only counted; see the finally block.
                    } finally {
                        if (!succeeded) {
                            err.incrementAndGet();
                        }
                        // Always release the end gate, otherwise start() would wait forever
                        // when a task dies with an Error.
                        doneSignal.countDown();
                    }
                }
            }).start();
        }
    }

    /** Prints min / max / average duration of the successful tasks and the error count. */
    private void getExeTime() {
        List<Long> sorted = new ArrayList<Long>(list);
        if (sorted.isEmpty()) {
            // Nothing to aggregate: avoid get(0) on an empty list and division by zero.
            System.out.println("no task completed successfully");
            System.out.println("err: " + err.get());
            return;
        }
        Collections.sort(sorted);
        long sum = 0L;
        for (long elapsed : sorted) {
            sum += elapsed;
        }
        System.out.println("min: " + sorted.get(0));
        System.out.println("max: " + sorted.get(sorted.size() - 1));
        System.out.println("avg: " + (sum / sorted.size()));
        System.out.println("err: " + err.get());
    }

    /** A unit of work executed on its own thread by {@link concurrenttest}. */
    public interface concurrenttask {
        void run();
    }
}
测试
package com.concurrent;
import com.concurrent.concurrenttest.concurrenttask;
public class zktest {
public static void main(string[] args) {
runnable task1 = new runnable(){
public void run() {
distributedlock lock = null;
try {
lock = new distributedlock("127.0.0.1:2182","test1");
//lock = new distributedlock("127.0.0.1:2182","test2");
lock.lock();
thread.sleep(3000);
system.out.println("===thread " + thread.currentthread().getid() + " running");
} catch (exception e) {
e.printstacktrace();
}
finally {
if(lock != null)
lock.unlock();
}
}
};
new thread(task1).start();
try {
thread.sleep(1000);
} catch (interruptedexception e1) {
e1.printstacktrace();
}
concurrenttask[] tasks = new concurrenttask[60];
for(int i=0;i<tasks.length;i++){
concurrenttask task3 = new concurrenttask(){
public void run() {
distributedlock lock = null;
try {
lock = new distributedlock("127.0.0.1:2183","test2");
lock.lock();
system.out.println("thread " + thread.currentthread().getid() + " running");
} catch (exception e) {
e.printstacktrace();
}
finally {
lock.unlock();
}
}
};
tasks[i] = task3;
}
new concurrenttest(tasks);
}
}