RocketMQ 发送普通消息有 三 种实现方式:可靠同步发送 、 可靠异步发送 、 单向 (Oneway)发送。算法
RocketMQ 消息封装类是 org.apache.rocketmq.common.message.Message。apache
Message 扩展属性主要包含下面几个 。缓存
org.apache.rocketmq.client.producer.DefaultMQProducer
复制代码
/**
* 生产者所属组,消息服务器在回查事务状态时回随机选择该组中任何一个生产者发起事务回查请求
*/
private String producerGroup;
/**
* 默认 topicKey。TBW102
*/
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
/**
* 默认主题在每个Broker 队列数量
*/
private volatile int defaultTopicQueueNums = 4;
/**
* 消息发送默认超时时间,默认3s
*/
private int sendMsgTimeout = 3000;
/**
* 消息体超过该值则启用压缩,默认4k
*/
private int compressMsgBodyOverHowmuch = 1024 * 4;
/**
* 同步方式发送消息重试次数,默认为2,总共执行3次
*/
private int retryTimesWhenSendFailed = 2;
/**
* 异步方式发送消息重试次数,默认为2
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* 消息重试选择另一个Broker 时,是否不等待存储结果就返回,默认为false
*/
private boolean retryAnotherBrokerWhenNotStoreOK = false;
/**
* 容许发送的最大消息长度 默认为 4M
*/
private int maxMessageSize = 1024 * 1024 * 4;
复制代码
1.建立主题bash
/**
* key:目前未实际做用,能够与 newTopic 相同 。
* newTopic: 主题名称 。
* queueNum:队列数量 。
* topicSysF!ag:主题系统标签,默认为 0。
**/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
复制代码
2.查找该主题下全部消息队列服务器
List<MessageQueue> fetchPublishMessageQueues(String topic);
复制代码
3.同步发送消息,具体发送到主题中的那个消息队列由负载算法决定。网络
SendResult send(Message msg)
复制代码
4.同步发送消息,若是发送超过timeout,则抛出超时异常异步
SendResult send(Message msg,long timeout)
复制代码
5.异步发送消息,sendCallback 参数是消息发送成功后的回调方法函数
void send(Message msg,SendCallback sendCallback)
复制代码
6.异步发送消息,若是发送超过timeout,则抛出超时异常fetch
void send(Message msg, SendCallback sendCallback, long timeout)
复制代码
7.单向消息发送,不在意发送结果,消息发送出去后该方法马上返回spa
void sendOneway(Message msg)
复制代码
8.同步方式发送消息,发送到指定消息队列
SendResult send(Message msg, MessageQueue mq)
复制代码
9.异步方式发送消息,发送到指定消息队列
void send(Message msg, MessageQueue mq, SendCallback sendCallback);
复制代码
10.单向发送消息,发送到指定消息队列
void sendOneway(Message msg,MessageQueue mq);
复制代码
11.根据时间戳从队列中查找其偏移量
long searchOffset(MessageQueue mq, long timestamp)
复制代码
12.查找该消息队列中最大的物理偏移量
long maxOffset(MessageQueue mq)
复制代码
13.查找该消息队列中最小的物理偏移量
long minOffset(MessageQueue mq)
复制代码
14.根据消息偏移量查找消息
MessageExt viewMessage(String offsetMsgId)
复制代码
15.根据条件查询消息
* @param topic message topic -----消息主题
* @param key message key index word -----消息索引字段
* @param maxNum max message number ----本次最多取出消息条数
* @param begin from when -----开始时间
* @param end to when -----结束时间
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
复制代码
16.根据主题与消息ID 查询
MessageExt viewMessage(String topic,String msgId)
复制代码
17.同步批量发送消息
SendResult send(Collection<Message> msgs);
复制代码
检查ProducerGroup 是否符合要求;并改变生产者的instanceName 为进程ID
建立MQClientInstance实例MQClientlnstance 封装了 RocketMQ 网络处理 API,是消息生产者( Producer)、消息消费者 (Consumer)与 NameServer、 Broker打交道的网络通道。
。整个JVM实例只存在一个MQClientManager实例,维护一个MQClientInstance缓存表factoryTable.也就是 同一个 clientld只 会建立一个 MQClientinstanc巳。
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>
复制代码
向 MQClientlnstance注册,将当前生产者加入到 MQClientlnstance管理中,方 便后续调用网络请求、进行心跳检测等。
启动 MQClientlnstance,若是 MQC!ientlnstance 已经启动 ,则本次启 动不会真 正执行。
消息发送流程主要的步骤:验证消息、查找路由 、 消息发送 (包含异常处理机制) 。默认消息发送以同 步方式发送,默认超 时时间 为 3s。
消息发送以前,首先确保生 产 者处于运行状态,而后验证消息是否符合相应的规范, 具体的规范要求是主题名称 、 消息体不能为空 、 消息长度不能等于 0且默认不能超过容许 发送消息的最大长度 4M (maxMessageSize=l024 *1024 *4)。
消息发送以前,首先须要获取主题的路由信息,只有获取了这些信息咱们才知道消息 要发送到具体的 Broker节点。
1 )消息生产者启动流程
重点理解 MQClientlnstance、消 息生产者之间的关系 。
2 )消息队列负载机制
消息生产者在发送消息时,若是本地路由表中未缓存 topic 的路由信息,向 Name Server 发送获取路由信息请求,更新本 地路由信息表,而且消息生 产者每隔 30s 从 Name Server 更新路由表 。
3 )消息发送异常机制 消息发送高可用主要经过两个手段 : 重试与 Broker规避。 Brok巳r规避就是在一次消息 发送过程当中发现错误,在某一时间段内,消息生产者不会选择该 Broker(消息服务器)上的 消息队列,提升发送消息的成功 率 。
4 )批量消息发送
RocketMQ 支持将同一主题下的多条消息一次性发送到消息服务端。