字节跳动在Spark SQL上的核心优化实践 | 字节跳动技术沙龙

10月26日, 字节跳动技术沙龙 | 大数据架构专场 在上海字节跳动总部圆满结束。咱们邀请到字节跳动数据仓库架构负责人-郭俊,Kyligence 大数据研发工程师-陶加涛,字节跳动存储工程师-徐明敏,阿里云高级技术专家-白宸和你们进行分享交流。

如下是字节跳动数据仓库架构负责人-郭俊的分享主题沉淀:《字节跳动在Spark SQL上的核心优化实践》。前端

团队介绍

数据仓库架构团队负责数据仓库领域架构设计,支持字节跳动几乎全部产品线(包含但不限于抖音、今日头条、西瓜视频、火山视频)数据仓库方向的需求,如 Spark SQL / Druid 的二次开发和优化。

归纳

今天的分享分为三个部分,第一个部分是 SparkSQL 的架构简介,第二部分介绍字节跳动在 SparkSQL 引擎上的优化实践,第三部分是字节跳动在 Spark Shuffle 稳定性提高和性能优化上的实践与探索。

Spark SQL 架构简介

咱们先简单聊一下Spark SQL 的架构。下面这张图描述了一条 SQL 提交以后须要经历的几个阶段,结合这些阶段就能够看到在哪些环节能够作优化。



不少时候,作数据仓库建模的同窗更倾向于直接写 SQL 而非使用 Spark 的 DSL。一条 SQL 提交以后会被 Parser 解析并转化为 Unresolved Logical Plan。它的重点是 Logical Plan 也即逻辑计划,它描述了但愿作什么样的查询。Unresolved 是指该查询相关的一些信息未知,好比不知道查询的目标表的 Schema 以及数据位置。

上述信息存于 Catalog 内。在生产环境中,通常由 Hive Metastore 提供 Catalog 服务。Analyzer 会结合 Catalog 将 Unresolved Logical Plan 转换为 Resolved Logical Plan。算法

到这里还不够。不一样的人写出来的 SQL 不同,生成的 Resolved Logical Plan 也就不同,执行效率也不同。为了保证不管用户如何写 SQL 均可以高效的执行,Spark SQL 须要对 Resolved Logical Plan 进行优化,这个优化由 Optimizer 完成。Optimizer 包含了一系列规则,对 Resolved Logical Plan 进行等价转换,最终生成 Optimized Logical Plan。该 Optimized Logical Plan 不能保证是全局最优的,但至少是接近最优的。缓存

上述过程只与 SQL 有关,与查询有关,可是与 Spark 无关,所以没法直接提交给 Spark 执行。Query Planner 负责将 Optimized Logical Plan 转换为 Physical Plan,进而能够直接由 Spark 执行。

因为同一种逻辑算子能够有多种物理实现。如 Join 有多种实现,ShuffledHashJoin、BroadcastHashJoin、BroadcastNestedLoopJoin、SortMergeJoin 等。所以 Optimized Logical Plan 可被 Query Planner 转换为多个 Physical Plan。如何选择最优的 Physical Plan 成为一件很是影响最终执行性能的事情。一种比较好的方式是,构建一个 Cost Model,并对全部候选的 Physical Plan 应用该 Model 并挑选 Cost 最小的 Physical Plan 做为最终的 Selected Physical Plan。性能优化

Physical Plan 可直接转换成 RDD 由 Spark 执行。咱们常常说“计划赶不上变化”,在执行过程当中,可能发现原计划不是最优的,后续执行计划若是能根据运行时的统计信息进行调整可能提高总体执行效率。这部分动态调整由 Adaptive Execution 完成。

后面介绍字节跳动在 Spark SQL 上作的一些优化,主要围绕这一节介绍的逻辑计划优化与物理计划优化展开。架构

Spark SQL引擎优化

Bucket Join改进

在 Spark 里,实际并无 Bucket Join 算子。这里说的 Bucket Join 泛指不须要 Shuffle 的 SortMergeJoin。

下图展现了 SortMergeJoin 的基本原理。用虚线框表明的 Table 1 和 Table 2 是两张须要按某字段进行 Join 的表。虚线框内的 partition 0 到 partition m 是该表转换成 RDD 后的 Partition,而非表的分区。假设 Table 1 与 Table 2 转换为 RDD 后分别包含 m 和 k 个 Partition。为了进行 Join,须要经过 Shuffle 保证相同 Join Key 的数据在同一个 Partition 内且 Partition 内按 Key 排序,同时保证 Table 1 与 Table 2 通过 Shuffle 后的 RDD 的 Partition 数相同。app

以下图所示,通过 Shuffle 后只须要启动 n 个 Task,每一个 Task 处理 Table 1 与 Table 2 中对应 Partition 的数据进行 Join 便可。如 Task 0 只须要顺序扫描 Shuffle 后的左右两边的 partition 0 便可完成 Join。



该方法的优点是适用场景广,几乎可用于任意大小的数据集。劣势是每次 Join 都须要对全量数据进行 Shuffle,而 Shuffle 是最影响 Spark SQL 性能的环节。若是能避免 Shuffle 每每能大幅提高 Spark SQL 性能。

对于大数据的场景来说,数据通常是一次写入屡次查询。若是常常对两张表按相同或相似的方式进行 Join,每次都须要付出 Shuffle 的代价。与其这样,不如让数据在写的时候,就让数据按照利于 Join 的方式分布,从而使得 Join 时无需进行 Shuffle。以下图所示,Table 1 与 Table 2 内的数据按照相同的 Key 进行分桶且桶数都为 n,同时桶内按该 Key 排序。对这两张表进行 Join 时,能够避免 Shuffle,直接启动 n 个 Task 进行 Join。运维




字节跳动对 Spark SQL 的 BucketJoin 作了四项比较大的改进。

改进一:支持与 Hive 兼容异步

在过去一段时间,字节跳动把大量的 Hive 做业迁移到了 SparkSQL。而 Hive 与 Spark SQL 的 Bucket 表不兼容。对于使用 Bucket 表的场景,若是直接更新计算引擎,会形成 Spark SQL 写入 Hive Bucket 表的数据没法被下游的 Hive 做业当成 Bucket 表进行 Bucket Join,从而形成做业执行时间变长,可能影响 SLA。

为了解决这个问题,咱们让 Spark SQL 支持 Hive 兼容模式,从而保证 Spark SQL 写入的 Bucket 表与 Hive 写入的 Bucket 表效果一致,而且这种表能够被 Hive 和 Spark SQL 当成 Bucket 表进行 Bucket Join 而不须要 Shuffle。经过这种方式保证 Hive 向 Spark SQL 的透明迁移。oop

第一个须要解决的问题是,Hive 的一个 Bucket 通常只包含一个文件,而 Spark SQL 的一个 Bucket 可能包含多个文件。解决办法是动态增长一次以 Bucket Key 为 Key 而且并行度与 Bucket 个数相同的 Shuffle。




第二个须要解决的问题是,Hive 1.x 的哈希方式与 Spark SQL 2.x 的哈希方式(Murmur3Hash)不一样,使得相同的数据在 Hive 中的 Bucket ID 与 Spark SQL 中的 Bucket ID 不一样而没法直接 Join。在 Hive 兼容模式下,咱们让上述动态增长的 Shuffle 使用 Hive 相同的哈希方式,从而解决该问题。

改进二:支持倍数关系Bucket Join性能

Spark SQL 要求只有 Bucket 相同的表才能(必要非充分条件)进行 Bucket Join。对于两张大小相差很大的表,好比几百 GB 的维度表与几十 TB (单分区)的事实表,它们的 Bucket 个数每每不一样,而且个数相差不少,默认没法进行 Bucket Join。所以咱们经过两种方式支持了倍数关系的 Bucket Join,即当两张 Bucket 表的 Bucket 数是倍数关系时支持 Bucket Join。

第一种方式,Task 个数与小表 Bucket 个数相同。以下图所示,Table A 包含 3 个 Bucket,Table B 包含 6 个 Bucket。此时 Table B 的 bucket 0 与 bucket 3 的数据合集应该与 Table A 的 bucket 0 进行 Join。这种状况下,能够启动 3 个 Task。其中 Task 0 对 Table A 的 bucket 0 与 Table B 的 bucket 0 + bucket 3 进行 Join。在这里,须要对 Table B 的 bucket 0 与 bucket 3 的数据再作一次 merge sort 从而保证合集有序。


若是 Table A 与 Table B 的 Bucket 个数相差不大,可使用上述方式。若是 Table B 的 Bucket 个数是 Bucket A Bucket 个数的 10 倍,那上述方式虽然避免了 Shuffle,但可能由于并行度不够反而比包含 Shuffle 的 SortMergeJoin 速度慢。此时可使用另一种方式,即 Task 个数与大表 Bucket 个数相等,以下图所示。



在该方案下,可将 Table A 的 3 个 Bucket 读屡次。在上图中,直接将 Table A 与 Table A 进行 Bucket Union (新的算子,与 Union 相似,但保留了 Bucket 特性),结果至关于 6 个 Bucket,与 Table B 的 Bucket 个数相同,从而能够进行 Bucket Join。

改进三:支持BucketJoin 降级

公司内部过去使用 Bucket 的表较少,在咱们对 Bucket 作了一系列改进后,大量用户但愿将表转换为 Bucket 表。转换后,表的元信息显示该表为 Bucket 表,而历史分区内的数据并未按 Bucket 表要求分布,在查询历史数据时会出现没法识别 Bucket 的问题。

同时,因为数据量上涨快,平均 Bucket 大小也快速增加。这会形成单 Task 须要处理的数据量过大进而引发使用 Bucket 后的效果可能不如直接使用基于 Shuffle 的 Join。

为了解决上述问题,咱们实现了支持降级的 Bucket 表。基本原理是,每次修改 Bucket 信息(包含上述两种状况——将非 Bucket 表转为 Bucket 表,以及修改 Bucket 个数)时,记录修改日期。而且在决定使用哪一种 Join 方式时,对于 Bucket 表先检查所查询的数据是否只包含该日期以后的分区。若是是,则当成 Bucket 表处理,支持 Bucket Join;不然当成普通无 Bucket 的表。

改进四:支持超集

对于一张经常使用表,可能会与另一张表按 User 字段作 Join,也可能会与另一张表按 User 和 App 字段作 Join,与其它表按 User 与 Item 字段进行 Join。而 Spark SQL 原生的 Bucket Join 要求 Join Key Set 与表的 Bucket Key Set 彻底相同才能进行 Bucket Join。在该场景中,不一样 Join 的 Key Set 不一样,所以没法同时使用 Bucket Join。这极大的限制了 Bucket Join 的适用场景。
针对此问题,咱们支持了超集场景下的 Bucket Join。只要 Join Key Set 包含了 Bucket Key Set,便可进行 Bucket Join。

以下图所示,Table X 与 Table Y,都按字段 A 分 Bucket。而查询须要对 Table X 与 Table Y 进行 Join,且 Join Key Set 为 A 与 B。此时,因为 A 相等的数据,在两表中的 Bucket ID 相同,那 A 与 B 各自相等的数据在两表中的 Bucket ID 确定也相同,因此数据分布是知足 Join 要求的,不须要 Shuffle。同时,Bucket Join 还须要保证两表按 Join Key Set 即 A 和 B 排序,此时只须要对 Table X 与 Table Y 进行分区内排序便可。因为两边已经按字段 A 排序了,此时再按 A 与 B 排序,代价相对较低。


物化列

Spark SQL 处理嵌套类型数据时,存在如下问题:

  • 读取大量没必要要的数据:对于 Parquet / ORC 等列式存储格式,可只读取须要的字段,而直接跳过其它字段,从而极大节省 IO。而对于嵌套数据类型的字段,以下图中的 Map 类型的 people 字段,每每只须要读取其中的子字段,如 people.age。却须要将整个 Map 类型的 people 字段所有读取出来而后抽取出 people.age 字段。这会引入大量的无心义的 IO 开销。在咱们的场景中,存在很多 Map 类型的字段,并且不少包含几十至几百个 Key,这也就意味着 IO 被放大了几十至几百倍。
  • 没法进行向量化读取:而向量化读能极大的提高性能。但截止到目前(2019年10月26日),Spark 不支持包含嵌套数据类型的向量化读取。这极大的影响了包含嵌套数据类型的查询性能
  • 不支持 Filter 下推:目前(2019年10月26日)的 Spark 不支持嵌套类型字段上的 Filter 的下推

  • 重复计算:JSON 字段,在 Spark SQL 中以 String 类型存在,严格来讲不算嵌套数据类型。不过实践中也经常使用于保存不固定的多个字段,在查询时经过 JSON Path 抽取目标子字段,而大型 JSON 字符串的字段抽取很是消耗 CPU。对于热点表,频繁重复抽取相同子字段很是浪费资源。


    对于这个问题,作数仓的同窗也想了一些解决方案。以下图所示,在名为 base_table 的表以外建立了一张名为 sub_table 的表,而且将高频使用的子字段 people.age 设置为一个额外的 Integer 类型的字段。下游再也不经过 base_table 查询 people.age,而是使用 sub_table 上的 age 字段代替。经过这种方式,将嵌套类型字段上的查询转为了 Primitive 类型字段的查询,同时解决了上述问题。


    这种方案存在明显缺陷:
    • 额外维护了一张表,引入了大量的额外存储/计算开销。
    • 没法在新表上查询新增字段的历史数据(如要支持对历史数据的查询,须要重跑历史做业,开销过大,没法接受)。
    • 表的维护方须要在修改表结构后修改插入数据的做业。
    • 须要下游查询方修改查询语句,推广成本较大。
    • 运营成本高:若是高频子字段变化,须要删除再也不须要的独立子字段,并添加新子字段为独立字段。删除前,须要确保下游无业务使用该字段。而新增字段须要通知并推动下游业务方使用新字段。
    为解决上述全部问题,咱们设计并实现了物化列。它的原理是:
    • 新增一个 Primitive 类型字段,好比 Integer 类型的 age 字段,而且指定它是 people.age 的物化字段。
    • 插入数据时,为物化字段自动生成数据,并在 Partition Parameter 内保存物化关系。所以对插入数据的做业彻底透明,表的维护方不须要修改已有做业。
    • 查询时,检查所需查询的全部 Partition,若是都包含物化信息(people.age 到 age 的映射),直接将 select people.age 自动重写为 select age,从而实现对下游查询方的彻底透明优化。同时兼容历史数据。

    下图展现了在某张核心表上使用物化列的收益:



    物化视图

    在 OLAP 领域,常常会对相同表的某些固定字段进行 Group By 和 Aggregate / Join 等耗时操做,形成大量重复性计算,浪费资源,且影响查询性能,不利于提高用户体验。

    咱们实现了基于物化视图的优化功能:



    如上图所示,查询历史显示大量查询根据 user 进行 group by,而后对 num 进行 sum 或 count 计算。此时可建立一张物化视图,且对 user 进行 gorup by,对 num 进行 avg(avg 会自动转换为 count 和 sum)。用户对原始表进行 select user, sum(num) 查询时,Spark SQL 自动将查询重写为对物化视图的 select user, sum_num 查询。

    Spark SQL 引擎上的其它优化

    下图展现了咱们在 Spark SQL 上进行的其它部分优化工做:


    Spark Shuffle稳定性提高与性能优化

    Spark Shuffle 存在的问题

    Shuffle的原理,不少同窗应该已经很熟悉了。鉴于时间关系,这里不介绍过多细节,只简单介绍下基本模型。


    如上图所示,咱们将 Shuffle 上游 Stage 称为 Mapper Stage,其中的 Task 称为 Mapper。Shuffle 下游 Stage 称为 Reducer Stage,其中的 Task 称为 Reducer。

    每一个 Mapper 会将本身的数据分为最多 N 个部分,N 为 Reducer 个数。每一个 Reducer 须要去最多 M (Mapper 个数)个 Mapper 获取属于本身的那部分数据。

    这个架构存在两个问题:

    • 稳定性问题:Mapper 的 Shuffle Write 数据存于 Mapper 本地磁盘,只有一个副本。当该机器出现磁盘故障,或者 IO 满载,CPU 满载时,Reducer 没法读取该数据,从而引发 FetchFailedException,进而致使 Stage Retry。Stage Retry 会形成做业执行时间增加,直接影响 SLA。同时,执行时间越长,出现 Shuffle 数据没法读取的可能性越大,反过来又会形成更多 Stage Retry。如此循环,可能致使大型做业没法成功执行。

    • 性能问题:每一个 Mapper 的数据会被大量 Reducer 读取,而且是随机读取不一样部分。假设 Mapper 的 Shuffle 输出为 512MB,Reducer 有 10 万个,那平均每一个 Reducer 读取数据 512MB / 100000 = 5.24KB。而且,不一样 Reducer 并行读取数据。对于 Mapper 输出文件而言,存在大量的随机读取。而 HDD 的随机 IO 性能远低于顺序 IO。最终的现象是,Reducer 读取 Shuffle 数据很是慢,反映到 Metrics 上就是 Reducer Shuffle Read Blocked Time 较长,甚至占整个 Reducer 执行时间的一大半,以下图所示。



    基于HDFS的Shuffle稳定性提高

    经观察,引发 Shuffle 失败的最大因素不是磁盘故障等硬件问题,而是 CPU 满载和磁盘 IO 满载。


    如上图所示,机器的 CPU 使用率接近 100%,使得 Mapper 侧的 Node Manager 内的 Spark External Shuffle Service 没法及时提供 Shuffle 服务。

    下图中 Data Node 占用了整台机器 IO 资源的 84%,部分磁盘 IO 彻底打满,这使得读取 Shuffle 数据很是慢,进而使得 Reducer 侧没法在超时时间内读取数据,形成 FetchFailedException。



    不管是何种缘由,问题的症结都是 Mapper 侧的 Shuffle Write 数据只保存在本地,一旦该节点出现问题,会形成该节点上全部 Shuffle Write 数据没法被 Reducer 读取。解决这个问题的一个通用方法是,经过多副本保证可用性。

    最初始的一个简单方案是,Mapper 侧最终数据文件与索引文件不写在本地磁盘,而是直接写到 HDFS。Reducer 再也不经过 Mapper 侧的 External Shuffle Service 读取 Shuffle 数据,而是直接从 HDFS 上获取数据,以下图所示。



    快速实现这个方案后,咱们作了几组简单的测试。结果代表:

    • Mapper 与 Reducer 很少时,Shuffle 读写性能与原始方案相比无差别。
    • Mapper 与 Reducer 较多时,Shuffle 读变得很是慢。




      在上面的实验过程当中,HDFS 发出了报警信息。以下图所示,HDFS Name Node Proxy 的 QPS 峰值达到 60 万。(注:字节跳动自研了 Node Name Proxy,并在 Proxy 层实现了缓存,所以读 QPS 能够支撑到这个量级)。



      缘由在于,总共 10000 Reducer,须要从 10000 个 Mapper 处读取数据文件和索引文件,总共须要读取 HDFS 10000 * 1000 * 2 = 2 亿次。

      若是只是 Name Node 的单点性能问题,还能够经过一些简单的方法解决。例如在 Spark Driver 侧保存全部 Mapper 的 Block Location,而后 Driver 将该信息广播至全部 Executor,每一个 Reducer 能够直接从 Executor 处获取 Block Location,而后无须链接 Name Node,而是直接从 Data Node 读取数据。但鉴于 Data Node 的线程模型,这种方案会对 Data Node 形成较大冲击。

      最后咱们选择了一种比较简单可行的方案,以下图所示。



      Mapper 的 Shuffle 输出数据仍然按原方案写本地磁盘,写完后上传到 HDFS。Reducer 仍然按原始方案经过 Mapper 侧的 External Shuffle Service 读取 Shuffle 数据。若是失败了,则从 HDFS 读取。这种方案极大减小了对 HDFS 的访问频率。

      该方案上线近一年:

      • 覆盖 57% 以上的 Spark Shuffle 数据。
      • 使得 Spark 做业总体性能提高 14%。
      • 天级大做业性能提高 18%。
      • 小时级做业性能提高 12%。


      该方案旨在提高 Spark Shuffle 稳定性从而提高做业稳定性,但最终没有使用方差等指标来衡量稳定性的提高。缘由在于天天集群负载不同,总体方差较大。Shuffle 稳定性提高后,Stage Retry 大幅减小,总体做业执行时间减小,也即性能提高。最终经过对比使用该方案先后的总的做业执行时间来对比性能的提高,用于衡量该方案的效果。

      Shuffle性能优化实践与探索

      如上文所分析,Shuffle 性能问题的缘由在于,Shuffle Write 由 Mapper 完成,而后 Reducer 须要从全部 Mapper 处读取数据。这种模型,咱们称之为以 Mapper 为中心的 Shuffle。它的问题在于:

      • Mapper 侧会有 M 次顺序写 IO。
      • Mapper 侧会有 M * N * 2 次随机读 IO(这是最大的性能瓶颈)。
      • Mapper 侧的 External Shuffle Service 必须与 Mapper 位于同一台机器,没法作到有效的存储计算分离,Shuffle 服务没法独立扩展。
      针对上述问题,咱们提出了以 Reducer 为中心的,存储计算分离的 Shuffle 方案,以下图所示。




      该方案的原理是,Mapper 直接将属于不一样 Reducer 的数据写到不一样的 Shuffle Service。在上图中,总共 2 个 Mapper,5 个 Reducer,5 个 Shuffle Service。全部 Mapper 都将属于 Reducer 0 的数据远程流式发送给 Shuffle Service 0,并由它顺序写入磁盘。Reducer 0 只须要从 Shuffle Service 0 顺序读取全部数据便可,无需再从 M 个 Mapper 取数据。该方案的优点在于:
      • 将 M * N * 2 次随机 IO 变为 N 次顺序 IO。
      • Shuffle Service 能够独立于 Mapper 或者 Reducer 部署,从而作到独立扩展,作到存储计算分离。
      • Shuffle Service 可将数据直接存于 HDFS 等高可用存储,所以可同时解决 Shuffle 稳定性问题。
      个人分享就到这里,谢谢你们。

      QA集锦

      - 提问:物化列新增一列,是否须要修改历史数据?

      回答:历史数据太多,不适合修改历史数据。

      - 提问:若是用户的请求同时包含新数据和历史数据,如何处理?

      回答:通常而言,用户修改数据都是以 Partition 为单位。因此咱们在 Partition Parameter 上保存了物化列相关信息。若是用户的查询同时包含了新 Partition 与历史 Partition,咱们会在新 Partition 上针对物化列进行 SQL Rewrite,历史 Partition 不 Rewrite,而后将新老 Partition 进行 Union,从而在保证数据正确性的前提下尽量充分利用物化列的优点。

      - 提问:你好,大家针对用户的场景,作了不少挺有价值的优化。像物化列、物化视图,都须要根据用户的查询 Pattern 进行设置。目前大家是人工分析这些查询,仍是有某种机制自动去分析并优化?

      回答:目前咱们主要是经过一些审计信息辅助人工分析。同时咱们也正在作物化列与物化视图的推荐服务,最终作到智能建设物化列与物化视图。

      - 提问:刚刚介绍的基于 HDFS 的 Spark Shuffle 稳定性提高方案,是否能够异步上传 Shuffle 数据至 HDFS?

      回答:这个想法挺好,咱们以前也考虑过,但基于几点考虑,最终没有这样作。第一,单 Mapper 的 Shuffle 输出数据量通常很小,上传到 HDFS 耗时在 2 秒之内,这个时间开销能够忽略;第二,咱们普遍使用 External Shuffle Service 和 Dynamic Allocation,Mapper 执行完成后可能 Executor 就回收了,若是要异步上传,就必须依赖其它组件,这会提高复杂度,ROI 较低。

      更多精彩分享

      上海沙龙回顾 | 字节跳动如何优化万级节点HDFS平台

      上海沙龙回顾 | Apache Kylin 原理介绍与新架构分享(Kylin On Parquet)

      上海沙龙回顾 | Redis 高速缓存在大数据场景中的应用

      字节跳动技术沙龙

      字节跳动技术沙龙是由字节跳动技术学院发起,字节跳动技术学院、掘金技术社区联合主办的技术交流活动。

      字节跳动技术沙龙邀请来自字节跳动及业内互联网公司的技术专家,分享热门技术话题与一线实践经验,内容覆盖架构、大数据、前端、测试、运维、算法、系统等技术领域。

      字节跳动技术沙龙旨在为技术领域人才提供一个开放、自由的交流学习平台,帮助技术人学习成长,不断进阶。

        欢迎关注「字节跳动技术团队」
      相关文章
      相关标签/搜索