生产者发送消息,须要等到消息服务器返回结果java
生产者发送消息, 不须要等到消息服务器返回结果,生产者线程不阻塞,只需注册监听回调函数便可。apache
生产者只管发,无论成功与否。缓存
org.apache.rocketmq.common.message.Message服务器
private String topic; private int flag; // 属性 private Map<String, String> properties; // 消息体 private byte[] body; // 事务 ID private String transactionId;
properties 存放的扩展属性主要有:网络
用于过滤消息异步
Message 索引键,多个用空格隔开,RocketMQ 可根据这些 key 快速检索消息函数
消息发送时,是否等到消息存储完,再返回this
消息延迟级别,用于定时消息或消息重试线程
// 生成者组,消息服务器在回查事务状态时,会随机选择该组中的任何一个生成者发起事务回查请求 private String producerGroup; // 默认的 topic key private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; // TBW102 // 默认的主题队列数量 private volatile int defaultTopicQueueNums = 4; // 默认发送超时时间 private int sendMsgTimeout = 3000; // 消息体超过 4kb 时,压缩消息 private int compressMsgBodyOverHowmuch = 1024 * 4; // 同步发送失败,最大尝试次数 private int retryTimesWhenSendFailed = 2; // 异步发送失败,最大尝试次数 private int retryTimesWhenSendAsyncFailed = 2; // 消息重试时选择另一个 broker 时,是否不等待存储结果就返回。默认 false private boolean retryAnotherBrokerWhenNotStoreOK = false; // 容许发送的最大消息长度 private int maxMessageSize = 1024 * 1024 * 4; // 4M
流程代码位于:DefaultMQProducerImpl#startcode
流程代码位于:DefaultMQProducerImpl#sendDefaultImpl
重要的类:TopicPublishInfo
// 是不是顺序消息 private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; // 主题的消息队列 private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); // 用于选择消息队列。每次选择一个队列,会自增1,若是等于 Integer.MAX_VALUE,重置为0 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData; // TopicRouteData private String orderTopicConf; private List<QueueData> queueDatas; private List<BrokerData> brokerDatas; private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
代码位于:DefaultMQProducerImpl#tryToFindTopicPublishInfo(final String topic)
sendLatencyFaultEnable = false 默认不启用 broker 故障延迟机制。
RocketMQ 会避开上一次发送失败的 broker
代码入口:TopicPublishInfo#selectOneMessageQueue(final String lastBrokerName)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 避开上一次发送失败的 broker if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
代码入口:MQFaultStrategy#selectOneMessageQueue
代码入口:DefaultMQProducerImpl#sendKernelImpl
若是发送失败,则继续重复 选择消息队列,而后发送
重试的调用入口是在收到服务端响应包时进行的。所以网络异常、网络超时,将不会触发重试。
异步发送,不须要回调,没有重试机制
单机环境确定没救了,集群环境下没有影响的。
rocketMQ 每次发送消息时,会选择消息队列。第一次发送失败后,会进行失败的尝试。此时会将上一次发送失败的 broker 排除。从新选择一个 broker。
若是是当前保持链接的 namesrv 保存的 broker 信息,都挂掉了,那么必定是会失败的。(若是是这个缘由,那么我以为,全部的 broker 应该挂掉了)
若是是此 namesrv 没有 topic 对应的 broker ,那么 RocketMQ 会选择其余的 namesrv 保持链接,因此,不会发送失败。
相关代码:NettyRemotingClient#getAndCreateNameserverChannel()
若是只是 namesrv 中的某个 broker 挂掉了,可是由于 Producer 有消息重试机制,会选择其余的 broker。所以,消息不会发送失败。除非全部的 broker 都失败了。
结论:除非全部 broker 挂掉,否则我以为不会形成消息发送失败。
答案是不会。
缘由是,每次再向 namesrv 发送消息时,须要判断与 namesrv 的 channel 是否有效。若是无效,则会尝试从剩下的 namesrv 查找一个有效的,并与之保持链接。
相关代码位于: NettyRemotingClient#getAndCreateNameserverChannel()