In this article we continue analyzing RocketMQ's message consumption logic.
Picking up from the previous article: during the startup of DefaultMQPushConsumerImpl, the consumeMessageService message-consumption service is started.
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
As you can see, the concrete consumeMessageService implementation is chosen based on which MessageListener interface the client implements. In this article we focus on the concurrent consumption service, ConsumeMessageConcurrentlyService.
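As a quick reminder of where that listener comes from, here is a typical client-side setup that makes getMessageListenerInner() return a MessageListenerConcurrently (the group name, topic and nameserver address below are placeholders):

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class ConcurrentConsumerDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DemoTopic", "*");
        // registering a MessageListenerConcurrently is what makes the
        // branch above pick ConsumeMessageConcurrentlyService
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // business logic goes here
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}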
First, let's look at the member variables of ConsumeMessageConcurrentlyService; each one is explained in the comments.
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private static final InternalLogger log = ClientLogger.getLog();
    // push-mode consumer implementation
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    // reference to the consumer facade
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    // concurrent message listener callback
    private final MessageListenerConcurrently messageListener;
    // message consumption task queue
    private final BlockingQueue<Runnable> consumeRequestQueue;
    // message consumption thread pool
    private final ThreadPoolExecutor consumeExecutor;
    // consumer group
    private final String consumerGroup;
    // scheduler used to re-submit consume tasks to consumeExecutor later
    private final ScheduledExecutorService scheduledExecutorService;
    // scheduler that periodically cleans up expired messages
    private final ScheduledExecutorService cleanExpireMsgExecutors;
Next, its constructor:
public ConsumeMessageConcurrentlyService(
    DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    // keep references to the outer push-consumer implementation and the listener
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;
    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    // consumer group
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    // the consume request queue is an unbounded LinkedBlockingQueue
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
    // initialize the message consumption thread pool
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));
    // single-threaded scheduler used to re-submit consume tasks later
    this.scheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    // single-threaded scheduler that cleans up expired messages
    this.cleanExpireMsgExecutors =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
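One detail worth calling out: because consumeRequestQueue is an unbounded LinkedBlockingQueue, the pool in practice never grows beyond consumeThreadMin threads. A ThreadPoolExecutor only creates threads above the core size when the work queue rejects an offer, and an unbounded queue never does, so consumeThreadMax effectively has no impact here. A minimal standalone sketch of that JDK behavior (not RocketMQ code):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // core = 2, max = 8, but the work queue is unbounded, so the pool
        // never creates more than 2 threads: extra tasks just wait in the queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < 100; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException ignored) {
                }
            });
        }
        Thread.sleep(200);
        System.out.println("poolSize = " + pool.getPoolSize()); // prints 2, not 8
        pool.shutdown();
    }
}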
The startup logic of ConsumeMessageConcurrentlyService lives in its start method:
public void start() {
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            cleanExpireMsg();
        }
    },
    // consumeTimeout defaults to 15 minutes
    this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
As you can see, the start method only touches cleanExpireMsgExecutors: it kicks off the periodic job that cleans up expired messages.
Let's take a closer look at the cleanExpireMsg method.
private void cleanExpireMsg() {
    Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
        this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, ProcessQueue> next = it.next();
        ProcessQueue pq = next.getValue();
        pq.cleanExpiredMsg(this.defaultMQPushConsumer);
    }
}
As you can see, cleanExpireMsg periodically walks through every ProcessQueue and cleans up the expired messages inside it. This is not the focus of this article, so we will stop here.
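Still, to give a rough idea of what that cleanup does: messages whose consumption started more than consumeTimeout (15 minutes by default) ago are sent back to the broker for delayed re-delivery and removed from the local cache. Below is a simplified, self-contained model of that idea only; it is not the actual ProcessQueue code (the real implementation differs in locking, batching and error handling), and the RetrySender callback is a hypothetical stand-in for the consumer's sendMessageBack.

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

public class ExpiredMsgCleanerSketch {
    static class Msg {
        final long queueOffset;
        final long consumeStartTime;
        Msg(long queueOffset, long consumeStartTime) {
            this.queueOffset = queueOffset;
            this.consumeStartTime = consumeStartTime;
        }
    }

    // hypothetical callback standing in for the consumer's sendMessageBack
    interface RetrySender {
        void sendBack(Msg msg);
    }

    static void cleanExpired(TreeMap<Long, Msg> msgTreeMap, long timeoutMinutes, RetrySender sender) {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<Long, Msg>> it = msgTreeMap.entrySet().iterator();
        while (it.hasNext()) {
            Msg msg = it.next().getValue();
            if (now - msg.consumeStartTime > TimeUnit.MINUTES.toMillis(timeoutMinutes)) {
                sender.sendBack(msg); // hand it back to the broker for delayed re-delivery
                it.remove();          // drop it from the in-memory queue
            }
        }
    }

    public static void main(String[] args) {
        TreeMap<Long, Msg> queue = new TreeMap<Long, Msg>();
        queue.put(1L, new Msg(1L, System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(20))); // stuck for 20 min
        queue.put(2L, new Msg(2L, System.currentTimeMillis()));                                 // just started
        cleanExpired(queue, 15, msg -> System.out.println("send back offset " + msg.queueOffset));
        System.out.println("remaining: " + queue.keySet()); // [2]
    }
}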
Now let's dig into the message consumption flow of ConsumeMessageConcurrentlyService.
Its entry point is the submitConsumeRequest method: consume requests are submitted through it, which drives the whole consumption process.
@Override
public void submitConsumeRequest(
    // the pulled messages; by default at most 32 messages are pulled from the broker at a time
    final List<MessageExt> msgs,
    // the processing queue the messages belong to
    final ProcessQueue processQueue,
    // the message queue the messages were pulled from
    final MessageQueue messageQueue,
    // whether to dispatch to the consume thread pool; ignored for concurrent consumption
    final boolean dispatchToConsume) {
First it reads the batch consumption size, consumeMessageBatchMaxSize, which defaults to 1:
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
If the number of messages is less than or equal to consumeBatchSize, a single ConsumeRequest is assembled and submitted to the consume thread pool.
If an exception occurs, the request is re-submitted later via submitConsumeRequestLater.
    if (msgs.size() <= consumeBatchSize) {
        // no more messages than consumeBatchSize (default 1):
        // submit a single consume request to the thread pool
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
If more messages than consumeBatchSize were pulled, they are split into pages of consumeBatchSize messages each.
The loop creates one ConsumeRequest per page and submits each of them to the consume thread pool.
    } else {
        // more messages than consumeBatchSize: split them into pages and submit each page
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                // submit the consume task to the consume thread pool
                this.consumeExecutor.submit(consumeRequest);
If a RejectedExecutionException is thrown, the remaining messages are added to the current request and it is re-submitted later. In practice this should not happen, because the task queue is an unbounded LinkedBlockingQueue.
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
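To make the paging concrete: with the default consumeBatchSize of 1, a pull of 32 messages is turned into 32 single-message ConsumeRequest tasks. A small standalone sketch of the same loop (plain integers stand in for MessageExt):

import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {
    public static void main(String[] args) {
        List<Integer> msgs = new ArrayList<Integer>();
        for (int i = 0; i < 32; i++) {
            msgs.add(i); // 32 pulled "messages"
        }
        int consumeBatchSize = 1; // default consumeMessageBatchMaxSize
        int requests = 0;
        for (int total = 0; total < msgs.size(); ) {
            List<Integer> msgThis = new ArrayList<Integer>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            requests++; // one ConsumeRequest per page
        }
        System.out.println(requests); // 32
    }
}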
Here is a quick detour into submitConsumeRequestLater; feel free to skip it, as it does not affect the main flow.
private void submitConsumeRequestLater(final ConsumeRequest consumeRequest) {

    this.scheduledExecutorService.schedule(new Runnable() {

        @Override
        public void run() {
            ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
        }
    }, 5000, TimeUnit.MILLISECONDS);
}
As you can see, scheduledExecutorService is used to re-submit the consume request to the consume thread pool after a 5-second delay.
The core line of the consume service is:
this.consumeExecutor.submit(consumeRequest);
From what we know about thread pools, submit accepts a Runnable implementation, which here is ConsumeRequest; the actual consumption logic runs inside that Runnable's run method.
So let's look at ConsumeRequest's run method next.
Long code block ahead...
@Override
public void run() {
Step 1. First check whether the processQueue has been dropped; if dropped is true, consumption stops and the method returns immediately.
When a rebalance happens, dropped is set to true; this prevents the consumer from consuming messages of a message queue that no longer belongs to it.
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            this.messageQueue);
        return;
    }
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
    ConsumeMessageContext consumeMessageContext = null;
Step 2. If the consumer has registered hooks, invoke them via executeHookBefore.
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }
Step 3. [Key part!] This is the core of the consumption logic.
If msgs is not empty, each message is stamped with a consumption start timestamp, and then the client-implemented MessageListenerConcurrently.consumeMessage is called back to run the actual business logic and produce the consumption result status.
    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }

        // wrap msgs in an unmodifiable view via Collections.unmodifiableList
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

    // if the business code throws, record hasException = true
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }
    // measure how long consumption took
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
Step 4. Post-process according to the returned status:
    // if status is null: return EXCEPTION when hasException == true,
    // otherwise return RETURNNULL
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
    // consumption took longer than the consume timeout
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    // the business code returned RECONSUME_LATER: the messages need to be re-consumed, so this counts as a failure
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    // the business code returned CONSUME_SUCCESS: consumption succeeded
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }
    ......
    // if the client returned a null status, treat it as RECONSUME_LATER so the messages will be consumed again
    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
Step 5. If hooks are registered, invoke them via executeHookAfter.
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }

    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
Step 6. After consumption, check the processQueue's dropped flag again: if it is true, do nothing; otherwise call processConsumeResult to handle the consumption result.
    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}
The reason nothing is done when processQueue.dropped == true is that a dropped queue means a rebalance has happened in the meantime (for example a new consumer joined or an existing consumer went down), so this message queue has been reassigned to another consumer. Those messages will be consumed again by their new owner, so no extra handling is needed here.
With processConsumeResult we reach the final part of this article: handling the consumption result.
Its logic mainly deals with the consumption progress (offset).
public void processConsumeResult(
    // the concurrent consumption result
    final ConsumeConcurrentlyStatus status,
    // the concurrent consumption context
    final ConsumeConcurrentlyContext context,
    // the consume request
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;
Check the consumption result: for CONSUME_SUCCESS, ackIndex is set to msgs.size() - 1; for RECONSUME_LATER, ackIndex is set to -1. This prepares for sending the message ACKs. For example, with a batch of 3 messages, CONSUME_SUCCESS yields ackIndex = 2 so the retry loop below (which starts at ackIndex + 1) sends nothing back, while RECONSUME_LATER yields ackIndex = -1 so all 3 messages are sent back.
    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }
Then the result is handled according to the message model. In BROADCASTING mode, a RECONSUME_LATER result does not trigger re-consumption; the failed messages are only logged as warnings.
In CLUSTERING mode, successfully consumed messages do not go through sendMessageBack; when the business code returns RECONSUME_LATER, the failed messages are sent back to the broker via sendMessageBack so they can be retried.
Any messages that fail to be sent back are re-wrapped into a new consumeRequest and re-consumed locally after a 5-second delay.
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
Finally, the successfully processed messages are removed from the ProcessQueue, and the consumption progress is updated with the returned offset, so that consumption can later resume from this point without re-consuming these messages.
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
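It is worth noting what offset removeMessage returns. Because batches can complete out of order under concurrent consumption, the consume offset can only advance to the smallest offset still held in the ProcessQueue (or to max + 1 once the queue is empty); otherwise a crash could skip messages that were still in flight. Here is a simplified, self-contained model of that return value; it is not the actual ProcessQueue code, which also handles locking and message counters:

import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

public class RemoveMessageSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<Long, String>(); // offset -> message
    private long queueOffsetMax = -1;

    void put(long offset, String msg) {
        msgTreeMap.put(offset, msg);
        queueOffsetMax = Math.max(queueOffsetMax, offset);
    }

    long removeMessage(List<Long> consumedOffsets) {
        for (Long offset : consumedOffsets) {
            msgTreeMap.remove(offset);
        }
        // only advance to the smallest offset still in flight,
        // or to max + 1 once nothing is left locally
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }

    public static void main(String[] args) {
        RemoveMessageSketch pq = new RemoveMessageSketch();
        for (long i = 100; i < 105; i++) {
            pq.put(i, "msg-" + i);
        }
        // offsets 101 and 102 finish first; progress can only move to 100
        System.out.println(pq.removeMessage(Arrays.asList(101L, 102L))); // 100
        // once 100, 103 and 104 also finish, progress jumps to 105
        System.out.println(pq.removeMessage(Arrays.asList(100L, 103L, 104L))); // 105
    }
}

The price of this conservative progress tracking is possible duplicate consumption after a restart, which is why RocketMQ consumption is at-least-once and business code should be idempotent.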
Let's see what this overload of submitConsumeRequestLater does.
private void submitConsumeRequestLater(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue
) {

    this.scheduledExecutorService.schedule(new Runnable() {

        @Override
        public void run() {
            ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
        }
    }, 5000, TimeUnit.MILLISECONDS);
}
As you can see, this method simply calls submitConsumeRequest again after the delay, which closes the loop of the consumption flow.
In this article we walked through how the ConsumeMessageConcurrentlyService consumes messages asynchronously, focusing on its lifecycle and on how the consumption status flows through the process.
One question remains: where do the messages that ConsumeMessageConcurrentlyService consumes come from?
That is the message pull flow of RocketMQ consumption, which is also asynchronous. RocketMQ makes heavy use of asynchronous threading models; this keeps the code easy to reason about and benefits performance. We will continue with this pull flow in the next article.