kubernetes中基于etcd实现集中的数据存储,今天来学习下基于etcd如何实现数据读取一致性、更新一致性、事务的具体实现数组
在k8s中有部分数据的存储是须要通过处理以后才能存储的,好比secret这种加密的数据,既然要存储就至少包含两个操做,加密存储,解密读取,transformer就是为了完成该操做而实现的,其在进行etcd数据存储的时候回对数据进行加密,而在读取的时候,则会进行解密微信
在etcd中进行修改(增删改)操做的时候,都会递增revision,而在k8s中也经过该值来做为k8s资源的ResourceVersion,该机制也是实现watch的关键机制,在操做etcd解码从etcd获取的数据的时候,会经过versioner组件来为资源动态的修改该值并发
将数据从etcd中读取后,数据自己就是一个字节数组,如何将对应的数据转换成咱们真正的运行时对象呢?还记得咱们以前的scheme与codec么,在这里咱们知道对应的数据编码格式,也知道资源对象的类型,则经过codec、字节数组、目标类型,咱们就能够完成对应数据的反射ide
etcd中的数据写入是基于leader单点写入和集群quorum机制实现的,并非一个强一致性的数据写入,则若是若是咱们访问的节点不存在quorum的半数节点内,则可能形成短暂的数据不一致,针对一些强一致的场景,咱们能够经过其revision机制来进行数据的读取, 保证咱们读取到更新以后的数据oop
// 省略非核心代码 func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error { // 获取key getResp, err := s.client.KV.Get(ctx, key, s.getOps...) // 检测当前版本,是否达到最小版本的 if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { return err } // 执行数据转换 data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key)) if err != nil { return storage.NewInternalError(err.Error()) } // 解码数据 return decode(s.codec, s.versioner, data, out, kv.ModRevision) }
建立一个接口数据则会首先进行资源对象的检查,避免重复建立对象,此时会先经过资源对象的version字段来进行初步检查,而后在利用etcd的事务机制来保证资源建立的原子性操做源码分析
// 省略非核心代码 func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { return errors.New("resourceVersion should not be set on objects to be created") } if err := s.versioner.PrepareObjectForStorage(obj); err != nil { return fmt.Errorf("PrepareObjectForStorage failed: %v", err) } // 将数据编码 data, err := runtime.Encode(s.codec, obj) if err != nil { return err } // 转换数据 newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key)) if err != nil { return storage.NewInternalError(err.Error()) } startTime := time.Now() // 事务操做 txnResp, err := s.client.KV.Txn(ctx).If( notFound(key), // 若是以前不存在 这里是利用的etcd的ModRevision即修改版本为0, 寓意着对应的key不存在 ).Then( clientv3.OpPut(key, string(newData), opts...), // put修改数据 ).Commit() metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) if err != nil { return err } if !txnResp.Succeeded { return storage.NewKeyExistsError(key, 0) } if out != nil { // 获取对应的Revision putResp := txnResp.Responses[0].GetResponsePut() return decode(s.codec, s.versioner, data, out, putResp.Header.Revision) } return nil } func notFound(key string) clientv3.Cmp { return clientv3.Compare(clientv3.ModRevision(key), "=", 0) }
删除接口主要是经过CAS和事务机制来共同实现,确保在etcd不发生异常的状况,即便并发对同个资源来进行删除操做也能保证至少有一个节点成功学习
// 省略非核心代码 func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error { startTime := time.Now() // 获取当前的key的数据 getResp, err := s.client.KV.Get(ctx, key) for { // 获取当前的状态 origState, err := s.getState(getResp, key, v, false) if err != nil { return err } txnResp, err := s.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), // 若是修改版本等于当前状态,就尝试删除 ).Then( clientv3.OpDelete(key), // 删除 ).Else( clientv3.OpGet(key), // 获取 ).Commit() if !txnResp.Succeeded { // 获取最新的数据重试事务操做 getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key) continue } // 将最后一个版本的数据解码到out里面,而后返回 return decode(s.codec, s.versioner, origState.data, out, origState.rev) } }
更新接口实现上与删除接口并没有本质上的差异,可是若是多个节点同时进行更新,CAS并发操做必然会有一个节点成功,当发现已经有节点操做成功,则当前节点其实并不须要再作过多的操做,直接返回便可fetch
// 省略非核心代码 func (s *store) GuaranteedUpdate( ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error { // 获取当前key的最新数据 getCurrentState := func() (*objState, error) { startTime := time.Now() getResp, err := s.client.KV.Get(ctx, key, s.getOps...) metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) if err != nil { return nil, err } return s.getState(getResp, key, v, ignoreNotFound) } // 获取当前数据 var origState *objState var mustCheckData bool if len(suggestion) == 1 && suggestion[0] != nil { // 若是提供了建议的数据,则会使用, origState, err = s.getStateFromObject(suggestion[0]) if err != nil { return err } //可是须要检测数据 mustCheckData = true } else { // 尝试从新获取数据 origState, err = getCurrentState() if err != nil { return err } } transformContext := authenticatedDataString(key) for { // 检查对象是否已经更新, 主要是经过检测uuid/revision来实现 if err := preconditions.Check(key, origState.obj); err != nil { // If our data is already up to date, return the error if !mustCheckData { return err } // 若是检查数据一致性错误,则须要从新获取 origState, err = getCurrentState() if err != nil { return err } mustCheckData = false // Retry continue } // 删除当前的版本数据revision ret, ttl, err := s.updateState(origState, tryUpdate) if err != nil { // If our data is already up to date, return the error if !mustCheckData { return err } // It's possible we were working with stale data // Actually fetch origState, err = getCurrentState() if err != nil { return err } mustCheckData = false // Retry continue } // 编码数据 data, err := runtime.Encode(s.codec, ret) if err != nil { return err } if !origState.stale && bytes.Equal(data, origState.data) { // 若是咱们发现咱们当前的数据与获取到的数据一致,则会直接跳过 if mustCheckData { origState, err = getCurrentState() if err != nil { return err } mustCheckData = false if !bytes.Equal(data, origState.data) { // original data changed, restart loop continue } } if !origState.stale { // 直接返回数据 return decode(s.codec, s.versioner, origState.data, out, origState.rev) } } // 砖汉数据 newData, err := s.transformer.TransformToStorage(data, transformContext) if err != nil { return storage.NewInternalError(err.Error()) } opts, err := s.ttlOpts(ctx, int64(ttl)) if err != nil { return err } trace.Step("Transaction prepared") startTime := time.Now() // 事务更新数据 txnResp, err := s.client.KV.Txn(ctx).If( clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), ).Then( clientv3.OpPut(key, string(newData), opts...), ).Else( clientv3.OpGet(key), ).Commit() metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime) if err != nil { return err } trace.Step("Transaction committed") if !txnResp.Succeeded { // 从新获取数据 getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange()) klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key) origState, err = s.getState(getResp, key, v, ignoreNotFound) if err != nil { return err } trace.Step("Retry value restored") mustCheckData = false continue } // 获取put响应 putResp := txnResp.Responses[0].GetResponsePut() return decode(s.codec, s.versioner, data, out, putResp.Header.Revision) } }
transformer的实现和注册地方我并无找到,只看到了几个覆盖资源类型的地方,还有list/watch接口,后续再继续学习,今天就先到这里,下次再见ui
> 微信号:baxiaoshi2020 > 关注公告号阅读更多源码分析文章
> 更多文章关注 www.sreguide.com > 本文由博客一文多发平台 OpenWrite 发布编码