浅析Hive/Spark SQL读文件时的输入任务划分

做者:
王道远,花名健身,阿里云EMR技术专家,Apache Spark活跃贡献者,主要关注大数据计算优化相关工做。sql


Hive以及Spark SQL等大数据计算引擎为咱们操做存储在HDFS上结构化数据提供了易于上手的SQL接口,大大下降了ETL等操做的门槛,也所以在实际生产中有着普遍的应用。SQL是非过程化语言,咱们写SQL的时候并不能控制具体的执行过程,它们依赖执行引擎决定。而Hive和Spark SQL做为Map-Reduce模型的分布式执行引擎,其执行过程首先就涉及到如何将输入数据切分红一个个任务,分配给不一样的Map任务。在本文中,咱们就来说解Hive和Spark SQL是如何切分输入路径的。微信

Hive

Hive是起步较早的SQL on Hadoop项目,最先也是诞生于Hadoop中,因此输入划分这部分的代码与Hadoop相关度很是高。如今Hive广泛使用的输入格式是CombineHiveInputFormat,它继承于HiveInputFormat,而HiveInputFormat实现了Hadoop的InputFormat接口,其中的getSplits方法用来获取具体的划分结果,划分出的一份输入数据被称为一个“Split”。在执行时,每一个Split对应到一个map任务。在划分Split时,首先挑出不能合并到一块儿的目录——好比开启了事务功能的路径。这些不能合并的目录必须单独处理,剩下的路径交给私有方法getCombineSplits,这样Hive的一个map task最多能够处理多个目录下的文件。在实际操做中,咱们通常只要经过set mapred.max.split.size=xx;便可控制文件合并的大小。当一个文件过大时,父类的getSplits也会帮咱们完成相应的切分工做。session

Spark SQL

Spark的表有两种:DataSource表和Hive表。另外Spark后续版本中DataSource V2也将逐渐流行,目前还在不断发展中,暂时就不在这里讨论。咱们知道Spark SQL其实底层是Spark RDD,而RDD执行时,每一个map task会处理RDD的一个Partition中的数据(注意这里的Partition是RDD的概念,要和表的Partition进行区分)。所以,Spark SQL做业的任务切分关键在于底层RDD的partition如何切分。并发

Data Source表

Spark SQL的DataSource表在最终执行的RDD类为FileScanRDD,由FileSourceScanExec建立出来。在建立这种RDD的时候,具体的Partition直接做为参数传给了构造函数,所以划分输入的方法也在DataSourceScanExec.scala文件中。具体分两步:首先把文件划分为PartitionFile,再将较小的PartitionFile进行合并。分布式

第一步部分代码以下:ide

if (fsRelation.fileFormat.isSplitable(
    fsRelation.sparkSession, fsRelation.options, file.getPath)) {
    (0L until file.getLen by maxSplitBytes).map { offset =>
    val remaining = file.getLen - offset
    val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
    val hosts = getBlockHosts(blockLocations, offset, size)
    PartitionedFile(
      partition.values, file.getPath.toUri.toString,
      offset, size, partitionDeleteDeltas, hosts)
    }
  } else {
    val hosts = getBlockHosts(blockLocations, 0, file.getLen)
    Seq(PartitionedFile(partition.values, file.getPath.toUri.toString,
    0, file.getLen, partitionDeleteDeltas, hosts))
  }

咱们能够看出,Spark SQL首先根据文件类型判断单个文件是否可以切割,若是能够则按maxSplitBytes进行切割。若是一个文件剩余部分没法填满maxSplitBytes,也单独做为一个Partition。函数

第二部分代码以下所示:oop

splitFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }

这样咱们就能够依次遍历第一步切好的块,再按照maxSplitBytes进行合并。注意合并文件时还需加上打开文件的预估代价openCostInBytes。那么maxSplitBytesopenCostInBytes这两个关键参数怎么来的呢?布局

val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / defaultParallelism

  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

不难看出,主要是spark.sql.files.maxPartitionBytesspark.sql.files.openCostInBytes、调度器默认并发度以及全部输入文件实际大小所控制。大数据

Hive表

Spark SQL中的Hive表底层的RDD类为HadoopRDD,由HadoopTableReader类实现。不过此次,具体的Partition划分仍是依赖HadoopRDDgetPartitions方法,具体实现以下:

override def getPartitions: Array[Partition] = {
    ...
    try {
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      ...
    }
  }

不难看出,在处理Hive表的时候,Spark SQL把任务划分又交给了Hadoop的InputFormat那一套。不过须要注意的是,并非全部Hive表都归为这一类,Spark SQL会默认对ORC和Parquet的表进行转化,用本身的Data Source实现OrcFileFormatParquetFileFormat来把这两种表做为Data Source表来处理。

总结

切分输入路径只是大数据处理的第一步,虽然不起眼,可是也绝对不可或缺。低效的文件划分可能会给端到端的执行速度带来巨大的负面影响,更有可能影响到输出做业的文件布局,从而影响到整个数据流水线上全部做业的执行效率。万事开头难,为程序输入选择合适的配置参数,能够有效改善程序执行效率。

留个思考题给读者们:如何设置参数完全关闭Spark SQL data source表的文件合并?

 

本文分享自微信公众号 - Apache Spark技术交流社区(E-MapReduce_Spark)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索