说在前面
DefaultMQProducer、DefaultMQPullConsumer、DefaultMQPushConsumer 处理过程
源码解析
producer
/**
 * Demo entry point: starts a producer, synchronously sends 128 identical messages to
 * {@code TopicTest}, prints each send result, and finally shuts the producer down.
 *
 * @param args unused
 * @throws MQClientException if the producer fails to start
 * @throws InterruptedException declared by the signature; sends that fail are caught inside the loop
 */
public static void main(String[] args) throws MQClientException, InterruptedException {
    final int messageCount = 128;

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.start();

    int attempt = 0;
    while (attempt < messageCount) {
        try {
            byte[] payload = "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message message = new Message("TopicTest", "TagA", "OrderID188", payload);
            SendResult result = producer.send(message);
            System.out.printf("%s%n", result);
        } catch (Exception e) {
            // A failed send is reported and the loop simply moves on to the next message.
            e.printStackTrace();
        }
        attempt++;
    }

    producer.shutdown();
}
生产者启动,producer.start();进入方法,org.apache.rocketmq.client.producer.DefaultMQProducer#start
@Overridepublic void start() throws MQClientException {// 生产者启动=》this.defaultMQProducerImpl.start(); }
进入方法,生产者启动,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start()
public void start() throws MQClientException {// =》this.start(true); }
进入方法,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
public void start(final boolean startFactory) throws MQClientException {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 检查配置=》this.checkConfig();if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {this.defaultMQProducer.changeInstanceNameToPID();}// 建立mqclient对象=》this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);// 注册生产者=》boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) { //this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}// 存储topic发布信息this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {// 启动mqclient=》mQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}// 同步发送心跳检测请求向全部的broker=》this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); }
进入方法,检查配置,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkConfig
/**
 * Validates the producer group configured on {@code defaultMQProducer}.
 *
 * @throws MQClientException if {@code Validators.checkGroup} rejects the group, if the group is
 *                           {@code null}, or if it equals {@code MixAll.DEFAULT_PRODUCER_GROUP}
 */
private void checkConfig() throws MQClientException {
    final String group = this.defaultMQProducer.getProducerGroup();

    // NOTE(review): the null check below runs after the validator has already received the
    // value; if checkGroup rejects null itself, that check is unreachable. Confirm.
    Validators.checkGroup(group);

    if (group == null) {
        throw new MQClientException("producerGroup is null", null);
    }

    if (MixAll.DEFAULT_PRODUCER_GROUP.equals(group)) {
        throw new MQClientException(
            "producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
            null);
    }
}
进入方法,创建mqclient对象,org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {String clientId = clientConfig.buildMQClientId();// 从本地缓存中获取client对象,简单的通常会concurrentHashMap当本地缓存,性能很高MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance; }
返回方法,注册生产者,org.apache.rocketmq.client.impl.factory.MQClientInstance#registerProducer
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {if (null == group || null == producer) {return false;}// 这里是用concurrentHashMap在本地内存中维护注册信息MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);if (prev != null) {log.warn("the producer group[{}] exist already.", group);return false;}return true; }
返回方法,启动mqclient,org.apache.rocketmq.client.impl.factory.MQClientInstance#start 前面介绍过了,可以翻阅前面的章节。
返回方法,同步向所有broker发送心跳检测请求,org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBrokerWithLock
public void sendHeartbeatToAllBrokerWithLock() {if (this.lockHeartbeat.tryLock()) {try {// 发送心跳监测向全部broker=》this.sendHeartbeatToAllBroker();// 更新资源=》this.uploadFilterClassSource();} catch (final Exception e) {log.error("sendHeartbeatToAllBroker exception", e);} finally {this.lockHeartbeat.unlock();}} else {log.warn("lock heartBeat, but failed.");} }
进入方法,向所有broker发送心跳检测,org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBroker
private void sendHeartbeatToAllBroker() {// 准备心跳检测数据final HeartbeatData heartbeatData = this.prepareHeartbeatData();final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();// failfastif (producerEmpty && consumerEmpty) {log.warn("sending heartbeat, but no consumer and no producer");return;}if (!this.brokerAddrTable.isEmpty()) {long times = this.sendHeartbeatTimesTotal.getAndIncrement();Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, HashMap<Long, String>> entry = it.next();String brokerName = entry.getKey();HashMap<Long, String> oneTable = entry.getValue();if (oneTable != null) {for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {Long id = entry1.getKey();String addr = entry1.getValue();if (addr != null) {// 非master broke人节点没有消费者if (consumerEmpty) {if (id != MixAll.MASTER_ID)continue;}try {// 心跳检测=》int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);if (!this.brokerVersionTable.containsKey(brokerName)) {this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));}this.brokerVersionTable.get(brokerName).put(addr, version);if (times % 20 == 0) {log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);log.info(heartbeatData.toString());}} catch (Exception e) {// broker是否存在=》if (this.isBrokerInNameServer(addr)) {log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);} else {log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,id, addr);}}}}}}} }
进入方法,心跳检测,org.apache.rocketmq.client.impl.MQClientAPIImpl#sendHearbeat
public int sendHearbeat(final String addr,final HeartbeatData heartbeatData,final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);request.setLanguage(clientConfig.getLanguage());request.setBody(heartbeatData.encode());// 同步执行RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return response.getVersion();}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark()); }
进入方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync介绍过了。
返回方法,broker是否存在,org.apache.rocketmq.client.impl.factory.MQClientInstance#isBrokerInNameServer
private boolean isBrokerInNameServer(final String brokerAddr) {// 存储topic路由信息Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();while (it.hasNext()) {Entry<String, TopicRouteData> itNext = it.next();List<BrokerData> brokerDatas = itNext.getValue().getBrokerDatas();for (BrokerData bd : brokerDatas) {boolean contain = bd.getBrokerAddrs().containsValue(brokerAddr);if (contain)return true;}}return false; }
返回方法,更新资源,org.apache.rocketmq.client.impl.factory.MQClientInstance#uploadFilterClassSource
private void uploadFilterClassSource() {// 遍历消费者Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> next = it.next();MQConsumerInner consumer = next.getValue();// 若是消费者消息模式是pushif (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {// 获取消费者订阅信息Set<SubscriptionData> subscriptions = consumer.subscriptions();for (SubscriptionData sub : subscriptions) {if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {final String consumerGroup = consumer.groupName();final String className = sub.getSubString();final String topic = sub.getTopic();final String filterClassSource = sub.getFilterClassSource();try {// 更新过滤类去全部过滤的server=》this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);} catch (Exception e) {log.error("uploadFilterClassToAllFilterServer Exception", e);}}}}} }
进入方法,将过滤类上传到所有过滤服务器(filter server),org.apache.rocketmq.client.impl.factory.MQClientInstance#uploadFilterClassToAllFilterServer
private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,final String topic,final String filterClassSource) throws UnsupportedEncodingException {byte[] classBody = null;int classCRC = 0;try {classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET);// 压缩classCRC = UtilAll.crc32(classBody);} catch (Exception e1) {log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}",fullClassName,RemotingHelper.exceptionSimpleDesc(e1));}// 获取topic的路由信息TopicRouteData topicRouteData = this.topicRouteTable.get(topic);if (topicRouteData != null&& topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {// 遍历的过滤server地址Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator();while (it.hasNext()) {Entry<String, List<String>> next = it.next();List<String> value = next.getValue();for (final String fsAddr : value) {try {// 注册过滤类的消息=》this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,5000);log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup,topic, fullClassName);} catch (Exception e) {log.error("uploadFilterClassToAllFilterServer Exception", e);}}}} else {log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}",consumerGroup, topic, fullClassName);} }
进入方法,注册过滤类的消息,org.apache.rocketmq.client.impl.MQClientAPIImpl#registerMessageFilterClass
/**
 * Sends a {@code REGISTER_MESSAGE_FILTER_CLASS} request to the given address and waits for the
 * reply.
 *
 * @param addr          address the request is sent to
 * @param consumerGroup consumer group set on the request header
 * @param topic         topic set on the request header
 * @param className     filter class name set on the request header
 * @param classCRC      checksum set on the request header
 * @param classBody     bytes used as the request body
 * @param timeoutMillis timeout passed to the synchronous remoting call
 * @throws RemotingConnectException     propagated from the remoting client
 * @throws RemotingSendRequestException propagated from the remoting client
 * @throws RemotingTimeoutException     propagated from the remoting client
 * @throws InterruptedException         propagated from the remoting client
 * @throws MQBrokerException            if the response code is not {@code ResponseCode.SUCCESS}
 */
public void registerMessageFilterClass(
    final String addr,
    final String consumerGroup,
    final String topic,
    final String className,
    final int classCRC,
    final byte[] classBody,
    final long timeoutMillis
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException, MQBrokerException {
    RegisterMessageFilterClassRequestHeader header = new RegisterMessageFilterClassRequestHeader();
    header.setConsumerGroup(consumerGroup);
    header.setClassName(className);
    header.setTopic(topic);
    header.setClassCRC(classCRC);

    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, header);
    request.setBody(classBody);

    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    if (response.getCode() == ResponseCode.SUCCESS) {
        return;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
进入方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync介绍过了。
进入方法,org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)发送消息
@Overridepublic SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// =》return this.defaultMQProducerImpl.send(msg); }
进入方法,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 发送消息3s超时=》return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }
进入方法,发送消息3s超时,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long)
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 同步发送消息=》return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }
进入方法,同步发送消息,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 确认客户端服务是否正常this.makeSureStateOK();// 检查消息是否合法,failfast原则Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 找到topic的发布信息=》TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;// 同步重试3次,异步1次int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? 
null : mq.getBrokerName();// 按brokerName选择一个消息队列=》MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 发送=》sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {// 发送失败重试另外一个brokerif (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;switch (e.getResponseCode()) {case 
ResponseCode.TOPIC_NOT_EXIST:case ResponseCode.SERVICE_NOT_AVAILABLE:case ResponseCode.SYSTEM_ERROR:case ResponseCode.NO_PERMISSION:case ResponseCode.NO_BUYER_ID:case ResponseCode.NOT_IN_CURRENT_UNIT:continue;default:if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}// 查询namesrv地址列表List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();if (null == nsList || nsList.isEmpty()) {throw new MQClientException("No name server address, please set it." 
+ FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);}throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
进入方法,找到topic的发布信息,org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());// 从namesrv更新topic路由信息=》this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {// 从namesrv更新topic路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;} }
进入方法,从namesrv更新topic路由信息,org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {// 获取默认的topic路由信息 =》topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {// 获取队列信息for (QueueData data : topicRouteData.getQueueDatas()) {// 读写队列最大数量4int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {// 获取topic路由信息=》topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);}if (topicRouteData != null) {TopicRouteData old = this.topicRouteTable.get(topic);// 判断topic路由是否改变=》boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {// 须要更新路由信息changed = this.isNeedUpdateTopicRouteInfo(topic);} else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) {TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {// 更新broker的地址列表this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{// topic路由信息转换成topic发布信息=》TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);// 遍历生产者信息Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {// 更新topic发布信息=》impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{// 获取消息队列订阅信息Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);// 遍历消费者Iterator<Entry<String, 
MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {// 更新topic的订阅信息=》impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}} catch (Exception e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} finally {this.lockNamesrv.unlock();}} else {log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);}} catch (InterruptedException e) {log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false; }
接下篇。
说在最后
本次解析仅代表个人观点,仅供参考。
加入技术微信群
钉钉技术群