欢迎您访问程序员文章站!本站旨在为大家提供并分享计算机编程知识。
您现在的位置是: 首页

ZooKeeper实现分布式事务锁 ZooKeeper分布式事务 

程序员文章站 2022-05-28 12:52:40
...

闲暇时研究了下基于ZooKeeper实现的分布式事务锁,做下记录,便于回顾复习。实现代码如下:

public class DistributeLock implements Lock,Watcher {

	//锁的名称
	private String lockName;
	private long TIMEOUT = 3000;
	//当前锁的节点
	private String currentLockNode;
	//前一个锁的节点
	private String prevLockNode;
	private ZooKeeper zk;
	private int SESSION_TIME_OUT = 2000;
	private String ROOT_LOCK = "/locks";
	private String PATH_PREFIX = "_locks";
	//计数器-也为锁的触发器
	private CountDownLatch countDownLatch;
	
	public DistributeLock(String ip,String lockName) {
		this.lockName = lockName;
		try {
			zk = new ZooKeeper(ip, SESSION_TIME_OUT, this);
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void process(WatchedEvent event) {
		if(countDownLatch!=null) {
			countDownLatch.countDown();
		}
	}

	@Override
	public void lock() {
		if(tryLock()) {
			return ;
		}else {
			waitForLock(TIMEOUT);
		}
	}
	
	public boolean waitForLock(long timeOut) {
		Stat stat;
		try {
			stat = zk.exists(prevLockNode, true);
			if(stat!=null) {
				countDownLatch = new CountDownLatch(1);
				//设置超时时间    (不清楚会不会出现异常情况)
				countDownLatch.await(TIMEOUT,TimeUnit.MILLISECONDS);
				countDownLatch = null;
			}
			return true;
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return false;
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
		if(zk!=null) {			
			zk.close();
		}
		
	}

	@Override
	public Condition newCondition() {
		return null;
	}

	@Override
	public boolean tryLock() {
		try {
			Stat stat = zk.exists(ROOT_LOCK, false);
			//没有根节点
			if(stat==null) {
				zk.create(ROOT_LOCK, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			}
			if(lockName.contains(PATH_PREFIX)) {
				System.out.println("锁名称错误");
				return false;
			}
			String nodePath = zk.create(ROOT_LOCK+"/"+lockName+"/"+PATH_PREFIX, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			//获取根目录下的子节点
			List<String> childList = zk.getChildren(ROOT_LOCK, false);
			List<String> lockPathList = new ArrayList<String>();
			for(String childNode:childList) {
				String splitePath = childNode.split("/"+PATH_PREFIX)[0];
				if(splitePath.equals(ROOT_LOCK+"/"+lockName)) {
					lockPathList.add(childNode);
				}
			}
			Collections.sort(lockPathList);
			//当前节点为最小节点
			if(nodePath.equals(lockPathList.get(0))) {
				currentLockNode = nodePath;
				return true;
			}
			//前一个节点
			prevLockNode = lockPathList.get(lockPathList.indexOf(lockPathList)-1);
			return false;
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return false;
	}

	@Override
	public boolean tryLock(long timeOut, TimeUnit timeUnit)
			throws InterruptedException {
		if(tryLock()) {
			return true;
		}
		return waitForLock(timeOut);
		
	}

	@Override
	public void unlock() {
		try {
			if(zk!=null) {
				zk.delete(currentLockNode, -1);
				zk.close();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (KeeperException e) {
			e.printStackTrace();
		}
	}
}