分布式锁redis实现原理及zk实现原理

最近在准备面试,又看到了redis实现分布式锁,同事也想到了zk也能够实现分布式锁。以前也有看过这两种方式实现分布式锁的原理,可是时间一长就又忘记了!今天把它整理一下,但愿能帮到面试的人,同事也为了增强记忆吧!好了,废话很少说了。开始正篇。面试

redis实现分布式锁(实现思路)redis

如下是基本的算法,还有一种是redis官网提供的基于redLock算法实现的分布式锁,此处就不作介绍了算法

一、获取当前时间戳与锁的过时时间相加后获得锁要过时的时间,把这个要过时的时间设置为value,锁做为key。调用setnx方法,若是返回1证实锁不存在,能够成功获取锁,获取成功后,设置expire超市时间。返回获取锁apache

二、若是setnx返回0,证实原来锁存在,没有获取成功。而后调用get方法,获取第一步中存入的过时时间。与当前时间戳比较,若是当前时间大于过时时间,则证实锁已过时,调用getset方法,把新的时间存进去,返回获取锁成功。这里进行时间比较主要是应对expire执行失败,或服务重启状况下出现的没法释放锁的状况。api

代码缓存

import com.eduapi.common.component.RedisComponent;
import com.eduapi.common.util.BeanUtils;
import org.apache.commons.lang3.StringUtils;

/**
 * @Description: 利用redis实现分布式锁.
 * @Author: ZhaoWeiNan .
 * @CreatedTime: 2017/3/20 .
 * @Version: 1.0 .
 */
public class RedisLock {

    private RedisComponent redisComponent;

    private static final int DEFAULT_ACQUIRY_RESOLUTION_MILLIS = 100;

    private String lockKey;

    /**
     * 锁超时时间,防止线程在入锁之后,无限的执行等待
     */
    private int expireMillisCond = 60 * 1000;

    /**
     * 锁等待时间,防止线程饥饿
     */
    private int timeoutMillisCond = 10 * 1000;

    private volatile boolean isLocked = false;

    public RedisLock(RedisComponent redisComponent, String lockKey) {
        this.redisComponent = redisComponent;
        this.lockKey = lockKey;
    }

    public RedisLock(RedisComponent redisComponent, String lockKey, int timeoutMillisCond) {
        this(redisComponent, lockKey);
        this.timeoutMillisCond = timeoutMillisCond;
    }

    public RedisLock(RedisComponent redisComponent, String lockKey, int timeoutMillisCond, int expireMillisCond) {
        this(redisComponent, lockKey, timeoutMillisCond);
        this.expireMillisCond = expireMillisCond;
    }

    public RedisLock(RedisComponent redisComponent, int expireMillisCond, String lockKey) {
        this(redisComponent, lockKey);
        this.expireMillisCond = expireMillisCond;
    }

    public String getLockKey() {
        return lockKey;
    }

    /**
     * 得到 lock. (把大神的思路粘过来了)
     * 实现思路: 主要是使用了redis 的setnx命令,缓存了锁.
     * reids缓存的key是锁的key,全部的共享, value是锁的到期时间(注意:这里把过时时间放在value了,没有时间上设置其超时时间)
     * 执行过程:
     * 1.经过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功得到锁
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public synchronized boolean lock() throws InterruptedException {
        int timeout = timeoutMillisCond;

        boolean flag = false;

        while (timeout > 0){
            //设置所获得期时间
            Long expires = System.currentTimeMillis() + expireMillisCond;
            String expiresStr = BeanUtils.convertObject2String(expires);

            //原来redis里面没有锁,获取锁成功
            if (this.redisComponent.setNx(lockKey,expiresStr) > 0){
                //设置锁的过时时间
                this.redisComponent.expire(lockKey,expireMillisCond);
                isLocked = true;
                return true;
            }

            flag = compareLock(expiresStr);

            if (flag){
                return flag;
            }

            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
                延迟100 毫秒,  这里使用随机时间可能会好一点,能够防止饥饿进程的出现,即,当同时到达多个进程,
                只会有一个进程得到锁,其余的都用一样的频率进行尝试,后面有来了一些进程,也以一样的频率申请锁,这将可能致使前面来的锁得不到知足.
                使用随机的等待时间能够必定程度上保证公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);
        }
        return false;
    }

    /**
     * 排他锁。做用至关于 synchronized 同步快
     * @return
     * @throws InterruptedException
     */
    public synchronized boolean excludeLock() {

        //设置所获得期时间
        long expires = System.currentTimeMillis() + expireMillisCond;
        String expiresStr = BeanUtils.convertObject2String(expires);

        //原来redis里面没有锁,获取锁成功
        if (this.redisComponent.setNx(lockKey,expiresStr) > 0){
            //设置锁的过时时间
            this.redisComponent.expire(lockKey,expireMillisCond);
            isLocked = true;
            return true;
        }

        return compareLock(expiresStr);
    }

    /**
     * 比较是否能够获取锁
     * 锁超时时 获取
     * @param expiresStr
     * @return
     */
    private boolean compareLock(String expiresStr){
        //假如两个线程走到这里
        //由于redis是单线程的获取到
        // A线程获取  currentValueStr = 1 B线程获取 currentValueStr = 1
        String currentValueStr = this.redisComponent.get(lockKey);

        //锁
        if (StringUtils.isNotEmpty(currentValueStr) && Long.parseLong(currentValueStr) < System.currentTimeMillis()){

            //获取上一个锁到期时间,并设置如今的锁到期时间,
            //只有一个线程才能获取上一个线程的设置时间,由于jedis.getSet是同步的
            //只有A线程 把 2 存进去了。 取出了 1, 对比得到了锁
            //B线程 吧 2存进去了。 获取 2.。对比 没有得到锁,
            String oldValue = this.redisComponent.getSet(lockKey,expiresStr);

            if (StringUtils.isNotEmpty(oldValue) && StringUtils.equals(oldValue,currentValueStr)){

                //防止误删(覆盖,由于key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,可是由于什么相差了不多的时间,因此能够接受
                //[分布式的状况下]:如过这个时候,多个线程刚好都到了这里,可是只有一个线程的设置值和当前值相同,他才有权利获取锁
                isLocked = true;
                return true;
            }
        }
        return false;
    }

    /**
     * 释放锁
     */
    public synchronized void unlock(){
        if (isLocked){
            this.redisComponent.delete(lockKey);
            isLocked = false;
        }
    }

}

调用代码分布式

 

//建立锁对象, redisComponent 为redis组件的对象   过时时间  锁的key
        RedisLock redisLock = new RedisLock(redisComponent,1000 * 60,RedisCacheKey.REDIS_LOCK_KEY + now_mm);
        //获取锁
        if (redisLock.excludeLock()){
            try {
                //拿到了锁,读取定时短信有序集合
                set = this.redisComponent.zRangeByScore(RedisCacheKey.MSG_TIME_LIST,0,end);

                if (set != null && set.size() > 0){
                    flag = true;
                }
            } catch (Exception e) {
                LOGGER.error("获取定时短信有序集合异常,异常为{}",e.toString());
            }

 

基于zookpeer实现分布式锁:性能

参考播客:http://www.javashuo.com/article/p-qczhbxgm-nh.html  (我的以为写的不错)ui

 

两种分布式锁的比较this

一、redis分布式锁须要本身不断尝试去获取锁,比较消耗性能。

二、zk分布式锁获取不到锁时只须要监听n-1节点的删除事件便可!不须要不断去获取锁,性能开销小

三、redis获取锁的客户端挂了的话,那么只能等待超时后释放锁(这点应该能够经过ttl方法进行检测来删除锁)。而zk是基于临时顺序节点进行的分布式锁,客户端挂的话,临时节点会自动删除,锁就会释放。