欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

zookeeper客户端Curator介绍及分布式锁的使用

程序员文章站 2022-07-12 17:14:27
...

前言

zookeeper是一款分布式协调中间件,简称zk,zk的使用场景比较多,比如:注册中心、分布式锁、配置中心、leader选举等

目前在java中使用zk的的客户端有Curator、原生官方提供的zookeeper包,推荐使用的是Curator,因为Curator做了大部分场景下的需求的封装,相较于原生api来说使用更简单

zk使用

我们先介绍下java中使用Curator的基本案例

先使用maven依赖相关包

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>
<!--分布式锁、leader选举、队列...-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

增删改查代码

public class ZkCuratorDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework =
                CuratorFrameworkFactory.builder().
                        connectString("localhost:2181").//集群可以写多个,以,分隔
                        sessionTimeoutMs(5000).
                        retryPolicy(new ExponentialBackoffRetry
                                (1000, 3)).// 重试机制,重试三次
                        connectionTimeoutMs(4000).build();
        curatorFramework.start(); //表示启动.
        //创建
        create(curatorFramework);
        //修改
//        update(curatorFramework);
//        operatorWithAsync(curatorFramework);

//        authOperation(curatorFramework);
//        get(curatorFramework);

//        addNodeCacheListener(curatorFramework,"/first");
//        addPathChildCacheListener(curatorFramework, "/first");
        System.in.read();
    }

    private static String get(CuratorFramework curatorFramework) throws Exception {
        String rs = new String(curatorFramework.getData().forPath("/first"));
        System.out.println(rs);
        return rs;
    }

    private static String create(CuratorFramework curatorFramework) throws Exception {

        String path = curatorFramework.create().
                creatingParentsIfNeeded() // TODO 当父节点不存在时自动创建
                .withMode(CreateMode.PERSISTENT) // TODO 模式,有持久化节点、临时节点、有序节点等
                .forPath("/first", "Hello".getBytes());
        System.out.println("创建成功的节点: " + path);
        return path;
    }

    private static String update(CuratorFramework curatorFramework) throws Exception {
        curatorFramework.setData().forPath("/first", "Hello2".getBytes());
        return null;
    }

    /**
     * 监听节点变化,不包括子节点
     */
    private static void addNodeCacheListener(CuratorFramework curatorFramework, String path) throws Exception {
        NodeCache nodeCache = new NodeCache(curatorFramework, path, false);
        NodeCacheListener nodeCacheListener = new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("Receive Node Changed");
                System.out.println("" + nodeCache.getCurrentData().getPath() + "->" + new String(nodeCache.getCurrentData().getData()));
            }
        };
        nodeCache.getListenable().addListener(nodeCacheListener);
        nodeCache.start();
    }

    /**
     * 监听子节点变化,只监听该节点的子节点变化,包括新增子节点,改变子节点,不包括孙子节点
     */
    private static void addPathChildCacheListener(CuratorFramework curatorFramework, String path) throws Exception {
        PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, path, true);
        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println("子节点事件变更的回调");
                ChildData childData = pathChildrenCacheEvent.getData();
                System.out.println(childData.getPath() + "-" + new String(childData.getData()));
            }
        };
        childrenCache.getListenable().addListener(childrenCacheListener);
        childrenCache.start(PathChildrenCache.StartMode.NORMAL);
    }


}

zk分布式锁介绍

zk中的分布式锁主要依赖是zk的有序节点+watcher的特性,基本原理为

  • 多个线程同时向zk注册一个有序节点,如在节点/locks下面注册一个有序节点lock,此时会创建节点如:lock0000000001、lock0000000002、lock0000000003…

  • 当注册成功的节点发现自己是节点中最小的节点时,即获取到锁的权限,如上述lock0000000001,而lock0000000002发现存在比自己小的节点lock0000000001,所以lock0000000002会向lock0000000001节点的删除事件注册一个监听事件(watcher事件),当lock0000000001节点被删除时会通知lock0000000002节点,然后lock0000000002获取锁的权限,依次类推,这便是zk实现的一个公平性分布式锁

使用Curator客户端来实现的代码如下:

public class ZkLockDemo {

    public static void main(String[] args) {

        CuratorFramework curatorFramework =
                CuratorFrameworkFactory.builder().
                        connectString("localhost:2181").
                        sessionTimeoutMs(5000).
                        retryPolicy(new ExponentialBackoffRetry
                                (1000, 3)).
                        connectionTimeoutMs(4000).build();
        curatorFramework.start(); //表示启动.

        /**
         * locks 表示命名空间
         * 锁的获取逻辑是放在zookeeper
         * 当前锁是跨进程可见
         */
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/locks");
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "->尝试抢占锁");
                try {
                    // 抢占锁,没有抢到,则阻塞
                    lock.acquire();
                    // TODO 支持带超时时间的抢占锁
//                    lock.acquire(5, TimeUnit.SECONDS);
                    System.out.println(Thread.currentThread().getName() + "->获取锁成功");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        // 模拟4秒后释放锁
                        Thread.sleep(4000);
                        // 释放锁
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "->释放锁成功");
                }

            }, "t-" + i).start();
        }
    }
}

可以看到使用代码非常简单,因为Curator已经帮我们封装好了,如果需要看节点的变化需要使用连接到zk服务端去查看节点的变化,可以看到节点依次从xxx001往后删除

以上就是使用Curator客户端使用zookeeper的代码