说在前面
client管理 心跳检测
源码解析
进入这个方法,org.apache.rocketmq.broker.processor.ClientManageProcessor#processRequest,处理client管理请求
@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {// 心跳监测=》case RequestCode.HEART_BEAT:return this.heartBeat(ctx, request);// 取消注册client=》case RequestCode.UNREGISTER_CLIENT:return this.unregisterClient(ctx, request);// 检查client的配置=》case RequestCode.CHECK_CLIENT_CONFIG:return this.checkClientConfig(ctx, request);default:break;}return null; }
进入这个方法,org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat,心跳检测
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {RemotingCommand response = RemotingCommand.createResponseCommand(null);HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),heartbeatData.getClientID(),request.getLanguage(),request.getVersion());for (ConsumerData data : heartbeatData.getConsumerDataSet()) {// 获取订阅组的配置信息=》SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());boolean isNotifyConsumerIdsChangedEnable = true;if (null != subscriptionGroupConfig) {isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();int topicSysFlag = 0;if (data.isUnitMode()) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);}// 获取重试的topic名称 = %RETRY% +消费组名称String newTopic = MixAll.getRetryTopic(data.getGroupName());// 发送消息返回后建立topic=》this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);}// 注册消费者=》boolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(),clientChannelInfo,data.getConsumeType(),data.getMessageModel(),data.getConsumeFromWhere(),data.getSubscriptionDataSet(),isNotifyConsumerIdsChangedEnable);// 若是消费者已经存在是更新if (changed) {log.info("registerConsumer info changed {} {}",data.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));} }
进入这个方法,获取订阅组的配置信息,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#findSubscriptionGroupConfig
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {// 从缓存中获取组的订阅信息SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {// 自动建立消费组或者是系统自用的消费组if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}// 更新数据的版本号this.dataVersion.nextVersion();// 持久化=》this.persist();}}return subscriptionGroupConfig; }
进入这个方法,org.apache.rocketmq.common.ConfigManager#persist,持久化
public synchronized void persist() {// 持久化的是json存储,序列化的时候按版本号维护的数据 =》String jsonString = this.encode(true);if (jsonString != null) {// user.home/store/config/topics.json 文件存储 =》String fileName = this.configFilePath();try {// 保存文件 =》MixAll.string2File(jsonString, fileName);} catch (IOException e) {log.error("persist file " + fileName + " exception", e);}} }
进入这个方法,保存文件,org.apache.rocketmq.common.MixAll#string2File
public static void string2File(final String str, final String fileName) throws IOException {// 要保存的内容存储在临时文件中String tmpFile = fileName + ".tmp";string2FileNotSafe(str, tmpFile);// 把原来的数据进行备份String bakFile = fileName + ".bak";String prevContent = file2String(fileName);if (prevContent != null) {string2FileNotSafe(prevContent, bakFile);}// 删掉源文件File file = new File(fileName);file.delete();// 临时文件重命名file = new File(tmpFile);file.renameTo(new File(fileName)); }
进入这个方法,消息发回(send message back)时创建重试topic,org.apache.rocketmq.broker.topic.TopicConfigManager#createTopicInSendMessageBackMethod
public TopicConfig createTopicInSendMessageBackMethod(final String topic,final int clientDefaultTopicQueueNums,final int perm,final int topicSysFlag) {// 获取topic配置信息TopicConfig topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;boolean createNew = false;try {if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {topicConfig = this.topicConfigTable.get(topic);if (topicConfig != null)return topicConfig;topicConfig = new TopicConfig(topic);topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);topicConfig.setPerm(perm);topicConfig.setTopicSysFlag(topicSysFlag);log.info("create new topic {}", topicConfig);// 存储重试的tpic配置信息this.topicConfigTable.put(topic, topicConfig);createNew = true;// 修饰数据的版本号this.dataVersion.nextVersion();// 持久化=》this.persist();} finally {this.lockTopicConfigTable.unlock();}}} catch (InterruptedException e) {log.error("createTopicInSendMessageBackMethod exception", e);}// 若是topic配置信息是从新建立的,注册到broker集群中=》if (createNew) {this.brokerController.registerBrokerAll(false, true,true);}return topicConfig; }
进入这个方法,如果topic配置信息是新创建的,则把broker注册到namesrv集群,org.apache.rocketmq.broker.BrokerController#registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {// 包装topic配置信息TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();// 若是没有读写权限if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),this.brokerConfig.getBrokerPermission());topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}// 判断broker集群是否须要注册=》if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills())) {// 向全部的broker注册topic配置信息 =》doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);} }
进入这个方法,判断broker是否需要注册,org.apache.rocketmq.broker.BrokerController#needRegister
private boolean needRegister(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final int timeoutMills) {TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();// 判断是否须要注册 =》List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);boolean needRegister = false;for (Boolean changed : changeList) {// 只要一个namesrv须要更新就所有须要更新if (changed) {needRegister = true;break;}}return needRegister; }
进入这个方法,判断是否需要注册,org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister
public List<Boolean> needRegister(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final TopicConfigSerializeWrapper topicConfigWrapper,final int timeoutMills) {// 多线程更新是否须要变化状态的集合final List<Boolean> changedList = new CopyOnWriteArrayList<>();// 获取namesrv地址List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());// 多线程分发执行,一个namesrv一个线程for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);request.setBody(topicConfigWrapper.getDataVersion().encode());// broker向namesrv同步查询数据版本 =》RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);DataVersion nameServerDataVersion = null;Boolean changed = false;switch (response.getCode()) {case ResponseCode.SUCCESS: {QueryDataVersionResponseHeader queryDataVersionResponseHeader =(QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);// 根据版本号判断数据有没有发生变化changed = queryDataVersionResponseHeader.getChanged();byte[] body = response.getBody();if (body != null) {nameServerDataVersion = DataVersion.decode(body, DataVersion.class);if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {// 数据版本不一致须要更新changed = true;}}if (changed == null || changed) {changedList.add(Boolean.TRUE);}}default:break;}log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", 
namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);} catch (Exception e) {changedList.add(Boolean.TRUE);log.error("Query data version from name server {} Exception, {}", namesrvAddr, e);} finally {// 异常状况下解除线程阻塞countDownLatch.countDown();}}});}try {// 等到超时解除线程阻塞countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {log.error("query dataversion from nameserver countDownLatch await Exception", e);}}return changedList; }
往上返回到这个方法,向所有的namesrv注册broker及topic配置信息,org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {// 向全部的broker进行注册=》List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.getHAServerAddr(),topicConfigWrapper,// 过滤的服务this.filterServerManager.buildNewFilterServerList(),// 单途oneway,this.brokerConfig.getRegisterBrokerTimeoutMills(),this.brokerConfig.isCompressedRegister());if (registerBrokerResultList.size() > 0) {RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);if (registerBrokerResult != null) {if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {// 更新master地址本地缓存this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());}// 同步设置slave的master地址this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());if (checkOrderConfig) {// 更新订阅的topic配置 =》this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());}}} }
进入这个方法,向所有的namesrv进行注册,org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();// 获取namesrv地址集合List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);requestHeader.setHaServerAddr(haServerAddr);requestHeader.setCompressed(compressed);RegisterBrokerBody requestBody = new RegisterBrokerBody();requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);requestBody.setFilterServerList(filterServerList);// 对请求体进行编码=》final byte[] body = requestBody.encode(compressed);// 压缩处理final int bodyCrc32 = UtilAll.crc32(body);requestHeader.setBodyCrc32(bodyCrc32);final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());// 并发向namesrv集群注册brokerfor (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {// 注册broker服务任务分发=》RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);if (result != null) {registerBrokerResultList.add(result);}log.info("register broker to name server {} OK", namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}return registerBrokerResultList; }
进入这个方法, 注册broker服务任务分发,org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker
private RegisterBrokerResult registerBroker(final String namesrvAddr,final boolean oneway,final int timeoutMills,final RegisterBrokerRequestHeader requestHeader,final byte[] body) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,InterruptedException {RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);request.setBody(body);if (oneway) {try {// 单线请求,不关心结果 =》this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);} catch (RemotingTooMuchRequestException e) {// Ignore}return null;}// broker同步向namesrv注册broker=》RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);RegisterBrokerResult result = new RegisterBrokerResult();result.setMasterAddr(responseHeader.getMasterAddr());result.setHaServerAddr(responseHeader.getHaServerAddr());if (response.getBody() != null) {result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));}return result;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark()); }
进入这个方法,单向(oneway)请求,不关心结果,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway
@Overridepublic void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {// 获取channel=》final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {if (this.rpcHook != null) {// 执行请求执行前的钩子方法this.rpcHook.doBeforeRequest(addr, request);}// 执行单线请求 =》this.invokeOnewayImpl(channel, request, timeoutMillis);} catch (RemotingSendRequestException e) {log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);// 异常关闭channel=》this.closeChannel(addr, channel);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);} }
进入这个方法,执行单向(oneway)请求,org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImpl
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {request.markOnewayRPC();// 获取信号量的信号,这里用semaphore作了限流boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);if (acquired) {final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);try {channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {once.release();if (!f.isSuccess()) {log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");}}});} catch (Exception e) {once.release();log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);}} else {if (timeoutMillis <= 0) {throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");} else {String info = String.format("invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",timeoutMillis,this.semaphoreOneway.getQueueLength(),this.semaphoreOneway.availablePermits());log.warn(info);throw new RemotingTimeoutException(info);}} }
往上返回到这个方法,broker同步向namesrv注册broker,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync
@Overridepublic RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {long beginStartTime = System.currentTimeMillis();// 获取并建立channel =》final Channel channel = this.getAndCreateChannel(addr);if (channel != null && channel.isActive()) {try {if (this.rpcHook != null) {// 执行请求前钩子方法this.rpcHook.doBeforeRequest(addr, request);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {throw new RemotingTimeoutException("invokeSync call timeout");}// 执行同步请求 =》RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);if (this.rpcHook != null) {// 执行响应后钩子方法 =》this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);}return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);// 出现异常,channel关闭 =》this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);} }
接下篇。
说在最后
本次解析仅代表个人观点,仅供参考。
加入技术微信群
钉钉技术群