ZooKeeper【3】服务发现
程序员文章站
2023-03-31 19:48:31
服务发现:指对集群中服务的上下线进行统一管理。每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息;同时让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息发生改变时(如服务上下线、服务器的角色或服务范围变更),监控服务器可以得到通知并对这些变化做出响应。实现代码如下:……
服务发现:指对集群中服务的上下线进行统一管理。每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息;同时让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息发生改变时(如服务上下线、服务器的角色或服务范围变更),监控服务器可以得到通知并对这些变化做出响应。
实现代码如下:
import com.alibaba.fastjson.json; import org.i0itec.zkclient.izkdatalistener; import org.i0itec.zkclient.zkclient; import org.i0itec.zkclient.exception.zknonodeexception; /** * 代表工作服务器 */ public class workserver { private zkclient zkclient; // zookeeper private string configpath; // zookeeper集群中servers节点的路径 private string serverspath; // 当前工作服务器的基本信息 private serverdata serverdata; // 当前工作服务器的配置信息 private serverconfig serverconfig; private izkdatalistener datalistener; public workserver(string configpath, string serverspath, serverdata serverdata, zkclient zkclient, serverconfig initconfig) { this.zkclient = zkclient; this.serverspath = serverspath; this.configpath = configpath; this.serverconfig = initconfig; this.serverdata = serverdata; this.datalistener = new izkdatalistener() { public void handledatadeleted(string datapath) throws exception { } public void handledatachange(string datapath, object data) throws exception { string retjson = new string((byte[])data); serverconfig serverconfiglocal = (serverconfig) json.parseobject(retjson,serverconfig.class); updateconfig(serverconfiglocal); system.out.println("new work server config is:"+serverconfig.tostring()); } }; } // 启动服务器 public void start() { system.out.println("work server start..."); initrunning(); } // 停止服务器 public void stop() { system.out.println("work server stop..."); zkclient.unsubscribedatachanges(configpath, datalistener); // 取消监听config节点 } // 服务器初始化 private void initrunning() { registme(); // 注册自己 zkclient.subscribedatachanges(configpath, datalistener); // 订阅config节点的改变事件 } // 启动时向zookeeper注册自己的注册函数 private void registme() { string mepath = serverspath.concat("/").concat(serverdata.getaddress()); try { zkclient.createephemeral(mepath, json.tojsonstring(serverdata) .getbytes()); } catch (zknonodeexception e) { zkclient.createpersistent(serverspath, true); registme(); } } // 更新自己的配置信息 private void updateconfig(serverconfig serverconfig) { this.serverconfig = serverconfig; } }
/** * 调度类 */ public class subscribezkclient { private static final int client_qty = 5; // work server数量 private static final string zookeeper_server = "192.168.1.105:2181"; private static final string config_path = "/config"; private static final string command_path = "/command"; private static final string servers_path = "/servers"; public static void main(string[] args) throws exception { list<zkclient> clients = new arraylist<zkclient>(); list<workserver> workservers = new arraylist<workserver>(); manageserver manageserver = null; try { // 创建一个默认的配置 serverconfig initconfig = new serverconfig(); initconfig.setdbpwd("123456"); initconfig.setdburl("jdbc:mysql://localhost:3306/mydb"); initconfig.setdbuser("root"); // 实例化一个manage server zkclient clientmanage = new zkclient(zookeeper_server, 5000, 5000, new bytespushthroughserializer()); manageserver = new manageserver(servers_path, command_path,config_path,clientmanage,initconfig); manageserver.start(); // 启动manage server // 创建指定个数的工作服务器 for ( int i = 0; i < client_qty; ++i ) { zkclient client = new zkclient(zookeeper_server, 5000, 5000, new bytespushthroughserializer()); clients.add(client); serverdata serverdata = new serverdata(); serverdata.setid(i); serverdata.setname("workserver#"+i); serverdata.setaddress("192.168.1."+i); workserver workserver = new workserver(config_path, servers_path, serverdata, client, initconfig); workservers.add(workserver); workserver.start(); // 启动工作服务器 } system.out.println("敲回车键退出!\n"); new bufferedreader(new inputstreamreader(system.in)).readline(); } finally { system.out.println("shutting down..."); for ( workserver workserver : workservers ) { try { workserver.stop(); } catch (exception e) { e.printstacktrace(); } } for ( zkclient client : clients ) { try { client.close(); } catch (exception e) { e.printstacktrace(); } } } } }
/**
 * Basic information about a server: its address, numeric id and display name.
 *
 * <p>A plain mutable bean with a default constructor and getter/setter pairs; the
 * accompanying code populates it through the setters.
 */
public class ServerData {

    private String address;
    private Integer id;
    private String name;

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "serverdata [address=" + address + ", id=" + id + ", name=" + name + "]";
    }
}
/**
 * Configuration information: database URL, user and password.
 *
 * <p>A plain mutable bean with a default constructor and getter/setter pairs.
 * {@link #toString()} never includes the real password, so the object can be printed
 * or logged safely.
 */
public class ServerConfig {

    private String dbUrl;
    private String dbPwd;
    private String dbUser;

    public String getDbUrl() {
        return dbUrl;
    }

    public void setDbUrl(String dbUrl) {
        this.dbUrl = dbUrl;
    }

    public String getDbPwd() {
        return dbPwd;
    }

    public void setDbPwd(String dbPwd) {
        this.dbPwd = dbPwd;
    }

    public String getDbUser() {
        return dbUser;
    }

    public void setDbUser(String dbUser) {
        this.dbUser = dbUser;
    }

    /**
     * Returns a printable summary. The password is shown as {@code ******} when set
     * (and as {@code null} when unset) so that it does not leak into console output.
     */
    @Override
    public String toString() {
        String maskedPwd = (dbPwd == null) ? null : "******";
        return "serverconfig [dburl=" + dbUrl + ", dbpwd=" + maskedPwd
                + ", dbuser=" + dbUser + "]";
    }
}
import com.alibaba.fastjson.json; import org.i0itec.zkclient.izkchildlistener; import org.i0itec.zkclient.izkdatalistener; import org.i0itec.zkclient.zkclient; import org.i0itec.zkclient.exception.zknonodeexception; import org.i0itec.zkclient.exception.zknodeexistsexception; import java.util.list; public class manageserver { // zookeeper的servers节点路径 private string serverspath; // zookeeper的command节点路径 private string commandpath; // zookeeper的config节点路径 private string configpath; private zkclient zkclient; private serverconfig config; // 用于监听servers节点的子节点列表的变化 private izkchildlistener childlistener; // 用于监听command节点数据内容的变化 private izkdatalistener datalistener; // 工作服务器的列表 private list<string> workserverlist; public manageserver(string serverspath, string commandpath, string configpath, zkclient zkclient, serverconfig config) { this.serverspath = serverspath; this.commandpath = commandpath; this.zkclient = zkclient; this.config = config; this.configpath = configpath; this.childlistener = new izkchildlistener() { public void handlechildchange(string parentpath, list<string> currentchilds) throws exception { // todo auto-generated method stub workserverlist = currentchilds; // 更新内存中工作服务器列表 system.out.println("work server list changed, new list is "); execlist(); } }; this.datalistener = new izkdatalistener() { public void handledatadeleted(string datapath) throws exception { // todo auto-generated method stub // ignore; } public void handledatachange(string datapath, object data) throws exception { // todo auto-generated method stub string cmd = new string((byte[]) data); system.out.println("cmd:"+cmd); execmd(cmd); // 执行命令 } }; } private void initrunning() { zkclient.subscribedatachanges(commandpath, datalistener); zkclient.subscribechildchanges(serverspath, childlistener); } /* * 1: list 2: create 3: modify */ private void execmd(string cmdtype) { if ("list".equals(cmdtype)) { execlist(); } else if ("create".equals(cmdtype)) { execcreate(); } else if 
("modify".equals(cmdtype)) { execmodify(); } else { system.out.println("error command!" + cmdtype); } } // 列出工作服务器列表 private void execlist() { system.out.println(workserverlist.tostring()); } // 创建config节点 private void execcreate() { if (!zkclient.exists(configpath)) { try { zkclient.createpersistent(configpath, json.tojsonstring(config) .getbytes()); } catch (zknodeexistsexception e) { zkclient.writedata(configpath, json.tojsonstring(config) .getbytes()); // config节点已经存在,则写入内容就可以了 } catch (zknonodeexception e) { string parentdir = configpath.substring(0, configpath.lastindexof('/')); zkclient.createpersistent(parentdir, true); execcreate(); } } } // 修改config节点内容 private void execmodify() { // 我们随意修改config的一个属性就可以了 config.setdbuser(config.getdbuser() + "_modify"); try { zkclient.writedata(configpath, json.tojsonstring(config).getbytes()); } catch (zknonodeexception e) { execcreate(); // 写入时config节点还未存在,则创建它 } } // 启动工作服务器 public void start() { initrunning(); } // 停止工作服务器 public void stop() { zkclient.unsubscribechildchanges(serverspath, childlistener); zkclient.unsubscribedatachanges(commandpath, datalistener); } }