
最老的就是zookeeper了, 比较新的有eureka,consul 都可以做注册中心。可以自行搜索对比三者的优缺点。

zookeeper 最开始是hadoop大家族中的一员,是用于做分布式协调的框架,后来独立出来,成为apache的顶级项目。




eureka:java的微服务框架spring cloud中内部已经集成了eureka注册中心。




1. 注册服务,有点类似dns,所有的服务注册到注册中心,包含服务的地址等信息。

2. 服务订阅,客户端请求服务,注册中心就要把那些能用的服务器地址告诉客户端,服务端有变动时,注册中心也能及时通知到客户端。

3. 性能好且高可用,注册中心自身也是一个集群,如果只有一个注册中心机器的话那岂不是把注册中心累死啊,而且它一旦坏了以后,那客户端都找不到服务器了。所以注册中心就有很多台,其中只有一个老大(leader),老大用来写,小弟用来读。就是说老大来决定一台服务器能不能注册进来,小弟负责帮助客户端查找服务器。因为注册服务的次数是很少的,通常有新服务器加入才需要注册,但是客户端订阅那就很多了,所以注册中心只有一个leader。leader万一坏掉的话,会从小弟中选举出一个来当老大接替工作。


上面提到说zookeeper集群,就是说有很多台机器做zookeeper机器,但是这些机器里存储的东西基本上都是一样的,就是说客户端不管连到哪个zookeeper 都是一样的,能做服务订阅。

每一个zookeeper 中都有很多节点(znode)。

接下来说的zookeeper节点和集群完全不是一回事。 有些人喜欢把集群中的每一台zookeeper机器称为一个节点,但是这个节点(zookeeper机器)和我说的节点(znode)完全不是一回事。












1. 可以创建:临时节点(断开连接时便删除节点) 和 持久化节点(必须手动删除节点)。

2. 可以创建:无序节点 和 有序节点。

3. 节点上可以添加watcher监听功能,监听该节点的增删改,然后触发自定义的事件。



1. 节点: 每次注册一个服务就创建一个节点,节点的名称(key)就是服务的名称,服务的详细信息存储在节点value中,客户端通过key找到对应的节点,再找到节点中的value。

2. 临时节点:服务端注册一个服务时创建一个临时节点,服务断开时,临时节点自动销毁,自动完成服务注销。

3. watcher监听: 客户端在注册中心订阅了一个服务的时候,同时在这个服务所在的节点上加一个监听事件,每当服务节点信息有变化的时候,注册中心会自动回调通知客户端。

4. 有序临时节点:分布式锁或者分布式队列(这里与服务注册无关),客户端1想要操作一条数据的时候,在a节点下创建一个有序临时节点,自动分配编号001;客户端2也要操作该数据的时候,在a节点下也创建一个有序临时节点,自动分配编号002。只有编号最小的子节点才会被执行,因此001节点会被执行,客户端1执行完毕后,自动删除001节点,此时002编号为最小子节点。即锁的概念,不能同时操作同一数据;也可以做队列,按照先后顺序依次执行。

5. 有序临时节点+watcher监听: 上面第4条中说到每次执行编号最小的节点,因此需要有一个程序,每次都需要遍历全部节点,然后找出最小的节点,假如是002节点,这时客户端2开始执行。但是添加监听机制以后就不一样了,002监听001,003监听比他小一号的002,这样001销毁的同时通知002开始执行,002销毁的时候通知003开始执行,不需要遍历最小节点,也能有序依次执行。

6. 临时节点+watcher监听: 集群master选举以及高可用。比如hadoop集群,也有一个resourcemanager资源管理器,负责调度其它节点机器,相当于hadoop集群的leader节点。这个leader就可以交由zookeeper管理,所有的hadoop机器同时在zookeeper中创建一个同名的临时节点,由于是同名互斥的节点,因此只有一个节点能被创建,成功创建这个节点的hadoop机器就是leader。同时添加watcher监听,这个leader只要断开连接,临时节点自动销毁,触发监听,其它hadoop开始新一轮的master选举。这也是zookeeper最初在hadoop家族中的重要使命。

7……. 还有很多地方都能用zookeeper,简直无所不能,而且自身也是高可用,高性能,牛x








// Creates zookeeper clients for the configured addresses and caches one client per address.
// NOTE(review): braces are absent in this extract; nesting below is inferred from indentation.
public class zookeeperclientprovider
        // Configuration supplying the address list and the session timeout.
        private configinfo _config;
        private readonly ilogger<zookeeperclientprovider> _logger;
        // Cache of already-created clients, keyed by address string.
        private readonly dictionary<string, zookeeper> _zookeeperclients = new dictionary<string, zookeeper>();

        // Stores the supplied configuration and logger.
        public zookeeperclientprovider(configinfo config, ilogger<zookeeperclientprovider> logger)
            _config = config;
            _logger = logger;

        // Returns the client for the first configured address.
        // NOTE(review): firstordefault() yields null when the address list is empty; the
        // null would then be used as the cache key in createzookeeper — confirm callers.
        public async task<zookeeper> getzookeeper()
            return await createzookeeper(_config.addresses.firstordefault());
        // Returns the cached client for the address, creating and caching one when absent.
        public async task<zookeeper> createzookeeper(string address)
            if (!_zookeeperclients.trygetvalue(address, out zookeeper result))
                await task.run(() =>
                    // The session timeout is passed to the client in whole milliseconds.
                    result = new zookeeper(address, (int)_config.sessiontimeout.totalmilliseconds,
                        new reconnectionwatcher(
                            // Reconnection callback: drop the cached client, close it, then create a new one.
                            async () =>
                                if (_zookeeperclients.remove(address, out zookeeper value))
                                    await value.closeasync();
                                await createzookeeper(address);
                    _zookeeperclients.tryadd(address, result);
            return result;

        // Returns one client per configured address, in configuration order.
        public async task<ienumerable<zookeeper>> getzookeepers()
            var result = new list<zookeeper>();
            foreach (var address in _config.addresses)
                result.add(await createzookeeper(address));
            return result;


/// <summary>
    /// Abstraction for managing and discovering service routes.
    /// </summary>
    public interface iserviceroutemanager

        /// <summary>
        /// Raised when a service route is created.
        /// </summary>
        event eventhandler<servicerouteeventargs> created;

        /// <summary>
        /// Raised when a service route is removed.
        /// </summary>
        event eventhandler<servicerouteeventargs> removed;

        /// <summary>
        /// Raised when a service route is modified.
        /// </summary>
        event eventhandler<serviceroutechangedeventargs> changed;

        /// <summary>
        /// Gets all available service routes.
        /// </summary>
        /// <returns>The collection of service routes.</returns>
        task<ienumerable<serviceroute>> getroutesasync();

        /// <summary>
        /// Sets the service routes.
        /// </summary>
        /// <param name="routes">The collection of service routes.</param>
        /// <returns>A task.</returns>
        task setroutesasync(ienumerable<serviceroute> routes);

        /// <summary>
        /// Removes the given addresses.
        /// </summary>
        /// <param name="address">The list of addresses to remove.</param>
        /// <returns>A task.</returns>
        task remveaddressasync(ienumerable<string> address);
        /// <summary>
        /// Clears all service routes.
        /// </summary>
        /// <returns>A task.</returns>
        task clearasync();

    /// <summary>
    /// Event arguments carrying a service route.
    /// </summary>
    public class servicerouteeventargs
        // NOTE(review): as written this assigns the parameter to itself; presumably the
        // property below was the intended target (identifier casing appears to have been
        // lost in this extract) — confirm against the original source.
        public servicerouteeventargs(serviceroute route)
            route = route;

        /// <summary>
        /// The service route this event refers to.
        /// </summary>
        public serviceroute route { get; private set; }

    /// <summary>
    /// Event arguments for a service route change; adds the previous route to the base arguments.
    /// </summary>
    public class serviceroutechangedeventargs : servicerouteeventargs
        // The new route is forwarded to the base constructor.
        // NOTE(review): as written the assignment below targets the parameter itself;
        // presumably the property was intended (casing lost in this extract) — confirm.
        public serviceroutechangedeventargs(serviceroute route, serviceroute oldroute) : base(route)
            oldroute = oldroute;

        /// <summary>
        /// The service route as it was before the change.
        /// </summary>
        public serviceroute oldroute { get; set; }
// iserviceroutemanager implementation that reads and writes service routes as zookeeper nodes
// under the configured route path.
// NOTE(review): braces are absent in this extract, so member boundaries are inferred.
public class zookeeperserviceroutemanager : iserviceroutemanager, idisposable
private readonly configinfo _configinfo;
// Serializer used to convert serviceroute objects to and from node data (byte[]).
private readonly iserializer<byte[]> _serializer;
private readonly ilogger<zookeeperserviceroutemanager> _logger;
// Local cache of routes; null until enterroutes() has populated it.
private serviceroute[] _routes;
private readonly zookeeperclientprovider _zookeeperclientprovider;
// Stores the injected dependencies.
// NOTE(review): stringserializer is not referenced in the visible constructor body.
public zookeeperserviceroutemanager(configinfo configinfo, iserializer<byte[]> serializer,
iserializer<string> stringserializer,
ilogger<zookeeperserviceroutemanager> logger,
zookeeperclientprovider zookeeperclientprovider)
_configinfo = configinfo;
_serializer = serializer;
_logger = logger;
_zookeeperclientprovider = zookeeperclientprovider;
// Backing delegates for the three route events declared below.
private eventhandler<servicerouteeventargs> _created;
private eventhandler<servicerouteeventargs> _removed;
private eventhandler<serviceroutechangedeventargs> _changed;
/// <summary>
/// Raised when a service route is created.
/// </summary>
public event eventhandler<servicerouteeventargs> created
add { _created += value; }
remove { _created -= value; }
/// <summary>
/// Raised when a service route is removed.
/// </summary>
public event eventhandler<servicerouteeventargs> removed
add { _removed += value; }
remove { _removed -= value; }
/// <summary>
/// Raised when a service route is modified.
/// </summary>
public event eventhandler<serviceroutechangedeventargs> changed
add { _changed += value; }
remove { _changed -= value; }
// Invokes the created handlers once per argument.
// NOTE(review): the body of the null guard is not visible in this extract; presumably
// an early return when there are no subscribers — confirm against the original source.
protected void oncreated(params servicerouteeventargs[] args)
if (_created == null)
foreach (var arg in args)
_created(this, arg);
// Invokes the changed handlers once per argument (same guard caveat as oncreated).
protected void onchanged(params serviceroutechangedeventargs[] args)
if (_changed == null)
foreach (var arg in args)
_changed(this, arg);
// Invokes the removed handlers once per argument (same guard caveat as oncreated).
protected void onremoved(params servicerouteeventargs[] args)
if (_removed == null)
foreach (var arg in args)
_removed(this, arg);
/// <summary>
/// Gets all available service routes. Calls enterroutes() first and then returns the
/// cached _routes array.
/// </summary>
/// <returns>The collection of service routes.</returns>
public async task<ienumerable<serviceroute>> getroutesasync()
await enterroutes();
return _routes;
/// <summary>
/// Clears all service routes: for every zookeeper client, deletes the children of the
/// route path node and the node itself, then repeats for shortened paths.
/// </summary>
/// <returns>A task.</returns>
// NOTE(review): the bodies of the logger guards below are not visible in this extract.
public async task clearasync()
if (_logger.isenabled(loglevel.information))
var zookeepers = await _zookeeperclientprovider.getzookeepers();
foreach (var zookeeper in zookeepers)
var path = _configinfo.routepath;
// Split the route path into its segments, dropping empty entries.
var childrens = path.split(new[] { '/' }, stringsplitoptions.removeemptyentries);
var index = 0;
while (childrens.count() > 1)
var nodepath = "/" + string.join("/", childrens);
if (await zookeeper.existsasync(nodepath) != null)
var result = await zookeeper.getchildrenasync(nodepath);
if (result?.children != null)
// Delete each direct child before deleting the node itself.
foreach (var child in result.children)
var childpath = $"{nodepath}/{child}";
if (_logger.isenabled(loglevel.debug))
await zookeeper.deleteasync(childpath);
if (_logger.isenabled(loglevel.debug))
await zookeeper.deleteasync(nodepath);
// NOTE(review): no increment of index is visible in this extract; with index == 0 this
// take() keeps every segment and the loop condition would not change — confirm against
// the original source.
childrens = childrens.take(childrens.length - index).toarray();
if (_logger.isenabled(loglevel.information))
/// <summary>
/// Sets the service routes: reconciles addresses with the routes currently stored,
/// removes obsolete routes, then creates or updates one node per route on every
/// zookeeper client.
/// </summary>
/// <param name="routes">The collection of service routes.</param>
/// <returns>A task.</returns>
// NOTE(review): the bodies of the logger guards below are not visible in this extract.
public async task setroutesasync(ienumerable<serviceroute> routes)
var hostaddr = netutils.gethostaddress();
// Load the stored routes that share an id with the incoming routes.
var serviceroutes = await getroutes(routes.select(p => p.serviceroutedescriptor.id));
if (serviceroutes.count() > 0)
foreach (var route in routes)
var serviceroute = serviceroutes.where(p => p.serviceroutedescriptor.id == route.serviceroutedescriptor.id).firstordefault();
if (serviceroute != null)
// NOTE(review): the statement below is truncated in this extract (the argument to
// concat and the rest of the expression are missing) — consult the original source.
var addresses = serviceroute.address.concat(
foreach (var address in route.address)
// Removes the first entry whose string form matches the incoming address.
addresses.remove(addresses.where(p => p.tostring() == address.tostring()).firstordefault());
route.address = addresses;
await removeexceptroutesasync(routes, hostaddr);
if (_logger.isenabled(loglevel.information))
var zookeepers = await _zookeeperclientprovider.getzookeepers();
foreach (var zookeeper in zookeepers)
// Make sure the route path node exists before writing route nodes under it.
await createsubdirectory(zookeeper, _configinfo.routepath);
var path = _configinfo.routepath;
if (!path.endswith("/"))
path += "/";
routes = routes.toarray();
foreach (var serviceroute in routes)
// Each route is stored in a node named after the route's descriptor id.
var nodepath = $"{path}{serviceroute.serviceroutedescriptor.id}";
var nodedata = _serializer.serialize(serviceroute);
if (await zookeeper.existsasync(nodepath) == null)
if (_logger.isenabled(loglevel.debug))
await zookeeper.createasync(nodepath, nodedata, zoodefs.ids.open_acl_unsafe, createmode.persistent);
if (_logger.isenabled(loglevel.debug))
// Compare with the data currently stored; setdataasync follows the inequality check.
var onlinedata = (await zookeeper.getdataasync(nodepath)).data;
if (!dataequals(nodedata, onlinedata))
await zookeeper.setdataasync(nodepath, nodedata);
if (_logger.isenabled(loglevel.information))
// Removes the given addresses from every known route and writes the routes back
// through setroutesasync.
public async task remveaddressasync(ienumerable<string> address)
var routes = await getroutesasync();
foreach (var route in routes)
// except() drops every address of the route that appears in the supplied list.
route.address = route.address.except(address);
await setroutesasync(routes);
// Deletes the nodes of cached routes whose ids are absent from the supplied routes,
// restricted to routes whose address list contains hostaddr.
private async task removeexceptroutesasync(ienumerable<serviceroute> routes, string hostaddr)
var path = _configinfo.routepath;
if (!path.endswith("/"))
path += "/";
routes = routes.toarray();
var zookeepers = await _zookeeperclientprovider.getzookeepers();
foreach (var zookeeper in zookeepers)
// Nothing to compare against while the local cache has not been populated.
if (_routes != null)
var oldrouteids = _routes.select(i => i.serviceroutedescriptor.id).toarray();
var newrouteids = routes.select(i => i.serviceroutedescriptor.id).toarray();
// Ids that are in the cache but not in the new set.
var deletedrouteids = oldrouteids.except(newrouteids).toarray();
foreach (var deletedrouteid in deletedrouteids)
var addresses = _routes.where(p => p.serviceroutedescriptor.id == deletedrouteid).select(p => p.address).firstordefault();
// The node is deleted only when the route's addresses contain this host's address.
if (addresses.contains(hostaddr))
var nodepath = $"{path}{deletedrouteid}";
await zookeeper.deleteasync(nodepath);
// Ensures every segment of the given path exists, creating missing nodes as
// persistent nodes with null data, one level at a time.
// NOTE(review): the body of the first guard (path already exists) is not visible in
// this extract; presumably an early return — confirm.
private async task createsubdirectory(zookeeper zookeeper, string path)
if (await zookeeper.existsasync(path) != null)
if (_logger.isenabled(loglevel.information))
var childrens = path.split(new[] { '/' }, stringsplitoptions.removeemptyentries);
var nodepath = "/";
foreach (var children in childrens)
// Append the segment, create the node if it is missing, then add the separator.
nodepath += children;
if (await zookeeper.existsasync(nodepath) == null)
await zookeeper.createasync(nodepath, null, zoodefs.ids.open_acl_unsafe, createmode.persistent);
nodepath += "/";
// Deserializes node data into a serviceroute; returns null when data is null.
// The deserialization itself is run through task.run.
// NOTE(review): the body of the logger guard is not visible in this extract.
private async task<serviceroute> getroute(byte[] data)
if (_logger.isenabled(loglevel.debug))
if (data == null)
return null;
return await task.run(() =>
return _serializer.deserialize<serviceroute>(data);
// Reads the route stored at the given node path and registers a nodemonitorwatcher on
// it; returns null when the node does not exist.
private async task<serviceroute> getroute(string path)
serviceroute result = null;
var zookeeper = await getzookeeper();
// The watcher forwards old and new node data to nodechange.
var watcher = new nodemonitorwatcher(getzookeeper(), path,
async (olddata, newdata) => await nodechange(olddata, newdata));
if (await zookeeper.existsasync(path) != null)
var data = (await zookeeper.getdataasync(path, watcher)).data;
result = await getroute(data);
return result;
// Loads the routes for the given child node names, which are resolved relative to the
// configured route path.
private async task<serviceroute[]> getroutes(ienumerable<string> childrens)
var rootpath = _configinfo.routepath;
if (!rootpath.endswith("/"))
rootpath += "/";
childrens = childrens.toarray();
var routes = new list<serviceroute>(childrens.count());
foreach (var children in childrens)
if (_logger.isenabled(loglevel.debug))
var nodepath = $"{rootpath}{children}";
var route = await getroute(nodepath);
// NOTE(review): the statement guarded by this null check is not visible in this
// extract; presumably the route is added to the list — confirm against the original.
if (route != null)
return routes.toarray();
// Populates the _routes cache from the children of the route path and registers a
// childrenmonitorwatcher on that path; assigns an empty array on the final line.
// NOTE(review): the body of the first guard is not visible in this extract (presumably
// an early return when the cache is already populated), and the branch structure
// around the final assignment cannot be determined without braces — confirm.
private async task enterroutes()
if (_routes != null)
var zookeeper = await getzookeeper();
// The watcher forwards old and new child lists to childrenchange.
var watcher = new childrenmonitorwatcher(getzookeeper(), _configinfo.routepath,
async (oldchildrens, newchildrens) => await childrenchange(oldchildrens, newchildrens));
if (await zookeeper.existsasync(_configinfo.routepath, watcher) != null)
var result = await zookeeper.getchildrenasync(_configinfo.routepath, watcher);
var childrens = result.children.toarray();
_routes = await getroutes(childrens);
if (_logger.isenabled(loglevel.warning))
_routes = new serviceroute[0];
// Returns true only when both byte lists have the same count and hold equal bytes at
// every index. Neither argument is checked for null here.
private static bool dataequals(ireadonlylist<byte> data1, ireadonlylist<byte> data2)
// Different lengths can never be equal.
if (data1.count != data2.count)
return false;
for (var i = 0; i < data1.count; i++)
var b1 = data1[i];
var b2 = data2[i];
// The first mismatching byte decides the result.
if (b1 != b2)
return false;
return true;
// Handles a data change on a route node: deserializes the new data, replaces the cached
// route with the same descriptor id, and raises the changed event with new and old route.
// NOTE(review): the body of the dataequals guard is not visible in this extract
// (presumably an early return when nothing changed) — confirm.
public async task nodechange(byte[] olddata, byte[] newdata)
if (dataequals(olddata, newdata))
var newroute = await getroute(newdata);
var oldroute = _routes.firstordefault(i => i.serviceroutedescriptor.id == newroute.serviceroutedescriptor.id);
lock (_routes)
// NOTE(review): the source of this query appears to be missing in this extract
// (the right-hand side starts with ".where") — consult the original source.
_routes =
.where(i => i.serviceroutedescriptor.id != newroute.serviceroutedescriptor.id)
.concat(new[] { newroute }).toarray();
onchanged(new serviceroutechangedeventargs(newroute, oldroute));
// Handles a change in the children of the route path: computes which child names were
// deleted and which were created, loads the created routes, updates the cache, and
// raises the removed and created events.
public async task childrenchange(string[] oldchildrens, string[] newchildrens)
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"最新的节点信息:{string.join(",", newchildrens)}");
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"旧的节点信息:{string.join(",", oldchildrens)}");
// Names present before but not now were deleted; names present now but not before were created.
var deletedchildrens = oldchildrens.except(newchildrens).toarray();
var createdchildrens = newchildrens.except(oldchildrens).toarray();
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"需要被删除的路由节点:{string.join(",", deletedchildrens)}");
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"需要被添加的路由节点:{string.join(",", createdchildrens)}");
var newroutes = (await getroutes(createdchildrens)).toarray();
// Snapshot of the cache taken before it is modified; used below to find deleted routes.
var routes = _routes.toarray();
lock (_routes)
// NOTE(review): this statement is not terminated in this extract; the remainder of the
// expression (presumably appending newroutes) is missing — consult the original source.
_routes = _routes
.where(i => !deletedchildrens.contains(i.serviceroutedescriptor.id))
var deletedroutes = routes.where(i => deletedchildrens.contains(i.serviceroutedescriptor.id)).toarray();
onremoved(deletedroutes.select(route => new servicerouteeventargs(route)).toarray());
oncreated(newroutes.select(route => new servicerouteeventargs(route)).toarray());
if (_logger.isenabled(loglevel.information))
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
// NOTE(review): no body is visible for this method in this extract.
public void dispose()
// Convenience wrapper that returns the client provider's default zookeeper client.
private async task<zookeeper> getzookeeper()
return await _zookeeperclientprovider.getzookeeper();


// Configuration values for the zookeeper-backed registry: connection string, node
// paths, session timeout and monitoring flags.
public class configinfo
/// <summary>
/// Initializes zookeeper configuration with a 20-second session timeout (per the
/// original comment; the timeout argument is not visible in the delegating call in
/// this extract).
/// </summary>
/// <param name="connectionstring">The connection string.</param>
/// <param name="routepath">The route configuration path.</param>
/// <param name="subscriberpath">The subscriber configuration path.</param>
/// <param name="commandpath">The service command configuration path.</param>
/// <param name="cachepath">The cache center configuration path.</param>
/// <param name="mqttroutepath">The mqtt route configuration path.</param>
/// <param name="chroot">The root node.</param>
// NOTE(review): the argument list of the this(...) call below appears truncated in this
// extract — consult the original source.
public configinfo(string connectionstring, string routepath = "/services/serviceroutes",
string subscriberpath = "/services/servicesubscribers",
string commandpath = "/services/servicecommands",
string cachepath = "/services/servicecaches",
string mqttroutepath = "/services/mqttserviceroutes",
string chroot = null,
bool reloadonchange = false, bool enablechildrenmonitor = false) : this(connectionstring,
reloadonchange, enablechildrenmonitor)
/// <summary>
/// Initializes zookeeper configuration.
/// </summary>
/// <param name="connectionstring">The connection string.</param>
/// <param name="routepath">The route configuration path.</param>
/// <param name="commandpath">The service command configuration path.</param>
/// <param name="subscriberpath">The subscriber configuration path.</param>
/// <param name="sessiontimeout">The session timeout.</param>
/// <param name="cachepath">The cache center configuration path.</param>
/// <param name="mqttroutepath">The mqtt route configuration path.</param>
/// <param name="chroot">The root node.</param>
// NOTE(review): with identifier casing lost in this extract, the assignments below read
// as parameter self-assignments; presumably each targets the same-named property — confirm.
public configinfo(string connectionstring, timespan sessiontimeout, string routepath = "/services/serviceroutes",
string subscriberpath = "/services/servicesubscribers",
string commandpath = "/services/servicecommands",
string cachepath = "/services/servicecaches",
string mqttroutepath = "/services/mqttserviceroutes",
string chroot = null,
bool reloadonchange = false, bool enablechildrenmonitor = false)
cachepath = cachepath;
reloadonchange = reloadonchange;
chroot = chroot;
commandpath = commandpath;
subscriberpath = subscriberpath;
connectionstring = connectionstring;
routepath = routepath;
sessiontimeout = sessiontimeout;
mqttroutepath = mqttroutepath;
enablechildrenmonitor = enablechildrenmonitor;
// The connection string is split on commas into individual addresses; a null
// connection string yields null addresses.
addresses = connectionstring?.split(",");
public bool enablechildrenmonitor { get; set; }
public bool reloadonchange { get; set; }
/// <summary>
/// The connection string.
/// </summary>
public string connectionstring { get; set; }
/// <summary>
/// The command configuration path.
/// </summary>
public string commandpath { get; set; }
/// <summary>
/// The route configuration path.
/// </summary>
public string routepath { get; set; }
/// <summary>
/// The subscriber configuration path.
/// </summary>
public string subscriberpath { get; set; }
/// <summary>
/// The session timeout.
/// </summary>
public timespan sessiontimeout { get; set; }
/// <summary>
/// The root node.
/// </summary>
public string chroot { get; set; }
// Individual addresses obtained by splitting the connection string on commas.
public ienumerable<string> addresses { get; set; }
/// <summary>
/// The cache center configuration path.
/// </summary>
public string cachepath { get; set; }
/// <summary>
/// The mqtt route configuration path.
/// </summary>
public string mqttroutepath { get; set; }


// A service route: a route descriptor together with the addresses where the service
// is available.
public class serviceroute
/// <summary>
/// The addresses at which the service is available.
/// </summary>
public ienumerable<string> address { get; set; }
/// <summary>
/// The service route descriptor.
/// </summary>
public serviceroutedescriptor serviceroutedescriptor { get; set; }
#region equality members
/// <summary>Determines whether the specified object is equal to the current object.</summary>
/// <returns>true if the specified object is equal to the current object; otherwise, false.</returns>
/// <param name="obj">The object to compare with the current object.</param>
public override bool equals(object obj)
var model = obj as serviceroute;
if (model == null)
return false;
if (obj.gettype() != gettype())
return false;
if (model.serviceroutedescriptor != serviceroutedescriptor)
return false;
// Addresses match when both sides have the same count and every address of the other
// route is contained in this one; order is not compared.
return model.address.count() == address.count() && model.address.all(addressmodel => address.contains(addressmodel));
/// <summary>Serves as the default hash function.</summary>
/// <returns>A hash code for the current object.</returns>
// NOTE(review): the hash is derived from tostring(), which is not overridden in the
// visible part of this class — confirm the intended hash distribution.
public override int gethashcode()
return tostring().gethashcode();
// Both operators delegate to equals(object, object).
public static bool operator ==(serviceroute model1, serviceroute model2)
return equals(model1, model2);
public static bool operator !=(serviceroute model1, serviceroute model2)
return !equals(model1, model2);
#endregion equality members
/// <summary>
/// Describes a service route: id, token, route path and metadata.
/// </summary>
public class serviceroutedescriptor
/// <summary>
/// Initializes a new service route descriptor with an empty metadata dictionary whose
/// keys are compared with stringcomparer.ordinalignorecase.
/// </summary>
public serviceroutedescriptor()
metadatas = new dictionary<string, object>(stringcomparer.ordinalignorecase);
/// <summary>
/// The service id.
/// </summary>
public string id { get; set; }
/// <summary>
/// The access token.
/// </summary>
public string token { get; set; }
/// <summary>
/// The route path.
/// </summary>
public string routepath { get; set; }
/// <summary>
/// The metadata.
/// </summary>
public idictionary<string, object> metadatas { get; set; }
/// <summary>
/// Gets a metadata value.
/// </summary>
/// <typeparam name="t">The metadata type.</typeparam>
/// <param name="name">The metadata name.</param>
/// <param name="def">The value returned when no metadata with the given name exists.</param>
/// <returns>The metadata value, cast to <typeparamref name="t"/>.</returns>
public t getmetadata<t>(string name, t def = default(t))
if (!metadatas.containskey(name))
return def;
return (t)metadatas[name];
#region equality members
/// <summary>Determines whether the specified object is equal to the current object.</summary>
/// <returns>true if the specified object is equal to the current object; otherwise, false.</returns>
/// <param name="obj">The object to compare with the current object.</param>
public override bool equals(object obj)
var model = obj as serviceroutedescriptor;
if (model == null)
return false;
if (obj.gettype() != gettype())
return false;
if (model.id != id)
return false;
// Metadata matches when both dictionaries have the same count and every entry of the
// other descriptor has an equal value under the same key here (two nulls count as equal).
// token and routepath are not compared in the visible code.
return model.metadatas.count == metadatas.count && model.metadatas.all(metadata =>
object value;
if (!metadatas.trygetvalue(metadata.key, out value))
return false;
if (metadata.value == null && value == null)
return true;
if (metadata.value == null || value == null)
return false;
return metadata.value.equals(value);
/// <summary>Serves as the default hash function.</summary>
/// <returns>A hash code for the current object.</returns>
// NOTE(review): the hash is derived from tostring(), which is not overridden in the
// visible part of this class — confirm the intended hash distribution.
public override int gethashcode()
return tostring().gethashcode();
// Both operators delegate to equals(object, object).
public static bool operator ==(serviceroutedescriptor model1, serviceroutedescriptor model2)
return equals(model1, model2);
public static bool operator !=(serviceroutedescriptor model1, serviceroutedescriptor model2)
return !equals(model1, model2);
#endregion equality members



// Watcher for the children of one node path: on relevant events it re-registers a
// watcher on the path and passes the previous and current child names to _action.
// NOTE(review): braces, try, break and return statements are not visible in this
// extract, so the exact branch structure below cannot be determined.
internal class childrenmonitorwatcher : watcher
private readonly task<zookeeper> _zookeepercall;
private readonly string _path;
// Callback invoked with (previous children, current children).
private readonly action<string[], string[]> _action;
// Child names considered current by this watcher; starts as an empty array.
private string[] _currentdata = new string[0];
public childrenmonitorwatcher(task<zookeeper> zookeepercall, string path, action<string[], string[]> action)
_zookeepercall = zookeepercall;
_path = path;
_action = action;
// Replaces the remembered child names (null becomes an empty array) and returns this
// instance so calls can be chained.
public childrenmonitorwatcher setcurrentdata(string[] currentdata)
_currentdata = currentdata ?? new string[0];
return this;
#region overrides of watcherbase
public override async task process(watchedevent watchedevent)
// Guard on events that are not in the syncconnected state or that concern another path.
// NOTE(review): the guard's body is not visible here; presumably an early return — confirm.
if (watchedevent.getstate() != event.keeperstate.syncconnected || watchedevent.getpath() != _path)
var zookeeper = await _zookeepercall;
//func<childrenmonitorwatcher> getwatcher = () => new childrenmonitorwatcher(_zookeepercall, path, _action);
// A replacement watcher instance is created through task.run and awaited where needed.
task<childrenmonitorwatcher> getwatcher =  task.run(() => {return new childrenmonitorwatcher(_zookeepercall, _path, _action); });
switch (watchedevent.get_type())
case event.eventtype.nodecreated:
await zookeeper.getchildrenasync(_path, await getwatcher);
case event.eventtype.nodechildrenchanged:
var watcher = await getwatcher;
var result = await zookeeper.getchildrenasync(_path, watcher);
var childrens = result.children.toarray();
_action(_currentdata, childrens);
// When the node no longer exists, the callback receives an empty child list.
catch (keeperexception.nonodeexception)
_action(_currentdata, new string[0]);
case event.eventtype.nodedeleted:
var watcher = await getwatcher;
await zookeeper.existsasync(_path, watcher);
_action(_currentdata, new string[0]);
watcher.setcurrentdata(new string[0]);
#endregion overrides of watcherbase


// Watcher for the data of one node path: on a data change it re-reads the node with a
// new watcher and passes the previous and new data to _action.
internal class nodemonitorwatcher : watcher
private readonly task<zookeeper> _zookeepercall;
private readonly string _path;
// Callback invoked with (previous data, new data).
private readonly action<byte[], byte[]> _action;
// Data considered current by this watcher; unset until setcurrentdata is called.
private byte[] _currentdata;
public nodemonitorwatcher(task<zookeeper> zookeepercall, string path, action<byte[], byte[]> action)
_zookeepercall = zookeepercall;
_path = path;
_action = action;
// Replaces the remembered data and returns this instance so calls can be chained.
public nodemonitorwatcher setcurrentdata(byte[] currentdata)
_currentdata = currentdata;
return this;
#region overrides of watcherbase
public override async task process(watchedevent watchedevent)
switch (watchedevent.get_type())
case event.eventtype.nodedatachanged:
var zookeeper = await _zookeepercall;
// A new watcher instance is registered with the read that fetches the new data.
var watcher = new nodemonitorwatcher(_zookeepercall, _path, _action);
var data = await zookeeper.getdataasync(_path, watcher);
var newdata = data.data;
_action(_currentdata, newdata);
// NOTE(review): no setcurrentdata call on the new watcher is visible in this extract —
// confirm whether the new watcher is meant to remember newdata.
#endregion overrides of watcherbase


// Watcher that holds a reconnection callback and inspects the connection state of
// incoming events.
internal class reconnectionwatcher : watcher
// Callback supplied by the creator of this watcher.
private readonly action _reconnection;
public reconnectionwatcher(action reconnection)
_reconnection = reconnection;
#region overrides of watcher
/// <summary>Processes the specified event.</summary>
/// <param name="watchedevent">The event.</param>
/// <returns>A task.</returns>
// NOTE(review): no invocation of _reconnection is visible in this extract; presumably
// it is called for the expired and disconnected states — confirm against the original.
public override async task process(watchedevent watchedevent)
var state = watchedevent.getstate();
switch (state)
case event.keeperstate.expired:
case event.keeperstate.disconnected:
await task.completedtask;
#endregion overrides of watcher