先来看下producer核心的类设计,以下图:java
一、核心发布消息的类DefaultMQProducer
,继承自MQProducer接口,此接口定义了一系列发送消息的方法,如普通消息,顺序消息,延时消息等,最终进行网络通讯会交给MQClientAPIImpl
处理。spring
二、rocketmq从4.1.3版本开始又支持了事务消息,由TransactionMQProducer
类提供(以后会有专门的文章进行详细解读事务消息)shell
咱们看到DefaultMQProducer
继承了一个客户端的公共配置类ClientConfig
(与consumer公用),其实就是一个普通的javaBean,既能够代码中设置属性,也能够集成spring来配置服务器
参数名 | 默认值 | 说明 |
---|---|---|
namesrvAddr | 无 | nameserver的地址列表,用分号隔开 |
clientIP | 本机ip地址 | 客户端ip地址,有时候没法识别,须要手动配置 |
instanceName | DEFAULT | 客户端实例名称,客户端建立的多个 Producer、Consumer 实际是共用一个内部实例(这个实例包含网络链接、线程资源等) |
clientCallbackExecutorThreads | cpu核数 | 通讯层客户端处理请求的线程数 |
pollNameServerInterval | 30000 | 轮询nameserver的时间间隔,单位ms |
heartbeatBrokerInterval | 30000 | 向broker发送心跳的时间间隔,单位ms |
persistConsumerOffsetInterval | 5000 | 持久化 Consumer 消费进度间隔时间,单位ms |
producer独有的配置:网络
参数名 | 默认值 | 说明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | Producer组名,相同分组的producer应该有相同的发送消息逻辑 |
createTopicKey | AUTO_CREATE_TOPIC_KEY | 自动建立topic时,以此默认topic为模板建立指定topic |
defaultTopicQueueNums | 4 | 自动建立topic队列数量 |
sendMsgTimeout | 3000 | 发送消息的超时时间,单位ms |
compressMsgBodyOverHowmuch | 4098 | 消息体超过多大会进行压缩,单位字节 |
retryTimesWhenSendFailed | 2 | 同步发送消息,发送失败重试次数 |
retryTimesWhenSendAsyncFailed | 2 | 异步发送消息,发送失败的重试次数 |
retryAnotherBrokerWhenNotStoreOK | false | 同步发送消息,消息存储失败是否重试其余broker |
maxMessageSize | 4194304 | 客户端限制消息的大小,默认4M |
TransactionListener | 事务消息时,必须设置的回查监听器 |
咱们在建立producer时必需要指定一个group,这里有两个做用:负载均衡
简单说明下整个启动流程:dom
一、首先在DefaultMQProducerImpl
中会作一些参数校验,如group是否合法;而后会建立MQClientInstance
实例,此实例包含网络链接、线程资源等,相同的clientId会共享此实例,因此经过MQClientManager
来管理。异步
二、核心的启动流程在MQClientInstance
类中,若是nameserver地址没有配置的话,会先经过静态的http服务器地址去抓取nameserver的地址;再则启动netty客户端。工具
三、启动一些定时任务,跟producer有关的以下几个:this
RocketMQ 有多种配置方式能够令客户端找到 NameServer, 而后经过 NameServer 再找到 Broker,分别以下,
优先级由高到低,高优优先级会覆盖低优先级
一、代码中指定 Name Server 地址
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
二、启动参数指定
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
三、环境变量指定 Name Server 地址
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
四、HTTP 静态服务器寻址(默认)
若是以上三种都没有设置name server的地址,客户端启动后先会访问一个静态http服务器获取name server的地址,而后会启动一个定时任务访问这个静态 HTTP 服务器,地址以下:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
这是默认的地址,固然你也能够更改,作以下设置:
代码:
System.setProperty("rocketmq.namesrv.domain","localhost"); System.setProperty("rocketmq.namesrv.domain.subgroup","nameServer")
或者启动参数指定:
-Drocketmq.namesrv.domain=localhost -Drocketmq.namesrv.domain.subgroup=nameServer
以上设置后http服务器地址就变成:
http://localhsot:8080/rocketmq/nameServer
这个 URL 的返回内容格式以下:
192.168.0.1:9876;192.168.0.2:9876
客户端每隔 2 分钟访问一次这个 HTTP 服务器,并更新本地的 Name Server 地址。
推荐使用 HTTP 静态服务器寻址方式,好处是客户端部署简单,且 Name Server 集群能够热升级。
一、broker在启动的时候经过参数autoCreateTopicEnable
设置是否自动建立topic,默认为true,此时会建立一个名为TBW102
(4.3版本已经更名为AUTO_CREATE_TOPIC_KEY
)的topic(参见类TopicConfigManager
),broker在向namesrv注册时会把默认的topic注册上去。若是设置false,则不会注册。
二、producer在发送消息时会在本地获取路由信息,第一次发送的话本地确定没有,就会去namesrv获取,若是此时namesrv也没有,则会获取TBW102的topic信息(参见DefaultMQProducerImpl.tryToFindTopicPublishInfo),以此为模板建立topic,而后选择topic下的一台broker发,broker建立后,会经过心跳注册到namesrv上。
三、若是autoCreateTopicEnable设置false的话,producer发送消息会报找不到路由的异常,此时必须手动建立topic。
能够经过管理工具mqadmin来手动建立topic
sh mqadmin updateTopic -c [集群名称] -n [nameserver地址] -t [topic名称] -w [写队列数] -r [读队列数]
这里讲一下自动建立的topic的队列数如何设置,首先broker建立的模板topic=AUTO_CREATE_TOPIC_KEY
的队列是8,参见类TopicConfigManager:
public TopicConfigManager(BrokerController brokerController) { //省略无关代码 if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } //省略无关代码 }
BrokerConfig:
private int defaultTopicQueueNums = 8;
DefaultMQProducer端默认知道要建立的topic的队列数是4
private volatile int defaultTopicQueueNums = 4;
在MQClientInstance
类的方法updateTopicRouteInfoFromNameServer
中有这样一段逻辑:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { //省略无关代码 for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } //省略无关代码 }
建立队列是取二者最小的一个,也就是4,因此要设置topic的队列数量,很明显了设置broker的defaultTopicQueueNums的值和DefaultMQProducer的defaultTopicQueueNums值就能够了。这是自动建立Topic时队列数的设置方法,上面也提到生成环境通常不会开启自动建立Topic的功能,能够经过上面的手动建立Topic的指令来设置读写队列数。你可能注意到了Topic下有读写队两个队列数,分别表明上面意思呢?读写队列实际上是个逻辑概念,一个broker下topic的总队列数是以写队列为准,而读队列意思是容许多少队列能够被消费者消费,也就是说读多写少的状况下,没有问题,队列均可以被消费掉,若是写多读少的话,那么就会存在队列不会被消费的状况。
前面咱们讲到了如何获取topic的路由信息,如何建立topic的队列数,一个topic下有多个队列,又能够分布在不一样的broker上面,因此topic的总队列数应该是全部broker上的topic下队列数的总和。
备注:若是手动在每一个broker上分别建立topic的话,相同topic在不一样broker上的队列数能够不同。
那么问题来了,在发送消息时根据怎么样的策略来选择一个队列发送呢?rocketmq提供了一个MQFaultStrategy
策略类来负责选择队列,这里会有一个参数sendLatencyFaultEnable
是否开启延迟故障,
该值默认为false
,在不开启的状况下,相同线程发送消息是轮询topic下的全部队列,不一样线程发送是随机的,核心代码以下:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { //省略没必要要的代码...... } return tpInfo.selectOneMessageQueue(lastBrokerName); } //以上代码逻辑参见类MQFaultStrategy.selectOneMessageQueue public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //省略没必要要的代码...... } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); } //以上代码逻辑参见类TopicPublishInfo public int getAndIncrement() { Integer index = this.threadLocalIndex.get();//ThreadLocal中获取 if (null == index) {//为空,随机生成一个 index = Math.abs(random.nextInt()); if (index < 0) index = 0; this.threadLocalIndex.set(index); } index = Math.abs(index + 1); if (index < 0) index = 0; this.threadLocalIndex.set(index); return index; } //以上代码参见类ThreadLocalIndex
每次获取index的时候都是从本地线程变量ThreadLocal中获取,没有的状况下就是随机生成一个,加1取绝对值后返回,再对队列列表的长度取模,因此在同一线程中,会轮训的从队列列表获取队列。而若是是不一样线程的话,index是随机生成的,因此就是随机从队列列表中获取。以下图所示:
能够看到选择队列方法的入参有一个lastBrokerName
的入参,此参数的目的是在发送消息失败的状况下,producer会重试再次发送,而再次发送选择的队列须要另选一个broker,lastBrokerName
就是要过滤掉失败的broker,选择下一个broker的队列进行发送消息。
producer的send方法自己支持内部重试,重试逻辑以下:
一、最大重试次数默认2次,能够经过参数retryTimesWhenSendFailed
设置
二、发送失败,则轮询到下一个broker,若是此时只有一个broker在线呢?那就会轮训这个broker下的其余队列。
三、这个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认为3s。
若是发送消息,broker返回结果超时,这种超时不会进行重试了;若是是方法自己耗时超过sendMsgTimeout ,还将来得及调用发送消息,此时的超时也不会重试。
以上策略其实也很难保证同步发送消息必定成功,若是应用要保证消息不丢失,最好先把消息存储到db,后台启线程定时重试,确保消息必定存储到broker。