那么今日份就来了解一下怎么用RabbitMQ
搞定分布式下的超时订单html
没错,做为一名有追求的程序员,咱们接着上一篇文章实战|我仍是很建议你用DelayQueue搞定超时订单的-(1) Go on。java
傲娇的RabbitMQ官网赫然写着:git
RabbitMQ is the most widely deployed open source message broker.
复制代码
因而可知,RabbitMQ是一个消息中间件,生产者生成消息,消费者消费消息,它遵循AMQP(高级消息队列协议),是最普遍部署的开源消息代理。 因此,今天我用RabbitMQ为你们捣鼓一下延迟队列。程序员
使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,经过这二者的组合来实现上述需求。github
消息的TTL
就是消息的存活时间。RabbitMQ 能够对队列和消息分别设置TTL
。对队列设置就是队列没有消费者连着的保留时间,也能够对每个单独的消息作单独的设置。超过了这个时间,咱们认为这个消息就死了,称之为死信。若是队列设置了,消息也设置了,那么会取小的(谁小谁尴尬
)。因此一个消息若是被路由到不一样的队列中,这个消息死亡的时间有可能不同(不一样的队列设置)。这里单讲单个消息的TTL,由于它才是实现延迟任务的关键。windows
那么,如何设置这个TTL值呢?有两种方式,第一种是在建立队列的时候设置队列的"x-message-ttl"
属性,以下:api
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
复制代码
这样全部被投递到该队列的消息都最多不会存活超过6s。bash
另外一种方式即是针对每条消息设置TTL,代码以下:app
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
复制代码
这样这条消息的过时时间也被设置成了6s。异步
但这两种方式是有区别的,若是设置了队列的TTL属性,那么一旦消息过时,就会被队列丢弃,而第二种方式,消息即便过时,也不必定会被立刻丢弃,由于消息是否过时是在即将投递到消费者以前断定的,若是当前队列有严重的消息积压状况,则已过时的消息也许还能存活较长时间。 另外,还须要注意的一点是,若是不设置TTL,表示消息永远不会过时,若是将TTL设置为0,则表示除非此时能够直接投递该消息到消费者,不然该消息将会被丢弃。
单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange
。
Exchage
的概念在这里就不在赘述。一个消息在知足以下条件下,会进死信路由,记住这里是路由而不是队列,一个路由能够对应不少队列。
Consumer
拒收了,而且reject
方法的参数里requeue
是false
。也就是说不会被再次放在队列里,被其余消费者使用。TTL
到了,消息就过时了。Dead Letter Exchange
其实就是一种普通的exchange
,和建立其余exchange
没有两样。只是在某一个设置Dead Letter Exchange
的队列中有消息过时了,会自动触发消息的转发,发送到Dead Letter Exchange
中去。
延迟任务经过消息的TTL
和Dead Letter Exchange
来实现。咱们须要创建2个队列,一个用于发送消息,一个用于消息过时后的转发目标队列。
生产者生产一条延时消息,根据须要延时时间的不一样,利用不一样的routingkey
将消息路由到不一样的延时队列,每一个队列都设置了不一样的TTL
属性,并绑定在同一个死信交换机中,消息过时后,根据routingkey
的不一样,又会被路由到不一样的死信队列中,消费者只须要监听对应的死信队列进行处理便可。
rabbitmq-plugins enable rabbitmq_management
复制代码
开启Web管理插件,而后启动rabbitmq-server
访问http://localhost:15672/#/
,输入密令后你能看到就能够啦.
在 RabbitMQ 3.6.x
以前咱们通常采用死信队列(DLX)+TTL过时时间
来实现延迟队列,咱们这里不作过多介绍,能够参考其余道友的:TTL+DLX
实现方式。
在 RabbitMQ 3.6.x
开始(如今都3.8.+
了),RabbitMQ 官方提供了延迟队列的插件,能够下载放置到 RabbitMQ 根目录下的 plugins 下。延迟队列插件下载地址:
3.7.x
的,可是3.8.0
是向下兼容3.7.x
的,而后我又在Bintray
找到了3.7.x
,你们信不过就找对应的版本插件哈....plugins
的目录中,运行以下命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码
#集成 rabbitmq
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
connection-timeout: 150000
publisher-confirms: true #开启确认机制 采用消息确认模式,
publisher-returns: true #开启return确认机制
template: #消息发出去后,异步等待响应
mandatory: true #设置为 true 后,消费者在消息没有被路由到合适队列状况下会被return监听,而不会自动删除
复制代码
@Configuration
public class MQConfig {
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
public static final String DELAY_EXCHANGE = "Ex.DelayExchange";
public static final String DELAY_QUEUE = "MQ.DelayQueue";
public static final String DELAY_KEY = "delay.#";
/** * 延时交换机 * * @return TopicExchange */
@Bean
public TopicExchange delayExchange() {
Map<String, Object> pros = new HashMap<>(3);
//设置交换机支持延迟消息推送
pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(DELAY_EXCHANGE, true, false, pros);
//咱们在也能够在 Exchange 的声明中能够设置exchange.setDelayed(true)来开启延迟队列
exchange.setDelayed(true);
return exchange;
}
/** * 延时队列 * * @return Queue */
@Bean
public Queue delayQueue() {
return new Queue(DELAY_QUEUE, true);
}
/** * 绑定队列和交换机,以及设定路由规则key * * @return Binding */
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_KEY);
}
}
复制代码
/** * @author LiJing * @ClassName: MQSender * @Description: MQ发送 生产者 * @date 2019/10/9 11:50 */
@Component
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationData: " + correlationData);
System.out.println("ack: " + ack);
if (!ack) {
System.out.println("异常处理....");
}
}
};
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange , String routingKey) {
System.out.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
public void sendDelay(Object message, int delayTime) {
//采用消息确认模式,消息发出去后,异步等待响应
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局惟一
CorrelationData correlationData = new CorrelationData("delay" + System.nanoTime());
//发送消息时指定 header 延迟时间
rabbitTemplate.convertAndSend(MQConfig.DELAY_EXCHANGE, "delay.boot", message,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 两种方式 都可
//message.getMessageProperties().setHeader("x-delay", "6000");
message.getMessageProperties().setDelay(delayTime);
return message;
}
}, correlationData);
}
}
复制代码
/** * @author LiJing * @ClassName: MQReceiver * @Description: 消费者 * @date 2019/10/9 11:51 */
@Component
@Slf4j
public class MQReceiver {
@RabbitListener(queues = MQConfig.DELAY_QUEUE)
@RabbitHandler
public void onDelayMessage(Message msg, Channel channel) throws IOException {
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, true);
System.out.println("延迟队列在" + LocalDateTime.now()+"时间," + "延迟后收到消息:" + new String(msg.getBody()));
}
}
复制代码
5.建立一个mq的测试控制器
@RestController
@RequestMapping("/mq")
public class MqController extends AbstractController {
@Autowired
private MQSender mqSender;
@GetMapping(value = "/send/delay")
public void sendDelay(int delayTime) {
String msg = "hello delay";
System.out.println("发送开始时间:" + LocalDateTime.now() + "测试发送delay消息====>" + msg);
mqSender.sendDelay(msg, delayTime);
}
}
复制代码
http://localhost:8080/api/mq/send/delay?delayTime=6000
http://localhost:8080/api/mq/send/delay?delayTime=10000
复制代码
延时队列在须要延时处理的场景下很是有用,使用RabbitMQ来实现延时队列,能够很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。
另外,经过RabbitMQ集群的特性,能够很好的解决单点故障问题,不会由于单个节点挂掉致使延时队列不可用或者消息丢失。
固然,延时队列还有不少其它选择,好比利用Redis的zset,Quartz或者利用kafka的时间轮,这些方式各有特色,但就像炉石传说通常,这些知识就比如手里的卡牌,知道的越多,能够用的卡牌也就越多,遇到问题便能游刃有余,因此须要大量的知识储备和经验积累才能打造出更出色的卡牌组合,让本身解决问题的能力获得更好的提高。
肥朝告诉我说:闻道有前后,术业有专攻,达者为师。
那今日份的讲解就到此结束,具体的代码请移步个人gitHub的mybot项目888分支查阅,fork体验一把,或者评论区留言探讨,写的很差,请多多指教~~