这节介绍RocketMQ客户端的启动流程,即Consumer和Producer的启动流程。express
首先先看下客户端的demobootstrap
Producer:api
public class SyncProducer { public static void main (String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer ("GroupTest"); // 设置NameServer的地址 producer.setNamesrvAddr ("localhost:9876"); // 启动Producer实例 producer.start (); for (int i = 0; i < 100; i++) { // 建立消息,并指定Topic,Tag和消息体 Message msg = new Message ("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送消息到一个Broker SendResult sendResult = producer.send (msg); // 经过sendResult返回消息是否成功送达 System.out.printf ("%s%n", sendResult); } // 若是再也不发送消息,关闭Producer实例。 producer.shutdown (); } }
Consumer:缓存
public class Consumer { public static void main (String[] args) throws InterruptedException, MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest"); // 设置NameServer的地址 consumer.setNamesrvAddr ("localhost:9876"); // 订阅一个或者多个Topic,以及Tag来过滤须要消费的消息 consumer.subscribe ("TopicTest", "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener (new MessageListenerConcurrently () { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start (); System.out.printf ("Consumer Started.%n"); } }
Producer和Consumer的启动相似,在初始化而后进行必要设置(主要是客户端所属的Group和NameServer地址)后,执行start方法启动后台监听服务,事实上Producer和Consumer都是调用同一个类MQClientInstance的start方法,下图为继承关系:微信
DefaultMQproducer和DefaultMQPushConsumer都继承自ClientConfig,顾名思义ClientConfig表示客户端的配置,包括NameServer地址、客户端地址、客户端实例名等。因为Producer和Consumer都须要同Broker和NameServer交互,因此配置上有不少相同,这两个将主要功能的实现都委托给了对应的Impl(DefaultMQProducerImpl和DefaultMQPushConsumerImpl)。Impl内部调用了MQClientInstance来完成客户端同远程交互的主要功能,而Producer和Consumer则封装本身相关的行为,MQClientInstance内部又委托忒了MQClientAPIImpl。app
DefaultMQProducer的启动以下:dom
DefaultMQProducer将start委托给了DefaultMQProducerImpl来完成,主要过程为:tcp
DefaultMQPushConsumer的启动以下:ide
DefaultMQPushConsumer一样将start委托给了DefaultMQPushConsumerImpl来完成,流程上也类似。但相比DefaultMQProducer多了不少其余组件来辅助消费过程,如rebalance、offset管理等,主要过程为:fetch
DefaultMQPushConsumer为推模式,RocketMQ还提供了拉模式来消费消息,实现类为DefaultMQPullConsumer,启动过程相似,推模式是用拉模式来实现的,重点实现都在MQClientInstace中。
MQClientInstance为一个门户类,组合了各功能,以下,包括Rebalance、消费数据统计、生产消息、消费消息等,这些都有对应的实现。
上面说过,Producer和Consumer在启动的时候,都会在内部先初始化一个MQClientInstance对象,而后调用其start方法启动对应的后台程序,以下:
MQClientInstance的start方法除了调用自身进行准备工做外,也调用了其余组件的start方法开始它们的准备工做,主要流程为:
下面详细介绍下各个过程。
该方法用于更新NameServer地址,该方法会从http://xxx:port/rocketmq/yyy
,默认8080端口(若是xxx中没有:,即不带端口时)中获取NameServer地址(xxx为域名,由系统配置项rocketmq.namesrv.domain控制,默认为jmenv.tbsite.net;yyy为访问路径,由系统配置项rocketmq.namesrv.domain.subgroup控制,默认为nsaddr)。该地址要求返回结果为一个ip列表,以;隔开,若是获取回来的地址跟现有的地址不一致则会更新缓存的NameServer地址列表。解析出来的地址列表用于根据NettyRemotingClient内部持有的变量:
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
该方法在内部调用了NettyRemotingClient的start方法,用于初始化Netty客户端。NettyRemotingClient是基于Netty实现的tcp协议客户端,主要流程为:
关于NettyRemotingClient后面会专门进行讲解,这里只介绍在客户端启动时其作了哪些动做。
该方法用于根据客户端实例关注的全部topic的路由信息,包括客户端监听的topic以及producer生产的topic。首先会遍历从MQClientInstance内部的consumerTable和consumerTable的客户端实例,拿到全部的topic信息,而后挨个更新topic的路由。
同步topic路由时,会经过NettyRemotingClient选择一个NameServer获取topic路由信息,而后判断topic信息是否发生了更改,主要比较topic所对应的Queue和Broker是否发生了更改。若路由信息发生了更改则会同步topic所在的broker地址列表,即内部的brokerAddrTable属性;接着同步produer关注的topic路由信息,即producerTable属性;接着同步consumer订阅的topic路由信息,即consumerTable属性;最后更新本地topic信息,即topicRouteTable属性。
该方法会遍历MQClient所持有的各个producer和consumer,将客户端信息构造为HeartbeatData对象,而后调用MQClientAPIImpl的sendHearbeat方法,向全部的broker上报心跳数据。心跳内容包括:
该方法会遍历consumerTable里的全部MQConsumer对象,获取每一个队列处理的MessageQueue,而后调用OffsetStore持久化全部的MessageQueue。OffsetStore后面会专门进行讲解。
该方法主要是动态调整DefaultMQPushConsumerImpl(推模式)客户端消费线程池的大小。前面说过推模式是经过包装拉模式来实现的,内部都依赖PullAPIWrapper。实现上推模式多了一个ConsumeMessageService定时使用拉模式消费消息,该实现须要一个线程池,adjustThreadPool就是动态调整该线程池的大小。关于客户端消费消息的过程,后面也会专门进行讲解。
PullMessageService用于封装拉模式以实现推模式。它会循环从内部的LinkedBlockingQueue<PullRequest>中拿出PullRequest对象(消费q消息封装的对象),选取一个可用的客户端实例DefaultMQPushConsumerImpl,调用其pullMessage方法.该方法会判断消费进度,决定是当即消费仍是延迟消费,若是是延迟消费则再放回LinkedBlockingQueue中等待消费;若是是直接消费,则调用PullMessageService(拉模式)的executePullRequestImmediately消费消息.
PullMessageService的基础关系以下:
PullMessageService.start内部主要是启动线程,该线程会循环执行执行任务,具体实现会在后续介绍消息消费的时候说起。
该方法用于启动rebalance任务。RebalanceService同PullMessageService相同,都继承自ServiceThread类,,并实现了run方法。RebalanceService在run方法中等待必定时间(默认20S,能够经过rocketmq.client.rebalance.waitInterval配置具体时间)后会调用MQClientInstance.doRebalance执行具体的动做。具体实现会在后续介绍rebalance实现的时候说起。
在上面2.
时有说起该流程,这里的DefaultMQPushConsumerImpl对象是Group为CLIENT_INNER_PRODUCER
的内部对象。
客户端的启动过程就如上面介绍,下面附上该部分当时源码阅读过程作的笔记简图,该图描述了客户端启动过程的大体过程:
更多原创内容请搜索微信公众号:啊驼(doubaotaizi)