本系列咱们会以设计分布式延迟队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,经过分析Dyno-queues 分布式延迟队列的源码来具体看看设计实现一个分布式延迟队列的方方面面。html
Dyno-queues 是 Netflix 实现的基于 Dynomite 和 Redis 构建的队列。java
Dynomite是一种通用的实现,能够与许多不一样的 key-value 存储引擎一块儿使用。目前它提供了对Redis序列化协议(RESP)和Memcached写协议的支持。linux
具体设计目标依据业务系统不一样而不一样。git
Dyno-queues 的业务背景是:在 Netflix 的平台上运行着许多的业务流程,这些流程的任务是经过异步编排进行驱动,如今要实现一个分布式延迟队列,这个延迟队列具备以下特色:github
Netflix 选择 Dynomite,是由于:redis
Netflix选择Redis做为构建队列的存储引擎是由于:数据库
查询模型:基于Key-Value模型,而不是SQL,即关系模型。存储对象比较小。apache
ACID属性:传统的关系数据库中,用ACID(A原子性、C一致性、I 隔离性、D持久性)来保证事务,在保证ACID的前提下每每有不好的可用性。Dynamo用弱一致性C来达到高可用,不提供数据隔离 I,只容许单Key更新。编程
其实全部的高可用,是能够依赖于RPC和存储的高可用来实现的。json
Netflix 选择 Dynomite,是由于:
因此 Dyno-queues 的高可用就自动解决了。
怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息天然是幂等的。就算有单点故障,其余节点能够马上顶上。
对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。须要保证每个分区内的高可用性,也就是每个分区至少要有一个主备且须要作数据的同步。
Dynomite 使用 redis 集群这个共享存储 作了幂等保证。
消息到达服务端后,若是不通过任何处理就到接收者,broker就失去了它的意义。为了知足咱们错峰/流控/最终可达等一系列需求,把消息存储下来,而后选择时机投递就显得是瓜熟蒂落的了。
这个存储能够作成不少方式。好比存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),而且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。
但并非每种消息都须要持久化存储。不少消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几回failover,最终投递出去也何尝不可。
Dynomite 使用 redis 集群这个共享存储 在必定程度上缓解了消息堆积问题。
咱们来看看若是须要数据落地的状况下各类存储子系统的选择。理论上,从速度来看,文件系统 > 分布式KV(持久化)> 分布式文件系统 > 数据库,而可靠性却截然相反。仍是要从支持的业务场景出发做出最合理的选择。
若是大家的消息队列是用来支持支付/交易等对可靠性要求很是高,但对性能和量的要求没有这么高,并且没有时间精力专门作文件存储系统的研究,DB是最好的选择。
可是DB受制于IOPS,若是要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。总体上能够采用数据文件 + 索引文件的方式处理。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,因为其编程接口较友好,性能也比较可观,若是在可靠性要求不是那么高的场景,也不失为一个不错的选择。
由于 场景是 可靠性要求不那么高,因此 Dynomite 使用 redis 集群这个存储子系统 也是能够的。
下一个重要的事情就是解析发送接收关系,进行正确的消息投递了。抛开现象看本质,发送接收关系无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。
通常比较通用的设计是支持组间广播,不一样的组注册不一样的订阅。组内的不一样机器,若是注册一个相同的ID,则单播;若是注册不一样的ID(如IP地址+端口),则广播。
至于广播关系的维护,通常因为消息队列自己都是集群,因此都维护在公共存储上,如 config server、zookeeper等。维护广播关系所要作的事情基本是一致的:
本文后续会介绍如何维护发送关系。
数据分片的逻辑既能够实如今客户端,也能够实如今 Proxy
层,取决于你的架构如何设计。
传统的数据库中间件大多将分片逻辑实如今客户端,经过改写物理 SQL
访问不一样的 MySQL
库;而在 NewSQL
数据库倡导的计算存储分离架构中,一般将分片逻辑实如今计算层,即 Proxy
层,经过无状态的计算节点转发用户请求到正确的存储节点。
在 Dynomite 之中,队列根据可用区域进行分片,将数据推送到队列时,经过轮训机制肯定分片,这种机制能够确保全部分片的数据是平衡的,每一个分片都表明Redis中的有序集合,有序集中的 key 是 queueName 和 AVAILABILITY _ZONE 的组合。
public class RoundRobinStrategy implements ShardingStrategy { private final AtomicInteger nextShardIndex = new AtomicInteger(0); /** * Get shard based on round robin strategy. * @param allShards */ @Override public String getNextShard(List<String> allShards, Message message) { int index = nextShardIndex.incrementAndGet(); if (index >= allShards.size()) { nextShardIndex.set(0); index = 0; } return allShards.get(index); } }
Dyno-queues 队列是在 Dynomite 的JAVA客户端 Dyno 之上创建的,Dyno 为持久链接提供链接池,而且能够配置为拓扑感知。关于 Dyno 具体能够参见前文:
源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(1)
源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(2)
Dyno为应用程序提供特定的本地机架(在AWS中,机架是一个区域,例如 us-east-1a、us-east-1b等),us-east-1a的客户端将链接到相同区域的Dynomite/Redis节点,除非该节点不可用,在这种状况下该客户端将进行故障转移。这个属性被用于经过区域划分队列。
队列根据可用区域进行分片,将数据推送到队列时,经过轮训机制肯定分片,这种机制能够确保全部分片的数据是平衡的,每一个分片都表明Redis中的有序集合,有序集中的key是queueName和AVAILABILITY _ZONE的组合。
具体机制举例以下:
public class RoundRobinStrategy implements ShardingStrategy { private final AtomicInteger nextShardIndex = new AtomicInteger(0); /** * Get shard based on round robin strategy. * @param allShards */ @Override public String getNextShard(List<String> allShards, Message message) { int index = nextShardIndex.incrementAndGet(); if (index >= allShards.size()) { nextShardIndex.set(0); index = 0; } return allShards.get(index); } }
在分布式系统中有个CAP理论,对于P(分区容忍性)而言,是实际存在 从而没法避免的。由于分布系统中的处理不是在本机,而是网络中的许多机器相互通讯,故网络分区、网络通讯故障问题没法避免。所以,只能尽可能地在C 和 A 之间寻求平衡。
对于数据存储而言,为了提升可用性(Availability),采用了副本备份,好比对于HDFS,默认每块数据存三份。某数据块所在的机器宕机了,就去该数据块副本所在的机器上读取(从这能够看出,数据分布方式是按“数据块”为单位分布的)
可是问题来了,当须要修改数据时,就须要更新全部的副本数据,这样才能保证数据的一致性(Consistency)。所以,就须要在 C(Consistency) 和 A(Availability) 之间权衡。
而Quorum机制,就是这样的一种权衡机制,一种将“读写转化”的模型。
显然,咱们更想要作到强一致性的这种效果,那么有哪些方式能够实现呢,其中最为简单直接的就是 WARO,也就是Write All Read one。
WARO 是一种简单的副本控制协议,当 Client 请求向某副本写数据时(更新数据),只有当全部的副本都更新成功以后,此次写操做才算成功,不然视为失败。这样的话,只须要读任何一个副本上的数据便可。可是WARO带来的影响是写服务的可用性较低,由于只要有一个副本更新失败,这次写操做就视为失败了。
Quorum 的定义以下:假设有 N 个副本,更新操做 wi 在 W 个副本中更新成功以后,则认为这次更新操做 wi 成功,把此次成功提交的更新操做对应的数据叫作:“成功提交的数据”。对于读操做而言,至少须要读 R 个副本,其中,W+R>N ,即 W 和 R 有重叠,通常,W+R=N+1。
Quorum机制认为每次写入的机器数目达到大多数(W)时,就认为本次写操做成功了。即Quorum机制可以不须要更新彻底部的数据,但又保证返回给用户的是有效数据的解决方案。
咱们以 ES 为例。
咱们在发送任何一个增删改操做的时候,均可以带上一个consistency参数,指明咱们想要的写一致性是什么。
quorum = int((primary shard+number_of_replicas)/2)+1
若是节点数少于quorum,可能致使querum不齐全,进而致使没法执行任何写操做。quorum不齐全时,会进行等待。默认等待时间为1分钟,期待活跃的shard数量能够增长,最后实在不行,就会timeout。
Dynomite 可以将最终一致性(eventual consistency)扩展为协调一致性(tunable consistency)。
关于QUORUM,Dynomite有以下配置:
由测试获得的结果,Dynomite能从3,6,12,24一路扩展到48个节点,在DC_ONE和DC_QUORUM模式下,吞吐率都能线性地增加。与此同时,Dynomite在延迟方面只增长了不多的开支,即使在DC_QUORUM模式下,(延迟)也只有几毫秒。DC_QUORUM模式在延迟和吞吐量方面处于劣势,可是能为客户提供更好的读写保证。
对于Dyno-queues来讲,则是在实现中有所体现。好比在 RedisQueues 中,有以下成员变量:
private final JedisCommands quorumConn; private final JedisCommands nonQuorumConn;
在构建 RedisQueues 时,就须要注明使用哪种。
而从注释中咱们可知,
@param quorumConn
Dyno connection with dc_quorum enabled,就是 采用了Quorum的Redis;@param nonQuorumConn
Dyno connection to local Redis,就是本地Redis;生成 RedisQueues 的代码以下(注意其中注释):
/** * @param quorumConn Dyno connection with dc_quorum enabled * @param nonQuorumConn Dyno connection to local Redis */ public RedisQueues(JedisCommands quorumConn, JedisCommands nonQuorumConn, String redisKeyPrefix, ShardSupplier shardSupplier, int unackTime, int unackHandlerIntervalInMS, ShardingStrategy shardingStrategy) { this(Clock.systemDefaultZone(), quorumConn, nonQuorumConn, redisKeyPrefix, shardSupplier, unackTime, unackHandlerIntervalInMS, shardingStrategy); }
在有分片时,就从nonQuorumConn(就是本地Redis)提取。
使用nonQuorumConn来预取的缘由是:最终一致性(eventual consistency)。
由于 replication lag,在某一时刻不一样分片的数据可能不同,因此须要先预取。这就须要使用 nonQuorumConn 来预取,由于本地 redis 的数据才是正确的。
private Set<String> doPeekIdsFromShardHelper(final String queueShardName, final double peekTillTs, final int offset,final int count) { return nonQuorumConn.zrangeByScore(queueShardName, 0, peekTillTs, offset, count); }
再好比处理没有 ack 的消息时,先从 nonQuorumConn 读取信息ID,再从 quorumConn 读取消息内容。
这就是由于一致性致使的,因此以下:
@Override public void processUnacks() { execute("processUnacks", keyName, () -> { Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize); for (Tuple unack : unacks) { double score = unack.getScore(); String member = unack.getElement(); String payload = quorumConn.hget(messageStoreKey, member); long added_back = quorumConn.zadd(localQueueShard, score, member); } }); }
再好比从本地提取消息就使用了 nonQuorumConn。
@Override public Message localGet(String messageId) { try { return execute("localGet", messageStoreKey, () -> { String json = nonQuorumConn.hget(messageStoreKey, messageId); Message msg = om.readValue(json, Message.class); return msg; }); } }
再好比 popWithMsgIdHelper 也是先读取 nonQuorumConn,再从 quorumConn 读取其余内容。
public Message popWithMsgIdHelper(String messageId, String targetShard, boolean warnIfNotExists) { try { return execute("popWithMsgId", targetShard, () -> { String queueShardName = getQueueShardKey(queueName, targetShard); double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue(); String unackShardName = getUnackKey(queueName, targetShard); ZAddParams zParams = ZAddParams.zAddParams().nx(); Long exists = nonQuorumConn.zrank(queueShardName, messageId); // If we get back a null type, then the element doesn't exist. if (exists == null) { // We only have a 'warnIfNotExists' check for this call since not all messages are present in // all shards. So we want to avoid a log spam. If any of the following calls return 'null' or '0', // we may have hit an inconsistency (because it's in the queue, but other calls have failed), // so make sure to log those. monitor.misses.increment(); return null; } String json = quorumConn.hget(messageStoreKey, messageId); if (json == null) { monitor.misses.increment(); return null; } long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams); if (added == 0) { monitor.misses.increment(); return null; } long removed = quorumConn.zrem(queueShardName, messageId); if (removed == 0) { monitor.misses.increment(); return null; } Message msg = om.readValue(json, Message.class); return msg; }); } }
RedisQueues是为用户提供的外部接口,从其成员变量能够看出来其内部机制,好比各类策略。
public class RedisQueues implements Closeable { private final Clock clock; private final JedisCommands quorumConn; private final JedisCommands nonQuorumConn; private final Set<String> allShards; private final String shardName; private final String redisKeyPrefix; private final int unackTime; private final int unackHandlerIntervalInMS; private final ConcurrentHashMap<String, DynoQueue> queues; private final ShardingStrategy shardingStrategy; private final boolean singleRingTopology; }
用户经过get方法来获得DynoQueue:DynoQueue V1Queue = queues.get("simpleQueue")
。
public DynoQueue get(String queueName) { String key = queueName.intern(); return queues.computeIfAbsent(key, (keyToCompute) -> new RedisDynoQueue(clock, redisKeyPrefix, queueName, allShards, shardName, unackHandlerIntervalInMS, shardingStrategy, singleRingTopology) .withUnackTime(unackTime) .withNonQuorumConn(nonQuorumConn) .withQuorumConn(quorumConn)); }
咱们看看 Dyno-queues 中几种数据结构。
一个完整的消息队列应该定义清楚本身能够投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不一样的业务场景作不一样的选择。
Dyno-queues 只有服务端落地的可靠消息。每一个延时消息必须包括如下参数:
public class Message { private String id; private String payload; private long timeout; private int priority; private String shard; }
Dyno-queues 关于存储的整体思路是:hash 记录消息内容, zset 实现按到期时间排序的队列
,即:
具体逻辑如图,这里的虚线指的是二者经过 msg id 来进行逻辑上的管理,物理没有关联:
+----------+----------+----------+-----+----------+ | | | | | | zset | msg id 1 | msg id 2 | msg id 3 | ... | msg id n | | | | | | | +---+------+----+-----+----+-----+-----+----+-----+ | | | | | | | | v v v v +---+---+ +---+---+ +--+----+ +--+--+ hash | msg 1 | | msg 2 | | msg 3 | |msg n| +-------+ +-------+ +-------+ +-----+
具体到代码,则是:
quorumConn.hset(messageStoreKey, message.getId(), json);
double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority;
具体参见以下:
for (Message message : messages) { String json = om.writeValueAsString(message); quorumConn.hset(messageStoreKey, message.getId(), json); double priority = message.getPriority() / 100.0; double score = Long.valueOf(clock.millis() + message.getTimeout()).doubleValue() + priority; String shard = shardingStrategy.getNextShard(allShards, message); String queueShard = getQueueShardKey(queueName, shard); quorumConn.zadd(queueShard, score, message.getId()); }
RedisDynoQueue是 Dyno-queues 延迟队列的主要实现。
从Redis角度来看,对于每一个队列,维护三组Redis数据结构:
这三组Redis数据结构在RedisDynoQueue内部其实没有对应的成员变量,对于RedisDynoQueue 来讲,看起来是逻辑概念,而事实上它们存在于Redis的内部存储中,由Dynomite负责高可用等等。
具体以下:
message list zset +----------+----------+----------+-----+----------+ | | | | | | | msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 | | | | | | | +---+------+----+-----+----+-----+-----+----+-----+ | | | | | | | | v v v v hash +---+---+ +---+---+ +--+----+ +--+--+ | msg 1 | | msg 2 | | msg 3 | |msg 9| +-------+ +-------+ +-------+ +-----+ unack list +------------+-------------+--------------+ zset | | | | | msg id 11 | msg id 12 | msg id 13 | | | | | +------------+-------------+--------------+
RedisDynoQueue 的成员变量能够分类以下:
ObjectMapper om:用来把消息序列化,写到redis中;
Clock clock:用觉得分数生成时间戳;
String redisKeyPrefix:每一个queue的用户会给本身定义key;
String messageStoreKey:对于每一个Redis hash来讲,能够设定本身的field(字段),好比:
this.messageStoreKey = redisKeyPrefix + ".MESSAGE." + queueName; quorumConn.hget(messageStoreKey, messageId)
List
String localQueueShard:本地分区;
ShardingStrategy shardingStrategy:分区策略;
ConcurrentLinkedQueue
Map<String, ConcurrentLinkedQueue
this.unsafePrefetchedIdsAllShardsMap = new HashMap<>(); for (String shard : allShards) { unsafePrefetchedIdsAllShardsMap.put(getQueueShardKey(queueName, shard), new ConcurrentLinkedQueue<>()); }
int retryCount = 2:重试次数;
int unackTime = 60:用以生成ack队列的分数。
double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue(); long added = quorumConn.zadd(unackShardName, unackScore, messageId, zParams);
ScheduledExecutorService schedulerForUnacksProcessing:用以生成线程,来按期ack
schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1); if (this.singleRingTopology) { schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS); } else { schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS); }
QueueMonitor monitor:监控与统计;
具体代码以下:
public class RedisDynoQueue implements DynoQueue { private final Clock clock; private final String queueName; private final List<String> allShards; private final String shardName; private final String redisKeyPrefix; private final String messageStoreKey; private final String localQueueShard; private volatile int unackTime = 60; private final QueueMonitor monitor; private final ObjectMapper om; private volatile JedisCommands quorumConn; private volatile JedisCommands nonQuorumConn; private final ConcurrentLinkedQueue<String> prefetchedIds; private final Map<String, ConcurrentLinkedQueue<String>> unsafePrefetchedIdsAllShardsMap; private final ScheduledExecutorService schedulerForUnacksProcessing; private final int retryCount = 2; private final ShardingStrategy shardingStrategy; private final boolean singleRingTopology; }
至此,Dyno-queues 基本功能初步分析完毕,咱们下期继续介绍消息产生,消费。
消息队列的理解,几种常见消息队列对比,新手也能看得懂!----分布式中间件消息队列
http://blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/
http://stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq
http://activemq.apache.org/delay-and-schedule-message-delivery.html
源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(1)
源码分析] Dynomite 分布式存储引擎 之 DynoJedisClient(2)