初期,公司内部没有专门的团队维护消息队列服务,因此消息队列使用方式较多,主要以Kafka为主,有业务直连的,也有经过独立的服务转发消息的。另外有一些团队也会用RocketMQ、Redis的list,甚至会用比较非主流的beanstalkkd。致使的结果就是,比较混乱,没法维护,资源使用也很浪费。缓存
一个核心业务在使用Kafka的时候,出现了集群数据写入抖动很是严重的状况,常常会有数据写失败。架构
主要有两点缘由:app
随着业务增加,Topic的数据增多,集群负载增大,性能降低;框架
咱们用的是Kafka0.8.2那个版本,有个bug,会致使副本从新复制,复制的时候有大量的读,咱们存储盘用的是机械盘,致使磁盘IO过大,影响写入。运维
因此咱们决定作本身的消息队列服务。异步
首先须要解决业务方消息生产失败的问题。由于这个Kafka用的是发布/订阅模式,一个topic的订阅方会有不少,涉及到的下游业务也就很是多,没办法一口气直接替换Kafka,迁移到新的一个消息队列服务上。因此咱们当时的方案是加了一层代理,而后利用codis做为缓存,解决了Kafka不按期写入失败的问题。当后面的Kafka出现不可写入的时候,咱们就会先把数据写入到codis中,而后延时进行重试,直到写成功为止。工具
通过一系列的调研和测试以后,咱们决定采用RocketMQ。性能
为了支持多语言环境、解决一些迁移和某些业务的特殊需求,咱们又在消费侧加上了一个代理服务。而后造成了这么一个核心框架。 业务端只跟代理层交互。中间的消息引擎,负责消息的核心存储。在以前的基本框架以后,咱们后面就主要围绕三个方向作。测试
咱们消息队列服务的一个比较新的现状,先纵向看,上面是生产的客户端,包括了7种语言。而后是咱们的生产代理服务。在中间的是咱们的消息存储层。目前主要的消息存储引擎是RocketMQ。而后还有一些在迁移过程当中的Kafka。另外一个是Chronos,它是咱们延迟消息的一个存储引擎。优化
再下面就是消费代理。消费代理一样提供了多种语言的客户端,还支持多种协议的消息主动推送功能,包括HTTP 协议 RESTful方式。结合咱们的groovy脚本功能,还能实现将消息直接转存到Redis、Hbase和HDFS上。此外,咱们还在陆续接入更多的下游存储。
除了存储系统以外,咱们也对接了实时计算平台,例如Flink,Spark,Storm,旁边是咱们的用户控制台和运维控制台。这个是咱们服务化的重点。用户在须要使用队列的时候,就经过界面申请Topic,填写各类信息,包括身份信息,消息的峰值流量,消息大小,消息格式等等。而后消费方经过咱们的界面,就能够申请消费。
运维控制台,主要负责咱们集群的管理,自动化部署,流量调度,状态显示之类的功能。最后全部运维和用户操做会影响线上的配置,都会经过ZooKeeper进行同步。
先看一组数据,用的是Kafka,开启消费,每条消息大小为2048字节能够看到,随着Topic数量增长,到256 Topic以后,吞吐极具降低。而后是RocketMQ。能够看到,Topic增大以后,影响很是小。第三组和第四组,是上面两组关闭了消费的状况。结论基本相似,总体吞吐量会高那么一点点。
下面的四组跟上面的区别是使用了128字节的小消息体。能够看到,Kafka吞吐受Topic数量的影响特别明显。对比来看,虽然topic比较小的时候,RocketMQ吞吐较小,可是基本很是稳定,对于咱们这种共享集群来讲比较友好。
面临的挑战(顺时针看)
客户端语言,须要支持PHP、Go、Java、C++;
只有3个开发人员;
决定用RocketMQ,可是没看过源码;
上线时间紧,线上的Kafka还有问题;
可用性要求高。
使用RocketMQ时的两个问题:
客户端语言支持不全,以Java为主,而咱们还须要支持PHP、Go、C++;
功能特别多,如tag、property、消费过滤、RETRYtopic、死信队列、延迟消费之类的功能,但这对咱们稳定性维护来讲,挑战很是大。
使用ThriftRPC框架来解决跨语言的问题;
简化调用接口。能够认为只有两个接口,send用来生产,pull用来消费。
主要策略就是坚持KISS原则(Keep it simple, stupid),保持简单,先解决最主要的问题,让消息可以流转起来。而后咱们把其余主要逻辑都放在了proxy这一层来作,好比限流、权限认证、消息过滤、格式转化之类的。这样,咱们就能尽量地简化客户端的实现逻辑,不须要把不少功能用各类语言都写一遍。
迁移这个事情,在pub-sub的消息模型下,会比较复杂。由于下游的数据消费方可能不少,上游的数据无法作到一刀切流量,这就会致使整个迁移的周期特别长。而后咱们为了尽量地减小业务迁移的负担,加快迁移的效率,咱们在Proxy层提供了双写和双读的功能。
双写:ProcucerProxy同时写RocketMQ和Kafka;
双读:ConsumerProxy同时从RocketMQ和Kafka消费数据。
有了这两个功能以后,咱们就能提供如下两种迁移方案了。
生产端双写,同时往Kafka和RocketMQ写一样的数据,保证两边在整个迁移过程当中都有一样的全量数据。 Kafka和RocketMQ有相同的数据,这样下游的业务也就能够开始迁移。若是消费端不关心丢数据,那么能够直接切换,切完直接更新消费进度。 若是须要保证消费必达,能够先在ConsumerProxy设置消费进度,消费客户端保证没有数据堆积后再去迁移,这样会有一些重复消息,通常客户端会保证消费处理的幂等。
业务那边不停原来的kafka 客户端。只是加上咱们的客户端,往RocketMQ里追加写。这种方案在整个迁移完成以后,业务还须要把老的写入停掉。至关于两次上线。
业务方直接切换生产的客户端,只往咱们的proxy上写数据。而后咱们的proxy负责把数据复制,同时写到两个存储引擎中。这样在迁移完成以后,咱们只须要在Proxy上关掉双写功能就能够了。对生产的业务方来讲是无感知的,生产方全程只须要改造一次,上一下线就能够了。
因此表面看起来,应该仍是第二种方案更加简单。可是,从总体可靠性的角度来看,通常仍是认为第一种相对高一点。由于客户端到Kafka这一条链路,业务以前都已经跑稳定了。通常不会出问题。可是写咱们Proxy就不必定了,在接入过程当中,是有可能出现一些使用上的问题,致使数据写入失败,这就对业务方测试质量的要求会高一点。而后消费的迁移过程,其实风险是相对比较低的。出问题的时候,能够当即回滚。由于它在老的Kafka上消费进度,是一直保留的,并且在迁移过程当中,能够认为是全量双消费。
以上就是数据双写的迁移方案,这种方案的特色就是两个存储引擎都有相同的全量数据。
特色:保证不会重复消费。对于P2P 或者消费下游不太多,或者对重复消费数据比较敏感的场景比较适用。
这个方案的过程是这样的,消费先切换。所有迁移到到咱们的Proxy上消费,Proxy从Kafka上获取。这个时候RocketMQ上没有流量。可是咱们的消费Proxy保证了双消费,一旦RocketMQ有流量了,客户端一样也能收到。而后生产方改造客户端,直接切流到RocketMQ中,这样就完成了整个流量迁移过程。运行一段时间,好比Kafka里的数据都过时以后,就能够把消费Proxy上的双消费关了,下掉Kafka集群。
整个过程当中,生产直接切流,因此数据不会重复存储。而后在消费迁移的过程当中,咱们消费Proxy上的group和业务原有的group能够用一个名字,这样就能实现迁移过程当中自动rebalance,这样就能实现没有大量重复数据的效果。因此这个方案对重复消费比较敏感的业务会比较适合的。这个方案的整个过程当中,消费方和生产方都只须要改造一遍客户端,上一次线就能够完成。
说完迁移方案,这里再简单介绍一下,咱们在本身的RocketMQ分支上作的一些比较重要的事情。
首先一个很是重要的一点是主从的自动切换。
熟悉RocketMQ的同窗应该知道,目前开源版本的RocketMQ broker 是没有主从自动切换的。若是你的Master挂了,那你就写不进去了。而后slave只能提供只读的功能。固然若是你的topic在多个主节点上都建立了,虽然不会彻底写不进去,可是对单分片顺序消费的场景,仍是会产生影响。因此呢,咱们就本身加了一套主从自动切换的功能。
第二个是批量生产的功能。
RocketMQ4.0以后的版本是支持批量生产功能的。可是限制了,只能是同一个ConsumerQueue的。这个对于咱们的Proxy服务来讲,不太友好,由于咱们的proxy是有多个不一样的topic的,因此咱们就扩展了一下,让它可以支持不一样Topic、不一样Consume Queue。原理上其实差很少,只是在传输的时候,把Topic和Consumer Queue的信息都编码进去。
第三个,元信息管理的改造。
目前RocketMQ单机可以支持的Topic数量,基本在几万这么一个量级,在增长上去以后,元信息的管理就会很是耗时,对整个吞吐的性能影响相对来讲就会很是大。而后咱们有个场景又须要支持单机百万左右的Topic数量,因此咱们就改造了一下元信息管理部分,让RocketMQ单机可以支撑的Topic数量达到了百万。
后面一些就不过重要了,好比集成了咱们公司内部的一些监控和部署工具,修了几个bug,也给提了PR。最新版都已经fix掉了。
在RocketMQ在使用和运维上的一些经验。主要是涉及在磁盘IO性能不够的时候,一些参数的调整。
1 读老数据的问题
咱们都知道,RocketMQ的数据是要落盘的,通常只有最新写入的数据才会在PageCache中。好比下游消费数据,由于一些缘由停了一天以后,又忽然起来消费数据。这个时候就须要读磁盘上的数据。而后RocketMQ的消息体是所有存储在一个append only的 commitlog 中的。若是这个集群中混杂了不少不一样topic的数据的话,要读的两条消息就颇有可能间隔很远。最坏状况就是一次磁盘IO读一条消息。这就基本等价于随机读取了。若是磁盘的IOPS(Input/Output Operations Per Second)扛不住,还会影响数据的写入,这个问题就严重了。
值得庆幸的是,RocketMQ提供了自动从Slave读取老数据的功能。这个功能主要由slaveReadEnable这个参数控制。默认是关的(slaveReadEnable = false bydefault)。推荐把它打开,主从都要开。这个参数打开以后,在客户端消费数据时,会判断,当前读取消息的物理偏移量跟最新的位置的差值,是否是超过了内存容量的一个百分比(accessMessageInMemoryMaxRatio= 40 by default)。若是超过了,就会告诉客户端去备机上消费数据。若是采用异步主从,也就是brokerRole等于ASYNC_AMSTER的时候,你的备机IO打爆,其实影响不太大。可是若是你采用同步主从,那仍是有影响。因此这个时候,最好挂两个备机。由于RocketMQ的主从同步复制,只要一个备机响应了确认写入就能够了,一台IO打爆,问题不大。
2 过时数据删除
RocketMQ默认数据保留72个小时(fileReservedTime=72)。而后它默认在凌晨4点开始删过时数据(deleteWhen=”04”)。你能够设置多个值用分号隔开。由于数据都是定时删除的,因此在磁盘充足的状况,数据的最长保留会比你设置的还多一天。又因为默认都是同一时间,删除一成天的数据,若是用了机械硬盘,通常磁盘容量会比较大,须要删除的数据会特别多,这个就会致使在删除数据的时候,磁盘IO被打满。这个时候又要影响写入了。
为了解决这个问题,能够尝试多个方法,一个是设置文件删除的间隔,有两个参数能够设置,
deleteCommitLogFilesInterval = 100(毫秒)。每删除10个commitLog文件的时间间隔;
deleteConsumeQueueFilesInterval=100(毫秒)。每删除一个ConsumeQueue文件的时间间隔。
另一个就是增长删除频率,把00-23都写到deleteWhen,就能够实现每一个小时都删数据。
3 索引
默认状况下,全部的broker都会创建索引(messageIndexEnable=true)。这个索引功能能够支持按照消息的uniqId,消息的key来查询消息体。索引文件实现的时候,本质上也就是基于磁盘的个一个hashmap。若是broker上消息数量比较多,查询的频率比较高,这也会形成必定的IO负载。因此咱们的推荐方案是在Master上关掉了index功能,只在slave上打开。而后全部的index查询所有在slave上进行。固然这个须要简单修改一下MQAdminImpl里的实现。由于默认状况下,它会向Master发出请求。