欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Zookeeper入门实战

程序员文章站 2022-04-14 16:12:54
Zookeeper是一个为分布式应用提供一致性协调服务的中间件,主要用来解决分布式应用中经常遇到的一些一致性问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。本文主要包括Zookeeper简介、安装、命令行操作、java操作Zookeeper等,文中所使用到的软件版本:Jav ......

zookeeper是一个为分布式应用提供一致性协调服务的中间件,主要用来解决分布式应用中经常遇到的一些一致性问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。本文主要包括zookeeper简介、安装、命令行操作、java操作zookeeper等,文中所使用到的软件版本:java 1.8.0_191、zookeeper 3.6.0、junit 4.13、centos 7.6。

1、简介

1.1、设计目标

zookeeper is simple.
zookeeper is replicated.
zookeeper is ordered.
zookeeper is fast.

zookeeper是简单、可复制、有序、快速的。

1.2、数据模型和层次命名空间

zookeeper提供的命名空间与标准文件系统的命名空间非常类似。命名空间由一系列路径组成,用/分隔。zookeeper命名空间中的每个节点使用一个具体路径来标识。zookeeper的层次命名空间结构如下:

Zookeeper入门实战

1.3、节点

与标准文件系统不同的是,zookeeper命名空间的每个节点可以保存数据,就像一个文件系统中的文件,它既是文件也是目录。zookeeper用来存储状态信息、配置、位置信息等,因此存储在每个节点上的数据通常很小,在字节到千字节范围内。有四种类型的节点:

临时节点(ephemeral):会话结束该节点自动被删除,临时节点不能拥有子节点
临时顺序节点(ephemeral_sequential):具有临时节点特征,但是它会在节点名称后面增加一个序列号,分布式锁中会用到该类型节点
持久节点(persistent):创建后永久存在,可以自动删除;也可以设置一个存活时间,当指定存活时间过去以后,如果相应的节点没有得到更新且没有直接的,就会被自动删除
持久顺序节点(persistent_sequential):具有持久节点特征,但是它会在节点名称后面增加一个序列号

注:顺序节点中序列号对于此节点的父节点是唯一的,它是一个10位的数字,如果这个序列号大于2^32-1就会溢出。

1.4、更新和监视

客户端可以监视一个节点,当该节点发生变化时会,客户端会收到该节点变化的通知;一个监视器只会触发一次,触发后会删除该监视器。如果客户端和其中一个zookeeper服务器之间的连接中断,则客户端将收到一个本地通知。

1.5、状态信息

zxid:zookeeper每次状态改变都收到一个zxid(zookeeper transaction id),zxid是全局有序的,每次更新都会产生一个新的,且后面的大于前面的。
版本:每次节点改变都会使该节点的版本号增加,有三中版本号:dataversion(数据版本号)、cversion(子节点版本号)、aclversion(节点所拥有的acl版本号)

通过stat [-w] path可以查看节点的具体状态信息:

czxid 创建节点时的事务id
ctime 创建节点时的时间
mzxid 最后修改节点的事务id
mtime 最后修改节点的时间
pzxid 该节点的子节点最后一次修改的事务id,添加子节点或删除子节点就会影响子节点列表,但是修改子节点的数据内容则不影响该id
cversion 子节点版本号,子节点每次修改版本号加1
dataversion 数据版本号,数据每次修改该版本号加1
aclversion 权限版本号,权限每次修改该版本号加1
ephemeralowner 节点的会话id,只有临时节点有,持久节点值为0
datalength 节点的数据长度
numchildren 节点的子节点数量

1.6、特性

zookeeper的目标是作为构建其他复杂服务的基石,因此它提供了一系列的特性:

一致性:数据一致性, 数据按照顺序分批入库
原子性:事务要么成功要么失败
单一视图:客户端连接集群中的任意zk节点, 数据都是一致的
可靠性:每次对zk的操作状态都会保存在服务端
实时性::客户端可以读取到zk服务端的最新数据

2、zoo.cfg参数说明

clientport zookeeper服务器对客户端暴露的端口
datadir zookeeper服务器存储快照文件的目录,事务日志文件默认也保存在该目录下,除非另外指定。
datalogdir 服务器存储事务日志文件的目录,默认与datadir一致。建议将它和datadir分别配置,防止磁盘的并发读写,影响服务器性能。可将其配置在一个单独的磁盘上。
ticktime 服务器最小时间单元,默认值3000ms
initlimit leader服务器等待follewer服务器启动,并完成数据同步的时间,默认为10,表示10*ticktime
synclimit leader服务器和follewer服务器之间进行心跳检测的间隔时间,默认为5,表示5*ticktime
server.id zookeeper集群的机器列表,其中id为serverid,与myid文件中的值对应。第一个端口用于指定leader服务器和follewer服务器进行运行时通信和数据同步所使用的端口,第二个端口用于进行leader选举过程中的投票通信

3、安装

3.1、单机版安装

3.1.1、下载并解压zookeeper

下载地址:http://zookeeper.apache.org/releases.html

解压:tar zxvf zookeeper-3.6.0.tar.gz

3.1.2、修改配置文件

zoo.cfg默认不存在,可以从zoo_sample.cfg conf拷贝一份:

cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg

配置文件zoo.cfg中的内容可以使用文件中的默认值,也可以根据实际需要修改配置项:

datadir=/home/hadoop/app/apache-zookeeper-3.6.0-bin/data

3.1.3、启动停止

cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/bin
zkserver.sh start  #启动
zkserver.sh stop  #停止

3.2、集群安装

假设在172.17.139.160、172.17.139.161、172.17.139.162三台机器上安装。

3.2.1、下载并解压zookeeper(每台机器)

下载地址:http://zookeeper.apache.org/releases.html

解压:tar zxvf zookeeper-3.6.0.tar.gz

3.2.2、修改zoo.cfg配置文件(每台机器)

zoo.cfg默认不存在,可以从zoo_sample.cfg conf拷贝一份:

cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg

zoo.cfg中集群与单机的配置不同的地方是server.id参数,其他根据实际需要修改配置项:

datadir=/home/hadoop/app/apache-zookeeper-3.6.0-bin/data
server.1=172.17.139.160:2555:3555
server.2=172.17.139.161:2555:3555
server.3=172.17.139.162:2555:3555

3.2.3、创建myid文件(每台机器)

在datadir(/home/hadoop/app/apache-zookeeper-3.6.0-bin/data)目录下创建myid文件,文件内容为该zookeeeper在集群中的id,对应上面zoo.cfg中server.后的数字。

172.17.139.160:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:

1

172.17.139.161:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:

2

172.17.139.162:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:

3

3.2.4、启动停止(每台机器)

cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/bin
zkserver.sh start #启动
zkserver.sh stop  #停止

4、命令行

bin/zkcli.sh可以启动一个客户端连接到zookeeper:

bin/zkcli.sh [-server host:port]

不加server参数,默认连接到本地2181端口;启动后可以输入help/h查看使用方法:

[zk: localhost:2181(connected) 4] help
zookeeper -server host:port cmd args
        addwatch [-m mode] path # optional mode is one of [persistent, persistent_recursive] - default is persistent_recursive
        addauth scheme auth
        close 
        config [-c] [-w] [-s]
        connect host:port
        create [-s] [-e] [-c] [-t ttl] path [data] [acl]
        delete [-v version] path
        deleteall path [-b batch size]
        delquota [-n|-b] path
        get [-s] [-w] path
        getacl [-s] path
        getallchildrennumber path
        getephemerals path
        history 
        listquota path
        ls [-s] [-w] [-r] path
        printwatches on|off
        quit 
        reconfig [-s] [-v version] [[-file path] | [-members serverid=host:port1:port2;port3[,...]*]] | [-add serverid=host:port1:port2;port3[,...]]* [-remove serverid[,...]*]
        redo cmdno
        removewatches path [-c|-d|-a] [-l]
        set [-s] [-v version] path data
        setacl [-s] [-v version] [-r] path acl
        setquota -n|-b val path
        stat [-w] path
        sync path
        version 
command not found: command not found help
[zk: localhost:2181(connected) 5] 

4.1、列出子节点

ls [-s] [-w] [-r] path

-s:显示节点状态信息
-w:监听该节点
-r:递归查看所有子节点

如:ls /

4.2、创建节点

create [-s] [-e] [-c] [-t ttl] path [data] [acl]

-s:顺序节点
-e:临时节点
-t:设置存活时间(针对持久节点,单位秒);需要开启,默认是关闭的,参见第6小节:ttl(time to life)
acl:权限控制

如:create /test test

4.3、查看节点

get [-s] [-w] path

-s:显示状态
-w:监听该节点

如:get /test

4.4、设置节点

set [-s] [-v version] path data

-s:返回状态信息
-v:设置版本信息

如:set /test testaa

4.4、查看节点状态

stat [-w] path

-w:监视该节点

如:stat /test

4.5、删除节点

delete [-v version] path

-v:指定版本信息

如:delete /test

4.6、设置权限

setacl [-s] [-v version] [-r] path acl

-s:返回状态信息
-v:指定版本信息
-r:递归设置权限

4.7、查看权限

getacl [-s] path

-s:返回状态信息

5、权限控制acl(access control list)

zookeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限,子节点不会继承父节点的权限;acl由三个字段组成:schema:id:permission。

5.1、schema(权限模式)

world 只有一个id,anyone,代表所有人
auth 使用已添加认证的用户认证
digest 使用“用户名:密码”方式认证
ip 使用ip地址认证
x509 使用客户端x500 principal认证

5.2、id(授权对象)

权限赋予的用户或者一个实体

word对应的id只有一个:anyone
digest自定义id,通常为“usernmae:base64(sha-1(username:password))”
ip对应的id为一个ip或ip段,如10.49.196.10、10.49.196.0、24

5.3、permission(权限)

create(c) 可以创建子节点
read(r) 可以读取节点数据及显示子节点列表
write(w) 可以设置节点数据
delete(d) 可以删除子节点(仅下一级节点)
admin(a) 可以设置节点权限

5.4、例子

5.4.1、word例子

setacl /acltest world:anyone:cdrwa

创建节点时如果没有设置权限,这是默认的权限。

5.4.2、auth例子

addauth digest jack:123456 #先添加认证用户
setacl /acltest auth:jack:cdrwa

再开一个终端需先添加认证用户(addauth digest jack:123456)才能访问/actltest

5.4.3、digest例子

echo -n jack:123456 | openssl dgst -binary -sha1 | openssl base64#得到密文tgi9ucnypo5fjjvylkr05nalweg=
setacl /acltest digest:jack:tgi9ucnypo5fjjvylkr05nalweg=:cdrwa

添加认证用户(addauth digest jack:123456)后才能访问/actltest。

5.4.4、ip例子

setacl /acltest ip:10.49.196.10:cdrwa

10.49.196.10的机器才能访问/actltest。

6、ttl(time to life)

 在zookeeper中,当创建一个persistent或者persistent_sequential节点的时候,可以有选择的给这个节点设置一个存活时间(ttl);当指定存活时间过去以后,如果该节点没有得到更新且没有直接的,就会被自动删除。

默认该特性是关闭的,如果需要设置java系统属性:zookeeper.extendedtypesenabled;由于ttl节点是在3.5.3版本增加的,3.5.4/3.6.0版本并不支持,所以在3.5.4/3.6.0等其他版本还需设置另外一个java系统属性:dzookeeper.emulate353ttlnodes。可以修改zkserver.sh脚本,增加:

-dzookeeper.extendedtypesenabled=true -dzookeeper.emulate353ttlnodes=true

在zkserver.sh脚本里查找到start关键字,在如下图所示的地方增加上面的代码,如何重启zookeeper即可。

Zookeeper入门实战

7、java操作zookeeper

7.1、原生api操作zookeeper

7.1.1、引入依赖

<dependency>
    <groupid>org.apache.zookeeper</groupid>
    <artifactid>zookeeper</artifactid>
    <version>3.6.0</version>
</dependency>

<dependency>
    <groupid>junit</groupid>
    <artifactid>junit</artifactid>
    <version>4.13</version>
</dependency> 

7.1.2、基本操作

package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.acl;
import org.apache.zookeeper.data.id;
import org.apache.zookeeper.data.stat;
import org.junit.after;
import org.junit.before;
import org.junit.test;

import java.io.ioexception;
import java.util.arrays;
import java.util.collections;
import java.util.concurrent.countdownlatch;

/**
 * zookeeper基本操作列子
 */
public class zookeepercase {
    //zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
    private static string connectstring = "10.49.196.10:2181";
    private static int sessiontimeout = 2 * 1000;

    private zookeeper zookeeper;

    @before
    public void before() {
        try {
            zookeeper =  new zookeeper(connectstring, sessiontimeout, new watcher() {
                @override
                public void process(watchedevent watchedevent) {

                }
            });
            system.out.println(zookeeper.getstate());
        } catch (ioexception e) {
            e.printstacktrace();
        }
    }

    @after
    public void after() throws exception {
        zookeeper.close();
    }

    /**
     * 创建节点
     */
    @test
    public void create() throws exception {
        /*
         * 同步创建持久节点,acl为world:anyone:cdrwa
         * 等同于该命令:create /javatest/node1 test world:anyone:cdrwa
         */
        zookeeper.create("/javatest/node1", "test".getbytes(), zoodefs.ids.open_acl_unsafe, createmode.persistent);

        /*
         * 同步创建持久节点,acl为world:anyone:cr
         * 等同于该命令:create /javatest/node2 test world:anyone:cr
         */
        zookeeper.create("/javatest/node2", "test".getbytes(), collections.singletonlist(new acl((zoodefs.perms.create + zoodefs.perms.read), zoodefs.ids.anyone_id_unsafe)), createmode.persistent);

        /*
         * 异步创建临时顺序节点,acl为ip:127.0.0.1:c
         * 等同于该命令:create -s -e /javatest/node3 test ip:127.0.0.1:c
         */
        countdownlatch counter = new countdownlatch(1);
        zookeeper.create("/javatest/node3", "test".getbytes(), collections.singletonlist(new acl(zoodefs.perms.create, new id("ip", "127.0.0.1"))), createmode.ephemeral_sequential
            ,new asynccallback.stringcallback() {
                @override
                public void processresult(int rc, string path, object ctx, string name) {
                    system.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ", name=" + name);
                    counter.countdown();
                }
            }, "上下文对象,异步回调时会传递给callback");
        counter.await();

        /*
         * 同步创建持久节点,acl为digest:jack:tgi9ucnypo5fjjvylkr05nalweg=:cdrwa
         * 等同于该命令:create /javatest/node4 test digest:jack:tgi9ucnypo5fjjvylkr05nalweg=:cdrwa
         * 添加认证用户(addauth digest jack:123456)后才能访问/javatest/node4
         */
        zookeeper.create("/javatest/node4", "test".getbytes(), collections.singletonlist(new acl(zoodefs.perms.all, new id("digest", "jack:tgi9ucnypo5fjjvylkr05nalweg="))) , createmode.persistent);

        /*
         * 同步创建顺序持久节点,acl为world:anyone:cdrwa,存活时间为5秒
         * 等同于该命令:create -s -t 5000 /javatest/node5 test
         */
        stat stat = new stat();
        zookeeper.create("/javatest/node5", "test".getbytes(), zoodefs.ids.open_acl_unsafe, createmode.persistent_sequential_with_ttl, stat, 5000);
        system.out.println(stat);
    }

    /**
     * 获取节点数据
     * @throws exception
     */
    @test
    public void getdata() throws exception {
        //同步读取数据
        stat stat = new stat();
        byte[] data = zookeeper.getdata("/javatest/node1", false, stat);
        system.out.println(new string(data));
        system.out.println(stat);

        //异步读取数据
        zookeeper.addauthinfo("digest", "jack:123456".getbytes());
        countdownlatch counter = new countdownlatch(1);
        zookeeper.getdata("/javatest/node4", false, new asynccallback.datacallback() {
            @override
            public void processresult(int rc, string path, object ctx, byte[] data, stat stat) {
                string s = "";
                if (data != null) {
                    s = new string(data);
                }
                system.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",data=" + s + ",stat=" + stat);
                counter.countdown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }

    @test
    public void setdata() throws exception {
        //同步设置数据,version为-1表示匹配任何版本
        stat stat = zookeeper.setdata("/javatest/node1", "test2".getbytes(), -1);
        system.out.println(stat);

        //异步设置数据
        zookeeper.addauthinfo("digest", "jack:123456".getbytes());
        countdownlatch counter = new countdownlatch(1);
        zookeeper.setdata("/javatest/node4", "test2".getbytes(), -1, new asynccallback.statcallback(){
            @override
            public void processresult(int rc, string path, object ctx, stat stat) {
                system.out.println("rc=" + rc + ",path=" + path + ",stat=" + stat);
                counter.countdown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }

    @test
    public void delete() throws exception {
        //同步删除数据
        zookeeper.delete("/javatest/node1", -1);

        //异步删除数据
        countdownlatch counter = new countdownlatch(1);
        zookeeper.delete("/javatest/node2", -1,  new asynccallback.voidcallback(){
            @override
            public void processresult(int rc, string path, object ctx) {
                system.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx);
                counter.countdown();
            }
        }, "上下文对象,异步回调时会传递给callback");
        counter.await();
    }
}

7.1.3、监控节点

datamonitor类实现对节点的监控,节点有变化时会回调datamonitorlistener.process方法,该方法由调用方根据业务来实现;watchercase类传入需要的参数来启动datamonitor。

该例子是根据官网例子改造而来,相较官网更简单了些。

package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.stat;

public class datamonitor implements runnable {
    private zookeeper zk;
    private datamonitorlistener listener;

    /**
     * 节点变化时会回调该方法,把监控变化类型及新数据带过来
     */
    public interface datamonitorlistener {
        void process(watchedevent event, byte[] data);
    }

    public datamonitor(string hostport, string znode, datamonitorlistener listener) throws exception {
        this.listener = listener;

        asynccallback.statcallback callback = new asynccallback.statcallback() {
            @override
            public void processresult(int rc, string path, object ctx, stat stat) {
                system.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",stat=" + stat);
                switch (rc) {
                    case keeperexception.code.ok:
                    case keeperexception.code.nonode:
                        return;
                    case keeperexception.code.sessionexpired:
                    case keeperexception.code.noauth:
                        close();
                        return;
                    default:
                        zk.exists(znode, true, this, null);
                        return;
                }
            }
        };
        //监视器
        watcher watcher = new watcher() {
            @override
            public void process(watchedevent event) {
                system.out.println(event);
                if (event.gettype() == event.eventtype.none) {
                    switch (event.getstate()) {
                        case syncconnected:
                            break;
                        case expired:
                            close();
                            break;
                    }
                } else {
                    try {
                        byte[] bytes = zk.getdata(event.getpath(), false, null);
                        listener.process(event, bytes);
                    } catch (exception e) {
                        e.printstacktrace();
                    }

                    if (event.getpath() != null && event.getpath().equals(znode)) {
                        //再次监控
                        zk.exists(znode, true, callback, null);
                    }
                }
            }
        };

        zk = new zookeeper("10.49.196.10:2181", 20000, watcher);
        zk.exists(znode, true, callback, null);
    }

    @override
    public void run() {
        try {
            synchronized (this) {
                wait();
            }
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
    }

    public void close() {
        synchronized (this) {
            notifyall();
        }
    }
}
package com.inspur.demo.general.zookeeper;

import org.apache.zookeeper.*;

/**
 * 监视节点样例
 */
public class watchercase {
    public static void main(string[] args) throws exception {
        datamonitor.datamonitorlistener listener = new datamonitor.datamonitorlistener() {
            @override
            public void process(watchedevent event, byte[] data) {
                //todo:根据实际情况处理
                if (event.gettype() == watcher.event.eventtype.nodedatachanged) {
                    system.out.println(new string(data));
                }
            }
        };
        new datamonitor("10.49.196.10:2181", "/watchtest", listener).run();
    }
}