Java工程师的进阶之路 RocketMQ篇(一)

白菜Java自习室 涵盖核心知识mysql

Java工程师的进阶之路 RocketMQ篇(一)
Java工程师的进阶之路 RocketMQ篇(二)
sql

1. RocketMQ 简介

RocketMQ 前身叫作 MetaQ, 在 MeataQ 发布 3.0 版本的时候更名为 RocketMQ,其本质上的设计思路和 Kafka 相似,可是和 Kafka 不一样的是其使用 Java 进行开发,因为在国内的 Java 受众群体远远多于 Scala,因此 RocketMQ 是不少以 Java 语言为主的公司的首选。一样的 RocketMQ 和 Kafka 都是 Apache 基金会中的顶级项目,他们社区的活跃度都很是高,项目更新迭代也很是快。markdown

2. RocketMQ 架构图

对于 RocketMQ 的架构图,在大致上来看和 Kafka 并无太多的差异,可是在不少细节上是有不少差异的,接下来会一一进行讲述。架构

3. RocketMQ 名词解释

RocketMQ 架构图中多个 Producer,多个主 Broker,多个从 Broker,每一个 Producer 能够对应多个 Topic,每一个 Consumer 也能够消费多个 Topic。ide

Broker 信息会上报至 NameServer,Consumer 会从 NameServer 中拉取 Broker 和 Topic 的信息。post

  • Producer:消息生产者,向 Broker 发送消息的客户端spa

  • Consumer:消息消费者,从 Broker 读取消息的客户端设计

  • Broker:消息中间的处理节点,这里和 kafka 不一样,kafka 的 Broker 没有主从的概念,均可以写入请求以及备份其余节点数据,RocketMQ 只有主 Broker 节点才能写,通常也经过主节点读,当主节点有故障或者一些其余特殊状况才会使用从节点读,有点相似- 于 mysql 的主从架构。3d

  • Topic:消息主题,一级消息类型,生产者向其发送消息, 消费者读取其消息。code

  • Group:分为 ProducerGroup, ConsumerGroup, 表明某一类的生产者和消费者,通常来讲同一个服务能够做为 Group, 同一个 Group 通常来讲发送和消费的消息都是同样的。

  • Tag:Kafka 中没有这个概念,Tag 是属于二级消息类型,通常来讲业务有关联的可使用同一个 Tag, 好比订单消息队列,使用 Topic_Order, Tag 能够分为 Tag_食品订单, Tag_服装订单等等。

  • Queue: 在 kafka 中叫 Partition, 每一个 Queue 内部是有序的,在 RocketMQ 中分为读和写两种队列,通常来讲读写队列数量一致,若是不一致就会出现不少问题。

  • NameServer:Kafka 中使用的是 ZooKeeper 保存 Broker 的地址信息,以及 Broker 的 Leader 的选举,在 RocketMQ 中并无采用选举 Broker 的策略,因此采用了无状态的 NameServer 来存储,因为NameServer 是无状态的,集群节点之间并不会通讯,因此上传数据的时候都须要向全部节点进行发送。

不少朋友都在问什么是无状态呢?状态的有无实际上就是数据是否会作存储,有状态的话数据会被持久化,无状态的服务能够理解就是一个内存服务,NameServer 自己也是一个内存服务,全部数据都存储在内存中,重启以后都会丢失。

4. RocketMQ Topic和Queue

在 RocketMQ 中的每一条消息,都有一个 Topic,用来区分不一样的消息。一个主题通常会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者均可以接收到生产者写入的新消息。

在 Topic 中有分为了多个 Queue,这实际上是咱们发送/读取消息通道的最小单位,咱们发送消息都须要指定某个写入某个 Queue,拉取消息的时候也须要指定拉取某个 Queue,因此咱们的顺序消息能够基于咱们的 Queue 维度保持队列有序,若是想作到全局有序那么须要将 Queue 大小设置为1,这样全部的数据都会在Queue 中有序。

在上图中咱们的 Producer 会经过一些策略进行 Queue 的选择:

  • 非顺序消息:非顺序消息通常直接采用轮训发送的方式进行发送。

  • 顺序消息:根据某个 Key 好比咱们常见的订单Id, 用户Id,进行 Hash,将同一类数据放在同一个队列中,保证咱们的顺序性。

咱们同一组 Consumer 也会根据一些策略来选 Queue,常见的好比平均分配或者一致性 Hash 分配。 要注意的是当 Consumer 出现下线或者上线的时候,这里须要作重平衡,也就是 Rebalance,RocketMQ 的重平衡机制以下:

  1. 定时拉取 broker, topic 的最新信息
  2. 每隔 20s 作重平衡
  3. 随机选取当前 Topic 的一个主 Broker,这里要注意的是否是每次重平衡全部主 Broker 都会被选中,由于会存在一个 Broker 再多个 Broker 的状况。
  4. 获取当前 Broker,当前 ConsumerGroup 的全部机器ID。
  5. 而后进行策略分配。

因为重平衡是定时作的,因此这里有可能会出现某个 Queue 同时被两个 Consumer 消费,因此会出现消息重复投递

Kafka 的重平衡机制和 RocketMQ 不一样,Kafka 的重平衡是经过 Consumer 和 Coordinator 联系来完成的,当 Coordinator 感知到消费组的变化,会在心跳过程当中发送重平衡的信号,而后由一个ConsumerLeader 进行重平衡选择,而后再由 Coordinator 将结果通知给全部的消费者。

Queue 读写数量不一致怎么办?

在 RocketMQ 中 Queue 被分为读和写两种,在最开始接触 RocketMQ 的时候一直觉得读写队列数量配置不一致不会出现什么问题的,好比当消费者机器不少的时候咱们配置不少读的队列,可是实际过程当中发现会出现消息没法消费和根本没有消息消费的状况。

  1. 当写的队列数量大于读的队列的数量,当大于读队列这部分ID的写队列的数据会没法消费,由于不会将其分配给消费者。
  2. 当读的队列数量大于写的队列数量,那么多的队列数量就不会有消息被投递进来。

5. RocketMQ 入门实例

5.1. RocketMQ 生产者

直接定义好一个producer,建立好Message,调用send方法便可。

public class Producer {

    public static void main(String[] args)
            throws MQClientException,
            InterruptedException {
        DefaultMQProducer producer = new
                DefaultMQProducer("ProducerGroupName");
        producer.start();
        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                            "TagA",
                            "OrderID188",
                            "Hello world".getBytes
                                    (RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
    
}
复制代码

5.2. RocketMQ 消费者

public class PushConsumer {

    public static void main(String[] args)
            throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere
                (ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener
                (new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus
                    consumeMessage(List<MessageExt> msgs,
                                   ConsumeConcurrentlyContext context) {
                        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
    
}
复制代码

Java工程师的进阶之路 RocketMQ篇(一)
Java工程师的进阶之路 RocketMQ篇(二)