欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hadoop学习(5)-zookeeper的安装和命令行,java操作

程序员文章站 2022-03-21 21:44:26
zookeeper是干嘛的呢 Zookeeper的作用1.可以为客户端管理少量的数据kvkey:是以路径的形式表示的,那就意味着,各key之间有父子关系,比如/ 是顶层key用户建的key只能在/ 下作为子节点,比如建一个key: /aa 这个key可以带value数据也可以建一个key: /bb也 ......

zookeeper是干嘛的呢

zookeeper的作用
1.可以为客户端管理少量的数据kv
key:是以路径的形式表示的,那就意味着,各key之间有父子关系,比如
/ 是顶层key
用户建的key只能在/ 下作为子节点,比如建一个key: /aa 这个key可以带value数据
也可以建一个key: /bb
也可以建key: /aa/xx

 

 

2.可以为客户端监听指定数据节点的状态,并在数据节点发生变化是,通知客户端

 


zookeeper 安装步骤
把包上传linux后解压到apps/
[root@hdp-01 ~]# tar -zxvf zookeeper-3.4.6.tar.gz -c apps/
/root/apps/zookeeper-3.4.6/conf下该配置文件
[root@hdp-01 conf]# cp zoo_sample.cfg zoo.cfg
然后vim zoo.cfg
更改为
datadir=/root/zkdata
最后添加
server.1=hdp-01:2888:3888
server.2=hdp-02:2888:3888
server.3=hdp-03:2888:3888
server.4=hdp-04:2888:3888
接着,在hdp-01上,新建数据目录/root/zkdata,并在目录重生成一个文件myid,内容为1
接着,在hdp-02上,新建数据目录/root/zkdata,并在目录重生成一个文件myid,内容为2
接着,在hdp-03上,新建数据目录/root/zkdata,并在目录重生成一个文件myid,内容为3
接着,在hdp-04上,新建数据目录/root/zkdata,并在目录重生成一个文件myid,内容为4
然后将zookeeper scp给其他机器
启动
[root@hdp-01 ~]# /root/apps/zookeeper-3.4.6/bin/zkserver.sh start
查看状态
[root@hdp-01 ~]# /root/apps/zookeeper-3.4.6/bin/zkserver.sh status

可以自己写一个脚本进行启动名字叫zkmanage.sh
用的时候后面跟上参数,传入$1.
sh ./zkmanage.sh start
或者关闭的时候
sh ./zkmanager.sh stop
脚本代码如下

Hadoop学习(5)-zookeeper的安装和命令行,java操作
#!/bin/bash
for host in hdp-01 hdp-02 hdp-03 hdp-04
do
echo "${host}:starting...."
ssh $host "/root/apps/zookeeper-3.4.6/bin/zkserver.sh $1"
done
sleep 2
for host in hdp-01 hdp-02 hdp-03 hdp-04
do
ssh $host "/root/apps/zookeeper-3.4.6/bin/zkserver.sh status"
done
Hadoop学习(5)-zookeeper的安装和命令行,java操作

注意一点,如果有的结点没有启动,一定要看一下是不是这几台机器的时间是不是不对应,如果差别太大是启动不起来的。f**k.

简单补充一点就是,启动之后,这几台机器,有的当leader,有的当follower,只有一个leader,他们谁当leader是根据他们 '投票的形式'的决定的。

只有一个leader

 

 

zookeeper的命令行客户端和java客户端

命令行

在bin/zkcli.sh

这样会连到本机localhost

指定连到哪一台zookeeper

bin/zkcli.sh –server hdp-02:2181

 

两个作用,管理数据和监听

首先是管理数据

Hadoop学习(5)-zookeeper的安装和命令行,java操作

 

也可以自己建数据

[zk: hdp-03:2181(connected) 8] create /aa "hellozk"

created /aa

 

[zk: hdp-03:2181(connected) 9] ls /

[aa, root, hbase, zookeeper]

 

[zk: hdp-03:2181(connected) 10] get /aa

"hellozk"

czxid = 0xc00000023

ctime = mon aug 05 14:41:52 cst 2019

mzxid = 0xc00000023

mtime = mon aug 05 14:41:52 cst 2019

pzxid = 0xc00000023

cversion = 0

dataversion = 0

aclversion = 0

ephemeralowner = 0x0

datalength = 9

numchildren = 0

 

 

 

修改数据

[zk: hdp-03:2181(connected) 11] set /aa hellospark

czxid = 0xc00000023

ctime = mon aug 05 14:41:52 cst 2019

mzxid = 0xc00000024

mtime = mon aug 05 14:42:40 cst 2019

pzxid = 0xc00000023

cversion = 0

dataversion = 1

aclversion = 0

ephemeralowner = 0x0

datalength = 10

numchildren = 0

这个数据版本,你没修改几次就会变成几

也可以在/aa下建立子目录

如果有些命令忘了,可以输入help查看帮助

 

删除就是rmr

[zk: hdp-03:2181(connected) 13] rmr /aa

 

监听

[zk: hdp-03:2181(connected) 17] create /aa iamfine

created /aa

 

[zk: hdp-03:2181(connected) 18] get /aa watch

然后这时候如果改变了/aa 就让他通知我

在另一台机器上启动一个zookeeper

 

[zk: hdp-03:2181(connected) 2] set /aa iamnotfine

此时就会有信息

Hadoop学习(5)-zookeeper的安装和命令行,java操作

 

但当你再改一次的话,这个连接就不会再提醒了,这个监听只起一次作用。

 

数据类型分为好几种

zookeeper中的znode有多种类型:

1、persistent  持久的:创建者就算跟集群断开联系,该类节点也会持久存在与zk集群中

2、ephemeral  短暂的:创建者一旦跟集群断开联系,zk就会将这个节点删除

3、sequential  带序号的:这类节点,zk会自动拼接上一个序号,而且序号是递增的

我们一般创建的都是持久的

create –e /bb xxx

这时候就是短暂的

create /cc yyyy

create –s /cc/c qq

然后他们就会自动的在这些子节点下带上序号

 

java客户端

 

 

 需要的jar包

Hadoop学习(5)-zookeeper的安装和命令行,java操作

 

import java.io.unsupportedencodingexception;
import java.util.list;

import org.apache.zookeeper.createmode;
import org.apache.zookeeper.keeperexception;
import org.apache.zookeeper.zoodefs.ids;
import org.apache.zookeeper.zookeeper;
import org.junit.before;
import org.junit.test;

public class zookeeperclientdemo {
    zookeeper zk = null;
    @before
    public void init()  throws exception{
        // 构造一个连接zookeeper的客户端对象
        zk = new zookeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, null);
    }
    //增
    @test
    public void testcreate() throws exception{

        // 参数1:要创建的节点路径  参数2:数据  参数3:访问权限  参数4:节点类型
        string create = zk.create("/zktest", "hello zk".getbytes(), ids.open_acl_unsafe, createmode.persistent);
        system.out.println(create);
        
        zk.close();
        
    }
    
    //改
    @test
    public void testupdate() throws exception {
        
        // 参数1:节点路径   参数2:数据    参数3:所要修改的版本,-1代表任何版本
        zk.setdata("/zktest", "我爱你".getbytes("utf-8"), -1);
        
        zk.close();
        
    }
    
    //查
    @test    
    public void testget() throws exception {
        // 参数1:节点路径    参数2:是否要监听    参数3:所要获取的数据的版本,null表示最新版本
        byte[] data = zk.getdata("/zktest", false, null);
        system.out.println(new string(data,"utf-8"));
        
        zk.close();
    }
    
    
    //查子节点
    @test    
    public void testlistchildren() throws exception {
        // 参数1:节点路径    参数2:是否要监听   
        // 注意:返回的结果中只有子节点名字,不带全路径
        list<string> children = zk.getchildren("/zktest", false);
        
        for (string child : children) {
            system.out.println(child);
        }
        
        zk.close();
    }
    
    //删
    @test
    public void testrm() throws interruptedexception, keeperexception{
        
        zk.delete("/zktest", -1);
        
        zk.close();
    }
    
    
    

}

java客户端监听节点是否发生了变化

import java.util.list;

import org.apache.zookeeper.keeperexception;
import org.apache.zookeeper.watchedevent;
import org.apache.zookeeper.watcher;
import org.apache.zookeeper.watcher.event.eventtype;
import org.apache.zookeeper.watcher.event.keeperstate;
import org.apache.zookeeper.zookeeper;
import org.junit.before;
import org.junit.test;

public class zookeeperwatchdemo {

    zookeeper zk = null;

    @before
    public void init() throws exception {
        // 构造一个连接zookeeper的客户端对象
        zk = new zookeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181", 2000, new watcher() {

            @override
            public void process(watchedevent event) {
                //如果在连接,并且为该节点的数据变化了
                if (event.getstate() == keeperstate.syncconnected && event.gettype() == eventtype.nodedatachanged) {
                    system.out.println(event.getpath()); // 收到的事件所发生的节点路径
                    system.out.println(event.gettype()); // 收到的事件的类型
                    system.out.println("数据变化了啊....."); // 收到事件后,我们的处理逻辑

                    try {
                        zk.getdata("/mygirls", true, null);

                    } catch (keeperexception | interruptedexception e) {
                        e.printstacktrace();
                    }
                    //如果在连接,并且是字节点变化了
                }else if(event.getstate() == keeperstate.syncconnected && event.gettype() == eventtype.nodechildrenchanged){
                    
                    system.out.println("子节点变化了......");
                }

            }
        });
    }

    @test
    public void testgetwatch() throws exception {
        //此时监听的逻辑就是new zookeeper时的watcher,这里也可以自己写一个watcher,
        //但如果自己写的话,就会只运行一次了,不能重复监听
        byte[] data = zk.getdata("/mygirls", true, null); // 监听节点数据变化
        
        list<string> children = zk.getchildren("/mygirls", true); //监听节点的子节点变化事件

        system.out.println(new string(data, "utf-8"));
        //这时候启动的监听线程为一个守护线程,当主线程结束后,就会退出,所以这里让主线程睡眠时间,当主线程结束,他也就没了
        //这个守护线程使我们在创建的zookeeper的时候就创建的,
        thread.sleep(long.max_value);

    }

}

 

 监听服务器上下线

首先是一个服务器的业务逻辑

 

import java.io.ioexception;
import java.io.inputstream;
import java.io.outputstream;
import java.net.serversocket;
import java.net.socket;
import java.util.date;

public class timequeryservice extends thread{
    
    int port = 0;
    public timequeryservice(int port){
        
        this.port = port;
    }
    @override
    public void run() {
        
        try {
            //javasocket编程,创建一个指定的端口号接受数据
            serversocket ss = new serversocket(port);
            system.out.println("业务线程已绑定端口"+port+"准备接受消费端请求了.....");
            while(true){
                socket sc = ss.accept();
                inputstream inputstream = sc.getinputstream();
                outputstream outputstream = sc.getoutputstream();
                outputstream.write(new date().tostring().getbytes());
            }
            
        } catch (ioexception e) {
            e.printstacktrace();
        }
        
        
    }
    

}

 

然后服务器上线时,先向zookeeper注册,等待消费者来访问

package cn.edu360.zk.distributesystem;

import org.apache.zookeeper.createmode;
import org.apache.zookeeper.zoodefs.ids;
import org.apache.zookeeper.zookeeper;
import org.apache.zookeeper.data.stat;

public class timequeryserver {
    zookeeper zk = null;
    
    // 构造zk客户端连接
    public void connectzk() throws exception{
        zk = new zookeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181,hdp-04:2181", 2000, null);
    }
    // 注册服务器信息
    public void registerserverinfo(string hostname,string port) throws exception{
        
        /**
         * 先判断注册节点的父节点是否存在,如果不存在,则创建
         */
        stat stat = zk.exists("/servers", false);
        if(stat==null){
            zk.create("/servers", null, ids.open_acl_unsafe, createmode.persistent);
        }
        
        // 注册服务器数据到zk的约定注册节点下
        string create = zk.create("/servers/server", (hostname+":"+port).getbytes(), ids.open_acl_unsafe, createmode.ephemeral_sequential);
        
        system.out.println(hostname+" 服务器向zk注册信息成功,注册的节点为:" + create);
        
    }

    public static void main(string[] args) throws exception {
        
        timequeryserver timequeryserver = new timequeryserver();
        
        // 构造zk客户端连接
        timequeryserver.connectzk();
        
        // 注册服务器信息
        timequeryserver.registerserverinfo(args[0], args[1]);
        
        // 启动业务线程开始处理业务
        new timequeryservice(integer.parseint(args[1])).start();
        
    }
    

}

然后是消费者端的业务逻辑

先看一下zookeeper有哪些alive的服务器,然后随便挑一台访问

package cn.edu360.zk.distributesystem;

import java.io.inputstream;
import java.io.outputstream;
import java.net.socket;
import java.util.arraylist;
import java.util.list;
import java.util.random;

import org.apache.zookeeper.watchedevent;
import org.apache.zookeeper.watcher;
import org.apache.zookeeper.zookeeper;
import org.apache.zookeeper.watcher.event.eventtype;
import org.apache.zookeeper.watcher.event.keeperstate;

public class consumer {

    // 定义一个list用于存放最新的在线服务器列表
    private volatile arraylist<string> onlineservers = new arraylist<>();

    // 构造zk连接对象
    zookeeper zk = null;

    // 构造zk客户端连接
    public void connectzk() throws exception {

        zk = new zookeeper("hdp-01:2181,hdp-02:2181,hdp-03:2181,hdp-04:2181", 2000, new watcher() {

            @override
            public void process(watchedevent event) {
                if (event.getstate() == keeperstate.syncconnected && event.gettype() == eventtype.nodechildrenchanged) {

                    try {
                        // 事件回调逻辑中,再次查询zk上的在线服务器节点即可,查询逻辑中又再次注册了子节点变化事件监听
                        getonlineservers();
                    } catch (exception e) {
                        e.printstacktrace();
                    }

                }

            }
        });

    }

    // 查询在线服务器列表
    public void getonlineservers() throws exception {

        list<string> children = zk.getchildren("/servers", true);
        arraylist<string> servers = new arraylist<>();

        for (string child : children) {
            byte[] data = zk.getdata("/servers/" + child, false, null);

            string serverinfo = new string(data);

            servers.add(serverinfo);
        }

        onlineservers = servers;
        system.out.println("查询了一次zk,当前在线的服务器有:" + servers);

    }

    public void sendrequest() throws exception {
        random random = new random();
        while (true) {
            try {
                // 挑选一台当前在线的服务器
                int nextint = random.nextint(onlineservers.size());
                string server = onlineservers.get(nextint);
                string hostname = server.split(":")[0];
                int port = integer.parseint(server.split(":")[1]);

                system.out.println("本次请求挑选的服务器为:" + server);

                socket socket = new socket(hostname, port);
                outputstream out = socket.getoutputstream();
                inputstream in = socket.getinputstream();

                out.write("haha".getbytes());
                out.flush();

                byte[] buf = new byte[256];
                int read = in.read(buf);
                system.out.println("服务器响应的时间为:" + new string(buf, 0, read));

                out.close();
                in.close();
                socket.close();

                thread.sleep(2000);
            } catch (exception e) {
                e.printstacktrace();
            }

        }

    }

    public static void main(string[] args) throws exception {

        consumer consumer = new consumer();
        // 构造zk连接对象
        consumer.connectzk();

        // 查询在线服务器列表
        consumer.getonlineservers();

        // 处理业务(向一台服务器发送时间查询请求)
        consumer.sendrequest();

    }

}