今日头条的服务大量使用微服务,容器数目巨大,业务线繁多, Topic 的数量也很是多。另外,使用的语言比较繁杂,包括 Python,Go, C++, Java, JS 等,对于基础组件的接入,维护 SDK 的成本很高。算法
引入 RocketMQ 以前采用的消息队列是 NSQ 和 kafka , NSQ 是纯内存的消息队列,缺乏消息的持久性,不落盘直接写到 Golang 的 channel 里,在并发量高的时候 CPU 利用率很是高,其优势是能够无限水平扩展,另外,因为不须要保证消息的有序性,集群单点故障对可用性基本没有影响,因此具备很是高的可用性。咱们也用到了 Kafka ,它的主要问题是在业务线和 Topic 繁多,其写入性能会出现明显的降低,拆分集群又会增长额外的运维负担。而且在高负载下,其故障恢复时间比较长。因此,针对当时的情况和业务场景的需求,咱们进行了一些调研,指望选择一款新的 MQ 来比较好的解决目前的困境,最终选择了 RocketMQsql
这是一个通过阿里巴巴多年双11验证过的、能够支持亿级并发的开源消息队列,是值得信任的。其次关注一下他的特性。 RocketMQ 具备高可靠性、数据持久性,和 Kafka 同样是先写 PageCache ,再落盘,而且数据有多副本;而且它的存储模型是全部的 Topic 都写到同一个 Commitlog 里,是一个append only 操做,在海量 Topic 下也能将磁盘的性能发挥到极致,而且保持稳定的写入时延。而后就是他的性能,通过咱们的 benchmark ,采用一主两从的结构,单机 qps 能够达到 14w , latency 保持在 2ms 之内。对比以前的 NSQ 和 Kafka , Kafka 的吞吐很是高,可是在多 Topic 下, Kafka 的 PCT99 毛刺会很是多,并且平均值很是长,不适合在线业务场景。另外 NSQ 的消息首先通过 Golang 的 channel ,这是很是消耗 CPU 的,在单机 5~6w 的时候 CPU 利用率达到 50~60% ,高负载下的写延迟不稳定。另外 RocketMQ 对在线业务特性支持是很是丰富的,支持 retry , 支持并发消费,死信队列,延时消息,基于时间戳的消息回溯,另外消息体支持消息头,这个是很是有用的,能够直接支持实现消息链路追踪,否则就须要把追踪信息写到 message 的 body 里;还支持事务的消息。综合以上特性最终选择了 RocketMQ 。后端
下面简单介绍下,今日头条的部署结构,如图所示:数组
因为生产者种类繁多,咱们倾向于保持客户端简单,由于推进 SDK 升级是一个很沉重的负担,因此咱们经过提供一个 Proxy 层,来保持生产端的轻量。 Proxy 层是由一个标准的 gRpc 框架实现,也能够用 thrift ,固然任何 RPC 都框架均可以实现。缓存
Producer 的 Proxy 相对比较简单,虽然在 Producer 这边也集成了不少好比路由管理、监控等其余功能, SDK 只需实现发消息的请求,因此 SDK 的很是轻量、改动很是少,在迭代过程当中也不须要一个个推业务去升级 SDK 。 SDK 经过服务发现去找到一个 Proxy 实例,而后创建链接发送消息, Proxy 的工做是根据 RPC 请求的消息转发到对应的 Broker 集群上。 Consumer Proxy 实现的是 pull 和二次 reblance 的逻辑,这个后面会讲到,至关于把 Consumer 的 pull 透传给 Brokerset , Proxy 这边会有一个消息的 cache ,必定程度上下降对 broker page cache 的污染。这个架构和滴滴的 MQ 架构有点类似,他们也是以前作了一个 Proxy ,用 thrift 作 RPC ,这对后端的扩容、运维、减小 SDK 的逻辑上来讲都是颇有必要的。多线程
有如下几点缘由:
一、 SDK 会很是简单轻量。架构
二、很容易对流量进行控制; Proxy 能够对生产端的流量进行控制,好比咱们指望某些Broker压力比较大的时候,可以切一些流量或者说切流量到另外的机房,这种流量的调度,多环境的支持,再好比有些预发布环境、预上线环境的支持,咱们 Topic 这边写入的流量能够在 Proxy 这边能够很方便的完成控制,不用修改 SDK 。并发
3,解决链接的问题;特别是解决 Python 的问题, Python 实现的服务若是要得到高并发度,通常是采起多进程模型,这意味着一个进程一个链接,特别是对于部署到 Docker 里的 Python 服务,它可能一个容器里启动几百个进程,若是直接连到 Broker ,这个 Broker 上的链接数可能到几十上百万,此时 CPU 软中断会很是高,致使读写的延时的明显上涨app
4,经过 Proxy ,多了一个代理,在消费不须要顺序的状况下,咱们能够支持更高的并发度, Consumer 的实例数能够超过 Consume Queue 的数量。框架
5,能够无缝的继承其余的 MQ 。中间有一层 Proxy ,后面能够更改存储引擎,这个对客户端是无感知的。
6,在 Conusmer 在升级或 Restart 的时候, Consumer 若是直接连 broker 的话, rebalance 触发比较频繁, 若是 rebalance 比较频繁,且 Topic 量比较大的时候,可能会形成消息堆积,这个业务不是太接受的;若是加一层 Proxy 的话, rebalance 只在 Proxt 和 Broker 之间进行,就不须要 Consumer 再进行一次 rebalance , Proxy 只须要维护着和本身创建链接的 Consumer 就能够了。当消费者重启或升级的时候,能够最小程度的减小 rebalance 。
以上是咱们经过 Proxy 接口给 RocketMQ 带来的好处。由于多了一层,也会带来额外的 Overhead 的,以下:
1,会消耗 CPU , Proxy 那一层会作RPC协议的序列化和反序列化。
以下是 Conusme Proxy 的结构图,它带来了消费并发度的提升。因为咱们的 Broker 集群是独立部署的,考虑到broker主要是消耗包括网卡、磁盘和内存资源,对于 CPU 的消耗反而不高,这里的解决方式直接进行混合部署,而后直接在新的机器上进行扩,可是 Broker 这边的 CPU 也是能够获得利用的。
2,延迟问题。通过测试,在 4Kmsg、20W Tps 下,延迟会有所增长,大概是 1ms ,从 2ms 到 3ms 左右,这个时延对于业务来讲是能够接受的。
下面看下 Consumer 这边的逻辑,以下图所示,
好比上面部署了两个 Proxy , Broker,左边有 6 个 Queue ,对于顺序消息来讲,左边这边 rebalance 是一个相对静态的结果, Consumer 的上下线是比较频繁的。对于顺序消息来讲,左边和以前的逻辑是保持一致的, Proxy 会为每一个 Consumer 实例分配到合适的数量的 Queue ;对于不关心顺序性的消息,Proxy 会把全部的消息都放到一个队列里,而后从这个队列 dispatch 到各个 Consumer ,对于乱序消息来讲,理论上来讲 Consumer 数量能够无限扩展的;相对于和普通 Consumer 直连的状况,Consumer 的数量若是超过了Consume Queue的数量,其中多出来的 Consumer 是没有办法分配到 Queue 的,并且在容器部署环境下,单 Consumer 不能起太多线程去支撑高并发;在容器这个环境下,比较好的方式是多实例,而后按照 CPU 的核心数,启动多个线程,好比 8C 的启动 8 个线程,由于容器是有 Quota 的,通常是 1C,2C,4C,8C 这样,这种状况下,若是线程数超过了 CPU 的核心数,其实对并发度并无太大的意义。
接下来,分享一下作这个接入方式的时候遇到的一些问题,以下图所示:
一、消息大小的限制。
由于这里有一层 RPC ,在 RPC 请求过程当中会有单次请求大小的限制;另一方面是 RocketMQ 的 producer 里会有一个 MaxMessageSize 方法去控制消息不能超过这个大小; Broker 里也有一个参数,是 Broker 启动的配置,这个须要Broker重启,否则修改也不生效, Broker 里面有一个 DefaultAppendMessage 配置,是在启动的时候传进去对的参数,若是仅 NameServer 在线变动是不生效的,并且超过这个大小会报错。由于如今 RocketMQ 默认是 4M 的消息,若是将 RocketMQ 做为日志总线,可能消息体大小不是太够, Procuer 和 Broker 是都须要作变动的。
二、多链接的问题。
若是看 RocketMQ 源码会发现,多个 Producer 是共享一个底层的 MQ Client 实例的,由于一个 socket 链接吞吐是有限的,因此只会和Broker创建一个socket链接。另外,咱们也有 socket 与 socket 之间是隔离的,能够经过 Producer 的 setIntanceName() ,当与 DefaultI Instance 的 name 不同时会新启动一个 Client 的,其实就是一个新的 socket 链接,对于有隔离需求的、链接池需求得等,这个参数是有用的,在 4.5.0 上新加了一个接口是指定构造的实例数量。
三、超时设置。
由于多了一层 RPC ,那一层是有一个超时设置的,这个会有点不同,由于咱们的 RPC 请求里会带上超时设置的,客户端到 Proxy 有一个 RTT ,而后 Producer 到 Broker 的发送消息也是有一个请求响应延时,须要给 SDK 一个正确的超时语义。
四、如何选择一个合适的 reblance 算法,咱们遇到这个问题是在双机房同城容灾的背景下,会有一边 Topic 的 MessageQueue 没有写入。
这种状况下, RocketMQ 本身默认的是按照平均分配算法进行分配的,好比有 10 个 Queue , 3 个 Proxy 状况, 一、二、3 是对应 Proxy1,四、五、6 是对应 Proxy2,七、八、九、10 是对应 Proxy3 ,若是在双机房同城容灾部署状况下,通常有一半 Message Queue 是没有写入的,会有一大部分 Consumer 是启动了,可是分配到的 Message Queue 是没有消息写入的。而后另一个诉求是由于有跨机房的流量,因此他其实直接复用开源出来的 Consumer 的实现里就有根据 MachineRoom 去作 reblance ,会就近分配你的 MessageQueue 。
五、在 Proxy 这边须要作一个缓存,特别是拉消息的缓存。
特别提醒一下, Proxy 拉消息都是经过 Slave 去拉,不须要使用 Master 去拉, Master 的 IO 比较重;还有 Buffer 的管理,咱们是遇到过这种问题的,若是只考虑 Message 数量的话,会致使 OOM ,因此要注意消息 size 的设置,
六、端到端压缩。
由于 RocketMQ 在消息超过 4k 的时候, Producer 会进行压缩。若是不在客户端作压缩,这仍是涉及到 RPC 的问题, RPC 通常来讲, Byte 类型,就是 Byte 数组类型它是不会进行压缩的,只是会进行一些常规的编码,因此消息体须要在客户端作压缩。若是放在 Proxy 这边作, Proxy 压力会比较大,因此不如放在客户端去承载这个压缩。
前面大体介绍了咱们这边大体如何接入 RocketMQ ,如何实现这么一套 Proxy ,以及在实现这套 Proxy 过程当中遇到的一些问题。下面看一下灾难恢复的方案,设计之初也参考了一些潜在相关方案。
第一种方案:扩展集群,扩展集群的方案就像下图所示。
这是 master 和 slave 跨机房去部署的方式。由于咱们有一层 proxy ,因此能够很方便的去作流量的调度,让消息只在一个主机房进行消息写入,不须要一个相似中控功能的实体存在。
第二种方案:相似 MySQL 和 Redis 的架构模式,即单主模式,只有一个地方式写入的,以下图所示。数据是经过 Mysql Matser/Slave 方式同步到另外一个机房。这样 RocketMQ 会启动一个相似 Kafka 的 Mirror maker 类进行消息复制,这样会多一倍的冗余,实际上数据还会存在一些不一致的问题。
第三种方案:双写加双向复制的架构。这个结构太复杂很差控制,尤为是双向复制,其中消息区回环的问题比较好解决,只需针对在每一个正常的业务消息,在 Header 里加一个标志字段就好,另外的 Mirror 发现有这个字段就把这条消息直接丢掉便可。这个链路上维护复杂并且存在数据冗余,其中最大问题是两边的数据不对等,在一边挂掉状况下,对于一些没法接受数据不一致的是有问题的。
此外,双写都是没有 Mirror 的方案,以下图所示。这也是咱们最终选择的方案。咱们对有序消息和无序消息的处理方式不太同样,针对无序消息只需就近写本机房就能够了,对于有序消息咱们仍是会有一个主机房,Proxy 会去 NameServer 拉取 Broker 的 Queue 信息, Producer 将有序消息路由到一个指定主机房,消费端这一侧,就是就近拉取消息。对于顺序消息咱们会采起必定的调度逻辑保证均衡的分担压力获取消息,这个架构的优势是比较简单,缺点是当集群中一边挂掉时,会形成有序消息的无序,这边是经过记录消息 offset 来处理的。
此外,还有一种独立集群部署的,至关于没有上图中间的有序消息那条线,由于大多数有序消息是总体体系的,服务要部署单元化,好比某些 uid 、订单 Id 的消息或请求只会落到一边机房的,彻底不用担忧消息来得时候是否须要按照某些 key 去指定 MessageQueue ,由于过来的消息一定是隶属于这个机房的,也就是说中间有序消息那条线能够不用关心了,能够直接去掉。可是,这个是和整个公司部署方式以及单元化体系有关系的,对于部分业务咱们是直接作到两个集群,两边的生产者、消费者、Broker 、Proxy 所有是隔离的,两边都互不发现,就是这么一套运行方式,可是这就须要业务的上下游要作到单元化的程度才可行。