在RocketMQ中,使用Producer相关类来生产消息,第一次使用的时,会调用producer.start()方法来进行初始化。这里咱们来探索一下Producer的start作了些什么。ide
时序图以下:fetch
producer.start()实际上是调用DefaultMQProducerImpl.start(),重点看看里面2句代码:this
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
说明:把DefaultMQProducer对象添加到MQClientInstance的producerTable属性中:线程
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { if (null == group || null == producer) { return false; } MQProducerInner prev = this.producerTable.putIfAbsent(group, producer); if (prev != null) { log.warn("the producer group[{}] exist already.", group); return false; } return true; }
producerTable对象里面存储producerGroupName和DefaultMQProducer的映射。key-value:<producerGroupName,DefaultMQProducer>netty
if (startFactory) { mQClientFactory.start(); }
mQClientFactory.start()调用MQClientInstance(负责启动通讯服务和定时任务)的start:server
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr()); } // Start request-response channel 启动MQClientAPIImpl this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service 用于将消费失败的消息发回broker this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
在Producer启动的时候,serviceState值为CREATE_JUST。由于在以上的方法中就是调用各类的start方法:对象
this.mQClientAPIImpl.start();
说明:调用:MQClientAPIImpl.start()--->NettyRemotingClient.start():MQClientAPIImpl(客户端与远程交互的封装,其内部使用了RemotingClient来实现与远程的交互),NettyRemotingClient.start()的方法里面启动了netty的通讯客户端(这里就不对netty作介绍了,由于这个是个大工程)。继承
this.startScheduledTask();
说明:启动各类的定时任务:队列
/** * 启动各类定时任务 */ private void startScheduledTask() { //每两分钟执行一次寻址服务(NameServer地址) if (null == this.clientConfig.getNamesrvAddr()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); } catch (Exception e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } //每30秒更新一次全部的topic的路由信息(topicRouteTable) this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS); //每30秒移除离线的broker //每30秒发送一次心跳给全部的master broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); //更新offset每5秒提交一次消费的offset,broker端为ConsumerOffsetManager负责记录,此offset是逻辑偏移量,好比说,consumerA@consumerAGroup 在broker_a的queue 0的消费队列共有10000条消息,目前消费到888,那么offset就是888. //由于producer和consumer内部都持有MQClientInstance实例,故MQClientInstance既有生产者逻辑,又有消费者逻辑。 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); //每1分钟调整一次线程池,这也是针对消费者来讲的,具体为若是消息堆积超过10W条,则调大线程池,最多64个线程;若是消息堆积少于8W条,则调小线程池,最少20的线程。 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } } }, 1, 1, TimeUnit.MINUTES); }
查看方法上面的注释。ci
this.pullMessageService.start();
说明:consumer的拉取消息线程实现方式:PullMessageService继承ServiceThread(对拉取消息请求进行了封装,使其队列化),start拉取消息线程启动,在run方法里面实现了:不断的从pullRequestQueue中取出请求,并调用消息拉取。
this.rebalanceService.start();
说明:再平衡线程启动
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
说明:暂时没有搞明白
总结:在Producer启动的时候,作了几件事情:
启动执行寻址服务的任务(NameServer地址)。
启动更新全部的topic的路由信息(topicRouteTable)的任务。
启动移除离线的broker和发送心跳给全部的master broker的任务。
启动提交消费的offset(逻辑偏移量)到broker(broker端为ConsumerOffsetManager负责记录) 的任务。
启动调整线程池的任务(针对消费)。