consumer的常规订阅消息的整体操做流程:api
构造初始化-》注册监听-》启动-》无限循环请求队列-》长链接的数据拉取安全
一,构造初始化
DefaultMQPushConsumer:常规的订阅消息,须要制定惟一的分组名称
最终构造的对象是DefaultMQPushConsumerImpl:
核心的功能实现,整合内部多个组件
配置消息的消息开始位置:ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
配置namesrv地址:拉取对应的配置内容和关系结构
订阅内容:基于特定topic的订阅,而后内置了表达式引擎(过滤内容)
注册监听:该监听是监听到有消息时的自有业务逻辑处理网络
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wang-group-consmuer"); /* * Specify where to start in case the specified consumer group is a brand new one. */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //指定nameserver的地址,全部的数据交互都是基于nameserve来进行的信息获取和更新及心跳 consumer.setNamesrvAddr("127.0.0.1:9876"); /* * Subscribe one more more topics to consume. */ consumer.subscribe("TopicTest", "*"); /* * Register callback to execute on arrival of messages fetched from brokers. */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /* * Launch the consumer instance. */ consumer.start();
构造函数的初始化数据结构
public DefaultMQPushConsumer(final String consumerGroup) { this(consumerGroup, null, new AllocateMessageQueueAveragely()); }
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); }
实际实现服务的核心属性并发
public class DefaultMQPushConsumerImpl implements MQConsumerInner { /** * Delay some time when exception occur */ private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000; /** * Flow control interval */ private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; /** * Delay some time when suspend pull service */ private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000; private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15; private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30; private final InternalLogger log = ClientLogger.getLog(); private final DefaultMQPushConsumer defaultMQPushConsumer; //负载的选择器 private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); private final long consumerStartTimestamp = System.currentTimeMillis(); private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>(); private final RPCHook rpcHook; private volatile ServiceState serviceState = ServiceState.CREATE_JUST; //客户端实例,复用 private MQClientInstance mQClientFactory; //拉取服务请求的包装 private PullAPIWrapper pullAPIWrapper; private volatile boolean pause = false; private boolean consumeOrderly = false; //消息监听 private MessageListener messageListenerInner; //消息的偏移位置 private OffsetStore offsetStore; //客户端服务 private ConsumeMessageService consumeMessageService; private long queueFlowControlTimes = 0; private long queueMaxSpanFlowControlTimes = 0;
二,app
启动
start:最终的核心启动是DefaultMQPushConsumerImpl的启动
启动和消息发送端的启动相似,进行对应的初始化及启动操做
配置服务的执行状态,内部拆分了四个状态机制
验证关键配置,主要是影响到和新功能的配置的内容
将配置的订阅信息拷贝到对应的监听,对应的负载,消费等数据对象中
生成实例id
得到MQClient的实例工程,和producer一致,关键的核心操做实现,综合体
将核心的配置注入到负载服务中
包装拉取服务,包装成独立的综合体
得到消息的开始消费偏移位置,用于消息拉取的请求参数
根据消息的监听类型,设置,分为两类,顺序监听和并发监听,并发监听的效率更高
注册消费处理
实例MQClient的工程方法启动
标示为当前服务为运行状态负载均衡
更新订阅的topic信息
检查使用的broker信息
发送心跳内容给全部的broker
立刻调用负载服务平衡框架
public void start() throws MQClientException { //消费端的核心启动入口 this.defaultMQPushConsumerImpl.start(); if (null != traceDispatcher) { try { traceDispatcher.start(this.getNamesrvAddr()); } catch (MQClientException e) { log.warn("trace dispatcher start failed ", e); } } }
核心功能的操做启动ide
//消费端主动推送的启动核心入口 public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; //验证核心关键配置 this.checkConfig(); //将配置中的订阅信息拷贝到推送服务中 this.copySubscription(); //生成id if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } //得到客户端工厂,共享的客户端工程单例设计 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); //将配置中的核心配置赋值给负载中 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); //拉取消息的包装api this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); //设置消息的开始位置 if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } //加载消息读取位置信息 this.offsetStore.load(); //消息接受器的监听类型 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } //消费者监听服务启动 this.consumeMessageService.start(); //注册服务处理事件 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } //连接工程的启动 mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } //更新topic信息到当前的内存数据结构中,便于后期的直接使用 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); //检查须要连接的broker是否可用 this.mQClientFactory.checkClientInBroker(); //发送心跳数据给当前服务所连接的全部broker中,操做过程是基于安全锁机制 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); //唤醒负载平衡操做 this.mQClientFactory.rebalanceImmediately(); }
三,监听
业务根据RocketMQ的规范,根据业务特定实现的接口
接口分为两类,顺序消息,并行消费函数
//消息接受器的监听类型 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = //有序消息 new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = //并行消息 new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); }
并行消息
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService { private static final InternalLogger log = ClientLogger.getLog(); //消息的实际拉取操做 private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; //消费消息的顶层结构 private final DefaultMQPushConsumer defaultMQPushConsumer; //并行消息接听 private final MessageListenerConcurrently messageListener; private final BlockingQueue<Runnable> consumeRequestQueue; //线程池执行配置 private final ThreadPoolExecutor consumeExecutor; private final String consumerGroup; //任务调用 private final ScheduledExecutorService scheduledExecutorService; //任务调度清除超时 private final ScheduledExecutorService cleanExpireMsgExecutors; public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_")); } public void start() { this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { //清除超时消息 cleanExpireMsg(); } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); }
四,长链接的拉取
启动入口在MQClient的连接工程的启动中
在启动中指定负载服务的启动中RebalanceService
在操做中this.waitForRunning(waitInterval);默认是阻塞20000,可是在服务启动后,执行了this.mQClientFactory.rebalanceImmediately();
核心操做在重置负载中this.mqClientFactory.doRebalance();
有一个前提,根据当前服务的模式,分为push自动领取,pull程序主动发起,还有一个条件就是是不是顺序消息
遍历订阅信息,从新负载topic信息
最核心的操做在更新queue到负载中,完成了初始化
此时调用的是PullMessageService的执行拉取操做,该对象是核心
后期的操做就是自动化请求,基于队列的形式,持续的请求,完成后再放置下一次请求到队列中,循环请求
//连接工程的启动 mQClientFactory.start();
//初始化客户端请求实例 public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) { //mq的核心配置信息 this.clientConfig = clientConfig; //当前进程内的惟一标识,升序数值 this.instanceIndex = instanceIndex; //netty通讯的客户端配置 this.nettyClientConfig = new NettyClientConfig(); this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads()); this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS()); //解析客户端请求,封装的事件处理 this.clientRemotingProcessor = new ClientRemotingProcessor(this); //客户端实例的实际实现,网络通讯的核心,只是初始化了通讯框架,具体的连接后面根据不一样的地址再进行连接操做 this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig); //设置核心的nameserv地址 if (this.clientConfig.getNamesrvAddr() != null) { this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr()); log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr()); } this.clientId = clientId; //mq管理 this.mQAdminImpl = new MQAdminImpl(this); //拉取消息的实现 this.pullMessageService = new PullMessageService(this); //负载均衡的实现,可能有相关的机器增长删除,须要按期的进行重负载操做 this.rebalanceService = new RebalanceService(this); //消息发送者的包装,发送者的发送者,这个逻辑有点乱,而且在构造方法中从新初始化的 //producer -> DefaultMQProducer -> DefaultMQProducerImpl -> MQClientInstance -> DefaultMQProducer this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP); this.defaultMQProducer.resetClientConfig(clientConfig); //消费客户端的状态管理 this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}", this.instanceIndex, this.clientId, this.clientConfig, MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer()); } //启动发送消息的核心,同时也是订阅消息的核心 public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } //启动netty的客户端配置 // Start request-response channel this.mQClientAPIImpl.start(); //启动任务,更新,验证,心跳等操做 // Start various schedule tasks this.startScheduledTask(); //消费端 // Start pull service this.pullMessageService.start(); //消费端,从新负载设置,请求的初始化操做也在此方法内执行 // Start rebalance service this.rebalanceService.start(); //前面已经初始化过操做,该入参为false,只须要初始化其余操做 // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
初始化操做类RebalanceService
public class RebalanceService extends ServiceThread { private static long waitInterval = Long.parseLong(System.getProperty( "rocketmq.client.rebalance.waitInterval", "20000")); private final InternalLogger log = ClientLogger.getLog(); //消息的网络操做及功能 private final MQClientInstance mqClientFactory; //负载服务的初始化,构造是链接服务的实例,当前是线程的子类 public RebalanceService(MQClientInstance mqClientFactory) { this.mqClientFactory = mqClientFactory; } @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { //等待执行 this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); } @Override public String getServiceName() { return RebalanceService.class.getSimpleName(); } }
表现的状态是在waiting,而后再指定负载,执行的负载操做是
//负载平衡重置 public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }
此时consumerTable中的数据在前面的初始化启动中进行了注册操做
//注册服务处理事件 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); }
执行平台操做
@Override public void doRebalance() { if (!this.pause) { this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } }
public void doRebalance(final boolean isOrder) { //得到订阅的内容 Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { //操做对topic的负载 this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } //基于数据的统一处理 this.truncateMessageQueueNotMyTopic(); }
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } //默认的集群模式 case CLUSTERING: { //得到当前topic的订阅队列信息 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //请求得到topic的cid请求 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); } if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); //排序操做 Collections.sort(mqAll); Collections.sort(cidAll); //选择cid的策略 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } //是否有变化 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); //消息队列的修改 this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; } }
主要的操做在修改处理消息队列的变化boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false; //得到处理队列的数据 Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); //遍历队列同时对消息对鞋进行修正 while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); //针对相同的topic的处理 if (mq.getTopic().equals(topic)) { if (!mqSet.contains(mq)) { pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); } } else if (pq.isPullExpired()) { switch (this.consumeType()) { case CONSUME_ACTIVELY: break; case CONSUME_PASSIVELY: pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", consumerGroup, mq); } break; default: break; } } } } //构造拉取消息的请求 List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); //封装拉取的请求结构 PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } //处理消息拉取请求 this.dispatchPullRequest(pullRequestList); return changed; }
拉取的操做this.dispatchPullRequest(pullRequestList);是抽象的设计,须要根据当前的实现类进行实现
@Override public void dispatchPullRequest(List<PullRequest> pullRequestList) { for (PullRequest pullRequest : pullRequestList) { //实行消息的拉取操做 this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }
逐层调用实现
public void executePullRequestImmediately(final PullRequest pullRequest) { this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); }
//将请求直接添加到队列中,这里确定是第一次初始化和后来无限次操做的入口 public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
将最终的拉取请求添加的请求队列中,等待请求队列的扫描和执行
最终的执行开始及操做,在启动的最后一步执行this.mQClientFactory.rebalanceImmediately();
//唤醒负载 public void rebalanceImmediately() { this.rebalanceService.wakeup(); }
public void wakeup() { if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify } }
该唤醒操做和签名的waiting操做是一一对应的操做,待服务彻底启动后,执行拉取的唤醒操做