rabbitmq实现分布式定时任务

在java web开发中,你们常常会用到定时任务。比较经常使用的类库有:Timer、ScheduledExecutor 、Quartz、Spring Scheduler。
Timer和ScheduleExecutor都不支持持久化和分布式,宕机或重启则待执行的任务丢失。部署多个实例时会同时触发任务。
Quartz若支持持久化和分布式须要较多的配置,貌似是7张表。
Spring Scheduler支持持久化和分布式也是须要相应的插件。
本文将介绍下另外一种思路,使用rabbitmq实现定时任务。java

死信队列

DLX, Dead-Letter-Exchange。当消息在一个队列中变成死信以后,它能被从新publish到另外一个Exchange,这个Exchange就是DLX。消息变成死信一贯有一下几种状况:web

  • 消息被拒绝(basic.reject/ basic.nack)而且requeue=false
  • 消息TTL过时
  • 队列达到最大长度
    变为死信队列也很简单,为队列设置一个属性Dead letter exchange指定死信要从新publish到的Exchange就好了。

思路


rabbitMq配置两个exchange:exchange-product负责接收生产者的消息、DLX负责接收死信。两个queue:queue-dead绑定在exchange-product上,queue-consume绑定在DLX上。
消息传递流程:spring

  1. 生产者发送消息到exchange-product后,因为这个exchange Binding了queue-dead,所以消息会传到queue-dead。
  2. 因为这个队列没有消费者,所以queue-dead的消息会超时。因为queue-dead设置了Dead Letter exchange属性,指定了死信要publish到DLX,所以消息会到DLX。
  3. DLX这个exchange Binding了queue-consume。消息会到queue-consume中。
  4. 因为消费者链接了queue-consume,最终消息传到消费者。

代码

像这种定时任务通常都是须要常常执行的。所以,队列,exchange都须要持久化。因此能够在控制台去建立队列和exchange。所以不须要在代码里建立队列以及exchange,只须要发送和接收消息就好了。json

//消费者
@Component
public class MessageConsumer {
    @RabbitListener(queues = "queue-consume")//指定要消费的队列
    public void receive(byte[] body) {
        long end = System.currentTimeMillis();
        String start = new String(body);//解析消息体,内容传的消息发送时间
        System.out.println("start: "+ start);
        System.out.println("end: "+ end);
        System.out.println("offset: "+(Long.parseLong(start)-end));//计算实际定时时间
    }
}
//生产者
@Component
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(String str){
        MessageProperties messageProperties = new MessageProperties();
        Message message = new Message(str.getBytes(), messageProperties);
        messageProperties.setContentType("json");
        messageProperties.setExpiration("10000");//设置消息超时时间为10秒
        rabbitTemplate.send("exchange-product","",message);//发送消息到exchange-product
    }
}

演示的例子很简单。为了方便,是在spring的环境下写的。exchange配置的类型为fanout,所以不须要指定routing key。实际应用时能够换成别的类型。也没有指定手动ack。实际应用时仍是建议手动ack。网络

测试

public class ApplicationTests {
	@Autowired
	MessageProducer messageProducer;
	@Test
	public void push() {
		long start = System.currentTimeMillis();
		messageProducer.send(""+start);
	}
}


能够看到最终偏差大概在0.1秒。因为我本机和rabbitmq不在一个局域网,因此网络开销比较大。局域网链接估计偏差会小一些。分布式

缺点

因为rabbitmq的消息,在没有优先级的状况下,是会按顺序消费,判断超时也是按顺序的。如有两条消息message1和message2。message1的定时时间(即超时时间)为10秒。message2的定时时间为5秒。若message1先发送,则须要先等待message1超时后publish到DLX,message2才能判断超时publish到DLX。最终message2可能须要等10秒。
那这种定时还有什么用呢
刚刚举得例子是由于两条消息的超时时间不一样。若两条消息超时时间相同便没有问题了。所以这种定时只适用于,同一业务的消息定时时间固定。多个业务建多个队列和exchange就行了。测试

相关文章
相关标签/搜索