点击蓝色“架构文摘”关注我哟php
加个“星标”,天天上午 09:25,干货推送!mysql
https://juejin.im/post/5ea159e4f265da47f0794da5git
MQ的主要特色为解耦、异步、削峰,该文章主要记录与分享我的在实际项目中的RocketMQ削峰用法,用于减小数据库压力的业务场景,其中RocketMQ的核心组件概念以下:github
Producer:生产发送消息spring
Broker:存储Producer发送过来的消息sql
Consumer:从Broker拉取消息并进行消费数据库
NameServer:为Producer或Consumer路由到Brokerapache
其中消费流程有如下几点是必须注意的:后端
RocketMQ的Consumer获取消息是经过向Broker发送拉取请求获取的,而不是由Broker发送Consumer接收的方式。网络
Consumer每次拉取消息时消息都会被均匀分发到消息队列再进行传输,因此RocketMQ中的不少参数都是针对队列而不是Topic的(这个是重点,顺便吐槽下源码的文档讲的真不清晰,不少都须要本身试错,但Dashboard作得很好),其中每一个Broker消息队列(ConsumeQueue)的数量均可以经过RocketMQ DashBoard实时更改调整。
当开发中须要快速集成RocketMQ时能够考虑使用 rocketmq-spring-boot-starter 搭建RocketMQ的集成环境,但该框架并不彻底具有RocketMQ全部的配置简化,如需批量消费消息便须要自定义一个DefaultMQPushConsumer bean去消费了。我的在开发中经常使用的rocketmq-spring-boot-starter
相关类:
RocketMQListener
接口:消费者都需实现该接口的消费方法onMessage(msg)
。
RocketMQPushConsumerLifecycleListener
接口:当@RocketMQMessageListener
中的配置不足以知足咱们的需求时,能够实现该接口直接更改消费者类DefaultMQPushConsumer
配置
@RocketMQMessageListener
:被该注解标注并实现了接口RocketMQListener
的bean为一个消费者并监听指定topic队列中的消息,该注解中包含消费者的一些经常使用配置(大部分按默认便可),通常只需更改consumerGroup(消费组)与topic。RocketMQMessageListener
中的属性配置是可使用Placeholder(占位符)从配置文件或配置中心获取的,以下图:
有一个点赞业务,不限制用户的点赞数只需进行记录(产品需求,开发提议无效),当每一个用户都进行x连击享受数量猛增的快感时若是数据库都须要进行x个点赞数据的插入,数据库毫无疑问会塞死致使崩溃。因而想到能够尝试下MQ削峰,好比每秒来了5000消息但数据库只能承受2000,那我消费时每次只拉取消费1600就行了,剩下的放在Broker堆积慢慢消费就好。因为以前的消息中心也在用RocketMQ,因而确认使用RocketMQ来进行削峰。
文章例子环境:1NameServer + 2Broker + 1Consumer
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> </dependency> 复制代码
rocketmq: name-server: 127.0.0.1:9876 producer: group: praise-group server: port: 10000 spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver username: root password: tiger url: jdbc:mysql://localhost:3306/wilson swagger: docket: base-package: io.rocket.consumer.controller 复制代码
PraiseRecord(点赞记录):
@Data public class PraiseRecord implements Serializable { private Long id; private Long uid; private Long liveId; private LocalDateTime createTime; } 复制代码
MessageController(简单的测试接口):
RestController @RequestMapping("/message") public class MessageController { @Resource private RocketMQTemplate rocketMQTemplate; @PostMapping("/praise") public ServerResponse praise(@RequestBody PraiseRecordVO vo) { rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build()); return ServerResponse.success(); } // ...... } 复制代码
因为用户能够连续点赞,因此考虑能够在点赞消息的处理上宽松一点(允许消息丢失)以追求更高的性能,所以选择使用sendOneyWay()
进行消息发送。
RocketMQ的消息发送方式主要含syncSend()同步发送、asyncSend()异步发送、sendOneWay()三种方式,sendOneWay()也是异步发送,区别在于不需等待Broker返回确认,因此可能会存在信息丢失的情况,但吞吐量更高,具体需根据业务状况选用。
性能:sendOneWay > asyncSend > syncSend RocketMQTemplate的send()方法默认是同步(syncSend)的,更多可看源码实现。
@Service @RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER) @Slf4j public class PraiseListener implements RocketMQListener<PraiseRecordVO>, RocketMQPushConsumerLifecycleListener { @Resource private PraiseRecordService praiseRecordService; @Override public void onMessage(PraiseRecordVO vo) { praiseRecordService.insert(vo.copyProperties(PraiseRecord::new)); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 每次拉取的间隔,单位为毫秒 consumer.setPullInterval(2000); // 设置每次从队列中拉取的消息数为16 consumer.setPullBatchSize(16); } }
单次pull消息的最大数目受broker存储的MessageStoreConfig.maxTransferCountOnMessageInMemory
(默认为32)值限制,即若想要消费者从队列拉取的消息数大于32有效(pullBatchSize>32)则需更改Broker的启动参数maxTransferCountOnMessageInMemory
值。在MQ削峰的配置参数里,如下几个DefaultMQPushConsumer
的参数是须要注意一下的:
pullInterval:每次从Broker拉取消息的间隔,单位为毫秒
pullBatchSize:每次从Broker队列拉取到的消息数,该参数很容易让人误解,一开始我觉得是每次拉取的消息总数,但测试过几回后确认了实质上是从每一个队列的拉取数(源码上的注释文档真的不好,跟没有同样),即Consume每次拉取的消息总数以下:EachPullTotal=全部Broker上的写队列数和(writeQueueNums=readQueueNums) * pullBatchSize
consumeMessageBatchMaxSize:每次消费(即将多条消息合并为List消费)的最大消息数目,默认值为1,rocketmq-spring-boot-starter 目前不支持批量消费(2.1.0版本)
在消费者开始消息消费时会先从各队列中拉取一条消息进行消费,消费成功后再以每次pullBatchSize的数目进行拉取。
PraiseListener中设置了每次拉取的间隔为2s,每次从队列拉取的消息数为16,在搭建了2master broker且broker上writeQueueNums=readQueueNums=4的环境下每次拉取的消息理论数值为16 * 2 * 4 = 128,在第一次从各队列拉取1条消息(即共8条)后消费成功后会每次就会拉取最多128条消息进行消费,想验证下的能够把onMessage()的insert()改成log.info("1")而后统计单位秒内打印的日志数是否为128。
根据以上配置单Conumer状况下每2s理论消费为128,即每2秒数据库新增的点赞数据大概为128条左右,有20%误差都在我的可接受范围内,而后对点赞接口进行简单压测1s 2000请求校验MQ效果,根据消费配置理论上须要16次拉取即需32s才能消费完,压测后查看数据库校验效果:
由上图能够看出除第一次2s和最后一次2s外数据库每2s的插入数据数和通常都在128附近波动,也用了34s(因第一次拉取数较少因此比理论多花费一次拉取)消费的误差大小可能会受每次拉取数pullBatchSize、Broker上的消息队列数、网络波动等状况影响,但须要的目的已经达到了,我只想把单位时间内过多的数据库操做交给MQ作分隔成多个单位时间内的小批量操做,消息过多就堆积,当请求峰值过了后直到MQ堆积的消息消费完前数据库的插入数依旧会与峰值期的插入数相差不大,达到了MQ削峰填谷的效果。
当把拉取数pullBatchSize设置Broker的默认最大传输值32了,线上又不想重启Broker更改maxTransferCountOnMessageInMemory参数,若有2个Broker且queue都为4,那么拉取消费效率才为32 * 2 * 4 = 256,若是想要动态调整,能够从Broker数或Broker队列数下手,能够将Broker的writeQueueNums、readQueueNums增大,如都改成8,那么效率就成了32 * 2 * 8 = 512。
须要注意的是更改完queues后必须去Dashboard的Topic下的CONSUMER MANAGER查看新增的队列上是否都有Consumer成功注册上去了,由于遇到了在测试与生产上使用rocketmq-spring-boot-starter @RocketMQListener标注消费者不会自动注册到新队列上的状况,但没排除是否是RocketMQ版本的缘由(我的本地的版本比环境上的高了一个小版本0.0.1,本地没出现没消费者注册到新队列上的问题),而是使用了自定义DefaultMQPushConsumer bean(原生的方式都是没有问题的)的备用方案。当再启动新的消费者应用时CONSUMER MANAGER(下图)中就会出现 新Consumer数 * 各Broker队列数和的队列行。
虽然点赞业务使用MQ单条插入后TPS已经达到当前业务指标要求了,但考虑到若是后续要求在不添加机器数的状况下增长TPS,且数据量还没到分库分表的程度,我的就打算从批量消费下手,由一次插入一条点赞记录改成一次性插入多条(insertBatch)。固然能知足现有需求能不作确定不作的,过分优化过度碍事,但想多点方案不会坏事。
rocketmq-spring-boot-starter并无提供批量消费的功能,因此要批量消费消息须要自定义DefaultMQPushConsumer
并配置其consumeMessageBatchMaxSize
属性。consumeMessageBatchMaxSize
属性默认值为1,即每次只消费一条消息,须要注意的是该属性也会受pullBatchSize
影响,若是consumeMessageBatchMaxSize
为32但pullBatchSize
只为12,那么每次批量消费的最大消息数也就只有12。以下为我的测试批量消费Consumer的测试bean:
@Bean public DefaultMQPushConsumer userMQPushConsumer() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ConsumerGroup.SPRING_BOOT_USER_CONSUMER); consumer.setNamesrvAddr(nameServer); consumer.subscribe(RocketConstant.Topic.SPRING_BOOT_USER_TOPIC, "*"); // 设置每次消息拉取的时间间隔,单位毫秒 consumer.setPullInterval(1000); // 设置每一个队列每次拉取的最大消息数 consumer.setPullBatchSize(24); // 设置消费者单次批量消费的消息数目上限 consumer.setConsumeMessageBatchMaxSize(12); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { List<UserInfo> userInfos = new ArrayList<>(msgs.size()); Map<Integer, Integer> queueMsgMap = new HashMap<>(8); msgs.forEach(msg -> { userInfos.add(JSONObject.parseObject(msg.getBody(), UserInfo.class)); queueMsgMap.compute(msg.getQueueId(), (key, val) -> val == null ? 1 : ++val); }); log.info("userInfo size: {}, content: {}", userInfos.size(), userInfos); /* 处理批量消息,如批量插入:userInfoMapper.insertBatch(userInfos); */ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); return consumer; }
若是默认配置状况下log打印出的userInfo size恒为1,但因为设置了consumeMessageBatchMaxSize
与pullBatchSize
,且pullBatchSize
较小,因此每次消费的消息数最大值为12,以下图:
确保mqnamesrv与mqbroker已启动成功,如该文章环境的启动:
mqnamesrv -n 127.0.0.1:9876 mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-a.properties mqbroker -c E:\RocketMQ\rocketmq-all-4.5.2-bin-release\bin\2m-noslave\broker-b.properties
RocketMQ DashBoard启动流程可参考官方github文档或到个人资源里下载jar包运行
源码地址(https://github.com/Wilson-He/spring-boot-series/tree/master/spring-rocketmq),2m-noslave目录是该文章中例子中的2master broker配置与启动脚本,spring-boot-consumer-peak目录为包含该文章相关代码的实际例子
end
推荐阅读:
若有收获,点个在看,诚挚感谢