PINGO是一个由百度大数据部与百度美国研发中心合做而成的分布式交换查询平台。在PINGO以前,百度的大数据查询做业主要由基于Hive的百度QueryEngine去完成。QueryEngine很好的支持着百度的离线计算任务,但是它对交互式的在线计算任务支持并很差。为了更好的支持交互式任务,咱们在大约一年前设计了基于SparkSQL与Tachyon的PINGO的雏形。在过去一年中, 经过跟不一样业务的结合,PINGO逐渐的演变成一套成熟高效的交互式查询系统。本文将详细介绍PINGO的架构迭代过程以及性能评估。web
PINGO设计目标缓存
QueryEngine是基于Hive的百度内部的大数据查询平台,这套系统在过去几年中较好的支撑了百度的相关业务。 图1展现了QueryEngine的架构图,其服务入口叫作Magi。用户向Magi提交查询请求, Magi为此次请求分配一个执行机, 执行机会调用Hive读取Meta信息并向Hadoop队列提交任务. 在这一过程当中, 用户须要自行提供计算须要的队列资源。随着近几年对大数据平台的要求愈来愈高, 咱们在使用QueryEngine过程当中也发现了一些问题:首先QueryEngine须要由用户提供计算资源, 这使得数据仓库的用户须要去了解Hadoop以及相关的管理流程, 这增长了用户使用数据的门槛。 第二, 对于不少小型计算任务而言, MR的任务的起动时间也较长, 每每用户的计算任务只须要1分钟, 可是排队/提交任务就须要超过一分钟甚至更长的时间。这样的结果是,QueryEngine虽然很好的支持线下执行时间长的任务,可是对线上的一些交换式查询任务(要求延时在一到两分钟内)确是无能为力。架构
为了解决这些问题, 在大约一年前,咱们尝试在离线计算的技术栈上搭建起一套具备在线服务属性的SQL计算服务 PINGO。如图2所示: PINGO使用了SparkSQL为主要的执行引擎, 主要是由于Spark具备下面的特色:框架
内存计算:Spark以RDD形式把许多数据存放在内存中,尽可能减小落盘,从而提高了计算性能。运维
可常驻服务:Spark能够帮助实现常驻式的计算服务, 而传统的Hadoop作不到这一点。常驻式的计算服务有助于下降数据查询服务的响应延迟。机器学习
机器学习支持:对于数据仓库的使用, 不该仅仅局限于SQL类的统计任务。 Spark的机器学习库能够帮助咱们未来扩展数据仓库, 提供的交互式的数据挖掘功能。分布式
计算功能多元:虽然PINGO是一个查询服务, 不过仍然有其余类型的计算需求, 如数据预处理等。 使用Spark可使咱们用一个计算引擎完成全部的任务, 简化系统的设计。ide
PINGO系统迭代oop
在过去一年中,PINGO从一个雏形开始,经过跟不一样业务的结合,逐渐的演变成一套成熟高效的系统。中间经历过几回的架构演变,在本章中,咱们将详细介绍PINGO的迭代过程。性能
PINGO第一版的目标是提高性能,让其能支持交互式查询的任务。因为Spark是基于内存的计算引擎,相对于Hive有必定的性能优点, 因此第一步咱们选择使用Spark SQL。为求简单,最初的服务是搭建在Spark Standalone集群上的。咱们知道, Spark在Standalone模式下是不支持资源伸缩的, 一个Spark Application在启动的时候会根据配置获取计算资源。 这时就算一个查询请求只有一个Task还在运行, 该Application所占用的全部资源都不能释放。好在一个Spark Application能够同时运行多个Job, 每一个Job都可以’平分’其所占用的计算资源。
基于上面的考虑, 如图3所示,咱们利用Spark的Standalone集群搭建了初版PINGO服务. 咱们的主服务节点叫作Operation Manager, 它自己也是一个Spark Driver。全部用户的请求都会从Magi Service发送到Magi Worker, 再由Magi Worker分配给Operation Manager, 经过Operation Manager在Spark集群上执行。
经过PINGO 1.0, 咱们把数据仓库的计算引擎从Hive/MR转变成了Spark。 不过, 单纯的替换计算引擎, 也许能够把查询的性能提高1-2倍, 可是并不会使查询的速度有数量级上的提升. 想要从本质上提升查询速度, 咱们还须要改进存储。对此咱们设计了PINGO 1.1, 加入了以Tachyon为载体的缓存系统,并在缓存层上搭建了缓存管理系统ViewManager, 进一步提高系统性能。
不少快速的查询系统都将Parquet之类列存储格式和分布式KeyValue存储引擎结合起来, 经过创建索引/物化视图等手段加速SQL查询的速度. 经过将部分查询条件下推到存储引擎, 某些SQL查询语句甚至能够被提速至100倍以上。然而咱们的数据都存储在旧版本的Hive数据仓库中, 文件系统被限定为HDFS, 数据格式通常为低版本的ORC File, 也有很多表是ORCFile或者文本。 由于迁移成本/数据上下游依赖等兼容性等缘由, 咱们没有办法更改Hive数据仓库自己的存储。
不过根据局部性原理, 任何数据访问都有热点. 咱们能够创建缓存系统, 在缓存系统中应用最新的文件系统和存储格式. 经过把热点输入经过预加载的方式导入到缓存系统, 咱们能够加速整个系统的访问性能.为此, 咱们设计了如下的系统架构:
在这个架构中, 咱们引入了一个新模块 ViewManager, 该模块负责管理缓存系统。 它的主要功能是识别热点数据, 将数据导入到缓存系统中, 并在查询时改变SQL的执行过程, 使得Spark从缓存而不是原始位置读取数据。在这个架构下,当一个Query进来时,会先向OperationManager请求。当接受到请求后,OperationManager会向ViewManager查询数据是否已经在缓存中。若是已经在缓存,数据将被直接返回给用户, Query完成。若是数据不在缓存中,OperationManager会向底层Data Warehouse发起请求, 等数据到达时返回給用户。同时,ViewManager也会向底层Data Warehouse发起请求, 等数据到达时创建一个Cache Entry, 这样的话下次一样Query进来时,就会从缓存读数据。注意这里咱们OperationManager与ViewManager对会向底层Data Warehouse的数据读取是两个独立的STREAM, 这样保证二者不会互相干扰。
那PINGO又是如何动态去读取缓存中或者底层Data Warehouse的数据的呢?毕竟在Query Plan中,数据地址是写死的。 为了可以让缓存对用户透明, 咱们在SparkSQL上作了一些改进, 以下图所述
Spark中利用Catalyst框架来执行SQL语句。 对于Catalyst框架来讲, 其输入是Unresolved Logical Plan, 这能够简单理解为SQL语句的结构化描述。 Catalyst调用Catalog来向MetaService查询表的元数据, Catalog返回MetastoreRelation来表明Hive表, 其中含有读取该表的全部必要信息,以便后续的解析和优化。 而在执行时, Catalyst会把MetastoreRelation转化为HiveTableScan, 来完成对数据的读取。
为了实现对缓存的透明利用, 咱们在其中两个地方了作了扩展。 首先咱们在Catalog中为咱们缓存的表返回CachableRelation来代替MetastoreRelation。 而在将LogicalPlan转变为真正执行的PhysicalPlan时, 咱们会把CachableRelation翻译为两种TableScan的Union, 一个针对那些被缓存住的数据, 另一个针对那些没有被缓存住的数据。
经过这种方式, 咱们可以作到在用户不感知的状况下, 完成对数据仓库存储层的替换和优化。 目前咱们作的仅仅是改变存储的文件系统和格式, 未来也会将一样的实现思路应用到索引和物化视图上。
PINGO 1.1服务很好的提升了系统性能, 可是在运营了一段时间以后, 咱们逐渐感受到Spark的集群管理问题正在成为整个系统的瓶颈。这具体表如今两个方面
咱们的整个服务实际上是一个Spark Application, 服务的主节点同时是Spark的Driver。 而咱们知道, Spark并不以高可靠性见长, 咱们在实践中也发现把全部的计算压力放到单个Spark Application容易致使比较严重的GC问题和负载问题。 而当Spark出问题须要重启的时候, 咱们的整个服务也就暂停了。
单一Spark集群的架构没法适应多机房的基础设施。 百度内部的离线集群规模仍是很大的, 机房分布在全国的多个地区。 这样, 咱们的服务可以获取的资源每每来自多个机房, 而多个机房的机器是不适合组成一个Spark集群的。 另外, 当集群读取其余机房的数据时, 带宽每每成为整个计算任务的瓶颈。
发现这些问题是好事,说明系统已经逐渐成熟,开始往怎么更好的运维的方向发展了。 为了解决上面的问题, 咱们对系统进行了进一步的迭代. 迭代后的架构以下图所示:
在这一版架构中, 咱们将PINGO的服务和调度功能独立出来, 与真正执行计算的部分剥离。 支持了单一查询入口PinoMaster进行调度, 多个集群Pingo Applicatoin执行计算的多集群功能。 PingoMaster同时维护多个Spark Application。 当一个Query到来时, 咱们会根据集群的归属/存储的位置选择一个最优的Application执行任务。 另外, 这一版本的PINGO还支持在yarn集群上起动Application。 百度内部有本身的资源管理系统, 提供了和yarn兼容的接口. 经过迁移PINGO服务到yarn上避免了本来Standalone版本须要的不少运维工做, 而且能够经过按期重启Application来解决Spark的可靠性问题。
为了可以在PINGO中实现更好的调度策略, 咱们也对Spark SQL进行了深度扩展。
当Pingo收到一个Query后, 咱们在Master端就完成对这条Query的分析和部分优化, 并将这些信息保存到QueryContext中。 当SQL在Application端真正执行的时候, 咱们从Master端而不是Meta端拿到执行所须要的信息. 这样作的好处是能够在根据Query的内容来作出调度. 基于这个执行流程, 目前咱们开发了两个比较有效的调度策略:
根据数据的存储位置进行调度。 由于咱们在Master端就可以知道Query所需数据的存储位置, 因此能够选择合适的PingoApplication来执行这条Query。
根据数据量大小进行调度. 如上文所说, 一个Spark Aplication能够支持同时运行多个Job, 可是在不少大型的Job同时运行在一个Application又会形成FullGC等稳定性问题. 咱们会根据Query输入量的大小, 为大型Query单独启动Application, 而让小Query复用同一个Application。 这样的好处是对于多数的小Query , 咱们节省了起动Application的开销, 而又避免了大Query形成的稳定性问题。
PINGO性能评估
在上一章中,咱们详细介绍了PINGO架构的迭代,在本章中,咱们重点看一下PINGO的性能。如图8所示,首先咱们对比了使用Hive以及使用Spark做为计算引擎的性能。 这里的Benchmark选取的Query是百度内部交互式数据分析的典型负载, 主要是Join/Aggregate等操做, 输入的数据量从100M到2T左右.咱们能够看到, Spark相比Hive有较大的性能优点。在比较大比较复杂的Query中, Spark取得了2到3倍的加速比。注意在这个对比中,内存资源有限,用的是64GB内存的机器,不少Spark的数据被迫落盘, 若是有更多内存,加速比会更高。
下一步咱们了解一下加了缓存后的性能, 在这个实验中,咱们使用了192GB的机器,有更多的内存去支持缓存以及内存计算。如图9所示,在这个环境下,使用Spark相对于Hive有大概5到7倍的提速。 在加入了Tachyon后,相对于Hive无缓存的状况有30到50倍的提速。咱们相信在将来的几年内,内存价格会大幅下降,内存计算会变成主流,因此使用内存作缓存是一个正确的方向。
PINGO服务目前被应用在交互式查询场景中, 旨在为PM和RD提供快速的数据分析服务。 图10展现了在一样的计算资源下, 在生产环境使用PINGO先后的Query执行时间分布图。注意,在这个生产环境中,咱们用的是64GB内存的机器, 为了提供足够的缓存空间,咱们使用了Tachyon的Tiered Storage功能,让缓存分布在内存以及本地磁盘中。 咱们能够看到, 在传统的基于Hive+MR的服务模式下, 只有1%左右的Query可以在两分钟内完成. 而采用了基于Spark的PINGO服务, 有50%+的Query能够在两分钟内执行完成。 可以取得这样的加速效果, 部分缘由是Spark自己的执行效率比Hive要高一些。这个自己还有很大的优化空间,好比若是咱们使用内存缓存的话,执行时间能够轻易的压缩到30秒内。
结论和展望
通过过去一年的迭代与发展,PINGO已经很好的支持了百度内部的交互式查询业务。经过PINGO,不少查询的延时从之前的30到40分钟,下降到了2分钟之内,很大的提升了查询者的工做效率。从此PINGO的发展将朝着更好用,已经更快两个方向发展。为了让PINGO更好用,咱们正在PINGO之上开发UI, 让用户与图形界面交互(而不是经过输入SQL Query)就能轻易的查询到想要的数据。为了让PINGO更快,更有交互性,咱们但愿能够把90%以上的Query 时间下降到30秒如下。第一步就是要扩大咱们的Cache的覆盖率与性能,设计出更好的缓存预取以及缓存替换机制,而且加速Cache读取的延时。第二步,咱们也经过硬件加速的方法使用FPGA加速某些经常使用的SQL Operator。就在最近,咱们对SQL的JOIN Operator进行了FPGA硬件加速,相对CPU取得了3倍的加速比。咱们也会很快把这项技术使用到PINGO上。