欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

(九)分布式服务----Zookeeper注册中心

程序员文章站 2022-04-14 23:05:56
==>>点击查看本系列文章目录 首先看一下几种注册中心: 最老的就是Zookeeper了, 比较新的有Eureka,Consul 都可以做注册中心。可以自行搜索对比三者的优缺点。 Zookeeper 最开始就是hadoop大家族中的一员,用于做协调的框架,后来已经是apache的子项目了。 几年前大 ......

 

 

首先看一下几种注册中心:

最老的就是zookeeper了, 比较新的有eureka,consul 都可以做注册中心。可以自行搜索对比三者的优缺点。

zookeeper 最开始就是hadoop大家族中的一员,用于做协调的框架,后来已经是apache的子项目了。

几年前大数据很火的时候,只要学hadoop必学zookeeper,当然还有其他成员。

大数据简单说就是分布式,比如分布式文件存储hdfs,分布式数据库hbase,分布式协调zookeeper,还有kafka,flume等等都是hadoop大家族。

zookeeper,现在更多被用来做注册中心,比如阿里的开源soa框架dubbo就经常搭配zookeeper做注册中心。

eureka:java的微服务框架spring cloud中内部已经集成了eureka注册中心。

我选择zookeeper,不是因为他比另外两个强,而是因为我几年前就已经学习过一些zookeeper的原理,上手更容易。网络上学习书籍、资料、视频教程也特别多,学习资料完善。

 

注册中心的基本功能:

1. 注册服务,有点类似dns,所有的服务注册到注册中心,包含服务的地址等信息。

2. 服务订阅,客户端请求服务,注册中心就要把那些能用的服务器地址告诉客户端,服务端有变动时,注册中心也能及时通知到客户端。

3. 性能好且高可用,注册中心自身也是一个集群,如果只有一个注册中心机器的话那岂不是把注册中心累死啊,而且他一旦坏了以后,那客户端都找不到服务器了。所有注册中心就有很多台,其中只有一个老大(leader),老大用来写,小弟用来读。就是说老大来决定一台服务器能不能注册进来,小弟负责帮助客户端查找服务器。因为注册服务的次数是很少的,通常有新服务器加入才需要注册,但是客户端订阅那就很多了,所以注册中心只有一个leader。leader万一坏掉的话,会从小弟中选举出一个来当老大接替工作。

 

上面提到说zookeeper集群,就是说有很多台机器做zookeeper机器,但是这些机器里存储的东西基本上都是一样的,就是说客户端不管连到哪个zookeeper 都是一样的,能做服务订阅。

每一个zookeeper 中都有很多节点(znode)。

接下来说的zookeeper节点和集群完全不是一回事。 有些人喜欢吧集群中的每一台zookeeper机器称为一个节点,但是这个节点(zookeeper机器)和我说的节点(znode)完全不是一回事。

如下图:

(九)分布式服务----Zookeeper注册中心

 

 本例的图中可以看到,一共有5台机器,每台机器都有5个znode,znode下面的子节点就更多了。

先看5台机器:

一台leader,老大,上文已经介绍,服务都从这些注册写入。

两台follower,小弟,平时用于服务订阅,老大挂掉以后,follower内部就会自行选出老大。

两台observer,观察者,就是属于无业游民,只能看,没有选老大的资格,不能参与竞选也不能投票,唯一的功能就是服务订阅。

  observer模式需要手动开启,为什么会出现observer呢,是因为机器太多的话,每个机器都有选举权的话特别影响性能。全中国14亿人口,每个人都参与国家竞选的话,效率极低。所以呢,选举的工作就交给follower完成就行了,只需要确保一直都有leader接班人就好。

 

再看看zookeeper有什么基本功能:

基本功能很简单,组合以后却可以完成各种复杂工作。

1. 可以创建:临时节点(断开连接时便删除节点) 和 持久化节点(必须手动删除节点)。

2. 可以创建:无序节点 和 有序节点。

3. 节点上可以添加watcher监听功能,监听该节点的增删改,然后触发自定义的事件。

 

看看这些功能怎么用:

1. 节点: 每次注册一个服务就创建一个节点,节点的名称(key)就是服务的名称,服务的详细信息存储在节点value中,客户端通过key找到对应的节点,再找打节点中的value。

2. 临时节点:服务端注册一个服务时创建一个临时节点,服务断开时,临时节点自动销毁,自动完成服务注销。

3. watcher监听: 客户端在注册中心订阅了一个服务的时候,同时在这个服务所在的节点上加一个监听事件,每当服务节点信息有变化的时候,注册中心会自动回调通知客户端。

4. 有序临时节点:分布式锁或者分布式队列(这里与服务注册无关),客户端1想要操作一条数据的时候,在a节点下创建一个有序临时节点,自动分配编号001;客户端1也要操作该数据的时候,在a节点下也创建一个有序临时节点,自动分配编号002。只有编号最小的子节点才会被执行,因此001节点会被执行,客户端1执行完毕后,自动删除001节点,此时002编号为最小子节点。即锁的概念,不能同时操作同一数据;也可以做队列,按照先后顺序依次执行。

5. 有序临时节点+watcher监听: 上面第4条中说到每次执行编号最小的节点,因此需要有一个程序,每次都需要遍历全部节点,然后找出最小的节点,假如是002节点,这时客户端2开始执行。但是添加监听机制以后就不一样了,002监听001,003监听比他小一号的002,这样001销毁的同时通知002开始执行,002销毁的时候通知003开始执行,不需要遍历最小节点,也能有序依次执行。

6. 临时节点+watcher监听: 集群master选举以及高可用。比如hadoop集群,也有一个resourcemanager资源管理器,负责调度其它节点机器,相当于hadoop集群的leader节点。这个leader就可以交由zookeeper管理,所有的hadoop机器同时在zookeeper中创建一个同名的临时节点,由于是同名互斥的节点,因此只有一个节点能被创建,成功创建这个节点的hadoop机器就是leader。同时添加watcher监听,这个leader只要断开连接,临时节点自动销毁,触发监听,其它hadoop开始新一轮的master选举。这也是zookeeper最初在hadoop家族中的重要使命。

7....... 还要很多地方都能用zookeeper,简直无所不能,而且自身也是高可用,高性能,牛x

 

zookeeper本身的操作还是很简单的,无非就是节点的增删改查,可以选择要创建节点的类型,还有就是在节点上添加watcher监听器。就这些。

 

文件结构:

(九)分布式服务----Zookeeper注册中心

 

上代码:

zookeeper客户端管理类:

public class zookeeperclientprovider
    {
        private configinfo _config;
        private readonly ilogger<zookeeperclientprovider> _logger;
        private readonly dictionary<string, zookeeper> _zookeeperclients = new dictionary<string, zookeeper>();

        public zookeeperclientprovider(configinfo config, ilogger<zookeeperclientprovider> logger)
        {
            _config = config;
            _logger = logger;
        }

        public async task<zookeeper> getzookeeper()
        {
            return await createzookeeper(_config.addresses.firstordefault());
        }
        public async task<zookeeper> createzookeeper(string address)
        {
            if (!_zookeeperclients.trygetvalue(address, out zookeeper result))
            {
                await task.run(() =>
                {
                    result = new zookeeper(address, (int)_config.sessiontimeout.totalmilliseconds,
                        new reconnectionwatcher(
                            async () =>
                            {
                                if (_zookeeperclients.remove(address, out zookeeper value))
                                {
                                    await value.closeasync();
                                }
                                await createzookeeper(address);
                            }));
                    _zookeeperclients.tryadd(address, result);
                });
            }
            return result;
        }

        public async task<ienumerable<zookeeper>> getzookeepers()
        {
            var result = new list<zookeeper>();
            foreach (var address in _config.addresses)
            {
                result.add(await createzookeeper(address));
            }
            return result;
        }
    }

zookeeper服务注册类:

/// <summary>
    /// 一个抽象的服务路由发现者。
    /// </summary>
    public interface iserviceroutemanager
    {

        /// <summary>
        /// 服务路由被创建。
        /// </summary>
        event eventhandler<servicerouteeventargs> created;

        /// <summary>
        /// 服务路由被删除。
        /// </summary>
        event eventhandler<servicerouteeventargs> removed;

        /// <summary>
        /// 服务路由被修改。
        /// </summary>
        event eventhandler<serviceroutechangedeventargs> changed;

        /// <summary>
        /// 获取所有可用的服务路由信息。
        /// </summary>
        /// <returns>服务路由集合。</returns>
        task<ienumerable<serviceroute>> getroutesasync();

        /// <summary>
        /// 设置服务路由。
        /// </summary>
        /// <param name="routes">服务路由集合。</param>
        /// <returns>一个任务。</returns>
        task setroutesasync(ienumerable<serviceroute> routes);

        /// <summary>
        /// 移除地址列表
        /// </summary>
        /// <param name="routes">地址列表。</param>
        /// <returns>一个任务。</returns>
        task remveaddressasync(ienumerable<string> address);
        /// <summary>
        /// 清空所有的服务路由。
        /// </summary>
        /// <returns>一个任务。</returns>
        task clearasync();
    }

    /// <summary>
    /// 服务路由事件参数。
    /// </summary>
    public class servicerouteeventargs
    {
        public servicerouteeventargs(serviceroute route)
        {
            route = route;
        }

        /// <summary>
        /// 服务路由信息。
        /// </summary>
        public serviceroute route { get; private set; }
    }

    /// <summary>
    /// 服务路由变更事件参数。
    /// </summary>
    public class serviceroutechangedeventargs : servicerouteeventargs
    {
        public serviceroutechangedeventargs(serviceroute route, serviceroute oldroute) : base(route)
        {
            oldroute = oldroute;
        }

        /// <summary>
        /// 旧的服务路由信息。
        /// </summary>
        public serviceroute oldroute { get; set; }
    }
public class zookeeperserviceroutemanager : iserviceroutemanager, idisposable
    {
        private readonly configinfo _configinfo;
        private readonly iserializer<byte[]> _serializer;
        private readonly ilogger<zookeeperserviceroutemanager> _logger;
        private serviceroute[] _routes;
        private readonly zookeeperclientprovider _zookeeperclientprovider;

        public zookeeperserviceroutemanager(configinfo configinfo, iserializer<byte[]> serializer,
            iserializer<string> stringserializer,
            ilogger<zookeeperserviceroutemanager> logger,
            zookeeperclientprovider zookeeperclientprovider)
        {
            _configinfo = configinfo;
            _serializer = serializer;
            _logger = logger;
            _zookeeperclientprovider = zookeeperclientprovider;
            enterroutes().wait();
        }

        private eventhandler<servicerouteeventargs> _created;
        private eventhandler<servicerouteeventargs> _removed;
        private eventhandler<serviceroutechangedeventargs> _changed;

        /// <summary>
        /// 服务路由被创建。
        /// </summary>
        public event eventhandler<servicerouteeventargs> created
        {
            add { _created += value; }
            remove { _created -= value; }
        }

        /// <summary>
        /// 服务路由被删除。
        /// </summary>
        public event eventhandler<servicerouteeventargs> removed
        {
            add { _removed += value; }
            remove { _removed -= value; }
        }

        /// <summary>
        /// 服务路由被修改。
        /// </summary>
        public event eventhandler<serviceroutechangedeventargs> changed
        {
            add { _changed += value; }
            remove { _changed -= value; }
        }



        protected void oncreated(params servicerouteeventargs[] args)
        {
            if (_created == null)
                return;

            foreach (var arg in args)
                _created(this, arg);
        }

        protected void onchanged(params serviceroutechangedeventargs[] args)
        {
            if (_changed == null)
                return;

            foreach (var arg in args)
                _changed(this, arg);
        }

        protected void onremoved(params servicerouteeventargs[] args)
        {
            if (_removed == null)
                return;

            foreach (var arg in args)
                _removed(this, arg);
        }


        /// <summary>
        /// 获取所有可用的服务路由信息。
        /// </summary>
        /// <returns>服务路由集合。</returns>
        public async task<ienumerable<serviceroute>> getroutesasync()
        {
            await enterroutes();
            return _routes;
        }

        /// <summary>
        /// 清空所有的服务路由。
        /// </summary>
        /// <returns>一个任务。</returns>
        public async task clearasync()
        {
            if (_logger.isenabled(loglevel.information))
                _logger.loginformation("准备清空所有路由配置。");
            var zookeepers = await _zookeeperclientprovider.getzookeepers();
            foreach (var zookeeper in zookeepers)
            {
                var path = _configinfo.routepath;
                var childrens = path.split(new[] { '/' }, stringsplitoptions.removeemptyentries);

                var index = 0;
                while (childrens.count() > 1)
                {
                    var nodepath = "/" + string.join("/", childrens);

                    if (await zookeeper.existsasync(nodepath) != null)
                    {
                        var result = await zookeeper.getchildrenasync(nodepath);
                        if (result?.children != null)
                        {
                            foreach (var child in result.children)
                            {
                                var childpath = $"{nodepath}/{child}";
                                if (_logger.isenabled(loglevel.debug))
                                    _logger.logdebug($"准备删除:{childpath}。");
                                await zookeeper.deleteasync(childpath);
                            }
                        }
                        if (_logger.isenabled(loglevel.debug))
                            _logger.logdebug($"准备删除:{nodepath}。");
                        await zookeeper.deleteasync(nodepath);
                    }
                    index++;
                    childrens = childrens.take(childrens.length - index).toarray();
                }
                if (_logger.isenabled(loglevel.information))
                    _logger.loginformation("路由配置清空完成。");
            }
        }

        /// <summary>
        /// 设置服务路由。
        /// </summary>
        /// <param name="routes">服务路由集合。</param>
        /// <returns>一个任务。</returns>
        public async task setroutesasync(ienumerable<serviceroute> routes)
        {
            var hostaddr = netutils.gethostaddress();
            var serviceroutes = await getroutes(routes.select(p => p.serviceroutedescriptor.id));
            if (serviceroutes.count() > 0)
            {
                foreach (var route in routes)
                {
                    var serviceroute = serviceroutes.where(p => p.serviceroutedescriptor.id == route.serviceroutedescriptor.id).firstordefault();
                    if (serviceroute != null)
                    {
                        var addresses = serviceroute.address.concat(
                          route.address.except(serviceroute.address)).tolist();

                        foreach (var address in route.address)
                        {
                            addresses.remove(addresses.where(p => p.tostring() == address.tostring()).firstordefault());
                            addresses.add(address);
                        }
                        route.address = addresses;
                    }
                }
            }
            await removeexceptroutesasync(routes, hostaddr);

            if (_logger.isenabled(loglevel.information))
                _logger.loginformation("准备添加服务路由。");
            var zookeepers = await _zookeeperclientprovider.getzookeepers();
            foreach (var zookeeper in zookeepers)
            {
                await createsubdirectory(zookeeper, _configinfo.routepath);

                var path = _configinfo.routepath;
                if (!path.endswith("/"))
                    path += "/";

                routes = routes.toarray();

                foreach (var serviceroute in routes)
                {
                    var nodepath = $"{path}{serviceroute.serviceroutedescriptor.id}";
                    var nodedata = _serializer.serialize(serviceroute);
                    if (await zookeeper.existsasync(nodepath) == null)
                    {
                        if (_logger.isenabled(loglevel.debug))
                            _logger.logdebug($"节点:{nodepath}不存在将进行创建。");

                        await zookeeper.createasync(nodepath, nodedata, zoodefs.ids.open_acl_unsafe, createmode.persistent);
                    }
                    else
                    {
                        if (_logger.isenabled(loglevel.debug))
                            _logger.logdebug($"将更新节点:{nodepath}的数据。");

                        var onlinedata = (await zookeeper.getdataasync(nodepath)).data;
                        if (!dataequals(nodedata, onlinedata))
                            await zookeeper.setdataasync(nodepath, nodedata);
                    }
                }
                if (_logger.isenabled(loglevel.information))
                    _logger.loginformation("服务路由添加成功。");
            }
        }

        public async task remveaddressasync(ienumerable<string> address)
        {
            var routes = await getroutesasync();
            foreach (var route in routes)
            {
                route.address = route.address.except(address);
            }
            await setroutesasync(routes);
        }

        private async task removeexceptroutesasync(ienumerable<serviceroute> routes, string hostaddr)
        {
            var path = _configinfo.routepath;
            if (!path.endswith("/"))
                path += "/";
            routes = routes.toarray();
            var zookeepers = await _zookeeperclientprovider.getzookeepers();
            foreach (var zookeeper in zookeepers)
            {
                if (_routes != null)
                {
                    var oldrouteids = _routes.select(i => i.serviceroutedescriptor.id).toarray();
                    var newrouteids = routes.select(i => i.serviceroutedescriptor.id).toarray();
                    var deletedrouteids = oldrouteids.except(newrouteids).toarray();
                    foreach (var deletedrouteid in deletedrouteids)
                    {
                        var addresses = _routes.where(p => p.serviceroutedescriptor.id == deletedrouteid).select(p => p.address).firstordefault();
                        if (addresses.contains(hostaddr))
                        {
                            var nodepath = $"{path}{deletedrouteid}";
                            await zookeeper.deleteasync(nodepath);
                        }
                    }
                }
            }
        }

        private async task createsubdirectory(zookeeper zookeeper, string path)
        {
            if (await zookeeper.existsasync(path) != null)
                return;

            if (_logger.isenabled(loglevel.information))
                _logger.loginformation($"节点{path}不存在,将进行创建。");

            var childrens = path.split(new[] { '/' }, stringsplitoptions.removeemptyentries);
            var nodepath = "/";

            foreach (var children in childrens)
            {
                nodepath += children;
                if (await zookeeper.existsasync(nodepath) == null)
                {
                    await zookeeper.createasync(nodepath, null, zoodefs.ids.open_acl_unsafe, createmode.persistent);
                }
                nodepath += "/";
            }
        }

        private async task<serviceroute> getroute(byte[] data)
        {
            if (_logger.isenabled(loglevel.debug))
                _logger.logdebug($"准备转换服务路由,配置内容:{encoding.utf8.getstring(data)}。");

            if (data == null)
                return null;

            return await task.run(() =>
            {
                return _serializer.deserialize<serviceroute>(data);
            });
        }

        private async task<serviceroute> getroute(string path)
        {
            serviceroute result = null;
            var zookeeper = await getzookeeper();
            var watcher = new nodemonitorwatcher(getzookeeper(), path,
                 async (olddata, newdata) => await nodechange(olddata, newdata));
            if (await zookeeper.existsasync(path) != null)
            {
                var data = (await zookeeper.getdataasync(path, watcher)).data;
                watcher.setcurrentdata(data);
                result = await getroute(data);
            }
            return result;
        }

        private async task<serviceroute[]> getroutes(ienumerable<string> childrens)
        {
            var rootpath = _configinfo.routepath;
            if (!rootpath.endswith("/"))
                rootpath += "/";

            childrens = childrens.toarray();
            var routes = new list<serviceroute>(childrens.count());

            foreach (var children in childrens)
            {
                if (_logger.isenabled(loglevel.debug))
                    _logger.logdebug($"准备从节点:{children}中获取路由信息。");

                var nodepath = $"{rootpath}{children}";
                var route = await getroute(nodepath);
                if (route != null)
                    routes.add(route);
            }

            return routes.toarray();
        }

        private async task enterroutes()
        {
            if (_routes != null)
                return;
            var zookeeper = await getzookeeper();
            var watcher = new childrenmonitorwatcher(getzookeeper(), _configinfo.routepath,
             async (oldchildrens, newchildrens) => await childrenchange(oldchildrens, newchildrens));
            if (await zookeeper.existsasync(_configinfo.routepath, watcher) != null)
            {
                var result = await zookeeper.getchildrenasync(_configinfo.routepath, watcher);
                var childrens = result.children.toarray();
                watcher.setcurrentdata(childrens);
                _routes = await getroutes(childrens);
            }
            else
            {
                if (_logger.isenabled(loglevel.warning))
                    _logger.logwarning($"无法获取路由信息,因为节点:{_configinfo.routepath},不存在。");
                _routes = new serviceroute[0];
            }
        }

        private static bool dataequals(ireadonlylist<byte> data1, ireadonlylist<byte> data2)
        {
            if (data1.count != data2.count)
                return false;
            for (var i = 0; i < data1.count; i++)
            {
                var b1 = data1[i];
                var b2 = data2[i];
                if (b1 != b2)
                    return false;
            }
            return true;
        }

        public async task nodechange(byte[] olddata, byte[] newdata)
        {
            if (dataequals(olddata, newdata))
                return;

            var newroute = await getroute(newdata);
            //得到旧的路由。
            var oldroute = _routes.firstordefault(i => i.serviceroutedescriptor.id == newroute.serviceroutedescriptor.id);

            lock (_routes)
            {
                //删除旧路由,并添加上新的路由。
                _routes =
                    _routes
                        .where(i => i.serviceroutedescriptor.id != newroute.serviceroutedescriptor.id)
                        .concat(new[] { newroute }).toarray();
            }

            //触发路由变更事件。
            onchanged(new serviceroutechangedeventargs(newroute, oldroute));
        }

        public async task childrenchange(string[] oldchildrens, string[] newchildrens)
        {
            if (_logger.isenabled(loglevel.debug))
                _logger.logdebug($"最新的节点信息:{string.join(",", newchildrens)}");

            if (_logger.isenabled(loglevel.debug))
                _logger.logdebug($"旧的节点信息:{string.join(",", oldchildrens)}");

            //计算出已被删除的节点。
            var deletedchildrens = oldchildrens.except(newchildrens).toarray();
            //计算出新增的节点。
            var createdchildrens = newchildrens.except(oldchildrens).toarray();

            if (_logger.isenabled(loglevel.debug))
                _logger.logdebug($"需要被删除的路由节点:{string.join(",", deletedchildrens)}");
            if (_logger.isenabled(loglevel.debug))
                _logger.logdebug($"需要被添加的路由节点:{string.join(",", createdchildrens)}");

            //获取新增的路由信息。
            var newroutes = (await getroutes(createdchildrens)).toarray();

            var routes = _routes.toarray();
            lock (_routes)
            {
                _routes = _routes
                    //删除无效的节点路由。
                    .where(i => !deletedchildrens.contains(i.serviceroutedescriptor.id))
                    //连接上新的路由。
                    .concat(newroutes)
                    .toarray();
            }
            //需要删除的路由集合。
            var deletedroutes = routes.where(i => deletedchildrens.contains(i.serviceroutedescriptor.id)).toarray();
            //触发删除事件。
            onremoved(deletedroutes.select(route => new servicerouteeventargs(route)).toarray());

            //触发路由被创建事件。
            oncreated(newroutes.select(route => new servicerouteeventargs(route)).toarray());

            if (_logger.isenabled(loglevel.information))
                _logger.loginformation("路由数据更新成功。");
        }


        /// <summary>performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
        public void dispose()
        {
        }

        private async task<zookeeper> getzookeeper()
        {
            return await _zookeeperclientprovider.getzookeeper();
        }

    }

zookeeper连接配置类:

public class configinfo
    {
        /// <summary>
        /// 初始化会话超时为20秒的zookeeper配置信息。
        /// </summary>
        /// <param name="connectionstring">连接字符串。</param>
        /// <param name="routepath">路由配置路径。</param>
        /// <param name="subscriberpath">订阅者配置路径</param>
        /// <param name="commandpath">服务命令配置路径</param>
        /// <param name="cachepath">缓存中心配置路径</param>
        /// <param name="mqttroutepath">mqtt路由配置路径</param>
        /// <param name="chroot">根节点。</param>
        public configinfo(string connectionstring, string routepath = "/services/serviceroutes",
            string subscriberpath = "/services/servicesubscribers",
            string commandpath = "/services/servicecommands",
            string cachepath = "/services/servicecaches",
            string mqttroutepath = "/services/mqttserviceroutes",
            string chroot = null,
            bool reloadonchange = false, bool enablechildrenmonitor = false) : this(connectionstring,
                timespan.fromseconds(20),
                routepath,
                subscriberpath,
                commandpath,
                cachepath,
                mqttroutepath,
                chroot,
                reloadonchange, enablechildrenmonitor)
        {
        }

        /// <summary>
        /// 初始化zookeeper配置信息。
        /// </summary>
        /// <param name="connectionstring">连接字符串。</param>
        /// <param name="routepath">路由配置路径。</param>
        /// <param name="commandpath">服务命令配置路径</param>
        /// <param name="subscriberpath">订阅者配置路径</param>
        /// <param name="sessiontimeout">会话超时时间。</param>
        /// <param name="cachepath">缓存中心配置路径</param>
        /// <param name="mqttroutepath">mqtt路由配置路径</param>
        /// <param name="chroot">根节点。</param>
        public configinfo(string connectionstring, timespan sessiontimeout, string routepath = "/services/serviceroutes",
            string subscriberpath = "/services/servicesubscribers",
            string commandpath = "/services/servicecommands",
            string cachepath = "/services/servicecaches",
            string mqttroutepath = "/services/mqttserviceroutes",
            string chroot = null,
            bool reloadonchange = false, bool enablechildrenmonitor = false)
        {
            cachepath = cachepath;
            reloadonchange = reloadonchange;
            chroot = chroot;
            commandpath = commandpath;
            subscriberpath = subscriberpath;
            connectionstring = connectionstring;
            routepath = routepath;
            sessiontimeout = sessiontimeout;
            mqttroutepath = mqttroutepath;
            enablechildrenmonitor = enablechildrenmonitor;
            addresses = connectionstring?.split(",");
        }

        public bool enablechildrenmonitor { get; set; }

        public bool reloadonchange { get; set; }

        /// <summary>
        /// 连接字符串。
        /// </summary>
        public string connectionstring { get; set; }

        /// <summary>
        /// 命令配置路径
        /// </summary>
        public string commandpath { get; set; }

        /// <summary>
        /// 路由配置路径。
        /// </summary>
        public string routepath { get; set; }

        /// <summary>
        /// 订阅者配置路径
        /// </summary>
        public string subscriberpath { get; set; }

        /// <summary>
        /// 会话超时时间。
        /// </summary>
        public timespan sessiontimeout { get; set; }

        /// <summary>
        /// 根节点。
        /// </summary>
        public string chroot { get; set; }


        public ienumerable<string> addresses { get; set; }

        /// <summary>
        /// 缓存中心配置中心
        /// </summary>
        public string cachepath { get; set; }


        /// <summary>
        /// mqtt路由配置路径。
        /// </summary>
        public string mqttroutepath { get; set; }
    }

路由和路由描述:

public class serviceroute
    {
        /// <summary>
        /// 服务可用地址。
        /// </summary>
        public ienumerable<string> address { get; set; }
        /// <summary>
        /// 服务描述符。
        /// </summary>
        public serviceroutedescriptor serviceroutedescriptor { get; set; }

        #region equality members

        /// <summary>determines whether the specified object is equal to the current object.</summary>
        /// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
        /// <param name="obj">the object to compare with the current object. </param>
        public override bool equals(object obj)
        {
            var model = obj as serviceroute;
            if (model == null)
                return false;

            if (obj.gettype() != gettype())
                return false;

            if (model.serviceroutedescriptor != serviceroutedescriptor)
                return false;

            return model.address.count() == address.count() && model.address.all(addressmodel => address.contains(addressmodel));
        }

        /// <summary>serves as the default hash function. </summary>
        /// <returns>a hash code for the current object.</returns>
        public override int gethashcode()
        {
            return tostring().gethashcode();
        }

        public static bool operator ==(serviceroute model1, serviceroute model2)
        {
            return equals(model1, model2);
        }

        public static bool operator !=(serviceroute model1, serviceroute model2)
        {
            return !equals(model1, model2);
        }

        #endregion equality members
    }
/// <summary>
    /// 服务描述符。
    /// </summary>
    [serializable]
    public class serviceroutedescriptor
    {
        /// <summary>
        /// 初始化一个新的服务描述符。
        /// </summary>
        public serviceroutedescriptor()
        {
            metadatas = new dictionary<string, object>(stringcomparer.ordinalignorecase);
        }

        /// <summary>
        /// 服务id。
        /// </summary>
        public string id { get; set; }

        /// <summary>
        /// 访问的令牌
        /// </summary>
        public string token { get; set; }

        /// <summary>
        /// 路由
        /// </summary>
        public string routepath { get; set; }

        /// <summary>
        /// 元数据。
        /// </summary> 
        public idictionary<string, object> metadatas { get; set; }

        /// <summary>
        /// 获取一个元数据。
        /// </summary>
        /// <typeparam name="t">元数据类型。</typeparam>
        /// <param name="name">元数据名称。</param>
        /// <param name="def">如果指定名称的元数据不存在则返回这个参数。</param>
        /// <returns>元数据值。</returns>
        public t getmetadata<t>(string name, t def = default(t))
        {
            if (!metadatas.containskey(name))
                return def;

            return (t)metadatas[name];
        }

        #region equality members

        /// <summary>determines whether the specified object is equal to the current object.</summary>
        /// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
        /// <param name="obj">the object to compare with the current object. </param>
        public override bool equals(object obj)
        {
            var model = obj as serviceroutedescriptor;
            if (model == null)
                return false;

            if (obj.gettype() != gettype())
                return false;

            if (model.id != id)
                return false;

            return model.metadatas.count == metadatas.count && model.metadatas.all(metadata =>
            {
                object value;
                if (!metadatas.trygetvalue(metadata.key, out value))
                    return false;

                if (metadata.value == null && value == null)
                    return true;
                if (metadata.value == null || value == null)
                    return false;

                return metadata.value.equals(value);
            });
        }

        /// <summary>serves as the default hash function. </summary>
        /// <returns>a hash code for the current object.</returns>
        public override int gethashcode()
        {
            return tostring().gethashcode();
        }

        public static bool operator ==(serviceroutedescriptor model1, serviceroutedescriptor model2)
        {
            return equals(model1, model2);
        }

        public static bool operator !=(serviceroutedescriptor model1, serviceroutedescriptor model2)
        {
            return !equals(model1, model2);
        }

        #endregion equality members
    }

watcher监听器:

子节点监听器:

internal class childrenmonitorwatcher : watcher
    {
        private readonly task<zookeeper> _zookeepercall;
        private readonly string _path;
        private readonly action<string[], string[]> _action;
        private string[] _currentdata = new string[0];

        public childrenmonitorwatcher(task<zookeeper> zookeepercall, string path, action<string[], string[]> action)
        {
            _zookeepercall = zookeepercall;
            _path = path;
            _action = action;
        }

        public childrenmonitorwatcher setcurrentdata(string[] currentdata)
        {
            _currentdata = currentdata ?? new string[0];

            return this;
        }

        #region overrides of watcherbase

        public override async task process(watchedevent watchedevent)
        {
            if (watchedevent.getstate() != event.keeperstate.syncconnected || watchedevent.getpath() != _path)
                return;
            var zookeeper = await _zookeepercall;
            //func<childrenmonitorwatcher> getwatcher = () => new childrenmonitorwatcher(_zookeepercall, path, _action);
            task<childrenmonitorwatcher> getwatcher =  task.run(() => {return new childrenmonitorwatcher(_zookeepercall, _path, _action); });
            switch (watchedevent.get_type())
            {
                //创建之后开始监视下面的子节点情况。
                case event.eventtype.nodecreated:
                    await zookeeper.getchildrenasync(_path, await getwatcher);
                    break;

                //子节点修改则继续监控子节点信息并通知客户端数据变更。
                case event.eventtype.nodechildrenchanged:
                    try
                    {
                        var watcher = await getwatcher;
                        var result = await zookeeper.getchildrenasync(_path, watcher);
                        var childrens = result.children.toarray();
                        _action(_currentdata, childrens);
                        watcher.setcurrentdata(childrens);
                    }
                    catch (keeperexception.nonodeexception)
                    {
                        _action(_currentdata, new string[0]);
                    }
                    break;

                //删除之后开始监控自身节点,并通知客户端数据被清空。
                case event.eventtype.nodedeleted:
                    {
                        var watcher = await getwatcher;
                        await zookeeper.existsasync(_path, watcher);
                        _action(_currentdata, new string[0]);
                        watcher.setcurrentdata(new string[0]);
                    }
                    break;
            }
        }
        #endregion overrides of watcherbase
    }

当前节点监听器:

internal class nodemonitorwatcher : watcher
    {
        private readonly task<zookeeper> _zookeepercall;
        private readonly string _path;
        private readonly action<byte[], byte[]> _action;
        private byte[] _currentdata;

        public nodemonitorwatcher(task<zookeeper> zookeepercall, string path, action<byte[], byte[]> action)
        {
            _zookeepercall = zookeepercall;
            _path = path;
            _action = action;
        }

        public nodemonitorwatcher setcurrentdata(byte[] currentdata)
        {
            _currentdata = currentdata;

            return this;
        }

        #region overrides of watcherbase

        public override async task process(watchedevent watchedevent)
        {
            switch (watchedevent.get_type())
            {
                case event.eventtype.nodedatachanged:
                    var zookeeper = await _zookeepercall;
                    var watcher = new nodemonitorwatcher(_zookeepercall, _path, _action);
                    var data = await zookeeper.getdataasync(_path, watcher);
                    var newdata = data.data;
                    _action(_currentdata, newdata);
                    watcher.setcurrentdata(newdata);
                    break;
            }
        }

        #endregion overrides of watcherbase
    }

连接断开监听器:

internal class reconnectionwatcher : watcher
    {
        private readonly action _reconnection;

        public reconnectionwatcher(action reconnection)
        {
            _reconnection = reconnection;
        }

        #region overrides of watcher

        /// <summary>processes the specified event.</summary>
        /// <param name="watchedevent">the event.</param>
        /// <returns></returns>
        public override async task process(watchedevent watchedevent)
        {
            var state = watchedevent.getstate();
            switch (state)
            {
                case event.keeperstate.expired:
                case event.keeperstate.disconnected:
                    {
                        _reconnection();
                        break;
                    }
            }
            await task.completedtask;
        }

        #endregion overrides of watcher
    }