Spark SQL小文件问题在OPPO的解决方案

本文来自OPPO互联网基础技术团队,转载请注名做者。同时欢迎关注咱们的公众号:OPPO_tech,与你分享OPPO前沿互联网技术及活动。sql

Spark SQL小文件是指文件大小显著小于hdfs block块大小的的文件。过于繁多的小文件会给HDFS带来很严重的性能瓶颈,对任务的稳定和集群的维护会带来极大的挑战。bash

通常来讲,经过Hive调度的MR任务均可以简单设置以下几个小文件合并的参数来解决任务产生的小文件问题:markdown

set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=xxxx;
set hive.merge.smallfiles.avgsize=xxx;
复制代码

然而在咱们将离线调度任务逐步从Hive迁移到Spark的过程当中,因为Spark自己并不支持小文件合并功能,小文件问题日益突出,对集群稳定性形成很大影响,一度阻碍了咱们的迁移工做。性能

为了解决小文件问题,咱们经历了从开始的不断调整参数到后期的代码开发等不一样阶段,这里给你们作一个简单的分享。spa

1. Spark为何会产生小文件

Spark生成的文件数量直接取决于RDD里partition的数量和表分区数量。注意这里的两个分区概念并不相同,RDD的分区与任务并行度相关,而表分区则是Hive的分区数目。生成的文件数目通常是RDD分区数和表分区的乘积。所以,当任务并行度太高或者分区数目很大时,很容易产生不少的小文件。code

所以,若是须要从参数调整来减小生成的文件数目,就只能经过减小最后一个阶段RDD的分区数来达到了(减小分区数目限制于历史数据和上下游关系,难以修改)orm

2. 基于社区版本的参数进行调整的方案

2.1 不含有Shuffle算子的简单静态分区SQL

这样的SQL比较简单,主要是filter上游表一部分数据写入到下游表,或者是两张表简单UNION起来的任务,这种任务的分区数目主要是由读取文件时Partition数目决定的。排序

  • 由于从Spark 2.4以来,对Hive orc表和parquet支持已经很不错了,为了加快运行速率,咱们开启了将Hive orc/parquet表自动转为DataSource的参数。对于这种DataSource表的类型,partition数目主要是由以下三个参数控制其关系。
spark.sql.files.maxPartitionBytes;
spark.sql.files.opencostinbytes;
spark.default.parallelism;
复制代码

其关系以下图所示,所以能够经过调整这三个参数来输入数据的分片进行调整:开发

  • 而非DataSource表,使用CombineInputFormat来读取数据,所以主要是经过MR参数来进行分片调整:mapreduce.input.fileinputformat.split.minsize

虽然咱们能够经过调整输入数据的分片来对最终文件数量进行调整,可是这样的调整是不稳定的,上游数据大小发生一些轻微的变化,就可能带来参数的从新适配。get

为了简单粗暴的解决这个问题,咱们对这样的SQL加了repartition的hint,引入了新的shuffle,保证文件数量是一个固定值。

2.2 带有Shuffle算子的静态分区任务

在ISSUE SPARK-9858中,引入了一个新的参数: spark.sql.adaptive.shuffle.targetPostShuffleInputSize,

后期基于spark adaptive又对这个参数作了进一步加强,能够动态的调整partition数量,尽量保证每一个task处理targetPostShuffleInputSize大小的数据,所以这个参数咱们也能够用来在必定程度上控制生成的文件数量。

2.3 动态分区任务

动态分区任务由于存在着分区这一变量,单纯调整rdd这边的partition数目很难把控总体的文件数量。

在hive里,咱们能够经过设置hive.optimize.sort.dynamic.partition来缓解动态分区产生文件过多致使任务执行时task节点常常oom的情况。这样的参数会引入新的的shuffle,来对数据进行重排序,将相同的partition分给同一个task处理,从而避免了一个task同时持有多个文件句柄。

所以,咱们能够借助这样的思想,使用distribute by语句来修改sql,从而控制文件数量。通常而言,假设咱们想对于每一个分区生成不超过N个文件,则能够在SQL末尾增长DISTRIBUTE BY [动态分区列],ceil(rand() * N)。

3. 自研可合并文件的commitProtocol方案

综上种种,每一个方法都存在必定的弊端,众多规则也在实际使用过程当中对业务方形成很大困扰。

所以咱们产生了想在spark这边实现和hive相似的小文件合并机制。在几个可能的方案选型中,咱们最终选择了:重写spark.sql.sources.commitProtocolClass方法。

一方面,该方案对Spark代码无侵入,便于Spark源码的维护,另外一方面,该方案对业务方使用友好,能够动态经过set命令设置,若是出现问题回滚也十分方便。业务方在使用过程当中,只须要简单设置: spark.sql.sources.commitProtocolClass,便可控制是否开启小文件合并。

在开启小文件合并参数后,咱们会在commit阶段拿到生成的全部文件,引入两个新的job来对这些文件进行处理。首先咱们在第一个job获取到全部大小小于spark.compact.smallfile.size的文件,在查找完成后按照spark.compact.size参数值对组合文件,并在第二个job中对这些文件进行合并。

相关文章
相关标签/搜索