最近一直再作一些系统上的压测,并对一些问题作了优化,从这些里面收获了一些不少好的优化经验,后续的文章都会以这方面为主。缓存
此次打压的过程当中收获比较的大的是,对RocketMq的一些优化。最开始咱们公司使用的是RabbitMq,再一些流量高峰的场景下,发现队列堆积比较严重,致使RabbitMq挂了。为了应对这个场景,最终咱们引入了阿里云的RocketMq,RocketMq能够处理能够处理不少消息堆积,而且服务的稳定不挂也能够由阿里云保证。引入了RocketMq了以后,的确解决了队列堆积致使消息队列宕机的问题。安全
原本觉得使用了RocketMq以后,能够万事无忧,可是其实在打压过程当中发现了很多问题,这里先提几个问题,你们带着这几个问题在文中去寻找答案:并发
再RocketMq中提供了多种消息类型让咱们进行配置:app
虽然配置种类比较繁多,可是使用得仍是普通消息和分区顺序消息。后续主要讲得也是这两种消息。异步
普通消息的发送的代码比较简单,以下所示:ide
public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("test_group_producer"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); Message msg = new Message("Test_Topic", "test_tag", ("Hello World").getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); producer.shutdown(); }
其内部核心代码为:高并发
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 1. 根据 topic找到publishInfo TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; // 若是是同步 就三次 不然就1次 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; // 循环 for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // 更新延迟 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } } else { break; } } // 省略 }
主要流程以下:学习
Step 1: 根据Topic 获取TopicPublishInfo,TopicPublishInfo中有咱们的Topic发布消息的信息(),这个数据先从本地获取若是本地没有,则从NameServer去拉取,而且定时每隔20s会去获取TopicPublishInfo。fetch
Step 2: 获取总共执行次数(用于重试),若是发送方式是同步,那么总共次数会有3次,其余状况只会有1次。优化
Step 3: 从MessageQueue中选取一个进行发送,MessageQueue的概念能够等同于Kafka的partion分区,看做发送消息的最小粒度。这个选择有两种方式:
Step 4: 将Message发送至选择出来的MessageQueue上的Broker。
Step 5: 更新Broker的延迟。
Step 6: 根据不一样的发送方式来处理结果:
能够看见Rocketmq发送普通消息的流程比较清晰简单,下面来看看顺序消息。
顺序消息分为分区顺序消息和全局顺序消息,全局顺序消息比较容易理解,也就是哪条消息先进入,哪条消息就会先被消费,符合咱们的FIFO,不少时候全局消息的实现代价很大,因此就出现了分区顺序消息。分区顺序消息的概念能够以下图所示:
咱们经过对消息的key,进行hash,相同hash的消息会被分配到同一个分区里面,固然若是要作全局顺序消息,咱们的分区只须要一个便可,因此全局顺序消息的代价是比较大的。
对RocketMq熟悉的小伙伴会发现,它其实并无提供顺序消息发送相关的API,可是在阿里云的RocketMq版本提供了顺序消息的API,原理比较简单,其实也是对现有API的一个封装:
SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object shardingKey) { int select = Math.abs(shardingKey.hashCode()); if (select < 0) { select = 0; } return mqs.get(select % mqs.size()); } }, shardingKey);
能够看见顺序消息将MessageQueue的选择交由咱们发送方去作,因此咱们直接利用咱们shardingKey的hashCode进行发送分区。
普通消息使用比较简单,以下面代码所示:
public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Test_Consumer"); consumer.subscribe("TopicTest", "*"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.setConsumeThreadMin(10); consumer.setConsumeThreadMax(10); consumer.start(); System.out.printf("Consumer Started.%n"); }
启动Consumer以后,咱们就开始真正的从Broker去进行消费了,可是咱们如何从Broker去消费的呢?首先在咱们的第一步里面咱们订阅了一个Topic,咱们就会定时去刷新Topic的相关信息好比MessageQueue的变动,而后将对应的MessageQueue分配给当前Consumer:
// 这个数据 是10s更新一次 从内存中获取 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); // 这个数据实时去拉取 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); } if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { //经过默认的分配策略进行分配 allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error( "AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
这里首先向Broker拿到当前消费全部的ConsumerId默认是对应机器的Ip+实例名字,Broker中的ConsumerId信息是Consumer经过心跳定时进行上报得来的,而后根据消费分配策略将消息分配给Consumer,这里默认是平均分配,将咱们分配到的消息队列,记录在 processQueueTable中,若是出现了新增,那么咱们须要建立一个PullRequst表明这拉取消息的请求,异步去处理:
List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); // 这里就是获取咱们第一次应该拿什么offset long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } this.dispatchPullRequest(pullRequestList);
在PullService中会不断的从PullRequestQueue拿取数据,而后进行拉取数据。
while (!this.isStopped()) { try { // rebalance 以后第一次向这个队列放数据 后续消费的时候会继续放 PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } }
拉取数据以后,这里会给PullCallBack进行响应:
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); }
若是这里成功拉取到消息的话,咱们首先将拉取的消息存入到咱们的ProcessQueue中,ProcessQueue用于咱们消费者处理的状态以及待处理的消息,而后提交到咱们的Consumer线程池中进行真正的业务逻辑消费,而后再提交一个PullRequest用于咱们下次消费。
你们看到这里有没有发现这个模式和咱们的netty中的单线程accpet,多个线程来处理业务逻辑很类似,其原理都是同样,由一个线程不断的去拉取,而后由咱们业务上定义的线程池进行处理。以下图所示:
咱们发现咱们拉取消息实际上是一个循环的过程,这里就来到了第一个问题,若是消息队列消费的速度跟不上消息发送的速度,那么就会出现消息堆积,不少同窗根据过程来看可能会觉得,咱们的拉取消息一直在进行,因为咱们的消费速度比较慢,会有不少message以队列的形式存在于咱们的内存中,那么会致使咱们的JVM出现OOM也就是内存溢出。
那么到底会不会出现OOM呢?实际上是不会的,RocketMq对安全性方面作得很好,有下面两段代码:
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { System.out.println(cachedMessageCount + ":"+pullRequest); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); return; } if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); return; }
首先是会判断当前内存缓存的Message数量是否大于限制的值默认是1000,若是大于则延迟一段时间再次提交pullRequest。 而后判断当前内存缓存的Size大小是否大于了某个值,默认是100M,若是大于也会延迟一段时间再次提交pullRequest。 因此在咱们consumer上若是出现消息堆积,基本也没有什么影响。
那咱们想一想第二个问题应该怎么解决呢?再普通消息的场景下,如何提高消费速度?
在rocketmq中对消息的消费结果处理也比较重要,这里仍是先提三个问题:
首先咱们来看第一个问题,怎么处理消费结果,在processResult中有以下代码:
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); switch (status) { case CONSUME_SUCCESS: int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
在上面的第四步中,若是不深刻进去看内部逻辑,这里会误觉得,他会将当前消息的offset给更新到最新的消费进度,那问题三中说的中间的offset是有可能被丢失的,但其实是不会发生的,具体的逻辑保证在removeMessage中:
public long removeMessage(final List<MessageExt> msgs) { long result = -1; final long now = System.currentTimeMillis(); try { this.lockTreeMap.writeLock().lockInterruptibly(); this.lastConsumeTimestamp = now; try { if (!msgTreeMap.isEmpty()) { result = this.queueOffsetMax + 1; int removedCnt = 0; for (MessageExt msg : msgs) { MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); if (prev != null) { removedCnt--; msgSize.addAndGet(0 - msg.getBody().length); } } msgCount.addAndGet(removedCnt); if (!msgTreeMap.isEmpty()) { result = msgTreeMap.firstKey(); } } } finally { this.lockTreeMap.writeLock().unlock(); } } catch (Throwable t) { log.error("removeMessage exception", t); } return result; }
在removeMessage中经过msgTreeMap去作了一个保证,msgTreeMap是一个TreeMap,根据offset升序排序,若是treeMap中有值的话,他返回的offset就会是当前msgTreeMap中的firstKey,而不是当前的offset,从而就解决了问题三。
上面的过程总结为下图所示:
顺序消息的消费前面过程和普通消息基本同样,这里咱们须要关注的是将消息丢给咱们消费线程池以后的逻辑:
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { // 省略 List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize); status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); // 省略 }
能够发现这里比普通消息多了一个步骤,那就是加锁,这里会获取到以messageQueue为纬度去加锁,而后去咱们的processQueue中获取到咱们的Message, 这里也是用的咱们的msgTreeMap, 获取的最小offset的Message。
因此咱们以前的线程池提升并发速度的策略在这里没有用了,那么应该怎么办呢?既然咱们加锁是以messageQueue为纬度,那么增长MessageQueue就行了,因此这里的提高消费速度恰好和普通消息相反,再普通消息中提高Messagequeue可能效果并无那么大,可是在顺序消息的消费中提高就很大了。
咱们在压测的时候,发现顺序消息消费很慢,消息堆积很严重,通过调试发现阿里云上的rocketmq默认读写队列为16,咱们consumer机器有10台,每一个consumer线程池大小为10,理论并发应该有100,可是因为顺序消息的缘由致使实际并发只有16,最后找阿里的技术人员将读写队列扩至100,这样充分利用咱们的资源,极大的增长了顺序消息消费的速度,消息基本不会再堆积。
顺序消息的结果处理和普通消息的处理流程,稍有不一样,代码以下:
public boolean processConsumeResult( final List<MessageExt> msgs, final ConsumeOrderlyStatus status, final ConsumeOrderlyContext context, final ConsumeRequest consumeRequest ) { boolean continueConsume = true; long commitOffset = -1L; if (context.isAutoCommit()) { switch (status) { case SUCCESS: commitOffset = consumeRequest.getProcessQueue().commit(); this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); break; case SUSPEND_CURRENT_QUEUE_A_MOMENT: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } else { commitOffset = consumeRequest.getProcessQueue().commit(); } break; default: break; } } else { switch (status) { case SUCCESS: this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); break; case SUSPEND_CURRENT_QUEUE_A_MOMENT: this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); this.submitConsumeRequestLater( consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } break; default: break; } } if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); } return continueConsume; }
这里回到咱们的第三个问题,如何设置消息消费的重试次数呢?因为咱们直接使用的阿里云的mq,因此咱们又包装了一层,方便接入。再接入层中咱们最开始统一配置了最大重试2000次,这里设置2000次的缘由主要是想让咱们的消息队列尽可能无限重试,由于咱们默认消息基本最终会成功,可是为了以防万一,因此这里设置了一个较大的数值2000次。设置2000次对于咱们的普通消息,基本没什么影响,由于他会从新投递至broker,可是咱们的顺序消息是不行的,若是顺序消息设置重试2000次,当遇到了这种不可能成功的消息的时候就会致使消息一直在本地进行重试,而且因为对队列加锁了,因此当前MessageQueue将会一直被阻塞,致使后续消息不会被消费,若是设置2000次那么至少会阻塞半个小时以上。因此这里应该将顺序消息设置一个较小的值,目前咱们设置为16。
以前没怎么看过Rocketmq的源码,通过此次打压,从Rocketmq中学习到了不少精妙优秀的设计,将一些经验提炼成了文中的一些问题,但愿你们能仔细阅读,找到答案。
若是你们以为这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O:
å