Apache Spark Delta Lake 写数据使用及实现原理代码解析

Apache Spark Delta Lake 写数据使用及实现原理代码解析

Delta Lake 写数据是其最基本的功能,并且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理以前先来看看如何使用它,具体使用以下:sql

df.write.format("delta").save("/data/yangping.wyp/delta/test/")
 
//数据按照 dt 分区
df.write.format("delta").partitionBy("dt").save("/data/yangping.wyp/delta/test/")
 
// 覆盖以前的数据
df.write.format("delta").mode(SaveMode.Overwrite).save("/data/yangping.wyp/delta/test/")

你们能够看出,使用写 Delta 数据是很是简单的,这也是 Delte Lake 介绍的 100% 兼容 Spark。apache

Delta Lake 写数据原理

前面简单了解了如何使用 Delta Lake 来写数据,本小结咱们将深刻介绍 Delta Lake 是如何保证写数据的基本原理以及如何保证事务性。缓存

得益于 Apache Spark 强大的数据源 API,咱们能够很方便的给 Spark 添加任何数据源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 实现的一种新的数据源,咱们调用 df.write.format("delta") 其实底层调用的是 org.apache.spark.sql.delta.sources.DeltaDataSource 类。为了简单起见,本文介绍的是 Delta Lake 批量写的实现,实时流写 Delta Lake 本文不涉及,后面有机会再介绍。 Delta Lake 批量写扩展了 org.apache.spark.sql.sources.CreatableRelationProvider 特质,并实现了其中的方法。咱们调用上面的写数据方法首先会调用 DeltaDataSource 类的 createRelation 方法,它的具体实现以下:微信

override def createRelation(
    sqlContext: SQLContext,
    mode: SaveMode,
    parameters: Map[String, String],
    data: DataFrame): BaseRelation = {
 
  // 写数据的路径
  val path = parameters.getOrElse("path", {
    throw DeltaErrors.pathNotSpecifiedException
  })
 
  // 分区字段
  val partitionColumns = parameters.get(DeltaSourceUtils.PARTITIONING_COLUMNS_KEY)
    .map(DeltaDataSource.decodePartitioningColumns)
    .getOrElse(Nil)
 
 
  // 事务日志对象
  val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)
 
  // 真正的写操做过程
  WriteIntoDelta(
    deltaLog = deltaLog,
    mode = mode,
    new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf),
    partitionColumns = partitionColumns,
    configuration = Map.empty,
    data = data).run(sqlContext.sparkSession)
 
  deltaLog.createRelation()
}

其中 mode 就是保持数据的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 这个传递的参数,好比分区字段、数据保存路径以及 Delta 支持的一些参数(replaceWhere、mergeSchema、overwriteSchema 等,具体参见 org.apache.spark.sql.delta.DeltaOptions);data 就是咱们须要保存的数据。session

createRelation 方法紧接着就是获取数据保存的路径,分区字段等信息。而后初始化 deltaLog,deltaLog 的初始化会作不少事情,好比会读取磁盘全部的事务日志(_delta_log 目录下),并构建最新事务日志的最新快照,里面能够拿到最新数据的版本。因为 deltaLog 的初始化成本比较高,因此 deltaLog 初始化完以后会缓存到 deltaLogCache 中,这是一个使用 Guava 的 CacheBuilder 类实现的一个缓存,缓存的数据保持一小时,缓存大小能够经过 delta.log.cacheSize 参数进行设置。只要写数据的路径是同样的,就只须要初始化一次 deltaLog,后面直接从缓存中拿便可。除非以前缓存的 deltaLog 被清理了,或者无效才会再次初始化。DeltaLog 类是 Delta Lake 中最重要的类之一,涉及的内容很是多,因此咱们会单独使用一篇文章进行介绍。app

紧接着初始化 WriteIntoDelta,WriteIntoDelta 扩展自 RunnableCommand,Delta Lake 中的更新、删除、合并都是扩展这个类的。初始化完 WriteIntoDelta 以后,就会调用 run 方法执行真正的写数据操做。WriteIntoDelta 的 run 方法实现以下:ide

override def run(sparkSession: SparkSession): Seq[Row] = {
    deltaLog.withNewTransaction { txn =>
      val actions = write(txn, sparkSession)
      val operation = DeltaOperations.Write(mode, Option(partitionColumns), options.replaceWhere)
      txn.commit(actions, operation)
    }
    Seq.empty
}

Delta Lake 全部的更新操做都是在事务中进行的,deltaLog.withNewTransaction 就是一个事务,withNewTransaction 的实现以下:函数

def withNewTransaction[T](thunk: OptimisticTransaction => T): T = {
  try {
    // 更新当前表事务日志的快照
    update()
    // 初始化乐观事务锁对象
    val txn = new OptimisticTransaction(this)
    // 开启事务
    OptimisticTransaction.setActive(txn)
    // 执行写数据操做
    thunk(txn)
  } finally {
    // 关闭事务
    OptimisticTransaction.clearActive()
  }
}

在开启事务以前,须要更新当前表事务的快照,由于在执行写数据以前,这张表可能已经被修改了,执行 update 操做以后,就能够拿到当前表的最新版本,紧接着开启乐观事务锁。thunk(txn) 就是须要执行的事务操做,对应 deltaLog.withNewTransaction 里面的全部代码。oop

咱们回到上面的 run 方法。val actions = write(txn, sparkSession) 就是执行写数据的操做,它的实现以下:ui

  def write(txn: OptimisticTransaction, sparkSession: SparkSession): Seq[Action] = {
    import sparkSession.implicits._
    // 若是不是第一次往表里面写数据,须要判断写数据的模式是否符合条件
    if (txn.readVersion > -1) {
      // This table already exists, check if the insert is valid.
      if (mode == SaveMode.ErrorIfExists) {
        throw DeltaErrors.pathAlreadyExistsException(deltaLog.dataPath)
      } else if (mode == SaveMode.Ignore) {
        return Nil
      } else if (mode == SaveMode.Overwrite) {
        deltaLog.assertRemovable()
      }
    }
 
    // 更新表的模式,好比是否覆盖现有的模式,是否和现有的模式进行 merge
    updateMetadata(txn, data, partitionColumns, configuration, isOverwriteOperation)
 
    // 是否认义分区过滤条件
    val replaceWhere = options.replaceWhere
    val partitionFilters = if (replaceWhere.isDefined) {
      val predicates = parsePartitionPredicates(sparkSession, replaceWhere.get)
      if (mode == SaveMode.Overwrite) {
        verifyPartitionPredicates(
          sparkSession, txn.metadata.partitionColumns, predicates)
      }
      Some(predicates)
    } else {
      None
    }
 
    // 第一次写数据初始化事务日志的目录
    if (txn.readVersion < 0) {
      // Initialize the log path
      deltaLog.fs.mkdirs(deltaLog.logPath)
    }
 
    // 写数据到文件系统中
    val newFiles = txn.writeFiles(data, Some(options))
     
    val deletedFiles = (mode, partitionFilters) match {
       // 全量覆盖,直接拿出缓存在内存中最新事务日志快照里面的全部 AddFile 文件
      case (SaveMode.Overwrite, None) =>
        txn.filterFiles().map(_.remove)
      // 从事务日志快照中获取对应分区里面的全部 AddFile 文件
      case (SaveMode.Overwrite, Some(predicates)) =>
        // Check to make sure the files we wrote out were actually valid.
        val matchingFiles = DeltaLog.filterFileList(
          txn.metadata.partitionColumns, newFiles.toDF(), predicates).as[AddFile].collect()
        val invalidFiles = newFiles.toSet -- matchingFiles
        if (invalidFiles.nonEmpty) {
          val badPartitions = invalidFiles
            .map(_.partitionValues)
            .map { _.map { case (k, v) => s"$k=$v" }.mkString("/") }
            .mkString(", ")
          throw DeltaErrors.replaceWhereMismatchException(replaceWhere.get, badPartitions)
        }
 
        txn.filterFiles(predicates).map(_.remove)
      case _ => Nil
    }
 
    newFiles ++ deletedFiles
  }
}

若是 txn.readVersion == -1,说明是第一次写数据到 Delta Lake 表,因此当这个值大于 -1 的时候,须要判断一下写数据的操做是否合法。
因为 Delta Lake 底层使用的是 Parquet 格式,因此 Delta Lake 表也支持模式的增长合并等,这就是 updateMetadata 函数对应的操做。
由于 Delta Lake 表支持分区,因此咱们可能在写数据的时候指定某个分区进行覆盖。
真正写数据的操做是 txn.writeFiles 函数执行的,具体实现以下:

def writeFiles(
      data: Dataset[_],
      writeOptions: Option[DeltaOptions],
      isOptimize: Boolean): Seq[AddFile] = {
    hasWritten = true
 
    val spark = data.sparkSession
    val partitionSchema = metadata.partitionSchema
    val outputPath = deltaLog.dataPath
 
    val (queryExecution, output) = normalizeData(data, metadata.partitionColumns)
    val partitioningColumns =
      getPartitioningColumns(partitionSchema, output, output.length < data.schema.size)
 
    // 获取 DelayedCommitProtocol,里面能够设置写文件的名字,
    // commitTask 和 commitJob 等作一些事情
    val committer = getCommitter(outputPath)
 
    val invariants = Invariants.getFromSchema(metadata.schema, spark)
 
    SQLExecution.withNewExecutionId(spark, queryExecution) {
      val outputSpec = FileFormatWriter.OutputSpec(
        outputPath.toString,
        Map.empty,
        output)
 
      val physicalPlan = DeltaInvariantCheckerExec(queryExecution.executedPlan, invariants)
 
      FileFormatWriter.write(
        sparkSession = spark,
        plan = physicalPlan,
        fileFormat = snapshot.fileFormat,
        committer = committer,
        outputSpec = outputSpec,
        hadoopConf = spark.sessionState.newHadoopConfWithOptions(metadata.configuration),
        partitionColumns = partitioningColumns,
        bucketSpec = None,
        statsTrackers = Nil,
        options = Map.empty)
    }
 
    // 返回新增的文件
    committer.addedStatuses
}

Delta Lake 写操做最终调用 Spark 的 FileFormatWriter.write 方法进行的,经过这个方法的复用将咱们真正的数据写入到 Delta Lake 表里面去了。
在 Delta Lake 中,若是是新增文件则会在事务日志中使用 AddFile 类记录相关的信息,AddFile 持久化到事务日志里面的内容以下:

{"add":{"path":"dt=20190801/part-00001-bdff67f3-c70f-4817-898d-15a73c93271a.c000.snappy.parquet","partitionValues":{"dt":"20190801"},"size":429,"modificationTime":1566990855000,"dataChange":true}}

能够看出 AddFile 里面记录了新增文件的保存路径,分区信息,新增的文件大小,修改时间等信息。若是是删除文件,也会在事务日志里面记录这个删除操做,对应的就是使用 RemoveFile 类存储,RemoveFile 持久化到事务日志里面的内容以下:

{"remove":{"path":"dt=20190801/part-00001-7f3fe89d-e55b-4848-93ea-4133b5d406d6.c000.snappy.parquet","deletionTimestamp":1566990856332,"dataChange":true}}

RemoveFile 里面保存了删除文件的路径,删除时间等信息。若是新增一个文件,再删除一个文件,那么最新的事务日志快照里面只会保存删除这个文件的记录。从这里面也能够看出, Delta Lake 删除、新增 ACID 是针对文件级别的。

上面的写操做确定会产生新的文件,因此写操做以后就须要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和须要删除的文件(RemoveFile)。针对那些文件须要删除须要作一些判断,主要分两种状况(具体参见 write 方法里面的):

  • 若是是全表覆盖,则直接从缓存在内存中最新的事务日志快照中拿出全部 AddFile 文件,而后将其标记为 RemoveFile;
  • 若是是分区内的覆盖,则从缓存在内存中最新的事务日志快照中拿出对应分区下的 AddFile 文件,而后将其标记为 RemoveFile。

最后 write 方法返回新增的文件和须要删除的文件(newFiles ++ deletedFiles),这些文件最终须要记录到事务日志里面去。关于事务日志是如何写进去的请参见这篇文章的详细分析。

写在最后

为了营造一个开放的Cassandra技术交流环境,社区创建了微信公众号和钉钉群。为广大用户提供专业的技术分享及问答,按期开展专家技术直播,欢迎你们加入。另云Cassandra免费火爆公测中,欢迎试用:https://www.aliyun.com/product/cds

 

阅读原文

本文为云栖社区原创内容,未经容许不得转载。

相关文章
相关标签/搜索