欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java实现ZooKeeper的zNode监控

程序员文章站 2022-03-20 16:53:22
上一篇文章已经完成了ZooKeeper的基本搭建和使用的介绍,现在开始用代码说话。这个例子只实现基本的Watcher功能:当创建或修改数据时,控制台打印当前的数据内容和版本号;当节点被删除时,程序退出。 ......

上一篇文章已经完成了zookeeper的基本搭建和使用的介绍,现在开始用代码说话。参考 https://zookeeper.apache.org/doc/current/javaexample.html ,但对场景和代码都做了简化,只实现基本的watcher功能。

1   场景设计

目的是体验zookeeper的watcher功能。程序监控zookeeper的/watcher节点数据变化,当创建或修改数据时,控制台打印当前的数据内容和版本号;当/watcher被删除时,程序退出。

/watcher的创建、修改和删除操作,使用控制台或zkui操作。

2   搭建maven项目

代码相对比较简单,就不用springboot这个大杀器了,使用一个普通的maven项目即可。

zookeeper客户端使用官方提供的java库,org.apache.zookeeper: zookeeper: 3.5.5。日志框架使用习惯的slf4j+log4j2,zookeeper缺省使用的是log4j v1,因此在引入的时候需要excludes。另外,使用了lombok来简化一些代码。

以下是pom.xml的内容

<?xml version="1.0" encoding="utf-8"?>

<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"

     xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelversion>4.0.0</modelversion>

  <groupid>tech.codestory.research</groupid>

  <artifactid>zookeeper</artifactid>

  <version>1.0.0-snapshot</version>

  <dependencies>

    <dependency>

      <groupid>org.apache.zookeeper</groupid>

      <artifactid>zookeeper</artifactid>

      <version>3.5.5</version>

      <exclusions>

        <exclusion>

          <groupid>log4j</groupid>

          <artifactid>log4j</artifactid>

        </exclusion>

        <exclusion>

          <groupid>org.slf4j</groupid>

          <artifactid>slf4j-log4j12</artifactid>

        </exclusion>

      </exclusions>

    </dependency>

    <dependency>

      <groupid>org.apache.logging.log4j</groupid>

      <artifactid>log4j-core</artifactid>

      <version>2.12.1</version>

    </dependency>

    <dependency>

      <groupid>org.apache.logging.log4j</groupid>

      <artifactid>log4j-api</artifactid>

      <version>2.12.1</version>

    </dependency>

    <dependency>

      <groupid>org.apache.logging.log4j</groupid>

      <artifactid>log4j-web</artifactid>

      <version>2.12.1</version>

    </dependency>

    <dependency>

      <groupid>org.apache.logging.log4j</groupid>

      <artifactid>log4j-slf4j-impl</artifactid>

      <version>2.12.1</version>

    </dependency>

    <dependency>

      <groupid>org.slf4j</groupid>

      <artifactid>slf4j-api</artifactid>

      <version>1.7.28</version>

    </dependency>

    <dependency>

      <groupid>org.slf4j</groupid>

      <artifactid>slf4j-ext</artifactid>

      <version>1.7.28</version>

    </dependency>

    <dependency>

      <groupid>org.projectlombok</groupid>

      <artifactid>lombok</artifactid>

      <version>1.18.8</version>

      <scope>provided</scope>

    </dependency>

  </dependencies>

</project>

 

3   log4j2.xml

在项目的 src/main/resources 下创建一个文件 log4j2.xml,内容为

<?xml version="1.0" encoding="utf-8"?>

<configuration status="debug" name="codestorylogger">

  <appenders>

    <console name="stdout">

      <thresholdfilter level="trace" onmatch="accept" onmismatch="deny"/>

      <patternlayout pattern="%d{hh:mm:ss:sss} [%p] - %c{1}.%m(%l) - %m%n"/>

    </console>

  </appenders>

  <thresholdfilter level="trace"/>

  <loggers>

    <logger name="org.apache.zookeeper.clientcnxn" level="error" additivity="false">

      <appender-ref ref="stdout" />

    </logger>

    <logger name="org.apache.zookeeper" level="trace" additivity="false">

      <appender-ref ref="stdout"/>

    </logger>

    <logger name="tech.codestory" level="trace" additivity="false">

      <appender-ref ref="stdout"/>

    </logger>

    <root level="warn">

      <appender-ref ref="stdout"/>

    </root>

  </loggers>

</configuration>

 

4   创建zookeeper连接

创建连接代码比较简单,只需要创建 zookeeper对象就行,

zookeeper构造函数的定义

/**

 * 创建一个 zookeeper 客户端对象

 * @param connectstring 逗号分隔的 host:port 字符串,

 *    单点如 127.0.0.1:2181,

 *    集群如 192.168.5.128:2181,192.168.5.129:2181,192.168.5.130:2181,

 *    还可以指定根节点,如 127.0.0.1:2181/foo/bar

 * @param sessiontimeout 毫秒为单位的超时时间

 * @param watcher watcher对象,用于接收 matcherevent

 * @throws ioexception 网络错误时抛出异常

 * @throws illegalargumentexception 如果root路径设置错误

 */

public zookeeper(string connectstring, int sessiontimeout, watcher watcher)

  throws ioexception;

写一段测试代码,创建zk对象后判断某一个znode是否存在。

public class zookeeperwatcher implements watcher {

  /** zookeeper的客户端连接 */

  zookeeper zk;

 

  public zookeeperwatcher(string hostport, string znode) throws keeperexception, ioexception {

    zk = new zookeeper(hostport, 3000, this);

    try {

      stat exists = zk.exists(znode, true);

      if(exists == null){

        log.info(“{} 不存在”, znode)

      }

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

  }

}

运行这段代码,发现会抛异常

java.net.socketexception: socket is not connected

  at sun.nio.ch.net.translatetosocketexception(net.java:162) ~[?:?]

  at sun.nio.ch.net.translateexception(net.java:196) ~[?:?]

  at sun.nio.ch.net.translateexception(net.java:202) ~[?:?]

  at sun.nio.ch.socketadaptor.shutdowninput(socketadaptor.java:400) ~[?:?]

  at org.apache.zookeeper.clientcnxnsocketnio.cleanup(clientcnxnsocketnio.java:198) [zookeeper-3.5.5.jar:3.5.5]

  at org.apache.zookeeper.clientcnxn$sendthread.cleanup(clientcnxn.java:1338) [zookeeper-3.5.5.jar:3.5.5]

  at org.apache.zookeeper.clientcnxn$sendthread.cleanandnotifystate(clientcnxn.java:1276) [zookeeper-3.5.5.jar:3.5.5]

  at org.apache.zookeeper.clientcnxn$sendthread.run(clientcnxn.java:1254) [zookeeper-3.5.5.jar:3.5.5]

caused by: java.nio.channels.notyetconnectedexception

  at sun.nio.ch.socketchannelimpl.shutdowninput(socketchannelimpl.java:917) ~[?:?]

  at sun.nio.ch.socketadaptor.shutdowninput(socketadaptor.java:398) ~[?:?]

  ... 4 more

notyetconnectedexception的字面意思是连接还没有创建好,网络搜索了一下,建立连接需要一些时间,创建zk对象后马上调用exists命令,这时候连接还没有创建好,就会抛异常。zookeeper在连接建立成功之后,会发送一个watchedevent事件,我们可以利用这个事件完成建立连接的过程。修改后的代码如下,顺便添加了slf4j-ext中的profiler,用于记录所消耗的时间。

public class zookeeperwatcher implements watcher {

  /** 等待连接建立成功的信号 */

  private countdownlatch connectedsemaphore = new countdownlatch(1);

 

  /** zookeeper的客户端连接 */

  zookeeper zk;

 

  public zookeeperwatcher(string hostport, string znode) throws keeperexception, ioexception {

    profiler profiler = new profiler("连接到zookeeper");

    profiler.start("开始链接");

    zk = new zookeeper(hostport, 3000, this);

    try {

          profiler.start("等待连接成功的event");

      connectedsemaphore.await();

      stat exists = zk.exists(znode, true);

      if(exists == null){

        log.info(“{} 不存在”, znode)

      }

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

    profiler.stop();

    profiler.setlogger(log);

    profiler.log();

  }

 

  /** 收到zookeeper的watchedevent */

  @override

  public void process(watchedevent event) {

    log.info("event = {}", event);

    if (event.eventtype.none.equals(event.gettype())) {

      // 连接状态发生变化

      if (event.keeperstate.syncconnected.equals(event.getstate())) {

        // 连接建立成功

        connectedsemaphore.countdown();

      }

    }

  }

}

修改代码之后的执行记录日志如下,可以看到等待连接成功的event耗时9秒多。网络上有文章说关闭防火墙可以秒连,但我测试过,没发现有什么变化,使用systemctl stop firewalld之后重新执行程序,仍然需要9秒多。

[info] - zookeeperwatcher.process(61) - event = watchedevent state:syncconnected type:none path:null

[debug] - zookeeperwatcher.log(201) -

+ profiler [连接到zookeeper]

|-- elapsed time                   [开始链接]    78.912 milliseconds.

|-- elapsed time           [等待连接成功的event]  9330.606 milliseconds.

|-- total                  [连接到zookeeper]  9409.926 milliseconds.

 

[info] - zookeeperwatcher.readnodedata(95) - /watcher 不存在

 

5   读取watchedevent

前面的代码,只是处理了建立连接成功时的event,下面再来看看读取数据的过程。关键代码如下:

if (event.eventtype.nodedatachanged.equals(event.gettype())

    || event.eventtype.nodecreated.equals(event.gettype())) {

  string path = event.getpath();

  if (path != null && path.equals(znode)) {

    // 节点数据被修改

    readnodedata();

  }

}

 

/** 读节点数据 */

private void readnodedata() {

  try {

    stat stat = new stat();

    byte[] data = zk.getdata(znode, true, stat);

    if (data != null) {

      log.info("{}, value={}, version={}", znode, new string(data), stat.getversion());

    }

  } catch (keeperexception e) {

    log.info("{} 不存在", znode);   

  } catch (interruptedexception e) {

    log.error("interruptedexception", e);

  }

}

当接收到创建节点和修改节点的watchedevent,都会将数据读出并打印在控制台。

6   调整后的完整程序清单

对前面的代码做了部分调整,同时添加了退出系统的机制:节点被删除。

package tech.codestory.zookeeper.watcher;

 

import java.io.ioexception;

import java.util.concurrent.countdownlatch;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.stat;

import org.slf4j.profiler.profiler;

import lombok.extern.slf4j.slf4j;

 

/**

 * 用于测试 zookeeper的 watchedevent用法

 * @author code story

 * @date 2019/8/13

 */

@slf4j

public class zookeeperwatcher implements watcher, runnable {

  /** 等待连接建立成功的信号 */

  private countdownlatch connectedsemaphore = new countdownlatch(1);

  /** 退出系统的信号 */

  static integer quitsemaphore = integer.valueof(-1);

 

  string znode;

  zookeeper zk;

 

  public zookeeperwatcher(string hostport, string znode) throws keeperexception, ioexception {

    this.znode = znode;

 

    profiler profiler = new profiler("连接到zookeeper");

    profiler.start("开始链接");

    zk = new zookeeper(hostport, 3000, this);

    try {

      profiler.start("等待连接成功的event");

      connectedsemaphore.await();

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

    profiler.stop();

    profiler.setlogger(log);

    profiler.log();

 

    // 先读当前的数据

    readnodedata();

  }

 

  /** 收到zookeeper的watchedevent */

  @override

  public void process(watchedevent event) {

    log.info("event = {}", event);

    if (event.eventtype.none.equals(event.gettype())) {

      // 连接状态发生变化

      if (event.keeperstate.syncconnected.equals(event.getstate())) {

        // 连接建立成功

        connectedsemaphore.countdown();

      }

    } else if (event.eventtype.nodedatachanged.equals(event.gettype())

        || event.eventtype.nodecreated.equals(event.gettype())) {

      string path = event.getpath();

      if (path != null && path.equals(znode)) {

        // 节点数据被修改

        readnodedata();

      }

    } else if (event.eventtype.nodedeleted.equals(event.gettype())) {

      string path = event.getpath();

      if (path != null && path.equals(znode)) {

        synchronized (quitsemaphore) {

          // 节点被删除,通知退出线程

          quitsemaphore.notify();

        }

      }

    }

  }

 

  /** 读节点数据 */

  private void readnodedata() {

    try {

      stat stat = new stat();

      byte[] data = zk.getdata(znode, true, stat);

      if (data != null) {

        log.info("{}, value={}, version={}", znode, new string(data), stat.getversion());

      }

    } catch (keeperexception e) {

      log.info("{} 不存在", znode);

      try {

        // 目的是添加watcher

        zk.exists(znode, true);

      } catch (keeperexception ex) {

      } catch (interruptedexception ex) {

      }

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

  }

 

  @override

  public void run() {

    try {

      synchronized (quitsemaphore) {

        quitsemaphore.wait();

        log.info("{} 被删除,退出", znode);

      }

    } catch (interruptedexception e) {

      log.error("interruptedexception", e);

    }

  }

 

  /** 主程序,代码中写死了server地址和znode名,也可以改成从args中读取 */

  public static void main(string[] args) {

    string hostport = "192.168.5.128:2181";

    string znode = "/watcher";

    try {

      new zookeeperwatcher(hostport, znode).run();

    } catch (exception e) {

      log.error("new zookeeperexecutor()", e);

    }

  }

}

做一个测试,应用启动后创建节点,修改多次节点,最后删除节点,日志输出如下:

10:13:31:979 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:none path:null

10:13:31:982 [debug] - zookeeperwatcher.log(201) -

+ profiler [连接到zookeeper]

|-- elapsed time                   [开始链接]   210.193 milliseconds.

|-- elapsed time           [等待连接成功的event]  9385.467 milliseconds.

|-- total                  [连接到zookeeper]  9596.196 milliseconds.

 

10:13:31:996 [info] - zookeeperwatcher.readnodedata(84) - /watcher 不存在

10:15:43:451 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodecreated path:/watcher

10:15:43:463 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 00, version=0

10:15:50:906 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:15:50:910 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 01, version=1

10:15:56:004 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:15:56:007 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 02, version=2

10:16:00:246 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:16:00:249 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 03, version=3

10:16:06:399 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:16:06:402 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 10, version=4

10:16:10:217 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:16:10:220 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 11, version=5

10:16:14:444 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedatachanged path:/watcher

10:16:14:447 [info] - zookeeperwatcher.readnodedata(81) - /watcher, value=hello zk 12, version=6

10:16:20:118 [info] - zookeeperwatcher.process(50) - event = watchedevent state:syncconnected type:nodedeleted path:/watcher

10:16:20:118 [info] - zookeeperwatcher.run(101) - /watcher 被删除,退出