【碰见Doris】基于Apache Doris的小米增加分析平台实践

如下文章来源于小米技术 ,做者蔡聪辉&钟云 https://mp.weixin.qq.com/s/wo6gj1JvrFrm3t-KgGoIkghtml


1 背景

随着小米互联网业务的发展,各个产品线利用用户行为数据对业务进行增加分析的需求愈来愈迫切。显然,让每一个业务产品线都本身搭建一套增加分析系统,不只成本高昂,也会致使效率低下。咱们但愿能有一款产品可以帮助他们屏蔽底层复杂的技术细节,让相关业务人员可以专一于本身的技术领域,从而提升工做效率。经过分析调查发现,小米已有的统计平台没法支持灵活的维度交叉查询,数据查询分析效率较低,复杂查询须要依赖于研发人员,同时缺少根据用户行为高效的分群工具,对于用户的运营策略囿于设施薄弱而较为粗放,运营效率较低和效果不佳。 基于上述需求和痛点,小米大数据和云平台合做开发了增加分析系统(Growing Analytics, 下面简称GA),旨在提供一个灵活的多维实时查询和分析平台,统一数据接入和查询方案,帮助业务线作精细化运营。git

2 增加分析场景介绍

如上图所示,分析、决策、执行是一个循环迭代的过程,所以,增加分析查询很是灵活,涉及分析的维度有几十上百个,咱们没法预先定义好全部要计算的结果,代价过高,因此这也就要求了全部的数据须要即时计算和分析。同时,决策具备时效性,所以数据从摄入到能够查询的时延不能过高。另外,业务发展迅速,须要增长新的分析维度,因此咱们须要可以支持schema的变动(主要是在线增长字段)。 在咱们的业务中,增加分析最经常使用的三个功能是事件分析(占绝大多数)、留存分析和漏斗分析;这三个功能业务都要求针对实时入库(只有append)的明细数据,可以即席选择维度和条件(一般还要join业务画像表或者圈选的人群包),而后在秒级返回结果(业界相关的产品如神策、GrowingIO等都能达到这个性能)。一些只支持提早聚合的预计算引擎(如Kylin),虽然查询性能优秀,但难以支持schema随时变动,众多的维度也会形成Cube存储占用失控,而Hive可以在功能上知足要求,可是性能上较差。 综上,咱们须要存储和计算明细数据,须要一套支持近实时数据摄取,可灵活修改schema和即席查询的数据分析系统解决方案。github

3 技术架构演进

3.1 初始架构数据库

GA立项于2018年年中,当时基于开发时间和成本,技术栈等因素的考虑,咱们复用了现有各类大数据基础组件(HDFS, Kudu, SparkSQL等),搭建了一套基于Lamda架构的增加分析查询系统。GA系统初代版本的架构以下图所示:apache

GA系统涵盖了数据采集、数据清洗、数据查询和BI报表展现等一整套流程。首先,咱们将从数据源收集到的数据进行统一的清洗,以统一的json格式写入到Talos(注:小米自研的消息队列)中。接着咱们使用Spark Streaming将数据转储到Kudu中。Kudu做为一款优秀的OLAP存储引擎,具备支持实时摄取数据和快速查询的能力,因此这里将Kudu做为热数据的存储,HDFS做为冷数据的存储。为了避免让用户感知到冷热数据的实际存在,咱们使用了动态分区管理服务来管理表分区数据的迁移,按期将过时的热数据转化为冷数据存储到HDFS上,而且更新Kudu表和HDFS表的联合视图,当用户使用SparkSQL服务查询视图时,计算引擎会根据查询SQL自动路由,对Kudu表的数据和HDFS表的数据进行处理。 在当时的历史背景下,初代版本的GA帮助咱们用户解决了运营策略较为粗放、运营效率较低的痛点,但同时也暴露了一些问题。首先是运维成本的问题,本来的设计是各个组件都使用公共集群的资源,可是实践过程当中发现执行查询做业的过程当中,查询性能容易受到公共集群其余做业的影响,容易抖动,尤为在读取HDFS公共集群的数据时,有时较为缓慢,所以GA集群的存储层和计算层的组件都是单独搭建的。另外一个是性能的问题,SparkSQL是基于批处理系统设计的查询引擎,在每一个Stage之间交换数据shuffle的过程当中依然须要落盘操做,完成SQL查询的时延较高。为了保证SQL查询不受资源的影响,咱们经过添加机器来保证查询性能,可是实践过程当中发现,性能提高的空间有限,这套解决方案并不能充分地利用机器资源来达到高效查询的目的,存在必定的资源浪费。所以,咱们但愿有一套新的解决方案,可以提升查询性能和下降咱们的运维成本。json

3.2 从新选型缓存

MPP架构的SQL查询引擎,如Impala,presto等可以高效地支持SQL查询,可是仍然须要依赖Kudu, HDFS, Hive Metastore等组件, 运维成本依然比较高,同时,因为计算存储分离,查询引擎不能很好地及时感知存储层的数据变化,就没法作更细致的查询优化,如想在SQL层作缓存就没法保证查询的结果是最新的。所以,咱们的目标是寻求一款计算存储一体的MPP数据库来替代咱们目前的存储计算层的组件。咱们对这款MPP数据库有以下要求:服务器

  1. 足够快的查询性能。
  2. 对标准SQL支持较全面,用户使用友好。
  3. 不依赖其余外部系统,运维简单。
  4. 社区开发活跃,方便咱们后续的维护升级。 Doris是百度开源到Apache社区的基于 MPP 的交互式 SQL 数据仓库, 主要用于解决报表和多维分析。它主要集成了 Google Mesa 和 Cloudera Impala 技术,知足了咱们的上述要求。咱们对Doris进行了内部的性能测试并和社区沟通交流,肯定了Doris替换原来的计算存储组件的解决方案,因而咱们新的架构就简化为以下图所示:

3.3 性能测试微信

在配置大致相同计算资源的条件下,咱们选取了一个日均数据量约10亿的业务,分别测试不一样场景下(6个事件分析,3个留存分析,3个漏斗分析),不一样时间范围(一周到一个月)的SparkSQL和Doris的查询性能。网络

如上图测试结果,在增加分析的场景下,Doris查询性能相比于SparkSQL+Kudu+HDFS方案具备明显的提高,在事件分析场景下平均下降约85%左右的查询时间,在留存和漏斗场景下平均下降约50%左右的查询时间。对于咱们咱们业务大多数都是事件分析需求来说,这个性能提高很大。

4 Doris实践与优化

4.1 Doris在增加分析平台的使用状况

随着接入业务的增多,目前,咱们的增加分析集群单集群最大规模已经扩展到了近百台,存量数据到了PB级别。其中,近实时的产品线做业有数十个,天天有几百亿条的数据入库,每日有效的业务查询SQL达1.2w+。业务的增多和集群规模的增大,让咱们也遇到很多问题和挑战,下面咱们将介绍运维Doris集群过程当中遇到的一些问题和应对措施或改进。

4.2 Doris数据导入实践

Doris大规模接入业务的第一个挑战是数据导入,基于咱们目前的业务需求,数据要尽量实时导入。而对于增加分析集群,目前有数十个业务明细数据表须要近实时导入,这其中还包含了几个大业务(大业务天天的数据条数从几十亿到上百亿不等,字段数在200~400)。为了保证数据不重复插入,Doris采用label标记每批数据的导入,并采用两阶段提交来保证数据导入的事务性,要么所有成功,要么所有失败。为了方便监控和管理数据导入做业,咱们使用Spark Streaming封装了stream load操做,实现了将Talos的数据导入到Doris中。每隔几分钟,Spark Streaming会从Talos读取一个批次的数据并生成相应的RDD,RDD的每一个分区和Talos的每一个分区一一对应,以下图所示:

对于Doris来讲,一次stream load做业会产生一次事务,Doris的fe进程的master节点会负责整个事务生命周期的管理,若是短期内提交了太多的事务,则会对fe进程的master节点形成很大的压力。对于每一个单独的流式数据导入产品线做业来讲,假设消息队列一共有m个分区,每批次的每一个分区的数据导入可能执行最多n次stream load操做,因而对消息队列一个批次的数据的处理就可能会产生m*n次事务。为了Doris的数据导入的稳定性,咱们把Spark Streaming每批次数据的时间间隔根据业务数据量的大小和实时性要求调整为1min到3min不等,并尽可能地加大每次stream load发送的数据量。

在集群接入业务的初期,这套流式数据导入Doris的机制基本能平稳运行。可是随着接入业务规模的增加,问题也随之而来。首先,咱们发现某些存了数日数据的大表频繁地出现数据导入失败问题,具体表现为数据导入超时报错。通过咱们的排查,肯定了致使数据导入超时的缘由,因为咱们使用stream load进行数据导入的时候,没有指定表的写入分区(这里线上的事件表都是按天进行分区),有的事件表已经保留了三个多月的数据,而且天天拥有600多个数据分片,加上每张表默认三副本保存数据,因此每次写入数据以前都须要打开约18万个writer,致使在打开writer的过程当中就已经超时,可是因为数据是实时导入,其余天的分区没有数据写入,因此并不须要打开writer。定位到缘由以后,咱们作了相应的措施,一个是根据数据的日期状况,在数据导入的时候指定了写入分区,另外一个措施是缩减了天天分区的数据分片数量,将分片数据量从600+下降到了200+(分片数量过多会影响数据导入和查询的效率)。经过指定写入数据分区和限制分区的分片数量,大表也能流畅稳定地导入数据而不超时了。

另外一个困扰咱们的问题就是须要实时导入数据的业务增多给fe的master节点带来了较大的压力,也影响了数据导入的效率。每一次的 stream load操做,coordinator be节点都须要屡次和fe节点进行交互,以下图所示:

曾经有段时间,咱们发现master节点偶尔出现线程数飙升,随后cpu load升高, 最后进程挂掉重启的状况。咱们的查询并发并非很高,因此不太多是查询致使的。但同时咱们经过对max_running_txn_num_per_db参数的设置已经对数据导入在fe端作了限流,因此为什么fe的master节点的线程数会飙升让咱们感到比较奇怪。通过查看日志发现,be端有大量请求数据导入执行计划失败的日志。咱们的确限制住了单个db可以容许同时存在的最大事务数目,可是因为fe在计算执行计划的时候须要获取db的读锁,提交和完成事务须要获取db的写锁,一些长尾任务的出现致使了好多计算执行计划的任务都堵塞在获取db锁上边,这时候be客户端发现rpc请求超时了,因而当即重试,fe端的thirft server须要启动新的线程来处理新的请求,可是以前的事务任务并无取消,这时候积压的任务不断增多,最终致使了雪崩效应。针对这种状况,咱们对Doris主要作了如下的改造:

  1. 在构造fe的thrift server的线程池时使用显式建立线程池的方式而非原生的newCachedThreadPool方式,对线程数作了相应的限制,避免由于线程数飙升而致使资源耗尽,同时添加了相应的监控。
  2. 当be对fe的rpc请求超时时,大部分状况下都是fe没法在指定时间内处理完请求致使的,因此在重试以前加上缓冲时间,避免fe端处理请求的堵塞状况进一步恶化。
  3. 重构了下GlobalTransactionMgr的代码,在保持兼容原有接口的基础上,支持db级别的事务隔离,尽可能减小不一样事务请求之间的相互影响,同时优化了部分事务处理逻辑,加快事务处理的效率。
  4. 获取db锁添加了超时机制,若是指定时间内获取不到db锁,则取消任务,由于这时候be端的rpc请求也已经超时了,继续执行取消的任务没有意义。
  5. 对coordinator be每一步操做的耗时添加metric记录,如请求开始事务的耗时,获取执行计划的耗时等,在最终的执行结果中返回,方便咱们及时了解每一个stream load操做的耗时分布。 通过以上改造,咱们数据导入稳定性有了比较好的提高,至今再没发生过由于fe处理数据导入事务压力过大而致使master节点挂掉的问题。可是数据导入依然存在一些问题待改进:
  6. be端使用了libevent来处理http请求,使用了Reactor模式的libevent通常是编写高性能网络服务器的首选,可是这里却不适用于咱们的场景,Doris在回调函数中屡次地调用包含阻塞逻辑的业务代码,如rpc请求,等待数据分发完成等,因为多个请求共用同一个线程,这将部分请求的回调操做不能获得及时的处理。目前这块咱们并无好的解决方法,惟一的应对措施只是调大了libevent的并发线程数,以减弱不一样请求之间的相互影响,完全的解决方案仍有待社区的进一步讨论。
  7. fe端在更新表的分区版本时采用了db级别的隔离,这个锁的粒度过大,致使了相同db不一样表的数据导入都要竞争db锁,这大大下降了fe处理事务的效率。
  8. 发布事务的操做如今依然比较容易出现publish timeout的问题(这意味着没法在指定时间内获得大多数事务相关be节点完成发布事务操做的响应),这对数据导入的效率提高是一个比较大的阻碍。

4.3 Doris在线查询实践

在增加分析业务场景中,事件表是咱们的核心表,须要实时导入明细日志。这些事件表没有聚合和去重需求,并且业务需求是可以查询明细信息,因此都选用了冗余模型(DUPLICATE KEY)。事件表根据天级别分区,分桶字段使用了日志id字段(其实是一个随机产生的md5),其hash值可以保证分桶之间数据均匀分布,避免数据倾斜致使的写入和查询问题。 下图是咱们线上规模最大的集群最近30天的查询性能统计(查询信息的统计来自于Doris的查询审计日志),最近一周天天成功的SQL查询数在1.2w~2w之间。

从图中能够看出,使用了Doris后,平均查询时间保持在10秒左右,最大不超过15秒;查询时间P95通常能保证在30秒内。这个查询体验,相对于原来的SparkSQL,提高效果比较明显。 Doris提供了查询并发度参数parallel_fragment_exec_instance_num,查询服务端根据正在运行的任务个数动态调整它来优化查询,低负载下增长并发度提升查询性能,高负载下减小并发度保证集群稳定性。在分析业务查询profile时,咱们发现Doris默认执行过程当中exchange先后并发度是同样的,实际上对于聚合型的查询,exchange后的数据量是大大减小的,这时若是继续用同样的并发度不只浪费了资源,并且exchange后较少数据量用较大的并发执行,理论上反而下降了查询性能。所以,咱们添加了参数doris_exchange_instances控制exchange后任务并发度(以下图所示),在实际业务测试中取得了较好的效果。

这个对数据量巨大的业务或者exchange后不能明显下降数据量级的查询并不明显,可是这个对于中小业务(尤为是那些用了较多bucket的小业务)的聚合或join查询,优化比较明显。咱们对不一样数量级业务的测试,也验证了咱们的推断。咱们选取了一个数据量4亿/日的小业务,分别测试了不一样场景下查询性能:

从上图结果能够看出,doris_exchange_instances对于聚合和join类型的小查询改进明显。固然,这个测试是在不少次测试以后找到的最优doris_exchange_instances值,在实际业务中每次都能找到最优值可行性较低,通常对于中小业务根据查询计划中须要扫描的buckets数目结合集群规模适当下降,用较小的代价得到必定性能提高便可。后来咱们将这个改进贡献到社区,该参数名被修改成parallel_exchange_instance_num。 为了扩展SQL的查询能力,Doris也提供了和SparkSQL,Hive相似的UDF(User-Defined Functions)框架支持。当Doris内置函数没法知足用户需求时,用户能够根据Doris的UDF框架来实现自定义函数进行查询。Doris支持的UDF分红两类(目前不支持UDTF,User-Defined Table-Generating Functions,一行数据输入对应多行数据输出),一类是普通UDF,根据单个数据行的输入,产生一个数据行的输出。另外一类是UDAF(User-Defined Aggregate Functions),该类函数属于聚合函数,接收多个数据行的输入并产生一个数据行的输出。UDAF的执行流程以下图所示:

UDAF通常须要定义4个函数,分别为Init、Update、Merge、Finalize函数,若为中间输出的数据类型为复杂数据类型时,则还须要实现Serialize函数,在Shuffle过程当中对中间类型进行序列化,并在Merge函数中对该类型进行反序列化。在增加分析场景中,留存分析、漏斗分析等都使用到了UDAF。以留存分析为例,它是一种用来分析用户参与状况/活跃程度的分析模型,考查进行初始行为后的用户中有多少人会进行后续行为。针对以上需求,咱们首先定义了函数retention_info,输入是每一个用户的行为信息,而后以每一个用户的id为key进行分组,生成每一个用户在指定时间内的每一个时间单元(如天,周,月等)的留存信息,而后定义函数retention_count,输入是retention_info函数生成的每一个用户的留存信息,而后咱们以留存的时间单位(这里一般是天)为key进行分组,就能够算出每一个单位时间内留存的用户数。这样在UDAF的帮助下,咱们就能够顺利完成留存分析的计算。

4.4 Doris表的管理

在咱们的增加分析场景中,从是否分区的角度上看,Doris的olap表主要分红两种类型,一种是非分区表,如人群包和业务画像表,人群包表的特征是数据量较小,可是表的数量多;业务画像表数据量较少,数据量中等,但有更新需求。另外一种是分区表,如事件表,这类表通常单表数据规模都比较大,在设计上,咱们以时间字段为分区键,须要天天增长为表添加新的分区,使得实时的数据可以成功地导入当天的分区,而且须要及时地删掉过时的分区。显然,让每一个业务本身去管理表的分区,不只繁琐,并且可能出错。在咱们原先的GA架构中,就有动态分区管理服务,使用Doris系统后,咱们将动态分区管理服务集成到了Doris系统中,支持用户按天、周、月来设置须要保留的分区个数以及须要提早建立的分区数量。 另外一个表管理的典型场景是修改表的schema,主要操做为增长表的字段。Doris现阶段只支持一些基本数据类型,在大数据场景下业务打点上报的日志的数据类型多为嵌套类型(list,map),因此接入Doris时须要展开或者转换,致使Doris表字段数目较为庞大,部分类型字段展开困难不得不用varchar存储致使使用起来很是不方便,查询性能也相对低下。因为Doris不支持嵌套数据类型,当嵌套类型新增元素时,则Doris表须要增长字段,从提交增长字段请求到添加字段成功等待的时间较长,当集群管理的tablet数目庞大而且表的数据量和tablet数目都比较多的状况下可能会出现添加列失败的问题。针对以上问题,目前咱们主要作了如下两点改进:

  1. 缩短用户提交修改schema请求到真正执行的等待时长,当系统建立一个修改表的schema的事务的时候,原先的设计是要等待同一个db的全部大于该事务id号的事务都完成了才能开始修改表的schema,咱们修改成等待与该表有关且在该事务id号以前的全部事务完成便可修改表的schema。当同一个db的数据导入做业不少的时候,这个修改能够大大缩短修改schema的等待时间,也避免了其余表的一些数据导入故障问题可能致使修改表schema的操做迟迟不能执行。
  2. 加快建立表包含新的schema的tablet的速度。Doris修改schema的原理是经过建立包含新的schema的tablet,而后将旧的tablet的数据迁移到新的tablet来完成schema的修改。be节点经过一个map的数据结构来管理全部该节点上的tablet。因为这里只有一把全局锁,当tablet数量很是多的时候,一些管理tablet的操做都要去获取全局锁来对tablet进行操做,此时会致使建立新的tablet超时,使得修改schema的操做失败。针对这种状况,咱们对map和全局锁作了shard操做,避免了建立tablet超时状况的发生。

5 总结与展望

Doris在小米从2019年9月上线接入第一个业务至今,已经在海内外部署近十个集群(整体达到几百台BE的规模),天天完成数万个在线分析查询,承担了咱们包括增加分析和报表查询在内的大多数在线分析需求。从结果上来看,用Doris替换SparkSQL做为主要OLAP引擎,既大幅度提升查询性能,又简化了目前的数据分析架构,是Doris基于明细数据查询的大规模服务的一个比较成功的实践。 在接下来的一段时间内,咱们将继续投入精力提高数据实时导入效率和优化整体的查询性能,因为公司内部有很多业务有使用UNIQUE KEY模型的需求,目前该模型与DUPLICATE KEY模型的scan性能相比仍是有比较明显的差距,这块也是将来咱们须要重点解决的性能问题。

6 做者简介

蔡聪辉,小米OLAP工程师,Apache Doris Committer

钟云,小米大数据工程师


关于 Apache Doris(Incubating)

Apache Doris(Incubating) 一款基于大规模并行处理技术的交互式SQL分析数据库,由百度于2018年贡献给 Apache 基金会,目前在 Apache 基金会孵化器中。

Doris 官方网站: http://doris.incubator.apache.org/master/zh-CN/
Doris Github: https://github.com/apache/incubator-doris
Doris Gitee 镜像: https://gitee.com/baidu/apache-doris
Doris 开发者邮件组:【 如何订阅】
Doris 微信公众号: