经常使用的Redis客户端的并发模型(转)

 

   伪代码模型java

  

# get lock
lock = 0
while lock != 1:
  timestamp = current Unix time + lock timeout + 1
  lock = SETNX lock.foo timestamp
  if lock == 1 or (now() > (GET lock.foo) and now() > (GETSET lock.foo timestamp)):
    break;
  else:
    sleep(10ms) 
    # do your job
    do_job()     
    # release
  if now() < GET lock.foo:
    DEL lock.foo

 

并发访问

Redis为单进程单线程模式,采用队列模式将并发访问变为串行访问。Redis自己没有锁的概念,Redis对于多个客户端链接并不存在竞争,可是在Jedis客户端对Redis进行并发访问时会发生链接超时、数据转换错误、阻塞、客户端关闭链接等问题,这些问题均是因为客户端链接混乱形成。对此有2种解决方法:python

1.客户端角度,为保证每一个客户端间正常有序与Redis进行通讯,对链接进行池化,同时对客户端读写Redis操做采用内部锁synchronized。redis

2.服务器角度,利用setnx实现锁。如某客户端要得到一个名字list的锁,客户端使用下面的命令进行获取:算法

Setnx lock.list  current time + lock timeout服务器

 如返回1,则该客户端得到锁,把lock. list的键值设置为时间值表示该键已被锁定,该客户端最后能够经过DEL lock.list来释放该锁。并发

 如返回0,代表该锁已被其余客户端取得,等对方完成或等待锁超时。分布式

第二种须要用到Redis的setnx命令,可是须要注意一些问题。ide

 

SETNX命令(SET if Not eXists)

 

语法:SETNX key valueui

功能:将 key 的值设为 value ,当且仅当 key 不存在;若给定的 key 已经存在,则 SETNX 不作任何动做。
时间复杂度:O(1)
返回值:设置成功,返回 1 。设置失败,返回 0 。
模式:将 SETNX 用于加锁(locking),SETNX 能够用做加锁原语(locking primitive)。好比说,要对关键字(key) foo 加锁,客户端能够尝试如下方式:
SETNX lock.foo <current Unix time + lock timeout + 1>
若是 SETNX 返回 1 ,说明客户端已经得到了锁, key 设置的unix时间则指定了锁失效的时间。以后客户端能够经过 DEL lock.foo 来释放锁。
若是 SETNX 返回 0 ,说明 key 已经被其余客户端上锁了。若是锁是非阻塞(non blocking lock)的,咱们能够选择返回调用,或者进入一个重试循环,直到成功得到锁或重试超时(timeout)。
可是已经证明仅仅使用SETNX加锁带有竞争条件,在特定的状况下会形成错误。
处理死锁(deadlock)
上面的锁算法有一个问题:若是由于客户端失败、崩溃或其余缘由致使没有办法释放锁的话,怎么办?
这种情况能够经过检测发现——由于上锁的 key 保存的是 unix 时间戳,假如 key 值的时间戳小于当前的时间戳,表示锁已经再也不有效。
可是,当有多个客户端同时检测一个锁是否过时并尝试释放它的时候,咱们不能简单粗暴地删除死锁的 key ,再用 SETNX 上锁,由于这时竞争条件(race condition)已经造成了:
C1 C2 读取 lock.foo 并检查时间戳, SETNX 都返回 0 ,由于它已经被 C3 锁上了,但 C3 在上锁以后就崩溃(crashed)了。
C1 lock.foo 发送 DEL 命令。
C1 lock.foo 发送 SETNX 并成功。
C2 lock.foo 发送 DEL 命令。
C2 lock.foo 发送 SETNX 并成功。
出错:由于竞争条件的关系,C1 C2 两个都得到了锁。
幸亏,如下算法能够避免以上问题。来看看咱们聪明的 C4 客户端怎么办:
C4 lock.foo 发送 SETNX 命令。
由于崩溃掉的 C3 还锁着 lock.foo ,因此 Redis C4 返回 0
C4 lock.foo 发送 GET 命令,查看 lock.foo 的锁是否过时。若是不,则休眠(sleep)一段时间,并在以后重试。
另外一方面,若是 lock.foo 内的 unix 时间戳比当前时间戳老,C4 执行如下命令:
GETSET lock.foo <current Unix timestamp + lock timeout + 1>
由于 GETSET 的做用,C4 能够检查看 GETSET 的返回值,肯定 lock.foo 以前储存的旧值还是那个过时时间戳,若是是的话,那么 C4 得到锁。
若是其余客户端,好比 C5,比 C4 更快地执行了 GETSET 操做并得到锁,那么 C4 的 GETSET 操做返回的就是一个未过时的时间戳(C5 设置的时间戳)。C4 只好从第一步开始重试。注意,即使 C4 的 GETSET 操做对 key 进行了修改,这对将来也没什么影响。
这里假设锁key对应的value没有实际业务意义,不然会有问题,并且其实其value也确实不该该用在业务中。spa

为了让这个加锁算法更健壮,得到锁的客户端应该经常检查过时时间以避免锁因诸如 DEL 等命令的执行而被意外解开,由于客户端失败的状况很是复杂,不只仅是崩溃这么简单,还多是客户端由于某些操做被阻塞了至关长时间,紧接着 DEL 命令被尝试执行(但这时锁却在另外的客户端手上)。

GETSET命令

语法:GETSET key value
功能:将给定 key 的值设为 value ,并返回 key 的旧值(old value)。当 key 存在但不是字符串类型时,返回一个错误。

时间复杂度:O(1)
返回值:返回给定 key 的旧值;当 key 没有旧值时,也便是, key 不存在时,返回 nil 。

 

SETNX实现分布式锁 

Redis有一系列的命令,特色是以NX结尾,NX是Not eXists的缩写,如SETNX命令就应该理解为:SET if Not eXists。这系列的命令很是有用,这里讲使用SETNX来实现分布式锁。 
SETNX实现分布式锁 
利用SETNX很是简单地实现分布式锁。例如:某客户端要得到一个名字foo的锁,客户端使用下面的命令进行获取: 
SETNX lock.foo <current Unix time + lock timeout + 1> 

  • 如返回1,则该客户端得到锁,把lock.foo的键值设置为时间值表示该键已被锁定,该客户端最后能够经过DEL lock.foo来释放该锁。
  • 如返回0,代表该锁已被其余客户端取得,这时咱们能够先返回或进行重试等对方完成或等待锁超时。

解决死锁 
上面的锁定逻辑有一个问题:若是一个持有锁的客户端失败或崩溃了不能释放锁,该怎么解决?咱们能够经过锁的键对应的时间戳来判断这种状况是否发生了,若是当前的时间已经大于lock.foo的值,说明该锁已失效,能够被从新使用。 
发生这种状况时,可不能简单的经过DEL来删除锁,而后再SETNX一次,当多个客户端检测到锁超时后都会尝试去释放它,这里就可能出现一个竞态条件,让咱们模拟一下这个场景: 
C0操做超时了,但它还持有着锁,C1和C2读取lock.foo检查时间戳,前后发现超时了。 
C1 发送DEL lock.foo 
C1 发送SETNX lock.foo 而且成功了。 
C2 发送DEL lock.foo 
C2 发送SETNX lock.foo 而且成功了。 
这样一来,C1,C2都拿到了锁!问题大了! 
幸亏这种问题是能够避免的,让咱们来看看C3这个客户端是怎样作的: 
C3发送SETNX lock.foo 想要得到锁,因为C0还持有锁,因此Redis返回给C3一个0 
C3发送GET lock.foo 以检查锁是否超时了,若是没超时,则等待或重试。 
反之,若是已超时,C3经过下面的操做来尝试得到锁: 
GETSET lock.foo <current Unix time + lock timeout + 1> 
经过GETSET,C3拿到的时间戳若是仍然是超时的,那就说明,C3如愿以偿拿到锁了。 
若是在C3以前,有个叫C4的客户端比C3快一步执行了上面的操做,那么C3拿到的时间戳是个未超时的值,这时,C3没有如期得到锁,须要再次等待或重试。留意一下,尽管C3没拿到锁,但它改写了C4设置的锁的超时值,不过这一点很是微小的偏差带来的影响能够忽略不计。 

注意:为了让分布式锁的算法更稳键些,持有锁的客户端在解锁以前应该再检查一次本身的锁是否已经超时,再去作DEL操做,由于可能客户端由于某个耗时的操做而挂起,操做完的时候锁由于超时已经被别人得到,这时就没必要解锁了。 

 

示例伪代码 
根据上面的代码,我写了一小段Fake代码来描述使用分布式锁的全过程: 
# get lock 
lock = 0 
while lock != 1: 
    timestamp = current Unix time + lock timeout + 1 
    lock = SETNX lock.foo timestamp 
    if lock == 1 or (now() > (GET lock.foo) and now() > (GETSET lock.foo timestamp)): 
        break; 
    else: 
        sleep(10ms) 

# do your job 
do_job() 

# release 
if now() < GET lock.foo: 
    DEL lock.foo 
是的,要想这段逻辑能够重用,使用python的你立刻就想到了Decorator,而用Java的你是否是也想到了那谁?AOP + annotation?行,怎样舒服怎样用吧,别重复代码就行。 

java之jedis实现 

expireMsecs 锁持有超时,防止线程在入锁之后,无限的执行下去,让锁没法释放 
timeoutMsecs 锁等待超时,防止线程饥饿,永远没有入锁执行代码的机会 

 

 /**
     * Acquire lock.
     * 
     * @param jedis
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException
     *             in case of thread interruption
     */
    public synchronized boolean acquire(Jedis jedis) throws InterruptedException {
        int timeout = timeoutMsecs;
        while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间

            if (jedis.setnx(lockKey, expiresStr) == 1) {
                // lock acquired
                locked = true;
                return true;
            }

            String currentValueStr = jedis.get(lockKey); //redis里的时间
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判断是否为空,不为空的状况下,若是被其余线程设置了值,则第二个条件判断是过不去的
                // lock is expired

                String oldValueStr = jedis.getSet(lockKey, expiresStr);
                //获取上一个锁到期时间,并设置如今的锁到期时间,
                //只有一个线程才能获取上一个线上的设置时间,由于jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //如过这个时候,多个线程刚好都到了这里,可是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    // lock acquired
                    locked = true;
                    return true;
                }
            }
            timeout -= 100;
            Thread.sleep(100);
        }
        return false;
    }

  通常用法 
  其中不少繁琐的边缘代码,包括:异常处理,释放资源等等 。

 

 JedisPool pool;
        JedisLock jedisLock = new JedisLock(pool.getResource(), lockKey, timeoutMsecs, expireMsecs);
        try {
            if (jedisLock.acquire()) { // 启用锁
                //执行业务逻辑
            } else {
                logger.info("The time wait for lock more than [{}] ms ", timeoutMsecs);
            }
        } catch (Throwable t) {
            // 分布式锁异常
            logger.warn(t.getMessage(), t);
        } finally {
            if (jedisLock != null) {
                try {
                    jedisLock.release();// 则解锁
                } catch (Exception e) {
                }
            }
            if (jedis != null) {
                try {
                    pool.returnResource(jedis);// 还到链接池里
                } catch (Exception e) {
                }
            }
        }

   犀利用法 
   用匿名类来实现,代码很是简洁   至于SimpleLock的实现

  

SimpleLock lock = new SimpleLock(key);
        lock.wrap(new Runnable() {
            @Override
            public void run() {
                //此处代码是锁上的
            }
        });
相关文章
相关标签/搜索