摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢?
在上一篇—“消息中间件—RocketMQ消息消费(一)”中,已经简要地介绍了下RocketMQ中“Pull和Push两种消费方式的简要流程”以及“Push消费方式的启动流程”(ps:若是不熟悉这几块内容的童鞋,能够本身回顾下上一篇的内容)。本文将详细介绍RocketMQ中Push消费方式下的“Pull消息的长轮询机制”和“Consumer端的负载均衡机制”这两块关键核心内容。
因为RocketMQ系列的技术分享存在必定的连续性,所以但愿读者能回顾下往期RocketMQ分享的篇幅:
(1)消息中间件—RocketMQ的RPC通讯(一)
(2)消息中间件—RocketMQ的RPC通讯(二)
(3)消息中间件—RocketMQ消息发送
(4)消息中间件—RocketMQ消息消费(一)算法
在上一篇中,已经简略地介绍过RocketMQ中消息消费时Pull消息的长轮询机制了,其主要的思路是:Consumer若是第一次尝试Pull消息失败(好比:Broker端没有能够消费的消息),并不当即给消费者客户端返回Response的响应,而是先hold住而且挂起请求。而后在Broker端,经过后台独立线程—PullRequestHoldService重复尝试执行Pull消息请求来取消息。同时,另一个ReputMessageService线程不断地构建ConsumeQueue/IndexFile数据,并取出hold住的Pull请求进行二次处理。经过这种长轮询机制,便可解决Consumer端须要经过不断地发送无效的轮询Pull请求,而致使整个RocketMQ集群中Broker端负载很高的问题。数组
在RocketMQ的Consumer端,后台独立线程服务—pullMessageService是Pull消息请求的发起者,它不断地尝试从阻塞队列—LinkedBlockingQueue<PullRequest>中获取元素PullRequest,并根据pullRequest中的参数以及订阅关系信息调用pullAPIWrapper的pullKernelImpl()方法发送封装后的Pull消息请求—PullMessageRequestHeader至Broker端来拉取消息(具体完成发送一次Pull消息的PRC通讯请求的是MQClientAPIImpl中的pullMessage()方法)。这里涉及细节的时序图(ps:时序图中没有涉及PRC异步通讯中的callback过程)以下:缓存
Consumer向Broker端发送长轮询请求的时序图.jpg网络
其中, DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法是发送Pull消息请求的关键:
(1)校验ProcessQueue是否“drop”, 若是为“drop”为true则直接返回(这个“drop”的设置在下面一节—“Consumer端的负载均衡机制”中会提到);
(2)给ProcessQueue设置Pull消息的时间戳;
(3)作流量控制,对于知足下面条件的任何一种状况,稍后再发起Pull消息的请求;
条件1:正在消费的队列中,未被消费的消息数和消息大小超过阀值(默认每一个队列消息数为1000个/消息存储容量100MB);
条件2:若是是顺序消费,正在消费的队列中,消息的跨度超过阀值(默认2000);
(4)根据topic获取订阅关系—SubscriptionData;
(5)构建Pull消息的回调对象—PullBack,这里从Broker端Pull消息的返回结果处理是经过异步回调(发送异步通讯RPC请求),其中若是Broker端返回Pull消息成功,在回调方法中先填充至处理队列—processQueue中(将Pull下来的消息,设置到ProcessQueue的msgTreeMap容器中),而后经过消费消息的服务线程—consumeMessageService,将封装好的ConsumeRequest提交至消费端消费线程池—consumeExecutor异步执行处理(具体处理逻辑:经过业务应用系统在DefaultMQPushConsumer实例中注册的消息监听器完成业务端的消息消费);
(6)从Consumer端内存中获取commitOffsetValue;
(7)经过RocketMQ的Remoting通讯层向Broker端发送Pull消息的RPC请求;app
这里先来讲下对于通常状况下(即为所要Pull的消息在RocketMQ的Broker端已是存在,通常能够Pull到的状况),Broker端处理这个Pull消息请求的主要过程。其时序图(ps:图中只是画了大部分的流程,详细细节还须要对照源码看下)以下:负载均衡
Broker端接受长轮询请求的处理时序图.jpg异步
从上面的简易时序图中能够看到Broker端Pull消息的主要关键点以下:
(1)Pull消息的业务处理器—PullMessageProcessor的processRequest为处理拉取消息请求的入口,在设置reponse返回结果中的opaque值后,就完成一些前置的校验(Broker是否可读、Topic/ConsumerGroup是否存在、读取队列Id是否在Topic配置的队列范围数内);
(2)根据“ConsumerGroup”、“Topic”、“queueId”和“offset”这些参数来调用MessageStore实例的getMessage()方法来产尝试读取Broker端的消息;
(3)其中,经过findConsumeQueue()方法,获取逻辑消费队列—ConsumeQueue;
(4)根据offset与逻辑消费队列中的maxOffset、minOffset的比较,来设置状态值status,同时计算出下次Pull消息的开始偏移量值—nextBeginOffset,而后经过MappedFile的方式获取ConsumeQueue的Buffer映射结果值;
(5)根据算出来的offsetPy(物理偏移量值)和sizePy(消息的物理大小),从commitLog获取对应消息的Buffer映射结果值,并填充至GetMessageResult返回对象,并设置返回结果(状态/下次其实偏移量/maxOffset/minOffset)后return;
(6)根据isTransferMsgByHeap的设置状况(默认为true),选择下面两种方式之一来真正读取GetMessageResult的消息内容并返回至Consumer端;
方式1:使用JDK NIO的ByteBuffer,循环地读取存有消息内容的messageBufferList至堆内内存中,返回byte[]字节数组,并设置到响应的body中;而后,经过RPC通讯组件—NettyRemotingServer发送响应至Consumer端;
方式2:采用基于Zero-Copy的Netty组件的FileRegion,其包装的“FileChannel.tranferTo”实现文件传输,能够直接将文件缓冲区的数据发送至通讯目标通道Channel中,避免了经过循环write方式致使的内存拷贝开销,这种方式性能上更优;
(7)在PullMessageProcessor业务处理器的最后,提交并持久化消息消费的offset偏移量进度;ide
说完了Pull消息请求的通常流程,下面主要看下Broker端的PullMessageProcessor业务处理器在RocketMQ中尚未消息能够拉取状况下(即为:PULL_NOT_FOUND)的处理流程,本节内容也是RocketMQ中长轮询机制的关键。
长轮询机制是对普通轮询的一种优化方案,它平衡了传统Push/Pull模型的各自缺点,Server端若是当前没有Client端请求拉取的相关数据会hold住这个请求,直到Server端存在相关的数据,或者等待超时时间后返回。在响应返回后,Client端又会再次发起下一次的长轮询请求。RocketMQ的push模式正是采用了这种长轮询机制的设计思路,若是在上面所述的第一次尝试Pull消息失败后(好比Broker端暂时没有能够消费的消息),先hold住而且挂起该请求(这里,设置返回响应response为null,此时不会向Consumer端发送任何响应的内容,即不会对响应结果进行处理),而后经过Broker端的后台线程PullRequestHoldService从新尝试和后台线程ReputMessageService的二次处理。在Broker端,两个后台线程服务PullRequestHoldService和ReputMessageService是实现长轮询机制的关键点。下面就来分别介绍这两个服务线程:
(1)PullRequestHoldService:该服务线程会从pullRequestTable本地缓存变量中取PullRequest请求,检查轮询条件—“待拉取消息的偏移量是否小于消费队列最大偏移量”是否成立,若是条件成立则说明有新消息达到Broker端,则经过PullMessageProcessor的executeRequestWhenWakeup()方法从新尝试发起Pull消息的RPC请求(此处,每隔5S重试一次,默认长轮询总体的时间设置为30s);
(2)ReputMessageService:该服务线程会在Broker端不断地从数据存储对象—commitLog中解析数据并分发请求,随后构建出ConsumeQueue(逻辑消费队列)和IndexFile(消息索引文件)两种类型的数据。同时从本地缓存变量—pullRequestTable中,取出hold住的PullRequest请求并执行二次处理(具体的作法是,在PullMessageProcessor的executeRequestWhenWakeup()方法中,经过业务线程池pullMessageExecutor,异步提交从新Pull消息的请求任务,即为从新调了一次PullMessageProcessor业务处理器的processRequest()方法,来实现Pull消息请求的二次处理)。这里,ReputMessageService服务线程,每处理一次,Thread.sleep(1),继续下一次处理。性能
看了上面一节—“RocketMQ中长轮询的Pull消息机制”后,你们可能会有这样子一个疑问:在Consumer端pullMessageService线程做为消息的主动拉取者不断地从阻塞队列中获取元素PullRequest,那么这里的PullRequest是在哪儿由哪一个线程放入至阻塞队列中的呢?本节内容将介绍“Consumer端的负载均衡机制”,同时解答上面的疑问。优化
在RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来Pull消息的,而在Push模式中只是采用了长轮询的方式而实现了准实时的自动消息拉取。在两种基于拉模式的消费方式(Push/Pull)中,均须要Consumer端在知道从Broker端的哪个消息队列—MessageQueue中去Pull消息。所以,消息队列的负载均衡处理(即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费),由Consumer端来主动完成更为合理。
1. Consumer端的心跳包发送
在Consumer启动后,它就会经过定时任务不断地向RocketMQ集群中的全部Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通讯模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为以后作Consumer端的负载均衡提供能够依据的元数据信息。
2. Consumer端实现负载均衡的核心类—RebalanceImpl
在上一篇文章的"Consumer启动流程"中已经介绍了在启动MQClientInstance实例时候,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。经过查看源码能够发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的rebalanceByTopic()方法,该方法是实现Consumer端负载均衡的核心关键。
这里,rebalanceByTopic()方法会根据消费者通讯类型为“广播模式”仍是“集群模式”作不一样的逻辑处理。这里主要来看下集群模式下的主要处理流程:
(1)从rebalanceImpl实例的本地缓存变量—topicSubscribeInfoTable中,获取该Topic主题下的消息消费队列集合(mqSet);
(2)根据topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()方法向Broker端发送获取该消费组下消费者Id列表的RPC通讯请求(Broker端基于前面Consumer端上报的心跳包数据而构建的consumerTable作出响应返回,业务请求码:GET_CONSUMER_LIST_BY_GROUP);
(3)先对Topic下的消息消费队列、消费者Id排序,而后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。
Consumer端负载均衡策略的分配.jpg
这里的平均分配算法,相似于分页的算法,将全部MessageQueue排好序相似于记录,将全部消费端Consumer排好序相似页数,并求出每一页须要包含的平均size和每一个页面记录的范围range,最后遍历整个range而计算出当前Consumer端应该分配到的记录(这里即为:MessageQueue)。具体的算法代码以下:
@Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { //省略代码...... List<MessageQueue> result = new ArrayList<MessageQueue>(); //省略代码...... int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); for (int i = 0; i < range; i++) { result.add(mqAll.get((startIndex + i) % mqAll.size())); } return result;
(4)而后,调用updateProcessQueueTableInRebalance()方法,具体的作法是,先将分配到的消息队列集合(mqSet)与processQueueTable作一个过滤比对,具体的过滤比对方式以下图:
RebalancePushImpl负载均衡(分发pullRequest到pullRequestQueue).jpg
这里能够分以下两种状况来筛选过滤:
a.图中processQueueTable标注的红色部分,表示与分配到的消息队列集合mqSet互不包含。将这些队列设置Dropped属性为true,而后查看这些队列是否能够移除出processQueueTable缓存变量,这里具体执行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否能够获取当前消费处理队列的锁,拿到的话返回true。若是等待1s后,仍然拿不到当前消费处理队列的锁则返回false。若是返回true,则从processQueueTable缓存变量中移除对应的Entry;
b.图中processQueueTable的绿色部分,表示与分配到的消息队列集合mqSet的交集。判断该ProcessQueue是否已通过期了,在Pull模式的不用管,若是是Push模式的,设置Dropped属性为true,而且调用removeUnnecessaryMessageQueue()方法,像上面同样尝试移除Entry;
最后,为过滤后的消息队列集合(mqSet)中的每一个MessageQueue建立一个ProcessQueue对象并存入RebalanceImpl的processQueueTable队列中(其中调用RebalanceImpl实例的computePullFromWhere(MessageQueue mq)方法获取该MessageQueue对象的下一个进度消费值offset,随后填充至接下来要建立的pullRequest对象属性中),并建立拉取请求对象—pullRequest添加到拉取列表—pullRequestList中,最后执行dispatchPullRequest()方法,将Pull消息的请求对象PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,待该服务线程取出后向Broker端发起Pull消息的请求。其中,能够重点对比下,RebalancePushImpl和RebalancePullImpl两个实现类的dispatchPullRequest()方法不一样,RebalancePullImpl类里面的该方法为空,这样子也就回答了上一篇中最后的那道思考题了。
RocketMQ的消息消费(二)(push模式实现)篇幅就先分析到这里了。关于RocketMQ消息消费的内容比较多也比较复杂,须要读者结合源码并屡次debug才能对其有一个较为深入的理解。另外,对于消息消费部分的““消息ACK机制”、“消费重试机制”等剩余内容将在后续的篇幅进行介绍和分析。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,若有阐述不合理之处还望留言一块儿探讨。
做者:癫狂侠 连接:https://www.jianshu.com/p/fac642f3c1af 來源:简书 简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。