RocketMq源码分析之DefaultLitePullConsumer主动拉取消息分析consumeRequestCachejava
由于作RMQ的proxy的消费者的时候,消息消费只能主动拉取,而后就想去研究下RMQ中主动拉消息的消费者的源码,开始在网上搜到的都是讲DefaultMQPullConsumer的,而后我用的是RocketMq4.8.0的版本,在4.8中,这个类被标记为将要废弃,将被DefaultLitePullConsumer替代,因而今天就我们就研究下DefaultLitePullConsumer.app
和普通push模式的消费者同样是建立好对象,而后start,它的区别是须要主动去拿消息,而后去消费.源码分析
这里面又有两种拉取消息的方式,一种是assign,一种是subscribe.fetch
assign是指定哪些队列去拉取,subscribe是只指定topic,而后由均衡策略去全部队列中选择队列拉取this
这是subscribespa
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test"); litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); litePullConsumer.subscribe("TopicTest", "*"); litePullConsumer.start(); try { while (running) { List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s%n", messageExts); } } finally { litePullConsumer.shutdown(); }
这是assign线程
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); litePullConsumer.setAutoCommit(false); litePullConsumer.start(); Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest"); List<MessageQueue> list = new ArrayList<>(mqSet); List<MessageQueue> assignList = new ArrayList<>(); for (int i = 0; i < list.size() / 2; i++) { assignList.add(list.get(i)); } litePullConsumer.assign(assignList); litePullConsumer.seek(assignList.get(0), 10); try { while (running) { List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s %n", messageExts); litePullConsumer.commitSync(); } } finally { litePullConsumer.shutdown(); }
首先看start方法干了什么code
public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; //这里是对consumerGroupName以及messageModel等一些基础属性作一下校验 this.checkConfig(); //若是是集群模式,就将实例名改成PID,由于通常没设置instanceName的状况下,默认是Default, //可是集群下,不一样的consumer节点须要区分开来,不能全设置为Default if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultLitePullConsumer.changeInstanceNameToPID(); } //初始化客户端实例,注册为消费者 initMQClientFactory(); //初始化消息的重平衡策略的配置, initRebalanceImpl(); initPullAPIWrapper(); //根据消息的消费模式,启动不一样的offsetStore对象 //集群模式就启动集群模式的 //广播模式启动广播模式的 initOffsetStore(); mQClientFactory.start(); //周期性获取最新的Topic和与之对应的MessageQueue startScheduleTask(); this.serviceState = ServiceState.RUNNING; log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup()); //根据拉取消息的策略来执行,有两种,一种是订阅Topic,一种是本身分配messageQueue,也就是assign //若是是assign,须要在start consumer以前,本身先根据topic拿到全部的messageQueue的信息 //而后assign本身想要选择的messageQueue //若是是subscribe,就会自动根据传入的topic去namesrv拉取全部messageQueue的数据 operateAfterRunning(); break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } }
第一遍看源码的时候,就这么大体过了一遍,而后接下来就是主动拉取消息的时候了:对象
List<MessageExt> messageExts = litePullConsumer.poll();
在最上面启动消费者的代码中有这么一行,是主动拉取消息的方法,在之前的DefaultMQPullConsumer中,是能够设置拉取多少条消息的,可是如今这个类不能设置了,我点进poll方法开始追踪消息的拉取策略队列
public synchronized List<MessageExt> poll(long timeout) { try { checkServiceState(); if (timeout < 0) throw new IllegalArgumentException("Timeout must not be negative"); //看是否设置了自动提交offset,若是是就根据当前时间判断一下当前是否须要提交一次offset if (defaultLitePullConsumer.isAutoCommit()) { maybeAutoCommit(); } long endTime = System.currentTimeMillis() + timeout; //获取拉取到的消息 ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (endTime - System.currentTimeMillis() > 0) { while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (endTime - System.currentTimeMillis() <= 0) break; } } if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) { List<MessageExt> messages = consumeRequest.getMessageExts(); long offset = consumeRequest.getProcessQueue().removeMessage(messages); assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); //If namespace not null , reset Topic without namespace. this.resetTopic(messages); return messages; } } catch (InterruptedException ignore) { } return Collections.emptyList(); }
看到这里我就发现不对劲了,ConsumeRequest是一个保存了消息数据的一个类,这个类竟然是从consumeRequestCache这么一个BlockingQueue中取出来的
private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();
等等,不对劲啊,怎么直接就从程序的队列中取出来了,这啥时候放进去的啊,我都还没开始拉取,怎么就有消息了呢,因而我就追踪了一下这个consumeRequestCache,发现他里面的数据是经过PullTaskImpl这个线程类在启动的时候放进去的,而后就发现了一个存放PullTaskImpl的队列
private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
最终追踪到了一个startPullTask方法,在这个方法须要传入一个集合的messageQueue,而后建立一个PullTaskImpl开始拉取消息,生成ConsumeRequest. 继续追踪发现有两个地方使用到了这个startPullTask方法,一个是updateAssignPullTask,一个是updatePullTask,而后我一路追踪这两个方法的用途. 发现了updateAssignPullTask是在上面start方法中的operateAfterRunning()使用的,这下对应上了,在assign模式下,传入了assign的messageQueues而后在start以后从这些queue拉取消息.
另外一个方法是在触发消息分发重平衡的时候执行的.在RebalanceLitePullImpl中
这下主动拉取消息的逻辑明白了,原来是在consumer启动的时候,若是指定了messageQueue,就开始从这些queue中拉取消息,若是没有指定,那就在重平衡的时候从订阅的Topic里拉取全部的messageQueue,而后再拉取消息,以后就经过poll方法来取.
对了还有很是重要的一点,主动拉取消息的状况,是默认自动提交消费位点offset的,能够经过setAutoCommit(false);来设置为手动提交.消费完了记得调用commitSync();