欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

zookeeper源码解析-命令行解析

程序员文章站 2022-07-16 13:01:05
...

zk版本:3.5.6

1.引入

单机模式下zk服务启动前面已经介绍了,现在将介绍如何通过客户端的形式访问zk服务。其实访问zk服务有很多方式,可以通过java api等,但是这种方式稍微有点复杂,我们现在通过最简单的一种方式实现zk的客户端,即zk自带的cli。

2.命令行使用

在安装zookeeper之后,我们会使用zkStart.sh的命令启动zk服务,接着会使用zkCli.sh -server xxx:xx的命令连接到zk服务,而zkCli.sh就会创建一个zk客户端环境。
在zkCli.sh脚本中,最终会调用org.apache.zookeeper.ZooKeeperMain#main方法,这个类就是命令行形式的zk客户端。

2.1 zookeeper cli解析

ZookeeperMain.java
-------------------
    /**
     * 启动zk cli客户端
     * @param args 一般是-server localhost:2181
     */
    public static void main(String args[]) throws CliException, IOException, InterruptedException
    {
       // 创建zk客户端
        ZooKeeperMain main = new ZooKeeperMain(args);
        main.run();
    }

    public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
        // 解析参数,主要是解析-server/-timeout/ -r
        cl.parseOptions(args);

        // 创建连接zk的client对象
        connectToZK(cl.getOption("server"));
    }

cli启动主要流程:

  1. 解析参数
  2. 创建zk客户端对象,连接zk服务
  3. 监听用户输入,运行用户客户端命令

在这个流程中,比较复杂的是创建zk客户端和运行用户命令。
创建zk客户端会在后面的博客中介绍,现在直接看运行用户命令。

ZookeeperMain.java
---------------------
 /**
     * 运行命令行
     * @throws CliException
     * @throws IOException
     * @throws InterruptedException
     */
    void run() throws CliException, IOException, InterruptedException {
                // 创建控制台输入读取对象
                Class<?> consoleC = Class.forName("jline.console.ConsoleReader");
				// 自动补全对象
                Class<?> completorC =
                    Class.forName("org.apache.zookeeper.JLineZNodeCompleter");

                System.out.println("JLine support is enabled");

                Object console =
                    consoleC.getConstructor().newInstance();

                Object completor =
                    completorC.getConstructor(ZooKeeper.class).newInstance(zk);
                Method addCompletor = consoleC.getMethod("addCompleter",
                        Class.forName("jline.console.completer.Completer"));
                addCompletor.invoke(console, completor);

                String line;
                Method readLine = consoleC.getMethod("readLine", String.class);
                while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
                    // 执行命令行
                    executeLine(line);
                }
             ..... // 异常处理省略
    }

zk是通过jline实现控制台输入读取,自动补全等功能,并且最终将一行数据通过executeLine(line)执行,该方法的核心代码也只有一行

watch = processZKCmd(co);

/**
 * 执行命令
 */
 protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
        String[] args = co.getArgArray();
        String cmd = co.getCommand();
        if (args.length < 1) {
			// 打印使用提示
            usage();
            throw new MalformedCommandException("No command entered");
        }
        // 命令列表中没有找到
        if (!commandMap.containsKey(cmd)) {
			// 打印使用提示
            usage();
            throw new CommandNotFoundException("Command not found " + cmd);
        }
        
        boolean watch = false;
        LOG.debug("Processing " + cmd);

        // 处理命令quit,redo,history,printwatches,connect等命令
        if (cmd.equals("quit")) {
            zk.close();
            System.exit(exitCode);
        } else if (cmd.equals("redo") && args.length >= 2) {
            Integer i = Integer.decode(args[1]);
            if (commandCount <= i || i < 0) { // don't allow redoing this redo
                throw new MalformedCommandException("Command index out of range");
            }
            cl.parseCommand(history.get(i));
            if (cl.getCommand().equals("redo")) {
                throw new MalformedCommandException("No redoing redos");
            }
            history.put(commandCount, history.get(i));
            processCmd(cl);
        } else if (cmd.equals("history")) {
            for (int i = commandCount - 10; i <= commandCount; ++i) {
                if (i < 0) continue;
                System.out.println(i + " - " + history.get(i));
            }
        } else if (cmd.equals("printwatches")) {
            if (args.length == 1) {
                System.out.println("printwatches is " + (printWatches ? "on" : "off"));
            } else {
                printWatches = args[1].equals("on");
            }
        } else if (cmd.equals("connect")) {
            if (args.length >= 2) {
                connectToZK(args[1]);
            } else {
                connectToZK(host);
            }
        }
        
        // zk服务是否正常
        if (zk == null || !zk.getState().isAlive()) {
            System.out.println("Not connected");
            return false;
        }
        
        // 获取执行命令对象
        CliCommand cliCmd = commandMapCli.get(cmd);
        if(cliCmd != null) {
            cliCmd.setZk(zk);
            // 暴露方法parse和exec用于处理不同类型的请求
            watch = cliCmd.parse(args).exec();
        } else if (!commandMap.containsKey(cmd)) {
             usage();
        }
        return watch;
    }

zk执行命令行的过程:

  1. 校验命令和命令参数
  2. 单独处理命令quit,redo,history,printwatches,connect等命令
  3. 根据命令名称获取对象的命令对象,通过parse方法解析命令参数,再使用exec方法执行命令。

根据命令行的执行描述,可知最终会调用不同的命令对象的parse和exec完成客户端请求。

2.2 命令解析和执行分析

CliCommand的所有子类就是zk能提供的所有命令,具体如下:
zookeeper源码解析-命令行解析
为了说明问题,只以DeleteCommand为例。


   // delete命令示例: delete [-v version] path
   public DeleteCommand() {
        super("delete", "[-v version] path");
    }
    
    public CliCommand parse(String[] cmdArgs) throws CliParseException {
        Parser parser = new PosixParser();
        try {
            // 解析options
            cl = parser.parse(options, cmdArgs);
        } catch (ParseException ex) {
            throw new CliParseException(ex);
        }
        args = cl.getArgs();
        if (args.length < 2) {
            throw new CliParseException(getUsageStr());
        }
        
        retainCompatibility(cmdArgs);

        return this;
    }
    
    // 运行delete
     public boolean exec() throws CliException {
        String path = args[1];
        int version;
        if (cl.hasOption("v")) {
            version = Integer.parseInt(cl.getOptionValue("v"));
        } else {
            version = -1;
        }
        
        try {
            zk.delete(path, version);
        } catch (IllegalArgumentException ex) {
            throw new MalformedPathException(ex.getMessage());
        } catch(KeeperException|InterruptedException ex) {
            throw new CliWrapperException(ex);
        }
        return false;
    }

delete命令执行流程:

  1. 解析delete命令参数,当delete指定version之后可以通过cl.getOptionValue(“v”)获取
  2. 调用zk.delete方法删除节点,构造DeleteRequest对象发送到zk服务端,并将返回元数据封装成ReplyHeader对象(几乎所有客户端的请求都是使用这样的方式和服务端交互,唯一不同的是封装的请求对象不同)。具体代码如下。
    public void delete(final String path, int version)
        throws InterruptedException, KeeperException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);
        final String serverPath;
        if (clientPath.equals("/")) {
            serverPath = clientPath;
        } else {
            serverPath = prependChroot(clientPath);
        }

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.delete);
        DeleteRequest request = new DeleteRequest();
        request.setPath(serverPath);
        request.setVersion(version);
        ReplyHeader r = cnxn.submitRequest(h, request, null, null);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
    }

cnxn.submitRequest这个是客户端和服务端交互的入口,在后面会介绍。