zookeeper客户端Curator介绍及分布式锁的使用
前言
zookeeper是一款分布式协调中间件,简称zk,zk的使用场景比较多,比如:注册中心、分布式锁、配置中心、leader选举等
目前在java中使用zk的的客户端有Curator、原生官方提供的zookeeper包,推荐使用的是Curator,因为Curator做了大部分场景下的需求的封装,相较于原生api来说使用更简单
zk使用
我们先介绍下java中使用Curator的基本案例
先使用maven依赖相关包
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<!--分布式锁、leader选举、队列...-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
增删改查代码
public class ZkCuratorDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework =
CuratorFrameworkFactory.builder().
connectString("localhost:2181").//集群可以写多个,以,分隔
sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry
(1000, 3)).// 重试机制,重试三次
connectionTimeoutMs(4000).build();
curatorFramework.start(); //表示启动.
//创建
create(curatorFramework);
//修改
// update(curatorFramework);
// operatorWithAsync(curatorFramework);
// authOperation(curatorFramework);
// get(curatorFramework);
// addNodeCacheListener(curatorFramework,"/first");
// addPathChildCacheListener(curatorFramework, "/first");
System.in.read();
}
private static String get(CuratorFramework curatorFramework) throws Exception {
String rs = new String(curatorFramework.getData().forPath("/first"));
System.out.println(rs);
return rs;
}
private static String create(CuratorFramework curatorFramework) throws Exception {
String path = curatorFramework.create().
creatingParentsIfNeeded() // TODO 当父节点不存在时自动创建
.withMode(CreateMode.PERSISTENT) // TODO 模式,有持久化节点、临时节点、有序节点等
.forPath("/first", "Hello".getBytes());
System.out.println("创建成功的节点: " + path);
return path;
}
private static String update(CuratorFramework curatorFramework) throws Exception {
curatorFramework.setData().forPath("/first", "Hello2".getBytes());
return null;
}
/**
* 监听节点变化,不包括子节点
*/
private static void addNodeCacheListener(CuratorFramework curatorFramework, String path) throws Exception {
NodeCache nodeCache = new NodeCache(curatorFramework, path, false);
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("Receive Node Changed");
System.out.println("" + nodeCache.getCurrentData().getPath() + "->" + new String(nodeCache.getCurrentData().getData()));
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
}
/**
* 监听子节点变化,只监听该节点的子节点变化,包括新增子节点,改变子节点,不包括孙子节点
*/
private static void addPathChildCacheListener(CuratorFramework curatorFramework, String path) throws Exception {
PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, path, true);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点事件变更的回调");
ChildData childData = pathChildrenCacheEvent.getData();
System.out.println(childData.getPath() + "-" + new String(childData.getData()));
}
};
childrenCache.getListenable().addListener(childrenCacheListener);
childrenCache.start(PathChildrenCache.StartMode.NORMAL);
}
}
zk分布式锁介绍
zk中的分布式锁主要依赖是zk的有序节点+watcher的特性,基本原理为
-
多个线程同时向zk注册一个有序节点,如在节点/locks下面注册一个有序节点lock,此时会创建节点如:lock0000000001、lock0000000002、lock0000000003…
-
当注册成功的节点发现自己是节点中最小的节点时,即获取到锁的权限,如上述lock0000000001,而lock0000000002发现存在比自己小的节点lock0000000001,所以lock0000000002会向lock0000000001节点的删除事件注册一个监听事件(watcher事件),当lock0000000001节点被删除时会通知lock0000000002节点,然后lock0000000002获取锁的权限,依次类推,这便是zk实现的一个公平性分布式锁
使用Curator客户端来实现的代码如下:
public class ZkLockDemo {
public static void main(String[] args) {
CuratorFramework curatorFramework =
CuratorFrameworkFactory.builder().
connectString("localhost:2181").
sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry
(1000, 3)).
connectionTimeoutMs(4000).build();
curatorFramework.start(); //表示启动.
/**
* locks 表示命名空间
* 锁的获取逻辑是放在zookeeper
* 当前锁是跨进程可见
*/
InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/locks");
for (int i = 0; i < 10; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "->尝试抢占锁");
try {
// 抢占锁,没有抢到,则阻塞
lock.acquire();
// TODO 支持带超时时间的抢占锁
// lock.acquire(5, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName() + "->获取锁成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 模拟4秒后释放锁
Thread.sleep(4000);
// 释放锁
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "->释放锁成功");
}
}, "t-" + i).start();
}
}
}
可以看到使用代码非常简单,因为Curator已经帮我们封装好了,如果需要看节点的变化需要使用连接到zk服务端去查看节点的变化,可以看到节点依次从xxx001往后删除
以上就是使用Curator客户端使用zookeeper的代码
上一篇: ZooKeeper安装 及 分布式搭建
下一篇: 分布式文件系统介绍