redis分布式锁&队列应用

分布式锁

  1. setnx(set if not exists)

若是设值成功则证实上锁成功,而后再调用del指令释放。java

// 这里的冒号:就是一个普通的字符,没特别含义,它能够是任意其它字符,不要误解
> setnx lock:codehole true
OK
... do something critical ...
> del lock:codehole
(integer) 1

可是有个问题,若是逻辑执行到中间出现异常了,可能会致使 del 指令没有被调用,这样就会陷入死锁,锁永远得不到释放。redis

  1. setnx(set if not exists) 加上过时时间
> setnx lock:codehole true
OK
> expire lock:codehole 5
... do something critical ...
> del lock:codehole
(integer) 1

若是在 setnx 和 expire 之间服务器进程忽然挂掉了,多是由于机器掉电或者是被人为杀掉的,就会致使 expire 得不到执行,也会形成死锁。json

  1. 使用ex nx命令一块儿执行
> set lock:codehole true ex 5 nx
OK
... do something critical ...
> del lock:codehole
  1. 删除锁的线程必须是上锁的线程

为 set 指令的 value 参数设置为一个随机数,释放锁时先匹配随机数是否一致,而后再删除 key,这是为了确保当前线程占有的锁不会被其它线程释放,除非这个锁是过时了被服务器自动释放的。
可是匹配 value 和删除 key 不是一个原子操做,Redis 也没有提供相似于delifequals这样的指令,这就须要使用 Lua 脚原本处理了,由于 Lua 脚本能够保证连续多个指令的原子性执行。服务器

上锁
tag = random.nextint()  # 随机数
if redis.set(key, tag, nx=True, ex=5):
    do_something()
    redis.delifequals(key, tag)  # 假想的 delifequals 指令
    

# delifequals 解锁
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

延时队列

消息队列

注意:
Redis 的消息队列不是专业的消息队列,它没有很是多的高级特性,没有 ack 保证,若是对消息的可靠性有着极致的追求,那么它就不适合使用。数据结构

Redis 的 list(列表) 数据结构经常使用来做为异步消息队列使用,使用rpush/lpush操做入队列,使用lpop 和 rpop来出队列。app

> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)

阻塞队列

若是队列空了,客户端就会陷入 pop 的死循环,不停地 pop,没有数据,接着再 pop,又没有数据。这就是浪费生命的空轮询。
一般咱们使用 sleep 来解决这个问题,让线程睡一会,睡个 1s 钟就能够了。可是有个小问题,那就是睡眠会致使消息的延迟增大。
咱们可使用 blpop/brpop,阻塞读。
阻塞读在队列没有数据的时候,会当即进入休眠状态,一旦数据到来,则马上醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。dom

锁冲突处理

上面咱们讲了分布式锁的问题,可是加锁失败没有讲。通常咱们有3种策略来处理加锁失败:异步

  1. 直接抛出异常,通知用户稍后重试
    这种方式比较适合由用户直接发起的请求,用户看到错误对话框后,会先阅读对话框的内容,再点击重试,这样就能够起到人工延时的效果。
  2. sleep 一会再重试
    sleep 会阻塞当前的消息处理线程,会致使队列的后续消息处理出现延迟。若是碰撞的比较频繁或者队列里消息比较多,sleep 可能并不合适。
  3. 将请求转移至延时队列,过一会再试
    这种方式比较适合异步消息处理,将当前冲突的请求扔到另外一个队列延后处理以避开冲突。

延时队列

延时队列能够经过 Redis 的 zset(有序列表) 来实现。咱们将消息序列化成一个字符串做为 zset 的value,这个消息的到期处理时间做为score,而后用多个线程轮询 zset 获取到期的任务进行处理,多个线程是为了保障可用性,万一挂了一个线程还有其它线程能够继续处理。分布式

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> {

  static class TaskItem<T> {
    public String id;
    public T msg;
  }

  // fastjson 序列化对象中存在 generic 类型时,须要使用 TypeReference
  private Type TaskType = new TypeReference<TaskItem<T>>() {
  }.getType();

  private Jedis jedis;
  private String queueKey;

  public RedisDelayingQueue(Jedis jedis, String queueKey) {
    this.jedis = jedis;
    this.queueKey = queueKey;
  }

  public void delay(T msg) {
    TaskItem<T> task = new TaskItem<T>();
    task.id = UUID.randomUUID().toString(); // 分配惟一的 uuid
    task.msg = msg;
    String s = JSON.toJSONString(task); // fastjson 序列化
    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
  }

  public void loop() {
    while (!Thread.interrupted()) {
      // 只取一条
      Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
      if (values.isEmpty()) {
        try {
          Thread.sleep(500); // 歇会继续
        } catch (InterruptedException e) {
          break;
        }
        continue;
      }
      String s = values.iterator().next();
      if (jedis.zrem(queueKey, s) > 0) { // 抢到了
        TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
        this.handleMsg(task.msg);
      }
    }
  }

  public void handleMsg(T msg) {
    System.out.println(msg);
  }

  public static void main(String[] args) {
    Jedis jedis = new Jedis();
    RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
    Thread producer = new Thread() {

      public void run() {
        for (int i = 0; i < 10; i++) {
          queue.delay("codehole" + i);
        }
      }

    };
    Thread consumer = new Thread() {

      public void run() {
        queue.loop();
      }

    };
    producer.start();
    consumer.start();
    try {
      producer.join();
      Thread.sleep(6000);
      consumer.interrupt();
      consumer.join();
    } catch (InterruptedException e) {
    }
  }
}
相关文章
相关标签/搜索