欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

zookeeper【3】服务发现

程序员文章站 2023-03-31 19:48:31
服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。 实现代码如下: ......

服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。

实现代码如下:

import com.alibaba.fastjson.json;
import org.i0itec.zkclient.izkdatalistener;
import org.i0itec.zkclient.zkclient;
import org.i0itec.zkclient.exception.zknonodeexception;

/**
 * Represents a work server.
 *
 * <p>When started, it registers itself by creating a node named after its address
 * under {@code serverspath} (via {@code createephemeral}) and subscribes to data
 * changes on {@code configpath}. Each data change is parsed as JSON into a
 * {@code serverconfig} that replaces the one currently held.
 *
 * <p>NOTE(review): every identifier in this listing is lower case (e.g. {@code string},
 * {@code system.out}), which does not match the JDK's case-sensitive names; the casing
 * looks lost in publication - confirm against the original sources before compiling.
 */
public class workserver {

    // Client used for every ZooKeeper call made by this server.
    private zkclient zkclient;
    // Path of the node whose data changes this server subscribes to (the config node).
    private string configpath;
    // Path of the servers node in the ZooKeeper cluster; this server registers beneath it.
    private string serverspath;
    // Basic information about this work server.
    private serverdata serverdata;
    // Configuration currently held by this work server.
    private serverconfig serverconfig;
    // Listener registered on configpath in initrunning() and removed in stop().
    private izkdatalistener datalistener;

    /**
     * Stores the collaborators and builds the config data listener. No ZooKeeper call
     * is made here; registration and subscription happen in {@code start()}.
     *
     * @param configpath  path of the config node to watch
     * @param serverspath path of the parent node under which this server registers
     * @param serverdata  basic information published for this server
     * @param zkclient    client used to talk to ZooKeeper
     * @param initconfig  configuration held until the first change notification arrives
     */
    public workserver(string configpath, string serverspath,
                      serverdata serverdata, zkclient zkclient, serverconfig initconfig) {
        this.zkclient = zkclient;
        this.serverspath = serverspath;
        this.configpath = configpath;
        this.serverconfig = initconfig;
        this.serverdata = serverdata;

        this.datalistener = new izkdatalistener() {

            public void handledatadeleted(string datapath) throws exception {
                // No action is taken when the watched node's data is deleted.
            }

            public void handledatachange(string datapath, object data)
                    throws exception {
                // The payload is cast to byte[] and decoded without an explicit charset.
                // NOTE(review): assumes the client delivers raw bytes and that the platform
                // default charset matches the writer's - confirm.
                string retjson = new string((byte[])data);
                serverconfig serverconfiglocal = (serverconfig) json.parseobject(retjson,serverconfig.class);
                updateconfig(serverconfiglocal);
                // Prints the enclosing instance's field, i.e. the config just stored above.
                system.out.println("new work server config is:"+serverconfig.tostring());

            }
        };

    }

    // Starts the server: prints a banner, then registers and subscribes.
    public void start() {
        system.out.println("work server start...");
        initrunning();
    }

    // Stops the server. Only the config subscription is removed here; the node created
    // by registme() is not deleted by this method.
    public void stop() {
        system.out.println("work server stop...");
        zkclient.unsubscribedatachanges(configpath, datalistener); // stop listening to the config node
    }

    // Server initialisation.
    private void initrunning() {
        registme(); // register this server
        zkclient.subscribedatachanges(configpath, datalistener); // subscribe to change events on the config node
    }

    // Registration routine run at start-up: publishes serverdata as JSON bytes in a node
    // at serverspath + "/" + address. If the parent is missing (zknonodeexception) the
    // parent path is created and the registration is retried recursively.
    // NOTE(review): the case where the node already exists is not handled here - confirm
    // whether createephemeral can throw for that and whether it matters on restart.
    private void registme() {
        string mepath = serverspath.concat("/").concat(serverdata.getaddress());

        try {
            zkclient.createephemeral(mepath, json.tojsonstring(serverdata)
                    .getbytes());
        } catch (zknonodeexception e) {
            zkclient.createpersistent(serverspath, true);
            registme();
        }
    }

    // Replaces the configuration held by this server.
    private void updateconfig(serverconfig serverconfig) {
        this.serverconfig = serverconfig;
    }

}
/**
 * Driver class for the demo.
 *
 * <p>Starts one manage server and {@code client_qty} work servers, each with its own
 * client connection to {@code zookeeper_server}, waits for a line on standard input,
 * then stops the work servers and closes their clients.
 *
 * <p>NOTE(review): {@code list}, {@code arraylist}, {@code bufferedreader},
 * {@code inputstreamreader} and {@code bytespushthroughserializer} are used below but no
 * import for them is visible in this listing - confirm against the original sources.
 */
public class subscribezkclient {

    private static final int  client_qty = 5; // number of work servers

    // Connection string (host:port) handed to every client created below.
    private static final string  zookeeper_server = "192.168.1.105:2181";

    // Node paths shared by the manage server and the work servers.
    private static final string  config_path = "/config";
    private static final string  command_path = "/command";
    private static final string  servers_path = "/servers";

    /**
     * Wires up the manage server and the work servers and blocks until a line is read
     * from standard input.
     *
     * @param args unused
     * @throws exception propagated from start-up or from reading standard input
     */
    public static void main(string[] args) throws exception {

        list<zkclient> clients = new arraylist<zkclient>();
        list<workserver>  workservers = new arraylist<workserver>();
        manageserver manageserver = null;

        try {

            // Build a default configuration shared by the manage server and all work servers.
            serverconfig initconfig = new serverconfig();
            initconfig.setdbpwd("123456");
            initconfig.setdburl("jdbc:mysql://localhost:3306/mydb");
            initconfig.setdbuser("root");

            // Instantiate the manage server with its own client.
            // NOTE(review): clientmanage is not added to `clients`, and manageserver.stop()
            // is not called in the finally block below, so neither is released here.
            zkclient clientmanage = new zkclient(zookeeper_server, 5000, 5000, new bytespushthroughserializer());
            manageserver = new manageserver(servers_path, command_path,config_path,clientmanage,initconfig);
            manageserver.start(); // start the manage server

            // Create the requested number of work servers.
            for ( int i = 0; i < client_qty; ++i ) {
                zkclient client = new zkclient(zookeeper_server, 5000, 5000, new bytespushthroughserializer());
                clients.add(client);
                serverdata serverdata = new serverdata();
                serverdata.setid(i);
                serverdata.setname("workserver#"+i);
                serverdata.setaddress("192.168.1."+i);

                workserver  workserver = new workserver(config_path, servers_path, serverdata, client, initconfig);
                workservers.add(workserver);
                workserver.start();    // start the work server

            }

            // Prompt the user, then block until a line is entered.
            system.out.println("敲回车键退出!\n");
            new bufferedreader(new inputstreamreader(system.in)).readline();

        } finally {
            system.out.println("shutting down...");

            // Stop every work server; a failure is printed and does not stop the loop.
            for ( workserver workserver : workservers ) {
                try {
                    workserver.stop();
                } catch (exception e) {
                    e.printstacktrace();
                }
            }

            // Close every work-server client; a failure is printed and does not stop the loop.
            for ( zkclient client : clients ) {
                try {
                    client.close();
                } catch (exception e) {
                    e.printstacktrace();
                }

            }
        }
    }

}
/**
 * Basic information about a server: its address, numeric id and display name.
 *
 * <p>Plain mutable bean with no validation; every property is {@code null} until set.
 * The class name and the lower-case accessor names are kept exactly as the rest of
 * this file uses them; only the JDK identifiers ({@code String}, {@code Integer},
 * {@code @Override}, {@code toString}) are spelled with their real, case-sensitive names
 * so that the class compiles.
 */
public class serverdata {

    private String address;
    private Integer id;
    private String name;

    /** @return the server address, or {@code null} if none has been set */
    public String getaddress() {
        return address;
    }

    /** @param address the server address to store */
    public void setaddress(String address) {
        this.address = address;
    }

    /** @return the server id, or {@code null} if none has been set */
    public Integer getid() {
        return id;
    }

    /** @param id the server id to store */
    public void setid(Integer id) {
        this.id = id;
    }

    /** @return the server name, or {@code null} if none has been set */
    public String getname() {
        return name;
    }

    /** @param name the server name to store */
    public void setname(String name) {
        this.name = name;
    }

    /**
     * @return a single-line description listing address, id and name
     */
    @Override
    public String toString() {
        return "serverdata [address=" + address + ", id=" + id + ", name="
                + name + "]";
    }

    /**
     * Lower-case alias of {@link #toString()}, retained so that callers written against
     * the original method name keep working.
     *
     * @return the same text as {@link #toString()}
     */
    public String tostring() {
        return toString();
    }

}
/**
 * Configuration information: database URL, user and password.
 *
 * <p>Plain mutable bean with no validation; every property is {@code null} until set.
 * The class name and the lower-case accessor names are kept exactly as the rest of
 * this file uses them; only the JDK identifiers ({@code String}, {@code @Override},
 * {@code toString}) are spelled with their real, case-sensitive names so that the
 * class compiles.
 *
 * <p>The string form never contains the password itself, because that text is meant
 * for console or log output; {@link #getdbpwd()} still returns the real value.
 */
public class serverconfig {

    /** Placeholder shown instead of a non-null password in {@link #toString()}. */
    private static final String MASKED_PASSWORD = "******";

    private String dburl;
    private String dbpwd;
    private String dbuser;

    /** @return the database URL, or {@code null} if none has been set */
    public String getdburl() {
        return dburl;
    }

    /** @param dburl the database URL to store */
    public void setdburl(String dburl) {
        this.dburl = dburl;
    }

    /** @return the database password, or {@code null} if none has been set */
    public String getdbpwd() {
        return dbpwd;
    }

    /** @param dbpwd the database password to store */
    public void setdbpwd(String dbpwd) {
        this.dbpwd = dbpwd;
    }

    /** @return the database user, or {@code null} if none has been set */
    public String getdbuser() {
        return dbuser;
    }

    /** @param dbuser the database user to store */
    public void setdbuser(String dbuser) {
        this.dbuser = dbuser;
    }

    /**
     * @return a single-line description listing URL, password and user, with a non-null
     *         password replaced by a fixed mask so the secret does not reach output
     */
    @Override
    public String toString() {
        // Keep "null" visible (it is useful when debugging) but never echo a real password.
        String shownPassword = (dbpwd == null) ? null : MASKED_PASSWORD;
        return "serverconfig [dburl=" + dburl + ", dbpwd=" + shownPassword
                + ", dbuser=" + dbuser + "]";
    }

    /**
     * Lower-case alias of {@link #toString()}, retained so that callers written against
     * the original method name keep working.
     *
     * @return the same text as {@link #toString()}
     */
    public String tostring() {
        return toString();
    }

}
import com.alibaba.fastjson.json;
import org.i0itec.zkclient.izkchildlistener;
import org.i0itec.zkclient.izkdatalistener;
import org.i0itec.zkclient.zkclient;
import org.i0itec.zkclient.exception.zknonodeexception;
import org.i0itec.zkclient.exception.zknodeexistsexception;

import java.util.list;

/**
 * Manage (monitoring) server.
 *
 * <p>Subscribes to child changes under {@code serverspath} to keep an in-memory list of
 * work servers, and to data changes on {@code commandpath}, whose content is read as a
 * command name: {@code list}, {@code create} or {@code modify}.
 *
 * <p>NOTE(review): every identifier in this listing is lower case (e.g. {@code string},
 * {@code system.out}), which does not match the JDK's case-sensitive names; the casing
 * looks lost in publication - confirm against the original sources before compiling.
 */
public class manageserver {

    // Path of the servers node in ZooKeeper.
    private string serverspath;
    // Path of the command node in ZooKeeper.
    private string commandpath;
    // Path of the config node in ZooKeeper.
    private string configpath;
    // Client used for every ZooKeeper call made by this server.
    private zkclient zkclient;
    // Configuration written to the config node by the create and modify commands.
    private serverconfig config;
    // Listens for changes to the child list of the servers node.
    private izkchildlistener childlistener;
    // Listens for changes to the data content of the command node.
    private izkdatalistener datalistener;
    // List of work servers.
    // NOTE(review): has no initial value and is assigned only in handlechildchange, so
    // execlist() dereferences null if a "list" command arrives before any child event.
    private list<string> workserverlist;

    /**
     * Stores the collaborators and builds both listeners. No ZooKeeper call is made
     * here; the listeners are registered in {@code start()}.
     *
     * @param serverspath path of the node whose children are the work servers
     * @param commandpath path of the node whose data carries commands
     * @param configpath  path of the node that holds the shared configuration
     * @param zkclient    client used to talk to ZooKeeper
     * @param config      configuration to publish
     */
    public manageserver(string serverspath, string commandpath,
                        string configpath, zkclient zkclient, serverconfig config) {
        this.serverspath = serverspath;
        this.commandpath = commandpath;
        this.zkclient = zkclient;
        this.config = config;
        this.configpath = configpath;
        this.childlistener = new izkchildlistener() {

            public void handlechildchange(string parentpath,
                                          list<string> currentchilds) throws exception {
                // The reported child list is stored as-is (no copy, no null check).
                workserverlist = currentchilds; // refresh the in-memory work server list

                system.out.println("work server list changed, new list is ");
                execlist();

            }
        };
        this.datalistener = new izkdatalistener() {

            public void handledatadeleted(string datapath) throws exception {
                // No action is taken when the command node's data is deleted.
                // ignore;
            }

            public void handledatachange(string datapath, object data)
                    throws exception {
                // The payload is cast to byte[] and decoded without an explicit charset.
                // NOTE(review): assumes the client delivers raw bytes - confirm.
                string cmd = new string((byte[]) data);
                system.out.println("cmd:"+cmd);
                execmd(cmd); // run the command

            }
        };

    }

    // Registers both listeners: data changes on the command node, child changes on the
    // servers node.
    private void initrunning() {
        zkclient.subscribedatachanges(commandpath, datalistener);
        zkclient.subscribechildchanges(serverspath, childlistener);
    }

    /*
     * Dispatches a command by exact string match.
     * Recognised commands: 1: list 2: create 3: modify
     * Anything else is reported on standard output.
     */
    private void execmd(string cmdtype) {
        if ("list".equals(cmdtype)) {
            execlist();

        } else if ("create".equals(cmdtype)) {
            execcreate();
        } else if ("modify".equals(cmdtype)) {
            execmodify();
        } else {
            system.out.println("error command!" + cmdtype);
        }

    }

    // Prints the list of work servers.
    private void execlist() {
        system.out.println(workserverlist.tostring());
    }

    // Creates the config node holding the JSON form of `config`. Nothing happens when
    // the node already exists at the time of the check.
    private void execcreate() {
        if (!zkclient.exists(configpath)) {
            try {
                zkclient.createpersistent(configpath, json.tojsonstring(config)
                        .getbytes());
            } catch (zknodeexistsexception e) {
                zkclient.writedata(configpath, json.tojsonstring(config)
                        .getbytes()); // the config node already exists, so just write the content
            } catch (zknonodeexception e) {
                // Parent is missing: create everything before the last '/' and retry.
                // NOTE(review): for a top-level path such as "/config" this substring is
                // the empty string - confirm how createpersistent treats it.
                string parentdir = configpath.substring(0,
                        configpath.lastindexof('/'));
                zkclient.createpersistent(parentdir, true);
                execcreate();
            }
        }
    }

    // Modifies the content of the config node.
    private void execmodify() {
        // Any single property will do for the demo: append "_modify" to the db user.
        config.setdbuser(config.getdbuser() + "_modify");

        try {
            zkclient.writedata(configpath, json.tojsonstring(config).getbytes());
        } catch (zknonodeexception e) {
            execcreate(); // the config node does not exist yet at write time, so create it
        }
    }

    // Starts this server by registering its listeners.
    public void start() {
        initrunning();
    }

    // Stops this server by removing both listeners registered in initrunning().
    public void stop() {
        zkclient.unsubscribechildchanges(serverspath, childlistener);
        zkclient.unsubscribedatachanges(commandpath, datalistener);
    }

}