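This series is organized around the modules that matter most when designing a distributed delay queue. Along the way it explains how various message-queue features are implemented, and it analyzes the source code of the Dyno-queues distributed delay queue to examine every aspect of designing and implementing such a queue.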
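The previous two articles covered the overall design and how messages are produced and consumed. This article covers several auxiliary features that make the system more complete.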
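As mentioned earlier, from Redis's point of view Dyno-queues maintains three groups of Redis data structures for each queue:
1. A sorted set holding the queued message IDs, ordered by score (the delivery time).
2. A hash holding the message payloads, keyed by message ID.
3. A sorted set holding messages that clients have consumed but not yet acknowledged: the Un-ack set.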
The third structure is what supports the Ack mechanism.
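As mentioned earlier, _pop is what consumes messages. For each prefetched message ID, its logic is:
1. Add the ID to the Un-ack set with ZADD NX, using now + unackTime as the score; if nothing was added, count a miss and skip the ID.
2. Remove the ID from the queue's sorted set; if nothing was removed, count a miss and skip it.
3. Read the payload from the hash; if it is missing, count a miss and skip it; otherwise deserialize it and return it to the caller.
This is where the Un-ack set, the sorted set of messages that clients have consumed but not yet acknowledged, comes in.
The code is as follows: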
private List<Message> _pop(String shard, int messageCount,
                           ConcurrentLinkedQueue<String> prefetchedIdQueue) throws Exception {
    String queueShardName = getQueueShardKey(queueName, shard);
    String unackShardName = getUnackKey(queueName, shard);
    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

    // NX option indicates add only if it doesn't exist.
    // https://redis.io/commands/zadd#zadd-options-redis-302-or-greater
    ZAddParams zParams = ZAddParams.zAddParams().nx();

    List<Message> popped = new LinkedList<>();
    for (; popped.size() != messageCount; ) {
        String msgId = prefetchedIdQueue.poll();
        if (msgId == null) {
            // No prefetched IDs left: stop instead of looping forever.
            break;
        }

        // Add the message ID to the unack set
        long added = quorumConn.zadd(unackShardName, unackScore, msgId, zParams);
        if (added == 0) {
            monitor.misses.increment();
            continue;
        }

        long removed = quorumConn.zrem(queueShardName, msgId);
        if (removed == 0) {
            monitor.misses.increment();
            continue;
        }

        String json = quorumConn.hget(messageStoreKey, msgId);
        if (json == null) {
            monitor.misses.increment();
            continue;
        }

        Message msg = om.readValue(json, Message.class);
        popped.add(msg);

        if (popped.size() == messageCount) {
            return popped;
        }
    }
    return popped;
}
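To make the order of operations in _pop easier to follow without a Redis cluster, here is a minimal in-memory sketch. It is a model built on assumptions, not Dyno-queues code: the class name InMemoryShard is hypothetical, HashMaps stand in for the two sorted sets (scores kept, ordering ignored) and for the payload hash, and putIfAbsent plays the role of ZADD NX.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory stand-in for ONE queue shard; not Dyno-queues code.
// queue/unack play the role of the two ZSETs (message id -> score),
// store plays the role of the payload HASH (message id -> payload).
class InMemoryShard {
    final Map<String, Double> queue = new HashMap<>();
    final Map<String, Double> unack = new HashMap<>();
    final Map<String, String> store = new HashMap<>();

    // Same order of steps as _pop: ZADD NX into unack, ZREM from queue, HGET payload.
    List<String> pop(Queue<String> prefetchedIds, int count, long nowMillis, long unackTimeMillis) {
        double unackScore = nowMillis + unackTimeMillis;  // redelivery deadline
        List<String> popped = new ArrayList<>();
        while (popped.size() < count) {
            String id = prefetchedIds.poll();
            if (id == null) {
                break;                                    // nothing left to try
            }
            if (unack.putIfAbsent(id, unackScore) != null) {
                continue;                                 // like ZADD NX returning 0: already claimed
            }
            if (queue.remove(id) == null) {
                continue;                                 // like ZREM returning 0; unack entry stays, as in _pop
            }
            String payload = store.get(id);               // like HGET
            if (payload == null) {
                continue;
            }
            popped.add(payload);
        }
        return popped;
    }
}
```

As in _pop, an ID that gets claimed in the unack map but is then missing from the queue stays in the unack map; in Dyno-queues such leftovers are left to the background unack processing.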
At this point the logic looks like this:
[Figure: _pop (msg id 9). msg id 9 is taken out of the message list zset (msg id 1, msg id 2, msg id 3, ..., msg id 9) and added to the unack list zset (msg id 11, msg id 12, msg id 13); the payloads msg 1, msg 2, msg 3 and msg 9 remain in the hash.]
Once the user has obtained a message, it must be acked, for example:
List<String> pushed_msgs = V1Queue.push(payloads);
Message poppedWithPredicate = V1Queue.popMsgWithPredicate("searchable pay*", false);
V1Queue.ack(poppedWithPredicate.getId());
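The Ack logic is: go through all shards and remove the message ID from each shard's Un-ack set; as soon as one removal succeeds, delete the payload from the hash and return true. If no shard held the ID, return false.
The code is as follows: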
@Override
public boolean ack(String messageId) {
    return execute("ack", "(a shard in) " + queueName, () -> {
        for (String shard : allShards) {
            String unackShardKey = getUnackKey(queueName, shard);
            Long removed = quorumConn.zrem(unackShardKey, messageId);
            if (removed > 0) {
                // Acknowledged: the payload is no longer needed.
                quorumConn.hdel(messageStoreKey, messageId);
                return true;
            }
        }
        return false;
    });
}

private String getUnackKey(String queueName, String shard) {
    return redisKeyPrefix + ".UNACK." + queueName + "." + shard;
}
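The shard loop in ack() can be modeled the same way. This is a hypothetical sketch (the class AckSketch is not part of Dyno-queues): each shard's Un-ack set is a plain Set of IDs with scores omitted, and the payload hash is a Map.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of ack(): one unack "ZSET" per shard (scores omitted)
// plus the shared payload "HASH". Not Dyno-queues code.
class AckSketch {
    final Map<String, Set<String>> unackByShard = new LinkedHashMap<>();
    final Map<String, String> store = new HashMap<>();

    AckSketch(List<String> shards) {
        for (String shard : shards) {
            unackByShard.put(shard, new HashSet<>());
        }
    }

    // Same control flow as ack(): try every shard, stop at the first ZREM hit.
    boolean ack(String messageId) {
        for (Set<String> unack : unackByShard.values()) {
            if (unack.remove(messageId)) {  // like ZREM returning 1
                store.remove(messageId);    // like HDEL on the payload
                return true;
            }
        }
        return false;                       // not in any unack set: nothing to ack
    }
}
```

One consequence of this design: if the background task has already moved an unacked message back to the queue, the ID is no longer in any Un-ack set, so ack() returns false and the message will be delivered again.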
Specifically:
[Figure: ACK (msg id 9). msg id 9 is deleted from the unack list zset (msg id 11, msg id 12, msg id 13, msg id 9) and its payload msg 9 is deleted from the hash; the message list zset (msg id 1, msg id 2, msg id 3, ...) is unchanged.]
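A background task runs periodically and monitors the messages in the UNACK set that clients have not acknowledged within the given time (configurable per queue). These messages are moved back to the queue.
The scheduled task is started by the following code: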
schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);

if (this.singleRingTopology) {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(),
            unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(),
            unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}
The following code moves unacknowledged messages back into the queue.
@Override
public void processUnacks() {
    long queueDepth = size();
    monitor.queueDepth.record(queueDepth);

    String keyName = getUnackKey(queueName, shardName);
    execute("processUnacks", keyName, () -> {
        int batchSize = 1_000;
        String unackShardName = getUnackKey(queueName, shardName);

        double now = Long.valueOf(clock.millis()).doubleValue();
        int num_moved_back = 0;
        int num_stale = 0;

        // Unacked messages whose deadline (score) has already passed
        Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);
        for (Tuple unack : unacks) {
            double score = unack.getScore();
            String member = unack.getElement();

            String payload = quorumConn.hget(messageStoreKey, member);
            if (payload == null) {
                // Payload is gone: just drop the stale unack entry
                quorumConn.zrem(unackShardName, member);
                ++num_stale;
                continue;
            }

            // Put the message back into the queue, then remove it from the unack set
            long added_back = quorumConn.zadd(localQueueShard, score, member);
            long removed_from_unack = quorumConn.zrem(unackShardName, member);
            if (added_back > 0 && removed_from_unack > 0) ++num_moved_back;
        }
        return null;
    });
}
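A single processUnacks() pass can be sketched in memory as follows. Everything here is an assumption for illustration (the class UnackSweeper is hypothetical, HashMaps replace the Redis structures, and a stream filter/sort/limit replaces ZRANGEBYSCORE ... LIMIT); it mirrors the three outcomes in the code: drop entries whose payload is gone, otherwise re-add to the queue with the same score and remove from the Un-ack set.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory version of one processUnacks() pass on a single shard.
// queue/unack map message id -> score; store is the payload hash. Not Dyno-queues code.
class UnackSweeper {
    final Map<String, Double> queue = new HashMap<>();
    final Map<String, Double> unack = new HashMap<>();
    final Map<String, String> store = new HashMap<>();

    /** Returns how many expired unacked messages were moved back to the queue. */
    int processUnacks(long nowMillis, int batchSize) {
        // Like ZRANGEBYSCORE unack 0 now LIMIT 0 batchSize: expired deadlines, oldest first.
        List<String> expired = unack.entrySet().stream()
                .filter(e -> e.getValue() >= 0 && e.getValue() <= nowMillis)
                .sorted(Map.Entry.comparingByValue())
                .limit(batchSize)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());

        int movedBack = 0;
        for (String id : expired) {
            double score = unack.get(id);
            if (!store.containsKey(id)) {
                unack.remove(id);                             // payload gone: drop the stale unack entry
                continue;
            }
            boolean addedBack = queue.put(id, score) == null; // like ZADD: true only if newly added
            boolean removed = unack.remove(id) != null;       // like ZREM
            if (addedBack && removed) {
                movedBack++;
            }
        }
        return movedBack;
    }
}
```

Because the message goes back with its old unack deadline as its queue score, and that deadline is already in the past, it is immediately eligible to be popped again.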
At this point the logic looks like this:
[Figure: the ScheduledThread takes msg id 11 from the unack list zset (msg id 11, msg id 12, msg id 13) and moves it back into the message list zset (msg id 1, msg id 2, msg id 3, ...); the hash still holds msg 1, msg 2, msg 3.]
To prevent duplicate consumption, the system makes the following efforts:
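One safeguard that can be read directly from the _pop code is the claim step: ZADD with NX on the Un-ack set succeeds only for the first caller, and a consumer that gets 0 back skips the message. The sketch below illustrates that principle in plain Java, assuming ConcurrentHashMap.putIfAbsent as a stand-in for ZADD NX; the class ClaimRace is hypothetical and says nothing about how Dyno-queues actually schedules its consumers.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: putIfAbsent stands in for "ZADD unack NX score msgId",
// the step _pop uses to claim a message ID before removing it from the queue.
class ClaimRace {
    /** Lets several consumers race for the same ID and returns how many claimed it. */
    static int countWinners(int consumers, String msgId) {
        ConcurrentMap<String, String> unack = new ConcurrentHashMap<>();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            String consumer = "consumer-" + i;
            pool.submit(() -> {
                start.await();                                    // release all consumers together
                if (unack.putIfAbsent(msgId, consumer) == null) { // only the first insert succeeds
                    winners.incrementAndGet();
                }
                return null;
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return winners.get();
    }
}
```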
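When the producer sends data to the MQ, the data may be lost along the way, for example because of network problems.
For example, the following is a plain push that lacks the necessary guarantees: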
List<String> pushed_msgs = V1Queue.push(payloads);
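Another case is the MQ itself losing the data. Here you must enable the MQ's persistence, so that messages are persisted to disk once written; even if the MQ crashes, it automatically reloads the previously stored data after recovery, and data is generally not lost.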
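Dyno-queues uses ensure to make sure a message has really been written: it checks every shard, both the queue sets and the Un-ack sets, and enqueues the message only if none of them contains it.
In short: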
Enqueues 'message' if it doesn't exist in any of the shards or unack sets.
@Override
public boolean ensure(Message message) {
    return execute("ensure", "(a shard in) " + queueName, () -> {
        String messageId = message.getId();
        for (String shard : allShards) {
            String queueShard = getQueueShardKey(queueName, shard);
            Double score = quorumConn.zscore(queueShard, messageId);
            if (score != null) {
                return false;
            }
            String unackShardKey = getUnackKey(queueName, shard);
            score = quorumConn.zscore(unackShardKey, messageId);
            if (score != null) {
                return false;
            }
        }
        push(Collections.singletonList(message));
        return true;
    });
}
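The check-then-push logic of ensure() can be modeled with plain sets. In this hypothetical sketch (the class EnsureSketch and its pushShard parameter are illustrative assumptions, not Dyno-queues API), each shard has a queue set and an Un-ack set, and "push" simply adds the ID to one chosen shard.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of ensure(); scores are omitted. Not Dyno-queues code.
class EnsureSketch {
    final Map<String, Set<String>> queueByShard = new LinkedHashMap<>();
    final Map<String, Set<String>> unackByShard = new LinkedHashMap<>();
    private final String pushShard;  // hypothetical: the shard push() would pick

    EnsureSketch(List<String> shards, String pushShard) {
        for (String shard : shards) {
            queueByShard.put(shard, new HashSet<>());
            unackByShard.put(shard, new HashSet<>());
        }
        this.pushShard = pushShard;
    }

    boolean ensure(String messageId) {
        for (String shard : queueByShard.keySet()) {
            if (queueByShard.get(shard).contains(messageId)) return false; // ZSCORE hit in queue shard
            if (unackByShard.get(shard).contains(messageId)) return false; // ZSCORE hit in unack shard
        }
        queueByShard.get(pushShard).add(messageId);  // stands in for push(singletonList(message))
        return true;
    }
}
```

Note that in the original the lookups and the final push are separate Redis calls rather than one atomic script, so two clients calling ensure() for the same message at the same moment could both pass the check.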
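For stale messages, Dyno-queues finds them in one pass and hands them to the user to deal with; the expiry time is set by the user through parameters.
findStaleMessages therefore uses a Lua script to find them. For each candidate ID that is already due in a queue shard, the script reports it as stale unless both its payload (in the hash) and its queue entry exist, or it is currently in the Un-ack set.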
@Override
public List<Message> findStaleMessages() {
    return execute("findStaleMessages", localQueueShard, () -> {
        List<Message> stale_msgs = new ArrayList<>();

        int batchSize = 10;
        double now = Long.valueOf(clock.millis()).doubleValue();
        long num_stale = 0;

        for (String shard : allShards) {
            String queueShardName = getQueueShardKey(queueName, shard);

            Set<String> elems = nonQuorumConn.zrangeByScore(queueShardName, 0, now, 0, batchSize);
            if (elems.size() == 0) {
                continue;
            }

            String findStaleMsgsScript =
                    "local hkey=KEYS[1]\n" +
                    "local queue_shard=ARGV[1]\n" +
                    "local unack_shard=ARGV[2]\n" +
                    "local num_msgs=ARGV[3]\n" +
                    "\n" +
                    "local stale_msgs={}\n" +
                    "local num_stale_idx = 1\n" +
                    "for i=0,num_msgs-1 do\n" +
                    "  local msg_id=ARGV[4+i]\n" +
                    "\n" +
                    "  local exists_hash = redis.call('hget', hkey, msg_id)\n" +
                    "  local exists_queue = redis.call('zscore', queue_shard, msg_id)\n" +
                    "  local exists_unack = redis.call('zscore', unack_shard, msg_id)\n" +
                    "\n" +
                    "  if (exists_hash and exists_queue) then\n" +
                    "  elseif (not (exists_unack)) then\n" +
                    "    stale_msgs[num_stale_idx] = msg_id\n" +
                    "    num_stale_idx = num_stale_idx + 1\n" +
                    "  end\n" +
                    "end\n" +
                    "\n" +
                    "return stale_msgs\n";

            String unackKey = getUnackKey(queueName, shard);
            ImmutableList.Builder builder = ImmutableList.builder();
            builder.add(queueShardName);
            builder.add(unackKey);
            builder.add(Integer.toString(elems.size()));
            for (String msg : elems) {
                builder.add(msg);
            }

            ArrayList<String> stale_msg_ids = (ArrayList) ((DynoJedisClient) quorumConn).eval(
                    findStaleMsgsScript, Collections.singletonList(messageStoreKey), builder.build());
            num_stale = stale_msg_ids.size();

            for (String m : stale_msg_ids) {
                Message msg = new Message();
                msg.setId(m);
                stale_msgs.add(msg);
            }
        }
        return stale_msgs;
    });
}
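The per-message condition inside findStaleMsgsScript is easy to misread because of the empty then branch. Below it is transcribed into Java as a hypothetical helper (the class StaleCheck is not Dyno-queues code), with plain collections standing in for the hash, the queue shard and the Un-ack shard.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical Java transcription of the per-message test in findStaleMsgsScript.
class StaleCheck {
    static boolean isStale(boolean existsHash, boolean existsQueue, boolean existsUnack) {
        if (existsHash && existsQueue) {
            return false;       // empty "then" branch: a healthy queued message
        }
        return !existsUnack;    // otherwise stale, unless it is in flight (unacked)
    }

    // dueIds: IDs already due in the queue shard (the zrangeByScore result).
    static List<String> findStale(Collection<String> dueIds, Map<String, String> store,
                                  Set<String> queue, Set<String> unack) {
        List<String> stale = new ArrayList<>();
        for (String id : dueIds) {
            if (isStale(store.containsKey(id), queue.contains(id), unack.contains(id))) {
                stale.add(id);
            }
        }
        return stale;
    }
}
```

Since the candidate IDs come from the queue shard itself, in practice a message is reported when its payload is missing from the hash and it is not in the Un-ack set.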
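Dyno-queues supports message deletion: the business caller can delete a given message at any time.
Deletion removes the message from both the Un-ack set and the regular queue set, and deletes its payload from the hash.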
@Override
public boolean remove(String messageId) {
    return execute("remove", "(a shard in) " + queueName, () -> {
        for (String shard : allShards) {
            String unackShardKey = getUnackKey(queueName, shard);
            quorumConn.zrem(unackShardKey, messageId);

            String queueShardKey = getQueueShardKey(queueName, shard);
            Long removed = quorumConn.zrem(queueShardKey, messageId);

            if (removed > 0) {
                // Ignoring return value since we just want to get rid of it.
                Long msgRemoved = quorumConn.hdel(messageStoreKey, messageId);
                return true;
            }
        }
        return false;
    });
}
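The loop in remove() has a subtle edge case that a small model makes visible. In this hypothetical sketch (the class RemoveSketch is not Dyno-queues code; sets replace the sorted sets and a Map replaces the hash), the payload is deleted only when the ID is found in a queue set.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of remove(); scores are omitted. Not Dyno-queues code.
class RemoveSketch {
    final Map<String, Set<String>> queueByShard = new LinkedHashMap<>();
    final Map<String, Set<String>> unackByShard = new LinkedHashMap<>();
    final Map<String, String> store = new HashMap<>();

    RemoveSketch(List<String> shards) {
        for (String shard : shards) {
            queueByShard.put(shard, new HashSet<>());
            unackByShard.put(shard, new HashSet<>());
        }
    }

    boolean remove(String messageId) {
        for (String shard : queueByShard.keySet()) {
            unackByShard.get(shard).remove(messageId);      // ZREM on unack, result ignored
            if (queueByShard.get(shard).remove(messageId)) { // ZREM on queue
                store.remove(messageId);                     // HDEL payload
                return true;
            }
        }
        return false;
    }
}
```

So for a message that is only in an Un-ack set (consumed but not yet acked), remove() clears the Un-ack entry but returns false and leaves the payload in the hash.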
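Dyno-queues uses Lua scripts for batch processing, which increases throughput.
Why were Lua scripts introduced in Redis?
Redis offers a very rich command set; the official site lists more than 200 commands. In some specific scenarios, however, several operations have to be combined and executed atomically, which native commands alone cannot achieve.
For such scenarios Redis supports Lua scripting: a user can send a Lua script to the server to perform a custom action and receive the script's response. Redis executes the script atomically on its single thread, so while the script runs it cannot be interrupted by any other request.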
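The benefits of using scripts are:
1. Less network overhead: several commands travel in one request instead of several round trips.
2. Atomicity: the script runs as a whole, with no other command interleaved.
3. Reuse: a script loaded on the server is cached and can be invoked again (for example with EVALSHA) without resending it.
The code is below; as you can see, it uses Lua scripts to pop a whole batch of messages in one shot: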
// TODO: Do code cleanup/consolidation
private List<Message> atomicBulkPopHelper(int messageCount,
        ConcurrentLinkedQueue<String> prefetchedIdQueue, boolean localShardOnly) throws IOException {

    double now = Long.valueOf(clock.millis() + 1).doubleValue();
    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

    // The script requires the scores as whole numbers
    NumberFormat fmt = NumberFormat.getIntegerInstance();
    fmt.setGroupingUsed(false);
    String nowScoreString = fmt.format(now);
    String unackScoreString = fmt.format(unackScore);

    List<String> messageIds = new ArrayList<>();
    for (int i = 0; i < messageCount; ++i) {
        messageIds.add(prefetchedIdQueue.poll());
    }

    // Note: in Lua only nil and false are falsy, so "if (zadd_ret)" in both scripts
    // is true even when ZADD NX returns 0.
    String atomicBulkPopScriptLocalOnly =
            "local hkey=KEYS[1]\n" +
            "local num_msgs=ARGV[1]\n" +
            "local peek_until=ARGV[2]\n" +
            "local unack_score=ARGV[3]\n" +
            "local queue_shard_name=ARGV[4]\n" +
            "local unack_shard_name=ARGV[5]\n" +
            "local msg_start_idx = 6\n" +
            "local idx = 1\n" +
            "local return_vals={}\n" +
            "for i=0,num_msgs-1 do\n" +
            "  local message_id=ARGV[msg_start_idx + i]\n" +
            "  local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
            "  if (exists) then\n" +
            "    if (exists <=peek_until) then\n" +
            "      local value = redis.call('hget', hkey, message_id)\n" +
            "      if (value) then\n" +
            "        local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
            "        if (zadd_ret) then\n" +
            "          redis.call('zrem', queue_shard_name, message_id)\n" +
            "          return_vals[idx]=value\n" +
            "          idx=idx+1\n" +
            "        end\n" +
            "      end\n" +
            "    end\n" +
            "  else\n" +
            "    return {}\n" +
            "  end\n" +
            "end\n" +
            "return return_vals";

    String atomicBulkPopScript =
            "local hkey=KEYS[1]\n" +
            "local num_msgs=ARGV[1]\n" +
            "local num_shards=ARGV[2]\n" +
            "local peek_until=ARGV[3]\n" +
            "local unack_score=ARGV[4]\n" +
            "local shard_start_idx = 5\n" +
            "local msg_start_idx = 5 + (num_shards * 2)\n" +
            "local out_idx = 1\n" +
            "local return_vals={}\n" +
            "for i=0,num_msgs-1 do\n" +
            "  local found_msg=false\n" +
            "  local message_id=ARGV[msg_start_idx + i]\n" +
            "  for j=0,num_shards-1 do\n" +
            "    local queue_shard_name=ARGV[shard_start_idx + (j*2)]\n" +
            "    local unack_shard_name=ARGV[shard_start_idx + (j*2) + 1]\n" +
            "    local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
            "    if (exists) then\n" +
            "      found_msg=true\n" +
            "      if (exists <=peek_until) then\n" +
            "        local value = redis.call('hget', hkey, message_id)\n" +
            "        if (value) then\n" +
            "          local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
            "          if (zadd_ret) then\n" +
            "            redis.call('zrem', queue_shard_name, message_id)\n" +
            "            return_vals[out_idx]=value\n" +
            "            out_idx=out_idx+1\n" +
            "            break\n" +
            "          end\n" +
            "        end\n" +
            "      end\n" +
            "    end\n" +
            "  end\n" +
            "  if (found_msg == false) then\n" +
            "    return {}\n" +
            "  end\n" +
            "end\n" +
            "return return_vals";

    List<Message> payloads = new ArrayList<>();
    if (localShardOnly) {
        String unackShardName = getUnackKey(queueName, shardName);

        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(Integer.toString(messageCount));
        builder.add(nowScoreString);
        builder.add(unackScoreString);
        builder.add(localQueueShard);
        builder.add(unackShardName);
        for (int i = 0; i < messageCount; ++i) {
            builder.add(messageIds.get(i));
        }

        List<String> jsonPayloads;
        // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
        jsonPayloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScriptLocalOnly,
                Collections.singletonList(messageStoreKey), builder.build());

        for (String p : jsonPayloads) {
            Message msg = om.readValue(p, Message.class);
            payloads.add(msg);
        }
    } else {
        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(Integer.toString(messageCount));
        builder.add(Integer.toString(allShards.size()));
        builder.add(nowScoreString);
        builder.add(unackScoreString);
        for (String shard : allShards) {
            String queueShard = getQueueShardKey(queueName, shard);
            String unackShardName = getUnackKey(queueName, shard);
            builder.add(queueShard);
            builder.add(unackShardName);
        }
        for (int i = 0; i < messageCount; ++i) {
            builder.add(messageIds.get(i));
        }

        List<String> jsonPayloads;
        // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
        jsonPayloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScript,
                Collections.singletonList(messageStoreKey), builder.build());

        for (String p : jsonPayloads) {
            Message msg = om.readValue(p, Message.class);
            payloads.add(msg);
        }
    }
    return payloads;
}
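The trickiest part of the multi-shard script is the 1-based ARGV layout it expects. The hypothetical helper below (BulkPopArgs is not Dyno-queues code) rebuilds the argument list in the same order as the else branch and exposes a luaArgv accessor so the script's index arithmetic, shard_start_idx + 2j and msg_start_idx = 5 + 2 * num_shards, can be checked directly.

```java
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring how atomicBulkPopHelper lays out ARGV
// for the multi-shard atomicBulkPopScript. Not Dyno-queues code.
class BulkPopArgs {
    // Same formatting as the original: integer digits, no grouping separators.
    static String wholeNumber(double score) {
        NumberFormat fmt = NumberFormat.getIntegerInstance();
        fmt.setGroupingUsed(false);
        return fmt.format(score);
    }

    static List<String> build(int messageCount, List<String> queueKeys, List<String> unackKeys,
                              double now, double unackScore, List<String> messageIds) {
        List<String> argv = new ArrayList<>();
        argv.add(Integer.toString(messageCount));     // ARGV[1] num_msgs
        argv.add(Integer.toString(queueKeys.size())); // ARGV[2] num_shards
        argv.add(wholeNumber(now));                   // ARGV[3] peek_until
        argv.add(wholeNumber(unackScore));            // ARGV[4] unack_score
        for (int j = 0; j < queueKeys.size(); j++) {  // ARGV[5 + 2j] queue key, ARGV[5 + 2j + 1] unack key
            argv.add(queueKeys.get(j));
            argv.add(unackKeys.get(j));
        }
        argv.addAll(messageIds.subList(0, messageCount)); // IDs start at ARGV[5 + 2 * num_shards]
        return argv;
    }

    // Lua arrays are 1-based, so ARGV[k] is argv.get(k - 1).
    static String luaArgv(List<String> argv, int k) {
        return argv.get(k - 1);
    }
}
```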
The latest version is V2, which has three classes; let's look at what each one does.
QueueBuilder
MultiRedisQueue
RedisPipelineQueue
QueueBuilder is a wrapper that gives callers a single, unified API for constructing queues.
public class QueueBuilder {
    private Clock clock;
    private String queueName;
    private String redisKeyPrefix;
    private int unackTime;
    private String currentShard;
    private ShardSupplier shardSupplier;
    private HostSupplier hs;
    private EurekaClient eurekaClient;
    private String applicationName;
    private Collection<Host> hosts;
    private JedisPoolConfig redisPoolConfig;
    private DynoJedisClient dynoQuorumClient;
    private DynoJedisClient dynoNonQuorumClient;
}
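MultiRedisQueue is also meant to improve speed. Internally it holds several RedisPipelineQueue instances, each representing one shard, and it writes to them in round-robin order.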
/**
 * MultiRedisQueue exposes a single queue using multiple redis queues. Each RedisQueue is a shard.
 * When pushing elements to the queue, does a round robin to push the message to one of the shards.
 * When polling, the message is polled from the current shard (shardName) the instance is associated with.
 */
public class MultiRedisQueue implements DynoQueue {
    private List<String> shards;
    private String name;
    private Map<String, RedisPipelineQueue> queues = new HashMap<>();
    private RedisPipelineQueue me;
}
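Round-robin shard selection for push can be sketched with an atomic counter. This is an illustrative assumption, not MultiRedisQueue's actual code: the class RoundRobinShards is hypothetical, and the real class may pick the next shard differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin shard picker, in the spirit of MultiRedisQueue's push path.
class RoundRobinShards {
    private final List<String> shards;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinShards(List<String> shards) {
        this.shards = Collections.unmodifiableList(new ArrayList<>(shards));
    }

    String nextShard() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return shards.get(Math.floorMod(next.getAndIncrement(), shards.size()));
    }
}
```

According to the class comment, only pushing rotates across shards; polling always reads from the shard the instance is associated with, so each consumer drains its own local shard.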
RedisPipelineQueue uses Redis pipelining to increase throughput. Its class comment reads:
Queue implementation that uses Redis pipelines that improves the throughput under heavy load.
public class RedisPipelineQueue implements DynoQueue {
    private final Logger logger = LoggerFactory.getLogger(RedisPipelineQueue.class);
    private final Clock clock;
    private final String queueName;
    private final String shardName;
    private final String messageStoreKeyPrefix;
    private final String myQueueShard;
    private final String unackShardKeyPrefix;
    private final int unackTime;
    private final QueueMonitor monitor;
    private final ObjectMapper om;
    private final RedisConnection connPool;
    private volatile RedisConnection nonQuorumPool;
    private final ScheduledExecutorService schedulerForUnacksProcessing;
    private final HashPartitioner partitioner = new Murmur3HashPartitioner();
    private final int maxHashBuckets = 32;
    private final int longPollWaitIntervalInMillis = 10;
}