一篇和Redis有关的锁和事务的文章

部分参考连接git

Transactiongithub

StackExchange.Redis Transactionredis

hashest数据库

正文

Redis 是一种基于内存的单线程数据库。意味着全部的命令是一个接一个的执行。c#


考虑只有一个Redis实例,也就是Redis自己没有作分布式。bash


经过SETNX命令,set if not exist的缩写。那么多个服务在调用的时候能够经过同一个key申请一个lock(也就是调用命令成功返回1),而后根据相应条件作释放(好比时间到期,or手动释放),也就是delete key。服务器

Redis自己有MULTI命令,标记开启一个事务。开启以后后面的命令会在调用EXEC命令的时候以一个集合的方式总体执行,也就是原子性(不保证都成功)。并发

如今有个需求,用redis实现Check and Set,也就是先读取里面的值,而后设置(好比作个+=val);并发的问题是必需要考虑的。async

用redis描述大体是这样的。这里假设redis没有incr这个自增命令。分布式

val = GET mykey
val = val + 1
SET mykey $val

直接这样作,并发问题是确定有的。因此,按照上面的知识,应该有2种方法来避免这个并发问题。

基于SENTX命令。

copy一下文档的demo

redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
redis>

第一次调用setnx,设置mykey的value为hello,返回1,表示成功。

第二次调用setnx,设置mykey的value为world,由于第一次调用并无释放mykey,因此返回0,表示设置失败。

最后获取mykey的值,返回的是hello。

最后记得要去释放mykey。

这实际上是一个悲观锁,也就是一个进程获取到锁以后要等释放别的进程才能继续。

基于MULTI命令。

  1. 先看一个简单的应用

    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> incr foo
    QUEUED
    127.0.0.1:6379> incr bar
    QUEUED
    127.0.0.1:6379> exec
    1) (integer) 1
    2) (integer) 1

    第一步调用MULTI命令,表示开始多个命令的输入。返回OK,表示开始接收。

    第二步调用incr foo,给foo对应的值作自增。返回queued,表示已加入队列。

    第二步调用incr bar,给bar对应的值作资政,返回queued,表示已加入队列。

    最后调用exec命令,表示执行队列中的命令。返回每一个命令的结果。

  2. 有错误了怎么办

    首先错误分两种

    • 在enqueue的时候出错,最多见的就是参数错误。好比下面这个例子
    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set a 1234
    QUEUED
    127.0.0.1:6379> set a 1 1 1 1 1 1 11
    QUEUED
    127.0.0.1:6379> exec
    1) OK
    2) (error) ERR syntax error
    127.0.0.1:6379>

    第二个set a 1 1 1 1 1 1 11命令是有语法错误,因此,在执行exec的时候会返回语法错误。第一个是成功的。因此,若是在后面get a是会返回1234,为成功的设置。

    假设报错的命令在中间,后面的命令也是会执行的。

    • 还有就是直接命令就不对的。看个例子
    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set a 11
    QUEUED
    127.0.0.1:6379> aaa
    (error) ERR unknown command `aaa`, with args beginning with:
    127.0.0.1:6379> exec
    (error) EXECABORT Transaction discarded because of previous errors.

    先set a,进入队列。

    执行aaa命令,这个命令不存在。直接报错。

    执行exec,事务由于以前的错误,exec停止。

  3. 为何没有回滚

    经过上面的例子,看到redis对multi的操做是没有回滚的,或许有点奇怪。根据文档描述,有两个缘由。

    • redis的命令执行只有在语法错误或者数据类型出错的时候会失败,而不是在enqueue的时候。这意味着失败是由程序设置错误致使的。那么,这种错误确定是在开发环境中就应该容易被发现,而不是在生产环境。
    • 为了快。
  4. WATCH 命令的乐观锁

    结合watch命令咱们也能够实现上面的需求。

    WATCH mykey
    --Begin---
    ##下面两行是客户端命令
    val = GET mykey
    val = val + 1
    --End---
    MULTI
    SET mykey $val
    EXEC

    解释一下,先获取一下mykey的监控。而后客户端获取mykey的值,(是客户端,不是命令服务端)。而后赋值自增。而后服务端开启MULTI, 设置新的值。执行。

    假设在MULTI和Exec之间,mykey的值被别的client修改,exec会返回(nil)。

    下面作个演示:

    先在redis-cli上执行如下命令

    127.0.0.1:6379> watch a
    OK
    127.0.0.1:6379> multi
    OK
    127.0.0.1:6379> set a 13
    QUEUED

    如上,已经开启WATCH,而后设置a =13 进入队列。

    而后在本地的redis desktop manager上去修改这个值。

    update

    而后再在服务器上执行 exec,

    127.0.0.1:6379> exec
    (nil)

    返回的是nil,表示没有成功。若是没有客户端去更新,执行exec是返回OK。

  5. redis-scripting-and-transactions

    在Redis 2.6以后,引入了Redis script来实现事务的功能。一般来讲script方式速度会相对快一点(没有作测试)。不过既然multi已经出来好久了,因此,不太可能会移除这个命令。

在StackExchange.Redis中使用

显然,也分两种,基于setnx 或者 MULTI + WATCH。分别对应的是IDatabaseAsync.LockTakeAsyncIDatabaseAsync.CreateTransaction这里结合了Polly这个库用于重试,毕竟,悲观锁,我多拿几回总能拿到的;乐观锁,执行的命令,我多试几回,总能成功的。

  • LockTakeAsync

    public async Task<T> TakeLockAsync<T>(string key, string token, Func<object, Task<T>> func, object obj)
        where T : class
    {
        var db = GetDb(redisConfigModel.LockDbIndex);//获取IDatabaseAsync对象
        //定义获取锁的策略
        var policy = Policy
            .HandleResult<bool>(w => !w)
            .WaitAndRetryForeverAsync(
                sleepDurationProvider: attemp => TimeSpan.FromSeconds(3), //两次重复尝试的间隔
                onRetry: (delegeteRst, ts) =>
                {
                    //能够记录日志啥的
                }
            );
        //竞争获取锁。
        await policy.ExecuteAsync(async () => await db.LockTakeAsync(key, token, TimeSpan.MaxValue));  
        try
        {
            return await func(obj);//获取到锁以后的具体执行的方法。
        }
        finally
        {
            await db.LockReleaseAsync(key, token); //最后必定要释放
        }
    }

    LockTakeAsync的时候根据key对应的token值是否已经被获取来做为条件。

  • CreateTransaction

    StackExchange.Redis 用multiplexer类实现Redis的一些列命令。咱们的代码不能直接简单的映射到watch命令,由于,单纯调用watch是确定成功的,这样会致使你们都"成功"(假的)。这里用的Condition的方式来实现。

    public async Task AddAfterReadAsync(string key, int value, string hashField = "hash_field")
    {
          //处理policy的结果为false的状况,一直重试。
        var policy = Policy.HandleResult<bool>(w => !w).RetryForeverAsync();
          //执行
        await policy.ExecuteAsync(async () =>
        {
            var db = GetDb(redisConfigModel.LockDbIndex);
            var trans = db.CreateTransaction();
            var oldValue = Convert.ToInt32(await db.StringGetAsync(key));
            trans.AddCondition(Condition.HashNotExists(key,
                hashField)); //这里确保hashField不存在。也能够用Condition.KeyNotExists(key)
            //这里不能await,由于每一个命令的结果只有在执行了execute后才知道。
            trans.StringSetAsync(key, (oldValue + value).ToString());
            var execSuccess = await trans.ExecuteAsync();
            return execSuccess;
        });
    }

小结

这是一篇和redis有关的锁,事务的文章。写了我一整个下午。看完,感受也没有多少东西。感受开头连接中关于hashset仍是有点意思的。

相关文章
相关标签/搜索