EMR Spark Relational Cache的执行计划重写

做者:王道远,花名健身, 阿里巴巴计算平台EMR技术专家。

背景

EMR Spark提供的Relational Cache功能,能够经过对数据模型进行预计算和高效地存储,加速Spark SQL,为客户实现利用Spark SQL对海量数据进行即时查询的目的。Relational Cache的工做原理相似物化视图,在用户提交SQL语句时对语句进行分析,并选出可用的预计算结果来加速查询。为了实现高效地预计算结果复用,咱们构建的预计算缓存通常都较为通用,所以对于用户query,还需进行进一步的计算方能得到最终结果。所以,如何快速地找出匹配的缓存,并构建出准确的新执行计划,就显得尤其重要。nginx

在Hive 3.x中支持的Materialized View,利用了Apache Calcite对执行计划进行重写。考虑到Spark SQL使用Catalyst进行执行计划优化,引入Calcite过重,所以EMR Spark中的Relational Cache实现了本身的Catalyst规则,用于重写执行计划。本文将介绍执行计划重写的相关内容。sql

执行计划重写

准备工做

Spark会把用户查询语句进行解析,依次转化为Unresolved Logical Plan(未绑定的逻辑计划)、Resolved Logical Plan(绑定的逻辑计划)、Optimized Logical Plan(优化的逻辑计划)、Physical Plan(物理计划)。其中,未优化的逻辑计划根据用户查询语句不一样,会有较大区别,而Relational Cache做为优化的一部分,放在逻辑计划优化过程当中也较为合适,所以咱们拿到的用户查询计划会是优化中的逻辑计划。要与优化中的逻辑计划匹配,咱们选择把这个重写过程放在Spark优化器比较靠后的步骤中,同时,预先将Relational Cache的逻辑计划进行解析,得到优化后的Cache计划,减少匹配时的复杂程度。这样,咱们只需匹配作完了谓词下推、谓词合并等等优化以后的两个逻辑计划。数据库

基本过程

在匹配时,咱们但愿能尽量多得匹配计算和IO操做,所以,咱们对目标计划进行前序遍历,依次进行匹配,尝试找到最多的匹配节点。而在判断两个节点是否匹配时,咱们采用后序遍历的方式,但愿尽快发现不匹配的状况,减小计划匹配的执行时间。而后咱们会根据匹配结果,对计划进行重写,包括对于Cache数据进行进一步的Filter、Project、Sort甚至Aggregate等操做,使其与匹配节点彻底等价,而后更新逻辑计划节点的引用绑定,无缝替换到逻辑计划中,这样就能轻松得到最终的重写后的计划。缓存

Join匹配

Spark中的Join都是二元操做,而实际的Join顺序可能根据一些策略会有很大区别,所以对于Join节点,必须进行特殊处理。咱们会首先将逻辑计划进行处理,根据缓存计划的Join顺序进行Join重排。这一步在树状匹配以前就进行了,避免不断重复Join重排带来的时间浪费。重排后的Join能够更大几率地被咱们匹配到。微信

为了实现Cache的通用性,根据星型数据模型的特色,咱们引入了Record Preserve的概念。这和传统数据库中的Primary Key/Foreign Key的关系较为相似,当有主键的表与非空外键指向的表在外键上进行Join时,记录的条数不会变化,不会膨胀某条记录,也不会丢失某条记录。PK/FK的语意在大数据处理框架中常常缺失,咱们引入了新的DDL让用户自定义Record Preserve Join的关系。当用户定义A Inner Join B是对于A表Record Preserve时,咱们也会把A Inner Join B和A的关系匹配起来。有了PK/FK的帮助,咱们能匹配上的状况大大增长了,一个Relational Cache能够被更多看似区别巨大的查询共享,这能够很好的为用户节约额外的存储开销和预计算开销。app

Aggregate匹配

通常的Aggregate匹配较为简单,而Spark支持的Grouping Set操做,会构建出Expand逻辑计划节点,至关于把一条记录转为多条,使用Grouping ID进行标记。因为Expand的子节点是全部Grouping的状况共用的,这里咱们只对子节点进行一次匹配,再分别进行上面的Grouping属性和Aggregate属性的匹配。主要是验证目标聚合所需的属性或者聚合函数都能从某个Grouping ID对应的聚合结果中计算出来,好比粗粒度的Sum能够对细粒度的Sum进行二次Sum求和,而粗粒度的Count对细粒度的Count也应经过二次Sum求和,粗粒度的Average没法仅从细粒度的Average中还原出来等等。框架

计划重写

找出匹配的逻辑计划以后,就是重写逻辑计划的过程。对于无需二次聚合的逻辑计划,直接根据缓存数据的schema,从缓存数据的Relation中选择所需列,根据条件过滤后,进行后续操做。若是还需二次聚合,选择所需列时需保留外部要用的全部列,以及聚合时须要的列,还有聚合函数须要的数据。二次聚合的聚合函数须要根据实际状况进行重写,确保能使用Relational Cache中已经初步聚合的结果。这里面须要根据聚合的语意判断是否可以二次聚合。若是时Grouping Set的聚合,二次聚合以前还需选择正确的Grouping ID进行过滤。通过二次聚合后,步骤大致和普通的重写一致,只需替换到目标计划中便可。函数

结果

咱们以一个例子来具体说明逻辑计划的重写结果。Star Schema Benchmark(论文连接https://www.cs.umb.edu/~poneil/StarSchemaB.pdf)是星型模型数据分析的一个标准Benchmark,其结构定义如图所示:性能

咱们构建Relational Cache的SQL语句以下:测试

SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUMFROM supplier, p_lineorder, dates, customer, partWHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkeyGROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))

咱们从中选出一条查询做为示例。具体查询语句:

select c_city, s_city, d_year, sum(lo_revenue) as revenue from customer, lineorder, supplier, dates where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey and c_nation = 'UNITED KINGDOM' and (c_city='UNITED KI1' or c_city='UNITED KI5') and (s_city='UNITED KI1' or s_city='UNITED KI5') and s_nation = 'UNITED KINGDOM' and d_yearmonth = 'Dec1997' group by c_city, s_city, d_year order by d_year asc, revenue desc

原始逻辑计划以下所示:

Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L] +- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322)) +- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet    

因而可知,执行计划大大简化,咱们能够作到亚秒级响应用户的命中查询。

进一步优化

在实际测试过程当中,咱们发现当多个Relational Cache存在时,匹配时间线性增加明显。因为咱们在metastore中存储的是Cache的SQL语句,取SQL语句和再次解析的时间都不容小觑,这就使得匹配过程明显增加,背离了咱们追求亚秒级响应的初衷。所以咱们在Spark中构建了逻辑计划缓存,将解析过的Relational Cache的计划缓存在内存中,每一个Relational Cache只缓存一份,计划自己占用空间有限,所以咱们能够缓存住几乎全部的Relational Cache的优化后的逻辑计划,从而在第一次查询以后,全部查询都再也不收到取SQL语句和再次解析的延迟困扰。通过这样的优化,匹配时间大幅减小到100ms的量级。

总结与思考

Relational Cache实现了一种基于Cache的优化方案,让Spark SQL可以用于即时查询的场景下,知足用户对海量数据秒级查询的需求。经过对用户查询的动态改写,能够大大提升缓存的利用率,扩展缓存的命中场景,有效提升查询性能。现有方案也有不少可优化的地方,好比重复的回溯遍历时间复杂度较高,不如在逻辑计划节点内部更新维护可匹配的信息。考虑到对Spark的侵入性,咱们暂时选择了现有方案,后续根据实际的使用状况,还会进一步优化咱们的逻辑计划重写过程。而重写的逻辑计划还涉及到基于不一样的Relational Cache Plan会有不一样的重写方式,在这些重写结果中如何根据执行代价选择最优的重写方案,将会在后续文章中进行揭秘,敬请期待!


本文分享自微信公众号 - Apache Spark技术交流社区(E-MapReduce_Spark)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索