如下是字节跳动数据仓库架构负责人-郭俊的分享主题沉淀:《字节跳动在Spark SQL上的核心优化实践》。前端
上述信息存于 Catalog 内。在生产环境中,通常由 Hive Metastore 提供 Catalog 服务。Analyzer 会结合 Catalog 将 Unresolved Logical Plan 转换为 Resolved Logical Plan。算法
到这里还不够。不一样的人写出来的 SQL 不同,生成的 Resolved Logical Plan 也就不同,执行效率也不同。为了保证不管用户如何写 SQL 均可以高效的执行,Spark SQL 须要对 Resolved Logical Plan 进行优化,这个优化由 Optimizer 完成。Optimizer 包含了一系列规则,对 Resolved Logical Plan 进行等价转换,最终生成 Optimized Logical Plan。该 Optimized Logical Plan 不能保证是全局最优的,但至少是接近最优的。缓存
因为同一种逻辑算子能够有多种物理实现。如 Join 有多种实现,ShuffledHashJoin、BroadcastHashJoin、BroadcastNestedLoopJoin、SortMergeJoin 等。所以 Optimized Logical Plan 可被 Query Planner 转换为多个 Physical Plan。如何选择最优的 Physical Plan 成为一件很是影响最终执行性能的事情。一种比较好的方式是,构建一个 Cost Model,并对全部候选的 Physical Plan 应用该 Model 并挑选 Cost 最小的 Physical Plan 做为最终的 Selected Physical Plan。性能优化
后面介绍字节跳动在 Spark SQL 上作的一些优化,主要围绕这一节介绍的逻辑计划优化与物理计划优化展开。架构
下图展现了 SortMergeJoin 的基本原理。用虚线框表明的 Table 1 和 Table 2 是两张须要按某字段进行 Join 的表。虚线框内的 partition 0 到 partition m 是该表转换成 RDD 后的 Partition,而非表的分区。假设 Table 1 与 Table 2 转换为 RDD 后分别包含 m 和 k 个 Partition。为了进行 Join,须要经过 Shuffle 保证相同 Join Key 的数据在同一个 Partition 内且 Partition 内按 Key 排序,同时保证 Table 1 与 Table 2 通过 Shuffle 后的 RDD 的 Partition 数相同。app
对于大数据的场景来说,数据通常是一次写入屡次查询。若是常常对两张表按相同或相似的方式进行 Join,每次都须要付出 Shuffle 的代价。与其这样,不如让数据在写的时候,就让数据按照利于 Join 的方式分布,从而使得 Join 时无需进行 Shuffle。以下图所示,Table 1 与 Table 2 内的数据按照相同的 Key 进行分桶且桶数都为 n,同时桶内按该 Key 排序。对这两张表进行 Join 时,能够避免 Shuffle,直接启动 n 个 Task 进行 Join。运维
改进一:支持与 Hive 兼容异步
为了解决这个问题,咱们让 Spark SQL 支持 Hive 兼容模式,从而保证 Spark SQL 写入的 Bucket 表与 Hive 写入的 Bucket 表效果一致,而且这种表能够被 Hive 和 Spark SQL 当成 Bucket 表进行 Bucket Join 而不须要 Shuffle。经过这种方式保证 Hive 向 Spark SQL 的透明迁移。oop
改进二:支持倍数关系Bucket Join性能
第一种方式,Task 个数与小表 Bucket 个数相同。以下图所示,Table A 包含 3 个 Bucket,Table B 包含 6 个 Bucket。此时 Table B 的 bucket 0 与 bucket 3 的数据合集应该与 Table A 的 bucket 0 进行 Join。这种状况下,能够启动 3 个 Task。其中 Task 0 对 Table A 的 bucket 0 与 Table B 的 bucket 0 + bucket 3 进行 Join。在这里,须要对 Table B 的 bucket 0 与 bucket 3 的数据再作一次 merge sort 从而保证合集有序。
若是 Table A 与 Table B 的 Bucket 个数相差不大,可使用上述方式。若是 Table B 的 Bucket 个数是 Bucket A Bucket 个数的 10 倍,那上述方式虽然避免了 Shuffle,但可能由于并行度不够反而比包含 Shuffle 的 SortMergeJoin 速度慢。此时可使用另一种方式,即 Task 个数与大表 Bucket 个数相等,以下图所示。
改进三:支持BucketJoin 降级
同时,因为数据量上涨快,平均 Bucket 大小也快速增加。这会形成单 Task 须要处理的数据量过大进而引发使用 Bucket 后的效果可能不如直接使用基于 Shuffle 的 Join。
改进四:支持超集
以下图所示,Table X 与 Table Y,都按字段 A 分 Bucket。而查询须要对 Table X 与 Table Y 进行 Join,且 Join Key Set 为 A 与 B。此时,因为 A 相等的数据,在两表中的 Bucket ID 相同,那 A 与 B 各自相等的数据在两表中的 Bucket ID 确定也相同,因此数据分布是知足 Join 要求的,不须要 Shuffle。同时,Bucket Join 还须要保证两表按 Join Key Set 即 A 和 B 排序,此时只须要对 Table X 与 Table Y 进行分区内排序便可。因为两边已经按字段 A 排序了,此时再按 A 与 B 排序,代价相对较低。
物化列
Spark SQL 处理嵌套类型数据时,存在如下问题:
不支持 Filter 下推:目前(2019年10月26日)的 Spark 不支持嵌套类型字段上的 Filter 的下推
重复计算:JSON 字段,在 Spark SQL 中以 String 类型存在,严格来讲不算嵌套数据类型。不过实践中也经常使用于保存不固定的多个字段,在查询时经过 JSON Path 抽取目标子字段,而大型 JSON 字符串的字段抽取很是消耗 CPU。对于热点表,频繁重复抽取相同子字段很是浪费资源。
对于这个问题,作数仓的同窗也想了一些解决方案。以下图所示,在名为 base_table 的表以外建立了一张名为 sub_table 的表,而且将高频使用的子字段 people.age 设置为一个额外的 Integer 类型的字段。下游再也不经过 base_table 查询 people.age,而是使用 sub_table 上的 age 字段代替。经过这种方式,将嵌套类型字段上的查询转为了 Primitive 类型字段的查询,同时解决了上述问题。
下图展现了在某张核心表上使用物化列的收益:
在 OLAP 领域,常常会对相同表的某些固定字段进行 Group By 和 Aggregate / Join 等耗时操做,形成大量重复性计算,浪费资源,且影响查询性能,不利于提高用户体验。
如上图所示,查询历史显示大量查询根据 user 进行 group by,而后对 num 进行 sum 或 count 计算。此时可建立一张物化视图,且对 user 进行 gorup by,对 num 进行 avg(avg 会自动转换为 count 和 sum)。用户对原始表进行 select user, sum(num) 查询时,Spark SQL 自动将查询重写为对物化视图的 select user, sum_num 查询。
Spark SQL 引擎上的其它优化
如上图所示,咱们将 Shuffle 上游 Stage 称为 Mapper Stage,其中的 Task 称为 Mapper。Shuffle 下游 Stage 称为 Reducer Stage,其中的 Task 称为 Reducer。
每一个 Mapper 会将本身的数据分为最多 N 个部分,N 为 Reducer 个数。每一个 Reducer 须要去最多 M (Mapper 个数)个 Mapper 获取属于本身的那部分数据。
基于HDFS的Shuffle稳定性提高
如上图所示,机器的 CPU 使用率接近 100%,使得 Mapper 侧的 Node Manager 内的 Spark External Shuffle Service 没法及时提供 Shuffle 服务。
不管是何种缘由,问题的症结都是 Mapper 侧的 Shuffle Write 数据只保存在本地,一旦该节点出现问题,会形成该节点上全部 Shuffle Write 数据没法被 Reducer 读取。解决这个问题的一个通用方法是,经过多副本保证可用性。
最初始的一个简单方案是,Mapper 侧最终数据文件与索引文件不写在本地磁盘,而是直接写到 HDFS。Reducer 再也不经过 Mapper 侧的 External Shuffle Service 读取 Shuffle 数据,而是直接从 HDFS 上获取数据,以下图所示。
快速实现这个方案后,咱们作了几组简单的测试。结果代表:
缘由在于,总共 10000 Reducer,须要从 10000 个 Mapper 处读取数据文件和索引文件,总共须要读取 HDFS 10000 * 1000 * 2 = 2 亿次。
若是只是 Name Node 的单点性能问题,还能够经过一些简单的方法解决。例如在 Spark Driver 侧保存全部 Mapper 的 Block Location,而后 Driver 将该信息广播至全部 Executor,每一个 Reducer 能够直接从 Executor 处获取 Block Location,而后无须链接 Name Node,而是直接从 Data Node 读取数据。但鉴于 Data Node 的线程模型,这种方案会对 Data Node 形成较大冲击。
Mapper 的 Shuffle 输出数据仍然按原方案写本地磁盘,写完后上传到 HDFS。Reducer 仍然按原始方案经过 Mapper 侧的 External Shuffle Service 读取 Shuffle 数据。若是失败了,则从 HDFS 读取。这种方案极大减小了对 HDFS 的访问频率。
该方案上线近一年:
小时级做业性能提高 12%。
该方案旨在提高 Spark Shuffle 稳定性从而提高做业稳定性,但最终没有使用方差等指标来衡量稳定性的提高。缘由在于天天集群负载不同,总体方差较大。Shuffle 稳定性提高后,Stage Retry 大幅减小,总体做业执行时间减小,也即性能提高。最终经过对比使用该方案先后的总的做业执行时间来对比性能的提高,用于衡量该方案的效果。
如上文所分析,Shuffle 性能问题的缘由在于,Shuffle Write 由 Mapper 完成,而后 Reducer 须要从全部 Mapper 处读取数据。这种模型,咱们称之为以 Mapper 为中心的 Shuffle。它的问题在于:
回答:历史数据太多,不适合修改历史数据。
回答:通常而言,用户修改数据都是以 Partition 为单位。因此咱们在 Partition Parameter 上保存了物化列相关信息。若是用户的查询同时包含了新 Partition 与历史 Partition,咱们会在新 Partition 上针对物化列进行 SQL Rewrite,历史 Partition 不 Rewrite,而后将新老 Partition 进行 Union,从而在保证数据正确性的前提下尽量充分利用物化列的优点。
回答:目前咱们主要是经过一些审计信息辅助人工分析。同时咱们也正在作物化列与物化视图的推荐服务,最终作到智能建设物化列与物化视图。
回答:这个想法挺好,咱们以前也考虑过,但基于几点考虑,最终没有这样作。第一,单 Mapper 的 Shuffle 输出数据量通常很小,上传到 HDFS 耗时在 2 秒之内,这个时间开销能够忽略;第二,咱们普遍使用 External Shuffle Service 和 Dynamic Allocation,Mapper 执行完成后可能 Executor 就回收了,若是要异步上传,就必须依赖其它组件,这会提高复杂度,ROI 较低。
上海沙龙回顾 | Apache Kylin 原理介绍与新架构分享(Kylin On Parquet)
字节跳动技术沙龙邀请来自字节跳动及业内互联网公司的技术专家,分享热门技术话题与一线实践经验,内容覆盖架构、大数据、前端、测试、运维、算法、系统等技术领域。
字节跳动技术沙龙旨在为技术领域人才提供一个开放、自由的交流学习平台,帮助技术人学习成长,不断进阶。