本篇随笔将汇总一些我对消息队列 RabbitMQ 的认识,顺便谈谈其在高并发和秒杀系统中的具体应用。
html
想了下,仍是先抛出一个简单示例,随后再根据其具体应用场景进行扩展,我以为这样表述条理更清晰些。java
RabbitConfig:缓存
@Configuration public class RabbitConfig { @Bean public Queue callQueue() { return new Queue(MQConstant.CALL); } }
Client:服务器
@Component public class Client { @Autowired private RabbitTemplate rabbitTemplate; public void sendCall(String content) { for (int i = 0; i < 10000; i++) { String message = i + "-" + content; System.out.println(String.format("Sender: %s", message)); rabbitTemplate.convertAndSend(MQConstant.CALL, message); } } }
Server:微信
@Component public class Server { @RabbitHandler @RabbitListener(queues = MQConstant.CALL) public void callProcess(String message) throws InterruptedException { Thread.sleep(1000); System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message)); } }
Result:多线程
Sender: Hello, are you there! Receiver: reply("Hello, are you there!") Yes, I just saw your message!
以上示例会在 rabbitmq 中建立一条队列 CALL, 消息在其中等待消费:
并发
在此基础上的简单扩展我就再也不写案例了,好比领域模块完成了其核心业务规则以后可能须要更新缓存、写个邮件、记个复杂日志、作个统计报表等等,这些不须要及时反馈或者耗时的附属业务均可以经过异步队列分发,以此来提高核心业务的响应速度,同时如此处理能让领域边界更加清晰,代码的可维护性和持续拓展的能力也会有所提高。异步
上个示例中我提到的应用场景是解耦和通知,再接着扩展,因其具有良好的缓冲性质,因此还有一个很是适合的应用场景那就是削峰。对于突如其来的极高并发请求,咱们能够先瞬速地将其加入队列并回复用户一个友好提示,而后服务器可在其能承受的范围内慢慢处理,以此来防止突发的 CPU 和内存 “爆表”。async
改造以后对于发送方来讲固然是比较爽的,他只是将请求加入消息队列而已,处理压力都归到了消费端。接着思考,这样处理有没有反作用?若是这个请求恰好是线程阻塞的,那还要加入队列慢慢排队处理,那不是完蛋了,用户要猴年马月才能获得反馈?因此针对此,我以为应该将消费端的方法改成异步调用(即多线程)以提高吞吐量,在 Spring Boot 中的写法也很是简单:ide
@Component public class Server { @Async @RabbitHandler @RabbitListener(queues = MQConstant.CALL) public void callProcess(String message) throws InterruptedException { Thread.sleep(100); System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message)); } }
参照示例一的方法,我发布了 10000 条消息加入队列,且消费端的调用每次阻塞一秒,那可有意思了,何时能处理完?但若是开几百个线程同时处理的话,那几十秒就够了,固然具体多少合适还应根据具体的业务场景和服务器配置酌情考虑。另外,别忘了配线程池:
@Configuration public class AsyncConfig { @Bean public Executor asyncExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(500); executor.setQueueCapacity(10); executor.setThreadNamePrefix("MyExecutor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
RabbitMQ 可能为 N 个应用同时提供服务,要是你和你的蓝颜知己忽然心有灵犀,在不一样的业务上使用了同一个 routingKey,想一想就刺激。所以,队列多了天然要进行分组管理,限定好 Exchange 的规则,接下来就能够独自玩耍了。
MQConstant:
public class MQConstant { public static final String EXCHANGE = "YOUCLK-MESSAGE-EXCHANGE"; public static final String CALL = MQConstant.EXCHANGE + ".CALL"; public static final String ALL = MQConstant.EXCHANGE + ".#"; }
RabbitConfig:
@Configuration public class RabbitConfig { @Bean public Queue callQueue() { return new Queue(MQConstant.CALL); } @Bean TopicExchange exchange() { return new TopicExchange(MQConstant.EXCHANGE); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with(MQConstant.ALL); } }
此时咱们再去查队列 CALL,能够看到已经绑定了Exchange:
固然 Exchange 的做用远不止如此,以上示例为 Topic 模式,除此以外还有 Direct、Headers 和 Fanout 模式,写法都差很少,感兴趣的童鞋能够去查看 “官方文档” 进行更深刻了解。
延时任务的场景相信小伙伴们都接触过,特别是抢购的时候,在规定时间内未付款订单就被回收了。微信支付的 API 里面也有一个支付完成后的延时再确认消息推送,实现原理应该都差很少。
利用 RabbitMQ 实现该功能首先要了解他的两个特性,分别是 Time-To-Live Extensions 和 Dead Letter Exchanges,字面意思上就能理解个大概,一个是生存时间,一个是死信。整个过程也很容易理解,TTL 至关于一个缓冲队列,等待其过时以后消息会由 DLX 转发到实际消费队列,如此便实现了他的延时过程。
MQConstant:
public class MQConstant { public static final String PER_DELAY_EXCHANGE = "PER_DELAY_EXCHANGE"; public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE"; public static final String DELAY_CALL_TTL = "DELAY_CALL_TTL"; public static final String CALL = "CALL"; }
ExpirationMessagePostProcessor:
public class ExpirationMessagePostProcessor implements MessagePostProcessor { private final Long ttl; public ExpirationMessagePostProcessor(Long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties() .setExpiration(ttl.toString()); return message; } }
Client:
@Component public class Client { @Autowired private RabbitTemplate rabbitTemplate; public void sendCall(String content) { for (int i = 1; i <= 3; i++) { long expiration = i * 5000; String message = i + "-" + content; System.out.println(String.format("Sender: %s", message)); rabbitTemplate.convertAndSend(MQConstant.DELAY_CALL_TTL, (Object) message, new ExpirationMessagePostProcessor(expiration)); } } }
Server:
@Component public class Server { @Async @RabbitHandler @RabbitListener(queues = MQConstant.CALL) public void callProcess(String message) throws InterruptedException { String date = (new SimpleDateFormat("HH:mm:ss")).format(new Date()); System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!- %s", message, date)); } }
Result:
Sender: 1-Hello, are you there! Sender: 2-Hello, are you there! Sender: 3-Hello, are you there! Receiver: reply("1-Hello, are you there!") Yes, I just saw your message!- 23:04:12 Receiver: reply("2-Hello, are you there!") Yes, I just saw your message!- 23:04:17 Receiver: reply("3-Hello, are you there!") Yes, I just saw your message!- 23:04:22
结果一目了然,分别在队列中延迟了 5秒,10秒,15秒,固然,以上只是个人简单示例,童鞋们可翻阅官方文档(“ ttl ” && “ dlx ”)进一步深刻学习。
本篇随笔不应就这么结束,但晚上心情很差,百感交集,没法继续写做,无奈至此。近期正在寻觅新的工做机会,个人微信:youclk,不管有没有推荐的,给我点鼓励,谢谢!
个人公众号《有刻》,咱们共同成长!