RocketMq源码的分享

一:RocketMq架构设计

1 技术架构

rocketmq_architecture_1.png

RocketMQ架构上主要分为四部分,如上图所示:apache

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer经过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败而且低延迟
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,这样基本能够知足大多数用户的需求。
  • NameServer:NameServer是一个很是简单的Topic路由注册中心,其角色相似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息而且保存下来做为路由信息的基本数据。而后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每一个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。而后Producer和Conumser经过NameServer就能够知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer一般也是集群的方式部署,各实例间相互不进行信息通信。Broker是向每一台NameServer注册本身的路由信息,因此每个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种缘由下线了,Broker仍然能够向其它NameServer同步其路由信息,Producer,Consumer仍然能够动态感知Broker的路由的信息。
  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了如下几个重要子模块。
  • Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
  • Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
  • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  • HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
  • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

rocketmq_architecture_2.png

2 部署架构

rocketmq_architecture_3.png

3 RocketMQ 网络部署特色
  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master能够对应多个Slave,可是一个Slave只能对应一个Master,Master与Slave 的对应关系经过指定相同的BrokerName,不一样的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也能够部署多个。每一个Broker与NameServer集群中的全部节点创建长链接,定时注册Topic信息到全部NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
  • Producer与NameServer集群中的其中一个节点(随机选择)创建长链接,按期从NameServer获取Topic路由信息,并向提供Topic 服务的Master创建长链接,且定时向Master发送心跳。Producer彻底无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)创建长链接,按期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave创建长链接,且定时向Master、Slave发送心跳。Consumer既能够从Master订阅消息,也能够从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master仍是Slave拉取。
4.结合部署架构图,描述集群工做流程:
  • A: 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,至关于一个路由控制中心。
  • B:Broker启动,跟全部的NameServer保持长链接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储全部Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • C:收发消息前,先建立Topic,建立Topic时须要指定该Topic要存储在哪些Broker上,也能够在发送消息时自动建立Topic。
  • D:Producer发送消息,启动时先跟NameServer集群中的其中一台创建长链接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,而后与队列所在的Broker创建长链接从而向Broker发消息。
  • E:Consumer跟Producer相似,跟其中一台NameServer创建长链接,获取当前订阅Topic存在哪些Broker上,而后直接跟Broker创建链接通道,开始消费消息。

二:RocketMQ之NameServer部分

1.NameServer的初始化和启动流程

namesrv.png

2.RouteInfoManager类

基础:Namesrv用来存储路由的基础信息都放在RouteInfoManager类中,RouteInfoManager类也能够看作是Namesrv的资源类,不少操做都是对此类中的数据进行实时更改,
主要成员变量:缓存

private final HashMap<String/\* topic \*/, List<QueueData>> topicQueueTable;  
private final HashMap<String/\* brokerName \*/, BrokerData> brokerAddrTable;  
private final HashMap<String/\* clusterName \*/, Set<String/\* brokerName \*/\>> clusterAddrTable;  
private final HashMap<String/\* brokerAddr \*/, BrokerLiveInfo> brokerLiveTable;  
private final HashMap<String/\* brokerAddr \*/, List<String>/\* Filter Server \*/\> filterServerTable;

topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡
brokerAddrTable:Broker基础信息,包含brokerName、所属集群名称、主备Broker地址
clusterAddrTable:Broke集群信息,存储集群中全部Broker名称
brokerLiveTable:Broker状态信息,NameServer每次收到心跳包时会替换该信息
filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤
QQ截图20200116014201.png服务器

3.NameServer的路由信息注册功能

其主要方法是RouteInfoManager类的registerBroker()方法网络

public RegisterBrokerResult registerBroker(  
    final String clusterName,  
    final String brokerAddr,  
    final String brokerName,  
    final long brokerId,  
    final String haServerAddr,  
    final TopicConfigSerializeWrapper topicConfigWrapper,  
    final List<String> filterServerList,  
    final Channel channel)

主要流程如上图数据结构

4.NameServer的路由信息发现功能

RMQ路由发现是非实时的,当Topic发生变化后,Namesrv不知道推送给客户端,而是由客户端主动拉取最新的路由。
其主要方法是RouteInfoManager类的pickupTopicRouteData()方法架构

public TopicRouteData pickupTopicRouteData(final String topic)

主要流程以下图:
QQ图片20200116004252.pngapp

5.NameServer的broker自动剔除机制

路由删除会从topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable中删除与该Broker相关的信息。
其主要方法是RouteInfoManager类的scanNotActiveBroker()和onChannelDestroy(String remoteAddr, Channel channel)方法负载均衡

public void scanNotActiveBroker() {  
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();     //遍历每一个存活broker
    while (it.hasNext()) {  
        Entry<String, BrokerLiveInfo> next = it.next();  
        long last = next.getValue().getLastUpdateTimestamp();  
        //默认超过两分钟,关闭和broker的channel,把本身从brokerLiveTable中剔除
        //调用 onChannelDestroy
        if ((last + BROKER\_CHANNEL\_EXPIRED\_TIME) < System.currentTimeMillis()) {  
            RemotingUtil.closeChannel(next.getValue().getChannel());  
            it.remove();  
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER\_CHANNEL\_EXPIRED\_TIME);  
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());  
        }  
    }  
}

onChannelDestroy()方法的主要流程以下:
1.申请写锁,根据brokerAddress从brokerLiveTable、filterServerTable移除
2.维护brokerAddrTable,遍历brokerAddrTable,根据参数remoteAddr从brokerAddrTable中删除brokerAddr,若是BrokerAddrs集合为空,则从brokerAddrTable中删除brokerName。
3.根据BrokerName,从ClusterAddrTable中找到Broker并从集群中移除, 若是移除后,集群中不包含任何Broker,则将该集群从clusterAddrTable中移除。
4.根据brokerName,遍历topicQueueTable,找到QueueData并从List<QueueData>移除,若是List<QueueData>为空,则把相应的topic从topicQueueTable中移除。
5.释放锁异步

三:RocketMQ之Producer部分

1.Producer的启动流程

1.1:总体流程以下图
20200116144705.jpg
1.2: 在应用里初始化DefaultMQProducer时候,会以Producer名或者RPCHook的任一个或两个做为参数初始化DefaultMQProducer对象,而后对DefaultMQProducer对象设置NameServer地址等参数,而后调用start方法启动Producer,其实内部调用了DefaultMQProducerImpl.start 方法,其大体流程以下:
20200116132819.jpg分布式

2.向Broker发送心跳消息

public void sendHeartbeatToAllBrokerWithLock() {  
  if (this.lockHeartbeat.tryLock()) {  
  try {  
  this.sendHeartbeatToAllBroker();  
  //向Filter过滤服务器发送REGISTER_MESSAGE_FILTER_CLASS请求码,更新过滤服务器中的Filterclass文件
  this.uploadFilterClassSource();  
        } catch (final Exception e) {  
  log.error("sendHeartbeatToAllBroker exception", e);  
        } finally {  
  this.lockHeartbeat.unlock();  
        }  
 } else {  
  log.warn("lock heartBeat, but failed.");  
    }  
}

sendHeartbeatToAllBroker()的主要流程以下:
一、初始化 HeartbeatData 对象,将该 Producer 或 Consumer 的 ClientID 赋值给 HeartbeatData 对象的 clientID 变量。
二、遍历 MQClientInstance.consumerTable和producerTable,根据每一个 MQConsumerInner 对象的值初始化ConsumerData 对象和ProducerData对象。
三、若 ConsumerData 集合和 ProducerData 集合都为空,说明没有 consumer或 produer,则不发送心跳信息。
四、若不是都为空,则遍历 MQClientInstance.brokerAddrTable 列表,向每一个 Broker 地址发送请求码为 HEART_BEAT 的心跳消息,可是当存在 Consumer 时才向全部 Broker 发送心跳消息,不然若不存在 Consumer 则只向主用 Broker 地址发送心跳消息。
5.根据broker返回结果,更新MQClientInstance.brokerVersionTable。

3.消息发送

3.1:消息发送的总体时序流程以下图
20190228103418595.jpg
其中几个主要方法文字说明以下:
3.2 消息的数据结构
rocketmq的消息封装在org.apache.rocketmq.common.message类中,属性:

private String topic;                   //消息所属topic
    private int flag;                       //消息flag (没啥用)
    private Map<String, String> properties; //扩展属性 (消息的TAGS 消息的延迟等级。)
    private byte[] body;                    //消息主体内容
    private String transactionId;           //交易id

3.3 普通消息的发送
对于普通消息的发送,能够从DefaultProducerImpl的send方法入手,

public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  
  return send(msg, this.defaultMQProducer.getSendMsgTimeout());  
}

发送流程中几个主要方法以下:

3.3.1 消息验证

验证消息是否符合规范,包括topicName、body不能为空、length不能为0且最大为4_1024_1024

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)  
  throws MQClientException {  
  if (null == msg) {  
  throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");  
    }  
  // topic  
  Validators.checkTopic(msg.getTopic());  
  if (null == msg.getBody()) {  
  throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");  
    }  
  
  if (0 == msg.getBody().length) {  
  throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");  
    }  
  
  if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {  
  throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,  
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());  
    }  
}
3.3.2 查找该消息topic的路由信息

获取主题的路由信息,查找要发送的具体的Broker节点

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {  
  TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);  
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {  
  this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());  
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);  
        topicPublishInfo = this.topicPublishInfoTable.get(topic);  
    }  
  
  if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {  
  return topicPublishInfo;  
    } else {  
  this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);  
        topicPublishInfo = this.topicPublishInfoTable.get(topic);  
        return topicPublishInfo;  
    }  
}

生产者中若是缓存了topic的路由信息则直接返回,不然向Namesrv查询,再未查询到则尝试用默认主题createTopicKey=TBW102去查询,都查询步到则会抛出异常。Producer关于更新和维护路由信息MQClientInstance.topicRouteTable缓存操做都在updateTopicRoutInfoFromNameServer方法中。

3.3.3 根据TopicPublishInfo选择MessageQueue

根据路由信息选择消息队列,返回的消息体按照broker、序号排序。
首先采用重试机制,由retryTimesWhenSendFailed指定同步方式重试次数,异步由retryTimesWhenSendAsyncFailed指定,而后使用循环执行的方式,选择消息队列 、发送消息,发送成功则返回,收到异常则重试。
选择消息队列有2种方式:
1.sendLatencyFaultEnable=false ,默认不启用 Broker 故障延迟机制,调用 TopicPublishlnfo的slectOneMessageQueue每次获取index的时候都是从本地线程变量ThreadLocal中获取,没有的状况下就是随机生成一个,加1取绝对值后返回,再对队列列表的长度取模,因此在同一线程中,会轮训的从队列列表获取队列。而若是是不一样线程的话,index是随机生成的,因此就是随机从队列列表中获取。
能够看到选择队列方法的入参有一个lastBrokerName的入参,此参数的目的是在发送消息失败的状况下,producer会重试再次发送,而再次发送选择的队列须要另选一个broker,lastBrokerName就是要过滤掉失败的broker,选择下一个broker的队列进行发送消息。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {  
  if (lastBrokerName == null) {  
  return selectOneMessageQueue();  
    } else {  
  int index = this.sendWhichQueue.getAndIncrement();  
        for (int i = 0; i < this.messageQueueList.size(); i++) {  
  int pos = Math.abs(index++) % this.messageQueueList.size();  
            if (pos < 0)  
  pos = 0;  
            MessageQueue mq = this.messageQueueList.get(pos);  
            if (!mq.getBrokerName().equals(lastBrokerName)) {  
  return mq;  
            }  
 }  return selectOneMessageQueue();  
    }  
}  
  
public MessageQueue selectOneMessageQueue() {  
  int index = this.sendWhichQueue.getAndIncrement();  
    int pos = Math.abs(index) % this.messageQueueList.size();  
    if (pos < 0)  
  pos = 0;  
    return this.messageQueueList.get(pos);  
}

2.sendLatencyFaultEnable=true ,启用 Broker 障延迟机制,保证低延迟,调用MQFaultStrategy.selectOneMessageQueue
开启延迟故障,每当发送完一次消息,无论成功仍是失败,都会把这次存储消息的broker给保存下来,记录故障状况下此broker须要延长多长时间才能再次发送,目前看到在代码里面写死了,故障下30s以内是不能再向此broker发送消息了

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {  
//开启此策略能够保证低延迟
  if (this.sendLatencyFaultEnable) {  
  try {  
  int index = tpInfo.getSendWhichQueue().getAndIncrement();  
  //轮询看能不能有低延迟的MessageQueue
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {  
  int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();  
                if (pos < 0)  
  pos = 0;  
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);  
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {  
  if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))  
  return mq;  
                }  
 }  
  //若是没找到就随机找一个
  final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();  
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);  
            if (writeQueueNums > 0) {  
  final MessageQueue mq = tpInfo.selectOneMessageQueue();  
                if (notBestBroker != null) {  
  mq.setBrokerName(notBestBroker);  
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);  
                }  
  return mq;  
            } else {  
  latencyFaultTolerance.remove(notBestBroker);  
            }  
 } catch (Exception e) {  
 log.error("Error occurred when selecting message queue", e);  
        }  
  
  return tpInfo.selectOneMessageQueue();  
    }  
  
  return tpInfo.selectOneMessageQueue(lastBrokerName);  
}

3.3.4 真正消息的发送
1.消息发送的入口在DefaultMQProducerimpl的sendKernerlmpl方法中:根据 MessageQueue 获取 Broker 的网络地址 若是 MQClientlnstance.brokeraddrTable没缓存该 Broke 的信息,则从 NameServer 主动更新一 topic 的路由信
若是路由更新后还 找不到Broker信息,则抛出异常,提示Broer不存在

String brokerAddr = this.mQClientFactory.findBrokerAddressinPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishinfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressinPublish
            (mq.getBrokerName()); 
    }

2.为消息分配全局惟一id,若是消息默认超 4K(compressMsgBodyOverHowmuch),会对消息体采用 zip 压缩,并设置消息的系统标记为 MessageSysFlag.COMPRESSED_FLAG,若是是事务Prepared消息,则设消息的系统标记为 MessageSysFlag.TRANSACTION_PREPARED TYPE

//for MessageBatch,ID has been set in the generating process  
if (!(msg instanceof MessageBatch)) {  
  MessageClientIDSetter.setUniqID(msg);  
}  
  
boolean topicWithNamespace = false;  
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {  
  msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());  
    topicWithNamespace = true;  
}  
  
int sysFlag = 0;  
boolean msgBodyCompressed = false;  
if (this.tryToCompressMessage(msg)) {  
  sysFlag |= MessageSysFlag.COMPRESSED_FLAG;  
    msgBodyCompressed = true;  
}  
  
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);  
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {  
  sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;  
}
  1. 若是删了消息发送钩子函数, 则执行消息发送以前的加强逻辑 经过 DefaultMQProducerlmpl.registerSendMessageHook 注册钩子处理类,而且能够注册多个,简单看下钩子处理类接口
if (this.hasSendMessageHook()) {  
  context = new SendMessageContext();  
    context.setProducer(this);  
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());  
    context.setCommunicationMode(communicationMode);  
    context.setBornHost(this.defaultMQProducer.getClientIP());  
    context.setBrokerAddr(brokerAddr);  
    context.setMessage(msg);  
    context.setMq(mq);  
    context.setNamespace(this.defaultMQProducer.getNamespace());  
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);  
    if (isTrans != null && isTrans.equals("true")) {  
  context.setMsgType(MessageType.Trans\_Msg\_Half);  
    }  
  
  if (msg.getProperty("\_\_STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY\_DELAY\_TIME\_LEVEL) != null) {  
  context.setMsgType(MessageType.Delay\_Msg);  
    }  
  this.executeSendMessageHookBefore(context);  
}

4.构建消息发送请求 主要包含以下重要信息:生产者组、主题名称、默认建立主题 Key 、该主题在单个Broker默认队列数、队ID (队列序号)、消息系统标( MessageSysFlag 消息发时间 、消息标记( RocketMQ对消息中的flag不作任何处理供应用程序使用) 消息扩展属性 、消息重试次数、是不是批量消息等

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTopic(msg.getTopic());
    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setSysFlag(sysFlag);
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    requestHeader.setFlag(msg.getFlag());
    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
    requestHeader.setReconsumeTimes(0);
    requestHeader.setUnitMode(this.isUnitMode());
    requestHeader.setBatch(msg instanceof MessageBatch);
    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
        if (reconsumeTimes != null) {
            requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
        }
        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
        if (maxReconsumeTimes != null) {
            requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
        }
    }

5.根据 发送方式,同步、异步、单式进行网络传输,消息发送在MQClientAPIImpl的sendMessage()方法中

public SendResult sendMessage(  
  final String addr,  
    final String brokerName,  
    final Message msg,  
    final SendMessageRequestHeader requestHeader,  
    final long timeoutMillis,  
    final CommunicationMode communicationMode,  
    final SendCallback sendCallback,  
    final TopicPublishInfo topicPublishInfo,  
    final MQClientInstance instance,  
    final int retryTimesWhenSendFailed,  
    final SendMessageContext context,  
    final DefaultMQProducerImpl producer  
) throws RemotingException, MQBrokerException, InterruptedException {  
  long beginStartTime = System.currentTimeMillis();  
    RemotingCommand request = null;  
    if (sendSmartMsg || msg instanceof MessageBatch) {  
  SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);  
        request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND\_BATCH\_MESSAGE : RequestCode.SEND\_MESSAGE\_V2, requestHeaderV2);  
    } else {  
  request = RemotingCommand.createRequestCommand(RequestCode.SEND\_MESSAGE, requestHeader);  
    }  
  request.setBody(msg.getBody());  
    switch (communicationMode) {  
  case ONEWAY:  
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);  
            return null;  
        case ASYNC:  
            final AtomicInteger times = new AtomicInteger();  
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;  
            if (timeoutMillis < costTimeAsync) {  
  throw new RemotingTooMuchRequestException("sendMessage call timeout");  
            }  
  this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,  
                retryTimesWhenSendFailed, times, context, producer);  
            return null;  
        case SYNC:  
            long costTimeSync = System.currentTimeMillis() - beginStartTime;  
            if (timeoutMillis < costTimeSync) {  
  throw new RemotingTooMuchRequestException("sendMessage call timeout");  
            }  
  return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);  
        default:  
            assert false;  
            break;  
    }  
  
  return null;  
}
  1. 若是注册了消息发送钩子函数,执行after逻辑。注意,就算消息发送过程当中发生 RemotingException MQBrokerException InterruptedException时,该方法也会执行
if (this.hasSendMessageHook()) {
        context.setException(e);
        this.executeSendMessageHookAfter(context);
    }
相关文章
相关标签/搜索