欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

zookeeper实现共享锁

程序员文章站 2022-06-04 11:32:42
...
共享锁(Shared Locks,简称S锁),又称为读锁。

如果事务T1对数据对象O1加上了共享锁,那么T1只能对O1进行读操作,其他事务也能同时对O1加共享锁(不能是排他锁),直到O1上的所有共享锁都释放后O1才能被加排他锁。


public class ZookeeperShareLock implements Watcher {
	private ZooKeeper zk;
	String root = "/ty";
	private String path;
	private String currentNode;
	private String waitNode;
	CountDownLatch latch;
	private CountDownLatch cl2 = new CountDownLatch(1);

	public ZookeeperShareLock(String host, String path) {
		this.path = path;
		try {
			zk = new ZooKeeper(host, 50000, this);
			cl2.await();
			System.err.println("开始执行");
		} catch (Exception e) {
			e.printStackTrace();
		}
		try {
			Stat sta = zk.exists(root, false);
			if (null == sta) {
				zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}

		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	public void lock() {
		try {
			currentNode = zk.create(root + "/" + path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
					CreateMode.EPHEMERAL_SEQUENTIAL);
		} catch (Exception e) {
			e.printStackTrace();
		}
		try {
			List<String> lockObjNodes = zk.getChildren(root, false);
			Collections.sort(lockObjNodes); // 排序 //最小的那个 0 1 2 3 4
			if (currentNode.equals(root + "/" + lockObjNodes.get(0))) {
				return;
			} else {
				// lock_0001
				String childZnode = currentNode.substring(currentNode.lastIndexOf("/") + 1);// 获取当前节点
				int num = Collections.binarySearch(lockObjNodes, childZnode);// 当前节点去找下
				// 看看在什么问题
				if (num == 0) {
					num = 1;
				}
				waitNode = lockObjNodes.get(num - 1);
				Stat stat = zk.exists(root + "/" + waitNode, true);
				if (null != stat) {
					latch = new CountDownLatch(1);
					this.latch.await(5000, TimeUnit.MILLISECONDS);
				}
			}

		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	public void unLock() {
		try {
			zk.delete(currentNode, -1);
			currentNode = null;
			// zk.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public void process(WatchedEvent event) {
		System.out.println("进入 process 。。。。。event = " + event);
		try {
			Thread.sleep(200);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		if (event == null) {
			return;
		}

		// 连接状态
		KeeperState keeperState = event.getState();
		// 事件类型
		EventType eventType = event.getType();
		// 受影响的path
		String path = event.getPath();

		String logPrefix = "";
		System.out.println(logPrefix + "收到Watcher通知");
		System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());
		System.out.println(logPrefix + "事件类型:\t" + eventType.toString());

		if (KeeperState.SyncConnected == keeperState) {
			// 成功连接上ZK服务器
			if (EventType.None == eventType) {
				System.out.println("成功连接上ZK服务器");
				cl2.countDown();
			}
			// 删除节点
			else if (EventType.NodeDeleted == eventType) {
				if (null != latch) {
					latch.countDown();
				}
			} else
				;
		} else if (KeeperState.Disconnected == keeperState) {
			System.out.println(logPrefix + "与ZK服务器断开连接");
		} else if (KeeperState.AuthFailed == keeperState) {
			System.out.println(logPrefix + "权限检查失败");
		} else if (KeeperState.Expired == keeperState) {
			System.out.println(logPrefix + "会话失效");
		} else
			;

		System.out.println("--------------------------------------------");

	}
}