在传统单体应用单机部署的状况下,可使用Java并发处理相关的API(如ReentrantLock或Synchronized)进行互斥控制。在单机环境中,Java中提供了不少并发处理相关的API。可是,随着业务发展的须要,原单体单机部署的系统被演化成分布式集群系统后,因为分布式系统多线程、多进程而且分布在不一样机器上,这将使原单机部署状况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就须要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!php
锁是在执行多线程时用于强行限制资源访问的同步机制,在单机系统上,可使用Java并发处理相关的API(如ReentrantLock或Synchronized)进行互斥控制。而在分布式系统场景下,实例会运行在多台机器上,为了使多进程(多实例上)对共享资源的读写同步,保证数据的最终一致性,引入了分布式锁。html
分布式锁应具有如下特色:java
分布式锁常见实现方式:git
网上常见的是基于Redis和ZooKeeper的实现,基于数据库的由于实现繁琐且性能较差,不想维护第三方中间件的能够考虑。本文主要描述基于 ETCD 的实现,etcd3 的client也给出了新的 api,使用上更为简单github
既然是锁,核心操做无外乎加锁、解锁。golang
Redis的加锁操做:算法
SET lock_name my_random_value NX PX 30000
Redis的解锁操做:数据库
del lock_name
etcd 支持如下功能,正是依赖这些功能来实现分布式锁的:api
实现过程:缓存
客户端链接 Etcd,以 /lock/mylock 为前缀建立全局惟一的 key,假设第一个客户端对应的 key="/lock/mylock/UUID1",第二个为 key="/lock/mylock/UUID2";客户端分别为本身的 key 建立租约 - Lease,租约的长度根据业务耗时肯定,假设为 15s;
当一个客户端持有锁期间,其它客户端只能等待,为了不等待期间租约失效,客户端需建立一个定时任务做为“心跳”进行续约。此外,若是持有锁期间客户端崩溃,心跳中止,key 将因租约到期而被删除,从而锁释放,避免死锁。
进行 put 操做,将步骤 1 中建立的 key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,假设两个客户端 put 操做返回的 Revision 分别为 一、2,客户端需记录 Revision 用以接下来判断本身是否得到锁。
客户端之前缀 /lock/mylock 读取 keyValue 列表(keyValue 中带有 key 对应的 Revision),判断本身 key 的 Revision 是否为当前列表中最小的,若是是则认为得到锁;不然监听列表中前一个 Revision 比本身小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则本身得到锁。
得到锁后,操做共享资源,执行业务代码。
完成业务流程后,删除对应的key释放锁。
自带的 etcdctl 能够模拟锁的使用:
// 第一个终端 $ ./etcdctl lock mutex1 mutex1/326963a02758b52d // 第二终端 $ ./etcdctl lock mutex1 // 当第一个终端结束了,第二个终端会显示 mutex1/326963a02758b531
在etcd的clientv3包中,实现了分布式锁。使用起来和mutex是相似的,为了了解其中的工做机制,这里简要的作一下总结。
etcd分布式锁的实如今go.etcd.io/etcd/clientv3/concurrency包中,主要提供了如下几个方法:
* func NewMutex(s *Session, pfx string) *Mutex, 用来新建一个mutex * func (m *Mutex) Lock(ctx context.Context) error,它会阻塞直到拿到了锁,而且支持经过context来取消获取锁。 * func (m *Mutex) Unlock(ctx context.Context) error,解锁
所以在使用etcd提供的分布式锁式很是简单,一般就是实例化一个mutex,而后尝试抢占锁,以后进行业务处理,最后解锁便可。
demo:
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" "log" "os" "os/signal" "time" ) func main() { c := make(chan os.Signal) signal.Notify(c) cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"localhost:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { log.Fatal(err) } defer cli.Close() lockKey := "/lock" go func () { session, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } m := concurrency.NewMutex(session, lockKey) if err := m.Lock(context.TODO()); err != nil { log.Fatal("go1 get mutex failed " + err.Error()) } fmt.Printf("go1 get mutex sucess\n") fmt.Println(m) time.Sleep(time.Duration(10) * time.Second) m.Unlock(context.TODO()) fmt.Printf("go1 release lock\n") }() go func() { time.Sleep(time.Duration(2) * time.Second) session, err := concurrency.NewSession(cli) if err != nil { log.Fatal(err) } m := concurrency.NewMutex(session, lockKey) if err := m.Lock(context.TODO()); err != nil { log.Fatal("go2 get mutex failed " + err.Error()) } fmt.Printf("go2 get mutex sucess\n") fmt.Println(m) time.Sleep(time.Duration(2) * time.Second) m.Unlock(context.TODO()) fmt.Printf("go2 release lock\n") }() <-c }
Lock()函数的实现很简单:
// Lock locks the mutex with a cancelable context. If the context is canceled // while trying to acquire the lock, the mutex tries to clean its stale lock entry. func (m *Mutex) Lock(ctx context.Context) error { s := m.s client := m.s.Client() m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) // put self in lock waiters via myKey; oldest waiter holds lock put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) // reuse key in case this session already holds the lock get := v3.OpGet(m.myKey) // fetch current holder to complete uncontended path with only one RPC getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() if err != nil { return err } m.myRev = resp.Header.Revision if !resp.Succeeded { m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision } // if no key on prefix / the minimum rev is key, already hold the lock ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil } // wait for deletion revisions prior to myKey hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if wait failed if werr != nil { m.Unlock(client.Ctx()) } else { m.hdr = hdr } return werr }
首先经过一个事务来尝试加锁,这个事务主要包含了4个操做: cmp、put、get、getOwner。须要注意的是,key是由pfx和Lease()组成的。
接下来才是经过判断来检查是否持有锁
m.myRev = resp.Header.Revision if !resp.Succeeded { m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision } // if no key on prefix / the minimum rev is key, already hold the lock ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil }
m.myRev是当前的版本号,resp.Succeeded是cmp为true时值为true,不然是false。这里的判断代表当同一个session非第一次尝试加锁,当前的版本号应该取这个key的最新的版本号。
下面是取得锁的持有者的key。若是当前没有人持有这把锁,那么默认当前会话得到了锁。或者锁持有者的版本号和当前的版本号一致, 那么当前的会话就是锁的持有者。
// wait for deletion revisions prior to myKey hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if wait failed if werr != nil { m.Unlock(client.Ctx()) } else { m.hdr = hdr }
上面这段代码就很好理解了,由于走到这里说明没有获取到锁,那么这里等待锁的删除。
waitDeletes方法的实现也很简单,可是须要注意的是,这里的getOpts只会获取比当前会话版本号更低的key,而后去监控最新的key的删除。等这个key删除了,本身也就拿到锁了。
这种分布式锁的实现和我一开始的预想是不一样的。它不存在锁的竞争,不存在重复的尝试加锁的操做。而是经过使用统一的前缀pfx来put,而后根据各自的版本号来排队获取锁。效率很是的高。避免了惊群效应
如图所示,共有4个session来加锁,那么根据revision来排队,获取锁的顺序为session2 -> session3 -> session1 -> session4。
这里面须要注意一个惊群效应,每个client在锁住/lock这个path的时候,实际都已经插入了本身的数据,相似/lock/LEASE_ID,而且返回了各自的index(就是raft算法里面的日志索引),而只有最小的才算是拿到了锁,其余的client须要watch等待。例如client1拿到了锁,client2和client3在等待,而client2拿到的index比client3的更小,那么对于client1删除锁以后,client3其实并不关心,并不须要去watch。因此综上,等待的节点只须要watch比本身index小而且差距最小的节点删除事件便可。
etcd有多种使用场景,Master选举是其中一种。提及Master选举,过去经常使用zookeeper,经过建立EPHEMERAL_SEQUENTIAL节点(临时有序节点),咱们选择序号最小的节点做为Master,逻辑直观,实现简单是其优点,可是要实现一个高健壮性的选举并不简单,同时zookeeper繁杂的扩缩容机制也是沉重的负担。
master 选举根本上也是抢锁,与zookeeper直观选举逻辑相比,etcd的选举则须要在咱们熟悉它的一系列基本概念后,调动咱们充分的想象力:
至此,etcd选举的逻辑大致清晰了,但这一系列操做与zookeeper相比复杂不少,有没有已经封装好的库能够直接拿来用?etcd clientv3 concurrency中有对选举及分布式锁的封装。后面进一步发现,etcdctl v3里已经有master选举的实现了,下面针对这部分代码进行简单注释,在最后参考这部分代码实现本身的选举逻辑。
官方示例:https://github.com/etcd-io/et...
如crontab 示例:
package main import ( "context" "fmt" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" "log" "time" ) const prefix = "/election-demo" const prop = "local" func main() { endpoints := []string{"szth-cce-devops00.szth.baidu.com:8379"} cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { log.Fatal(err) } defer cli.Close() campaign(cli, prefix, prop) } func campaign(c *clientv3.Client, election string, prop string) { for { // 租约到期时间:5s s, err := concurrency.NewSession(c, concurrency.WithTTL(5)) if err != nil { fmt.Println(err) continue } e := concurrency.NewElection(s, election) ctx := context.TODO() log.Println("开始竞选") err = e.Campaign(ctx, prop) if err != nil { log.Println("竞选 leader失败,继续") switch { case err == context.Canceled: return default: continue } } log.Println("得到leader") if err := doCrontab(); err != nil { log.Println("调用主方法失败,辞去leader,从新竞选") _ = e.Resign(ctx) continue } return } } func doCrontab() error { for { fmt.Println("doCrontab") time.Sleep(time.Second * 4) //return fmt.Errorf("sss") } }
/* * 发起竞选 * 未当选leader前,会一直阻塞在Campaign调用 * 当选leader后,等待SIGINT、SIGTERM或session过时而退出 * https://github.com/etcd-io/etcd/blob/master/etcdctl/ctlv3/command/elect_command.go */ func campaign(c *clientv3.Client, election string, prop string) error { //NewSession函数中建立了一个lease,默认是60s TTL,并会调用KeepAlive,永久为这个lease自动续约(2/3生命周期的时候执行续约操做) s, err := concurrency.NewSession(c) if err != nil { return err } e := concurrency.NewElection(s, election) ctx, cancel := context.WithCancel(context.TODO()) donec := make(chan struct{}) sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigc cancel() close(donec) }() //竞选逻辑,将展开分析 if err = e.Campaign(ctx, prop); err != nil { return err } // print key since elected resp, err := c.Get(ctx, e.Key()) if err != nil { return err } display.Get(*resp) select { case <-donec: case <-s.Done(): return errors.New("elect: session expired") } return e.Resign(context.TODO()) } /* * 相似于zookeeper的临时有序节点,etcd的选举也是在相应的prefix path下面建立key,该key绑定了lease并根据lease id进行命名, * key建立后就有revision号,这样使得在prefix path下的key也都是按revision有序 * https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/election.go */ func (e *Election) Campaign(ctx context.Context, val string) error { s := e.session client := e.session.Client() //真正建立的key名为:prefix + lease id k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease()) //Txn:transaction,依靠Txn进行建立key的CAS操做,当key不存在时才会成功建立 txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0)) txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease()))) txn = txn.Else(v3.OpGet(k)) resp, err := txn.Commit() if err != nil { return err } e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s //若是key已存在,则建立失败; //当key的value与当前value不等时,若是本身为leader,则不用从新执行选举直接设置value; //不然报错。 if !resp.Succeeded { kv := resp.Responses[0].GetResponseRange().Kvs[0] e.leaderRev = kv.CreateRevision if string(kv.Value) != val { if err = e.Proclaim(ctx, val); err != nil { e.Resign(ctx) return err } } } //一直阻塞,直到确认本身的create revision为当前path中最小,从而确认本身当选为leader _, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1) if err != nil { // clean up in case of context cancel select { case <-ctx.Done(): e.Resign(client.Ctx()) default: e.leaderSession = nil } return err } e.hdr = resp.Header return nil }
锁基础:
https://tech.meituan.com/2018...