欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

10分钟系列-zookeeper入门

程序员文章站 2024-03-19 22:14:58
...

10分钟zookeeper总结。

安装配置

  1. 解压
  2. 修改配置:zoo.cfg, 修改dataDir
  3. 启动:bin/zkServer.sh start

默认是standalone

分布式部署

  1. 同步安装包
  2. 在zkData数据目录下创建一个myid文件
  3. 编辑myid,输入唯一标识,比如2,3,4
  4. 配置zoo.cfg :
    # server.A=host1:B:C
    # A: myid编号
    # B:数据通信端口
    # C:选举通信的端口
    server.2=host1:2888:3888
    server.3=host2:2888:3888
    server.4=host3:2888:3888
    
  5. 启动每台服务器上的zkServer.sh start ,通过zkServer.sh status查看leader还是follower

配置解释

tickTime: 每隔tickTime时间发送一次心跳

initLimit:follower链接leader最多允许的心跳帧次数:
initLimit * tickTime后就会超时

syncLimit: follower和leader同步时最多允许的心跳丢失次数
syncLimit * tickTime后就超时,从服务列表中删除follower

内部原理

选举机制

基于paxos协议。

  1. 半数机制:一般装奇数台

  2. leader与follower,投票选举出来的leader

详情可以查看选举部分代码

都在 Election接口的实现里:

package org.apache.zookeeper.server.quorum;

public interface Election {

    Vote lookForLeader() throws InterruptedException;
    void shutdown();

}

主要是FastLeaderElection类。

节点类型

  • 持久:客户端和服务端断开连接后节点不删除
    1. 持久化目录节点:目录不变
    2. 持久化顺序编号目录节点:断开后会自动编号
  • 短暂:断掉就删除了
    1. 临时目录节点
    2. 临时目录编号节点

Stat结构体

建议先看shell操作。

  1. czxid: 创建节点的事务zxid:zxid唯一,时间戳,代表顺序
  2. ctime: znode被创建的毫秒数
  3. mzxid:最后更新的zxid
  4. mtime:最后修改的毫秒数
  5. pZxid: 最后更新的子节点的zxid
  6. cversion: znode字节点变化号,znode子节点修改的次数
  7. dataversion:数据变化号
  8. aclVersion:访问控制列表的变化号
  9. ephemeralOwner:如果是临时节点,这个代表znode拥有者的session id,如果不是则为0
  10. dataLength:数据长度
  11. numChildren:子节点数量

监听器原理

  1. 客户端:首先有一个main线程
  2. 客户端:main线程里创建2个线程:一个负责连接通信(connector),一个负责监听(listener)
  3. 客户端:通过connector把注册的监听事件发给zk服务器
  4. 服务器:zk的注册监听器列表(服务器)将注册的监听事件添加到列表中
  5. 服务器:zk服务器监听到有数据路径变化,就会将消息发给listener线程
  6. 客户端:listener内部调用process方法

常见监听类型:

  1. 监听节点数据的变化:get path watch
  2. 监听子节点增减的变化: ls path watch

写数据的流程

client----》发送写请求—》server1

  • 如果server1不是leader,则会把请求转发给leader,leader会把请求广播给各个server,比如server1和server2,各个server写成功了就会通知leader
  • 当leader收到大多数(过半)的server写成功的回复后,就认为数据写成功了
  • 写成功后,leader告诉server1写成功了
  • server1再告诉client写成功了

操作实战

shell使用

zkCli.sh 进入客户端

  1. help: 显示命令

  2. ls /:查看当前目录节点

  3. create /china "china" : 创建节点
    create /china/beijing "beijing": 子节点

  4. get /china/beijing : 获取节点的值

  5. create -e /china/japan "japan": 创建短暂节点

  6. create -s /china/cq "cq": 创建带有序号的节点(多次执行)

  7. set /china/beijing "home": 修改节点的值

  8. get /china watch: 监听节点值的变化(注册一次有用一次)
    ls /china watch: 监听节点的变化

  9. delete /china/beijing: 删除节点
    rmr /china: 递归删除

代码实践

准备maven环境

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>
    </dependencies>

创建client

    /**
     * 多个以逗号分隔
     */
    private static String connectString = "localhost:2181";
    /**
     * 超时时间
     */
    private static int sessionTimeout = 2000;
    /**
     * client
     */
    private static ZooKeeper zk;

    @BeforeClass
    public static void init() throws IOException {
        zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {

        });
    }
	
    @AfterClass
    public static void afterClass() throws Exception {
        zk.close();
    }	

代码实现CRUD

    @Test
    public void createNode() throws KeeperException, InterruptedException {
        String path = zk.create("/super", "jimo".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        assertEquals("/super", path);
    }

    @Test
    public void getNode() throws KeeperException, InterruptedException {
        List<String> children = zk.getChildren("/", false);
        children.forEach(System.out::println);
    }

    @Test
    public void getNodeWatch() throws KeeperException, InterruptedException {
        zk.getChildren("/", watcher -> {
            try {
                getNode();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        TimeUnit.MINUTES.sleep(1L);
    }

    @Test
    public void nodeExist() throws KeeperException, InterruptedException {
        Stat exists = zk.exists("/super", false);
        assertNotNull(exists);
        Stat existNon = zk.exists("/hehe", false);
        assertNull(existNon);
    }