【分布式锁】redis实现

转载:https://www.jianshu.com/p/c970cc710

SETNX命令简介redis

SETNX key value
将key的值设为value,而且仅当key不存在。
若给定的key已经存在,则SETNX不作任何操做。
SETNX 是SET if Not eXists的简写。
返回整数,具体为算法

  • 1,当 key 的值被设置
  • 0,当 key 的值没被设置

使用SETNX实现分布式锁

多个进程执行如下Redis命令:网络

SETNX lock.foo <current Unix time + lock timeout + 1>分布式

若是 SETNX 返回1,说明该进程得到锁,SETNX将键 lock.foo 的值设置为锁的超时时间(当前时间 + 锁的有效时间)。
若是 SETNX 返回0,说明其余进程已经得到了锁,进程不能进入临界区。进程能够在一个循环中不断地尝试 SETNX 操做,以得到锁。ide

 

解决死锁

正常第一反应利用SETNX实现分布式锁多是这样的ui

if(SETNX key value){//若是设置成功表示拿到了锁
    return true;
}
return false;
View Code

而后释放锁的时候就直接 DEL掉;
简单思路是这样,可是这样会有不少问题this

  • 若是一个进程得到锁以后,断开了与redis的链接(进程挂断或者网络中断),那么锁一直的不断释放,其余的进程就一直获取不到锁,就出现了 “死锁”
  • 然而,锁超时时,咱们不能简单地使用 DEL 命令删除键 lock.foo 以释放锁。考虑如下状况,进程P1已经首先得到了锁 lock.foo,而后进程P1挂掉了。进程P2,P3正在不断地检测锁是否已释放或者已超时,执行流程以下:
    1 . P2和P3进程读取键 lock.foo 的值,检测锁是否已超时(经过比较当前时间和键 lock.foo 的值来判断是否超时)
    2.P2和P3进程发现锁 lock.foo 已超时
    3.P2执行 DEL lock.foo命令
    4.P2执行 SETNX lock.foo命令,并返回1,即P2得到锁
    5.P3执行 DEL lock.foo命令将P2刚刚设置的键 lock.foo 删除(这步是因为P3刚才已检测到锁已超时)
    6.P3执行 SETNX lock.foo命令,并返回1,即P3得到锁
    7.P2和P3同时得到了锁

从上面的状况能够得知,在检测到锁超时后,进程不能直接简单地执行 DEL 删除键的操做以得到锁。spa

为了解决上述算法可能出现的多个进程同时得到锁的问题,咱们再来看如下的算法。
咱们一样假设进程P1已经首先得到了锁 lock.foo,而后进程P1挂掉了。接下来的状况:线程

进程P4执行 SETNX lock.foo 以尝试获取锁
因为进程P1已得到了锁,因此P4执行 SETNX lock.foo 返回0,即获取锁失败
P4执行 GET lock.foo 来检测锁是否已超时,若是没超时,则等待一段时间,再次检测
若是P4检测到锁已超时,即当前的时间大于键 lock.foo 的值,P4会执行如下操做
GETSET lock.foo <current Unix timestamp + lock timeout + 1>
因为 GETSET 操做在设置键的值的同时,还会返回键的旧值,经过比较键 lock.foo 的旧值是否小于当前时间,能够判断进程是否已得到锁
假如另外一个进程P5也检测到锁已超时,并在P4以前执行了 GETSET 操做,那么P4的 GETSET 操做返回的是一个大于当前时间的时间戳,这样P4就不会得到锁而继续等待。注意到,即便P4接下来将键 lock.foo 的值设置了比P5设置的更大的值也没影响。
另外,值得注意的是,在进程释放锁,即执行 DEL lock.foo 操做前,须要先判断锁是否已超时。若是锁已超时,那么锁可能已由其余进程得到,这时直接执行 DEL lock.foo 操做会致使把其余进程已得到的锁释放掉。code

程序代码

while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间
            if (this.setNX(lockKey, expiresStr)) {
                // lock acquired
                locked = true;
                return true;
            }

            String currentValueStr = this.get(lockKey); //redis里的时间
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判断是否为空,不为空的状况下,若是被其余线程设置了值,则第二个条件判断是过不去的
                // lock is expired

                String oldValueStr = this.getSet(lockKey, expiresStr);
                //获取上一个锁到期时间,并设置如今的锁到期时间,
                //只有一个线程才能获取上一个线上的设置时间,由于jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止误删(覆盖,由于key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,可是由于什么相差了不多的时间,因此能够接受

                    //[分布式的状况下]:如过这个时候,多个线程刚好都到了这里,可是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    // lock acquired
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
                延迟100 毫秒,  这里使用随机时间可能会好一点,能够防止饥饿进程的出现,即,当同时到达多个进程,
                只会有一个进程得到锁,其余的都用一样的频率进行尝试,后面有来了一些进行,也以一样的频率申请锁,这将可能致使前面来的锁得不到知足.
                使用随机的等待时间能够必定程度上保证公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);

        }
View Code
相关文章
相关标签/搜索