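This source-code study is driven by questions: each one below is answered by reading the implementation.
What is a Consumer Group?
How does a Consumer pull messages?
Does the Consumer support push?
How does a Consumer consume a single queue in parallel?
How does a Consumer filter messages?
How is a message guaranteed to be consumed by only one service in a Group?
How is Consumer load balancing implemented?
What happens when consumption fails?
Can a Consumer rewind and re-consume messages?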
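A Consumer is the message consumer: it consumes messages, usually asynchronously in a back-end system.
A Consumer Group is the collective name for a class of Consumers that typically consume the same kind of message with the same consumption logic. In general, the number of Consumers in a group should not exceed the number of queues in the subscribed topic; otherwise the surplus Consumers sit idle.
Having already analyzed the Producer, the Consumer code feels familiar.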
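Main logic
1. Look up the broker routing information for the MessageQueue.
2. Build the request header from the relevant parameters.
3. Delegate to Netty to fetch the messages from the broker.
Code walkthrough
MQPullConsumer.pull requires the caller to specify the MessageQueue and the offset (the position in the queue).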

    PullResult pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

Next, the return value of pull: it carries the messages fetched this time (a list of MessageExt) together with offset information.

    public class PullResult {
        // status of this pull
        private final PullStatus pullStatus;
        // offset to use for the next pull
        private final long nextBeginOffset;
        // minimum offset
        private final long minOffset;
        // maximum offset
        private final long maxOffset;
        // messages fetched by this pull
        private List<MessageExt> msgFoundList;
    }

MQPullConsumer.pull
-> DefaultMQPullConsumer.pull
-> DefaultMQPullConsumerImpl.pull
-> DefaultMQPullConsumerImpl.pullSyncImpl
-> DefaultMQPullConsumerImpl.pullKernelImpl

    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // look up the broker; if it is unknown, refresh the route from the name server and retry once
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            // build the pull request header
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            // delegate to Netty to fetch the messages
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

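Pulling is fairly simple: one request, one response, and the Consumer manages the offset itself. As noted earlier, the number of Consumers in a Consumer Group should generally not exceed the number of MessageQueues.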
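To make "the Consumer manages the offset itself" concrete, here is a minimal sketch of a pull loop. It does not use the RocketMQ client at all: `InMemoryQueue`, `PullOutcome`, and `PullLoopSketch` are hypothetical stand-ins that only mirror the shape of `pull(mq, subExpression, offset, maxNums)` and of `PullResult.nextBeginOffset`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one MessageQueue on a broker (not a RocketMQ class).
class InMemoryQueue {
    private final List<String> messages = new ArrayList<>();

    void append(String body) {
        messages.add(body);
    }

    // Returns up to maxNums messages starting at offset, plus the offset for the next pull.
    PullOutcome pull(long offset, int maxNums) {
        int from = (int) Math.min(Math.max(offset, 0), messages.size());
        int to = Math.min(from + Math.max(maxNums, 0), messages.size());
        return new PullOutcome(new ArrayList<>(messages.subList(from, to)), to);
    }
}

// Hypothetical, reduced counterpart of PullResult.
class PullOutcome {
    final List<String> msgFoundList;
    final long nextBeginOffset;

    PullOutcome(List<String> msgFoundList, long nextBeginOffset) {
        this.msgFoundList = msgFoundList;
        this.nextBeginOffset = nextBeginOffset;
    }
}

class PullLoopSketch {
    // Pulls until nothing is returned, keeping the offset on the application side.
    static List<String> drain(InMemoryQueue queue, long startOffset, int maxNums) {
        List<String> consumed = new ArrayList<>();
        long offset = startOffset;
        while (true) {
            PullOutcome outcome = queue.pull(offset, maxNums);
            if (outcome.msgFoundList.isEmpty()) {
                break;
            }
            consumed.addAll(outcome.msgFoundList);
            offset = outcome.nextBeginOffset; // the caller, not the broker, remembers where to resume
        }
        return consumed;
    }
}
```

In a real application the offset would also have to be persisted somewhere, so that a restarted Consumer can resume from where it stopped.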
Push Consumer
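A kind of Consumer. The application usually registers a Listener interface with the Consumer object; as soon as a message arrives, the Consumer object calls back the Listener method. In the JMS standard this is the onMessage method of MessageListener.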
Pull Consumer
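A kind of Consumer. The application actively calls the Consumer's pull method to fetch messages from the Broker, so control stays with the application.
Every RocketMQ Consumer actually consumes by pulling from the Broker. To still receive messages in near real time, RocketMQ uses long polling, which keeps delivery latency comparable to a true push. This long polling is similar to the way WebQQ sends and receives messages. For more background, see the article "Comet: 'server push' technology based on HTTP long connections".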
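The idea behind long polling can be sketched in a few lines: the broker side answers a pull immediately when a message is available, and otherwise holds the request until a message arrives or a suspend timeout expires. The class below is a hypothetical, single-JVM illustration built on `LinkedBlockingQueue`; it is not how the RocketMQ broker is implemented.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a broker that supports long polling.
class LongPollingBrokerSketch {
    private final LinkedBlockingQueue<String> store = new LinkedBlockingQueue<>();

    void put(String msg) {
        store.offer(msg);
    }

    // Returns at once if a message is waiting; otherwise holds the request for up to
    // suspendMillis. Returns null when the suspend time runs out with no message.
    String pull(long suspendMillis) {
        try {
            return store.poll(suspendMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Usage: the message is produced 100 ms after the pull was issued,
    // and the pull returns as soon as it lands instead of waiting the full 5 s.
    static String demo() {
        LongPollingBrokerSketch broker = new LongPollingBrokerSketch();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return;
            }
            broker.put("hello");
        });
        producer.start();
        return broker.pull(5000);
    }
}
```

Compared with short polling at a fixed interval, the consumer sees a new message almost immediately while sending far fewer empty requests.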
Although RocketMQ consumers are all implemented on top of pull, a push-style interface is wrapped around it. Let's first look at how it is used.

    public static void main(String[] args) throws InterruptedException, MQClientException {
        /**
         * An application creates one Consumer and maintains it itself;
         * it can be a global object or a singleton.
         * Note: the application must make sure the ConsumerGroupName is unique.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testmerchantLeagueConsumerGroup");
        consumer.setNamesrvAddr("ip:port");

        /**
         * Subscribe to the given topic, with tags equal to TagA or TagB.
         */
        consumer.subscribe("broker-a", "TagB || TagA");

        /**
         * On the very first start, choose whether to consume from the head or the tail of the queue.
         * On later starts, consumption continues from the last consumed position.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // the actual message handling logic lives here
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            /**
             * By default msgs holds a single message; set consumeMessageBatchMaxSize
             * to receive messages in batches.
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);

                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("broker-a")) {
                    // consumption logic for this topic
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                        // handle TagA
                        String message = new String(msg.getBody());
                        System.out.println(message);
                    } else if (msg.getTags() != null && msg.getTags().equals("TagB")) {
                        // handle TagB
                        String message = new String(msg.getBody());
                        System.out.println(message);
                    }
                }
                // tell the MQ server that consumption succeeded
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // start() must be called before the Consumer is used; calling it once is enough
        consumer.start();
    }

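How RocketMQ implements push:
Message pulling logic
The client maintains a pullRequestQueue and seeds it with a pullRequest. When a pull succeeds, the pullRequest is updated with the next offset and put back into pullRequestQueue. A separate thread watches pullRequestQueue and, whenever it is not empty, takes a request and pulls messages.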
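The loop described above can be sketched without any RocketMQ dependency. In this hypothetical sketch the "broker" is just an in-memory list, `Request` is a cut-down stand-in for PullRequest, and the listener is a plain `java.util.function.Consumer`; all of these names are assumptions for illustration only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class PushOverPullSketch {
    // Hypothetical, simplified pull request: it only tracks the next offset to fetch.
    static final class Request {
        long nextOffset;
    }

    private final List<String> brokerQueue;   // stand-in for the remote MessageQueue
    private final Consumer<String> listener;  // stand-in for the registered MessageListener
    private final LinkedBlockingQueue<Request> pullRequestQueue = new LinkedBlockingQueue<>();

    PushOverPullSketch(List<String> brokerQueue, Consumer<String> listener) {
        this.brokerQueue = brokerQueue;
        this.listener = listener;
    }

    // Seeds the queue with one request and starts the service thread.
    Thread start() {
        pullRequestQueue.offer(new Request());
        Thread service = new Thread(this::run, "pull-message-service");
        service.setDaemon(true);
        service.start();
        return service;
    }

    private void run() {
        try {
            while (true) {
                Request request = pullRequestQueue.take();   // blocks until a request is queued
                if (request.nextOffset >= brokerQueue.size()) {
                    return; // sketch only: stop once drained; a real client keeps polling
                }
                String msg = brokerQueue.get((int) request.nextOffset);
                listener.accept(msg);             // "push" the message to the application
                request.nextOffset++;             // advance the offset on the same request
                pullRequestQueue.put(request);    // re-enqueue it to trigger the next pull
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Usage: returns every message the listener received, in order.
    static List<String> demo(List<String> brokerQueue) {
        List<String> received = new CopyOnWriteArrayList<>();
        Thread service = new PushOverPullSketch(brokerQueue, received::add).start();
        try {
            service.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

From the application's point of view only the listener is visible, which is why the API feels like push even though every message was pulled.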
DefaultMQPushConsumer.start
-> DefaultMQPushConsumerImpl.start
-> MQClientInstance.start
-> PullMessageService.start
Let's look at the run method of PullMessageService.

    // blocking queue of pull requests
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        // pull messages whenever a request is available
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

Where is pullRequestQueue filled?
Within the class, executePullRequestImmediately does the put, and executePullRequestLater schedules the same call after a delay.

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

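Tracing the callers of these methods shows that the onSuccess callback inside pullMessage (which run invokes) updates the pullRequest with the next offset and re-enqueues it for the next pull.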

    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        // on success, move the pullRequest forward to the next offset
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispathToConsume);

                            // put the request back into pullRequestQueue
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }

                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }

                        break;
                    case NO_NEW_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }

        // onException(...) is omitted in this excerpt
    };

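That covers fetching messages. Next, how is the MessageListener's consume method triggered?
Still in the callback inside DefaultMQPushConsumerImpl.pullMessage, the following code hands the messages to consumeMessageService.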

    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
        pullResult.getMsgFoundList(),
        processQueue,
        pullRequest.getMessageQueue(),
        dispathToConsume);

It builds a ConsumeRequest and submits it to a thread pool for consumption.

    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

Finally, the run method of ConsumeRequest is where the listener's consumeMessage is called.
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
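That completes the whole path from pulling a message to invoking the listener.
The code in the previous section is already an example of parallel consumption: messages pulled from one queue are submitted to a thread pool without blocking the pull thread, so a single queue is consumed in parallel.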
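A minimal sketch of that idea, assuming nothing beyond the JDK: the pulled messages are split into batches of at most `batchSize` (playing the role of consumeMessageBatchMaxSize) and each batch is handed to a thread pool. `ParallelConsumeSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelConsumeSketch {
    // Splits msgs, in order, into chunks of at most batchSize elements.
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }

    // Submits every batch to a pool and returns how many messages were handled.
    static int consumeAll(List<String> msgs, int batchSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        ConcurrentLinkedQueue<String> handled = new ConcurrentLinkedQueue<>();
        for (List<String> batch : split(msgs, batchSize)) {
            // each batch runs on whichever worker is free, so batches may finish out of order
            pool.execute(() -> handled.addAll(batch));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.size();
    }
}
```

The price of this parallelism is ordering: batches from the same queue can complete in any order, which is why a concurrent listener gives no ordering guarantee within a queue.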
The entry point is again DefaultMQPushConsumerImpl.pullMessage:

    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
        subscriptionData);

Looking inside processPullResult, we find the message-filtering logic.

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            // filter again on the client by the subscribed tag set
            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            // message filter hooks
            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

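The tag check in processPullResult can be isolated into a small sketch. One plausible reason for filtering again on the client, stated here as an assumption rather than something the code above shows, is that a coarser server-side match (for example on a tag hash code) could let through messages whose tag merely collides; the Java strings "Aa" and "BB" are a well-known pair with the same hashCode. `TagFilterSketch` and `Msg` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class TagFilterSketch {
    // Hypothetical, reduced message: just a tag and a body.
    static final class Msg {
        final String tags;
        final String body;

        Msg(String tags, String body) {
            this.tags = tags;
            this.body = body;
        }
    }

    // Keeps only messages whose tag is in the subscribed set.
    // An empty set means "no tag restriction", so every message is kept.
    static List<Msg> filterByTags(List<Msg> msgs, Set<String> subscribedTags) {
        if (subscribedTags.isEmpty()) {
            return msgs;
        }
        List<Msg> kept = new ArrayList<>(msgs.size());
        for (Msg msg : msgs) {
            // exact string comparison; untagged messages are dropped
            if (msg.tags != null && subscribedTags.contains(msg.tags)) {
                kept.add(msg);
            }
        }
        return kept;
    }
}
```

With a subscription to "Aa" only, a message tagged "BB" is rejected here even though both tags hash to the same value.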
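In clustering mode each MessageQueue of a topic is assigned to only one Consumer in the Group, so a message is consumed by only one service in the Group.
Concept:
A consumer consumes several MessageQueues at the same time. When the MessageQueues of the topic change, the set of MessageQueues it consumes is adjusted dynamically.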

    // RebalanceImpl
    public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

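We only care about clustering mode here.
Main logic:
1. Get all MessageQueues of the topic.
2. Get the IDs of all Consumers in the same ConsumerGroup.
3. Assign MessageQueues to the current Consumer according to the configured strategy.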

    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                // all MessageQueues of this topic
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // IDs of all consumers in the same consumerGroup
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        // assign MessageQueues to the current Consumer according to the strategy
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

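RocketMQ ships several allocation strategies:
Implementation class | Strategy |
---|---|
AllocateMessageQueueAveragelyByCircle | Round-robin average allocation |
AllocateMessageQueueByMachineRoom | Allocation by machine room |
AllocateMessageQueueConsistentHash | Consistent-hash allocation |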
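To illustrate what an allocation strategy computes, here is a sketch of a simple contiguous "average" split. It is written from scratch for this walkthrough and is not the code of any of the classes listed above; `AverageAllocateSketch` is a hypothetical name and queues are represented by their integer IDs. Every consumer sorts the same consumer ID list and runs the same calculation, so all of them arrive at consistent, non-overlapping shares without talking to each other.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class AverageAllocateSketch {
    // Returns the queue IDs (0..queueCount-1) assigned to currentCid.
    static List<Integer> allocate(int queueCount, List<String> cidAll, String currentCid) {
        List<String> sorted = new ArrayList<>(cidAll);
        Collections.sort(sorted);               // every consumer must see the same order
        List<Integer> result = new ArrayList<>();
        int index = sorted.indexOf(currentCid);
        if (index < 0) {
            return result;                      // unknown consumer: nothing assigned
        }
        int base = queueCount / sorted.size();  // queues every consumer gets
        int extra = queueCount % sorted.size(); // the first `extra` consumers get one more
        int size = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);
        for (int i = 0; i < size; i++) {
            result.add(start + i);
        }
        return result;
    }
}
```

With 4 queues and consumers c1, c2, c3 the shares are [0, 1], [2] and [3]. With 2 queues and the same three consumers, c3 receives an empty list, which is the idle-Consumer case mentioned at the start.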
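This section draws on the article "Distributed Message Queue RocketMQ Source Code Analysis, Part 4: Consumer Load Balancing and How It Differs from Kafka's Consumer Load Balancing".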
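When a Consumer fails to consume a message, there must be a retry mechanism so the message can be consumed again. Consumption failures usually fall into the following cases:
The message itself is the problem, for example deserialization fails or the data cannot be processed (say, a phone top-up message whose number has been cancelled). Such a message should be skipped so that other messages can be consumed; retrying it immediately will almost certainly fail again, so a delayed retry is preferable, for example retrying after 10 s.
A downstream dependency is unavailable, for example the database connection is down or an external system is unreachable. In this case skipping the failed message does not help, because consuming other messages will fail as well. The application is advised to sleep for 30 s before consuming the next message, which reduces the retry pressure on the Broker.
In the code, handling depends on the consume status; when the listener returns null, the status is set to RECONSUME_LATER and the messages are retried.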

    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        // treat a null return as "retry later"
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }


    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();

        if (consumeRequest.getMsgs().isEmpty())
            return;

        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    // sending back to the broker failed: retry consumption locally later
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

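The role of ackIndex in processConsumeResult is easy to miss, so here is a small sketch of just that decision in clustering mode: which messages of a batch are sent back for retry. `ConsumeResultSketch` and its `Status` enum are hypothetical stand-ins; the sketch assumes that an ackIndex larger than the batch means "everything succeeded".

```java
import java.util.ArrayList;
import java.util.List;

class ConsumeResultSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Returns the messages that would be sent back for retry.
    // Messages at positions 0..ackIndex count as consumed; the rest are retried.
    static <T> List<T> toRetry(Status status, int ackIndex, List<T> msgs) {
        if (msgs.isEmpty()) {
            return new ArrayList<>();
        }
        int ack;
        if (status == Status.CONSUME_SUCCESS) {
            // clamp to the valid range [-1, size - 1]
            ack = Math.max(-1, Math.min(ackIndex, msgs.size() - 1));
        } else {
            // RECONSUME_LATER, or a null status treated as RECONSUME_LATER: retry the whole batch
            ack = -1;
        }
        return new ArrayList<>(msgs.subList(ack + 1, msgs.size()));
    }
}
```

So a listener that processes a batch of three and sets ackIndex to 0 before returning CONSUME_SUCCESS acknowledges only the first message; the other two go through the retry path.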
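Rewind consumption means re-consuming messages that the Consumer has already consumed successfully, because the business requires it. To support this, the Broker must keep messages after delivering them to the Consumer. Re-consumption is usually time based: for example, after a Consumer system failure is repaired, the data from the last hour needs to be consumed again, so the Broker has to provide a way to move the consumption progress back by time. RocketMQ supports rewinding by time with millisecond precision, and the progress can be moved either backward or forward.
Logic: ask the broker to resolve the offsets for the given parameters, then reset the consumer offset to those values, which achieves rewind consumption.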

    public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
        final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
        throws RemotingException, MQClientException, InterruptedException {
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timestamp);
        requestHeader.setForce(isForce);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
        if (isC) {
            request.setLanguage(LanguageCode.CPP);
        }

        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                if (response.getBody() != null) {
                    ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                    return body.getOffsetTable();
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

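The client code above only sends a timestamp and receives offsets; how the broker maps one to the other is not shown. As a hedged illustration of the idea, assuming the store times within a queue are non-decreasing, a timestamp can be turned into an offset with a binary search for the first message stored at or after that time. `RewindSketch` is a hypothetical name and this is not the broker's actual lookup code.

```java
class RewindSketch {
    // storeTimestamps[i] is the store time (ms) of the message at offset i, in non-decreasing order.
    // Returns the first offset whose store time is >= timestamp,
    // or storeTimestamps.length when every message is older than the timestamp.
    static long offsetForTimestamp(long[] storeTimestamps, long timestamp) {
        int lo = 0;
        int hi = storeTimestamps.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (storeTimestamps[mid] < timestamp) {
                lo = mid + 1;   // message at mid is too old, look to the right
            } else {
                hi = mid;       // message at mid qualifies, but an earlier one might too
            }
        }
        return lo;
    }
}
```

Resetting the consumer offset of each queue to the value found this way makes the next pull start at the chosen point in time.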