Etcd分布式锁:cp分布式锁的最佳实现

为何须要cp分布式锁

分布式锁的功能和诉求,咱们已经在Redis分布式锁:基于AOP和Redis实现的简易版分布式锁简单的介绍过了。java

目前自研的Redis分布式锁,已可知足大部分场景(非公平+可自动续期+可重入的分布式锁),可投入生产环境的单机环境中使用。可是由于是基于Redis单机的环境,只能用于并发量并不高的场景。随着接入的业务场景扩大,Redis单机已经变得不可靠了,那么接下来给咱们的选择只有两种: 一、Redis单机改成集群。 二、改用其余基于一致性算法的实现方式。redis

方案1有先天性的缺陷,redis集群没法保证一致性问题,在master节点宕机的瞬间,master和slave节点之间的数据多是不一致的。这将会致使服务a从master节点拿到了锁a,而后master节点宕机,在slave节点还没有彻底同步完master的数据以前,服务b将从slave节点上成功拿到一样的锁a。算法

而在其余基于一致性算法的实现方式上,zk和ectd是不错的选择。而后考虑到zk已廉颇老矣,咱们选择了ectd这个后起之秀。缓存

因为在分布式锁的场景内,咱们更关注的是锁的一致性,而非锁的可用性,因此cp锁比ap锁更可靠。bash

设计思路

etcd引入了租约的概念,咱们首先须要授予一个租约,而后同时设置租约的有效时间。租约的有效时间咱们能够用来做为锁的有效时间。网络

而后咱们能够直接调用etcd的lock功能,在指定的租约上对指定的lockName进行加锁操做。若是当前没有其余线程持有该锁,则该线程能直接持有锁。不然须要等待。这里咱们能够将timeout的时间设置为锁的等待时间来实现竞争锁失败等待获取的过程。固然因为网络波动等问题,我建议timeout的时间最少设置为500ms(或大家认为合理的数值)。多线程

而后解锁的过程,咱们放弃了etcd的unlock操做,而直接使用了etcd的revoke操做。之因此没采用unlock操做,一是由于unlock所须要的参数是上一步lock操做返回的lockKey,咱们并不但愿多维护一个字段,二是由于咱们最终会执行revoke操做,而revoke操做会将该租约下的全部key都失效,由于咱们目前目前设计的是一个租约对应一个锁,不存在会释放其它业务场景中的锁的状况。并发

此外,为了保证线程在等待获取锁的过程当中租约不会过时,因此咱们得为这个线程设置一个守护线程,在该线程授予租约后就开启守护线程,按期去判断是否须要续期。异步

和redis分布式锁不同的是,redis分布式锁的有效时间是缓存的有效时间,因此能够在获取锁成功后再开启用于续期的守护线程,而etcd分布式锁的有效时间是租约的有效时间,在等待获取锁的过程当中可能租约会过时,因此得在获取租约后就得开启守护线程。这样就增长了不少的复杂度。分布式

##具体实现 原生的etcd是经过Go语言来写的,直接在java程序中应用会有一点困难,因此咱们直接采用jetcd来做为etcd的客户端,这样在java程序中就可使用代码方式和etcd服务端通信。

jetcd提供了LeaseClient,咱们能够直接使用grant功能完成授予租约的操做。

public LockLeaseData getLeaseData(String lockName, Long lockTime) {
    try {
        LockLeaseData lockLeaseData = new LockLeaseData();
        CompletableFuture<LeaseGrantResponse> leaseGrantResponseCompletableFuture = client.getLeaseClient().grant(lockTime);
        Long leaseId = leaseGrantResponseCompletableFuture.get(1, TimeUnit.SECONDS).getID();
        lockLeaseData.setLeaseId(leaseId);
        CpSurvivalClam cpSurvivalClam = new CpSurvivalClam(Thread.currentThread(), leaseId, lockName, lockTime, this);
        Thread survivalThread = threadFactoryManager.getThreadFactory().newThread(cpSurvivalClam);
        survivalThread.start();
        lockLeaseData.setCpSurvivalClam(cpSurvivalClam);
        lockLeaseData.setSurvivalThread(survivalThread);
        return lockLeaseData;
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        return null;
    }
}
复制代码

此外如上所述,咱们在获取租约后,开启了CpSurvivalClam的守护线程来按期续期。CpSurvivalClam的实现和咱们在redis分布式锁的时候实现大致一致,差异只是将其中的expandLockTime操做改成了etcd中的keepAliveOnce。expandLockTime方法具体以下所示:

/**
 * 重置锁的有效时间
 *
 * @param leaseId 锁的租约id
 * @return 是否成功重置
 */
public Boolean expandLockTime(Long leaseId) {
    try {
        CompletableFuture<LeaseKeepAliveResponse> leaseKeepAliveResponseCompletableFuture = client.getLeaseClient().keepAliveOnce(leaseId);
        leaseKeepAliveResponseCompletableFuture.get();
        return Boolean.TRUE;
    } catch (InterruptedException | ExecutionException e) {
        return Boolean.FALSE;
    }
}
复制代码

而后jetcd提供了LockClient,咱们直接能够用lock功能,将leaseId和lockName传入,咱们会获得一个在该租约下的lockKey。此外为了保证加锁成功后,租约未过时。咱们加了一步timeToLive的操做,用于判断租约在获取锁成功后的是否还存活。若是ttl未大于0,则判断为加锁失败。

/**
 * 在指定的租约上加锁,若是租约过时,则算加锁失败。
 *
 * @param leaseId  锁的租约Id
 * @param lockName 锁的名称
 * @param waitTime 加锁过程当中的的等待时间,单位ms
 * @return 是否加锁成功
 */
public Boolean tryLock(Long leaseId, String lockName, Long waitTime) {
    try {
        CompletableFuture<LockResponse> lockResponseCompletableFuture = client.getLockClient().lock(ByteSequence.from(lockName, Charset.defaultCharset()), leaseId);
        long timeout = Math.max(500, waitTime);
        lockResponseCompletableFuture.get(timeout, TimeUnit.MILLISECONDS).getKey();
        CompletableFuture<LeaseTimeToLiveResponse> leaseTimeToLiveResponseCompletableFuture = client.getLeaseClient().timeToLive(leaseId, LeaseOption.DEFAULT);
        long ttl = leaseTimeToLiveResponseCompletableFuture.get(1, TimeUnit.SECONDS).getTTl();
        if (ttl > 0) {
            return Boolean.TRUE;
        } else {
            return Boolean.FALSE;
        }
    } catch (TimeoutException | InterruptedException | ExecutionException e) {
        return Boolean.FALSE;
    }
}
复制代码

解锁过程,咱们能够直接使用LeaseClient下的revoke操做,在撤销租约的同时将该租约下的lock释放。

/**
 * 取消租约,并释放锁
 *
 * @param leaseId 租约id
 * @return 是否成功释放
 */
public Boolean unLock(Long leaseId) {
    try {
        CompletableFuture<LeaseRevokeResponse> revokeResponseCompletableFuture = client.getLeaseClient().revoke(leaseId);
        revokeResponseCompletableFuture.get(1, TimeUnit.SECONDS);
        return Boolean.TRUE;
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        return Boolean.FALSE;
    }
}

复制代码

而后是统一的CpLock对象,封装了加解锁的过程,对外只暴露execute方法,避免使用者忘记解锁步骤。

public class CpLock {

    private String lockName;

    private LockEtcdClient lockEtcdClient;

    /**
     * 分布式锁的锁持有数
     */
    private volatile int state;

    private volatile transient Thread lockOwnerThread;

    /**
     * 当前线程拥有的lease对象
     */
    private FastThreadLocal<LockLeaseData> lockLeaseDataFastThreadLocal = new FastThreadLocal<>();
    /**
     * 锁自动释放时间,单位s,默认为30
     */
    private static Long LOCK_TIME = 30L;

    /**
     * 获取锁失败单次等待时间,单位ms,默认为300
     */
    private static Integer SLEEP_TIME_ONCE = 300;

    CpLock(String lockName, LockEtcdClient lockEtcdClient) {
        this.lockName = lockName;
        this.lockEtcdClient = lockEtcdClient;
    }

    private LockLeaseData getLockLeaseData(String lockName, long lockTime) {
        if (lockLeaseDataFastThreadLocal.get() != null) {
            return lockLeaseDataFastThreadLocal.get();
        } else {
            LockLeaseData lockLeaseData = lockEtcdClient.getLeaseData(lockName, lockTime);
            lockLeaseDataFastThreadLocal.set(lockLeaseData);
            return lockLeaseData;
        }
    }

    final Boolean tryLock(long waitTime) {
        final long startTime = System.currentTimeMillis();
        final long endTime = startTime + waitTime * 1000;
        final long lockTime = LOCK_TIME;
        final Thread current = Thread.currentThread();
        try {
            do {
                int c = this.getState();
                if (c == 0) {
                    LockLeaseData lockLeaseData = this.getLockLeaseData(lockName, lockTime);
                    if (Objects.isNull(lockLeaseData)) {
                        return Boolean.FALSE;
                    }
                    Long leaseId = lockLeaseData.getLeaseId();
                    if (lockEtcdClient.tryLock(leaseId, lockName, endTime - System.currentTimeMillis())) {
                        log.info("线程获取重入锁成功,cp锁的名称为{}", lockName);
                        this.setLockOwnerThread(current);
                        this.setState(c + 1);
                        return Boolean.TRUE;
                    }
                } else if (lockOwnerThread == Thread.currentThread()) {
                    if (c + 1 <= 0) {
                        throw new Error("Maximum lock count exceeded");
                    }
                    this.setState(c + 1);
                    log.info("线程重入锁成功,cp锁的名称为{},当前LockCount为{}", lockName, state);
                    return Boolean.TRUE;
                }
                int sleepTime = SLEEP_TIME_ONCE;
                if (waitTime > 0) {
                    log.info("线程暂时没法得到cp锁,当前已等待{}ms,本次将再等待{}ms,cp锁的名称为{}", System.currentTimeMillis() - startTime, sleepTime, lockName);
                    try {
                        Thread.sleep(sleepTime);
                    } catch (InterruptedException e) {
                        log.info("线程等待过程当中被中断,cp锁的名称为{}", lockName, e);
                    }
                }
            } while (System.currentTimeMillis() <= endTime);
            if (waitTime == 0) {
                log.info("线程得到cp锁失败,将放弃获取,cp锁的名称为{}", lockName);
            } else {
                log.info("线程得到cp锁失败,以前共等待{}ms,将放弃等待获取,cp锁的名称为{}", System.currentTimeMillis() - startTime, lockName);
            }
            this.stopKeepAlive();
            return Boolean.FALSE;
        } catch (Exception e) {
            log.error("execute error", e);
            this.stopKeepAlive();
            return Boolean.FALSE;
        }
    }

    /**
     * 中止续约,并将租约对象从线程中移除
     */
    private void stopKeepAlive() {
        LockLeaseData lockLeaseData = lockLeaseDataFastThreadLocal.get();
        if (Objects.nonNull(lockLeaseData)) {
            lockLeaseData.getCpSurvivalClam().stop();
            lockLeaseData.setCpSurvivalClam(null);
            lockLeaseData.getSurvivalThread().interrupt();
            lockLeaseData.setSurvivalThread(null);
        }
        lockLeaseDataFastThreadLocal.remove();
    }

    final void unLock() {
        if (lockOwnerThread == Thread.currentThread()) {
            int c = this.getState() - 1;
            if (c == 0) {
                this.setLockOwnerThread(null);
                this.setState(c);
                LockLeaseData lockLeaseData = lockLeaseDataFastThreadLocal.get();
                this.stopKeepAlive();
                //unLock操做必须在最后执行,避免其余线程获取到锁时的state等数据不正确
                lockEtcdClient.unLock(lockLeaseData.getLeaseId());
                log.info("重入锁LockCount-1,线程已成功释放锁,cp锁的名称为{}", lockName);
            } else {
                this.setState(c);
                log.info("重入锁LockCount-1,cp锁的名称为{},剩余LockCount为{}", lockName, c);
            }
        }
    }

    public <T> T execute(Supplier<T> supplier, int waitTime) {
        Boolean holdLock = Boolean.FALSE;
        Preconditions.checkArgument(waitTime >= 0, "waitTime必须为天然数");
        try {
            if (holdLock = this.tryLock(waitTime)) {
                return supplier.get();
            }
            return null;
        } catch (Exception e) {
            log.error("cpLock execute error", e);
            return null;
        } finally {
            if (holdLock) {
                this.unLock();
            }
        }
    }

    public <T> T execute(Supplier<T> supplier) {
        return this.execute(supplier, 0);
    }
}

复制代码

CpLock和以前Redis分布式锁中的ApLock实现大致一致。区别主要有:

一、由于咱们是在授予租约的操做中开启了守护线程,因此在竞争锁失败、出现异常和释放锁这些场景下,咱们必须得中止守护线程续期。又由于是可重入的场景,咱们又只但愿在state为0的状况下再去生成租约去竞争锁。因此避免多种状况判断,咱们引入了FastThreadLocal lockLeaseDataFastThreadLocal来保存当前线程的Lease对象。

二、redis分布式锁在任何场景下,等待获取锁都是经过休眠轮询的方式实现的,而在etcd场景下,咱们在state为0时经过etcd自身的等待逻辑来完成等待,在state非0场景下,依然经过休眠轮询的方式来实现等待。由于可能会存在state从非0转为0的状况,因此咱们的waitTime值是endTime - System.currentTimeMillis(),而非本来传入的waitTime。这样可以让等待时间更接近咱们指望值。

更新说明

本次更新,咱们实现了基于etcd的cp分布式锁,同时也修复了redis分布式锁中的一个隐藏问题。

以前的setState操做在unLock以后,这样在并发场景下会致使一个问题发生。线程a和线程b在竞争获取锁a,此时各自的局部变量c和state都为0,而后线程a在获取到了锁以后马上释放了锁,此时先执行了unLock,state仍是1,线程b成功得到锁,将state重置为c+1,依然是1,而后线程a执行setState,将stete改成0。此时线程b若是去释放锁,执行stete-1操做,变为了-1。这个问题主要是由于获取state值和state值修改操做是异步的,而在多线程场景下,分布式锁是经过lock控制的,咱们只须要将unLock操做挪到全部赋值以后便可解决这个问题。

后续计划

目前实现的cp分布式锁的版本,已可知足分布式锁的绝大部分场景(非公平+可自动续期+可重入+强一致性的分布式锁),已可投入生产环境的集群中使用。后续的计划中,ap锁和cp锁将会分别更新,会优化一些使用场景。也会尝试去解决公平锁的问题,以及循环获取锁须要等待休眠的问题。

以上计划已完成,如何实现公平锁可详见Etcd分布式锁(二):支持公平锁,避免某些场景下线程长期没法获取锁

本次cp分布式锁须要考虑大量的使用场景,目前只进行了小规模的测试,若有考虑不周的地方,还望你们海涵。

推荐阅读

一、Redis分布式锁:基于AOP和Redis实现的简易版分布式锁
二、Redis分布式锁(二):支持锁的续期,避免锁超时后致使多个线程得到锁
三、Redis分布式锁(三):支持锁可重入,避免锁递归调用时死锁

好了,咱们下一期再见,欢迎你们一块儿留言讨论。同时也欢迎点赞~

相关文章
相关标签/搜索