在java web开发中,你们常常会用到定时任务。比较经常使用的类库有:Timer、ScheduledExecutor 、Quartz、Spring Scheduler。
Timer和ScheduleExecutor都不支持持久化和分布式,宕机或重启则待执行的任务丢失。部署多个实例时会同时触发任务。
Quartz若支持持久化和分布式须要较多的配置,貌似是7张表。
Spring Scheduler支持持久化和分布式也是须要相应的插件。
本文将介绍下另外一种思路,使用rabbitmq实现定时任务。java
DLX, Dead-Letter-Exchange。当消息在一个队列中变成死信以后,它能被从新publish到另外一个Exchange,这个Exchange就是DLX。消息变成死信一贯有一下几种状况:web
rabbitMq配置两个exchange:exchange-product负责接收生产者的消息、DLX负责接收死信。两个queue:queue-dead绑定在exchange-product上,queue-consume绑定在DLX上。
消息传递流程:spring
像这种定时任务通常都是须要常常执行的。所以,队列,exchange都须要持久化。因此能够在控制台去建立队列和exchange。所以不须要在代码里建立队列以及exchange,只须要发送和接收消息就好了。json
//消费者 @Component public class MessageConsumer { @RabbitListener(queues = "queue-consume")//指定要消费的队列 public void receive(byte[] body) { long end = System.currentTimeMillis(); String start = new String(body);//解析消息体,内容传的消息发送时间 System.out.println("start: "+ start); System.out.println("end: "+ end); System.out.println("offset: "+(Long.parseLong(start)-end));//计算实际定时时间 } } //生产者 @Component public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void send(String str){ MessageProperties messageProperties = new MessageProperties(); Message message = new Message(str.getBytes(), messageProperties); messageProperties.setContentType("json"); messageProperties.setExpiration("10000");//设置消息超时时间为10秒 rabbitTemplate.send("exchange-product","",message);//发送消息到exchange-product } }
演示的例子很简单。为了方便,是在spring的环境下写的。exchange配置的类型为fanout,所以不须要指定routing key。实际应用时能够换成别的类型。也没有指定手动ack。实际应用时仍是建议手动ack。网络
public class ApplicationTests { @Autowired MessageProducer messageProducer; @Test public void push() { long start = System.currentTimeMillis(); messageProducer.send(""+start); } }
能够看到最终偏差大概在0.1秒。因为我本机和rabbitmq不在一个局域网,因此网络开销比较大。局域网链接估计偏差会小一些。分布式
因为rabbitmq的消息,在没有优先级的状况下,是会按顺序消费,判断超时也是按顺序的。如有两条消息message1和message2。message1的定时时间(即超时时间)为10秒。message2的定时时间为5秒。若message1先发送,则须要先等待message1超时后publish到DLX,message2才能判断超时publish到DLX。最终message2可能须要等10秒。
那这种定时还有什么用呢?
刚刚举得例子是由于两条消息的超时时间不一样。若两条消息超时时间相同便没有问题了。所以这种定时只适用于,同一业务的消息定时时间固定。多个业务建多个队列和exchange就行了。测试