分布式锁实现汇总

[TOC]html

分布式锁实现汇总

不少时候咱们须要保证同一时间一个方法只能被同一个线程调用,在单机环境中,Java中其实提供了不少并发处理相关的API,可是这些API在分布式场景中就无能为力了。也就是说单纯的Java Api并不能提供分布式锁的能力。java

针对分布式锁的实现目前有多种方案:redis

  • 基于数据库实现分布式锁
  • 基于缓存(redis,memcached)实现分布式锁
  • 基于Zookeeper实现分布式锁

基于数据库实现分布式锁

简单实现

直接建一张表,里面记录锁定的方法名 时间 便可。
须要加锁时,就插入一条数据,释放锁时就删除数据。算法

CREATE TABLE `methodLock` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
  `desc` varchar(1024) NOT NULL DEFAULT '备注信息',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';复制代码

当咱们想要锁住某个方法时,执行如下SQL:sql

insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)复制代码

由于咱们对method_name作了惟一性约束,这里若是有多个请求同时提交到数据库的话,数据库会保证只有一个操做能够成功,那么咱们就能够认为
操做成功的那个线程得到了该方法的锁,能够执行方法体内容。
当方法执行完毕以后,想要释放锁的话,须要执行如下Sql:数据库

delete from methodLock where method_name ='method_name'复制代码
存在的问题
  1. 这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会致使业务系统不可用。
  2. 这把锁没有失效时间,一旦解锁操做失败,就会致使锁记录一直在数据库中,其余线程没法再得到到锁。
  3. 这把锁只能是非阻塞的,由于数据的insert操做,一旦插入失败就会直接报错。没有得到锁的线程并不会进入排队队列,要想再次得到锁就要再次触发得到锁操做。
  4. 这把锁是非重入的,同一个线程在没有释放锁以前没法再次得到该锁。由于数据中数据已经存在了。
解决办法
  1. 单点问题能够用多数据库实例,同时塞N个表,N/2+1个成功就职务锁定成功
  2. 写一个定时任务,隔一段时间清除一次过时的数据。
  3. 写一个while循环,不断的重试插入,直到成功。
  4. 在数据库表中加个字段,记录当前得到锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,若是当前机器的主机信息和线程信息在数据库能够查到的话,直接把锁分配给他就能够了。
总结

数据库实现分布式锁的优势: 直接借助数据库,容易理解。apache

数据库实现分布式锁的缺点: 会有各类各样的问题,在解决问题的过程当中会使整个方案变得愈来愈复杂。缓存

操做数据库须要必定的开销,性能问题须要考虑。安全

基于缓存实现分布式锁

相比于用数据库来实现分布式锁,基于缓存实现的分布式锁的性能会更好一些。bash

目前有不少成熟的分布式产品,包括Redis、memcache、Tair等。

单点实现
步骤
  1. 获取锁的使用,使用setnx加锁,将值设为当前的时间戳,再使用expire设置一个过时值。
  2. 获取到锁则执行同步代码块,没获取则根据业务场景能够选择自旋、休眠、或作一个等待队列等拥有锁进程来唤醒(相似Synchronize的同步队列),在等待时使用ttl去检查是否有过时值,若是没有则使用expire设置一个。
  3. 执行完毕后,先根据value的值来判断是否是本身的锁,若是是的话则删除,不是则代表本身的锁已通过期,不须要删除。(此时出现因为过时而致使的多进程同时拥有锁的问题)
存在的问题
  1. 单点问题。若是单机redis挂掉了,那么程序会跟着出错
  2. 若是转移使用 slave节点,复制不是同步复制,会出现多个程序获取锁的状况
code
public Object around(ProceedingJoinPoint joinPoint) {
        try {
            MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
            Method method = methodSignature.getMethod();

            DLock dLock = method.getAnnotation(DLock.class);
            if (dLock != null) {
                String lockedPrefix = buildLockedPrefix(dLock, method, joinPoint.getArgs());
                long timeOut = dLock.timeOut();
                int expireTime = dLock.expireTime();
                long value = System.currentTimeMillis();
                if (lock(lockedPrefix, timeOut, expireTime, value)) {
                    try {
                        return joinPoint.proceed();
                    } catch (Throwable throwable) {
                        throwable.printStackTrace();
                    } finally {
                        unlock(lockedPrefix, value);
                    }
                } else {
                    recheck(lockedPrefix, expireTime);
                }

            }
        } catch (Exception e) {
            logger.error("DLockAspect around error", e);
        }
        return null;
    }

    /** * 检查是否设置过超时 * * @param lockedPrefix * @param expireTime */
    public void recheck(String lockedPrefix, int expireTime) {
        try {
            Result<Long> ttl = cacheFactory.getFactory().ttl(getLockedPrefix(lockedPrefix));
            if (ttl.isSuccess() && ttl.getValue() == -1) {
                Result<String> get = cacheFactory.getFactory().get(getLockedPrefix(lockedPrefix));
                //没有超时设置则设置超时
                if (get.isSuccess() && !StringUtils.isEmpty(get.getValue())) {
                    long oldTime = Long.parseLong(get.getValue());
                    long newTime = expireTime * 1000 - (System.currentTimeMillis() - oldTime);
                    if (newTime < 0) {
                        //已过超时时间 设默认最小超时时间
                        cacheFactory.getFactory().expire(getLockedPrefix(lockedPrefix), MIX_EXPIRE_TIME);
                    } else {
                        //未超过 设置为剩余超时时间
                        cacheFactory.getFactory().expire(getLockedPrefix(lockedPrefix), (int) newTime);
                    }
                    logger.info(lockedPrefix + "recheck:" + newTime);
                }
            }
            logger.info(String.format("执行失败lockedPrefix:%s count:%d", lockedPrefix, count++));
        } catch (Exception e) {
            logger.error("DLockAspect recheck error", e);
        }

    }
        public boolean lock(String lockedPrefix, long timeOut, int expireTime, long value) {
        long millisTime = System.currentTimeMillis();
        try {
            //在timeOut的时间范围内不断轮询锁
            while (System.currentTimeMillis() - millisTime < timeOut * 1000) {
                //锁不存在的话,设置锁并设置锁过时时间,即加锁
                Result<Long> result = cacheFactory.getFactory().setnx(getLockedPrefix(lockedPrefix), String.valueOf(value));
                if (result.isSuccess() && result.getValue() == 1) {

                    Result<Long> result1 = cacheFactory.getFactory().expire(getLockedPrefix(lockedPrefix), expireTime);
                    logger.info(lockedPrefix + "locked and expire " + result1.getValue());
                    return true;
                }
                //短暂休眠,避免可能的活锁
                Thread.sleep(100, RANDOM.nextInt(50000));
            }
        } catch (Exception e) {
            logger.error("lock error " + getLockedPrefix(lockedPrefix), e);
        }

        return false;
    }

    public void unlock(String lockedPrefix, long value) {
        try {
            Result<String> result = cacheFactory.getFactory().get(getLockedPrefix(lockedPrefix));
            String kvValue = result.getValue();
            if (!StringUtils.isEmpty(kvValue) && kvValue.equals(String.valueOf(value))) {
                cacheFactory.getFactory().del(getLockedPrefix(lockedPrefix));

            }
            logger.info(lockedPrefix + "unlock:" + kvValue + "----" + value);
        } catch (Exception e) {
            logger.error("unlock error" + getLockedPrefix(lockedPrefix), e);
        }
    }复制代码
RedLock

Redlock是Redis的做者antirez给出的集群模式的Redis分布式锁,它基于N个彻底独立的Redis节点(一般状况下N能够设置成5)。

步骤
  1. 获取当前时间(毫秒数)。
  2. 按顺序依次向N个Redis节点执行获取锁的操做。这个获取操做跟前面基于单Redis节点的获取锁的过程相同,包含随机字符串my_random_value,也包含过时时间(好比PX 30000,即锁的有效时间)。为了保证在某个Redis节点不可用的时候算法可以继续运行,这个获取锁的操做还有一个超时时间(time out),它要远小于锁的有效时间(几十毫秒量级)。客户端在向某个Redis节点获取锁失败之后,应该当即尝试下一个Redis节点。这里的失败,应该包含任何类型的失败,好比该Redis节点不可用,或者该Redis节点上的锁已经被其它客户端持有(注:Redlock原文中这里只提到了Redis节点不可用的状况,但也应该包含其它的失败状况)。
  3. 计算整个获取锁的过程总共消耗了多长时间,计算方法是用当前时间减去第1步记录的时间。若是客户端从大多数Redis节点(>= N/2+1)成功获取到了锁,而且获取锁总共消耗的时间没有超过锁的有效时间(lock validity time),那么这时客户端才认为最终获取锁成功;不然,认为最终获取锁失败。
  4. 若是最终获取锁成功了,那么这个锁的有效时间应该从新计算,它等于最初的锁的有效时间减去第3步计算出来的获取锁消耗的时间。
  5. 若是最终获取锁失败了(可能因为获取到锁的Redis节点个数少于N/2+1,或者整个获取锁的过程消耗的时间超过了锁的最初有效时间),那么客户端应该当即向全部Redis节点发起释放锁的操做。
优化

客户端1成功锁住了A, B, C,获取锁成功(但D和E没有锁住);节点C崩溃重启了,但客户端1在C上加的锁没有持久化下来,丢失了;节点C重启后,客户端2锁住了C, D, E,获取锁成功。客户端1和客户端2同时得到了锁(针对同一资源)。

这个问题能够延迟节点的恢复时间,时间长度应大于等于一个锁的过时时间。

存在的问题
  1. 时钟发生跳跃。这种状况发生时,直接致使全部的锁都超时,新的线程能够成功的获取锁,致使多线程同时处理。

关于RedLock的更多内容能够看:

基于Redis的分布式锁真的安全吗?(上)

基于Redis的分布式锁真的安全吗?(下)

一个比较好的实现:

Redission

Zookeeper锁

步骤
  1. 每一个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个惟一的瞬时有序节点。
  2. 判断是否获取锁的方式很简单,只须要判断有序节点中序号最小的一个。
  3. 当释放锁的时候,只需将这个瞬时节点删除便可。同时,其能够避免服务宕机致使的锁没法释放,而产生的死锁问题。
优势
  1. 无单点问题。ZK是集群部署的,只要集群中有半数以上的机器存活,就能够对外提供服务。

  2. 持有锁任意长的时间,可自动释放锁。使用Zookeeper能够有效的解决锁没法释放的问题,由于在建立锁的时候,客户端会在ZK中建立一个临时节点,一旦客户端获取到锁以后忽然挂掉(Session链接断开),那么这个临时节点就会自动删除掉。其余客户端就能够再次得到锁。这避免了基于Redis的锁对于有效时间(lock validity time)到底设置多长的两难问题。实际上,基于ZooKeeper的锁是依靠Session(心跳)来维持锁的持有状态的,而Redis不支持Sesion。

  3. 可阻塞。使用Zookeeper能够实现阻塞的锁,客户端能够经过在ZK中建立顺序节点,而且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端能够检查本身建立的节点是否是当前全部节点中序号最小的,若是是,那么本身就获取到锁,即可以执行业务逻辑了。

  4. 可重入。客户端在建立节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就能够了。若是和本身的信息同样,那么本身直接获取到锁,若是不同就再建立一个临时的顺序节点,参与排队。

问题
  1. 这种作法可能引起羊群效应,从而下降锁的性能。
  2. 性能不如缓存。由于每次在建立锁和释放锁的过程当中,都要动态建立、销毁瞬时节点来实现锁功能。ZK中建立和删除节点只能经过Leader服务器来执行,而后将数据同不到全部的Follower机器上。

一个比较好的实现:

Curator

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    try {
        return interProcessMutex.acquire(timeout, unit);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return true;
}
public boolean unlock() {
    try {
        interProcessMutex.release();
    } catch (Throwable e) {
        log.error(e.getMessage(), e);
    } finally {
        executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS);
    }
    return true;
}复制代码

acquire方法用户获取锁,release方法用于释放锁。

总结

使用Zookeeper实现分布式锁的优势: 有效的解决单点问题,不可重入问题,非阻塞问题以及锁没法释放的问题。实现起来较为简单。

使用Zookeeper实现分布式锁的缺点 : 性能上不如使用缓存实现分布式锁。 须要对ZK的原理有所了解。

三种方案的比较

从理解的难易程度角度(从低到高): 数据库 > 缓存 > Zookeeper

从实现的复杂性角度(从低到高): Zookeeper >= 缓存 > 数据库

从性能角度(从高到低): 缓存 > Zookeeper >= 数据库

从可靠性角度(从高到低): Zookeeper > 缓存 > 数据库

相关文章
相关标签/搜索