Zookeeper入门实战
zookeeper是一个为分布式应用提供一致性协调服务的中间件,主要用来解决分布式应用中经常遇到的一些一致性问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。本文主要包括zookeeper简介、安装、命令行操作、java操作zookeeper等,文中所使用到的软件版本:java 1.8.0_191、zookeeper 3.6.0、junit 4.13、centos 7.6。
1、简介
1.1、设计目标
zookeeper is simple.
zookeeper is replicated.
zookeeper is ordered.
zookeeper is fast.
zookeeper是简单、可复制、有序、快速的。
1.2、数据模型和层次命名空间
zookeeper提供的命名空间与标准文件系统的命名空间非常类似。命名空间由一系列路径组成,用/分隔。zookeeper命名空间中的每个节点使用一个具体路径来标识。zookeeper的层次命名空间结构如下:
1.3、节点
与标准文件系统不同的是,zookeeper命名空间的每个节点可以保存数据,就像一个文件系统中的文件,它既是文件也是目录。zookeeper用来存储状态信息、配置、位置信息等,因此存储在每个节点上的数据通常很小,在字节到千字节范围内。有四种类型的节点:
临时节点(ephemeral):会话结束该节点自动被删除,临时节点不能拥有子节点
临时顺序节点(ephemeral_sequential):具有临时节点特征,但是它会在节点名称后面增加一个序列号,分布式锁中会用到该类型节点
持久节点(persistent):创建后永久存在,可以自动删除;也可以设置一个存活时间,当指定存活时间过去以后,如果相应的节点没有得到更新且没有直接的,就会被自动删除
持久顺序节点(persistent_sequential):具有持久节点特征,但是它会在节点名称后面增加一个序列号
注:顺序节点中序列号对于此节点的父节点是唯一的,它是一个10位的数字,如果这个序列号大于2^32-1就会溢出。
1.4、更新和监视
客户端可以监视一个节点,当该节点发生变化时会,客户端会收到该节点变化的通知;一个监视器只会触发一次,触发后会删除该监视器。如果客户端和其中一个zookeeper服务器之间的连接中断,则客户端将收到一个本地通知。
1.5、状态信息
zxid:zookeeper每次状态改变都收到一个zxid(zookeeper transaction id),zxid是全局有序的,每次更新都会产生一个新的,且后面的大于前面的。
版本:每次节点改变都会使该节点的版本号增加,有三中版本号:dataversion(数据版本号)、cversion(子节点版本号)、aclversion(节点所拥有的acl版本号)
通过stat [-w] path可以查看节点的具体状态信息:
czxid 创建节点时的事务id
ctime 创建节点时的时间
mzxid 最后修改节点的事务id
mtime 最后修改节点的时间
pzxid 该节点的子节点最后一次修改的事务id,添加子节点或删除子节点就会影响子节点列表,但是修改子节点的数据内容则不影响该id
cversion 子节点版本号,子节点每次修改版本号加1
dataversion 数据版本号,数据每次修改该版本号加1
aclversion 权限版本号,权限每次修改该版本号加1
ephemeralowner 节点的会话id,只有临时节点有,持久节点值为0
datalength 节点的数据长度
numchildren 节点的子节点数量
1.6、特性
zookeeper的目标是作为构建其他复杂服务的基石,因此它提供了一系列的特性:
一致性:数据一致性, 数据按照顺序分批入库
原子性:事务要么成功要么失败
单一视图:客户端连接集群中的任意zk节点, 数据都是一致的
可靠性:每次对zk的操作状态都会保存在服务端
实时性::客户端可以读取到zk服务端的最新数据
2、zoo.cfg参数说明
clientport zookeeper服务器对客户端暴露的端口
datadir zookeeper服务器存储快照文件的目录,事务日志文件默认也保存在该目录下,除非另外指定。
datalogdir 服务器存储事务日志文件的目录,默认与datadir一致。建议将它和datadir分别配置,防止磁盘的并发读写,影响服务器性能。可将其配置在一个单独的磁盘上。
ticktime 服务器最小时间单元,默认值3000ms
initlimit leader服务器等待follewer服务器启动,并完成数据同步的时间,默认为10,表示10*ticktime
synclimit leader服务器和follewer服务器之间进行心跳检测的间隔时间,默认为5,表示5*ticktime
server.id zookeeper集群的机器列表,其中id为serverid,与myid文件中的值对应。第一个端口用于指定leader服务器和follewer服务器进行运行时通信和数据同步所使用的端口,第二个端口用于进行leader选举过程中的投票通信
3、安装
3.1、单机版安装
3.1.1、下载并解压zookeeper
下载地址:http://zookeeper.apache.org/releases.html
解压:tar zxvf zookeeper-3.6.0.tar.gz
3.1.2、修改配置文件
zoo.cfg默认不存在,可以从zoo_sample.cfg conf拷贝一份:
cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/conf cp zoo_sample.cfg zoo.cfg
配置文件zoo.cfg中的内容可以使用文件中的默认值,也可以根据实际需要修改配置项:
datadir=/home/hadoop/app/apache-zookeeper-3.6.0-bin/data
3.1.3、启动停止
cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/bin zkserver.sh start #启动 zkserver.sh stop #停止
3.2、集群安装
假设在172.17.139.160、172.17.139.161、172.17.139.162三台机器上安装。
3.2.1、下载并解压zookeeper(每台机器)
下载地址:http://zookeeper.apache.org/releases.html
解压:tar zxvf zookeeper-3.6.0.tar.gz
3.2.2、修改zoo.cfg配置文件(每台机器)
zoo.cfg默认不存在,可以从zoo_sample.cfg conf拷贝一份:
cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/conf cp zoo_sample.cfg zoo.cfg
zoo.cfg中集群与单机的配置不同的地方是server.id参数,其他根据实际需要修改配置项:
datadir=/home/hadoop/app/apache-zookeeper-3.6.0-bin/data server.1=172.17.139.160:2555:3555 server.2=172.17.139.161:2555:3555 server.3=172.17.139.162:2555:3555
3.2.3、创建myid文件(每台机器)
在datadir(/home/hadoop/app/apache-zookeeper-3.6.0-bin/data)目录下创建myid文件,文件内容为该zookeeeper在集群中的id,对应上面zoo.cfg中server.后的数字。
172.17.139.160:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:
1
172.17.139.161:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:
2
172.17.139.162:/home/hadoop/app/apache-zookeeper-3.6.0-bin/data/myid 文件内容:
3
3.2.4、启动停止(每台机器)
cd /home/hadoop/app/apache-zookeeper-3.6.0-bin/bin zkserver.sh start #启动 zkserver.sh stop #停止
4、命令行
bin/zkcli.sh可以启动一个客户端连接到zookeeper:
bin/zkcli.sh [-server host:port]
不加server参数,默认连接到本地2181端口;启动后可以输入help/h查看使用方法:
[zk: localhost:2181(connected) 4] help zookeeper -server host:port cmd args addwatch [-m mode] path # optional mode is one of [persistent, persistent_recursive] - default is persistent_recursive addauth scheme auth close config [-c] [-w] [-s] connect host:port create [-s] [-e] [-c] [-t ttl] path [data] [acl] delete [-v version] path deleteall path [-b batch size] delquota [-n|-b] path get [-s] [-w] path getacl [-s] path getallchildrennumber path getephemerals path history listquota path ls [-s] [-w] [-r] path printwatches on|off quit reconfig [-s] [-v version] [[-file path] | [-members serverid=host:port1:port2;port3[,...]*]] | [-add serverid=host:port1:port2;port3[,...]]* [-remove serverid[,...]*] redo cmdno removewatches path [-c|-d|-a] [-l] set [-s] [-v version] path data setacl [-s] [-v version] [-r] path acl setquota -n|-b val path stat [-w] path sync path version command not found: command not found help [zk: localhost:2181(connected) 5]
4.1、列出子节点
ls [-s] [-w] [-r] path
-s:显示节点状态信息
-w:监听该节点
-r:递归查看所有子节点
如:ls /
4.2、创建节点
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
-s:顺序节点
-e:临时节点
-t:设置存活时间(针对持久节点,单位秒);需要开启,默认是关闭的,参见第6小节:ttl(time to life)
acl:权限控制
如:create /test test
4.3、查看节点
get [-s] [-w] path
-s:显示状态
-w:监听该节点
如:get /test
4.4、设置节点
set [-s] [-v version] path data
-s:返回状态信息
-v:设置版本信息
如:set /test testaa
4.4、查看节点状态
stat [-w] path
-w:监视该节点
如:stat /test
4.5、删除节点
delete [-v version] path
-v:指定版本信息
如:delete /test
4.6、设置权限
setacl [-s] [-v version] [-r] path acl
-s:返回状态信息
-v:指定版本信息
-r:递归设置权限
4.7、查看权限
getacl [-s] path
-s:返回状态信息
5、权限控制acl(access control list)
zookeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限,子节点不会继承父节点的权限;acl由三个字段组成:schema:id:permission。
5.1、schema(权限模式)
world 只有一个id,anyone,代表所有人
auth 使用已添加认证的用户认证
digest 使用“用户名:密码”方式认证
ip 使用ip地址认证
x509 使用客户端x500 principal认证
5.2、id(授权对象)
权限赋予的用户或者一个实体
word对应的id只有一个:anyone
digest自定义id,通常为“usernmae:base64(sha-1(username:password))”
ip对应的id为一个ip或ip段,如10.49.196.10、10.49.196.0、24
5.3、permission(权限)
create(c) 可以创建子节点
read(r) 可以读取节点数据及显示子节点列表
write(w) 可以设置节点数据
delete(d) 可以删除子节点(仅下一级节点)
admin(a) 可以设置节点权限
5.4、例子
5.4.1、word例子
setacl /acltest world:anyone:cdrwa
创建节点时如果没有设置权限,这是默认的权限。
5.4.2、auth例子
addauth digest jack:123456 #先添加认证用户 setacl /acltest auth:jack:cdrwa
再开一个终端需先添加认证用户(addauth digest jack:123456)才能访问/actltest
5.4.3、digest例子
echo -n jack:123456 | openssl dgst -binary -sha1 | openssl base64#得到密文tgi9ucnypo5fjjvylkr05nalweg= setacl /acltest digest:jack:tgi9ucnypo5fjjvylkr05nalweg=:cdrwa
添加认证用户(addauth digest jack:123456)后才能访问/actltest。
5.4.4、ip例子
setacl /acltest ip:10.49.196.10:cdrwa
10.49.196.10的机器才能访问/actltest。
6、ttl(time to life)
在zookeeper中,当创建一个persistent或者persistent_sequential节点的时候,可以有选择的给这个节点设置一个存活时间(ttl);当指定存活时间过去以后,如果该节点没有得到更新且没有直接的,就会被自动删除。
默认该特性是关闭的,如果需要设置java系统属性:zookeeper.extendedtypesenabled;由于ttl节点是在3.5.3版本增加的,3.5.4/3.6.0版本并不支持,所以在3.5.4/3.6.0等其他版本还需设置另外一个java系统属性:dzookeeper.emulate353ttlnodes。可以修改zkserver.sh脚本,增加:
-dzookeeper.extendedtypesenabled=true -dzookeeper.emulate353ttlnodes=true
在zkserver.sh脚本里查找到start关键字,在如下图所示的地方增加上面的代码,如何重启zookeeper即可。
7、java操作zookeeper
7.1、原生api操作zookeeper
7.1.1、引入依赖
<dependency> <groupid>org.apache.zookeeper</groupid> <artifactid>zookeeper</artifactid> <version>3.6.0</version> </dependency> <dependency> <groupid>junit</groupid> <artifactid>junit</artifactid> <version>4.13</version> </dependency>
7.1.2、基本操作
package com.inspur.demo.general.zookeeper; import org.apache.zookeeper.*; import org.apache.zookeeper.data.acl; import org.apache.zookeeper.data.id; import org.apache.zookeeper.data.stat; import org.junit.after; import org.junit.before; import org.junit.test; import java.io.ioexception; import java.util.arrays; import java.util.collections; import java.util.concurrent.countdownlatch; /** * zookeeper基本操作列子 */ public class zookeepercase { //zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181 private static string connectstring = "10.49.196.10:2181"; private static int sessiontimeout = 2 * 1000; private zookeeper zookeeper; @before public void before() { try { zookeeper = new zookeeper(connectstring, sessiontimeout, new watcher() { @override public void process(watchedevent watchedevent) { } }); system.out.println(zookeeper.getstate()); } catch (ioexception e) { e.printstacktrace(); } } @after public void after() throws exception { zookeeper.close(); } /** * 创建节点 */ @test public void create() throws exception { /* * 同步创建持久节点,acl为world:anyone:cdrwa * 等同于该命令:create /javatest/node1 test world:anyone:cdrwa */ zookeeper.create("/javatest/node1", "test".getbytes(), zoodefs.ids.open_acl_unsafe, createmode.persistent); /* * 同步创建持久节点,acl为world:anyone:cr * 等同于该命令:create /javatest/node2 test world:anyone:cr */ zookeeper.create("/javatest/node2", "test".getbytes(), collections.singletonlist(new acl((zoodefs.perms.create + zoodefs.perms.read), zoodefs.ids.anyone_id_unsafe)), createmode.persistent); /* * 异步创建临时顺序节点,acl为ip:127.0.0.1:c * 等同于该命令:create -s -e /javatest/node3 test ip:127.0.0.1:c */ countdownlatch counter = new countdownlatch(1); zookeeper.create("/javatest/node3", "test".getbytes(), collections.singletonlist(new acl(zoodefs.perms.create, new id("ip", "127.0.0.1"))), createmode.ephemeral_sequential ,new asynccallback.stringcallback() { @override public void processresult(int rc, string path, object ctx, string name) { system.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ", name=" + name); counter.countdown(); } }, "上下文对象,异步回调时会传递给callback"); counter.await(); /* * 同步创建持久节点,acl为digest:jack:tgi9ucnypo5fjjvylkr05nalweg=:cdrwa * 等同于该命令:create /javatest/node4 test digest:jack:tgi9ucnypo5fjjvylkr05nalweg=:cdrwa * 添加认证用户(addauth digest jack:123456)后才能访问/javatest/node4 */ zookeeper.create("/javatest/node4", "test".getbytes(), collections.singletonlist(new acl(zoodefs.perms.all, new id("digest", "jack:tgi9ucnypo5fjjvylkr05nalweg="))) , createmode.persistent); /* * 同步创建顺序持久节点,acl为world:anyone:cdrwa,存活时间为5秒 * 等同于该命令:create -s -t 5000 /javatest/node5 test */ stat stat = new stat(); zookeeper.create("/javatest/node5", "test".getbytes(), zoodefs.ids.open_acl_unsafe, createmode.persistent_sequential_with_ttl, stat, 5000); system.out.println(stat); } /** * 获取节点数据 * @throws exception */ @test public void getdata() throws exception { //同步读取数据 stat stat = new stat(); byte[] data = zookeeper.getdata("/javatest/node1", false, stat); system.out.println(new string(data)); system.out.println(stat); //异步读取数据 zookeeper.addauthinfo("digest", "jack:123456".getbytes()); countdownlatch counter = new countdownlatch(1); zookeeper.getdata("/javatest/node4", false, new asynccallback.datacallback() { @override public void processresult(int rc, string path, object ctx, byte[] data, stat stat) { string s = ""; if (data != null) { s = new string(data); } system.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",data=" + s + ",stat=" + stat); counter.countdown(); } }, "上下文对象,异步回调时会传递给callback"); counter.await(); } @test public void setdata() throws exception { //同步设置数据,version为-1表示匹配任何版本 stat stat = zookeeper.setdata("/javatest/node1", "test2".getbytes(), -1); system.out.println(stat); //异步设置数据 zookeeper.addauthinfo("digest", "jack:123456".getbytes()); countdownlatch counter = new countdownlatch(1); zookeeper.setdata("/javatest/node4", "test2".getbytes(), -1, new asynccallback.statcallback(){ @override public void processresult(int rc, string path, object ctx, stat stat) { system.out.println("rc=" + rc + ",path=" + path + ",stat=" + stat); counter.countdown(); } }, "上下文对象,异步回调时会传递给callback"); counter.await(); } @test public void delete() throws exception { //同步删除数据 zookeeper.delete("/javatest/node1", -1); //异步删除数据 countdownlatch counter = new countdownlatch(1); zookeeper.delete("/javatest/node2", -1, new asynccallback.voidcallback(){ @override public void processresult(int rc, string path, object ctx) { system.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx); counter.countdown(); } }, "上下文对象,异步回调时会传递给callback"); counter.await(); } }
7.1.3、监控节点
datamonitor类实现对节点的监控,节点有变化时会回调datamonitorlistener.process方法,该方法由调用方根据业务来实现;watchercase类传入需要的参数来启动datamonitor。
该例子是根据官网例子改造而来,相较官网更简单了些。
package com.inspur.demo.general.zookeeper; import org.apache.zookeeper.*; import org.apache.zookeeper.data.stat; public class datamonitor implements runnable { private zookeeper zk; private datamonitorlistener listener; /** * 节点变化时会回调该方法,把监控变化类型及新数据带过来 */ public interface datamonitorlistener { void process(watchedevent event, byte[] data); } public datamonitor(string hostport, string znode, datamonitorlistener listener) throws exception { this.listener = listener; asynccallback.statcallback callback = new asynccallback.statcallback() { @override public void processresult(int rc, string path, object ctx, stat stat) { system.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",stat=" + stat); switch (rc) { case keeperexception.code.ok: case keeperexception.code.nonode: return; case keeperexception.code.sessionexpired: case keeperexception.code.noauth: close(); return; default: zk.exists(znode, true, this, null); return; } } }; //监视器 watcher watcher = new watcher() { @override public void process(watchedevent event) { system.out.println(event); if (event.gettype() == event.eventtype.none) { switch (event.getstate()) { case syncconnected: break; case expired: close(); break; } } else { try { byte[] bytes = zk.getdata(event.getpath(), false, null); listener.process(event, bytes); } catch (exception e) { e.printstacktrace(); } if (event.getpath() != null && event.getpath().equals(znode)) { //再次监控 zk.exists(znode, true, callback, null); } } } }; zk = new zookeeper("10.49.196.10:2181", 20000, watcher); zk.exists(znode, true, callback, null); } @override public void run() { try { synchronized (this) { wait(); } } catch (interruptedexception e) { e.printstacktrace(); } } public void close() { synchronized (this) { notifyall(); } } }
package com.inspur.demo.general.zookeeper; import org.apache.zookeeper.*; /** * 监视节点样例 */ public class watchercase { public static void main(string[] args) throws exception { datamonitor.datamonitorlistener listener = new datamonitor.datamonitorlistener() { @override public void process(watchedevent event, byte[] data) { //todo:根据实际情况处理 if (event.gettype() == watcher.event.eventtype.nodedatachanged) { system.out.println(new string(data)); } } }; new datamonitor("10.49.196.10:2181", "/watchtest", listener).run(); } }
下一篇: int型参数的SQL注入