Delta Lake's metadata consists of several kinds of actions: Protocol, Metadata, FileAction (AddFile, RemoveFile), CommitInfo, and SetTransaction.
// The first commit log contains the protocol and metaData actions
{"commitInfo":{"timestamp":1576480709055,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"fe0948b9-8253-4942-9e28-3a89321a004d","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"azkaban_tag\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"project_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"flow_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"job_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"application_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"queue_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"master_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1576480707164}}
{"add":{"path":"part-00000-dc1d1431-1e4b-4337-b111-6a447bad15fc-c000.snappy.parquet","partitionValues":{},"size":1443338,"modificationTime":1576480711000,"dataChange":true}}

// Later commit logs record the actions of the current operation
{"commitInfo":{"timestamp":1576481270646,"operation":"DELETE","operationParameters":{"predicate":"[\"(`master_name` = 'mob_analyse')\"]"},"readVersion":0,"isBlindAppend":false}}
{"remove":{"path":"part-00000-dc1d1431-1e4b-4337-b111-6a447bad15fc-c000.snappy.parquet","deletionTimestamp":1576481270643,"dataChange":true}}
{"add":{"path":"part-00000-d6431884-390d-4837-865c-f6e52f0e2cf5-c000.snappy.parquet","partitionValues":{},"size":1430267,"modificationTime":1576481273000,"dataChange":true}}
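If you want to poke at these actions yourself, the commit logs are plain JSON-lines files and can be read directly with Spark. A minimal sketch, assuming a table at the placeholder path /tmp/delta/events:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("inspect-delta-log").master("local[*]").getOrCreate()

// Every 00000000000000000000N.json file under _delta_log holds the actions of one commit.
val log = spark.read.json("/tmp/delta/events/_delta_log/*.json")

// Action types appear as nullable struct columns (commitInfo, protocol, metaData, add, remove, ...).
log.select("commitInfo.operation", "add.path", "remove.path").show(false)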
When a checkpoint file exists, currentSnapshot in the DeltaLog class computes the snapshot from the checkpoint plus the JSON commit logs written after it.
When there is no checkpoint file, the snapshot is computed through the update method of DeltaLog.
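To make currentSnapshot below easier to follow, here is a sketch of the file-naming convention its helpers (deltaFile, checkpointFileSingular, ...) rely on: versions are zero-padded to 20 digits, commits end in .json, and checkpoints in .checkpoint.parquet. The real helpers live in the FileNames object; this is just an illustration of the scheme:

import org.apache.hadoop.fs.Path

// Sketch of the naming scheme assumed below (not the real FileNames implementation).
def deltaFile(logPath: Path, version: Long): Path =
  new Path(logPath, f"$version%020d.json")               // e.g. 00000000000000000011.json

def checkpointFileSingular(logPath: Path, version: Long): Path =
  new Path(logPath, f"$version%020d.checkpoint.parquet") // e.g. 00000000000000000010.checkpoint.parquet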
@volatile private var currentSnapshot: Snapshot = lastCheckpoint.map { c =>
  val checkpointFiles = c.parts
    .map(p => checkpointFileWithParts(logPath, c.version, p)) // parts is unused in the current open-source release; presumably a commercial-edition feature
    .getOrElse(Seq(checkpointFileSingular(logPath, c.version))) // path of the latest checkpoint file
  val deltas = store.listFrom(deltaFile(logPath, c.version + 1)) // JSON commit files with versions after the checkpoint
    .filter(f => isDeltaFile(f.getPath))
    .toArray
  val deltaVersions = deltas.map(f => deltaVersion(f.getPath))
  verifyDeltaVersions(deltaVersions) // verify that the log versions are contiguous
  val newVersion = deltaVersions.lastOption.getOrElse(c.version)
  logInfo(s"Loading version $newVersion starting from checkpoint ${c.version}")
  try {
    val deltaIndex = DeltaLogFileIndex(DeltaLog.COMMIT_FILE_FORMAT, deltas)
    val checkpointIndex = DeltaLogFileIndex(DeltaLog.CHECKPOINT_FILE_FORMAT, fs, checkpointFiles)
    val snapshot = new Snapshot( // build the snapshot
      logPath,
      newVersion,
      None,
      checkpointIndex :: deltaIndex :: Nil,
      minFileRetentionTimestamp,
      this,
      // we don't want to make an additional RPC here to get commit timestamps when "deltas" is
      // empty. The next "update" call will take care of that if there are delta files.
      deltas.lastOption.map(_.getModificationTime).getOrElse(-1L))

    validateChecksum(snapshot) // validate the version against the .crc file; the current Delta release does not write .crc files yet -- a later update, or another commercial-edition gap?
    lastUpdateTimestamp = clock.getTimeMillis()
    snapshot
  } catch {
    case e: FileNotFoundException
        if Option(e.getMessage).exists(_.contains("parquet does not exist")) =>
      recordDeltaEvent(this, "delta.checkpoint.error.partial")
      throw DeltaErrors.missingPartFilesException(c, e)
    case e: AnalysisException
        if Option(e.getMessage).exists(_.contains("Path does not exist")) =>
      recordDeltaEvent(this, "delta.checkpoint.error.partial")
      throw DeltaErrors.missingPartFilesException(c, e)
  }
}.getOrElse {
  new Snapshot(logPath, -1, None, Nil, minFileRetentionTimestamp, this, -1L) // no checkpoint: replay the delta log from the beginning
}
// Reconstruct the state by applying deltas in order to the checkpoint.
// We partition by path as it is likely the bulk of the data is add/remove.
// Non-path based actions will be collocated to a single partition.
private val stateReconstruction = {
  val implicits = spark.implicits
  import implicits._

  val numPartitions = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SNAPSHOT_PARTITIONS)

  val checkpointData = previousSnapshot.getOrElse(emptyActions)
  val deltaData = load(files)
  val allActions = checkpointData.union(deltaData)
  val time = minFileRetentionTimestamp
  val hadoopConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
  val logPath = path.toUri // for serializability

  allActions.as[SingleAction]
    .mapPartitions { actions =>
      val hdpConf = hadoopConf.value
      actions.flatMap {
        _.unwrap match {
          case add: AddFile => Some(add.copy(path = canonicalizePath(add.path, hdpConf)).wrap)
          case rm: RemoveFile => Some(rm.copy(path = canonicalizePath(rm.path, hdpConf)).wrap)
          case other if other == null => None
          case other => Some(other.wrap)
        }
      }
    }
    .withColumn("file", assertLogBelongsToTable(logPath)(input_file_name()))
    .repartition(numPartitions, coalesce($"add.path", $"remove.path"))
    .sortWithinPartitions("file")
    .as[SingleAction]
    .mapPartitions { iter =>
      val state = new InMemoryLogReplay(time)
      state.append(0, iter.map(_.unwrap))
      state.checkpoint.map(_.wrap)
    }
}
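The reconciliation rule that InMemoryLogReplay applies is simple: for each path the newest AddFile wins, a later RemoveFile cancels it (keeping only a tombstone until minFileRetentionTimestamp), and the latest metaData/protocol/SetTransaction values win. A much simplified, self-contained sketch of just the add/remove part, with toy case classes standing in for the real actions:

// Toy stand-ins for AddFile / RemoveFile, only to illustrate the replay rule.
final case class ToyAdd(path: String)
final case class ToyRemove(path: String)

// Replay actions in commit order: later actions override earlier ones for the same path.
def replayFiles(actionsInOrder: Seq[Any]): Seq[ToyAdd] = {
  val active = scala.collection.mutable.LinkedHashMap.empty[String, ToyAdd]
  actionsInOrder.foreach {
    case a: ToyAdd    => active(a.path) = a    // (re)adding a path makes it live again
    case r: ToyRemove => active.remove(r.path) // removing a path drops the earlier AddFile
    case _            =>                       // protocol/metaData/txn handled elsewhere
  }
  active.values.toSeq // the surviving AddFiles form the table state
}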
Commits to the log are implemented in commit() of OptimisticTransactionImpl.
/**
 * Modifies the state of the log by adding a new commit that is based on a read at
 * the given `lastVersion`. In the case of a conflict with a concurrent writer this
 * method will throw an exception.
 *
 * @param actions Set of actions to commit
 * @param op Details of operation that is performing this transactional commit
 */
@throws(classOf[ConcurrentModificationException])
def commit(actions: Seq[Action], op: DeltaOperations.Operation): Long = recordDeltaOperation(
    deltaLog,
    "delta.commit") {
  val version = try {
    // Try to commit at the next version.
    var finalActions = prepareCommit(actions, op) // various sanity checks

    // Find the isolation level to use for this commit
    val noDataChanged = actions.collect { case f: FileAction => f.dataChange }.forall(_ == false)
    val isolationLevelToUse = if (noDataChanged) { // new in 0.5: a very simple isolation-level decision; writeIsolation is not used yet, wait for later releases
      // If no data has changed (i.e. its is only being rearranged), then SnapshotIsolation
      // provides Serializable guarantee. Hence, allow reduced conflict detection by using
      // SnapshotIsolation of what the table isolation level is.
      SnapshotIsolation
    } else {
      Serializable
    }

    val isBlindAppend = { // blind append = the transaction reads no Delta data and every file action is an AddFile
      val dependsOnFiles = readPredicates.nonEmpty || readFiles.nonEmpty
      val onlyAddFiles =
        finalActions.collect { case f: FileAction => f }.forall(_.isInstanceOf[AddFile])
      onlyAddFiles && !dependsOnFiles
    }

    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED)) { // by default, commitInfo is written into the commit log
      commitInfo = CommitInfo(
        clock.getTimeMillis(),
        op.name,
        op.jsonEncodedValues,
        Map.empty,
        Some(readVersion).filter(_ >= 0),
        None,
        Some(isBlindAppend))
      finalActions = commitInfo +: finalActions
    }

    // Register post-commit hooks if any
    lazy val hasFileActions = finalActions.collect { case f: FileAction => f }.nonEmpty
    if (DeltaConfigs.SYMLINK_FORMAT_MANIFEST_ENABLED.fromMetaData(metadata) && hasFileActions) {
      registerPostCommitHook(GenerateSymlinkManifest) // generate manifests to support Presto and Athena
    }

    val commitVersion = doCommit(snapshot.version + 1, finalActions, 0, isolationLevelToUse) // write the action log
    logInfo(s"Committed delta #$commitVersion to ${deltaLog.logPath}")
    postCommit(commitVersion, finalActions) // decide whether a new checkpoint should be written
    commitVersion
  } catch {
    case e: DeltaConcurrentModificationException =>
      recordDeltaEvent(deltaLog, "delta.commit.conflict." + e.conflictType)
      throw e
    case NonFatal(e) =>
      recordDeltaEvent(
        deltaLog, "delta.commit.failure", data = Map("exception" -> Utils.exceptionString(e)))
      throw e
  }
  runPostCommitHooks(version, actions) // new in 0.5, used to support Presto and Amazon Athena
  version
}
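The postCommit call at the end also decides whether this commit should produce a new checkpoint; in the open-source code that boils down to a modulus check against the table's checkpoint interval (10 commits by default, configurable via delta.checkpointInterval). Roughly, as a sketch rather than the real signature:

// Rough shape of the decision made in postCommit (hypothetical helper, not the real API).
def needsCheckpoint(committedVersion: Long, checkpointInterval: Int = 10): Boolean =
  committedVersion != 0 && committedVersion % checkpointInterval == 0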
(The conflict handling invoked when doCommit loses the race to a concurrent writer lives in the checkAndRetry method of OptimisticTransaction.scala; it is worth a look if you are interested.)
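The shape of that logic is an optimistic retry loop: doCommit writes the next version's JSON file relying on the log store's mutual-exclusion guarantee, and if a concurrent writer already took that version, checkAndRetry reads the commits it missed, checks them for logical conflicts, and tries again at a higher version. A rough, self-contained sketch of the control flow; the parameters here are hypothetical stand-ins, not the real method signature:

import java.nio.file.FileAlreadyExistsException
import java.util.ConcurrentModificationException

// Hypothetical simplification of doCommit + checkAndRetry in OptimisticTransactionImpl.
def commitWithRetry(
    writeVersion: (Long, Seq[String]) => Unit,  // put-if-absent write of 000...N.json; throws if N exists
    hasConflict: Long => Boolean,               // do the commits we lost to conflict with what we read/wrote?
    actions: Seq[String],
    firstAttempt: Long,
    maxRetries: Int = 10): Long = {
  var attempt = firstAttempt
  var retries = 0
  while (retries <= maxRetries) {
    try {
      writeVersion(attempt, actions)
      return attempt                            // commit succeeded at this version
    } catch {
      case _: FileAlreadyExistsException =>     // a concurrent writer won this version
        if (hasConflict(attempt)) throw new ConcurrentModificationException(s"conflict at version $attempt")
        attempt += 1                            // no logical conflict: retry at the next version
        retries += 1
    }
  }
  throw new IllegalStateException("too many commit retries")
}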
Calling the delete method on a DeltaTable removes the rows that satisfy the given condition; internally the work is done by performDelete, shown after the short usage sketch below.
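From the API side, a conditional delete is a single call. A minimal sketch, assuming an active SparkSession named spark and a placeholder table path; the predicate reuses a column from the commit-log example above:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/events") // placeholder path
deltaTable.delete(col("master_name") === "mob_analyse")         // ends up in performDelete below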
private def performDelete(
    sparkSession: SparkSession, deltaLog: DeltaLog, txn: OptimisticTransaction) = {
  import sparkSession.implicits._

  var numTouchedFiles: Long = 0
  var numRewrittenFiles: Long = 0
  var scanTimeMs: Long = 0
  var rewriteTimeMs: Long = 0

  val startTime = System.nanoTime()
  val numFilesTotal = deltaLog.snapshot.numOfFiles

  val deleteActions: Seq[Action] = condition match {
    case None => // no condition: the whole table is deleted, so just list all files and remove them
      // Case 1: Delete the whole table if the condition is true
      val allFiles = txn.filterFiles(Nil)

      numTouchedFiles = allFiles.size
      scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000

      val operationTimestamp = System.currentTimeMillis()
      allFiles.map(_.removeWithTimestamp(operationTimestamp)) // logically delete the data files
    case Some(cond) => // with a condition, several cases must be distinguished
      val (metadataPredicates, otherPredicates) =
        DeltaTableUtils.splitMetadataAndDataPredicates( // split the condition into predicates answerable from metadata alone, and the rest
          cond, txn.metadata.partitionColumns, sparkSession)

      if (otherPredicates.isEmpty) { // first case: metadata alone locates all affected data
        // Case 2: The condition can be evaluated using metadata only.
        //         Delete a set of files without the need of scanning any data files.
        val operationTimestamp = System.currentTimeMillis()
        val candidateFiles = txn.filterFiles(metadataPredicates) // files touched by the predicates

        scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
        numTouchedFiles = candidateFiles.size

        candidateFiles.map(_.removeWithTimestamp(operationTimestamp)) // remove them
      } else { // second case: rows that should survive must be rewritten into new files
        // Case 3: Delete the rows based on the condition.
        val candidateFiles = txn.filterFiles(metadataPredicates ++ otherPredicates)

        numTouchedFiles = candidateFiles.size
        val nameToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, candidateFiles) // map from candidate file path to its AddFile action

        val fileIndex = new TahoeBatchFileIndex(
          sparkSession, "delete", candidateFiles, deltaLog, tahoeFileIndex.path, txn.snapshot)
        // Keep everything from the resolved target except a new TahoeFileIndex
        // that only involves the affected files instead of all files.
        val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex) // swap in the new file index, producing an updated LogicalPlan
        val data = Dataset.ofRows(sparkSession, newTarget)
        val filesToRewrite =
          withStatusCode("DELTA", s"Finding files to rewrite for DELETE operation") {
            if (numTouchedFiles == 0) {
              Array.empty[String]
            } else {
              data.filter(new Column(cond)).select(new Column(InputFileName())).distinct()
                .as[String].collect()
            }
          }

        scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
        if (filesToRewrite.isEmpty) {
          // Case 3.1: no row matches and no delete will be triggered
          Nil
        } else {
          // Case 3.2: some files need an update to remove the deleted files
          // Do the second pass and just read the affected files
          val baseRelation = buildBaseRelation(
            sparkSession, txn, "delete", tahoeFileIndex.path, filesToRewrite, nameToAddFileMap)
          // Keep everything from the resolved target except a new TahoeFileIndex
          // that only involves the affected files instead of all files.
          val newTarget = DeltaTableUtils.replaceFileIndex(target, baseRelation.location)
          val targetDF = Dataset.ofRows(sparkSession, newTarget)
          val filterCond = Not(EqualNullSafe(cond, Literal(true, BooleanType)))
          val updatedDF = targetDF.filter(new Column(filterCond))

          val rewrittenFiles = withStatusCode(
            "DELTA", s"Rewriting ${filesToRewrite.size} files for DELETE operation") {
            txn.writeFiles(updatedDF) // write the surviving rows into new files
          }

          numRewrittenFiles = rewrittenFiles.size
          rewriteTimeMs = (System.nanoTime() - startTime) / 1000 / 1000 - scanTimeMs

          val operationTimestamp = System.currentTimeMillis()
          removeFilesFromPaths(deltaLog, nameToAddFileMap, filesToRewrite, operationTimestamp) ++ // RemoveFile actions for the old files
            rewrittenFiles // plus the AddFile actions of the rewritten ones
        }
      }
  }
  if (deleteActions.nonEmpty) {
    txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq)) // commit the delete as a new log entry
  }

  recordDeltaEvent( // record detailed metrics of this operation
    deltaLog,
    "delta.dml.delete.stats",
    data = DeleteMetric(
      condition = condition.map(_.sql).getOrElse("true"),
      numFilesTotal,
      numTouchedFiles,
      numRewrittenFiles,
      scanTimeMs,
      rewriteTimeMs)
  )
}
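A subtle point in case 3.2 is the rewrite filter: rows are kept when the delete condition is not null-safely equal to true, so rows where the condition evaluates to NULL are kept rather than deleted. A tiny standalone illustration of that semantics in DataFrame terms:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, not}

// EqualNullSafe(cond, true) is the SQL <=> operator: NULL <=> true is false,
// so NOT(cond <=> true) keeps both "condition is false" and "condition is NULL" rows.
val deleteCond: Column = col("master_name") === "mob_analyse"
val keepCond: Column   = not(deleteCond <=> lit(true)) // what gets written back into the rewritten files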
Calling the update() method on a DeltaTable updates the rows that satisfy the given condition (the flow is very similar to delete).
(The key code is in the performUpdate method of UpdateCommand.scala and closely mirrors the delete path.)
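The user-facing call mirrors delete. A minimal sketch, again assuming an active SparkSession named spark and a placeholder path; the set-clause column is just an example from the schema shown earlier:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.{col, lit}

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/events") // placeholder path
deltaTable.update(
  col("master_name") === "mob_analyse",                         // condition
  Map("queue_name" -> lit("root.default")))                     // set clauses, handled by performUpdate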
merge on a DeltaTable simply constructs a DeltaMergeBuilder; the subsequent whenMatched and whenNotMatched calls add clauses to that mergeBuilder, and execute() finally kicks off the run;
the update action of whenMatched goes through addUpdateClause, shown below; updateAll follows the same path, except that MergeIntoClause.toActions(Nil, Nil) is called with empty arguments (analogous to UPDATE SET *), which the resolveClause method expands later during execute().
private def addUpdateClause(set: Map[String, Column]): DeltaMergeBuilder = {
  if (set.isEmpty && matchCondition.isEmpty) {
    // Nothing to update = no need to add an update clause
    mergeBuilder
  } else {
    val setActions = set.toSeq
    val updateActions = MergeIntoClause.toActions( // convert to MergeActions
      colNames = setActions.map(x => UnresolvedAttribute.quotedString(x._1)),
      exprs = setActions.map(x => x._2.expr),
      isEmptySeqEqualToStar = false)
    val updateClause = MergeIntoUpdateClause(matchCondition.map(_.expr), updateActions) // pack together with the match condition
    mergeBuilder.withClause(updateClause) // append to the mergeBuilder
  }
}
In whenMatched a delete can also be performed: the matchCondition is simply wrapped in a MergeIntoDeleteClause and then added to the mergeBuilder via withClause;
/** Delete a matched row from the table */
def delete(): DeltaMergeBuilder = {
  val deleteClause = MergeIntoDeleteClause(matchCondition.map(_.expr))
  mergeBuilder.withClause(deleteClause)
}
In whenNotMatched an insert can be performed; the flow mirrors update: MergeIntoClause.toActions does the conversion, MergeIntoInsertClause wraps it, and the clause is added to the mergeBuilder;
private def addInsertClause(setValues: Map[String, Column]): DeltaMergeBuilder = {
  val values = setValues.toSeq
  val insertActions = MergeIntoClause.toActions(
    colNames = values.map(x => UnresolvedAttribute.quotedString(x._1)),
    exprs = values.map(x => x._2.expr),
    isEmptySeqEqualToStar = false)
  val insertClause = MergeIntoInsertClause(notMatchCondition.map(_.expr), insertActions)
  mergeBuilder.withClause(insertClause)
}
def execute(): Unit = {
  val sparkSession = targetTable.toDF.sparkSession
  val resolvedMergeInto =
    MergeInto.resolveReferences(mergePlan)(tryResolveReferences(sparkSession) _) // resolve references
  if (!resolvedMergeInto.resolved) {
    throw DeltaErrors.analysisException("Failed to resolve\n", plan = Some(resolvedMergeInto))
  }
  // Preprocess the actions and verify
  val mergeIntoCommand = PreprocessTableMerge(sparkSession.sessionState.conf)(resolvedMergeInto) // preprocess and wrap into a MergeIntoCommand
  sparkSession.sessionState.analyzer.checkAnalysis(mergeIntoCommand) // check the LogicalPlan
  mergeIntoCommand.run(sparkSession) // run it
}
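Putting the clause builders together, a typical merge from the user's side looks like the sketch below (the table path and the staging_events source view are placeholders, and spark is an active SparkSession); each builder call maps to one of the clauses discussed above:

import io.delta.tables.DeltaTable

val target = DeltaTable.forPath(spark, "/tmp/delta/events") // placeholder target table
val updates = spark.table("staging_events")                 // placeholder source DataFrame

target.as("t")
  .merge(updates.as("s"), "t.job_name = s.job_name")        // builds the DeltaMergeBuilder
  .whenMatched("s.is_deleted = true").delete()              // MergeIntoDeleteClause
  .whenMatched().updateAll()                                // MergeIntoClause.toActions(Nil, Nil), i.e. UPDATE SET *
  .whenNotMatched().insertAll()                             // MergeIntoInsertClause with * semantics
  .execute()                                                // resolve, preprocess, and run MergeIntoCommand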