RocketMQ探索-Producer的start

在RocketMQ中,使用Producer相关类来生产消息,第一次使用的时,会调用producer.start()方法来进行初始化。这里咱们来探索一下Producer的start作了些什么。ide

时序图以下:fetch

producer.start()实际上是调用DefaultMQProducerImpl.start(),重点看看里面2句代码:this

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);

说明:把DefaultMQProducer对象添加到MQClientInstance的producerTable属性中:线程

public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (null == group || null == producer) {
        return false;
    }

    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    if (prev != null) {
        log.warn("the producer group[{}] exist already.", group);
        return false;
    }

    return true;
}

producerTable对象里面存储producerGroupName和DefaultMQProducer的映射。key-value:<producerGroupName,DefaultMQProducer>netty

if (startFactory) {
    mQClientFactory.start();
}

mQClientFactory.start()调用MQClientInstance(负责启动通讯服务和定时任务)的start:server

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                }
                // Start request-response channel 启动MQClientAPIImpl
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service 用于将消费失败的消息发回broker
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

在Producer启动的时候,serviceState值为CREATE_JUST。由于在以上的方法中就是调用各类的start方法:对象

  • this.mQClientAPIImpl.start();

        说明:调用:MQClientAPIImpl.start()--->NettyRemotingClient.start():MQClientAPIImpl(客户端与远程交互的封装,其内部使用了RemotingClient来实现与远程的交互),NettyRemotingClient.start()的方法里面启动了netty的通讯客户端(这里就不对netty作介绍了,由于这个是个大工程)。继承

  • this.startScheduledTask();

         说明:启动各类的定时任务:队列

  • /**
     * 启动各类定时任务
     */
    private void startScheduledTask() {
        //每两分钟执行一次寻址服务(NameServer地址)
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
    
        //每30秒更新一次全部的topic的路由信息(topicRouteTable)
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);
    
        //每30秒移除离线的broker
        //每30秒发送一次心跳给全部的master broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    
        //更新offset每5秒提交一次消费的offset,broker端为ConsumerOffsetManager负责记录,此offset是逻辑偏移量,好比说,consumerA@consumerAGroup 在broker_a的queue 0的消费队列共有10000条消息,目前消费到888,那么offset就是888.
        //由于producer和consumer内部都持有MQClientInstance实例,故MQClientInstance既有生产者逻辑,又有消费者逻辑。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
        //每1分钟调整一次线程池,这也是针对消费者来讲的,具体为若是消息堆积超过10W条,则调大线程池,最多64个线程;若是消息堆积少于8W条,则调小线程池,最少20的线程。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

         查看方法上面的注释。ci

  • this.pullMessageService.start();

         说明:consumer的拉取消息线程实现方式:PullMessageService继承ServiceThread(对拉取消息请求进行了封装,使其队列化),start拉取消息线程启动,在run方法里面实现了:不断的从pullRequestQueue中取出请求,并调用消息拉取。

  • this.rebalanceService.start();

         说明:再平衡线程启动

  • this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

         说明:暂时没有搞明白

总结:在Producer启动的时候,作了几件事情:

  1. 把本身保持到producerTable属性中<groupName,Producer>
  2. 启动各类定时任务(对MQClientInstance说明一下,由于生产和消费者都会持有MQClientInstance因此在启动任务的时候会启动生产和消费相关的任务线程)

         启动执行寻址服务的任务(NameServer地址)。

         启动更新全部的topic的路由信息(topicRouteTable)的任务。

         启动移除离线的broker和发送心跳给全部的master broker的任务。

         启动提交消费的offset(逻辑偏移量)到broker(broker端为ConsumerOffsetManager负责记录)            的任务。

         启动调整线程池的任务(针对消费)。

相关文章
相关标签/搜索