如今有一个需求,应用启动时须要初始化一些数据,为了保证高可用,会启动多副本(replicas >= 3),如何保证数据不会重复?git
最简单的方法,初始化数据都带上主键,这样主键冲突就会报错。可是这么作咱们须要对冲突的错误进行额外处理,由于插入咱们通常会复用已写好的 DAO 层代码。github
另外,初始化数据的主键多是动态生成的,并不想把主键写死。因此下面来介绍这次的主角:基于 MySQL 的分布式锁的解决方案。redis
多副本分布式应用,在这种 n 选 1 竞争某个资源或执行权的场景,通常都会用到分布式锁。分布式有不少种实现方式,如基于 redis,etcd,zookeeper,file 等系统。本质上,就是找个多个节点都承认的地方保存数据,经过数据竞态来实现锁,固然这个依赖最好是高可用,不然会引起单点故障。分布式
多个副本都使用同一个 MySQL,因此咱们能够很方便的基于 MySQL 实现一个分布式锁。原理很简单,利用惟一索引保证只有一个副本能插入某条数据,插入成功则表示取锁成功,执行完毕则删除该条数据释放锁。测试
建一个表用来存放锁数据,将 Action 设为惟一索引,表示对某个动做加锁,如:init 初始化,cronjob 定时任务等不一样动做之间加锁互不影响。线程
type lock struct { Id string `gorm:"primary_key"` CreatedAt time.Time UpdatedAt time.Time ExpiredAt time.Time // 锁过时时间 Action string `gorm:"unique;not null"` Holder string // 持锁人信息,能够使用 hostname }
既然有过时时间,那么持锁时间设为多长合适呢?设置过短可能逻辑还没执行完锁就过时了;设置太长若是程序中途挂了没有释放锁,那么这段时间全部节点都拿不到锁。code
要解决这个问题咱们能够使用租约机制(lease),设置较短的持锁时间,而后在持锁周期内,不断延长持锁时间,直到主动释放。这样即便程序崩溃没有 UnLock,锁也会由于没有刷新租约很快过时,不影响其余节点获取锁。orm
Lock 时启动一个 goroutine 刷新租约,Unlock 时经过 stopCh 将其中止。索引
另外,MySQL 中并无线程去处理过时的记录,因此咱们在调用 Lock 时先尝试将过时记录删掉。进程
核心代码:
func NewLockDb(action, holder string, lease time.Duration) *lockDb { return &lockDb{ db: GetDB(context.Background()), stopCh: make(chan struct{}), action: action, holder: holder, leaseAge: lease, } } func (s *lockDb) Lock() (bool, error) { err := s.cleanExpired() if err != nil { return false, errx.WithStackOnce(err) } err = s.db.Create(&lock{ ExpiredAt: time.Now().Add(s.leaseAge), Action: s.action, Holder: s.holder, }).Error if err != nil { // Duplicate entry '<action_val>' for key 'action' if strings.Contains(err.Error(), "Duplicate entry") { return false, nil } return false, errx.WithStackOnce(err) } s.startLease() log.Debugf("%s get lock", s.holder) return true, nil } func (s *lockDb) UnLock() error { s.stopLease() var err error defer func() { err = s.db. Where("action = ? and holder = ?", s.action, s.holder). Delete(&lock{}). Error }() return err } func (s *lockDb) cleanExpired() error { err := s.db. Where("expired_at < ?", time.Now()). Delete(&lock{}). Error return err } func (s *lockDb) startLease() { go func() { // 剩余 1/4 时刷新租约 ticker := time.NewTicker(s.leaseAge * 3 / 4) for { select { case <-ticker.C: err := s.refreshLease() if err != nil { log.Errorf("refreash lease err: %s", err) } else { log.Debug("lease refreshed") } case <-s.stopCh: log.Debug("lease stopped") return } } }() } func (s *lockDb) stopLease() { close(s.stopCh) } func (s *lockDb) refreshLease() error { err := s.db.Model(&lock{}). Where("action = ? and holder = ?", s.action, s.holder). Update("expired_at", time.Now().Add(s.leaseAge)). Error return err }
使用及测试:
func TestLock(t *testing.T) { i := 3 wg := &sync.WaitGroup{} wg.Add(i) for i > 0 { holder := strconv.Itoa(i) action := "test" i-- go func() { defer wg.Done() locker := dbcore.NewLockDb(action, holder, 10*time.Second) if _, err := locker.Lock(); err != nil { t.Logf("not hold the lock, err: %+v", err) return } time.Sleep(30 * time.Second) locker.UnLock() }() } wg.Wait() }
完整代码:https://github.com/win5do/go-...
这个分布式锁实如今初始数据场景是够用了,但并不完美,例如:依赖时间同步,不能容忍时间偏斜;获取锁不是阻塞的,若是要抢锁须要使用方自旋; 锁不可重入,粒度是进程级别,同一个 Action,当前进程获取锁后,释放后才能再次获取锁。
你们能够思考一下如何完善。