首先看一下几种注册中心:

最老的就是zookeeper了, 比较新的有eureka,consul 都可以做注册中心。可以自行搜索对比三者的优缺点。

zookeeper 最开始就是hadoop大家族中的一员,用于做协调的框架,后来已经是apache的子项目了。

几年前大数据很火的时候,只要学hadoop必学zookeeper,当然还有其他成员。

大数据简单说就是分布式,比如分布式文件存储hdfs,分布式数据库hbase,分布式协调zookeeper,还有kafka,flume等等都是hadoop大家族。

zookeeper,现在更多被用来做注册中心,比如阿里的开源soa框架dubbo就经常搭配zookeeper做注册中心。

eureka:java的微服务框架spring cloud中内部已经集成了eureka注册中心。

我选择zookeeper,不是因为他比另外两个强,而是因为我几年前就已经学习过一些zookeeper的原理,上手更容易。网络上学习书籍、资料、视频教程也特别多,学习资料完善。

 

注册中心的基本功能:

1. 注册服务,有点类似dns,所有的服务注册到注册中心,包含服务的地址等信息。

2. 服务订阅,客户端请求服务,注册中心就要把那些能用的服务器地址告诉客户端,服务端有变动时,注册中心也能及时通知到客户端。

3. 性能好且高可用,注册中心自身也是一个集群,如果只有一个注册中心机器的话那岂不是把注册中心累死啊,而且他一旦坏了以后,那客户端都找不到服务器了。所有注册中心就有很多台,其中只有一个老大(leader),老大用来写,小弟用来读。就是说老大来决定一台服务器能不能注册进来,小弟负责帮助客户端查找服务器。因为注册服务的次数是很少的,通常有新服务器加入才需要注册,但是客户端订阅那就很多了,所以注册中心只有一个leader。leader万一坏掉的话,会从小弟中选举出一个来当老大接替工作。

 

上面提到说zookeeper集群,就是说有很多台机器做zookeeper机器,但是这些机器里存储的东西基本上都是一样的,就是说客户端不管连到哪个zookeeper 都是一样的,能做服务订阅。

每一个zookeeper 中都有很多节点(znode)。

接下来说的zookeeper节点和集群完全不是一回事。 有些人喜欢吧集群中的每一台zookeeper机器称为一个节点,但是这个节点(zookeeper机器)和我说的节点(znode)完全不是一回事。

如下图:

 

 本例的图中可以看到,一共有5台机器,每台机器都有5个znode,znode下面的子节点就更多了。

先看5台机器:

一台leader,老大,上文已经介绍,服务都从这些注册写入。

两台follower,小弟,平时用于服务订阅,老大挂掉以后,follower内部就会自行选出老大。

两台observer,观察者,就是属于无业游民,只能看,没有选老大的资格,不能参与竞选也不能投票,唯一的功能就是服务订阅。

  observer模式需要手动开启,为什么会出现observer呢,是因为机器太多的话,每个机器都有选举权的话特别影响性能。全中国14亿人口,每个人都参与国家竞选的话,效率极低。所以呢,选举的工作就交给follower完成就行了,只需要确保一直都有leader接班人就好。

 

再看看zookeeper有什么基本功能:

基本功能很简单,组合以后却可以完成各种复杂工作。

1. 可以创建:临时节点(断开连接时便删除节点) 和 持久化节点(必须手动删除节点)。

2. 可以创建:无序节点 和 有序节点。

3. 节点上可以添加watcher监听功能,监听该节点的增删改,然后触发自定义的事件。

 

看看这些功能怎么用:

1. 节点: 每次注册一个服务就创建一个节点,节点的名称(key)就是服务的名称,服务的详细信息存储在节点value中,客户端通过key找到对应的节点,再找打节点中的value。

2. 临时节点:服务端注册一个服务时创建一个临时节点,服务断开时,临时节点自动销毁,自动完成服务注销。

3. watcher监听: 客户端在注册中心订阅了一个服务的时候,同时在这个服务所在的节点上加一个监听事件,每当服务节点信息有变化的时候,注册中心会自动回调通知客户端。

4. 有序临时节点:分布式锁或者分布式队列(这里与服务注册无关),客户端1想要操作一条数据的时候,在a节点下创建一个有序临时节点,自动分配编号001;客户端1也要操作该数据的时候,在a节点下也创建一个有序临时节点,自动分配编号002。只有编号最小的子节点才会被执行,因此001节点会被执行,客户端1执行完毕后,自动删除001节点,此时002编号为最小子节点。即锁的概念,不能同时操作同一数据;也可以做队列,按照先后顺序依次执行。

5. 有序临时节点+watcher监听: 上面第4条中说到每次执行编号最小的节点,因此需要有一个程序,每次都需要遍历全部节点,然后找出最小的节点,假如是002节点,这时客户端2开始执行。但是添加监听机制以后就不一样了,002监听001,003监听比他小一号的002,这样001销毁的同时通知002开始执行,002销毁的时候通知003开始执行,不需要遍历最小节点,也能有序依次执行。

6. 临时节点+watcher监听: 集群master选举以及高可用。比如hadoop集群,也有一个resourcemanager资源管理器,负责调度其它节点机器,相当于hadoop集群的leader节点。这个leader就可以交由zookeeper管理,所有的hadoop机器同时在zookeeper中创建一个同名的临时节点,由于是同名互斥的节点,因此只有一个节点能被创建,成功创建这个节点的hadoop机器就是leader。同时添加watcher监听,这个leader只要断开连接,临时节点自动销毁,触发监听,其它hadoop开始新一轮的master选举。这也是zookeeper最初在hadoop家族中的重要使命。

7……. 还要很多地方都能用zookeeper,简直无所不能,而且自身也是高可用,高性能,牛x

 

zookeeper本身的操作还是很简单的,无非就是节点的增删改查,可以选择要创建节点的类型,还有就是在节点上添加watcher监听器。就这些。

 

文件结构:

 

上代码:

zookeeper客户端管理类:

public class zookeeperclientprovider
    {
        private configinfo _config;
        private readonly ilogger<zookeeperclientprovider> _logger;
        private readonly dictionary<string, zookeeper> _zookeeperclients = new dictionary<string, zookeeper>();

        public zookeeperclientprovider(configinfo config, ilogger<zookeeperclientprovider> logger)
        {
            _config = config;
            _logger = logger;
        }

        public async task<zookeeper> getzookeeper()
        {
            return await createzookeeper(_config.addresses.firstordefault());
        }
        public async task<zookeeper> createzookeeper(string address)
        {
            if (!_zookeeperclients.trygetvalue(address, out zookeeper result))
            {
                await task.run(() =>
                {
                    result = new zookeeper(address, (int)_config.sessiontimeout.totalmilliseconds,
                        new reconnectionwatcher(
                            async () =>
                            {
                                if (_zookeeperclients.remove(address, out zookeeper value))
                                {
                                    await value.closeasync();
                                }
                                await createzookeeper(address);
                            }));
                    _zookeeperclients.tryadd(address, result);
                });
            }
            return result;
        }

        public async task<ienumerable<zookeeper>> getzookeepers()
        {
            var result = new list<zookeeper>();
            foreach (var address in _config.addresses)
            {
                result.add(await createzookeeper(address));
            }
            return result;
        }
    }

zookeeper服务注册类:

/// <summary>
    /// 一个抽象的服务路由发现者。
    /// </summary>
    public interface iserviceroutemanager
    {

        /// <summary>
        /// 服务路由被创建。
        /// </summary>
        event eventhandler<servicerouteeventargs> created;

        /// <summary>
        /// 服务路由被删除。
        /// </summary>
        event eventhandler<servicerouteeventargs> removed;

        /// <summary>
        /// 服务路由被修改。
        /// </summary>
        event eventhandler<serviceroutechangedeventargs> changed;

        /// <summary>
        /// 获取所有可用的服务路由信息。
        /// </summary>
        /// <returns>服务路由集合。</returns>
        task<ienumerable<serviceroute>> getroutesasync();

        /// <summary>
        /// 设置服务路由。
        /// </summary>
        /// <param name="routes">服务路由集合。</param>
        /// <returns>一个任务。</returns>
        task setroutesasync(ienumerable<serviceroute> routes);

        /// <summary>
        /// 移除地址列表
        /// </summary>
        /// <param name="routes">地址列表。</param>
        /// <returns>一个任务。</returns>
        task remveaddressasync(ienumerable<string> address);
        /// <summary>
        /// 清空所有的服务路由。
        /// </summary>
        /// <returns>一个任务。</returns>
        task clearasync();
    }

    /// <summary>
    /// 服务路由事件参数。
    /// </summary>
    public class servicerouteeventargs
    {
        public servicerouteeventargs(serviceroute route)
        {
            route = route;
        }

        /// <summary>
        /// 服务路由信息。
        /// </summary>
        public serviceroute route { get; private set; }
    }

    /// <summary>
    /// 服务路由变更事件参数。
    /// </summary>
    public class serviceroutechangedeventargs : servicerouteeventargs
    {
        public serviceroutechangedeventargs(serviceroute route, serviceroute oldroute) : base(route)
        {
            oldroute = oldroute;
        }

        /// <summary>
        /// 旧的服务路由信息。
        /// </summary>
        public serviceroute oldroute { get; set; }
    }
public class zookeeperserviceroutemanager : iserviceroutemanager, idisposable
{
private readonly configinfo _configinfo;
private readonly iserializer<byte[]> _serializer;
private readonly ilogger<zookeeperserviceroutemanager> _logger;
private serviceroute[] _routes;
private readonly zookeeperclientprovider _zookeeperclientprovider;
public zookeeperserviceroutemanager(configinfo configinfo, iserializer<byte[]> serializer,
iserializer<string> stringserializer,
ilogger<zookeeperserviceroutemanager> logger,
zookeeperclientprovider zookeeperclientprovider)
{
_configinfo = configinfo;
_serializer = serializer;
_logger = logger;
_zookeeperclientprovider = zookeeperclientprovider;
enterroutes().wait();
}
private eventhandler<servicerouteeventargs> _created;
private eventhandler<servicerouteeventargs> _removed;
private eventhandler<serviceroutechangedeventargs> _changed;
/// <summary>
/// 服务路由被创建。
/// </summary>
public event eventhandler<servicerouteeventargs> created
{
add { _created += value; }
remove { _created -= value; }
}
/// <summary>
/// 服务路由被删除。
/// </summary>
public event eventhandler<servicerouteeventargs> removed
{
add { _removed += value; }
remove { _removed -= value; }
}
/// <summary>
/// 服务路由被修改。
/// </summary>
public event eventhandler<serviceroutechangedeventargs> changed
{
add { _changed += value; }
remove { _changed -= value; }
}
protected void oncreated(params servicerouteeventargs[] args)
{
if (_created == null)
return;
foreach (var arg in args)
_created(this, arg);
}
protected void onchanged(params serviceroutechangedeventargs[] args)
{
if (_changed == null)
return;
foreach (var arg in args)
_changed(this, arg);
}
protected void onremoved(params servicerouteeventargs[] args)
{
if (_removed == null)
return;
foreach (var arg in args)
_removed(this, arg);
}
/// <summary>
/// 获取所有可用的服务路由信息。
/// </summary>
/// <returns>服务路由集合。</returns>
public async task<ienumerable<serviceroute>> getroutesasync()
{
await enterroutes();
return _routes;
}
/// <summary>
/// 清空所有的服务路由。
/// </summary>
/// <returns>一个任务。</returns>
public async task clearasync()
{
if (_logger.isenabled(loglevel.information))
_logger.loginformation("准备清空所有路由配置。");
var zookeepers = await _zookeeperclientprovider.getzookeepers();
foreach (var zookeeper in zookeepers)
{
var path = _configinfo.routepath;
var childrens = path.split(new[] { '/' }, stringsplitoptions.removeemptyentries);
var index = 0;
while (childrens.count() > 1)
{
var nodepath = "/" + string.join("/", childrens);
if (await zookeeper.existsasync(nodepath) != null)
{
var result = await zookeeper.getchildrenasync(nodepath);
if (result?.children != null)
{
foreach (var child in result.children)
{
var childpath = $"{nodepath}/{child}";
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"准备删除:{childpath}。");
await zookeeper.deleteasync(childpath);
}
}
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"准备删除:{nodepath}。");
await zookeeper.deleteasync(nodepath);
}
index++;
childrens = childrens.take(childrens.length - index).toarray();
}
if (_logger.isenabled(loglevel.information))
_logger.loginformation("路由配置清空完成。");
}
}
/// <summary>
/// 设置服务路由。
/// </summary>
/// <param name="routes">服务路由集合。</param>
/// <returns>一个任务。</returns>
public async task setroutesasync(ienumerable<serviceroute> routes)
{
var hostaddr = netutils.gethostaddress();
var serviceroutes = await getroutes(routes.select(p => p.serviceroutedescriptor.id));
if (serviceroutes.count() > 0)
{
foreach (var route in routes)
{
var serviceroute = serviceroutes.where(p => p.serviceroutedescriptor.id == route.serviceroutedescriptor.id).firstordefault();
if (serviceroute != null)
{
var addresses = serviceroute.address.concat(
route.address.except(serviceroute.address)).tolist();
foreach (var address in route.address)
{
addresses.remove(addresses.where(p => p.tostring() == address.tostring()).firstordefault());
addresses.add(address);
}
route.address = addresses;
}
}
}
await removeexceptroutesasync(routes, hostaddr);
if (_logger.isenabled(loglevel.information))
_logger.loginformation("准备添加服务路由。");
var zookeepers = await _zookeeperclientprovider.getzookeepers();
foreach (var zookeeper in zookeepers)
{
await createsubdirectory(zookeeper, _configinfo.routepath);
var path = _configinfo.routepath;
if (!path.endswith("/"))
path += "/";
routes = routes.toarray();
foreach (var serviceroute in routes)
{
var nodepath = $"{path}{serviceroute.serviceroutedescriptor.id}";
var nodedata = _serializer.serialize(serviceroute);
if (await zookeeper.existsasync(nodepath) == null)
{
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"节点:{nodepath}不存在将进行创建。");
await zookeeper.createasync(nodepath, nodedata, zoodefs.ids.open_acl_unsafe, createmode.persistent);
}
else
{
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"将更新节点:{nodepath}的数据。");
var onlinedata = (await zookeeper.getdataasync(nodepath)).data;
if (!dataequals(nodedata, onlinedata))
await zookeeper.setdataasync(nodepath, nodedata);
}
}
if (_logger.isenabled(loglevel.information))
_logger.loginformation("服务路由添加成功。");
}
}
public async task remveaddressasync(ienumerable<string> address)
{
var routes = await getroutesasync();
foreach (var route in routes)
{
route.address = route.address.except(address);
}
await setroutesasync(routes);
}
private async task removeexceptroutesasync(ienumerable<serviceroute> routes, string hostaddr)
{
var path = _configinfo.routepath;
if (!path.endswith("/"))
path += "/";
routes = routes.toarray();
var zookeepers = await _zookeeperclientprovider.getzookeepers();
foreach (var zookeeper in zookeepers)
{
if (_routes != null)
{
var oldrouteids = _routes.select(i => i.serviceroutedescriptor.id).toarray();
var newrouteids = routes.select(i => i.serviceroutedescriptor.id).toarray();
var deletedrouteids = oldrouteids.except(newrouteids).toarray();
foreach (var deletedrouteid in deletedrouteids)
{
var addresses = _routes.where(p => p.serviceroutedescriptor.id == deletedrouteid).select(p => p.address).firstordefault();
if (addresses.contains(hostaddr))
{
var nodepath = $"{path}{deletedrouteid}";
await zookeeper.deleteasync(nodepath);
}
}
}
}
}
private async task createsubdirectory(zookeeper zookeeper, string path)
{
if (await zookeeper.existsasync(path) != null)
return;
if (_logger.isenabled(loglevel.information))
_logger.loginformation($"节点{path}不存在,将进行创建。");
var childrens = path.split(new[] { '/' }, stringsplitoptions.removeemptyentries);
var nodepath = "/";
foreach (var children in childrens)
{
nodepath += children;
if (await zookeeper.existsasync(nodepath) == null)
{
await zookeeper.createasync(nodepath, null, zoodefs.ids.open_acl_unsafe, createmode.persistent);
}
nodepath += "/";
}
}
private async task<serviceroute> getroute(byte[] data)
{
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"准备转换服务路由,配置内容:{encoding.utf8.getstring(data)}。");
if (data == null)
return null;
return await task.run(() =>
{
return _serializer.deserialize<serviceroute>(data);
});
}
private async task<serviceroute> getroute(string path)
{
serviceroute result = null;
var zookeeper = await getzookeeper();
var watcher = new nodemonitorwatcher(getzookeeper(), path,
async (olddata, newdata) => await nodechange(olddata, newdata));
if (await zookeeper.existsasync(path) != null)
{
var data = (await zookeeper.getdataasync(path, watcher)).data;
watcher.setcurrentdata(data);
result = await getroute(data);
}
return result;
}
private async task<serviceroute[]> getroutes(ienumerable<string> childrens)
{
var rootpath = _configinfo.routepath;
if (!rootpath.endswith("/"))
rootpath += "/";
childrens = childrens.toarray();
var routes = new list<serviceroute>(childrens.count());
foreach (var children in childrens)
{
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"准备从节点:{children}中获取路由信息。");
var nodepath = $"{rootpath}{children}";
var route = await getroute(nodepath);
if (route != null)
routes.add(route);
}
return routes.toarray();
}
private async task enterroutes()
{
if (_routes != null)
return;
var zookeeper = await getzookeeper();
var watcher = new childrenmonitorwatcher(getzookeeper(), _configinfo.routepath,
async (oldchildrens, newchildrens) => await childrenchange(oldchildrens, newchildrens));
if (await zookeeper.existsasync(_configinfo.routepath, watcher) != null)
{
var result = await zookeeper.getchildrenasync(_configinfo.routepath, watcher);
var childrens = result.children.toarray();
watcher.setcurrentdata(childrens);
_routes = await getroutes(childrens);
}
else
{
if (_logger.isenabled(loglevel.warning))
_logger.logwarning($"无法获取路由信息,因为节点:{_configinfo.routepath},不存在。");
_routes = new serviceroute[0];
}
}
private static bool dataequals(ireadonlylist<byte> data1, ireadonlylist<byte> data2)
{
if (data1.count != data2.count)
return false;
for (var i = 0; i < data1.count; i++)
{
var b1 = data1[i];
var b2 = data2[i];
if (b1 != b2)
return false;
}
return true;
}
public async task nodechange(byte[] olddata, byte[] newdata)
{
if (dataequals(olddata, newdata))
return;
var newroute = await getroute(newdata);
//得到旧的路由。
var oldroute = _routes.firstordefault(i => i.serviceroutedescriptor.id == newroute.serviceroutedescriptor.id);
lock (_routes)
{
//删除旧路由,并添加上新的路由。
_routes =
_routes
.where(i => i.serviceroutedescriptor.id != newroute.serviceroutedescriptor.id)
.concat(new[] { newroute }).toarray();
}
//触发路由变更事件。
onchanged(new serviceroutechangedeventargs(newroute, oldroute));
}
public async task childrenchange(string[] oldchildrens, string[] newchildrens)
{
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"最新的节点信息:{string.join(",", newchildrens)}");
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"旧的节点信息:{string.join(",", oldchildrens)}");
//计算出已被删除的节点。
var deletedchildrens = oldchildrens.except(newchildrens).toarray();
//计算出新增的节点。
var createdchildrens = newchildrens.except(oldchildrens).toarray();
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"需要被删除的路由节点:{string.join(",", deletedchildrens)}");
if (_logger.isenabled(loglevel.debug))
_logger.logdebug($"需要被添加的路由节点:{string.join(",", createdchildrens)}");
//获取新增的路由信息。
var newroutes = (await getroutes(createdchildrens)).toarray();
var routes = _routes.toarray();
lock (_routes)
{
_routes = _routes
//删除无效的节点路由。
.where(i => !deletedchildrens.contains(i.serviceroutedescriptor.id))
//连接上新的路由。
.concat(newroutes)
.toarray();
}
//需要删除的路由集合。
var deletedroutes = routes.where(i => deletedchildrens.contains(i.serviceroutedescriptor.id)).toarray();
//触发删除事件。
onremoved(deletedroutes.select(route => new servicerouteeventargs(route)).toarray());
//触发路由被创建事件。
oncreated(newroutes.select(route => new servicerouteeventargs(route)).toarray());
if (_logger.isenabled(loglevel.information))
_logger.loginformation("路由数据更新成功。");
}
/// <summary>performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public void dispose()
{
}
private async task<zookeeper> getzookeeper()
{
return await _zookeeperclientprovider.getzookeeper();
}
}

zookeeper连接配置类:

public class configinfo
{
/// <summary>
/// 初始化会话超时为20秒的zookeeper配置信息。
/// </summary>
/// <param name="connectionstring">连接字符串。</param>
/// <param name="routepath">路由配置路径。</param>
/// <param name="subscriberpath">订阅者配置路径</param>
/// <param name="commandpath">服务命令配置路径</param>
/// <param name="cachepath">缓存中心配置路径</param>
/// <param name="mqttroutepath">mqtt路由配置路径</param>
/// <param name="chroot">根节点。</param>
public configinfo(string connectionstring, string routepath = "/services/serviceroutes",
string subscriberpath = "/services/servicesubscribers",
string commandpath = "/services/servicecommands",
string cachepath = "/services/servicecaches",
string mqttroutepath = "/services/mqttserviceroutes",
string chroot = null,
bool reloadonchange = false, bool enablechildrenmonitor = false) : this(connectionstring,
timespan.fromseconds(20),
routepath,
subscriberpath,
commandpath,
cachepath,
mqttroutepath,
chroot,
reloadonchange, enablechildrenmonitor)
{
}
/// <summary>
/// 初始化zookeeper配置信息。
/// </summary>
/// <param name="connectionstring">连接字符串。</param>
/// <param name="routepath">路由配置路径。</param>
/// <param name="commandpath">服务命令配置路径</param>
/// <param name="subscriberpath">订阅者配置路径</param>
/// <param name="sessiontimeout">会话超时时间。</param>
/// <param name="cachepath">缓存中心配置路径</param>
/// <param name="mqttroutepath">mqtt路由配置路径</param>
/// <param name="chroot">根节点。</param>
public configinfo(string connectionstring, timespan sessiontimeout, string routepath = "/services/serviceroutes",
string subscriberpath = "/services/servicesubscribers",
string commandpath = "/services/servicecommands",
string cachepath = "/services/servicecaches",
string mqttroutepath = "/services/mqttserviceroutes",
string chroot = null,
bool reloadonchange = false, bool enablechildrenmonitor = false)
{
cachepath = cachepath;
reloadonchange = reloadonchange;
chroot = chroot;
commandpath = commandpath;
subscriberpath = subscriberpath;
connectionstring = connectionstring;
routepath = routepath;
sessiontimeout = sessiontimeout;
mqttroutepath = mqttroutepath;
enablechildrenmonitor = enablechildrenmonitor;
addresses = connectionstring?.split(",");
}
public bool enablechildrenmonitor { get; set; }
public bool reloadonchange { get; set; }
/// <summary>
/// 连接字符串。
/// </summary>
public string connectionstring { get; set; }
/// <summary>
/// 命令配置路径
/// </summary>
public string commandpath { get; set; }
/// <summary>
/// 路由配置路径。
/// </summary>
public string routepath { get; set; }
/// <summary>
/// 订阅者配置路径
/// </summary>
public string subscriberpath { get; set; }
/// <summary>
/// 会话超时时间。
/// </summary>
public timespan sessiontimeout { get; set; }
/// <summary>
/// 根节点。
/// </summary>
public string chroot { get; set; }
public ienumerable<string> addresses { get; set; }
/// <summary>
/// 缓存中心配置中心
/// </summary>
public string cachepath { get; set; }
/// <summary>
/// mqtt路由配置路径。
/// </summary>
public string mqttroutepath { get; set; }
}

路由和路由描述:

public class serviceroute
{
/// <summary>
/// 服务可用地址。
/// </summary>
public ienumerable<string> address { get; set; }
/// <summary>
/// 服务描述符。
/// </summary>
public serviceroutedescriptor serviceroutedescriptor { get; set; }
#region equality members
/// <summary>determines whether the specified object is equal to the current object.</summary>
/// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
/// <param name="obj">the object to compare with the current object. </param>
public override bool equals(object obj)
{
var model = obj as serviceroute;
if (model == null)
return false;
if (obj.gettype() != gettype())
return false;
if (model.serviceroutedescriptor != serviceroutedescriptor)
return false;
return model.address.count() == address.count() && model.address.all(addressmodel => address.contains(addressmodel));
}
/// <summary>serves as the default hash function. </summary>
/// <returns>a hash code for the current object.</returns>
public override int gethashcode()
{
return tostring().gethashcode();
}
public static bool operator ==(serviceroute model1, serviceroute model2)
{
return equals(model1, model2);
}
public static bool operator !=(serviceroute model1, serviceroute model2)
{
return !equals(model1, model2);
}
#endregion equality members
}
/// <summary>
/// 服务描述符。
/// </summary>
[serializable]
public class serviceroutedescriptor
{
/// <summary>
/// 初始化一个新的服务描述符。
/// </summary>
public serviceroutedescriptor()
{
metadatas = new dictionary<string, object>(stringcomparer.ordinalignorecase);
}
/// <summary>
/// 服务id。
/// </summary>
public string id { get; set; }
/// <summary>
/// 访问的令牌
/// </summary>
public string token { get; set; }
/// <summary>
/// 路由
/// </summary>
public string routepath { get; set; }
/// <summary>
/// 元数据。
/// </summary> 
public idictionary<string, object> metadatas { get; set; }
/// <summary>
/// 获取一个元数据。
/// </summary>
/// <typeparam name="t">元数据类型。</typeparam>
/// <param name="name">元数据名称。</param>
/// <param name="def">如果指定名称的元数据不存在则返回这个参数。</param>
/// <returns>元数据值。</returns>
public t getmetadata<t>(string name, t def = default(t))
{
if (!metadatas.containskey(name))
return def;
return (t)metadatas[name];
}
#region equality members
/// <summary>determines whether the specified object is equal to the current object.</summary>
/// <returns>true if the specified object  is equal to the current object; otherwise, false.</returns>
/// <param name="obj">the object to compare with the current object. </param>
public override bool equals(object obj)
{
var model = obj as serviceroutedescriptor;
if (model == null)
return false;
if (obj.gettype() != gettype())
return false;
if (model.id != id)
return false;
return model.metadatas.count == metadatas.count && model.metadatas.all(metadata =>
{
object value;
if (!metadatas.trygetvalue(metadata.key, out value))
return false;
if (metadata.value == null && value == null)
return true;
if (metadata.value == null || value == null)
return false;
return metadata.value.equals(value);
});
}
/// <summary>serves as the default hash function. </summary>
/// <returns>a hash code for the current object.</returns>
public override int gethashcode()
{
return tostring().gethashcode();
}
public static bool operator ==(serviceroutedescriptor model1, serviceroutedescriptor model2)
{
return equals(model1, model2);
}
public static bool operator !=(serviceroutedescriptor model1, serviceroutedescriptor model2)
{
return !equals(model1, model2);
}
#endregion equality members
}

watcher监听器:

子节点监听器:

internal class childrenmonitorwatcher : watcher
{
private readonly task<zookeeper> _zookeepercall;
private readonly string _path;
private readonly action<string[], string[]> _action;
private string[] _currentdata = new string[0];
public childrenmonitorwatcher(task<zookeeper> zookeepercall, string path, action<string[], string[]> action)
{
_zookeepercall = zookeepercall;
_path = path;
_action = action;
}
public childrenmonitorwatcher setcurrentdata(string[] currentdata)
{
_currentdata = currentdata ?? new string[0];
return this;
}
#region overrides of watcherbase
public override async task process(watchedevent watchedevent)
{
if (watchedevent.getstate() != event.keeperstate.syncconnected || watchedevent.getpath() != _path)
return;
var zookeeper = await _zookeepercall;
//func<childrenmonitorwatcher> getwatcher = () => new childrenmonitorwatcher(_zookeepercall, path, _action);
task<childrenmonitorwatcher> getwatcher =  task.run(() => {return new childrenmonitorwatcher(_zookeepercall, _path, _action); });
switch (watchedevent.get_type())
{
//创建之后开始监视下面的子节点情况。
case event.eventtype.nodecreated:
await zookeeper.getchildrenasync(_path, await getwatcher);
break;
//子节点修改则继续监控子节点信息并通知客户端数据变更。
case event.eventtype.nodechildrenchanged:
try
{
var watcher = await getwatcher;
var result = await zookeeper.getchildrenasync(_path, watcher);
var childrens = result.children.toarray();
_action(_currentdata, childrens);
watcher.setcurrentdata(childrens);
}
catch (keeperexception.nonodeexception)
{
_action(_currentdata, new string[0]);
}
break;
//删除之后开始监控自身节点,并通知客户端数据被清空。
case event.eventtype.nodedeleted:
{
var watcher = await getwatcher;
await zookeeper.existsasync(_path, watcher);
_action(_currentdata, new string[0]);
watcher.setcurrentdata(new string[0]);
}
break;
}
}
#endregion overrides of watcherbase
}

当前节点监听器:

internal class nodemonitorwatcher : watcher
{
private readonly task<zookeeper> _zookeepercall;
private readonly string _path;
private readonly action<byte[], byte[]> _action;
private byte[] _currentdata;
public nodemonitorwatcher(task<zookeeper> zookeepercall, string path, action<byte[], byte[]> action)
{
_zookeepercall = zookeepercall;
_path = path;
_action = action;
}
public nodemonitorwatcher setcurrentdata(byte[] currentdata)
{
_currentdata = currentdata;
return this;
}
#region overrides of watcherbase
public override async task process(watchedevent watchedevent)
{
switch (watchedevent.get_type())
{
case event.eventtype.nodedatachanged:
var zookeeper = await _zookeepercall;
var watcher = new nodemonitorwatcher(_zookeepercall, _path, _action);
var data = await zookeeper.getdataasync(_path, watcher);
var newdata = data.data;
_action(_currentdata, newdata);
watcher.setcurrentdata(newdata);
break;
}
}
#endregion overrides of watcherbase
}

连接断开监听器:

internal class reconnectionwatcher : watcher
{
private readonly action _reconnection;
public reconnectionwatcher(action reconnection)
{
_reconnection = reconnection;
}
#region overrides of watcher
/// <summary>processes the specified event.</summary>
/// <param name="watchedevent">the event.</param>
/// <returns></returns>
public override async task process(watchedevent watchedevent)
{
var state = watchedevent.getstate();
switch (state)
{
case event.keeperstate.expired:
case event.keeperstate.disconnected:
{
_reconnection();
break;
}
}
await task.completedtask;
}
#endregion overrides of watcher
}