(九)分布式服务----Zookeeper注册中心
首先看一下几种注册中心:
最老的就是zookeeper了, 比较新的有eureka,consul 都可以做注册中心。可以自行搜索对比三者的优缺点。
zookeeper 最开始就是hadoop大家族中的一员,用于做协调的框架,后来已经是apache的子项目了。
几年前大数据很火的时候,只要学hadoop必学zookeeper,当然还有其他成员。
大数据简单说就是分布式,比如分布式文件存储hdfs,分布式数据库hbase,分布式协调zookeeper,还有kafka,flume等等都是hadoop大家族。
zookeeper,现在更多被用来做注册中心,比如阿里的开源soa框架dubbo就经常搭配zookeeper做注册中心。
eureka:java的微服务框架spring cloud中内部已经集成了eureka注册中心。
我选择zookeeper,不是因为他比另外两个强,而是因为我几年前就已经学习过一些zookeeper的原理,上手更容易。网络上学习书籍、资料、视频教程也特别多,学习资料完善。
注册中心的基本功能:
1. 注册服务,有点类似dns,所有的服务注册到注册中心,包含服务的地址等信息。
2. 服务订阅,客户端请求服务,注册中心就要把那些能用的服务器地址告诉客户端,服务端有变动时,注册中心也能及时通知到客户端。
3. 性能好且高可用,注册中心自身也是一个集群,如果只有一个注册中心机器的话那岂不是把注册中心累死啊,而且他一旦坏了以后,那客户端都找不到服务器了。所有注册中心就有很多台,其中只有一个老大(leader),老大用来写,小弟用来读。就是说老大来决定一台服务器能不能注册进来,小弟负责帮助客户端查找服务器。因为注册服务的次数是很少的,通常有新服务器加入才需要注册,但是客户端订阅那就很多了,所以注册中心只有一个leader。leader万一坏掉的话,会从小弟中选举出一个来当老大接替工作。
上面提到说zookeeper集群,就是说有很多台机器做zookeeper机器,但是这些机器里存储的东西基本上都是一样的,就是说客户端不管连到哪个zookeeper 都是一样的,能做服务订阅。
每一个zookeeper 中都有很多节点(znode)。
接下来说的zookeeper节点和集群完全不是一回事。 有些人喜欢吧集群中的每一台zookeeper机器称为一个节点,但是这个节点(zookeeper机器)和我说的节点(znode)完全不是一回事。
如下图:
本例的图中可以看到,一共有5台机器,每台机器都有5个znode,znode下面的子节点就更多了。
先看5台机器:
一台leader,老大,上文已经介绍,服务都从这些注册写入。
两台follower,小弟,平时用于服务订阅,老大挂掉以后,follower内部就会自行选出老大。
两台observer,观察者,就是属于无业游民,只能看,没有选老大的资格,不能参与竞选也不能投票,唯一的功能就是服务订阅。
observer模式需要手动开启,为什么会出现observer呢,是因为机器太多的话,每个机器都有选举权的话特别影响性能。全中国14亿人口,每个人都参与国家竞选的话,效率极低。所以呢,选举的工作就交给follower完成就行了,只需要确保一直都有leader接班人就好。
再看看zookeeper有什么基本功能:
基本功能很简单,组合以后却可以完成各种复杂工作。
1. 可以创建:临时节点(断开连接时便删除节点) 和 持久化节点(必须手动删除节点)。
2. 可以创建:无序节点 和 有序节点。
3. 节点上可以添加watcher监听功能,监听该节点的增删改,然后触发自定义的事件。
看看这些功能怎么用:
1. 节点: 每次注册一个服务就创建一个节点,节点的名称(key)就是服务的名称,服务的详细信息存储在节点value中,客户端通过key找到对应的节点,再找打节点中的value。
2. 临时节点:服务端注册一个服务时创建一个临时节点,服务断开时,临时节点自动销毁,自动完成服务注销。
3. watcher监听: 客户端在注册中心订阅了一个服务的时候,同时在这个服务所在的节点上加一个监听事件,每当服务节点信息有变化的时候,注册中心会自动回调通知客户端。
4. 有序临时节点:分布式锁或者分布式队列(这里与服务注册无关),客户端1想要操作一条数据的时候,在a节点下创建一个有序临时节点,自动分配编号001;客户端1也要操作该数据的时候,在a节点下也创建一个有序临时节点,自动分配编号002。只有编号最小的子节点才会被执行,因此001节点会被执行,客户端1执行完毕后,自动删除001节点,此时002编号为最小子节点。即锁的概念,不能同时操作同一数据;也可以做队列,按照先后顺序依次执行。
5. 有序临时节点+watcher监听: 上面第4条中说到每次执行编号最小的节点,因此需要有一个程序,每次都需要遍历全部节点,然后找出最小的节点,假如是002节点,这时客户端2开始执行。但是添加监听机制以后就不一样了,002监听001,003监听比他小一号的002,这样001销毁的同时通知002开始执行,002销毁的时候通知003开始执行,不需要遍历最小节点,也能有序依次执行。
6. 临时节点+watcher监听: 集群master选举以及高可用。比如hadoop集群,也有一个resourcemanager资源管理器,负责调度其它节点机器,相当于hadoop集群的leader节点。这个leader就可以交由zookeeper管理,所有的hadoop机器同时在zookeeper中创建一个同名的临时节点,由于是同名互斥的节点,因此只有一个节点能被创建,成功创建这个节点的hadoop机器就是leader。同时添加watcher监听,这个leader只要断开连接,临时节点自动销毁,触发监听,其它hadoop开始新一轮的master选举。这也是zookeeper最初在hadoop家族中的重要使命。
7....... 还要很多地方都能用zookeeper,简直无所不能,而且自身也是高可用,高性能,牛x
zookeeper本身的操作还是很简单的,无非就是节点的增删改查,可以选择要创建节点的类型,还有就是在节点上添加watcher监听器。就这些。
文件结构:
上代码:
zookeeper客户端管理类:
public class zookeeperclientprovider { private configinfo _config; private readonly ilogger<zookeeperclientprovider> _logger; private readonly dictionary<string, zookeeper> _zookeeperclients = new dictionary<string, zookeeper>(); public zookeeperclientprovider(configinfo config, ilogger<zookeeperclientprovider> logger) { _config = config; _logger = logger; } public async task<zookeeper> getzookeeper() { return await createzookeeper(_config.addresses.firstordefault()); } public async task<zookeeper> createzookeeper(string address) { if (!_zookeeperclients.trygetvalue(address, out zookeeper result)) { await task.run(() => { result = new zookeeper(address, (int)_config.sessiontimeout.totalmilliseconds, new reconnectionwatcher( async () => { if (_zookeeperclients.remove(address, out zookeeper value)) { await value.closeasync(); } await createzookeeper(address); })); _zookeeperclients.tryadd(address, result); }); } return result; } public async task<ienumerable<zookeeper>> getzookeepers() { var result = new list<zookeeper>(); foreach (var address in _config.addresses) { result.add(await createzookeeper(address)); } return result; } }
zookeeper服务注册类:
/// <summary> /// 一个抽象的服务路由发现者。 /// </summary> public interface iserviceroutemanager { /// <summary> /// 服务路由被创建。 /// </summary> event eventhandler<servicerouteeventargs> created; /// <summary> /// 服务路由被删除。 /// </summary> event eventhandler<servicerouteeventargs> removed; /// <summary> /// 服务路由被修改。 /// </summary> event eventhandler<serviceroutechangedeventargs> changed; /// <summary> /// 获取所有可用的服务路由信息。 /// </summary> /// <returns>服务路由集合。</returns> task<ienumerable<serviceroute>> getroutesasync(); /// <summary> /// 设置服务路由。 /// </summary> /// <param name="routes">服务路由集合。</param> /// <returns>一个任务。</returns> task setroutesasync(ienumerable<serviceroute> routes); /// <summary> /// 移除地址列表 /// </summary> /// <param name="routes">地址列表。</param> /// <returns>一个任务。</returns> task remveaddressasync(ienumerable<string> address); /// <summary> /// 清空所有的服务路由。 /// </summary> /// <returns>一个任务。</returns> task clearasync(); } /// <summary> /// 服务路由事件参数。 /// </summary> public class servicerouteeventargs { public servicerouteeventargs(serviceroute route) { route = route; } /// <summary> /// 服务路由信息。 /// </summary> public serviceroute route { get; private set; } } /// <summary> /// 服务路由变更事件参数。 /// </summary> public class serviceroutechangedeventargs : servicerouteeventargs { public serviceroutechangedeventargs(serviceroute route, serviceroute oldroute) : base(route) { oldroute = oldroute; } /// <summary> /// 旧的服务路由信息。 /// </summary> public serviceroute oldroute { get; set; } }
public class zookeeperserviceroutemanager : iserviceroutemanager, idisposable { private readonly configinfo _configinfo; private readonly iserializer<byte[]> _serializer; private readonly ilogger<zookeeperserviceroutemanager> _logger; private serviceroute[] _routes; private readonly zookeeperclientprovider _zookeeperclientprovider; public zookeeperserviceroutemanager(configinfo configinfo, iserializer<byte[]> serializer, iserializer<string> stringserializer, ilogger<zookeeperserviceroutemanager> logger, zookeeperclientprovider zookeeperclientprovider) { _configinfo = configinfo; _serializer = serializer; _logger = logger; _zookeeperclientprovider = zookeeperclientprovider; enterroutes().wait(); } private eventhandler<servicerouteeventargs> _created; private eventhandler<servicerouteeventargs> _removed; private eventhandler<serviceroutechangedeventargs> _changed; /// <summary> /// 服务路由被创建。 /// </summary> public event eventhandler<servicerouteeventargs> created { add { _created += value; } remove { _created -= value; } } /// <summary> /// 服务路由被删除。 /// </summary> public event eventhandler<servicerouteeventargs> removed { add { _removed += value; } remove { _removed -= value; } } /// <summary> /// 服务路由被修改。 /// </summary> public event eventhandler<serviceroutechangedeventargs> changed { add { _changed += value; } remove { _changed -= value; } } protected void oncreated(params servicerouteeventargs[] args) { if (_created == null) return; foreach (var arg in args) _created(this, arg); } protected void onchanged(params serviceroutechangedeventargs[] args) { if (_changed == null) return; foreach (var arg in args) _changed(this, arg); } protected void onremoved(params servicerouteeventargs[] args) { if (_removed == null) return; foreach (var arg in args) _removed(this, arg); } /// <summary> /// 获取所有可用的服务路由信息。 /// </summary> /// <returns>服务路由集合。</returns> public async task<ienumerable<serviceroute>> getroutesasync() { await enterroutes(); return _routes; } /// <summary> /// 清空所有的服务路由。 /// </summary> /// <returns>一个任务。</returns> public async task clearasync() { if (_logger.isenabled(loglevel.information)) _logger.loginformation("准备清空所有路由配置。"); var zookeepers = await _zookeeperclientprovider.getzookeepers(); foreach (var zookeeper in zookeepers) { var path = _configinfo.routepath; var childrens = path.split(new[] { '/' }, stringsplitoptions.removeemptyentries); var index = 0; while (childrens.count() > 1) { var nodepath = "/" + string.join("/", childrens); if (await zookeeper.existsasync(nodepath) != null) { var result = await zookeeper.getchildrenasync(nodepath); if (result?.children != null) { foreach (var child in result.children) { var childpath = $"{nodepath}/{child}"; if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"准备删除:{childpath}。"); await zookeeper.deleteasync(childpath); } } if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"准备删除:{nodepath}。"); await zookeeper.deleteasync(nodepath); } index++; childrens = childrens.take(childrens.length - index).toarray(); } if (_logger.isenabled(loglevel.information)) _logger.loginformation("路由配置清空完成。"); } } /// <summary> /// 设置服务路由。 /// </summary> /// <param name="routes">服务路由集合。</param> /// <returns>一个任务。</returns> public async task setroutesasync(ienumerable<serviceroute> routes) { var hostaddr = netutils.gethostaddress(); var serviceroutes = await getroutes(routes.select(p => p.serviceroutedescriptor.id)); if (serviceroutes.count() > 0) { foreach (var route in routes) { var serviceroute = serviceroutes.where(p => p.serviceroutedescriptor.id == route.serviceroutedescriptor.id).firstordefault(); if (serviceroute != null) { var addresses = serviceroute.address.concat( route.address.except(serviceroute.address)).tolist(); foreach (var address in route.address) { addresses.remove(addresses.where(p => p.tostring() == address.tostring()).firstordefault()); addresses.add(address); } route.address = addresses; } } } await removeexceptroutesasync(routes, hostaddr); if (_logger.isenabled(loglevel.information)) _logger.loginformation("准备添加服务路由。"); var zookeepers = await _zookeeperclientprovider.getzookeepers(); foreach (var zookeeper in zookeepers) { await createsubdirectory(zookeeper, _configinfo.routepath); var path = _configinfo.routepath; if (!path.endswith("/")) path += "/"; routes = routes.toarray(); foreach (var serviceroute in routes) { var nodepath = $"{path}{serviceroute.serviceroutedescriptor.id}"; var nodedata = _serializer.serialize(serviceroute); if (await zookeeper.existsasync(nodepath) == null) { if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"节点:{nodepath}不存在将进行创建。"); await zookeeper.createasync(nodepath, nodedata, zoodefs.ids.open_acl_unsafe, createmode.persistent); } else { if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"将更新节点:{nodepath}的数据。"); var onlinedata = (await zookeeper.getdataasync(nodepath)).data; if (!dataequals(nodedata, onlinedata)) await zookeeper.setdataasync(nodepath, nodedata); } } if (_logger.isenabled(loglevel.information)) _logger.loginformation("服务路由添加成功。"); } } public async task remveaddressasync(ienumerable<string> address) { var routes = await getroutesasync(); foreach (var route in routes) { route.address = route.address.except(address); } await setroutesasync(routes); } private async task removeexceptroutesasync(ienumerable<serviceroute> routes, string hostaddr) { var path = _configinfo.routepath; if (!path.endswith("/")) path += "/"; routes = routes.toarray(); var zookeepers = await _zookeeperclientprovider.getzookeepers(); foreach (var zookeeper in zookeepers) { if (_routes != null) { var oldrouteids = _routes.select(i => i.serviceroutedescriptor.id).toarray(); var newrouteids = routes.select(i => i.serviceroutedescriptor.id).toarray(); var deletedrouteids = oldrouteids.except(newrouteids).toarray(); foreach (var deletedrouteid in deletedrouteids) { var addresses = _routes.where(p => p.serviceroutedescriptor.id == deletedrouteid).select(p => p.address).firstordefault(); if (addresses.contains(hostaddr)) { var nodepath = $"{path}{deletedrouteid}"; await zookeeper.deleteasync(nodepath); } } } } } private async task createsubdirectory(zookeeper zookeeper, string path) { if (await zookeeper.existsasync(path) != null) return; if (_logger.isenabled(loglevel.information)) _logger.loginformation($"节点{path}不存在,将进行创建。"); var childrens = path.split(new[] { '/' }, stringsplitoptions.removeemptyentries); var nodepath = "/"; foreach (var children in childrens) { nodepath += children; if (await zookeeper.existsasync(nodepath) == null) { await zookeeper.createasync(nodepath, null, zoodefs.ids.open_acl_unsafe, createmode.persistent); } nodepath += "/"; } } private async task<serviceroute> getroute(byte[] data) { if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"准备转换服务路由,配置内容:{encoding.utf8.getstring(data)}。"); if (data == null) return null; return await task.run(() => { return _serializer.deserialize<serviceroute>(data); }); } private async task<serviceroute> getroute(string path) { serviceroute result = null; var zookeeper = await getzookeeper(); var watcher = new nodemonitorwatcher(getzookeeper(), path, async (olddata, newdata) => await nodechange(olddata, newdata)); if (await zookeeper.existsasync(path) != null) { var data = (await zookeeper.getdataasync(path, watcher)).data; watcher.setcurrentdata(data); result = await getroute(data); } return result; } private async task<serviceroute[]> getroutes(ienumerable<string> childrens) { var rootpath = _configinfo.routepath; if (!rootpath.endswith("/")) rootpath += "/"; childrens = childrens.toarray(); var routes = new list<serviceroute>(childrens.count()); foreach (var children in childrens) { if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"准备从节点:{children}中获取路由信息。"); var nodepath = $"{rootpath}{children}"; var route = await getroute(nodepath); if (route != null) routes.add(route); } return routes.toarray(); } private async task enterroutes() { if (_routes != null) return; var zookeeper = await getzookeeper(); var watcher = new childrenmonitorwatcher(getzookeeper(), _configinfo.routepath, async (oldchildrens, newchildrens) => await childrenchange(oldchildrens, newchildrens)); if (await zookeeper.existsasync(_configinfo.routepath, watcher) != null) { var result = await zookeeper.getchildrenasync(_configinfo.routepath, watcher); var childrens = result.children.toarray(); watcher.setcurrentdata(childrens); _routes = await getroutes(childrens); } else { if (_logger.isenabled(loglevel.warning)) _logger.logwarning($"无法获取路由信息,因为节点:{_configinfo.routepath},不存在。"); _routes = new serviceroute[0]; } } private static bool dataequals(ireadonlylist<byte> data1, ireadonlylist<byte> data2) { if (data1.count != data2.count) return false; for (var i = 0; i < data1.count; i++) { var b1 = data1[i]; var b2 = data2[i]; if (b1 != b2) return false; } return true; } public async task nodechange(byte[] olddata, byte[] newdata) { if (dataequals(olddata, newdata)) return; var newroute = await getroute(newdata); //得到旧的路由。 var oldroute = _routes.firstordefault(i => i.serviceroutedescriptor.id == newroute.serviceroutedescriptor.id); lock (_routes) { //删除旧路由,并添加上新的路由。 _routes = _routes .where(i => i.serviceroutedescriptor.id != newroute.serviceroutedescriptor.id) .concat(new[] { newroute }).toarray(); } //触发路由变更事件。 onchanged(new serviceroutechangedeventargs(newroute, oldroute)); } public async task childrenchange(string[] oldchildrens, string[] newchildrens) { if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"最新的节点信息:{string.join(",", newchildrens)}"); if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"旧的节点信息:{string.join(",", oldchildrens)}"); //计算出已被删除的节点。 var deletedchildrens = oldchildrens.except(newchildrens).toarray(); //计算出新增的节点。 var createdchildrens = newchildrens.except(oldchildrens).toarray(); if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"需要被删除的路由节点:{string.join(",", deletedchildrens)}"); if (_logger.isenabled(loglevel.debug)) _logger.logdebug($"需要被添加的路由节点:{string.join(",", createdchildrens)}"); //获取新增的路由信息。 var newroutes = (await getroutes(createdchildrens)).toarray(); var routes = _routes.toarray(); lock (_routes) { _routes = _routes //删除无效的节点路由。 .where(i => !deletedchildrens.contains(i.serviceroutedescriptor.id)) //连接上新的路由。 .concat(newroutes) .toarray(); } //需要删除的路由集合。 var deletedroutes = routes.where(i => deletedchildrens.contains(i.serviceroutedescriptor.id)).toarray(); //触发删除事件。 onremoved(deletedroutes.select(route => new servicerouteeventargs(route)).toarray()); //触发路由被创建事件。 oncreated(newroutes.select(route => new servicerouteeventargs(route)).toarray()); if (_logger.isenabled(loglevel.information)) _logger.loginformation("路由数据更新成功。"); } /// <summary>performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary> public void dispose() { } private async task<zookeeper> getzookeeper() { return await _zookeeperclientprovider.getzookeeper(); } }
zookeeper连接配置类:
public class configinfo { /// <summary> /// 初始化会话超时为20秒的zookeeper配置信息。 /// </summary> /// <param name="connectionstring">连接字符串。</param> /// <param name="routepath">路由配置路径。</param> /// <param name="subscriberpath">订阅者配置路径</param> /// <param name="commandpath">服务命令配置路径</param> /// <param name="cachepath">缓存中心配置路径</param> /// <param name="mqttroutepath">mqtt路由配置路径</param> /// <param name="chroot">根节点。</param> public configinfo(string connectionstring, string routepath = "/services/serviceroutes", string subscriberpath = "/services/servicesubscribers", string commandpath = "/services/servicecommands", string cachepath = "/services/servicecaches", string mqttroutepath = "/services/mqttserviceroutes", string chroot = null, bool reloadonchange = false, bool enablechildrenmonitor = false) : this(connectionstring, timespan.fromseconds(20), routepath, subscriberpath, commandpath, cachepath, mqttroutepath, chroot, reloadonchange, enablechildrenmonitor) { } /// <summary> /// 初始化zookeeper配置信息。 /// </summary> /// <param name="connectionstring">连接字符串。</param> /// <param name="routepath">路由配置路径。</param> /// <param name="commandpath">服务命令配置路径</param> /// <param name="subscriberpath">订阅者配置路径</param> /// <param name="sessiontimeout">会话超时时间。</param> /// <param name="cachepath">缓存中心配置路径</param> /// <param name="mqttroutepath">mqtt路由配置路径</param> /// <param name="chroot">根节点。</param> public configinfo(string connectionstring, timespan sessiontimeout, string routepath = "/services/serviceroutes", string subscriberpath = "/services/servicesubscribers", string commandpath = "/services/servicecommands", string cachepath = "/services/servicecaches", string mqttroutepath = "/services/mqttserviceroutes", string chroot = null, bool reloadonchange = false, bool enablechildrenmonitor = false) { cachepath = cachepath; reloadonchange = reloadonchange; chroot = chroot; commandpath = commandpath; subscriberpath = subscriberpath; connectionstring = connectionstring; routepath = routepath; sessiontimeout = sessiontimeout; mqttroutepath = mqttroutepath; enablechildrenmonitor = enablechildrenmonitor; addresses = connectionstring?.split(","); } public bool enablechildrenmonitor { get; set; } public bool reloadonchange { get; set; } /// <summary> /// 连接字符串。 /// </summary> public string connectionstring { get; set; } /// <summary> /// 命令配置路径 /// </summary> public string commandpath { get; set; } /// <summary> /// 路由配置路径。 /// </summary> public string routepath { get; set; } /// <summary> /// 订阅者配置路径 /// </summary> public string subscriberpath { get; set; } /// <summary> /// 会话超时时间。 /// </summary> public timespan sessiontimeout { get; set; } /// <summary> /// 根节点。 /// </summary> public string chroot { get; set; } public ienumerable<string> addresses { get; set; } /// <summary> /// 缓存中心配置中心 /// </summary> public string cachepath { get; set; } /// <summary> /// mqtt路由配置路径。 /// </summary> public string mqttroutepath { get; set; } }
路由和路由描述:
public class serviceroute { /// <summary> /// 服务可用地址。 /// </summary> public ienumerable<string> address { get; set; } /// <summary> /// 服务描述符。 /// </summary> public serviceroutedescriptor serviceroutedescriptor { get; set; } #region equality members /// <summary>determines whether the specified object is equal to the current object.</summary> /// <returns>true if the specified object is equal to the current object; otherwise, false.</returns> /// <param name="obj">the object to compare with the current object. </param> public override bool equals(object obj) { var model = obj as serviceroute; if (model == null) return false; if (obj.gettype() != gettype()) return false; if (model.serviceroutedescriptor != serviceroutedescriptor) return false; return model.address.count() == address.count() && model.address.all(addressmodel => address.contains(addressmodel)); } /// <summary>serves as the default hash function. </summary> /// <returns>a hash code for the current object.</returns> public override int gethashcode() { return tostring().gethashcode(); } public static bool operator ==(serviceroute model1, serviceroute model2) { return equals(model1, model2); } public static bool operator !=(serviceroute model1, serviceroute model2) { return !equals(model1, model2); } #endregion equality members }
/// <summary> /// 服务描述符。 /// </summary> [serializable] public class serviceroutedescriptor { /// <summary> /// 初始化一个新的服务描述符。 /// </summary> public serviceroutedescriptor() { metadatas = new dictionary<string, object>(stringcomparer.ordinalignorecase); } /// <summary> /// 服务id。 /// </summary> public string id { get; set; } /// <summary> /// 访问的令牌 /// </summary> public string token { get; set; } /// <summary> /// 路由 /// </summary> public string routepath { get; set; } /// <summary> /// 元数据。 /// </summary> public idictionary<string, object> metadatas { get; set; } /// <summary> /// 获取一个元数据。 /// </summary> /// <typeparam name="t">元数据类型。</typeparam> /// <param name="name">元数据名称。</param> /// <param name="def">如果指定名称的元数据不存在则返回这个参数。</param> /// <returns>元数据值。</returns> public t getmetadata<t>(string name, t def = default(t)) { if (!metadatas.containskey(name)) return def; return (t)metadatas[name]; } #region equality members /// <summary>determines whether the specified object is equal to the current object.</summary> /// <returns>true if the specified object is equal to the current object; otherwise, false.</returns> /// <param name="obj">the object to compare with the current object. </param> public override bool equals(object obj) { var model = obj as serviceroutedescriptor; if (model == null) return false; if (obj.gettype() != gettype()) return false; if (model.id != id) return false; return model.metadatas.count == metadatas.count && model.metadatas.all(metadata => { object value; if (!metadatas.trygetvalue(metadata.key, out value)) return false; if (metadata.value == null && value == null) return true; if (metadata.value == null || value == null) return false; return metadata.value.equals(value); }); } /// <summary>serves as the default hash function. </summary> /// <returns>a hash code for the current object.</returns> public override int gethashcode() { return tostring().gethashcode(); } public static bool operator ==(serviceroutedescriptor model1, serviceroutedescriptor model2) { return equals(model1, model2); } public static bool operator !=(serviceroutedescriptor model1, serviceroutedescriptor model2) { return !equals(model1, model2); } #endregion equality members }
watcher监听器:
子节点监听器:
internal class childrenmonitorwatcher : watcher { private readonly task<zookeeper> _zookeepercall; private readonly string _path; private readonly action<string[], string[]> _action; private string[] _currentdata = new string[0]; public childrenmonitorwatcher(task<zookeeper> zookeepercall, string path, action<string[], string[]> action) { _zookeepercall = zookeepercall; _path = path; _action = action; } public childrenmonitorwatcher setcurrentdata(string[] currentdata) { _currentdata = currentdata ?? new string[0]; return this; } #region overrides of watcherbase public override async task process(watchedevent watchedevent) { if (watchedevent.getstate() != event.keeperstate.syncconnected || watchedevent.getpath() != _path) return; var zookeeper = await _zookeepercall; //func<childrenmonitorwatcher> getwatcher = () => new childrenmonitorwatcher(_zookeepercall, path, _action); task<childrenmonitorwatcher> getwatcher = task.run(() => {return new childrenmonitorwatcher(_zookeepercall, _path, _action); }); switch (watchedevent.get_type()) { //创建之后开始监视下面的子节点情况。 case event.eventtype.nodecreated: await zookeeper.getchildrenasync(_path, await getwatcher); break; //子节点修改则继续监控子节点信息并通知客户端数据变更。 case event.eventtype.nodechildrenchanged: try { var watcher = await getwatcher; var result = await zookeeper.getchildrenasync(_path, watcher); var childrens = result.children.toarray(); _action(_currentdata, childrens); watcher.setcurrentdata(childrens); } catch (keeperexception.nonodeexception) { _action(_currentdata, new string[0]); } break; //删除之后开始监控自身节点,并通知客户端数据被清空。 case event.eventtype.nodedeleted: { var watcher = await getwatcher; await zookeeper.existsasync(_path, watcher); _action(_currentdata, new string[0]); watcher.setcurrentdata(new string[0]); } break; } } #endregion overrides of watcherbase }
当前节点监听器:
internal class nodemonitorwatcher : watcher { private readonly task<zookeeper> _zookeepercall; private readonly string _path; private readonly action<byte[], byte[]> _action; private byte[] _currentdata; public nodemonitorwatcher(task<zookeeper> zookeepercall, string path, action<byte[], byte[]> action) { _zookeepercall = zookeepercall; _path = path; _action = action; } public nodemonitorwatcher setcurrentdata(byte[] currentdata) { _currentdata = currentdata; return this; } #region overrides of watcherbase public override async task process(watchedevent watchedevent) { switch (watchedevent.get_type()) { case event.eventtype.nodedatachanged: var zookeeper = await _zookeepercall; var watcher = new nodemonitorwatcher(_zookeepercall, _path, _action); var data = await zookeeper.getdataasync(_path, watcher); var newdata = data.data; _action(_currentdata, newdata); watcher.setcurrentdata(newdata); break; } } #endregion overrides of watcherbase }
连接断开监听器:
internal class reconnectionwatcher : watcher { private readonly action _reconnection; public reconnectionwatcher(action reconnection) { _reconnection = reconnection; } #region overrides of watcher /// <summary>processes the specified event.</summary> /// <param name="watchedevent">the event.</param> /// <returns></returns> public override async task process(watchedevent watchedevent) { var state = watchedevent.getstate(); switch (state) { case event.keeperstate.expired: case event.keeperstate.disconnected: { _reconnection(); break; } } await task.completedtask; } #endregion overrides of watcher }
上一篇: C++快速开发样本工程的建立--简介
下一篇: Python基础——分支、循环
推荐阅读