etcd 是一个高可用强一致性的键值仓库在不少分布式系统架构中获得了普遍的应用,本教程结合一些简单的例子介绍golang版本的etcd/clientv3
中提供的主要功能及其使用方法。git
若是还不熟悉etcd推荐先阅读:github
看图轻松了解etcdgolang
etcd经常使用操做介绍json
Let's get started now!bash
咱们使用v3版本的etcd client, 首先经过go get
下载并编译安装etcd clinet v3
。架构
go get github.com/coreos/etcd/clientv3
复制代码
该命令会将包下载到$GOPATH/src/github.com/coreos/etcd/clientv3
中,全部相关依赖包会自动下载编译,包括protobuf
、grpc
等。mvc
官方文档地址:godoc.org/github.com/…app
文档中列出了Go官方实现的etcd client中支持的全部方法,方法仍是不少的,咱们主要梳理一下使用etcd时常常用到的主要API并进行演示。分布式
用程序访问etcd首先要建立client,它须要传入一个Config配置,这里传了2个选项:函数
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}
DialTimeout: 5 * time.Second,
})
复制代码
返回的client
,它的类型具体以下:
type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
// Username is a user name for authentication.
Username string
// Password is a password for authentication.
Password string
// contains filtered or unexported fields
}
复制代码
类型中的成员是etcd客户端几何核心功能模块的具体实现,它们分别用于:
咱们须要使用什么功能,就去client里获取对应的成员便可。
Client.KV是一个
interface`,提供了关于K-V操做的全部方法:
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
}
复制代码
咱们经过方法clientv3.NewKV()
来得到KV接口的实现(实现中内置了错误重试机制):
kv := clientv3.NewKV(cli)
复制代码
接下来,咱们将经过kv
操做etcd中的数据。
putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")
复制代码
第一个参数是goroutine
的上下文Context
。后面两个参数分别是key和value,对于etcd来讲,key=/test/key1只是一个字符串而已,可是对咱们而言却能够模拟出目录层级关系。
Put函数的声明以下:
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
复制代码
除了上面例子中的三个的参数,还支持一个变长参数,能够传递一些控制项来影响Put的行为,例如能够携带一个lease ID来支持key过时。
Put操做返回的是PutResponse,不一样的KV操做对应不一样的response结构,全部KV操做返回的response结构以下:
type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)
复制代码
程序代码里导入clientv3
后在GoLand中能够很快定位到PutResponse
的定义文件中,PutResponse只是pb.PutResponse的类型别名,经过Goland跳转过去后能够看到PutResponse的详细定义。
type PutResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// if prev_kv is set in the request, the previous key-value pair will be returned.
PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
复制代码
Header里保存的主要是本次更新的revision信息,而PrevKv能够返回Put覆盖以前的value是什么(目前是nil,后面会说缘由),把返回的PutResponse
打印出来看一下:
fmt.Printf("PutResponse: %v, err: %v", putResp, err)
// output
// PutResponse: &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:3 raft_term:7 <nil>}, err: <nil>%
复制代码
咱们须要判断err来肯定操做是否成功。
咱们再Put其余2个key,用于后续演示:
kv.Put(context.TODO(),"/test/key2", "Hello World!")
// 再写一个同前缀的干扰项
kv.Put(context.TODO(), "/testspam", "spam")
复制代码
如今/test目录下有两个键: key1和key2, 而/testspam并不归属于/test目录
使用KV的Get
方法来读取给定键的值:
getResp, err := kv.Get(context.TODO(), "/test/key1")
复制代码
其函数声明以下:
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
复制代码
和Put相似,函数注释里提示咱们能够传递一些控制参数来影响Get的行为,好比:WithFromKey表示读取从参数key开始递增的全部key,而不是读取单个key。
在上面的例子中,我没有传递opOption,因此就是获取key=/test/key1的最新版本数据。
这里err并不能反馈出key是否存在(只能反馈出本次操做由于各类缘由异常了),咱们须要经过GetResponse(其实是pb.RangeResponse)判断key是否存在:
type RangeResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
// more indicates if there are more keys to return in the requested range.
More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
// count is set to the number of keys within the range when requested.
Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}
复制代码
Kvs字段,保存了本次Get查询到的全部k-v对,由于上述例子只Get了一个单key,因此只须要判断一下len(Kvs)是否等于1便可知道key是否存在。
RangeResponse.More
和Count
,当咱们使用withLimit()
等选项进行Get
时会发挥做用,至关于翻页查询。
接下来,咱们经过给Get查询增长WithPrefix选项,获取/test目录下的全部子元素:
rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())
复制代码
WithPrefix()
是指查找以/test/
为前缀的全部key,所以能够模拟出查找子目录的效果。
etcd
是一个有序的k-v存储,所以/test/为前缀的key老是顺序排列在一块儿。
withPrefix()
实际上会转化为范围查询,它根据前缀/test/
生成了一个前闭后开的key range:[“/test/”, “/test0”)
,为何呢?由于比/
大的字符是0
,因此以/test0
做为范围的末尾,就能够扫描到全部以/test/
为前缀的key了。
在以前,咱们Put了一个/testspam
键值,由于不符合/test/
前缀(注意末尾的/),因此就不会被此次Get
获取到。可是,若是查询的前缀是/test
,那么/testspam
就会被返回,使用时必定要特别注意。
打印rangeResp.Kvs能够看到得到了两个键值:
[key:"/test/key1" create_revision:2 mod_revision:13 version:6 value:"Hello etcd!" key:"/test/key2" create_revision:5 mod_revision:14 version:4 value:"Hello World!" ]
复制代码
etcd客户端的Lease对象能够经过如下的代码获取到
lease := clientv3.NewLease(cli)
复制代码
lease对象是Lease接口的实现,Lease接口的声明以下:
type Lease interface {
// Grant 建立一个新租约
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// Revoke 销毁给定租约ID的租约
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever.
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
// should be used instead of KeepAliveOnce.
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}
复制代码
Lease提供了如下功能:
要想实现key自动过时,首先得建立一个租约,下面的代码建立一个TTL为10秒的租约:
grantResp, err := lease.Grant(context.TODO(), 10)
复制代码
返回的grantResponse的结构体声明以下:
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
Error string
}
复制代码
在应用程序代码中主要使用到的是租约ID。
接下来咱们用这个Lease往etcd中存储一个10秒过时的key:
kv.Put(context.TODO(), "/test/vanish", "vanish in 10s", clientv3.WithLease(grantResp.ID))
复制代码
这里特别须要注意,有一种状况是在Put以前Lease已通过期了,那么这个Put操做会返回error,此时你须要从新分配Lease。
当咱们实现服务注册时,须要主动给Lease进行续约,一般是以小于TTL的间隔循环调用Lease的KeepAliveOnce()方法对租约进行续期,一旦某个服务节点出错没法完成租约的续期,等key过时后客户端即没法在查询服务时得到对应节点的服务,这样就经过租约到期实现了服务的错误隔离。
keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
复制代码
或者使用KeepAlive()
方法,其会返回<-chan *LeaseKeepAliveResponse
只读通道,每次自动续租成功后会向通道中发送信号。通常都用KeepAlive()
方法
KeepAlive和Put同样,若是在执行以前Lease就已通过期了,那么须要从新分配Lease。etcd并无提供API来实现原子的Put with Lease,须要咱们本身判断err从新分配Lease。
Op字面意思就是”操做”,Get和Put都属于Op,只是为了简化用户开发而开放的特殊API。
KV对象有一个Do方法接受一个Op:
// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
复制代码
其参数Op是一个抽象的操做,能够是Put/Get/Delete…;而OpResponse是一个抽象的结果,能够是PutResponse/GetResponse…
能够经过Client中定义的一些方法来建立Op:
其实和直接调用KV.Put,KV.GET没什么区别。
下面是一个例子:
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
ops := []clientv3.Op{
clientv3.OpPut("put-key", "123"),
clientv3.OpGet("put-key"),
clientv3.OpPut("put-key", "456")}
for _, op := range ops {
if _, err := cli.Do(context.TODO(), op); err != nil {
log.Fatal(err)
}
}
复制代码
把Op交给Do方法执行,返回的opResp结构以下:
type OpResponse struct {
put *PutResponse
get *GetResponse
del *DeleteResponse
txn *TxnResponse
}
复制代码
你的操做是什么类型,你就用哪一个指针来访问对应的结果。
etcd中事务是原子执行的,只支持if … then … else …这种表达。首先来看一下Txn中定义的方法:
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
}
复制代码
Txn必须是这样使用的:If(知足条件) Then(执行若干Op) Else(执行若干Op)。
If中支持传入多个Cmp比较条件,若是全部条件知足,则执行Then中的Op(上一节介绍过Op),不然执行Else中的Op。
首先,咱们须要开启一个事务,这是经过KV对象的方法实现的:
txn := kv.Txn(context.TODO())
复制代码
下面的测试程序,判断若是k1的值大于v1而且k1的版本号是2,则Put 键值k2和k3,不然Put键值k4和k5。
kv.Txn(context.TODO()).If(
clientv3.Compare(clientv3.Value(k1), ">", v1),
clientv3.Compare(clientv3.Version(k1), "=", 2)
).Then(
clientv3.OpPut(k2,v2), clentv3.OpPut(k3,v3)
).Else(
clientv3.OpPut(k4,v4), clientv3.OpPut(k5,v5)
).Commit()
复制代码
相似于clientv3.Value()\用于指定key属性的,有这么几个方法:
Watch用于监听某个键的变化, Watch
调用后返回一个WatchChan
,它的类型声明以下:
type WatchChan <-chan WatchResponse
type WatchResponse struct {
Header pb.ResponseHeader
Events []*Event
CompactRevision int64
Canceled bool
Created bool
}
复制代码
当监听的key有变化后会向WatchChan
发送WatchResponse
。Watch的典型应用场景是应用于系统配置的热加载,咱们能够在系统读取到存储在etcd key中的配置后,用Watch监听key的变化。在单独的goroutine中接收WatchChan发送过来的数据,并将更新应用到系统设置的配置变量中,好比像下面这样在goroutine中更新变量appConfig,这样系统就实现了配置变量的热加载。
type AppConfig struct {
config1 string
config2 string
}
var appConfig Appconfig
func watchConfig(clt *clientv3.Client, key string, ss interface{}) {
watchCh := clt.Watch(context.TODO(), key)
go func() {
for res := range watchCh {
value := res.Events[0].Kv.Value
if err := json.Unmarshal(value, ss); err != nil {
fmt.Println("now", time.Now(), "watchConfig err", err)
continue
}
fmt.Println("now", time.Now(), "watchConfig", ss)
}
}()
}
watchConfig(client, "config_key", &appConfig)
复制代码
golang etcd clientv3的主要功能就是这些,但愿能帮你们梳理出学习脉络,这样工做中应用到etcd时再看官方文档就会容易不少。