RocketMQ支持两种形式的消息消费者:java
实现类是DefaultMQPushConsumerImpl,看看它启动时作了哪些事。算法
一、检查配置&拷贝订阅关系给负载均衡处理器多线程
二、利用MQClientManager建立本身的MQClientInstance实例对象,该MQClientInstance的做用在生产者章节描述过负载均衡
三、为负载均衡处理器设置相关参数异步
四、初始化OffsetStore,目的是定时持久化消费进度到本地文件或者Broker,若是是广播模式——>本地,若是是集群——>Brokeride
五、设置consumeMessageService,分为2种,若是使用在注册的监听器是MessageListenerOrderly,则consumeMessageService初始化为ConsumeMessageOrderlyService,若是监听器是MessageListenerConcurrently,则对应ConsumeMessageConcurrentlyService函数
六、利用MQClientInstance注册消费者(并无向Broker注册),MQClientInstance启动相关服务。fetch
在集群消费模式下,一条消息只会被同一个group里一个消费端消费。不一样group之间相互不影响。广播消费模式下,一条消息会被同一个group里每个消费端消费。可是广播消息的代价较高,若是消费者集群规模较大或订阅的消费量较大,会影响集群稳定性。在集群模式下,当topic中的MessageQueue变动时,动态调整消费MessageQueue的数量,采用的策略以下:this
| AllocateMessageQueueConsistentHash | 一致性哈希算法分配 | | AllocateMessageQueueAveragely | 平均分配(默认) | | AllocateMessageQueueByMachineRoom | 机房分配 | | AllocateMessageQueueAveragelyByCircle | 环形分配 | | AllocateMessageQueueByConfig | 手动配置 |spa
经过增长consumer实例去分摊queue的消费,能够起到水平扩展的消费能力的做用。而有实例下线的时候,会从新触发负载均衡,这时候原来分配到的queue将分配到其余实例上继续消费。
可是若是consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将没法分到queue,也就没法消费到消息,也就没法起到分摊负载的做用了。因此须要控制让queue的总数量大于等于consumer的数量。
在启动消费者时,会经过RebalanceService每隔20s执行一次负载均衡,能够经过参数rocketmq.client.rebalance.waitInterval指定间隔时间。
【RebalanceService】 @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } //直接利用CountDownLatch实现等待超时,不使用sleep的缘由是可能其余操做会唤醒等待,要求当即rebalance protected void waitForRunning(long interval) { if (hasNotified.compareAndSet(true, false)) { this.onWaitEnd(); return; } //entry to wait,重写了CountDownLatch的Sync,reset实现:setState(startCount),至关于从新初始化 waitPoint.reset(); try { waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { hasNotified.set(false); this.onWaitEnd(); } }
并行处理:须要用户注册监听器时实现 new MessageListenerConcurrently();实现以后对应的ConsumeMessageConcurrentlyService会启动,将每次从Broker拉取的消息切割成多个consumeBatchSize(默认是1)的消息集合,每一个集合封装成一个ConsumeRequest,提交到线程池处理。若是线程池拒绝了此次提交(当前线程没法处理或线程池缓冲队列满了),则把这批消息放入到定时调度中,延迟5s再执行。使用并行处理的好处是多线程消费快,很差在于若是强调消息消费的顺序性,例如必须(付款,发货)这2条消息,并行处理可能致使(发货——付款)的处理顺序,不对!因此考虑顺序消费的场景应该使用顺序处理。
@Override public void submitConsumeRequest(……) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } else { for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } try { consumeExecutor.submit(new ConsumeRequest(msgThis, processQueue, messageQueue)); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } } private void submitConsumeRequestLater(final ConsumeRequest consumeRequest ) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest); } }, 5000, TimeUnit.MILLISECONDS); }
顺序处理:须要用户注册监听器时实现 new MessageListenerOrderly();实现以后对应的ConsumeMessageOrderlyService会启动,该服务的特色是某一个队列,在同一时间只能有一个线程访问,只有等拿到锁的线程消费成功后释放锁,其余线程才能继续消费该队列。劣势是下降了消息处理的吞吐量,当前一条消息消费出现问题时,会阻塞后续的流程。
ConsumeMessageOrderlyService在启动的时候,若是是集群模式下会启动一个单线程的定时调度任务,延迟一秒,时间间隔为20秒,执行rebalanceImpl的lockAll()方法。向主Master节点的Broker发送LOCK_BATCH_MQ请求,内容是锁住该ConsumerId下的全部MessageQueue,当Broker返回锁住成功的MQs后,Client端会把MQ对应的ProcessQueue也锁住。这里说明下两个Queue的区别:
//消息队列的基本属性 public class MessageQueue { //消息主题 private String topic; //所在Broker private String brokerName; //惟一标识 private int queueId; } //消息队列中的消息数据 public class ProcessQueue { //存放消息,key=MsgId,value=Message private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); private final Lock lockConsume = new ReentrantLock(); // ……还有其余属性 }
当Consumer拉取到消息后,调用ConsumeMessageOrderlyService#submitConsumeRequest处理这批消息。
@Override public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { //ConsumeRequest继承了Runnable,封装了一个MessageQueue && ProcessQueue ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); //提交到线程池 this.consumeExecutor.submit(consumeRequest); } }
在执行ConsumerRequest时,它会获取该MessageQueue的锁,使用synchronized (objLock) 保证多线程串行处理该队列。关键处理以下:
@Override public void run() { final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { //广播模式 || (PQ 成功锁住&& 锁没过时) if (MessageModel.BROADCASTING.equals(defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { //依次消费,直到continueConsume=false,若是使用者处理消息没有错误,通常会返回SUCCESS,这时continueConsume始终是true,若是没有返回SUCCESS,RocketMQ会根据返回状态把continueConsume=false for (boolean continueConsume = true; continueConsume; ) { if (MessageModel.CLUSTERING.equals(defaultMQPushConsumerImpl.messageModel()) && !this.processQueue.isLocked()) { tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } if(CLUSTERING.equals(defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) { tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10); break; } // 在线程数小于队列数状况下,防止个别队列被饿死 long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { submitConsumeRequestLater(processQueue, messageQueue, 10); break; } try { //处理队列加锁 this.processQueue.getLockConsume().lock(); status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } finally { this.processQueue.getLockConsume().unlock(); } continueConsume = processConsumeResult(msgs, status, context, this); } else { continueConsume = false; } } } else { // 没有拿到当前队列的锁,稍后再消费 tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } }
顺序消费总的来讲,首先,须要保证顺序的消息要发送到同一个messagequeue中;其次,一个messagequeue只能被一个消费者消费,这点是由消息队列的分配机制来保证的;最后,一个消费者内部对一个mq的消费要保证是有序的。
Broker的实时推送依靠2个服务:PullRequestHoldService和ReputMessageService。二者都继承了ServiceThread,能够丢给专门的线程去执行。前者是挂起每次请求,否则客户端不停轮询扛不住。后者用来向ConsumeQueue写消息,并触发前者处理最新的消息。
PullRequestHoldService的做用是挂起来自客户端的拉取消息请求,直到有新消息知足拉取条件才会释放请求。
@Override public void run() { while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { //间隔5s检查是否有新消息 this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } //实际是调用notifyMessageArriving this.checkHoldRequest(); } catch (Throwable e) { } } }
从官方给出的示例,能够看出该模式须要使用者本身维护队列的offset,比PushConsumer由服务端维护略显麻烦。
public class PullConsumer { private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { MetaPullConsumer consumer = new MetaPullConsumer("please_rename_unique_group_name_5"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest"); for (MessageQueue mq : mqs) { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); } consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null) return offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } }
从示例代码中看出,使用时须要先获取该消费者拥有的全部队列,告诉客户端须要从该队列的哪一个offset位置取消息,获得消息后本身更新offset。
拉取消息时分为同步和异步,异步无非是客户端自身(不是消费者)添加个回调函数new PullCallback() ,在MQClientAPIImpl接受消息以后调用pullCallback.onSuccess(pullResult);在Pull模式下,没看到负载均衡,应该是给消费者自行控制,不像Push模式时由客户端统一处理和控制,根据消费者的消费能力作流控。