ZooKeeper实现分布式事务锁 ZooKeeper分布式事务
程序员文章站
2022-05-28 12:52:40
...
闲暇时研究了下基于ZooKeeper实现的分布式事务锁,做下记录,便于回顾复习
public class DistributeLock implements Lock,Watcher { //锁的名称 private String lockName; private long TIMEOUT = 3000; //当前锁的节点 private String currentLockNode; //前一个锁的节点 private String prevLockNode; private ZooKeeper zk; private int SESSION_TIME_OUT = 2000; private String ROOT_LOCK = "/locks"; private String PATH_PREFIX = "_locks"; //计数器-也为锁的触发器 private CountDownLatch countDownLatch; public DistributeLock(String ip,String lockName) { this.lockName = lockName; try { zk = new ZooKeeper(ip, SESSION_TIME_OUT, this); } catch (IOException e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { if(countDownLatch!=null) { countDownLatch.countDown(); } } @Override public void lock() { if(tryLock()) { return ; }else { waitForLock(TIMEOUT); } } public boolean waitForLock(long timeOut) { Stat stat; try { stat = zk.exists(prevLockNode, true); if(stat!=null) { countDownLatch = new CountDownLatch(1); //设置超时时间 (不清楚会不会出现异常情况) countDownLatch.await(TIMEOUT,TimeUnit.MILLISECONDS); countDownLatch = null; } return true; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public void lockInterruptibly() throws InterruptedException { if(zk!=null) { zk.close(); } } @Override public Condition newCondition() { return null; } @Override public boolean tryLock() { try { Stat stat = zk.exists(ROOT_LOCK, false); //没有根节点 if(stat==null) { zk.create(ROOT_LOCK, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } if(lockName.contains(PATH_PREFIX)) { System.out.println("锁名称错误"); return false; } String nodePath = zk.create(ROOT_LOCK+"/"+lockName+"/"+PATH_PREFIX, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //获取根目录下的子节点 List<String> childList = zk.getChildren(ROOT_LOCK, false); List<String> lockPathList = new ArrayList<String>(); for(String childNode:childList) { String splitePath = childNode.split("/"+PATH_PREFIX)[0]; 
if(splitePath.equals(ROOT_LOCK+"/"+lockName)) { lockPathList.add(childNode); } } Collections.sort(lockPathList); //当前节点为最小节点 if(nodePath.equals(lockPathList.get(0))) { currentLockNode = nodePath; return true; } //前一个节点 prevLockNode = lockPathList.get(lockPathList.indexOf(lockPathList)-1); return false; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public boolean tryLock(long timeOut, TimeUnit timeUnit) throws InterruptedException { if(tryLock()) { return true; } return waitForLock(timeOut); } @Override public void unlock() { try { if(zk!=null) { zk.delete(currentLockNode, -1); zk.close(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } }