原创文章,转载请务必将下面这段话置于文章开头处。
本文转发自 技术世界, 原文连接 http://www.jasongj.com/spark/committer/本文所述内容均基于 2018年9月17日 Spark 最新 Release 2.3.1 版本,以及 hadoop-2.6.0-cdh-5.4.4json
Spark 输出数据到 HDFS 时,须要解决以下问题:数据结构
本文经过 Local mode 执行以下 Spark 程序详解 commit 原理app
sparkContext.textFile("/json/input.zstd") .map(_.split(",")) .saveAsTextFile("/jason/test/tmp")
在详述 commit 原理前,须要说明几个述语oop
在本文中,会使用以下缩写性能
在启动 Job 以前,Driver 首先经过 FileOutputFormat 的 checkOutputSpecs 方法检查输出目录是否已经存在。若已存在,则直接抛出 FileAlreadyExistsExceptionspa
Job 开始前,由 Driver(本例使用 local mode,所以由 main 线程执行)调用 FileOuputCommitter.setupJob 建立 Application Attempt 目录,即 ${output.dir.root}/_temporary/${appAttempt}线程
由各 Task 执行 FileOutputCommitter.setupTask 方法(本例使用 local mode,所以由 task 线程执行)。该方法不作任何事情,由于 Task 临时目录由 Task 按需建立。
<img width="80%" style="margin: 0 auto;" src="http://www.jasongj.com/img/sp...; alt="Setup task"/>scala
本例中,Task 写数据须要经过 TextOutputFormat 的 getRecordWriter 方法建立 LineRecordWriter。而建立前须要经过 FileOutputFormat.getTaskOutputPath设置 Task 输出路径,即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}
。该 Task Attempt 全部数据均写在该目录下的文件内3d
Task 执行数据写完后,经过 FileOutputCommitter.needsTaskCommit 方法检查是否须要 commit 它写在 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
下的数据。code
检查依据是 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
目录是否存在
若是须要 commit,而且开启了 Output commit coordination,还须要经过 RPC 由 Driver 侧的 OutputCommitCoordinator 判断该 Task Attempt 是否能够 commit
之因此须要由 Driver 侧的 CommitCoordinator 判断是否能够 commit,是由于可能因为 speculation 或者其它缘由(如以前的 TaskAttemp 未被 Kill 成功)存在同一 Task 的多个 Attemp 同时写数据且都申请 commit 的状况。
当申请 commitTask 的 TaskAttempt 为失败的 Attempt,则直接拒绝
若该 TaskAttempt 成功,而且 CommitCoordinator 未容许过该 Task 的其它 Attempt 的 commit 请求,则容许该 TaskAttempt 的 commit 请求
若 CommitCoordinator 以前已容许过该 TaskAttempt 的 commit 请求,则继续赞成该 TaskAttempt 的 commit 请求,即 CommitCoordinator 对该申请的处理是幂等的。
若该 TaskAttempt 成功,且 CommitCoordinator 以前已容许该 Task 的其它 Attempt 的 commit 请求,则直接拒绝当前 TaskAttempt 的 commit 请求
OutputCommitCoordinator 为了实现上述功能,为每一个 ActiveStage 维护一个以下 StageState
private case class StageState(numPartitions: Int) { val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER) val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]() }
该数据结构中,保存了每一个 Task 被容许 commit 的 TaskAttempt。默认值均为 NO_AUTHORIZED_COMMITTER
同时,保存了每一个 Task 的全部失败的 Attempt
当 TaskAttempt 被容许 commit 后,Task (本例因为使用 local model,所以由 task 线程执行)会经过以下方式 commitTask。
当 mapreduce.fileoutputcommitter.algorithm.version
的值为 1 (默认值)时,Task 将 taskAttemptPath 即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
重命令为 committedTaskPath 即 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
若 mapreduce.fileoutputcommitter.algorithm.version
的值为 2,直接将taskAttemptPath 即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
内的全部文件移动到 outputPath 即 ${output.dir.root}/
当全部 Task 都执行成功后,由 Driver (本例因为使用 local model,故由 main 线程执行)执行 FileOutputCommitter.commitJob
若 mapreduce.fileoutputcommitter.algorithm.version
的值为 1,则由 Driver 单线程遍历全部 committedTaskPath 即 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
,并将其下全部文件移动到 finalOutput 即 ${output.dir.root}
下
若 mapreduce.fileoutputcommitter.algorithm.version
的值为 2,则无须移动任何文件。由于全部 Task 的输出文件已在 commitTask 内被移动到 finalOutput 即 ${output.dir.root}
内
全部 commit 过的 Task 输出文件移动到 finalOutput 即 ${output.dir.root}
后,Driver 经过 cleanupJob 删除 ${output.dir.root}/_temporary/
下全部内容
<img width="70%" style="margin: 0 auto;" src="http://www.jasongj.com/img/sp...; alt="Cleanup job"/>
上文所述的 commitTask 与 commitJob 机制,保证了一次 Application Attemp 中不一样 Task 的不一样 Attemp 在 commit 时的数据一致性
而当整个 Application retry 时,在以前的 Application Attemp 中已经成功 commit 的 Task 无须从新执行,其数据可直接恢复
恢复 Task 时,先获取上一次的 Application Attempt,以及对应的 committedTaskPath,即 ${output.dir.root}/_temporary/${preAppAttempt}/${taskAttempt}
若 mapreduce.fileoutputcommitter.algorithm.version
的值为 1,而且 preCommittedTaskPath 存在(说明在以前的 Application Attempt 中该 Task 已被 commit 过),则直接将 preCommittedTaskPath 重命名为 committedTaskPath
若 mapreduce.fileoutputcommitter.algorithm.version
的值为 2,无须恢复任何数据,由于在以前 Application Attempt 中 commit 过的 Task 的数据已经在 commitTask 中被移动到 ${output.dir.root}
中
停止 Task 时,由 Task 调用 FileOutputCommitter.abortTask
方法删除 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
停止 Job 由 Driver 调用 FileOutputCommitter.abortJob
方法完成。该方法经过 FileOutputCommitter.cleanupJob
方法删除 ${output.dir.root}/_temporary
V1 committer(即 mapreduce.fileoutputcommitter.algorithm.version
的值为 1),commit 过程以下
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
移动到 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
移动到 ${output.dir.root}
,而后建立 _SUCCESS
标记文件${output.dir.root}/_temporary/${preAppAttempt}/${preTaskAttempt}
移动到 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
V2 committer(即 mapreduce.fileoutputcommitter.algorithm.version
的值为 2),commit 过程以下
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
移动到 ${output.dir.root}
_SUCCESS
标记文件V1 在 Job 执行结束后,在 Driver 端经过 commitJob 方法,单线程串行将全部 Task 的输出文件移动到输出根目录。移动以文件为单位,当 Task 个数较多(大 Job,或者小文件引发的大量小 Task),Name Node RPC 较慢时,该过程耗时较久。在实践中,可能所以发生全部 Task 均执行结束,但 Job 不结束的问题。甚至 commitJob 耗时比 全部 Task 执行时间还要长
而 V2 在 Task 结束后,由 Task 在 commitTask 方法内,将本身的数据文件移动到输出根目录。一方面,Task 结束时即移动文件,不需等待 Job 结束才移动文件,即文件移动更早发起,也更早结束。另外一方面,不一样 Task 间并行移动文件,极大缩短了整个 Job 内全部 Task 的文件移动耗时
V1 只有 Job 结束,才会将数据文件移动到输出根目录,才会对外可见。在此以前,全部文件均在 ${output.dir.root}/_temporary/${appAttempt}
及其子文件内,对外不可见。
当 commitJob 过程耗时较短时,其失败的可能性较小,可认为 V1 的 commit 过程是两阶段提交,要么全部 Task 都 commit 成功,要么都失败。
而因为上文提到的问题, commitJob 过程可能耗时较久,若是在此过程当中,Driver 失败,则可能发生部分 Task 数据被移动到 ${output.dir.root} 对外可见,部分 Task 的数据未及时移动,对外不可见的问题。此时发生了数据不一致性的问题
V2 当 Task 结束时,当即将数据移动到 ${output.dir.root},当即对外可见。若是 Application 执行过程当中失败了,已 commit 的 Task 数据仍然对外可见,而失败的 Task 数据或未被 commit 的 Task 数据对外不可见。也即 V2 更易发生数据一致性问题