场景php
开发中常常须要用到定时任务,对于商城来讲,定时任务尤为多,好比优惠券定时过时、订单定时关闭、微信支付2小时未支付关闭订单等等,都须要用到定时任务,可是定时任务自己有一个问题,通常来讲咱们都是经过定时轮询查询数据库来判断是否有任务须要执行,也就是说无论怎么样,咱们须要先查询数据库,并且有些任务对时间准确要求比较高的,须要每秒查询一次,对于系统小却是无所谓,若是系统自己就大并且数据也多的状况下,这就不大现实了,因此须要其余方式的,固然实现的方式有多种多样的,好比Redis实现定时队列、基于优先级队列的JDK延迟队列、时间轮等。由于咱们项目中自己就使用到了Rabbitmq,因此基于方便开发和维护的原则,咱们使用了Rabbitmq延迟队列来实现定时任务,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的能够查看我以前的文章Spring boot集成RabbitMQspring
Rabbitmq延迟队列Rabbitmq自己是没有延迟队列的,只能经过Rabbitmq自己队列的特性来实现,想要Rabbitmq实现延迟队列,须要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)数据库
一个消息在知足以下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机能够对应不少队列。springboot
一个消息被Consumer拒收了,而且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其余消费者使用。微信
上面的消息的TTL到了,消息过时了。dom
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。ide
死信交换机就是普通的交换机,只是由于咱们把过时的消息扔进去,因此叫死信交换机,并非说死信交换机是某种特定的交换机微信支付
消息的TTL就是消息的存活时间。RabbitMQ能够对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也能够对每个单独的消息作单独的设置。超过了这个时间,咱们认为这个消息就死了,称之为死信。若是队列设置了,消息也设置了,那么会取小的。因此一个消息若是被路由到不一样的队列中,这个消息死亡的时间有可能不同(不一样的队列设置)。这里单讲单个消息的TTL,由于它才是实现延迟任务的关键。ui
byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000"); channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
能够经过设置消息的expiration字段或者x-message-ttl属性来设置时间,二者是同样的效果。只是expiration字段是字符串参数,因此要写个int类型的字符串:当上面的消息扔到队列中后,过了60秒,若是没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去3d
如图所示,就是建立一个普通的交换机,这里为了方便区分,把交换机的名字取为:delay
这个队列的主要做用是让消息定时过时的,好比咱们须要2小时候关闭订单,咱们就须要把消息放进这个队列里面,把消息过时时间设置为2小时建立一个一个名为delay_queue1的自动过时的队列,固然图片上面的参数并不会让消息自动过时,由于咱们并无设置x-message-ttl参数,若是整个队列的消息有消息都是相同的,能够设置,这里为了灵活,因此并无设置,另外两个参数x-dead-letter-exchange表明消息过时后,消息要进入的交换机,这里配置的是delay,也就是死信交换机,x-dead-letter-routing-key是配置消息过时后,进入死信交换机的routing-key,跟发送消息的routing-key一个道理,根据这个key将消息放入不一样的队列
这个队列才是真正处理消息的队列,全部进入这个队列的消息都会被处理消息队列的名字为delay_queue2
进入交换机详情页面,将建立的2个队列(delayqueue1和delayqueue2)绑定到交换机上面自动过时消息队列的routing key 设置为delay
绑定delayqueue2delayqueue2 的key要设置为建立自动过时的队列的x-dead-letter-routing-key参数,这样当消息过时的时候就能够自动把消息放入delay_queue2这个队列中了
绑定后的管理页面以下图:
固然这个绑定也可使用代码来实现,只是为了直观表现,因此本文使用的管理平台来操做
发送消息String msg = "hello word"; MessageProperties messageProperties = newMessageProperties(); messageProperties.setExpiration("6000"); messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes()); Message message = newMessage(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("delay", "delay",message);
主要的代码就是
messageProperties.setExpiration("6000");
设置了让消息6秒后过时
注意:由于要让消息自动过时,因此必定不能设置delay_queue1的监听,不能让这个队列里面的消息被接受到,不然消息一旦被消费,就不存在过时了
接收消息接收消息配置好delay_queue2的监听就行了
package wang.raye.rabbitmq.demo1; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DelayQueue { /** 消息交换机的名字*/ public static final String EXCHANGE = "delay"; /** 队列key1*/ public static final String ROUTINGKEY1 = "delay"; /** 队列key2*/ public static final String ROUTINGKEY2 = "delay_key"; /** * 配置连接信息 * @return */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672); connectionFactory.setUsername("kberp"); connectionFactory.setPassword("kberp"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); // 必需要设置 return connectionFactory; } /** * 配置消息交换机 * 针对消费者配置 FanoutExchange: 将消息分发到全部的绑定队列,无routingkey的概念 HeadersExchange :经过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列 TopicExchange:多关键字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE, true, false); } /** * 配置消息队列2 * 针对消费者配置 * @return */ @Bean public Queue queue() { return new Queue("delay_queue2", true); //队列持久 } /** * 将消息队列2与交换机绑定 * 针对消费者配置 * @return */ @Bean @Autowired public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2); } /** * 接受消息的监听,这个监听会接受消息队列1的消息 * 针对消费者配置 * @return */ @Bean @Autowired public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("delay_queue2 收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 } }); return container; } }
在消息监听中处理须要定时处理的任务就行了,由于Rabbitmq能发送消息,因此能够把任务特征码发过来,好比关闭订单就把订单id发过来,这样就避免了须要查询一下那些订单须要关闭而加剧MySQL负担了,毕竟一旦订单量大的话,查询自己也是一件很费IO的事情
总结基于Rabbitmq实现定时任务,就是将消息设置一个过时时间,放入一个没有读取的队列中,让消息过时后自动转入另一个队列中,监控这个队列消息的监听处来处理定时任务具体的操做