什么是MQ 跨进程的消息队列,主要角色包括生产者与消费者。 生产者只负责生产信息,没法感知消费者是谁,消息怎么处理,处理结果是什么。 消费者负责接收及处理消息,没法感知生产者是谁,怎么产生的。html
Mq能作什么? MQ 特性通常有异步,吞吐量大 ,延时低; 适合作:安全
因为MQ是异步处理消息的,因此MQ不适合作同步处理操做,若是须要及时的返回处理结果请不要用MQ;bash
MQ 个系统带来了什么?微信
缺点:增长了系统的复杂性,除了代码组件接入之外还须要考虑,高可用,集群,消息的可靠性等问题!markdown
生产者:消息发送怎么保证可靠性,怎么保证不重复!app
消费者:怎么保证幂等性,接收到重复消息怎么处理!异步
还有会带来的处理延时等问题!分布式
优势: 解耦,利用MQ咱们能够很好的给咱们系统解耦,特别是分布式/微服系统! 原来的同步操做,能够用异步处理,也能够带来更快的响应速度;oop
场景 (1) 系统解耦,用户系统或者其余系统须要发送短信能够经过 MQ 执行;很好的将 用户系统 和 短信系统进行解耦;post
场景(2)
顺序执行的任务场景,假设 A B C 三个任务,B须要等待 A完成才去执行,C须要等待B完成才去执行;
我见过一些同窗的作法是 ,用 三个定时器 错开时间去执行的,假设 A定时器 9 点执行, B 定时器 10 点执行 , C 11 点执行 , 相似这样子;
这样作实际上是 不安全的, 由于 后一个任务 没法知道 前一个任务是否 真的执行了! 假设 A 宕机了, 到 10 点 B 定时去 执行,这时候 数据就会产生异常!
当咱们 引入 MQ 后 能够这么作, A执行完了 发送 消息给 B ,B收到消息后 执行,C 相似,收到 B消息后执行;
场景(3)
支付网关的通知,咱们的系统经常须要接入支付功能,微信或者支付宝一般会以回调的形式通知咱们系统支付结果。
咱们能够将咱们的支付网关独立出来,经过MQ通知咱们业务系统进行处理,这样处理有利于系统的解耦和扩展!
假设咱们还有一个积分系统,用户支付成功,给用户添加积分。只须要积分系统监听这个消息,并处理积分就好,无需去修改再去修改网关层代码!
若是没有使用MQ ,我是否是还得去修改网关系统的代码,远程调用增长积分的接口?
这就是使用了MQ的好处,解耦和扩展!
固然咱们的转发规则也要保证每一个感兴趣的队列能获取到消息!
场景(4)
微服/分布式系统,分布式事务 - 最终一致性 处理方案!
场景(5)
咱们之前的作法是 一般启用一个定时器,每分钟或者每小时,去跑一次取出须要处理的订单或其余数据进行处理。 这种作法一个是 效率比较低,若是数据量大的话,每次都要扫库,很是要命! 再者时效性不是很高,最差的时候可能须要等待一轮时长! 还有可能出现重复执行的结果,时效和轮询的频率难以平衡!
利用MQ(Rabbitmq),DLX (Dead Letter Exchanges)和 消息的 TTL (Time-To-Live Extensions)特性。咱们能够高效的完成这个任务场景!不须要扫库,时效性更好!
DLX:www.rabbitmq.com/dlx.html,
TTL:www.rabbitmq.com/ttl.html#pe…
原理: 发送到队列的消息,能够设置一个存活时间 TTL,在存活时间内没有被消费,能够设置这个消息转发到其余队列里面去;而后咱们从这个其余队列里面消费执行咱们的任务,这样就能够达到一个消息延时的效果!
设置过时时间: 过时时间能够统一设置到消息队列里面,也能够单独设置到某个消息!
PS 若是消息设置了过时时间,发生到了设置有过时时间的队列,已队列设置的过时时间为准!
已 SpringBoot 为例:
配置转发队列和被转发队列:
@Component @Configuration public class RabbitMqConfig { @Bean public Queue curQueue() { Map<String, Object> args = new HashMap<String, Object>(); //超时后的转发器 过时转发到 delay_queue_exchange args.put("x-dead-letter-exchange", "delay_queue_exchange"); //routingKey 转发规则 args.put("x-dead-letter-routing-key", "user.#"); //过时时间 20 秒 args.put("x-message-ttl", 20000); return new Queue("cur_queue", false, false, false, args); } @Bean public Queue delayQueue() { return new Queue("delay_queue"); } @Bean TopicExchange exchange() { //当前队列 return new TopicExchange("cur_queue_exchange"); } @Bean TopicExchange exchange2() { //被转发的队列 return new TopicExchange("delay_queue_exchange"); } @Bean Binding bindingHelloQueue(Queue curQueue, TopicExchange exchange) { //绑定队列到转发器 return BindingBuilder.bind(curQueue).to(exchange).with("user.#"); } @Bean Binding bindingHelloQueue2(Queue delayQueue, TopicExchange exchange2) { return BindingBuilder.bind(delayQueue).to(exchange2).with("user.#"); } } 复制代码
发生消息:
@Component public class MqEventSender { Logger logger = LoggerFactory.getLogger(MqEventSender.class); @Autowired private RabbitTemplate rabbitTemplate; /** * 消息没有设置 时间 * 发生到队列 cur_queue_exchange * @param msg */ public void sendMsg(String msg) { logger.info("发送消息: " + msg); rabbitTemplate.convertAndSend("cur_queue_exchange", "user.ss", msg); } /** * 消息设置时间 * 发生到队列 cur_queue_exchange * @param msg */ public void sendMsgWithTime(String msg) { logger.info("发送消息: " + msg); MessageProperties messageProperties = new MessageProperties(); //过时时间设置 10 秒 messageProperties.setExpiration("10000"); Message message = rabbitTemplate.getMessageConverter().toMessage(msg, messageProperties); rabbitTemplate.convertAndSend("cur_queue_exchange", "user.ss", message); } } 复制代码
消息监听:
监听 的队列是 delay_queue 而不是 cur_queue;
PS cur_queue 不该该有监听者,不然消息被消费达不到想要的延时消息效果!
/** * Created by linli on 2017/8/21. * 监听 被丢到 超时队列内容 */ @Component @RabbitListener(queues = "delay_queue") public class DelayQueueListener { public static Logger logger = LoggerFactory.getLogger(AddCommentsEventListener.class); @RabbitHandler public void process(@Payload String msg) { logger.info("收到消息 "+msg); } } 复制代码
测试:
/** * Created by linli on 2017/8/21. */ @RestController @RequestMapping("/test") public class TestContorller { @Autowired MqEventSender sender; @RequestMapping("/mq/delay") public String test() { sender.sendMsg("队列延时消息!"); sender.sendMsgWithTime("消息延时消息!"); return ""; } } 复制代码
结果:
观察结果发现:发送时间 和 收到时间 间隔 20秒 ;
咱们给消息设置的 10 秒 TTL 时间没有生效!验证了 : 若是消息设置了过时时间,发生到了设置有过时时间的队列,已队列设置的过时时间为准!
若是但愿每一个消息都要本身的存活时间,发送到队列 不要设置
args.put(“x-message-ttl”, 20000);
复制代码
消息的过时时间 设置在队列仍是消息,根据本身的业务场景去定!
MQ 是一个跨进程的消息队列,咱们能够很好的利用他进行系统的解耦; 引入MQ会给系统带来必定的复杂度,须要评估! MQ 适合作异步任务,不适合作同步任务!