RocketMq源码分析之DefaultLitePullConsumer主动拉取消息分析consumeRequestCache

RocketMq源码分析之DefaultLitePullConsumer主动拉取消息分析consumeRequestCachejava

由于作RMQ的proxy的消费者的时候,消息消费只能主动拉取,而后就想去研究下RMQ中主动拉消息的消费者的源码,开始在网上搜到的都是讲DefaultMQPullConsumer的,而后我用的是RocketMq4.8.0的版本,在4.8中,这个类被标记为将要废弃,将被DefaultLitePullConsumer替代,因而今天就我们就研究下DefaultLitePullConsumer.app

和普通push模式的消费者同样是建立好对象,而后start,它的区别是须要主动去拿消息,而后去消费.源码分析

这里面又有两种拉取消息的方式,一种是assign,一种是subscribe.fetch

assign是指定哪些队列去拉取,subscribe是只指定topic,而后由均衡策略去全部队列中选择队列拉取this

这是subscribespa

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.start();
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s%n", messageExts);

            }
        } finally {
            litePullConsumer.shutdown();
        }

这是assign线程

DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name");
        litePullConsumer.setAutoCommit(false);
        litePullConsumer.start();
        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest");
        List<MessageQueue> list = new ArrayList<>(mqSet);
        List<MessageQueue> assignList = new ArrayList<>();
        for (int i = 0; i < list.size() / 2; i++) {
            assignList.add(list.get(i));
        }
        litePullConsumer.assign(assignList);
        litePullConsumer.seek(assignList.get(0), 10);
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
                litePullConsumer.commitSync();
            }
        } finally {
            litePullConsumer.shutdown();
        }

首先看start方法干了什么code

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                //这里是对consumerGroupName以及messageModel等一些基础属性作一下校验
                this.checkConfig();

                //若是是集群模式,就将实例名改成PID,由于通常没设置instanceName的状况下,默认是Default,
                //可是集群下,不一样的consumer节点须要区分开来,不能全设置为Default
                if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultLitePullConsumer.changeInstanceNameToPID();
                }

                //初始化客户端实例,注册为消费者
                initMQClientFactory();

                //初始化消息的重平衡策略的配置,
                initRebalanceImpl();

                initPullAPIWrapper();

                //根据消息的消费模式,启动不一样的offsetStore对象
                //集群模式就启动集群模式的
                //广播模式启动广播模式的
                initOffsetStore();

                mQClientFactory.start();

                //周期性获取最新的Topic和与之对应的MessageQueue
                startScheduleTask();

                this.serviceState = ServiceState.RUNNING;

                log.info("the consumer [{}] start OK", this.defaultLitePullConsumer.getConsumerGroup());

                //根据拉取消息的策略来执行,有两种,一种是订阅Topic,一种是本身分配messageQueue,也就是assign
                //若是是assign,须要在start consumer以前,本身先根据topic拿到全部的messageQueue的信息
                //而后assign本身想要选择的messageQueue
                //若是是subscribe,就会自动根据传入的topic去namesrv拉取全部messageQueue的数据
                operateAfterRunning();

                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }

第一遍看源码的时候,就这么大体过了一遍,而后接下来就是主动拉取消息的时候了:对象

List<MessageExt> messageExts = litePullConsumer.poll();

在最上面启动消费者的代码中有这么一行,是主动拉取消息的方法,在之前的DefaultMQPullConsumer中,是能够设置拉取多少条消息的,可是如今这个类不能设置了,我点进poll方法开始追踪消息的拉取策略队列

public synchronized List<MessageExt> poll(long timeout) {
        try {
            checkServiceState();
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            //看是否设置了自动提交offset,若是是就根据当前时间判断一下当前是否须要提交一次offset
            if (defaultLitePullConsumer.isAutoCommit()) {
                maybeAutoCommit();
            }
            long endTime = System.currentTimeMillis() + timeout;

            //获取拉取到的消息
            ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

            if (endTime - System.currentTimeMillis() > 0) {
                while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
                    consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
                    if (endTime - System.currentTimeMillis() <= 0)
                        break;
                }
            }

            if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
                List<MessageExt> messages = consumeRequest.getMessageExts();
                long offset = consumeRequest.getProcessQueue().removeMessage(messages);
                assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                //If namespace not null , reset Topic without namespace.
                this.resetTopic(messages);
                return messages;
            }
        } catch (InterruptedException ignore) {

        }

        return Collections.emptyList();
    }

看到这里我就发现不对劲了,ConsumeRequest是一个保存了消息数据的一个类,这个类竟然是从consumeRequestCache这么一个BlockingQueue中取出来的

private final BlockingQueue<ConsumeRequest> consumeRequestCache = new LinkedBlockingQueue<ConsumeRequest>();

等等,不对劲啊,怎么直接就从程序的队列中取出来了,这啥时候放进去的啊,我都还没开始拉取,怎么就有消息了呢,因而我就追踪了一下这个consumeRequestCache,发现他里面的数据是经过PullTaskImpl这个线程类在启动的时候放进去的,而后就发现了一个存放PullTaskImpl的队列

private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();

最终追踪到了一个startPullTask方法,在这个方法须要传入一个集合的messageQueue,而后建立一个PullTaskImpl开始拉取消息,生成ConsumeRequest. 继续追踪发现有两个地方使用到了这个startPullTask方法,一个是updateAssignPullTask,一个是updatePullTask,而后我一路追踪这两个方法的用途. 发现了updateAssignPullTask是在上面start方法中的operateAfterRunning()使用的,这下对应上了,在assign模式下,传入了assign的messageQueues而后在start以后从这些queue拉取消息.

另外一个方法是在触发消息分发重平衡的时候执行的.在RebalanceLitePullImpl中

这下主动拉取消息的逻辑明白了,原来是在consumer启动的时候,若是指定了messageQueue,就开始从这些queue中拉取消息,若是没有指定,那就在重平衡的时候从订阅的Topic里拉取全部的messageQueue,而后再拉取消息,以后就经过poll方法来取.

对了还有很是重要的一点,主动拉取消息的状况,是默认自动提交消费位点offset的,能够经过setAutoCommit(false);来设置为手动提交.消费完了记得调用commitSync();

相关文章
相关标签/搜索