上上周在团队内部作了一个关于 RocketMQ 的分享,本文记录一下分享的大部份内容git
这是公众号 Young_Blog 的第 41 篇文章github
为何没有选择 Kafka
而是 RocketMQ
呢,没有什么特别的缘由,单纯是我以前就看过一点 RocketMQ
的源码,可是后来由于各类缘由没能看完,所以想着趁此次机会系统地回顾一遍。算法
另外就是,以前个人工做集中在客户端或者服务端,不多端到端地去设计、开发某项功能,所以在架构设计方面积累的经验比较少。这点劣势在年初接手开发新项目的时候尤其明显,新项目涉及注册发现、客户端、服务端、监控、网络编程等一系列模块,由于以前没有接触过这么复杂的项目,也不了解业务成熟的大型项目的设计哲学,所以在轮到本身作架构设计的时候提出的方案老是存在一些缺陷,后续致使了返工的现象。编程
那时起我就琢磨着找一个大型开源项目研究一下,学习一下它在架构设计上的考量,RocketMQ
不论从规模仍是复杂度上讲都很合适,所以就选了它。json
最近腾讯开源了 TubeMQ,感兴趣的能够去看看 TubeMQ缓存
首先,明确下消息队列的设计目标,一般有如下几点:服务器
貌似要求越写越多,那设想一下,若是从零开始设计一个消息队列,知足以上五个需求,须要哪些组成部分。网络
第一个很容易想到的就是生产者(Producer
)和消费者(Consumer
),这是消息队列两端最重要的组成部分。一般状况下,生产者和消费者都不会是单台机器,而是一个集群,所以用生产组(Producer Group
)和消费组(Consumer Group
)来描述。架构
紧接着为了容许消息暂存和消息积压,咱们一般须要一个消息代理服务器(Broker
),生产者和消费者都仅链接 Broker
进行消息的发送和消费,两者之间彻底解耦。负载均衡
消息发送一般须要一个管道进行投递,并且不一样类型的消息最好发往不一样的管道,管道的另外一端链接消费者,经过增长管道数目和消费者数目,咱们就达到了横向扩展消费能力的目的,这里咱们将管道成为 Queue
,将消息类型成为 Topic
。
很好,目前咱们搭建了 生产者 -> Queue -> Broker -> Queue -> 消费者 的完整流程,一条消息已经能够端到端地发送了。可是思考下,若是 Topic
和 Queue
的数目不少,某些生产者和消费者只关注其中一些,那么咱们还须要为这种订阅/发布关系提供一个注册平台,称之为 NameService
(命名服务)来统一管理消息订阅的拓扑关系。
上面废了这么多口舌,目的就在于让不了解消息队列的同窗在进入下一节以前,不至于一脸懵逼。技术的架构设计都是一步一步来的,消息队列也不是一开始就演变成 Kafka
或者 RocketMQ
这种架构的,它们都经历过为了支持某些业务需求而不得不作的架构演进,最后变成了如今目前业界比较成熟的模型,而咱们上述的假设其实已经走了不少捷径。
这节咱们来看看 RocketMQ
的具体架构,结合这张结构图咱们会介绍里面每一个组成部分的功能和设计考量。
位于最顶端的是 RocketMQ
的命名服务,称之为 NameServer
,它是用来管理 Topic
的订阅发布关系、消息发送和消费拓扑的,得让生产者知道 “我这个 Topic
的信息发往哪些 Broker
”,得让消费者知道 “我得去哪些 Broker
上消费这个 Topic
的消息”。NameServer
能够多机部署变成一个 NameServer
集群保证高可用,但这些机器间彼此并不通讯,也就是说三者的元数据舍弃了强一致性。
这些元数据是怎么来的呢,首先 Broker
启动时会向所有的 NameServer
机器注册心跳,心跳里包含本身机器上 Topic
的拓扑信息,以后每隔 30s
更新一次,而后生产者和消费者启动的时候任选一台 NameServer
机器拉取所需的 Topic
的路由信息缓存在本地内存中,以后每隔 30s
定时从远端拉取更新本地缓存。
NameServer
机器中定时扫描 Broker
的心跳,一旦失联超出 2min
,即关闭这个 Broker
的链接,但不主动通知生产组和消费组,所以两者最长须要 30s
才能感知到某个 Broker
故障。
架构图两端的就是生产组和消费组,都是多机的集群,由若干个生产者和消费者实例组成,消费者消费消息时有两种模式,一种是广播模式一种是集群消费模式,前者表示一条消息会被消费组下的全部消费者实例消费,后者表示一条消息只会被消费组下的一个实例消费到,考虑到集群消费模式是目前使用主流,所以本文主要谈论后者。
Topic
咱们以前讲过了,表明某种消息类型,为了达到消费性能可横向扩展的需求,RocketMQ
引入了 MessageQueue
这个逻辑概念,将一个 Topic
划分为多个 MessageQueue
,默认是四个。而 MessageQueue
和消费者实例是一对一的关系,消费者实例和 MessageQueue
是一对多的关系。
例如架构图中,Topic
下分为四个 MessageQueue
,分布在两个 Broker
机器上,生产者组将消息平均发往四个 MessageQueue
,而因为消费组中仅有两个消费者实例,所以每一个消费者实例平均消费两个 MessageQueue
。
一旦性能不足,能够扩容消费组增长消费者实例至四个,那么每一个消费者实例消费一个 MessageQueue
,从而达到消费能力的横向扩展。
Broker
做为消息代理服务器,最重要的职责是存储消息和管理消费进度(集群消费模式下专有)。单个 Topic
下的多个 MessageQueue
通常来讲会分散在多个 Broker
上面达到容灾的目的。
Topic
经过打散 MessageQueue
达到容灾目的,那么 Broker
机器维度又是怎么容灾的呢,RocketMQ
容许设置主备 Broker
,两者间经过异步拉取复制的方式进行消息同步,一旦主 Broker
宕机,备机能够提供消息消费,但不提供消息写入,也就是说其实主备之间并无 Failover
功能,这保证了写入主的消息不会丢失,可是会影响系统的可用性。
滴滴内部作过针对性地作过二次开发,简单来讲实现的方式是 NameServer
集群经过 ZK
选举出一个 Leader
,来完成 Failover
的决策。
为了简洁,本文图例中没有说明的状况下,均不画出 Slave Broker
仍是看这张结构图,生产者发送消息,默认采用轮询的方式达到负载均衡,每一个生产者实例内存中都知道 Topic
下 MessageQueue
的分布拓扑信息,所以经过轮询就能够将消息平均发送到这些管道里。
咱们以前提到过,Broker
会向 NameServer
集群全部机器发送心跳,NameServer
集群里的机器各自按期扫描失联的 Broker
,关闭链接,但不会主动通知生产者组,须要等待生产者主动来拉取。所以存在元数据不一致的窗口,此窗口最长为 30s
。
因为上述缘由,消息生产者不可避免的会将消息发往已经故障的 Broker
机器,例如上图,Producer-01
先将消息发往 Broker-A
上的 MessageQueue-01
,发现失败了,因为轮询发送机制它继续发往 MessageQueue-02
,因为仍是位于 Broker-A
机器上,所以依旧失败了,默认状况下同步发送消息重试三次,所以极可能这条消息因为没有规避 Broker-A
致使发送失败,实际上 Broker-B
仍是存活的,彻底能够规避掉故障的 Broker-A
机器提早选择 Broker-B
发送消息。
RocketMQ
中将生产者端剔除故障机器的机制称之为 Broker
的故障延迟机制,一旦发现发送到某个 Broker
机器失败,则暂时将其剔除,优先选择其余 Broker
重试。
看完了消息发送部分,本节咱们进入消息的消费。消息的消费相较于消息的发送会复杂一些。咱们想一下,假设你某个生产者实例宕机了,那最多就是少了个消息的发送者,而绝大多数状况下消息的生产者都是无状态的,流量能够任意打到某个生产者,若是其一宕机那么我经过一些措施摘掉这台机器的流量就能够。可是消费者没有这么简单,由于它们并非无状态的,它们是固定在消费某一些 Topic
的 MessageQueue
,所以宕机任意一台消费者都涉及到消费拓扑的从新变动,这带来了更多的复杂度。
MessageQueue
存在的意义前面已经谈过再也不复述,本节讲一下若是将特定数量的 MessageQueue
分配给消费者组下的消费者实例,注意!这实际上是个技术活。
消费者组下的消费者实例,怎么知道本身须要消费某个 Topic
下的哪些 MessageQueue
呢?例如架构图中,只有两个消费者实例,可是总共有四个 MessageQueue
,他们如何知道各自消费两个,并且尚未冲突的。
为了简单,假设咱们的系统是新搭建的,两台 Consumer 都是第一次启动,所以这里不涉及 Rebalance 机制
分配方案是在消费者实例启动的时候去执行的,消费者实例启动的时候回从 NameServer
上获取本身订阅的 Topic
的拓扑信息,包括该 Topic 下总共有几个 MessageQueue
,分布在哪些 Broker
机器上等等。而后向其中全部 Broker
机器发送心跳。最后选取任意一台 Broker
,从上面获取消费组下总共有几个实例。
如此一来,消费者实例就知道了 MessageQueue
信息(mqSet
)和消费组下的实例个数(consumerIdSet
)信息。在本地内存中经过简单的分配算法,就能够知道本身应该负责消费哪些 MessageQueue
了。
须要注意的是,每一个客户端获取到 mqSet
和 consumerIdSet
以后都须要首先进行排序!目的是为了在执行分配算法时,每一个客户端的视图都是一致的。
RocketMQ
针对 MessageQueue
提供了多种可选的分配策略,例如平均分配、轮询分配、固定分配等,在实际生产环境中可能还须要根据机房进行就近路由分配、粘滞分配(使得 MessageQueue
变更次数最小)等。
顺序消费是应用场景对消息队列中间件提出的需求,例如某个 ID = 100
的支付业务,在其生命周期内会发送三条消息:
由于订单 ID
同为 100
属于一个订单,所以要求消费组在消费这三条消息时保证先消费第一条,而后才能消费第二条,最后才是第三条。若是此时还有 ID = 300
的订单,那么两者之间能够交叉,可是这三个过程必须保证升序。
保证消息局部顺序消费的重点在于:
ID
的订单消息发往同一个 MessageQueue
Broker
和客户端的两把锁,保证对该 MessageQueue
内消息的顺序消费发往同一个 MessageQueue
保证了该 MessageQueue
内消息是局部有序的,可是没法保证全局有序,想要全局有序?那这个 Topic
只能配一个 MessageQueue
,而后所有消息都发到这一个 MessageQueue
中。通常来讲,局部有序已经能够知足绝大部分应用场景了。
生产端的保证达到了,下面就是消费端,依靠的是两把锁,分别位于 Broker
侧和消费者实例客户端侧。Broker
侧的锁是 MessageQueue
粒度的,保证同一时间至多只有一个消费者实例消费该 MessageQueue
。
你可能疑惑,原本不就是一对一的关系么?缘由是在消费者组进行 Rebalance
的时候可能会形成某个时间窗口内单个 MessageQueue
被多个消费者实例同时消费,这里经过加锁限制了这种状况。一旦启动时加锁失败,意味着该 MessageQueue
还在被其余消费者实例锁定,所以不建立相应的消息拉取任务,等到锁被释放或者超时(默认 60s
)。加锁成功后消费者实例还会每隔 20s 定时锁定该 MessageQueue
一次。
消费者实例侧因为可能同时负责消费多个 MessageQueue
,所以采用了线程池消费消息,须要在客户端提供加锁的方式保证单个 MessageQueue
内的消息同一时间仅被一个线程消费。
在广播消费模式下,消费进度仅存储在消费者实例本地,而在集群消费模式下,消费进度存储在 Broker
上。经过 Topic + 消费者组名称
做为 key
,value
中分别记录每一个 MessageQueue
对应该消费者组的消费偏移量,所以消费进度是消费者组之间互相隔离的。
早期版本
Kafka
将offset
保存在ZK
上,Path
为 consumers/{consume-group}/offsets/{topic}/{partition},其实和RocketMQ
的保存方式是一致的
利用 offset
记录消费进度本质上是一种批量 ACK
的方法,它的优势在于 Broker
的消费进度管理粒度从单条消息变为单个 MessageQueue
,简化了 Broker
的复杂度。那么下一个问题,消费者和 Broker
都是在什么时候提交和持久化各自的 offset 的呢?
首先,消费者侧会记录本身的消费进度到内存中的 OffsetTable
,经过每五秒一次的定时任务提交到 Broker
侧,Broker
接收到以后保存在内存中,并定时刷到磁盘上的 json
文件里。
这里须要注意的是,因为一批消息的消费次序不肯定,可能下标大的消息先被消费结束,下标小的因为延时还没有被消费,此时消费者向 Broker
提交的 offset
应该是已被消费的最小下标,从而保证消息不被遗漏,但缺点在于可能重复消费消息。
消息队列系统中,常常会出现 Broker
实例的增删、Topic
的增减、Topic
下 MessageQueue
数目的增减、消费组实例数目的增减等状况,它们都会触发消费关系的从新分配,这个过程称之为 Rebalance
。
RocketMQ
的 Rebalance
机制有主动和被动之分,主动意为消费者实例每隔 20s
会定时计算本身的消费拓扑并和内存中的对比,一旦发现部分 MessageQueue
再也不是本身负责消费,则中止对它的消息拉取任务;若是有新的 MessageQueue
变为本身负责,则建立对它的消息拉取任务。
被动意为,Broker
能够主动通知某个消费组下的全部实例,要求它们当即开始一次 Rebalance
,经常使用于新的消费者实例加入、或者 Broker 检测到有消费者实例心跳失联等状况,下面是一个消费者实例新加入的场景。
RocketMQ
的 Rebalance
因为部分时刻的视图可能存在不一致,所以单次 Rebalance
并不能彻底保证必定达到最终效果,可是因为它是一种周期性的任务,因此最终系统里的 MessageQueue
会被分配彻底。
RocketMQ
的 Rebalance
机制依靠客户端各自单独计算获得,Kafka
新版本中则依靠 Consumer Leader
单点计算后再上传至 Group Coordinator
,由它下发至每一个消费者实例进行更新。
这两种方式各有优缺点,一般来讲,单点计算能够最大程度减少视图不一致致使的频繁 Rebalance
现象(但也不能杜绝),可是缺点在于逻辑复杂,消费者组和 Broker
中都须要选取单点,一个负责计算一个负责下发通知;客户端计算实现上更简单,彼此独立,经过周期性任务最终也能完成从新分配的任务,可是因为客户端彼此获取的视图不作校验,所以可能存在因为视图不一致致使的重复消费和频繁 Rebalance
。
硬核内容不少,并且文件存储我接触的也很少更不敢瞎写了,这块后续会视个人学习状况看看是否单独再开一个坑。
若是有同窗很想了解这部份内容的话,我贴几篇在资料搜集过程当中看到的比较好的博文:
若是你把 RocketMQ
和 Kafka
对比起来看,其实消息队列的设计哲学有不少类似之处,但在文件存储粒度、分区容灾、负载均衡等方面,两者又有本身的设计考量,采用了不一样的实现思路,结合 Kafka
的 ISR
同步、Rebalance
、Partition Failover
等机制一块儿学习的话,这种感觉会更强烈一些,但愿这篇文章对你们有所启发。