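Monitoring a ZooKeeper znode in Java
The previous article covered the basic setup and usage of ZooKeeper; now it is time to let the code speak. This article follows https://zookeeper.apache.org/doc/current/javaexample.html , but simplifies both the scenario and the code and implements only the basic watcher functionality.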
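1 Scenario design
The goal is to try out ZooKeeper's watcher feature. The program monitors data changes on the /watcher node. When the node is created or its data is modified, the console prints the current data and version number; when /watcher is deleted, the program exits.
Creating, modifying, and deleting /watcher is done from the console or with zkui.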
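2 Setting up the Maven project
The code is fairly simple, so there is no need for a heavyweight like Spring Boot; a plain Maven project is enough.
The ZooKeeper client is the official Java library, org.apache.zookeeper:zookeeper:3.5.5. For logging I use my usual slf4j + log4j2. ZooKeeper pulls in log4j v1 by default, so it has to be excluded when declaring the dependency. Lombok is also used to simplify some of the code.
Here is the content of pom.xml: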
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>tech.codestory.research</groupId>
  <artifactId>zookeeper</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.5.5</version>
      <exclusions>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.12.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.12.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-web</artifactId>
      <version>2.12.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.12.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.28</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-ext</artifactId>
      <version>1.7.28</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.8</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>
3 log4j2.xml
Create a file named log4j2.xml under src/main/resources with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="debug" name="codestorylogger">
  <Appenders>
    <Console name="stdout">
      <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/>
      <PatternLayout pattern="%d{HH:mm:ss:SSS} [%p] - %c{1}.%M(%L) - %m%n"/>
    </Console>
  </Appenders>
  <ThresholdFilter level="trace"/>
  <Loggers>
    <Logger name="org.apache.zookeeper.ClientCnxn" level="error" additivity="false">
      <AppenderRef ref="stdout"/>
    </Logger>
    <Logger name="org.apache.zookeeper" level="trace" additivity="false">
      <AppenderRef ref="stdout"/>
    </Logger>
    <Logger name="tech.codestory" level="trace" additivity="false">
      <AppenderRef ref="stdout"/>
    </Logger>
    <Root level="warn">
      <AppenderRef ref="stdout"/>
    </Root>
  </Loggers>
</Configuration>
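4 Creating the ZooKeeper connection
Creating a connection is simple: all it takes is constructing a ZooKeeper object.
The ZooKeeper constructor is defined as follows: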
/**
 * Creates a ZooKeeper client object.
 *
 * @param connectString  comma-separated host:port pairs;
 *                       a single server such as 127.0.0.1:2181,
 *                       a cluster such as 192.168.5.128:2181,192.168.5.129:2181,192.168.5.130:2181;
 *                       a root path may also be appended, such as 127.0.0.1:2181/foo/bar
 * @param sessionTimeout session timeout in milliseconds
 * @param watcher        Watcher object that receives WatchedEvent notifications
 * @throws IOException              on a network error
 * @throws IllegalArgumentException if the root path is invalid
 */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException;
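To make the connectString format concrete, here is a minimal JDK-only sketch that splits such a string into its server list and optional root path. The class name ConnectStringSketch and its fields are hypothetical names made up for this illustration; this is not how the ZooKeeper client itself parses the string, only a restatement of the format described in the Javadoc above.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: splits a connect string as the Javadoc above describes it. */
final class ConnectStringSketch {
    /** host:port entries, in the order they were written */
    final List<String> servers = new ArrayList<>();
    /** root path such as /foo/bar, or null when none was given */
    final String chroot;

    ConnectStringSketch(String connectString) {
        // Everything from the first '/' onward is the root path.
        int slash = connectString.indexOf('/');
        String hosts = slash < 0 ? connectString : connectString.substring(0, slash);
        chroot = slash < 0 ? null : connectString.substring(slash);
        // The part before it is a comma-separated list of host:port pairs.
        for (String hostPort : hosts.split(",")) {
            String trimmed = hostPort.trim();
            if (!trimmed.isEmpty()) {
                servers.add(trimmed);
            }
        }
    }

    public static void main(String[] args) {
        ConnectStringSketch c =
                new ConnectStringSketch("192.168.5.128:2181,192.168.5.129:2181/foo/bar");
        // prints: [192.168.5.128:2181, 192.168.5.129:2181] chroot=/foo/bar
        System.out.println(c.servers + " chroot=" + c.chroot);
    }
}
```

With a root path set, every path the client uses is interpreted relative to that root, so /watcher would refer to /foo/bar/watcher on the server.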
Let's write some test code that creates the zk object and then checks whether a given znode exists.
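@Slf4j
public class ZooKeeperWatcher implements Watcher {
    /** ZooKeeper client connection */
    ZooKeeper zk;

    public ZooKeeperWatcher(String hostPort, String znode) throws KeeperException, IOException {
        zk = new ZooKeeper(hostPort, 3000, this);
        try {
            // Called immediately after construction, without waiting for the connection
            Stat exists = zk.exists(znode, true);
            if (exists == null) {
                log.info("{} does not exist", znode);
            }
        } catch (InterruptedException e) {
            log.error("InterruptedException", e);
        }
    }

    /** Required by Watcher; this first version only logs the event */
    @Override
    public void process(WatchedEvent event) {
        log.info("event = {}", event);
    }
}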
Running this code throws an exception:
java.net.SocketException: Socket is not connected
    at sun.nio.ch.Net.translateToSocketException(Net.java:162) ~[?:?]
    at sun.nio.ch.Net.translateException(Net.java:196) ~[?:?]
    at sun.nio.ch.Net.translateException(Net.java:202) ~[?:?]
    at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:400) ~[?:?]
    at org.apache.zookeeper.ClientCnxnSocketNIO.cleanup(ClientCnxnSocketNIO.java:198) [zookeeper-3.5.5.jar:3.5.5]
    at org.apache.zookeeper.ClientCnxn$SendThread.cleanup(ClientCnxn.java:1338) [zookeeper-3.5.5.jar:3.5.5]
    at org.apache.zookeeper.ClientCnxn$SendThread.cleanAndNotifyState(ClientCnxn.java:1276) [zookeeper-3.5.5.jar:3.5.5]
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1254) [zookeeper-3.5.5.jar:3.5.5]
Caused by: java.nio.channels.NotYetConnectedException
    at sun.nio.ch.SocketChannelImpl.shutdownInput(SocketChannelImpl.java:917) ~[?:?]
    at sun.nio.ch.SocketAdaptor.shutdownInput(SocketAdaptor.java:398) ~[?:?]
    ... 4 more
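NotYetConnectedException literally means that the connection has not been established yet. A web search suggested that establishing the connection takes some time: if exists is called right after the zk object is created, the connection is not ready and the exception is thrown. Once the connection has been established, ZooKeeper sends a WatchedEvent, and we can use that event to wait until the connection is ready. The modified code is below; it also adds the Profiler from slf4j-ext to record the elapsed time.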
@Slf4j
public class ZooKeeperWatcher implements Watcher {
    /** Signal for waiting until the connection is established */
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /** ZooKeeper client connection */
    ZooKeeper zk;

    public ZooKeeperWatcher(String hostPort, String znode) throws KeeperException, IOException {
        Profiler profiler = new Profiler("Connect to ZooKeeper");
        profiler.start("Start connecting");
        zk = new ZooKeeper(hostPort, 3000, this);
        try {
            profiler.start("Wait for the connected event");
            // Block until process() has seen the SyncConnected event
            connectedSemaphore.await();
            Stat exists = zk.exists(znode, true);
            if (exists == null) {
                log.info("{} does not exist", znode);
            }
        } catch (InterruptedException e) {
            log.error("InterruptedException", e);
        }
        profiler.stop();
        profiler.setLogger(log);
        profiler.log();
    }

    /** Receives WatchedEvents from ZooKeeper */
    @Override
    public void process(WatchedEvent event) {
        log.info("event = {}", event);
        if (Event.EventType.None.equals(event.getType())) {
            // The connection state changed
            if (Event.KeeperState.SyncConnected.equals(event.getState())) {
                // The connection has been established
                connectedSemaphore.countDown();
            }
        }
    }
}
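The execution log after this change is shown below. Waiting for the connection-established event took more than 9 seconds. Some articles online claim that turning off the firewall makes the connection almost instant, but my test showed no difference: after running systemctl stop firewalld and starting the program again, it still took more than 9 seconds.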
[INFO] - ZooKeeperWatcher.process(61) - event = WatchedEvent state:SyncConnected type:None path:null
[DEBUG] - ZooKeeperWatcher.log(201) -
+ Profiler [Connect to ZooKeeper]
|-- elapsed time [Start connecting] 78.912 milliseconds.
|-- elapsed time [Wait for the connected event] 9330.606 milliseconds.
|-- Total [Connect to ZooKeeper] 9409.926 milliseconds.

[INFO] - ZooKeeperWatcher.readNodeData(95) - /watcher does not exist
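One thing worth noting about the constructor above: connectedSemaphore.await() has no time limit, so if the SyncConnected event never arrives (for example because the server address is wrong), the program blocks forever. Below is a minimal JDK-only sketch of a bounded wait, assuming a timeout is acceptable for your use case. The class BoundedConnectWait and the method awaitConnected are hypothetical names for this illustration, and a plain thread stands in for the ZooKeeper event thread that would call process().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Illustration only: waiting on a latch with an upper bound instead of forever. */
final class BoundedConnectWait {

    /** Returns true if the latch was released within timeoutMillis, false otherwise. */
    static boolean awaitConnected(CountDownLatch connected, long timeoutMillis) {
        try {
            return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "not connected"
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch connected = new CountDownLatch(1);
        // Stand-in for the event thread delivering SyncConnected after a delay
        Thread eventThread = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            connected.countDown();
        });
        eventThread.start();
        System.out.println("connected in time: " + awaitConnected(connected, 2000));

        // A latch that nobody ever releases: the wait gives up after 200 ms
        CountDownLatch neverConnected = new CountDownLatch(1);
        System.out.println("connected in time: " + awaitConnected(neverConnected, 200));
    }
}
```

In the watcher class, the same idea would mean replacing the plain await() with the timed variant and handling a false result, for instance by logging an error and closing the client.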
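5 Reading the WatchedEvent
The code so far only handles the event sent when the connection is established. Now let's look at reading the data. The key code is as follows: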
if (Event.EventType.NodeDataChanged.equals(event.getType())
        || Event.EventType.NodeCreated.equals(event.getType())) {
    String path = event.getPath();
    if (path != null && path.equals(znode)) {
        // The node was created or its data was modified
        readNodeData();
    }
}

/** Reads the node data */
private void readNodeData() {
    try {
        Stat stat = new Stat();
        // Passing true registers the default watcher on the node again
        byte[] data = zk.getData(znode, true, stat);
        if (data != null) {
            log.info("{}, value={}, version={}", znode, new String(data), stat.getVersion());
        }
    } catch (KeeperException e) {
        log.info("{} does not exist", znode);
    } catch (InterruptedException e) {
        log.error("InterruptedException", e);
    }
}
Whenever a WatchedEvent for node creation or node modification arrives, the data is read and printed to the console.
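The reason readNodeData passes true to getData on every call is that a ZooKeeper watch is a one-time trigger: after it fires, it has to be registered again to receive the next change. The following JDK-only toy model illustrates the difference between a watch that is registered once and one that is renewed in its callback. OneShotWatchSketch, Listener, read, and write are hypothetical names for this illustration and are not part of the ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory node whose watches fire once and are then discarded. */
final class OneShotWatchSketch {
    interface Listener {
        void changed(String newValue);
    }

    private String value = "";
    private List<Listener> watches = new ArrayList<>();

    /** Reads the value; a non-null watch is registered for the next change only. */
    String read(Listener watch) {
        if (watch != null) {
            watches.add(watch);
        }
        return value;
    }

    /** Updates the value and fires every pending watch exactly once. */
    void write(String newValue) {
        value = newValue;
        List<Listener> fired = watches;
        watches = new ArrayList<>(); // pending watches are consumed by this change
        for (Listener listener : fired) {
            listener.changed(newValue);
        }
    }

    public static void main(String[] args) {
        OneShotWatchSketch node = new OneShotWatchSketch();

        // Registered once and never renewed
        List<String> seenOnce = new ArrayList<>();
        node.read(seenOnce::add);

        // Renews itself on every event, like readNodeData() passing true
        List<String> seenAll = new ArrayList<>();
        Listener[] renewing = new Listener[1];
        renewing[0] = v -> {
            seenAll.add(v);
            node.read(renewing[0]);
        };
        node.read(renewing[0]);

        node.write("hello zk 00");
        node.write("hello zk 01");
        System.out.println(seenOnce); // prints: [hello zk 00]
        System.out.println(seenAll);  // prints: [hello zk 00, hello zk 01]
    }
}
```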
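6 Complete program listing after adjustments
The earlier code has been adjusted in places, and an exit mechanism has been added: the program quits when the node is deleted.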
package tech.codestory.zookeeper.watcher;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.profiler.Profiler;
import lombok.extern.slf4j.Slf4j;

/**
 * Tests how ZooKeeper's WatchedEvent is used
 * @author code story
 * @date 2019/8/13
 */
@Slf4j
public class ZooKeeperWatcher implements Watcher, Runnable {
    /** Signal for waiting until the connection is established */
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    /** Signal for exiting; a dedicated lock object rather than a shared boxed Integer */
    static final Object quitSemaphore = new Object();

    String znode;
    ZooKeeper zk;

    public ZooKeeperWatcher(String hostPort, String znode) throws KeeperException, IOException {
        this.znode = znode;

        Profiler profiler = new Profiler("Connect to ZooKeeper");
        profiler.start("Start connecting");
        zk = new ZooKeeper(hostPort, 3000, this);
        try {
            profiler.start("Wait for the connected event");
            connectedSemaphore.await();
        } catch (InterruptedException e) {
            log.error("InterruptedException", e);
        }
        profiler.stop();
        profiler.setLogger(log);
        profiler.log();

        // Read the current data first
        readNodeData();
    }

    /** Receives WatchedEvents from ZooKeeper */
    @Override
    public void process(WatchedEvent event) {
        log.info("event = {}", event);
        if (Event.EventType.None.equals(event.getType())) {
            // The connection state changed
            if (Event.KeeperState.SyncConnected.equals(event.getState())) {
                // The connection has been established
                connectedSemaphore.countDown();
            }
        } else if (Event.EventType.NodeDataChanged.equals(event.getType())
                || Event.EventType.NodeCreated.equals(event.getType())) {
            String path = event.getPath();
            if (path != null && path.equals(znode)) {
                // The node was created or its data was modified
                readNodeData();
            }
        } else if (Event.EventType.NodeDeleted.equals(event.getType())) {
            String path = event.getPath();
            if (path != null && path.equals(znode)) {
                synchronized (quitSemaphore) {
                    // The node was deleted; wake up the waiting thread so it can exit
                    quitSemaphore.notify();
                }
            }
        }
    }

    /** Reads the node data */
    private void readNodeData() {
        try {
            Stat stat = new Stat();
            byte[] data = zk.getData(znode, true, stat);
            if (data != null) {
                log.info("{}, value={}, version={}", znode, new String(data), stat.getVersion());
            }
        } catch (KeeperException e) {
            log.info("{} does not exist", znode);
            try {
                // The only purpose of this call is to register the watcher
                zk.exists(znode, true);
            } catch (KeeperException | InterruptedException ex) {
                log.error("failed to register the watcher on {}", znode, ex);
            }
        } catch (InterruptedException e) {
            log.error("InterruptedException", e);
        }
    }

    @Override
    public void run() {
        try {
            synchronized (quitSemaphore) {
                quitSemaphore.wait();
                log.info("{} was deleted, exiting", znode);
            }
        } catch (InterruptedException e) {
            log.error("InterruptedException", e);
        }
    }

    /** Main program; the server address and znode name are hard-coded, but could be read from args */
    public static void main(String[] args) {
        String hostPort = "192.168.5.128:2181";
        String znode = "/watcher";
        try {
            new ZooKeeperWatcher(hostPort, znode).run();
        } catch (Exception e) {
            log.error("new ZooKeeperWatcher()", e);
        }
    }
}
As a test, I started the application, created the node, modified it several times, and finally deleted it. The log output is as follows:
10:13:31:979 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:None path:null
10:13:31:982 [DEBUG] - ZooKeeperWatcher.log(201) -
+ Profiler [Connect to ZooKeeper]
|-- elapsed time [Start connecting] 210.193 milliseconds.
|-- elapsed time [Wait for the connected event] 9385.467 milliseconds.
|-- Total [Connect to ZooKeeper] 9596.196 milliseconds.

10:13:31:996 [INFO] - ZooKeeperWatcher.readNodeData(84) - /watcher does not exist
10:15:43:451 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeCreated path:/watcher
10:15:43:463 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 00, version=0
10:15:50:906 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher
10:15:50:910 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 01, version=1
10:15:56:004 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher
10:15:56:007 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 02, version=2
10:16:00:246 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher
10:16:00:249 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 03, version=3
10:16:06:399 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher
10:16:06:402 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 10, version=4
10:16:10:217 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher
10:16:10:220 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 11, version=5
10:16:14:444 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/watcher
10:16:14:447 [INFO] - ZooKeeperWatcher.readNodeData(81) - /watcher, value=hello zk 12, version=6
10:16:20:118 [INFO] - ZooKeeperWatcher.process(50) - event = WatchedEvent state:SyncConnected type:NodeDeleted path:/watcher
10:16:20:118 [INFO] - ZooKeeperWatcher.run(101) - /watcher was deleted, exiting
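A note on the exit mechanism: with wait() and notify(), a notification sent before the waiting thread has entered wait() is not remembered, so a deletion that happens in that small window would leave run() blocked. One possible alternative, sketched here with the JDK only, is to reuse the CountDownLatch idea already used for the connection signal. QuitSignalSketch, nodeDeleted, and awaitQuit are hypothetical names for this illustration; this is a sketch of the pattern, not a drop-in replacement for the listing above.

```java
import java.util.concurrent.CountDownLatch;

/** Illustration only: a quit signal that is not lost if it arrives before the wait. */
final class QuitSignalSketch {
    private final CountDownLatch quit = new CountDownLatch(1);

    /** Would be called from process() when NodeDeleted arrives for the watched path. */
    void nodeDeleted() {
        quit.countDown();
    }

    /** Blocks until nodeDeleted() has been called; returns at once if it already was. */
    boolean awaitQuit() {
        try {
            quit.await();
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the wait was cut short
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        QuitSignalSketch signal = new QuitSignalSketch();
        signal.nodeDeleted(); // the signal arrives BEFORE anyone is waiting
        System.out.println("quit observed: " + signal.awaitQuit()); // prints: quit observed: true
    }
}
```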