不少时候咱们会有延时处理一个任务的需求,好比说:html
下面咱们来分别探讨一下几种实现方案:java
Java中的DelayQueue位于java.util.concurrent包下,本质是由PriorityQueue和BlockingQueue实现的阻塞优先级队列。redis
放入队列的元素须要实现Delayed接口:数据库
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
经过实现这个接口,来完成对队列中元素,按照时间延迟前后排序的目的。apache
从队列中取元素:网络
/** * Retrieves and removes the head of this queue, waiting if necessary * until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
能够看到,在这段代码里,在第一个元素的延迟时间还没到的状况下:session
向队列中放入元素:数据结构
/** * Inserts the specified element into this delay queue. * * @param e the element to add * @return <tt>true</tt> * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
在放入元素的时候,会唤醒等待中的读线程。分布式
若是咱们不考虑分布式运行和任务持久化的话,Java中的DelayQueue是一个很理想的方案,精巧好用。性能
可是若是咱们须要分布式运行和任务持久化,就须要引入一些外部组件。
前文咱们看到,能够经过优先级队列来实现延迟队列的功能。
Redis提供了不少数据结构,其中的zset是一种有序的数据结构;咱们能够经过Redis中的zset来实现一个延迟队列。
基本的方法就是使用时间戳做为元素的score存入zset。
redis> ZADD delayqueue <future_timestamp> "messsage"
获取全部已经“就绪”的message,而且删除message。
redis> MULTI redis> ZRANGEBYSCORE delayqueue 0 <current_timestamp> redis> ZREMRANGEBYSCORE delayqueue 0 <current_timestamp> redis> EXEC
可是这个方案也有一些问题:
Redis事务虽然保证了一致性和隔离性,可是并无提供回滚功能。消息处理失败是不能被恢复的,若是处理某条消息的线程崩溃或机器宕机,这条未被处理不能被自动的再次处理。
也有考虑过将分为TODO和Doing两条队列:
先从TODO队列中取出任务,放入Doing中,再开始处理;若是停留在Doing队列总太久,则从新放入TODO队列。
可是因为Redis的事务特性,并不能作到彻底可靠;而且检查Doing超时的逻辑也略复杂。
那么有没有一个成熟的消息队列能够支持延迟投递消息的功能呢?
答案固然是有的,本文的标题就是使用RabbitMQ实现DelayQueue。
这是RabbitMQ众多隐藏的强大特性中的一个,能够轻松的下降代码的复杂度,实现DelayQueue的功能。
咱们须要两个队列,一个用来作主队列,真正的投递消息;另外一个用来延迟处理消息。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); factory.setPort(port); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare("MAIN_QUEUE", true, false, false, null); channel.queueBind("MAIN_QUEUE", "amq.direct", "MAIN_QUEUE"); HashMap<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "amq.direct"); arguments.put("x-dead-letter-routing-key", "MAIN_QUEUE"); channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);
放入延迟消息:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); AMQP.BasicProperties properties = builder.expiration(String.valueOf(task.getDelayMillis())).deliveryMode(2).build(); channel.basicPublish("", "DELAY_QUEUE", properties, SerializationUtils.serialize(task));
而关键点,就在于 x-dead-letter-exchange 和 x-dead-letter-routing-key 两个参数上。这两个参数说明了:消息过时后的处理方式 --> 投递到咱们指定的MAIN_QUEUE;而后咱们只须要在MAIN_QUEUE中等待消息投递便可。
RabbitMQ自己提供了消息持久化和没有收到ACK的重投递功能,这样咱们就能够实现一个高可靠的分布式延迟消息队列了。
上面讲述的RabbitMQ定时任务方案有问题,RabbitMQ TTL文档 中写道:
Caveats
While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue).
per-queue TTL不会有问题,由于快要过时的消息老是在队列的前边;可是若是使用per-message TTL的话,过时的消息有可能会在未过时的消息后边,直到前边的消息过时或者被消费。由于RabbitMQ保证过时的消息必定不会被消费者消费,可是不能保证消息过时就会从队列中移除。
ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker.
能够支持定时、延迟投递、重复投递和Cron调度。
在配置文件中,启用<broker ... schedulerSupport="true">
选项后便可使用。
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long delay = 30 * 1000; long period = 10 * 1000; int repeat = 9; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); producer.send(message);
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); producer.send(message);
因为ActiveMQ采用的是相似于Java中DelayQueue的方式,经过先将消息排序再定时触发的方式来实现延迟消息。在往队列中投递大量(10w+)定时消息以后,ActiveMQ的性能将会变得接近不可用,大量的消息挤压得不到投递。
RocketMQ 支持定时消息,可是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。
经过MySQL等数据库记录消息应该被投递的时间,而后循环进行查找,并把当前时间应该投递的消息放入普通的消息队列。