.Net Core为咱们提供了一套强大的Configuration配置系统,使用简单扩展性强。经过这套配置系统咱们能够将Json、Xml、Ini等数据源加载到程序中,也能够本身扩展其余形式的存储源。今天咱们要作的就是经过自定义的方式为其扩展Etcd数据源操做。git
在使用etcd以前咱们先介绍一下Etcd,我相信不少同窗都早有耳闻。Etcd是一款高可用、强一致的分布式KV存储系统,它内部采用raft协议做为一致性算法,自己也是基于GO语言开发的,最新版本为v3.4.9,具体版本下载地址可参阅官方GitHub地址。相信了解过K8S的同窗对这个确定不陌生,它是K8S的数据管理系统。官方地址为https://etcd.io/。
在此以前,我相信你们已经了解过不少存储系统了,Etcd到底能实现了什么功能呢?其一用于配置中心和服务发现,再者也能够实现分布式锁和消息系统。它自己就是基于目录型存储,而且内部有一套强大的Watch机制能够监听针对节点和数据的操做变化,每次对节点的事务操做都会有对于的版本信息。github
经过上面的介绍是否是感受和Zookeeper有点相似呢😂😂😂,网上有不少不少关于Etcd和Zookeeper的对比文章,大体以下能够获得如下结论
算法
功能 | Etcd | Zookeeper |
---|---|---|
分布式锁 | 有(采用节点版本号信息) | 有(采用临时节点和顺序临时节点) |
watcher | 有 | 有 |
一致性算法 | raft | zab |
选举 | 有 | 有 |
元数据(metadata)存储 | 有 | 有 |
应用场景 | Etcd | Zookeeper |
---|---|---|
发布与订阅(配置中心) | 有(不限次Watch) | 有(一次性触发的,须要从新注册Watch) |
软负载均衡 | 有 | 有 |
命名服务(Naming Service) | 有 | 有 |
服务发现 | 有(基于租约节点) | 有(基于临时节点) |
分布式通知/协调 | 有 | 有 |
集群管理与Master选举 | 有 | 有 |
分布式锁 | 有 | 有 |
分布式队列 | 有 | 有 |
在Nuget上能够搜索到不少.Net Core的Etcd客户端驱动程序,我使用了下载量最多的一个名字叫dotnet-etcd的驱动包,顺便找到了它在GayHub上,很差意思手滑打错了😱😱😱GitHub上的项目地址,大概学习了一下基本的使用方式。其实咱们结合Configuration配置这一块,只须要两个功能。一个是Get获取数据,另外一个是Watch节点变化(更新数据会用到)。我的认为,前期有目有边界的学习仍是很是重要的。json
前面咱们讲到过自定义扩展Configuration是很是方便的,相信了解过Configuration相关源码的小伙伴们已经很是熟悉了,大体总结一下分为三步:数据结构
public static class EtcdConfigurationExtensions { /// <summary> /// AddEtcd扩展方法 /// </summary> /// <param name="serverAddress">Etcd地址</param> /// <param name="path">读取路径</param> /// <returns></returns> public static IConfigurationBuilder AddEtcd(this IConfigurationBuilder builder, string serverAddress,string path) { return AddEtcd(builder, serverAddress:serverAddress, path: path,reloadOnChange: false); } /// <summary> /// AddEtcd扩展方法 /// </summary> /// <param name="serverAddress">Etcd地址</param> /// <param name="path">读取路径</param> /// <param name="reloadOnChange">若是数据发送改变是否刷新</param> /// <returns></returns> public static IConfigurationBuilder AddEtcd(this IConfigurationBuilder builder, string serverAddress, string path, bool reloadOnChange) { return AddEtcd(builder,options => { options.Address = serverAddress; options.Path = path; options.ReloadOnChange = reloadOnChange; }); } public static IConfigurationBuilder AddEtcd(this IConfigurationBuilder builder, Action<EtcdOptions> options) { EtcdOptions etcdOptions = new EtcdOptions(); options.Invoke(etcdOptions); return builder.Add(new EtcdConfigurationSource { EtcdOptions = etcdOptions }); } }
这里我还定义了一个EtcdOptions的POCO,用于承载读取Etcd的配置属性并发
public class EtcdOptions { /// <summary> /// Etcd地址 /// </summary> public string Address { get; set; } /// <summary> /// Etcd访问用户名 /// </summary> public string UserName { get; set; } /// <summary> /// Etcd访问密码 /// </summary> public string PassWord { get; set; } /// <summary> /// Etcd读取路径 /// </summary> public string Path { get; set; } /// <summary> /// 数据变动是否刷新读取 /// </summary> public bool ReloadOnChange { get; set; } }
接下来咱们定义EtcdConfigurationSource,这个类很是简单就是返回一个配置提供对象负载均衡
public class EtcdConfigurationSource : IConfigurationSource { public EtcdOptions EtcdOptions { get; set; } public IConfigurationProvider Build(IConfigurationBuilder builder) { return new EtcdConfigurationProvider(EtcdOptions); } }
真正的读取操做都在EtcdConfigurationProvider里分布式
public class EtcdConfigurationProvider : ConfigurationProvider { private readonly string _path; private readonly bool _reloadOnChange; private readonly EtcdClient _etcdClient; public EtcdConfigurationProvider(EtcdOptions options) { //实例化EtcdClient _etcdClient = new EtcdClient(options.Address,username: options.UserName,password: options.PassWord); _path = options.Path; _reloadOnChange = options.ReloadOnChange; } /// <summary> /// 重写加载方法 /// </summary> public override void Load() { //读取数据 LoadData(); //数据发生变化是否从新加载 if (_reloadOnChange) { ReloadData(); } } private void LoadData() { //读取Etcd里的数据 string result = _etcdClient.GetValAsync(_path).GetAwaiter().GetResult(); if (string.IsNullOrEmpty(result)) { return; } //转换一下数据结构,这里我使用的是json格式 //读取的数据只要赋值到Data属性上便可,IConfiguration真正读取的数据就是存储到Data的字典数据 Data = ConvertData(result); } private IDictionary<string,string> ConvertData(string result) { byte[] array = Encoding.UTF8.GetBytes(result); MemoryStream stream = new MemoryStream(array); //JsonConfigurationFileParser是将json数据转换为Configuration可读取的结构(复制JsonConfiguration类库里的😄😄😄) return JsonConfigurationFileParser.Parse(stream); } private void ReloadData() { WatchRequest request = new WatchRequest() { CreateRequest = new WatchCreateRequest() { //须要转换一个格式,由于etcd v3版本的接口都包含在grpc的定义中 Key = ByteString.CopyFromUtf8(_path) } }; //监听Etcd节点变化,获取变动数据,更新配置 _etcdClient.Watch(request, rsp => { if (rsp.Events.Any()) { var @event = rsp.Events[0]; //须要转换一个格式,由于etcd v3版本的接口都包含在grpc的定义中 Data = ConvertData(@event.Kv.Value.ToStringUtf8()); //须要调用ConfigurationProvider的OnReload方法触发ConfigurationReloadToken通知 //这样才能对使用Configuration的类发送数据变动通知 //好比IOptionsMonitor就是经过ConfigurationReloadToken通知变动数据的 OnReload(); } }); } }
使用方式以下ide
builder.AddEtcd("http://127.0.0.1:2379", "service/mydemo", true);
顺便给你们推荐一个Etcd可视化管理工具ETCD Manager,以便更好的学习Etcd。
到这里,基本上就结束了,是否是很是简单。主要仍是Configuration自己的设计思路比较清晰,因此实现起来也不费劲。工具
以上代码都已经上传了个人GitHub,该仓库还扩展了其余数据源的读取好比Consul、Properties文件、Yaml文件的读取,实现思路也都大体类似,有兴趣的同窗能够自行查阅。因为主要是讲解实现思路,可能许多细节并未作处理还望见谅。若是有疑问或者更好的建议,欢迎评论区交流指导。