RocketMq进阶源码学习之生产者启动流程分析

RocketMq进阶源码学习之生产者启动流程分析java

这里找个example,单纯简单的发送一条消息,从生产者的start方法开始入手.生产者的启动流程比较简单,本文篇幅较短,只分析了主流程,函数

public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.start();
    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    producer.send()
    producer.shutdown();
}

这里是start方法,在里面的start方法下,有几行代码时表示是否开启消息轨迹追踪的,这里对traceDispatcher进行了null判断,这里不明白traceDispatcher是在哪定义的,就一路追踪了下,发现traceDispatcher这个对象是在Producer的构造函数的中进行初始化的,DefaultMqProducer有一个构造函数里有一个参数是enableMsgTrace,若是传入为true,就会初始化traceDispatcher对象,那么在start方法这里判断不为空就会开启消息轨迹追踪了学习

好比这样的构造函数便可开启消息轨迹追踪fetch

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
public void start() throws MQClientException {
    //设置生产者组名
    this.setProducerGroup(withNamespace(this.producerGroup));
    this.defaultMQProducerImpl.start();
    //traceDispatcher是为初始化,已初始化则表示开启消息追踪
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

而后再通过一些配置的校验以后,开始启动,主要是作一些定时任务与支线服务线程(如消息重平衡服务,拉取服务等等)this

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                //开启netty客户端,NettyRemotingClient
                this.mQClientAPIImpl.start();
                //开启多个定时任务线程池,如发送心跳,持久化消息消费的offset等
                this.startScheduledTask();
                //开启拉取消息服务
                this.pullMessageService.start();
                //开启重均衡消息服务
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }

生产者启动基本就到此为止了,就是作一些校验,看看是否须要开启消息轨迹追踪,再启动Netty客户端,而后在启动一些辅助服务就启动完毕了spa

相关文章
相关标签/搜索