EMR Spark Relational Cache如何支持雪花模型中的关联匹配

做者:李呈祥,阿里巴巴计算平台事业部EMR团队高级技术专家,Apache Hive Committer, Apache Flink Committer,深度参与了Hadoop,Hive,Spark,Flink等开源项目的研发工做,对于SQL引擎,分布式系统有较为深刻的了解和实践,目前主要专一于EMR产品中开源计算引擎的优化工做。sql


Relational Cache相关文章连接:apache

使用Relational Cache加速EMR Spark数据分析,微信

连接:https://yq.aliyun.com/articles/703046
网络

使用EMR Spark Relational Cache跨集群同步数据,

连接:https://yq.aliyun.com/articles/704649

EMR Spark Relational Cache的执行计划重写


背景

Join是Spark SQL中很是常见的操做,数据表按照业务语义的范式化表定义,便于用户理解与使用,并且能够消除冗余数据。用户经过join操做将相关的数据关联后进行进一步的过滤,聚合等操做。在Spark中,Join一般是代价比较大,尤为是当join的两个表的数据都比较大,没法优化为map join时,须要经过网络shuffle两个表的数据,对数据按照jion字段进行从新组织。Relational Cache是EMR Spark支持的重要特性,相似于数据仓库的物化视图,将反范式化表(即关联后的大表)保存为relational cache,即可以使用cache重写执行计划,提升查询效率。可是A ⋈ B ⋈ C做为relational cache只能用来优化包含A ⋈ B ⋈ C的查询,理论上是不能用来优化只包含A ⋈ B或A ⋈ C的查询。若是表的数量不少,假设有n个表,则总共可能有2^n个关联结果(固然在业务上并不会出现任意两个表均可以关联的状况,可是可能的关联数量依然会很是大),将每一个关联结果都做为relational cache构建代价太大,并不现实,咱们须要找到一种方式能够经过单个Relational Cache支持优化多个关联查询的方式,从而在加速用户查询的同时,减小建立和更新relational cache的代价。Record Preserve Join是支持这种优化的很是有效的方式。app

什么是Record Preserved Join

定义:对于表A和表B,若是A的每一条记录都出如今A ⋈ B的结果中,而且A ⋈ B的结果并无其余额外的记录,那么咱们称A ⋈ B record preserved on A。分布式

下面咱们看一下一个Record preserved join的简单示例。函数

如上图所示,activity_log为事实表,product,user,city为维度表,activity_log表经过user_id字段和user表关联,经过product_id字段与product表关联,user表经过city_id字段与city表关联,关联的结果以下:
oop

若是咱们把上述的activity_log,product,user以及city四表的关联结果做为Relational Cache,理论上只有后续的查询包含这四个表的关联时,才可以使用cache优化SQL执行计划,若是查询只包含部分表的关联,好比只是activity_log和user关联,是没有办法使用以前的cache优化的。可是咱们仔细观察能够发现,每一行activity_log表中的记录,其user_id都和user表中的其中一条且仅一条记录相关联,对于product_id和city_id的关联也是一样如此,能够看到关联后的结果,仅仅是为activity_log表中的每条记录增长更多的字段,activity_log中的每条记录和关联结果中的每条记录是一一对应的关系,这种join结果就是record preserved join。因为activity_log和user关联的数据每一条都和cache中的数据一一对应,很少也很多,因此实际上咱们应该容许使用该cache优化包含activity_log和user关联的查询。好比对于上面的示例,咱们把图2中四表关联的结果保存为Relational Cache activity_flat_cache,那么对于query
优化

SELECT activity_text, user_name FROM activity_log, user WHERE activity.user_id = user.user_id AND user_name = 'jack'

同理,咱们可使用activity_flat_cache优化任意包含activity_log与其余单个或多个维度表的关联查询,使用同一个cache优化多种关联场景,大大下降relational cache维护和更新所需的存储和计算成本。ui

为了可以在Relational Cache中基于Record Preserved Join支持更丰富的优化场景,咱们须要首先解决两个问题:

  1. Record preserved Join对于关联两表的数据有很是严格的约束,Relational Cache如何知道两个表关联的结果是否为Record preserved Join。

  2. 已知1的信息,如何推导Relational Cache是否可用于Join查询的执行计划优化。

Record Preserved Join声明

一个常见的Record Preserved Join是Left Outer Join,对于任意的表A和B,A left outer join B record preserved on A。根据表A和B中join key的数据分布,最多见的也可能知足record preserved join的条件。在EMR Spark中,用户能够定义表的主外键和NOT NULL约束,经过表的主外键和NOT NULL约束,Relational Cache能够推断出两表关联是否知足Record Preserved Join。

根据外键的定义,外键的值必须存在其引用的主键中,或者为空,其引用的主键又必须不能重复,因此若是存在表A和表B,并且表A中的外键字段关联表B的主键,且外键字段有NOT NULL约束,则咱们能够肯定A INNER JOIN B的结果record preserved on A

以上面四个表的关系为例,在Spark SQL定义主外键的DDL示例以下:

ALTER TABLE activity_log ADD CONSTRAINT act_pk PRIMARY KEY (activity_id);ALTER TABLE product ADD CONSTRAINT prd_pk PRIMARY KEY (product_id);ALTER TABLE user ADD CONSTRAINT user_pk PRIMARY KEY (user_id);ALTER TABLE city ADD CONSTRAINT city_pk PRIMARY KEY (city_id);ALTER TABLE activity_log ADD CONSTRAINT act_prd_fk FOREIGN KEY (product_id) REFRENCES product (product_id);ALTER TABLE activity_log ADD CONSTRAINT act_user_fk FOREIGN KEY (user_id) REFRENCES user (user_id);ALTER TABLE user ADD CONSTRAINT user_city_fk FOREIGN KEY (city_id) REFRENCES city (city_id);

结合各外键字段的NOT NULL约束,咱们能够推断出以下Record Preserved Join:

  • activity_log inner join product record preserved on activity_log

  • activity_log inner join user record preserved on activity_log

  • user inner join city record preserved on user

使用Record Preserved Join优化优化执行计划

EMR Spark支持经过任意的SQL查询建立Relational Cache,可能包含关联,聚合,过滤,投影等各类操做,其中关联也包括record preserved join和其余join,如何利用到其中的record preserved join特性对更多的查询优化其执行计划,决定了咱们对于Relational Cache的利用效率。Relational Cache经过比较用户查询和cache视图的执行计划来决定是否可使用cache代替查询执行计划或其一部分,在匹配Join时判断的主要步骤以下:

  1. 收集用户查询中的join相关信息,与Relational Cache中join相关信息,找到二者并集,且并集中全部表都是关联的。

  2. 对于Relational Cache中的除1中并集外的其余关联操做,根据用户定义的约束推断出来的record preserved join信息,判断Relational Cache其余关联操做的结果是不是record preserved on 并集结果。

  3. 使用cache替换并集,并和用户查询中剩余的其余表从新拼接join。

  4. 继续适配执行计划其余部分。

例如咱们建立了relational cache (A ⋈ B) ⋈ C,且 A ⋈ B record preserved on A & A ⋈ C record preserved on A, 用户查询为A ⋈ C,

在判断过程当中,直接从约束的获得的Record Preserved Join信息可能并不足够,咱们还须要经过一些定律进一步推理,从而充分利用Record Preserved Join信息优化更多的查询。

Record Preserved Join推理

根据record preserved join的定义和关系代数的基本原理,咱们能够推导出以下定理。

  1. 等价


if A full outer join B record preserved on Athen A full outer join B = A left outer join Bif A inner join B record preserved on Athen A inner join B = A left outer join B

已知A left outer join B record preserved on A,若是同时A full outer join B record preserved on A的话,那么咱们能够肯定A left outer join B和A full outer join B的结果一致,能够互相替换。对于Inner Join一样如此。

  1. 交换

if A ⋈ B record preserved on A then B reverse() A record preserved on A def reverse(join) join match { case INNER => INNER case LEFT OUTER => RIGHT OUTER case FULL OUTER => FULL OUTER }

根据关系代数的基本定义能够获得reverse函数,加上record preserved join定义,能够很方便的推导出此定理。实际的查询中,join的顺序可能和Relational Cache中并不一致,可能须要变换join顺序进行比较。

  1. 结合

if A ⋈ B record preserved on A and B ⋈ C record preserved on Bthen A ⋈ B join C record preserved on A

因为B ⋈ C record preserved on B,能够认为B join C的结果是在B表中新增更多的维度列,因此A ⋈ B ⋈ C的结果和A ⋈ B的结果记录数一致,A ⋈ B record preserved on A,因此A ⋈ B ⋈ C record preserved on A。

同理,也可推导出:

if A ⋈ B record preserved on A and A ⋈ C record preserved on A then (A ⋈ B) ⋈ C record preserved on A and (A ⋈ B), (A ⋈ C) ⋈ B record preserved on A and (A ⋈ C),

1.传导

if A ⋈ B record preserved on A and B ⋈ C record preserved on Bthen A ⋈ C record preserved on A// same join type, same join key

因为A ⋈ B record preserved on A和B ⋈ C record preserved on B能够得知A ⋈ B ⋈ C record perserved on A,若是A ⋈ B中的join字段和A ⋈ C中A的join字段一致,且B ⋈ C中的join字段和A ⋈ C中C的join字段一致,将A ⋈ B ⋈ C结果中的B相关字段去掉,即为A join C,其结果依然record preserved on A。

雪花数据模型

Relational Cache一个重要的使用场景是决策支持系统,经过BI,报表或多维数据分析快速支持用户的商业决策。在这种场景中,数据模型一般包括一个事实表(Fact Table)和多个维度表(Dimension Table),对于事实表和维度表的关联关系,能够大致分为三种类型:

  1. Star Schema:全部的维度表都是反范式化(denormalized)的,即维度表只有一层,事实表能够和任意维度表直接关联。

  2. Snowflake Schema:全部的维度表都是范式化(normalized)的,即维度表有多层,事实表须要经过屡次关联才能关联到所有维度数据。

  3. Starflake Schema:部分维度表是范式化的,部分维度表是反范式化的。

在Star/Snowflake/Starflake数据模型中,事实表和维度表的数据存在着业务上的关联关系,实际的数据也知足主外键/非空字段等约束条件,是验证在执行计划优化时使用Record Preserved Join的合适场景。在MOLAP引擎中,例如apache kylin,一般须要用户描述Star/Snowflake/Starflake数据模型,结合维度和统计列信息构建Cube,用于快速响应多维分析请求。用户的多维分析查询可能涉及到事实表和一个或者多个维度表的关联,实际上Star/Snowflake/Starflake数据模型的定义也隐含着事实表和维度表的Record Preserved Join约束,Relational Cache经过更基础的字段约束定义,推导出Record Preserved Join,从而支持使用relational cache构建cube,经过执行计划重写,知足交互式的多维分析查询需求。Relational Cache的Record Preserved Join推导不只可用于基于雪花模型的多维分析场景,也能够用于其余涉及到Join的场景,拓展relational cache可优化的查询场景,减小维护的成本和代价。

使用Record Preserved Join优化雪花模型示例

咱们使用第二节中的表及其约束,构建Relational Cache,假设用户须要进行多维分析,构建一个Full Cube语句以下:

CACHE TABLE activity_cubeUSING parquetAS SELECT product_name, user_name, city_name, count(1), GROUPING_ID() AS grouping_id FROM activity_log, user, product, city WHERE activity_log.product_id = product.product_id and activity_log.user_id = user.user_id and user.city_id = city.city_idGROUP BY CUBE(product_name, user_name, city_name);

用户查询以下:

SELECT product_name, count FROM activity_log, product WHERE activity_log.product_id = product.product_id and product_name = 'xxx';

在匹配Join时判断的主要步骤以下:

  1. cache和用户查询join的并集为:activity_log ⋈ product

  2. cache中剩余的表为user和city,这一步可能重复屡次,在第一轮经过activity_log ⋈ user record preserved on activity_log以及activity_log ⋈ product record preserved on activity_log使用结合律2推导出(activity_log ⋈ product) ⋈ user record preserved on (activity_log ⋈ product), 在第二轮使用结合律1和上轮的结果推导出(activity_log ⋈ product) ⋈ user ⋈ city record preserved on (activity_log ⋈ product), 从而得出结论cache能够用于替换activity_log ⋈ product。

  3. 继续其余部分执行计划的匹配和重写。



能够看到,基于Record Preserved Join及其推理,咱们可使用单个大宽表(包含事实表和全部维度表关联的结果)做为cache优化全部包含事实表activity_log的关联查询,以此为基础,咱们构建的activity_cube能够用于优化基于各个维度组合的查询,结合咱们在聚合层面的匹配策略,支持Starflake模型数据的交互式多维分析。

总结

Relational Cache经过Spark中表的各类字段约束信息,推导出Record Preserved Join,结合更进一步的推理规则,使得relational cache能够经过一个宽表的cache优化多种关联查询的场景。在star/snowflake/starflake数据模型下,经过将事实表和全部维度表关联并根据维度聚合后的结果(即Cube)保存为relational cache后,经过Record Preserved Join的推导,relational cache在执行计划优化时可使用cube数据重写各类维度组合的多维分析查询的执行计划,从而知足亚秒级响应的交互式分析需求。


本文分享自微信公众号 - Apache Spark技术交流社区(E-MapReduce_Spark)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索