【RocketMQ】消息的消费

RocketMQ支持两种形式的消息消费者:java

  • PushConsumer:使用者向Consumer对象注册一个Listener,用户实现MessageListenerConcurrently或者MessageListenerOrderly,Consumer一旦收到消息,当即回调Listener接口方法。底层采用的是Pull长轮询+Broker挂起方式拉取消息。该模式强调实时性。
  • PullConsumer:使用者主动调用Consumer的拉消息方法从Broker拉消息。

PushConsumer

实现类是DefaultMQPushConsumerImpl,看看它启动时作了哪些事。算法

一、检查配置&拷贝订阅关系给负载均衡处理器多线程

二、利用MQClientManager建立本身的MQClientInstance实例对象,该MQClientInstance的做用在生产者章节描述过负载均衡

三、为负载均衡处理器设置相关参数异步

四、初始化OffsetStore,目的是定时持久化消费进度到本地文件或者Broker,若是是广播模式——>本地,若是是集群——>Brokeride

五、设置consumeMessageService,分为2种,若是使用在注册的监听器是MessageListenerOrderly,则consumeMessageService初始化为ConsumeMessageOrderlyService,若是监听器是MessageListenerConcurrently,则对应ConsumeMessageConcurrentlyService函数

六、利用MQClientInstance注册消费者(并无向Broker注册),MQClientInstance启动相关服务。fetch

消费者的负载均衡

在集群消费模式下,一条消息只会被同一个group里一个消费端消费。不一样group之间相互不影响。广播消费模式下,一条消息会被同一个group里每个消费端消费。可是广播消息的代价较高,若是消费者集群规模较大或订阅的消费量较大,会影响集群稳定性。在集群模式下,当topic中的MessageQueue变动时,动态调整消费MessageQueue的数量,采用的策略以下:this

| AllocateMessageQueueConsistentHash | 一致性哈希算法分配 | | AllocateMessageQueueAveragely | 平均分配(默认) | | AllocateMessageQueueByMachineRoom | 机房分配 | | AllocateMessageQueueAveragelyByCircle | 环形分配 | | AllocateMessageQueueByConfig | 手动配置 |spa

经过增长consumer实例去分摊queue的消费,能够起到水平扩展的消费能力的做用。而有实例下线的时候,会从新触发负载均衡,这时候原来分配到的queue将分配到其余实例上继续消费。

可是若是consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将没法分到queue,也就没法消费到消息,也就没法起到分摊负载的做用了。因此须要控制让queue的总数量大于等于consumer的数量。

在启动消费者时,会经过RebalanceService每隔20s执行一次负载均衡,能够经过参数rocketmq.client.rebalance.waitInterval指定间隔时间。

【RebalanceService】
 @Override                                                                                                          
 public void run() {                                                                                                
     log.info(this.getServiceName() + " service started");                                                          

     while (!this.isStopped()) {                                                                                    
         this.waitForRunning(waitInterval);                                                                         
         this.mqClientFactory.doRebalance();                                                                        
     }                                                                                                              

     log.info(this.getServiceName() + " service end");                                                              
 }                                                                                                                  
 //直接利用CountDownLatch实现等待超时,不使用sleep的缘由是可能其余操做会唤醒等待,要求当即rebalance
 protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait,重写了CountDownLatch的Sync,reset实现:setState(startCount),至关于从新初始化
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

消费者处理消息的方式

并行处理:须要用户注册监听器时实现 new MessageListenerConcurrently();实现以后对应的ConsumeMessageConcurrentlyService会启动,将每次从Broker拉取的消息切割成多个consumeBatchSize(默认是1)的消息集合,每一个集合封装成一个ConsumeRequest,提交到线程池处理。若是线程池拒绝了此次提交(当前线程没法处理或线程池缓冲队列满了),则把这批消息放入到定时调度中,延迟5s再执行。使用并行处理的好处是多线程消费快,很差在于若是强调消息消费的顺序性,例如必须(付款,发货)这2条消息,并行处理可能致使(发货——付款)的处理顺序,不对!因此考虑顺序消费的场景应该使用顺序处理。

@Override
    public void submitConsumeRequest(……) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);     
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }
                try {
                    consumeExecutor.submit(new ConsumeRequest(msgThis, processQueue, messageQueue));
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }
                  this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }
    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest
    ) {

        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
            }
        }, 5000, TimeUnit.MILLISECONDS);
    }

顺序处理:须要用户注册监听器时实现 new MessageListenerOrderly();实现以后对应的ConsumeMessageOrderlyService会启动,该服务的特色是某一个队列,在同一时间只能有一个线程访问,只有等拿到锁的线程消费成功后释放锁,其余线程才能继续消费该队列。劣势是下降了消息处理的吞吐量,当前一条消息消费出现问题时,会阻塞后续的流程。

ConsumeMessageOrderlyService在启动的时候,若是是集群模式下会启动一个单线程的定时调度任务,延迟一秒,时间间隔为20秒,执行rebalanceImpl的lockAll()方法。向主Master节点的Broker发送LOCK_BATCH_MQ请求,内容是锁住该ConsumerId下的全部MessageQueue,当Broker返回锁住成功的MQs后,Client端会把MQ对应的ProcessQueue也锁住。这里说明下两个Queue的区别:

//消息队列的基本属性
public class MessageQueue {
    //消息主题
    private String topic;
    //所在Broker
    private String brokerName;
    //惟一标识
    private int queueId;
}

//消息队列中的消息数据
public class ProcessQueue {
   //存放消息,key=MsgId,value=Message
   private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

   private final Lock lockConsume = new ReentrantLock();
   // ……还有其余属性
}

当Consumer拉取到消息后,调用ConsumeMessageOrderlyService#submitConsumeRequest处理这批消息。

@Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            //ConsumeRequest继承了Runnable,封装了一个MessageQueue && ProcessQueue
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            //提交到线程池
            this.consumeExecutor.submit(consumeRequest);
        }
}

在执行ConsumerRequest时,它会获取该MessageQueue的锁,使用synchronized (objLock) 保证多线程串行处理该队列。关键处理以下:

@Override
 public void run() {
     final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
     synchronized (objLock) {
         //广播模式 || (PQ 成功锁住&& 锁没过时)
         if (MessageModel.BROADCASTING.equals(defaultMQPushConsumerImpl.messageModel())
             || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                //依次消费,直到continueConsume=false,若是使用者处理消息没有错误,通常会返回SUCCESS,这时continueConsume始终是true,若是没有返回SUCCESS,RocketMQ会根据返回状态把continueConsume=false
                for (boolean continueConsume = true; continueConsume; ) {  
                 if (MessageModel.CLUSTERING.equals(defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                       tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                  }

                 if(CLUSTERING.equals(defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) {
                       tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                  }
                 // 在线程数小于队列数状况下,防止个别队列被饿死
                 long interval = System.currentTimeMillis() - beginTime;
                 if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                      submitConsumeRequestLater(processQueue, messageQueue, 10);
                      break;
                 }
                 try {
                   //处理队列加锁
                   this.processQueue.getLockConsume().lock();
                   status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                  } catch (Throwable e) {
                                
                  } finally {
                       this.processQueue.getLockConsume().unlock();
                  }
                  continueConsume = processConsumeResult(msgs, status, context, this);
                  } else {
                            continueConsume = false;
                        }
                    }
                } else { 
                  // 没有拿到当前队列的锁,稍后再消费
                    tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

顺序消费总的来讲,首先,须要保证顺序的消息要发送到同一个messagequeue中;其次,一个messagequeue只能被一个消费者消费,这点是由消息队列的分配机制来保证的;最后,一个消费者内部对一个mq的消费要保证是有序的。

Broker如何实时推送

Broker的实时推送依靠2个服务:PullRequestHoldService和ReputMessageService。二者都继承了ServiceThread,能够丢给专门的线程去执行。前者是挂起每次请求,否则客户端不停轮询扛不住。后者用来向ConsumeQueue写消息,并触发前者处理最新的消息。

PullRequestHoldService的做用是挂起来自客户端的拉取消息请求,直到有新消息知足拉取条件才会释放请求。

@Override
    public void run() {
       
        while (!this.isStopped()) {
            try {
                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                    //间隔5s检查是否有新消息
                    this.waitForRunning(5 * 1000);
                } else {             this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }
                //实际是调用notifyMessageArriving
                this.checkHoldRequest();
               
            } catch (Throwable e) {
             
            }
        }
    }

PullConsumer

从官方给出的示例,能够看出该模式须要使用者本身维护队列的offset,比PushConsumer由服务端维护略显麻烦。

public class PullConsumer {
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();


    public static void main(String[] args) throws MQClientException {
        MetaPullConsumer consumer = new MetaPullConsumer("please_rename_unique_group_name_5");

        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }

}

从示例代码中看出,使用时须要先获取该消费者拥有的全部队列,告诉客户端须要从该队列的哪一个offset位置取消息,获得消息后本身更新offset。

拉取消息时分为同步和异步,异步无非是客户端自身(不是消费者)添加个回调函数new PullCallback() ,在MQClientAPIImpl接受消息以后调用pullCallback.onSuccess(pullResult);在Pull模式下,没看到负载均衡,应该是给消费者自行控制,不像Push模式时由客户端统一处理和控制,根据消费者的消费能力作流控。

相关文章
相关标签/搜索