在展示常用的三种消息类型的例子之前,我们先来说一说RocketMQ的几个重要概念:
- PullConsumer与PushConsumer:主要区别在于Pull与Push的区别。对于PullConsumer,消费者会主动从broker中拉取消息进行消费。而对于PushConsumer,会封装包含消息获取、消息处理以及其他相关操作的接口给程序调用。
- Tag: Tag可以看作是一个子主题(sub-topic),可以进一步细化主题下的相关子业务,提高程序的灵活性和可扩展性。
- Broker:RocketMQ的核心组件之一。用来从生产者处接收消息,存储消息以及将消息推送给消费者。同时RocketMQ的broker也用来存储消息相关的数据,比如消费者组、消费处理的偏移量、主题以及消息队列等。
- Name Server: 可以看作是一个信息路由器。生产者和消费者从NameServer中查找对应的主题以及相应的broker。
这里我们不玩虚的,直接将三个类型的生产者、消费者代码实例给出(在官网给出的例子上做了些许改动和注释说明):
生产者代码
/** * 多种类型组合消息测试 * @author ziyuqi * */ public class MultiTypeProducer { public static void main(String[] args) throws Exception { // 顺序消息生产者 FIFO OrderedProducer orderedProducer = new OrderedProducer(); orderedProducer.produce(); // 广播消息生产者 /*BroadcastProducer broadcastProducer = new BroadcastProducer(); broadcastProducer.produce();*/ // 定时任务消息生产者 /*ScheduledProducer scheduledProducer = new ScheduledProducer(); scheduledProducer.produce();*/ } } /** * 按顺序发送消息的生产者 * @author ziyuqi * */ class OrderedProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupD"); producer.setNamesrvAddr("localhost:9876"); producer.start(); String[] tags = new String[] {"tagA", "tagB", "tagC", "tagD", "tagE"}; for (int i=0; i<50; i++) { Message message = new Message("OrderedTopic", tags[i % tags.length], "KEY" + i, ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message, new MessageQueueSelector() { /** * 所谓的顺序,只能保证同一MessageQueue放入的消息知足FIFO。该方法返回应该将消息放入那个MessageQueue,最后一个参数为send传入的最后一个参数 * 若是须要全局保持FIFO,则全部消息应该依次放入同一队列中去mqs队列中的同一下标 */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 消息被分开放入多个队列,每一个队列中的消息保证按顺序被消费FIFO /*int index = (Integer) arg % mqs.size(); System.out.println("QueueSize:" + mqs.size()); return mqs.get(index);*/ // 消息所有放入同一队列,全局保持顺序性 return mqs.get(0); } }, i); System.out.println(sendResult); } producer.shutdown(); } } /** * 广播生产者 * @author ziyuqi * */ class BroadcastProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupA"); // 也必须设置nameServer producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i=0; i<50; i++) { Message message = new Message("BroadcastTopic", "tagA", "OrderID188", ("Ordered Msg:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(message); System.out.println(sendResult); } producer.shutdown(); } } /** * 
定时消息发送者 * @author ziyuqi * */ class ScheduledProducer { public void produce() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GroupA"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i=0; i<50; i++) { Message message = new Message("scheduledTopic", ("Message:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 设置投递的延迟时间 message.setDelayTimeLevel(3); SendResult sendResult = producer.send(message); System.out.println(sendResult); } producer.shutdown(); } }
消费者代码
public class MultiTypeConsumer { public static void main(String[] args) throws Exception { // 按顺序消费者 OrderedConsumer orderedConsumer = new OrderedConsumer(); orderedConsumer.consume(); // 广播消费者 /*BroadcastConsumer broadcastConsumer = new BroadcastConsumer(); broadcastConsumer.consume();*/ // 定时任务消费者 /*ScheduledConsumer scheduledConsumer = new ScheduledConsumer(); scheduledConsumer.consume();*/ } } /** * 按顺序的消费者 * @author ziyuqi * */ class OrderedConsumer { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupD"); /* * 设置从哪里开始消费 : * 当设置为: ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setNamesrvAddr("localhost:9876"); // 设置定于的主题和tag(必须显示指定tag) consumer.subscribe("OrderedTopic", "tagA || tagB || tagC || tagD || tagE"); consumer.setMessageListener(new MessageListenerOrderly() { AtomicLong num = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { /** * 设置是否自动提交: 默认自动提交,提交以后消息就不可以被再次消费。 * 非自动提交时,消息可能会被重复消费 */ context.setAutoCommit(false); this.num.incrementAndGet(); try { for (MessageExt msg : msgs) { System.out.println("Received:num=" + this.num.get() +", queueId=" + msg.getQueueId() + ", Keys=" + msg.getKeys() + ", value=" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET)); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } /*try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }*/ if (this.num.get() % 3 == 0) { // return ConsumeOrderlyStatus.ROLLBACK; } else if (this.num.get() % 4 == 0) { return ConsumeOrderlyStatus.COMMIT; } else if (this.num.get() % 5 == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } // 非主动提交的时候,SUCCESS不会致使队列消息提交,消息未提交就能够被循环消费 return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); } } class 
BroadcastConsumer { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 即便是广播形式下,nameServer仍是要设置 consumer.setNamesrvAddr("localhost:9876"); // 设置消费的消息类型为广播类消息 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("BroadcastTopic", "tagA || tagB || tagC"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println("Received:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET)); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } } /** * 定时任务消费者 * @author ziyuqi * */ class ScheduledConsumer { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("scheduledTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { System.out.println("Received:[" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET) + "]" + (System.currentTimeMillis() - msg.getStoreTimestamp()) + " ms later!"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
结合我上面的测试代码,以及我在测试中主要针对顺序消费的疑惑和源码调试,我这里简单分析下顺序消费者的相关执行过程,大致的执行步骤如下:
消费者启动
我们知道每次consumer创建之后,都会调用consumer.start()方法来启动消费者。跟进代码嵌套,不难发现最终会进入DefaultMQPushConsumerImpl的start方法中,该方法的主要代码如下:
/**
 * Starts this push consumer.
 *
 * <p>In the {@code CREATE_JUST} state it checks the configuration, copies the
 * subscription, obtains the client instance, configures the rebalance implementation,
 * creates the pull API wrapper and the offset store, picks the ordered or concurrent
 * consume service according to the registered listener type, registers this consumer
 * with the client factory and finally starts the factory. In the {@code RUNNING},
 * {@code START_FAILED} and {@code SHUTDOWN_ALREADY} states it throws.
 *
 * @throws MQClientException if the consumer group is already registered, or the
 *                           service state does not allow starting
 */
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        // The consumer may only be started from the CREATE_JUST state.
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            // Marked as failed up front; only switched to RUNNING at the end of this case.
            this.serviceState = ServiceState.START_FAILED;

            // Configuration check.
            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            // Use the configured offset store if present; otherwise pick one by message
            // model: a local-file store for BROADCASTING, a remote-broker store for CLUSTERING.
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();

            // The listener type decides between the ordered and the concurrent consume service.
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                // Registration failed: restore the initial state and stop the consume service.
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            // Key call: start the MQ client factory, which begins pulling messages.
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    this.mQClientFactory.rebalanceImmediately();
}
MQClient启动
从上一段源码我们发现最终调用了mQClientFactory.start()。我们继续跟进该方法,发现实际调用的是MQClientInstance.start():
/**
 * Starts this client instance. When the state is {@code CREATE_JUST} it starts, in
 * order, the remoting channel, the scheduled tasks, the pull service, the rebalance
 * service and the internal producer, then marks the instance {@code RUNNING}.
 * {@code RUNNING} and {@code SHUTDOWN_ALREADY} are no-ops.
 *
 * @throws MQClientException if the state is {@code START_FAILED}
 */
public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Marked as failed up front; only switched to RUNNING at the end of this case.
                this.serviceState = ServiceState.START_FAILED;
                // If no name server address is configured, fetch one.
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service. Key point: this calls pullMessageService's start method.
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
消息拉取
根据上一段代码的注释,我们进入到核心的消息拉取代码PullMessageService的start方法(实际上PullMessageService继承自ServiceThread(实现了Runnable),start会启动线程并最终执行其run方法):
/**
 * Service loop: until the service is stopped, takes the next {@link PullRequest} from
 * {@code pullRequestQueue} (blocking) and dispatches it to {@link #pullMessage}.
 * Interruptions are ignored and any other exception is logged, so the loop keeps running.
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest); // the actual work is delegated to this method
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

/**
 * Looks up the consumer registered for the request's consumer group and lets it pull.
 * If no consumer matches, the request is logged and dropped.
 *
 * @param pullRequest the pull request to execute
 */
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest); // call the default push-consumer pull implementation
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
我们继续跟进DefaultMQPushConsumerImpl的pullMessage方法:
/**
 * Issues an asynchronous pull for the given request and handles the result in a
 * callback (abridged excerpt: {@code processQueue}, {@code subscriptionData},
 * {@code subExpression}, {@code sysFlag} and {@code commitOffsetValue} are defined in
 * the omitted parts).
 *
 * @param pullRequest the pull request describing the queue and the next offset
 */
public void pullMessage(final PullRequest pullRequest) {
    // ... omitted

    final long beginTimestamp = System.currentTimeMillis();

    // This callback is where the pulled messages are actually handed over for consumption.
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            // Nothing usable in this result: pull again right away.
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            // Submit a consume request task to the consume thread pool.
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);

                            // Schedule the next pull: delayed when a pull interval is set, otherwise immediately.
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        // Sanity check: offsets should not move backwards.
                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }

                        break;
                    case NO_NEW_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        // Mark the process queue dropped, then later store the corrected
                        // offset and remove the process queue from the rebalance implementation.
                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }

            // Retry the pull after the configured exception delay.
            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    };

    // ... omitted

    try {
        // Passes the parameters of the core pull implementation (communication mode,
        // callback, ...). Per the article, the request is eventually sent through Netty
        // and the callback is invoked once it succeeds.
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    }
}
以上代码注释有三个重点的地方,具体的处理流程大致是这样。首先this.pullAPIWrapper.pullKernelImpl这个方法定义了具体的消息拉取策略,内部实现其实会根据消息类型去拉取消息。对于默认的集群消息模式,实际会调用Netty进行消息拉取,拉取结束后会调用注释中的回调函数进行处理。最终实际会进入DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest,而实际上对于顺序消息消费会进入ConsumeMessageOrderlyService的submitConsumeRequest方法。该方法直接向消费线程池中放入一个消费请求任务。
消费请求任务
我们继续跟进ConsumeRequest消费请求任务的具体实现:
/**
 * Consume task for one message queue in ordered mode.
 *
 * <p>Under the per-queue lock object it repeatedly takes a batch of messages from the
 * process queue, invokes the registered listener and passes the returned status to
 * {@code processConsumeResult}, whose return value decides whether the loop continues.
 * In CLUSTERING mode the loop only runs while the process queue is locked and the lock
 * has not expired; otherwise consumption is re-attempted later.
 */
@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }

    // One lock object per message queue: tasks for the same queue run one at a time.
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    break;
                }

                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && !this.processQueue.isLocked()) {
                    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }

                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && this.processQueue.isLockExpired()) {
                    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }

                // Do not occupy the thread for too long: past the limit, resubmit this task for later.
                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                }

                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                if (!msgs.isEmpty()) {
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                    ConsumeOrderlyStatus status = null;

                    ConsumeMessageContext consumeMessageContext = null;
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext = new ConsumeMessageContext();
                        consumeMessageContext
                            .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                        consumeMessageContext.setMq(messageQueue);
                        consumeMessageContext.setMsgList(msgs);
                        consumeMessageContext.setSuccess(false);
                        // init the consume context type
                        consumeMessageContext.setProps(new HashMap<String, String>());
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                    }

                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        this.processQueue.getLockConsume().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                this.messageQueue);
                            break;
                        }

                        // Invoke the registered listener to consume the messages and keep its result.
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                            RemotingHelper.exceptionSimpleDesc(e),
                            ConsumeMessageOrderlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                        hasException = true;
                    } finally {
                        this.processQueue.getLockConsume().unlock();
                    }

                    if (null == status
                        || ConsumeOrderlyStatus.ROLLBACK == status
                        || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                            ConsumeMessageOrderlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                    }

                    // Classify the outcome for the hook context.
                    long consumeRT = System.currentTimeMillis() - beginTimestamp;
                    if (null == status) {
                        if (hasException) {
                            returnType = ConsumeReturnType.EXCEPTION;
                        } else {
                            returnType = ConsumeReturnType.RETURNNULL;
                        }
                    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                        returnType = ConsumeReturnType.TIME_OUT;
                    } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        returnType = ConsumeReturnType.FAILED;
                    } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                        returnType = ConsumeReturnType.SUCCESS;
                    }

                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                    }

                    // A null status (listener threw or returned null) is treated as "suspend the queue for a moment".
                    if (null == status) {
                        status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }

                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.setStatus(status.toString());
                        consumeMessageContext
                            .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                    }

                    ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                        .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                    // Handle the listener's result; it also decides whether to keep looping.
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    continueConsume = false;
                }
            }
        } else {
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}
可以看出,这里首先会调用我们实现的MessageListener对拉取到的消息进行消费,消费完成之后会拿到消费结果,并对消费结果进行处理。
消费结果处理(COMMIT ROLLBACK)
我们直接跟进消费结果处理代码:
/**
 * Applies the status returned by an orderly listener.
 *
 * <p>With auto-commit on, SUCCESS (and, after a warning, COMMIT/ROLLBACK) commits the
 * process queue. With auto-commit off, only COMMIT commits; SUCCESS merely updates
 * statistics and ROLLBACK rolls the process queue back and schedules a later retry.
 * SUSPEND_CURRENT_QUEUE_A_MOMENT re-queues the messages for a later retry as long as
 * {@code checkReconsumeTimes} allows it.
 *
 * @param msgs           the messages that were handed to the listener
 * @param status         the status returned by the listener
 * @param context        the orderly consume context (auto-commit flag, suspend time)
 * @param consumeRequest the consume request that produced the result
 * @return {@code true} if the caller should go on consuming, {@code false} if a later
 *         retry has been scheduled instead
 */
public boolean processConsumeResult(
    final List<MessageExt> msgs,
    final ConsumeOrderlyStatus status,
    final ConsumeOrderlyContext context,
    final ConsumeRequest consumeRequest
) {
    boolean continueConsume = true;
    long commitOffset = -1L;
    if (context.isAutoCommit()) {
        // Auto-commit mode.
        switch (status) {
            case COMMIT:
            case ROLLBACK:
                log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
                    consumeRequest.getMessageQueue());
                // no break: falls through and is handled like SUCCESS
            case SUCCESS:
                commitOffset = consumeRequest.getProcessQueue().commit();
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                if (checkReconsumeTimes(msgs)) {
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                } else {
                    commitOffset = consumeRequest.getProcessQueue().commit();
                }
                break;
            default:
                break;
        }
    } else {
        // Manual-commit mode: each returned status is handled differently.
        switch (status) {
            case SUCCESS:
                // Statistics only; commitOffset stays -1, so no offset is updated below.
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case COMMIT:
                commitOffset = consumeRequest.getProcessQueue().commit();
                break;
            case ROLLBACK:
                consumeRequest.getProcessQueue().rollback();
                this.submitConsumeRequestLater(
                    consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue(),
                    context.getSuspendCurrentQueueTimeMillis());
                continueConsume = false;
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                if (checkReconsumeTimes(msgs)) {
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                }
                break;
            default:
                break;
        }
    }

    // Store the new offset only when a commit produced one and the queue is still in use.
    if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
    }

    return continueConsume;
}
由于我们例子中写的是非自动提交,我们就来看看非自动提交下ROLLBACK和COMMIT的具体实现(对应ProcessQueue的相关方法):
/**
 * Moves every message that was taken for ordered consumption but not yet committed
 * back into the pending message map, so that it will be consumed again.
 */
public void rollback() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            /*
             * Example from the article's demo: while consuming KEY2 the counter is 3,
             * so the listener returns ROLLBACK and this method runs. At that point:
             *   this.msgTreeMap holds all messages not yet taken: KEY3 --- KEY49
             *   this.consumingMsgOrderlyTreeMap holds all messages taken in order: KEY0 --- KEY2
             * A rollback therefore puts back not only the current message but also the
             * earlier, already processed (yet uncommitted) ones. This is why messages
             * are consumed repeatedly once auto-commit is set to false.
             */
            this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
            this.consumingMsgOrderlyTreeMap.clear();
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("rollback exception", e);
    }
}

/**
 * Commits the messages taken for ordered consumption: updates the message count and
 * size counters, clears the in-flight map and returns the next offset.
 *
 * @return the offset of the last in-flight message plus one, or {@code -1} if the
 *         lock wait was interrupted
 */
public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            // Offset (key) of the last message in the in-flight ordered map.
            // NOTE(review): if consumingMsgOrderlyTreeMap is a java.util.TreeMap (its
            // declaration is not shown here), lastKey() throws NoSuchElementException
            // on an empty map instead of returning null — confirm the null check
            // below is reachable as intended.
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            // Subtract the number of taken-but-uncommitted messages from the message count.
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            // Subtract the body sizes of the messages being committed from the total size.
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            // Clear the taken-but-uncommitted messages.
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}
至此,整个简单的消费流程分析完成。
消费流程源码分析总结
- Pull OR Push:即使是Push模式的Consumer,其最终实现仍然是通过Pull的方式来进行的
- Netty:集群模式的远程消息获取是通过Netty来实现的
RocketMQ常用的三种消息生产消费模式到现在我们就基本分析完了。个人认为顺序消息消费给需要顺序执行的流程的异步实现提供了强有力的支持。这一点特别适用于阿里当前的相关领域。当然RocketMQ也不是尽善尽美的,我个人在测试的时候发现顺序消息消费的性能不算特别高,当然具体什么原因只有留到后续分析了。还有,由于这个项目开始是阿里内部研发的,可能源码注释相比于其他开源项目还是要少一些,也没有那么清楚。以至于consumer.setConsumeFromWhere这个的不同设值的具体区别在哪我还没有探究出来(想想Spring的事务隔离级别以及传播特性相关常量的注释基本一看就懂了),限于篇幅,还有我要赶着去上班,就不再继续深究了(后面继续)。