Dyno-queues 分布式延迟队列 之 基本功能

Dyno-queues 分布式延迟队列 之 基本功能

0x00 摘要

本系列咱们会以设计分布式延迟队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,经过分析Dyno-queues 分布式延迟队列的源码来具体看看设计实现一个分布式延迟队列的方方面面。html

0x01 Dyno-queues分布式延迟队列

Dyno-queues 是 Netflix 实现的基于 Dynomite 和 Redis 构建的队列。java

Dynomite是一种通用的实现,能够与许多不一样的 key-value 存储引擎一块儿使用。目前它提供了对Redis序列化协议(RESP)和Memcached写协议的支持。linux

1.1 设计目标

具体设计目标依据业务系统不一样而不一样。git

Dyno-queues 的业务背景是:在 Netflix 的平台上运行着许多的业务流程,这些流程的任务是经过异步编排进行驱动,如今要实现一个分布式延迟队列,这个延迟队列具备以下特色:github

  • 分布式;
  • 不用外部的锁机制;
  • 高并发;
  • 至少一次语义交付;
  • 不遵循严格的FIFO;
  • 延迟队列(消息在未来某个时间以前不会从队列中取出);
  • 优先级;

1.2 选型思路

Netflix 选择 Dynomite,是由于:redis

  • 其具备性能,多数据中心复制和高可用性的特色;
  • Dynomite提供分片和可插拔的数据存储引擎,容许在数据需求增长垂直和水平扩展;

Netflix选择Redis做为构建队列的存储引擎是由于:数据库

  • Redis架构经过提供构建队列所需的数据结构很好地支持了队列设计,同时Redis的性能也很是优秀,具有低延迟的特性;
  • Dynomite在Redis之上提供了高可用性、对等复制以及一致性等特性,用于构建分布式集群队列;

0x02 整体设计

2.1 系统假设

查询模型:基于Key-Value模型,而不是SQL,即关系模型。存储对象比较小。apache

ACID属性:传统的关系数据库中,用ACID(A原子性、C一致性、I 隔离性、D持久性)来保证事务,在保证ACID的前提下每每有不好的可用性。Dynamo用弱一致性C来达到高可用,不提供数据隔离 I,只容许单Key更新编程

2.2 高可用

其实全部的高可用,是能够依赖于RPC和存储的高可用来实现的。json

  • 先来看RPC的高可用,好比美团的基于MTThrift的RPC框架,阿里的Dubbo等,其自己就具备服务自动发现,负载均衡等功能。
  • 而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,而且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。

Netflix 选择 Dynomite,是由于

  • 其具备高性能,多数据中心复制和高可用性的特色;
  • Dynomite 提供分片和可插拔的数据存储引擎,容许在数据需求增长垂直和水平扩展;

因此 Dyno-queues 的高可用就自动解决了。

2.3 幂等

怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息天然是幂等的。就算有单点故障,其余节点能够马上顶上。

对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。须要保证每个分区内的高可用性,也就是每个分区至少要有一个主备且须要作数据的同步。

Dynomite 使用 redis 集群这个共享存储 作了幂等保证。

2.4 承载消息堆积

消息到达服务端后,若是不通过任何处理就到接收者,broker就失去了它的意义。为了知足咱们错峰/流控/最终可达等一系列需求,把消息存储下来,而后选择时机投递就显得是瓜熟蒂落的了。

这个存储能够作成不少方式。好比存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。

持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),而且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。

但并非每种消息都须要持久化存储。不少消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几回failover,最终投递出去也何尝不可。

Dynomite 使用 redis 集群这个共享存储 在必定程度上缓解了消息堆积问题。

2.5 存储子系统

咱们来看看若是须要数据落地的状况下各类存储子系统的选择。理论上,从速度来看,文件系统 > 分布式KV(持久化)> 分布式文件系统 > 数据库,而可靠性却截然相反。仍是要从支持的业务场景出发做出最合理的选择。

若是大家的消息队列是用来支持支付/交易等对可靠性要求很是高,但对性能和量的要求没有这么高,并且没有时间精力专门作文件存储系统的研究,DB是最好的选择。

可是DB受制于IOPS,若是要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。总体上能够采用数据文件 + 索引文件的方式处理。

分布式KV(如MongoDB,HBase)等,或者持久化的Redis,因为其编程接口较友好,性能也比较可观,若是在可靠性要求不是那么高的场景,也不失为一个不错的选择。

由于 场景是 可靠性要求不那么高,因此 Dynomite 使用 redis 集群这个存储子系统 也是能够的。

2.6 消费关系解析

下一个重要的事情就是解析发送接收关系,进行正确的消息投递了。抛开现象看本质,发送接收关系无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。

通常比较通用的设计是支持组间广播,不一样的组注册不一样的订阅。组内的不一样机器,若是注册一个相同的ID,则单播;若是注册不一样的ID(如IP地址+端口),则广播。

至于广播关系的维护,通常因为消息队列自己都是集群,因此都维护在公共存储上,如 config server、zookeeper等。维护广播关系所要作的事情基本是一致的:

  • 发送关系的维护。
  • 发送关系变动时的通知。

本文后续会介绍如何维护发送关系

2.7 数据分片

数据分片的逻辑既能够实如今客户端,也能够实如今 Proxy 层,取决于你的架构如何设计。

传统的数据库中间件大多将分片逻辑实如今客户端,经过改写物理 SQL 访问不一样的 MySQL 库;而在 NewSQL 数据库倡导的计算存储分离架构中,一般将分片逻辑实如今计算层,即 Proxy 层,经过无状态的计算节点转发用户请求到正确的存储节点。

在 Dynomite 之中,队列根据可用区域进行分片,将数据推送到队列时,经过轮训机制肯定分片,这种机制能够确保全部分片的数据是平衡的,每一个分片都表明Redis中的有序集合,有序集中的 key 是 queueName 和 AVAILABILITY _ZONE 的组合

public class RoundRobinStrategy implements ShardingStrategy {

    private final AtomicInteger nextShardIndex = new AtomicInteger(0);

    /**
     * Get shard based on round robin strategy.
     * @param allShards
     */
    @Override
    public String getNextShard(List<String> allShards, Message message) {
        int index = nextShardIndex.incrementAndGet();
        if (index >= allShards.size()) {
            nextShardIndex.set(0);
            index = 0;
        }
        return allShards.get(index);
    }
}

0x03 Dynomite 特性

3.1 可用分区和机架

Dyno-queues 队列是在 Dynomite 的JAVA客户端 Dyno 之上创建的,Dyno 为持久链接提供链接池,而且能够配置为拓扑感知。关于 Dyno 具体能够参见前文:

源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(1)

源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(2)

3.1.1 机架

Dyno为应用程序提供特定的本地机架(在AWS中,机架是一个区域,例如 us-east-1a、us-east-1b等),us-east-1a的客户端将链接到相同区域的Dynomite/Redis节点,除非该节点不可用,在这种状况下该客户端将进行故障转移。这个属性被用于经过区域划分队列。

3.1.2 分片

队列根据可用区域进行分片,将数据推送到队列时,经过轮训机制肯定分片,这种机制能够确保全部分片的数据是平衡的,每一个分片都表明Redis中的有序集合,有序集中的key是queueName和AVAILABILITY _ZONE的组合。

具体机制举例以下:

public class RoundRobinStrategy implements ShardingStrategy {

    private final AtomicInteger nextShardIndex = new AtomicInteger(0);

    /**
     * Get shard based on round robin strategy.
     * @param allShards
     */
    @Override
    public String getNextShard(List<String> allShards, Message message) {
        int index = nextShardIndex.incrementAndGet();
        if (index >= allShards.size()) {
            nextShardIndex.set(0);
            index = 0;
        }
        return allShards.get(index);
    }
}

3.2 Quorum

在分布式系统中有个CAP理论,对于P(分区容忍性)而言,是实际存在 从而没法避免的。由于分布系统中的处理不是在本机,而是网络中的许多机器相互通讯,故网络分区、网络通讯故障问题没法避免。所以,只能尽可能地在C 和 A 之间寻求平衡。

对于数据存储而言,为了提升可用性(Availability),采用了副本备份,好比对于HDFS,默认每块数据存三份。某数据块所在的机器宕机了,就去该数据块副本所在的机器上读取(从这能够看出,数据分布方式是按“数据块”为单位分布的)

可是问题来了,当须要修改数据时,就须要更新全部的副本数据,这样才能保证数据的一致性(Consistency)。所以,就须要在 C(Consistency) 和 A(Availability) 之间权衡。

Quorum机制,就是这样的一种权衡机制,一种将“读写转化”的模型

3.2.1 数据一致性

  • 强一致性:在任意时刻,从任意不一样副本取出的值都是同样的。
  • 弱一致性:有时泛指最终一致性,是指在任意时刻,可能因为网络延迟或者设备异常等缘由,不一样副本中的值可能会不同,但通过一段时间后,最终会变成同样。

显然,咱们更想要作到强一致性的这种效果,那么有哪些方式能够实现呢,其中最为简单直接的就是 WARO,也就是Write All Read one。

3.2.1.1 WARO 协议

WARO 是一种简单的副本控制协议,当 Client 请求向某副本写数据时(更新数据),只有当全部的副本都更新成功以后,此次写操做才算成功,不然视为失败。这样的话,只须要读任何一个副本上的数据便可。可是WARO带来的影响是写服务的可用性较低,由于只要有一个副本更新失败,这次写操做就视为失败了。

3.2.1.2 Quorum机制

Quorum 的定义以下:假设有 N 个副本,更新操做 wi 在 W 个副本中更新成功以后,则认为这次更新操做 wi 成功,把此次成功提交的更新操做对应的数据叫作:“成功提交的数据”。对于读操做而言,至少须要读 R 个副本,其中,W+R>N ,即 W 和 R 有重叠,通常,W+R=N+1。

  • N = 存储数据副本的数量;
  • W = 更新成功所需的副本;
  • R = 一次数据对象读取要访问的副本的数量;

Quorum机制认为每次写入的机器数目达到大多数(W)时,就认为本次写操做成功了。即Quorum机制可以不须要更新彻底部的数据,但又保证返回给用户的是有效数据的解决方案。

3.2.2 ES 的quorum

咱们以 ES 为例。

3.2.2.1 写一致性

咱们在发送任何一个增删改操做的时候,均可以带上一个consistency参数,指明咱们想要的写一致性是什么。

  • one:要求写操做只要primay shard是active可用的,就能够执行;
  • all:要求写操做必须全部的shard和replica都是active,才能够执行;
  • quorum(默认):全部shard中必须是大部分是可用的(一半及以上),才能够执行;
3.2.2.2 quorum机制

quorum = int((primary shard+number_of_replicas)/2)+1

若是节点数少于quorum,可能致使querum不齐全,进而致使没法执行任何写操做。quorum不齐全时,会进行等待。默认等待时间为1分钟,期待活跃的shard数量能够增长,最后实在不行,就会timeout。

3.3 DC_QUORUM

3.3.1 配置

Dynomite 可以将最终一致性(eventual consistency)扩展为协调一致性(tunable consistency)。

关于QUORUM,Dynomite有以下配置:

  • DC_ONE 本节点读写入完成及请求完成,其余的rack异步写入。使用DC_ONE模式,读写行为在local Availability Zone(AZ)下是同步的;
  • DC_QUORUM 同步写入到指定个数的rack中,其余的节点异步写入。使用DC_QUORUM模式,本地区域特定数量结点下的操做是同步的。
  • DC_SAFE_QUORUM 和DC_QUORUM相似,不过这个参数读写都要在指定个数的rack中成功而且数据校验同步,才算请求成功,否则会报错。

由测试获得的结果,Dynomite能从3,6,12,24一路扩展到48个节点,在DC_ONE和DC_QUORUM模式下,吞吐率都能线性地增加。与此同时,Dynomite在延迟方面只增长了不多的开支,即使在DC_QUORUM模式下,(延迟)也只有几毫秒。DC_QUORUM模式在延迟和吞吐量方面处于劣势,可是能为客户提供更好的读写保证。

3.3.2 实现

对于Dyno-queues来讲,则是在实现中有所体现。好比在 RedisQueues 中,有以下成员变量:

private final JedisCommands quorumConn;

private final JedisCommands nonQuorumConn;

在构建 RedisQueues 时,就须要注明使用哪种。

而从注释中咱们可知,

  • @param quorumConn Dyno connection with dc_quorum enabled,就是 采用了Quorum的Redis;
  • @param nonQuorumConn Dyno connection to local Redis,就是本地Redis;

生成 RedisQueues 的代码以下(注意其中注释):

/**
 * @param quorumConn Dyno connection with dc_quorum enabled
 * @param nonQuorumConn    Dyno connection to local Redis
 */
public RedisQueues(JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, int unackHandlerIntervalInMS, ShardingStrategy shardingStrategy) {
    
    this(Clock.systemDefaultZone(), quorumConn, nonQuorumConn, redisKeyPrefix, shardSupplier, unackTime, unackHandlerIntervalInMS, shardingStrategy);
    
}

3.3.3 使用

在有分片时,就从nonQuorumConn(就是本地Redis)提取。

使用nonQuorumConn来预取的缘由是:最终一致性(eventual consistency)。

由于 replication lag,在某一时刻不一样分片的数据可能不同,因此须要先预取。这就须要使用 nonQuorumConn 来预取,由于本地 redis 的数据才是正确的

private Set<String> doPeekIdsFromShardHelper(final String queueShardName, final double peekTillTs, final int offset,final int count) {
    return nonQuorumConn.zrangeByScore(queueShardName, 0, peekTillTs, offset, count);
}

再好比处理没有 ack 的消息时,先从 nonQuorumConn 读取信息ID,再从 quorumConn 读取消息内容。

这就是由于一致性致使的,因此以下:

@Override
public void processUnacks() {
        execute("processUnacks", keyName, () -> {
            
            Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);
            
            for (Tuple unack : unacks) {
                double score = unack.getScore();
                String member = unack.getElement();
                String payload = quorumConn.hget(messageStoreKey, member);
                long added_back = quorumConn.zadd(localQueueShard, score, member);
            }
        });
}

再好比从本地提取消息就使用了 nonQuorumConn。

@Override
public Message localGet(String messageId) {
    try {

        return execute("localGet", messageStoreKey, () -> {
            String json = nonQuorumConn.hget(messageStoreKey, messageId);

            Message msg = om.readValue(json, Message.class);
            return msg;
        });

    } 
}

再好比 popWithMsgIdHelper 也是先读取 nonQuorumConn,再从 quorumConn 读取其余内容。

public Message popWithMsgIdHelper(String messageId, String targetShard, boolean warnIfNotExists) {

    try {
        return execute("popWithMsgId", targetShard, () -> {

            String queueShardName = getQueueShardKey(queueName, targetShard);
            double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
            String unackShardName = getUnackKey(queueName, targetShard);

            ZAddParams zParams = ZAddParams.zAddParams().nx();

            Long exists = nonQuorumConn.zrank(queueShardName, messageId);
            // If we get back a null type, then the element doesn't exist.
            if (exists == null) {
                // We only have a 'warnIfNotExists' check for this call since not all messages are present in
                // all shards. So we want to avoid a log spam. If any of the following calls return 'null' or '0',
                // we may have hit an inconsistency (because it's in the queue, but other calls have failed),
                // so make sure to log those.
                monitor.misses.increment();
                return null;
            }

            String json = quorumConn.hget(messageStoreKey, messageId);
            if (json == null) {
                monitor.misses.increment();
                return null;
            }

            long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
            if (added == 0) {
                monitor.misses.increment();
                return null;
            }

            long removed = quorumConn.zrem(queueShardName, messageId);
            if (removed == 0) {
                monitor.misses.increment();
                return null;
            }

            Message msg = om.readValue(json, Message.class);
            return msg;
        });
    } 
}

0x04 外层封装

RedisQueues是为用户提供的外部接口,从其成员变量能够看出来其内部机制,好比各类策略。

public class RedisQueues implements Closeable {

    private final Clock clock;

    private final JedisCommands quorumConn;

    private final JedisCommands nonQuorumConn;

    private final Set<String> allShards;

    private final String shardName;

    private final String redisKeyPrefix;

    private final int unackTime;

    private final int unackHandlerIntervalInMS;

    private final ConcurrentHashMap<String, DynoQueue> queues;

    private final ShardingStrategy shardingStrategy;

    private final boolean singleRingTopology;
}

用户经过get方法来获得DynoQueue:DynoQueue V1Queue = queues.get("simpleQueue")

public DynoQueue get(String queueName) {

    String key = queueName.intern();

    return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy, singleRingTopology)
            .withUnackTime(unackTime)
            .withNonQuorumConn(nonQuorumConn)
            .withQuorumConn(quorumConn));
}

0x05 数据结构

咱们看看 Dyno-queues 中几种数据结构。

5.1 消息结构

一个完整的消息队列应该定义清楚本身能够投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不一样的业务场景作不一样的选择。

Dyno-queues 只有服务端落地的可靠消息。每一个延时消息必须包括如下参数:

  • id:惟一标示;
  • payload:消息过时以后发送mq的body,提供给消费这作具体的消息处理;
  • timeout:延时发送时间;
  • priority:优先级,与timeout一块儿决定消息如何发布,即同一 timeout 时间的消息中,哪一个优先使用。
  • shard:分区;
public class Message {
    private String id;
    private String payload;
    private long timeout;
    private int priority;
    private String shard;
}

5.2 存储结构

Dyno-queues 关于存储的整体思路是:hash 记录消息内容, zset 实现按到期时间排序的队列,即:

  • 利用hash 记录消息内容;
    • 使用hset存储消息;
    • 使用hget提取消息;
  • 经过Redis中的zset来实现一个延迟队列,主要利用它的score属性,Redis经过score来为集合中的成员进行从小到大的排序;
    • 使用zadd key score1 value1命令生产消息;
    • 使用zrem消费消息;

具体逻辑如图,这里的虚线指的是二者经过 msg id 来进行逻辑上的管理,物理没有关联:

+----------+----------+----------+-----+----------+
        |          |          |          |     |          |
 zset   | msg id 1 | msg id 2 | msg id 3 | ... | msg id n |
        |          |          |          |     |          |
        +---+------+----+-----+----+-----+-----+----+-----+
            |           |          |                |
            |           |          |                |
            v           v          v                v
        +---+---+   +---+---+   +--+----+        +--+--+
hash    | msg 1 |   | msg 2 |   | msg 3 |        |msg n|
        +-------+   +-------+   +-------+        +-----+

具体到代码,则是:

  • Message 的id做为key,Message总体被打包成json String做为value:quorumConn.hset(messageStoreKey, message.getId(), json);
  • 用Message 的超时时间,优先级以及当前时间戳构建出zset的分数:double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;

具体参见以下:

for (Message message : messages) {
    String json = om.writeValueAsString(message);
    quorumConn.hset(messageStoreKey, message.getId(), json);
    double priority = message.getPriority() / 100.0;
    double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
    String shard = shardingStrategy.getNextShard(allShards, message);
    String queueShard = getQueueShardKey(queueName, shard);
    quorumConn.zadd(queueShard, score, message.getId());
}

0x06 队列

RedisDynoQueue是 Dyno-queues 延迟队列的主要实现。

6.1 Redis相关

从Redis角度来看,对于每一个队列,维护三组Redis数据结构:

  • 包含队列元素和分数的有序集合 zset;
  • 包含消息内容的Hash集合,其中key为消息ID;
  • 包含客户端已经消费但还没有确认的消息有序集合,Un-ack集合 zset;

这三组Redis数据结构在RedisDynoQueue内部其实没有对应的成员变量,对于RedisDynoQueue 来讲,看起来是逻辑概念,而事实上它们存在于Redis的内部存储中,由Dynomite负责高可用等等

具体以下:

message list



zset  +----------+----------+----------+-----+----------+
      |          |          |          |     |          |
      | msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 |
      |          |          |          |     |          |
      +---+------+----+-----+----+-----+-----+----+-----+
          |           |          |                |
          |           |          |                |
          v           v          v                v
hash  +---+---+   +---+---+   +--+----+        +--+--+
      | msg 1 |   | msg 2 |   | msg 3 |        |msg 9|
      +-------+   +-------+   +-------+        +-----+



                  unack list
       +------------+-------------+--------------+
zset   |            |             |              |
       |  msg id 11 |   msg id 12 |   msg id 13  |
       |            |             |              |
       +------------+-------------+--------------+

6.2 成员变量

RedisDynoQueue 的成员变量能够分类以下:

6.2.1 整体

  • String queueName:本Queue名字;
  • String shardName:分区名字;

6.2.2 Redis链接相关

  • JedisCommands quorumConn:采用 quorum 的链接;
  • JedisCommands nonQuorumConn:非Quorum的链接;

6.2.3 Redis操做相关

  • ObjectMapper om:用来把消息序列化,写到redis中;

  • Clock clock:用觉得分数生成时间戳;

  • String redisKeyPrefix:每一个queue的用户会给本身定义key;

  • String messageStoreKey:对于每一个Redis hash来讲,能够设定本身的field(字段),好比:

    this.messageStoreKey = redisKeyPrefix + ".MESSAGE." + queueName;
    
    quorumConn.hget(messageStoreKey, messageId)
  • List allShards:全部分区;

  • String localQueueShard:本地分区;

  • ShardingStrategy shardingStrategy:分区策略;

  • ConcurrentLinkedQueue prefetchedIds:Prefetch message IDs from the local shard;本地分区优先的消息;

  • Map<String, ConcurrentLinkedQueue > unsafePrefetchedIdsAllShardsMap;

    this.unsafePrefetchedIdsAllShardsMap = new HashMap<>();
    
    for (String shard : allShards) {
        unsafePrefetchedIdsAllShardsMap.put(getQueueShardKey(queueName, shard), new ConcurrentLinkedQueue<>());
    }
  • int retryCount = 2:重试次数;

6.2.4 Ack相关

  • int unackTime = 60:用以生成ack队列的分数。

    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();
    
    long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
  • ScheduledExecutorService schedulerForUnacksProcessing:用以生成线程,来按期ack

schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);

if (this.singleRingTopology) {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}
  • boolean singleRingTopology:

6.2.5 监控与统计

QueueMonitor monitor:监控与统计;

6.2.6 具体定义

具体代码以下:

public class RedisDynoQueue implements DynoQueue {
    private final Clock clock;

    private final String queueName;

    private final List<String> allShards;

    private final String shardName;

    private final String redisKeyPrefix;

    private final String messageStoreKey;

    private final String localQueueShard;

    private volatile int unackTime = 60;

    private final QueueMonitor monitor;

    private final ObjectMapper om;

    private volatile JedisCommands quorumConn;

    private volatile JedisCommands nonQuorumConn;

    private final ConcurrentLinkedQueue<String> prefetchedIds;

    private final Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap;

    private final ScheduledExecutorService schedulerForUnacksProcessing;

    private final int retryCount = 2;

    private final ShardingStrategy shardingStrategy;

    private final boolean singleRingTopology;
}

至此,Dyno-queues 基本功能初步分析完毕,咱们下期继续介绍消息产生,消费。

0xFF 参考

干货分享 | 如何从零开始设计一个消息队列

消息队列的理解,几种常见消息队列对比,新手也能看得懂!----分布式中间件消息队列

消息队列设计精要

有赞延迟队列设计

基于Dynomite的分布式延迟队列

http://blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/

http://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq

http://activemq.apache.org/delay-and-schedule-message-delivery.html

源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(1)

源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(2)

原创 Amazon Dynamo系统架构

Netlix Dynomite性能基准测试,基于AWS和Redis

为何分布式必定要有延时任务?

相关文章
相关标签/搜索