https://zhuanlan.zhihu.com/p/25140744 中剖析过,consumer的每一个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的(RocketMQ有保证消息确定消费成功的特性(失败则重试)?java
本文将详细解析消息具体是如何ack的,又是如何保证消费确定成功的。git
因为以上工做全部的机制都实如今PushConsumer中,因此本文的原理均只适用于RocketMQ中的PushConsumer即Java客户端中的DefaultPushConsumer。 若使用了PullConsumer模式,相似的工做如何ack,如何保证消费等均须要使用方本身实现。github
注:广播消费和集群消费的处理有部分区别,如下均特指集群消费(CLSUTER),广播(BROADCASTING)下部分可能不适用。数据库
PushConsumer为了保证消息确定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会从新投递。缓存
消费的时候,咱们须要注入一个消费回调,具体sample代码以下:运维
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); doMyJob();//执行真正消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。(具体如何ACK见后面章节)ide
若是这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息须要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。函数
为了保证消息是确定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而若是一直这样重复消费都持续失败到必定次数(默认16次),就会投递到DLQ死信队列。应用能够监控死信队列来作人工干预。oop
注:性能
当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起本身的第一次Pull请求。
若是这个消费进度在Broker并无存储起来,证实这个是一个全新的消费组,这时候客户端有几个策略能够选择:
CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息 CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)所有消费一遍 CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时之前
因此,社区中常常有人问:“为何我设了CONSUME_FROM_LAST_OFFSET,历史的消息仍是被消费了”? 缘由就在于只有全新的消费组才会使用到这些策略,老的消费组都是按已经存储过的消费进度继续消费。
对于老消费组想跳过历史消息能够采用如下两种方法:
RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。
若是某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就能够判断第一次是从哪里开始拉取的。
每次消息成功后,本地的消费进度会被更新,而后由定时器定时同步到broker,以此持久化消费进度。
可是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,以下图:
这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提高的同时,会带来一个潜在的重复问题——因为消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的状况。
在这种状况下,RocketMQ为了保证消息确定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才会一会儿更新到2200。
在这种设计下,就有消费大量重复的风险。如2101在尚未消费完成的时候消费实例忽然退出(机器断电,或者被kill)。这条queue的消费进度仍是维持在2101,当queue从新分配给新的实例的时候,新的实例从broker上拿到的消费进度仍是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过仍是会投递一次。
对于这个场景,3.2.6以前的RocketMQ无能为力,因此业务必需要保证消息消费的幂等性,这也是RocketMQ官方屡次强调的态度。
实际上,从源码的角度上看,RocketMQ多是考虑过这个问题的,截止到3.2.6的版本的源码中,能够看到为了缓解这个问题的影响面,DefaultMQPushConsumer中有个配置consumeConcurrentlyMaxSpan
/** * Concurrently max span offset.it has no effect on sequential consumption */ private int consumeConcurrentlyMaxSpan = 2000;
这个值默认是2000,当RocketMQ发现本地缓存的消息的最大值-最小值差距大于这个值(2000)的时候,会触发流控——也就是说若是头尾都卡住了部分消息,达到了这个阈值就再也不拉取消息。
但做用实际颇有限,像刚刚这个例子,2101的消费是死循环,其余消费很是正常的话,是无能为力的。一旦退出,在不人工干预的状况下,2101后全部消息所有重复。
对于这个卡消费进度的问题,最显而易见的解法是设定一个超时时间,达到超时时间的那个消费看成消费失败处理。
后来RocketMQ显然也发现了这个问题,而RocketMQ在3.5.8以后也就是采用这样的方案去解决这个问题。
核心源码以下:
//ConsumeMessageConcurrentlyService.java public void start() { this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { cleanExpireMsg(); } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); } //ConsumeMessageConcurrentlyService.java private void cleanExpireMsg() { Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); while (it.hasNext()) { Map.Entry<MessageQueue, ProcessQueue> next = it.next(); ProcessQueue pq = next.getValue(); pq.cleanExpiredMsg(this.defaultMQPushConsumer); } } //ProcessQueue.java public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { return; } int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16; for (int i = 0; i < loop; i++) { MessageExt msg = null; try { this.lockTreeMap.readLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) { msg = msgTreeMap.firstEntry().getValue(); } else { break; } } finally { this.lockTreeMap.readLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); } try { pushConsumer.sendMessageBack(msg, 3); log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); try { this.lockTreeMap.writeLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { try { msgTreeMap.remove(msgTreeMap.firstKey()); } catch (Exception e) { log.error("send expired msg exception", e); } } } finally { this.lockTreeMap.writeLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); } } catch (Exception e) { log.error("send expired msg exception", e); } } }
经过源码看这个方案,其实能够看出有几个不太完善的问题: