使用了以下版本的MQ客户端版本:ide
<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client --> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.6.2.Final</version> </dependency>
MQ Consumer 端对消息成功消费后,若是重启 Consumer,会对消息再次消费。但注意到每次的 reconsumeTimes=0 。this
即 consumer 从 broker 中成功消费消息后,并无将结果 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 返回。spa
如下为 consumer 配置信息。线程
@PostConstruct public void init() throws MQClientException { log.info("构建 MQ Consumer..."); this.consumer = new DefaultMQPushConsumer(mqGroupName); consumer.setNamesrvAddr(nameServer); consumer.setInstanceName(instance); //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息 //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)所有消费一遍 //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时之前 //若是非第一次启动,按照上次消费的位置继续消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(topic, TAG); //消费线程数量 consumer.setConsumeThreadMax(2); consumer.setConsumeThreadMin(2); consumer.setPersistConsumerOffsetInterval(1000); log.info("MQ Consume From Where is {}", consumer.getConsumeFromWhere()); } /** * 注册监听,并启动MQ * * @param listener 消费监听 */ public void consumeConcurrently(MessageListenerConcurrently listener) { try { consumer.registerMessageListener(listener); consumer.start(); } catch (MQClientException e) { log.error(e.getMessage(), e); } }
@Override public void run(String... args) throws Exception { log.info("Alarm Report MQ Consumer Start ..."); mqConsumer.consumeConcurrently((messageExtList, context) -> { try { ······ 略 } catch (Exception e) { //异常捕获,确保不会由于程序异常致使的消息重复消费 log.error(e.getMessage(), e); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); }
Mq Consuemr 选择 ConsumeMessageConcurrentlyService 对 broker 消息进行消费。指针
ConsumeMessageConcurrentlyService类,经过提交内部线程类对 broker 消息进行监听消费code
在内部类 ConsumeRequest的 run 方法中,有以下逻辑:队列
class : com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run ConsumeMessageContext consumeMessageContext = null; if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) { consumeMessageContext = new ConsumeMessageContext(); ······略 } ······略 try { ······略,对消息监听的执行 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { ······略 } ······略 //当前行,空指针异常······ consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE,returnType.name()); if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", // ConsumeMessageConcurrentlyService.this.consumerGroup, // msgs, // messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; }
consumer 在初始化时,向其默认的实现类注册 Hookget
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageHook() { @Override public String hookName() { return null; } @Override public void consumeMessageBefore(ConsumeMessageContext context) { } @Override public void consumeMessageAfter(ConsumeMessageContext context) { } });
不须要注册 Hook,又不想注册 Hook 的,能够选择其它客户端版本,如:it
<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client --> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.5.8</version> </dependency>
这个Final版本,略坑!io