RocketMQ 分享全纪实

上上周在团队内部作了一个关于 RocketMQ 的分享,本文记录一下分享的大部份内容git

这是公众号 Young_Blog 的第 41 篇文章github

为何分享 RocketMQ

为何没有选择 Kafka 而是 RocketMQ 呢,没有什么特别的缘由,单纯是我以前就看过一点 RocketMQ 的源码,可是后来由于各类缘由没能看完,所以想着趁此次机会系统地回顾一遍。算法

另外就是,以前个人工做集中在客户端或者服务端,不多端到端地去设计、开发某项功能,所以在架构设计方面积累的经验比较少。这点劣势在年初接手开发新项目的时候尤其明显,新项目涉及注册发现、客户端、服务端、监控、网络编程等一系列模块,由于以前没有接触过这么复杂的项目,也不了解业务成熟的大型项目的设计哲学,所以在轮到本身作架构设计的时候提出的方案老是存在一些缺陷,后续致使了返工的现象。编程

那时起我就琢磨着找一个大型开源项目研究一下,学习一下它在架构设计上的考量,RocketMQ 不论从规模仍是复杂度上讲都很合适,所以就选了它。json

最近腾讯开源了 TubeMQ,感兴趣的能够去看看 TubeMQ缓存

消息队列的组成部分

首先,明确下消息队列的设计目标,一般有如下几点:服务器

  1. 解耦,须要支持消息暂存
  2. 削峰,要求能容忍消息积压
  3. 保证消息尽量不丢、不重复消费
  4. 支持传统意义上的队列和广播两种消费模式
  5. 消费能力得能够横向扩展

貌似要求越写越多,那设想一下,若是从零开始设计一个消息队列,知足以上五个需求,须要哪些组成部分。网络

第一个很容易想到的就是生产者(Producer)和消费者(Consumer),这是消息队列两端最重要的组成部分。一般状况下,生产者和消费者都不会是单台机器,而是一个集群,所以用生产组(Producer Group)和消费组(Consumer Group)来描述。架构

紧接着为了容许消息暂存和消息积压,咱们一般须要一个消息代理服务器(Broker),生产者和消费者都仅链接 Broker 进行消息的发送和消费,两者之间彻底解耦。负载均衡

消息发送一般须要一个管道进行投递,并且不一样类型的消息最好发往不一样的管道,管道的另外一端链接消费者,经过增长管道数目和消费者数目,咱们就达到了横向扩展消费能力的目的,这里咱们将管道成为 Queue,将消息类型成为 Topic

很好,目前咱们搭建了 生产者 -> Queue -> Broker -> Queue -> 消费者 的完整流程,一条消息已经能够端到端地发送了。可是思考下,若是 TopicQueue 的数目不少,某些生产者和消费者只关注其中一些,那么咱们还须要为这种订阅/发布关系提供一个注册平台,称之为 NameService(命名服务)来统一管理消息订阅的拓扑关系。

上面废了这么多口舌,目的就在于让不了解消息队列的同窗在进入下一节以前,不至于一脸懵逼。技术的架构设计都是一步一步来的,消息队列也不是一开始就演变成 Kafka 或者 RocketMQ 这种架构的,它们都经历过为了支持某些业务需求而不得不作的架构演进,最后变成了如今目前业界比较成熟的模型,而咱们上述的假设其实已经走了不少捷径。

RocketMQ 的架构

这节咱们来看看 RocketMQ 的具体架构,结合这张结构图咱们会介绍里面每一个组成部分的功能和设计考量。

RocketMQ架构图.png

Name Server

位于最顶端的是 RocketMQ 的命名服务,称之为 NameServer,它是用来管理 Topic 的订阅发布关系、消息发送和消费拓扑的,得让生产者知道 “我这个 Topic 的信息发往哪些 Broker”,得让消费者知道 “我得去哪些 Broker 上消费这个 Topic 的消息”。NameServer 能够多机部署变成一个 NameServer 集群保证高可用,但这些机器间彼此并不通讯,也就是说三者的元数据舍弃了强一致性。

这些元数据是怎么来的呢,首先 Broker 启动时会向所有的 NameServer 机器注册心跳,心跳里包含本身机器上 Topic 的拓扑信息,以后每隔 30s 更新一次,而后生产者和消费者启动的时候任选一台 NameServer 机器拉取所需的 Topic 的路由信息缓存在本地内存中,以后每隔 30s 定时从远端拉取更新本地缓存。

NameServer 机器中定时扫描 Broker 的心跳,一旦失联超出 2min,即关闭这个 Broker 的链接,但不主动通知生产组和消费组,所以两者最长须要 30s 才能感知到某个 Broker 故障。

生产组和消费者

架构图两端的就是生产组和消费组,都是多机的集群,由若干个生产者和消费者实例组成,消费者消费消息时有两种模式,一种是广播模式一种是集群消费模式,前者表示一条消息会被消费组下的全部消费者实例消费,后者表示一条消息只会被消费组下的一个实例消费到,考虑到集群消费模式是目前使用主流,所以本文主要谈论后者。

Topic 和 MessageQueue

Topic 咱们以前讲过了,表明某种消息类型,为了达到消费性能可横向扩展的需求,RocketMQ 引入了 MessageQueue 这个逻辑概念,将一个 Topic 划分为多个 MessageQueue,默认是四个。而 MessageQueue 和消费者实例是一对一的关系,消费者实例和 MessageQueue 是一对多的关系。

例如架构图中,Topic 下分为四个 MessageQueue,分布在两个 Broker 机器上,生产者组将消息平均发往四个 MessageQueue,而因为消费组中仅有两个消费者实例,所以每一个消费者实例平均消费两个 MessageQueue

一旦性能不足,能够扩容消费组增长消费者实例至四个,那么每一个消费者实例消费一个 MessageQueue,从而达到消费能力的横向扩展。

Broker

Broker 做为消息代理服务器,最重要的职责是存储消息和管理消费进度(集群消费模式下专有)。单个 Topic 下的多个 MessageQueue 通常来讲会分散在多个 Broker 上面达到容灾的目的。

Topic 经过打散 MessageQueue 达到容灾目的,那么 Broker 机器维度又是怎么容灾的呢,RocketMQ 容许设置主备 Broker,两者间经过异步拉取复制的方式进行消息同步,一旦主 Broker 宕机,备机能够提供消息消费,但不提供消息写入,也就是说其实主备之间并无 Failover 功能,这保证了写入主的消息不会丢失,可是会影响系统的可用性。

滴滴内部作过针对性地作过二次开发,简单来讲实现的方式是 NameServer 集群经过 ZK 选举出一个 Leader,来完成 Failover 的决策。

为了简洁,本文图例中没有说明的状况下,均不画出 Slave Broker

RocketMQ Producer

消息发送的负载均衡

RocketMQ架构图.png

仍是看这张结构图,生产者发送消息,默认采用轮询的方式达到负载均衡,每一个生产者实例内存中都知道 TopicMessageQueue 的分布拓扑信息,所以经过轮询就能够将消息平均发送到这些管道里。

消息发送的高可用

咱们以前提到过,Broker 会向 NameServer 集群全部机器发送心跳,NameServer 集群里的机器各自按期扫描失联的 Broker,关闭链接,但不会主动通知生产者组,须要等待生产者主动来拉取。所以存在元数据不一致的窗口,此窗口最长为 30s

故障延时.png

因为上述缘由,消息生产者不可避免的会将消息发往已经故障的 Broker 机器,例如上图,Producer-01 先将消息发往 Broker-A 上的 MessageQueue-01,发现失败了,因为轮询发送机制它继续发往 MessageQueue-02,因为仍是位于 Broker-A 机器上,所以依旧失败了,默认状况下同步发送消息重试三次,所以极可能这条消息因为没有规避 Broker-A 致使发送失败,实际上 Broker-B 仍是存活的,彻底能够规避掉故障的 Broker-A 机器提早选择 Broker-B 发送消息。

故障延时2.png

RocketMQ 中将生产者端剔除故障机器的机制称之为 Broker 的故障延迟机制,一旦发现发送到某个 Broker 机器失败,则暂时将其剔除,优先选择其余 Broker 重试。

RocketMQ Consumer

看完了消息发送部分,本节咱们进入消息的消费。消息的消费相较于消息的发送会复杂一些。咱们想一下,假设你某个生产者实例宕机了,那最多就是少了个消息的发送者,而绝大多数状况下消息的生产者都是无状态的,流量能够任意打到某个生产者,若是其一宕机那么我经过一些措施摘掉这台机器的流量就能够。可是消费者没有这么简单,由于它们并非无状态的,它们是固定在消费某一些 TopicMessageQueue,所以宕机任意一台消费者都涉及到消费拓扑的从新变动,这带来了更多的复杂度。

Message Queue 的分配

MessageQueue 存在的意义前面已经谈过再也不复述,本节讲一下若是将特定数量的 MessageQueue 分配给消费者组下的消费者实例,注意!这实际上是个技术活。

消费者组下的消费者实例,怎么知道本身须要消费某个 Topic 下的哪些 MessageQueue 呢?例如架构图中,只有两个消费者实例,可是总共有四个 MessageQueue,他们如何知道各自消费两个,并且尚未冲突的。

为了简单,假设咱们的系统是新搭建的,两台 Consumer 都是第一次启动,所以这里不涉及 Rebalance 机制

分配方案是在消费者实例启动的时候去执行的,消费者实例启动的时候回从 NameServer 上获取本身订阅的 Topic 的拓扑信息,包括该 Topic 下总共有几个 MessageQueue,分布在哪些 Broker 机器上等等。而后向其中全部 Broker 机器发送心跳。最后选取任意一台 Broker,从上面获取消费组下总共有几个实例。

MessageQueue分配.png

如此一来,消费者实例就知道了 MessageQueue 信息(mqSet)和消费组下的实例个数(consumerIdSet)信息。在本地内存中经过简单的分配算法,就能够知道本身应该负责消费哪些 MessageQueue 了。

须要注意的是,每一个客户端获取到 mqSetconsumerIdSet 以后都须要首先进行排序!目的是为了在执行分配算法时,每一个客户端的视图都是一致的。

MessageQueue分配策略.png

RocketMQ 针对 MessageQueue 提供了多种可选的分配策略,例如平均分配、轮询分配、固定分配等,在实际生产环境中可能还须要根据机房进行就近路由分配、粘滞分配(使得 MessageQueue 变更次数最小)等。

保证局部顺序消费

顺序消费是应用场景对消息队列中间件提出的需求,例如某个 ID = 100 的支付业务,在其生命周期内会发送三条消息:

  1. 订单生成
  2. 订单付款
  3. 订单结束

由于订单 ID 同为 100 属于一个订单,所以要求消费组在消费这三条消息时保证先消费第一条,而后才能消费第二条,最后才是第三条。若是此时还有 ID = 300 的订单,那么两者之间能够交叉,可是这三个过程必须保证升序。

顺序发送.png

保证消息局部顺序消费的重点在于:

  1. 生产者组经过计算,将相同 ID 的订单消息发往同一个 MessageQueue
  2. 消费者组经过分别位于 Broker 和客户端的两把锁,保证对该 MessageQueue 内消息的顺序消费

发往同一个 MessageQueue 保证了该 MessageQueue 内消息是局部有序的,可是没法保证全局有序,想要全局有序?那这个 Topic 只能配一个 MessageQueue,而后所有消息都发到这一个 MessageQueue 中。通常来讲,局部有序已经能够知足绝大部分应用场景了。

生产端的保证达到了,下面就是消费端,依靠的是两把锁,分别位于 Broker 侧和消费者实例客户端侧。Broker 侧的锁是 MessageQueue 粒度的,保证同一时间至多只有一个消费者实例消费该 MessageQueue

顺序消费加锁.png

你可能疑惑,原本不就是一对一的关系么?缘由是在消费者组进行 Rebalance 的时候可能会形成某个时间窗口内单个 MessageQueue 被多个消费者实例同时消费,这里经过加锁限制了这种状况。一旦启动时加锁失败,意味着该 MessageQueue 还在被其余消费者实例锁定,所以不建立相应的消息拉取任务,等到锁被释放或者超时(默认 60s)。加锁成功后消费者实例还会每隔 20s 定时锁定该 MessageQueue 一次。

消费者实例侧因为可能同时负责消费多个 MessageQueue,所以采用了线程池消费消息,须要在客户端提供加锁的方式保证单个 MessageQueue 内的消息同一时间仅被一个线程消费。

管理消费进度

在广播消费模式下,消费进度仅存储在消费者实例本地,而在集群消费模式下,消费进度存储在 Broker 上。经过 Topic + 消费者组名称做为 keyvalue 中分别记录每一个 MessageQueue 对应该消费者组的消费偏移量,所以消费进度是消费者组之间互相隔离的。

早期版本 Kafkaoffset 保存在 ZK 上,Path 为 consumers/{consume-group}/offsets/{topic}/{partition},其实和 RocketMQ 的保存方式是一致的

利用 offset 记录消费进度本质上是一种批量 ACK 的方法,它的优势在于 Broker 的消费进度管理粒度从单条消息变为单个 MessageQueue,简化了 Broker 的复杂度。那么下一个问题,消费者和 Broker 都是在什么时候提交和持久化各自的 offset 的呢?

首先,消费者侧会记录本身的消费进度到内存中的 OffsetTable,经过每五秒一次的定时任务提交到 Broker 侧,Broker 接收到以后保存在内存中,并定时刷到磁盘上的 json 文件里。

这里须要注意的是,因为一批消息的消费次序不肯定,可能下标大的消息先被消费结束,下标小的因为延时还没有被消费,此时消费者向 Broker 提交的 offset 应该是已被消费的最小下标,从而保证消息不被遗漏,但缺点在于可能重复消费消息。

消费进度.png

Rebalance

消息队列系统中,常常会出现 Broker 实例的增删、Topic 的增减、TopicMessageQueue 数目的增减、消费组实例数目的增减等状况,它们都会触发消费关系的从新分配,这个过程称之为 Rebalance

RocketMQRebalance 机制有主动和被动之分,主动意为消费者实例每隔 20s 会定时计算本身的消费拓扑并和内存中的对比,一旦发现部分 MessageQueue 再也不是本身负责消费,则中止对它的消息拉取任务;若是有新的 MessageQueue 变为本身负责,则建立对它的消息拉取任务。

被动意为,Broker 能够主动通知某个消费组下的全部实例,要求它们当即开始一次 Rebalance,经常使用于新的消费者实例加入、或者 Broker 检测到有消费者实例心跳失联等状况,下面是一个消费者实例新加入的场景。

当即rebalance.png

RocketMQRebalance 因为部分时刻的视图可能存在不一致,所以单次 Rebalance 并不能彻底保证必定达到最终效果,可是因为它是一种周期性的任务,因此最终系统里的 MessageQueue 会被分配彻底。

RocketMQRebalance 机制依靠客户端各自单独计算获得,Kafka 新版本中则依靠 Consumer Leader 单点计算后再上传至 Group Coordinator,由它下发至每一个消费者实例进行更新。

这两种方式各有优缺点,一般来讲,单点计算能够最大程度减少视图不一致致使的频繁 Rebalance 现象(但也不能杜绝),可是缺点在于逻辑复杂,消费者组和 Broker 中都须要选取单点,一个负责计算一个负责下发通知;客户端计算实现上更简单,彼此独立,经过周期性任务最终也能完成从新分配的任务,可是因为客户端彼此获取的视图不作校验,所以可能存在因为视图不一致致使的重复消费和频繁 Rebalance

RocketMQ 消息存储

硬核内容不少,并且文件存储我接触的也很少更不敢瞎写了,这块后续会视个人学习状况看看是否单独再开一个坑。

若是有同窗很想了解这部份内容的话,我贴几篇在资料搜集过程当中看到的比较好的博文:

  1. RocketMQ高性能之底层存储设计
  2. RocketMQ 消息存储流程

总结

若是你把 RocketMQKafka 对比起来看,其实消息队列的设计哲学有不少类似之处,但在文件存储粒度、分区容灾、负载均衡等方面,两者又有本身的设计考量,采用了不一样的实现思路,结合 KafkaISR 同步、RebalancePartition Failover 等机制一块儿学习的话,这种感觉会更强烈一些,但愿这篇文章对你们有所启发。

相关文章
相关标签/搜索