Spark CommitCoordinator 保证数据一致性

原创文章,转载请务必将下面这段话置于文章开头处。
本文转发自 技术世界原文连接  http://www.jasongj.com/spark/committer/

本文所述内容均基于 2018年9月17日 Spark 最新 Release 2.3.1 版本,以及 hadoop-2.6.0-cdh-5.4.4json

概述

Spark 输出数据到 HDFS 时,须要解决以下问题:数据结构

  • 因为多个 Task 同时写数据到 HDFS,如何保证要么全部 Task 写的全部文件要么同时对外可见,要么同时对外不可见,即保证数据一致性
  • 同一 Task 可能由于 Speculation 而存在两个彻底相同的 Task 实例写相同的数据到 HDFS中,如何保证只有一个 commit 成功
  • 对于大 Job(如具备几万甚至几十万 Task),如何高效管理全部文件

commit 原理

本文经过 Local mode 执行以下 Spark 程序详解 commit 原理app

sparkContext.textFile("/json/input.zstd")
  .map(_.split(","))
  .saveAsTextFile("/jason/test/tmp")

在详述 commit 原理前,须要说明几个述语oop

  • Task,即某个 Application 的某个 Job 内的某个 Stage 的一个 Task
  • TaskAttempt,Task 每次执行都视为一个 TaskAttempt。对于同一个 Task,可能同时存在多个 TaskAttemp
  • Application Attempt,即 Application 的一次执行

在本文中,会使用以下缩写性能

  • ${output.dir.root} 即输出目录根路径
  • ${appAttempt} 即 Application Attempt ID,为整型,从 0 开始
  • ${taskAttemp} 即 Task Attetmp ID,为整型,从 0 开始

检查 Job 输出目录

在启动 Job 以前,Driver 首先经过 FileOutputFormat 的 checkOutputSpecs 方法检查输出目录是否已经存在。若已存在,则直接抛出 FileAlreadyExistsException
Check output pathspa

Driver执行setupJob

Job 开始前,由 Driver(本例使用 local mode,所以由 main 线程执行)调用 FileOuputCommitter.setupJob 建立 Application Attempt 目录,即 ${output.dir.root}/_temporary/${appAttempt}
Setup job线程

Task执行setupTask

由各 Task 执行 FileOutputCommitter.setupTask 方法(本例使用 local mode,所以由 task 线程执行)。该方法不作任何事情,由于 Task 临时目录由 Task 按需建立。
<img width="80%" style="margin: 0 auto;" src="http://www.jasongj.com/img/sp...; alt="Setup task"/>scala

按需建立 Task 目录

本例中,Task 写数据须要经过 TextOutputFormatgetRecordWriter 方法建立 LineRecordWriter。而建立前须要经过 FileOutputFormat.getTaskOutputPath设置 Task 输出路径,即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}。该 Task Attempt 全部数据均写在该目录下的文件内
Create task output file3d

检查是否须要 commit

Task 执行数据写完后,经过 FileOutputCommitter.needsTaskCommit 方法检查是否须要 commit 它写在 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 下的数据。code

检查依据是 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 目录是否存在
Need commmit task

若是须要 commit,而且开启了 Output commit coordination,还须要经过 RPC 由 Driver 侧的 OutputCommitCoordinator 判断该 Task Attempt 是否能够 commit
Need commmit task detail

之因此须要由 Driver 侧的 CommitCoordinator 判断是否能够 commit,是由于可能因为 speculation 或者其它缘由(如以前的 TaskAttemp 未被 Kill 成功)存在同一 Task 的多个 Attemp 同时写数据且都申请 commit 的状况。

CommitCoordinator

当申请 commitTask 的 TaskAttempt 为失败的 Attempt,则直接拒绝

若该 TaskAttempt 成功,而且 CommitCoordinator 未容许过该 Task 的其它 Attempt 的 commit 请求,则容许该 TaskAttempt 的 commit 请求

若 CommitCoordinator 以前已容许过该 TaskAttempt 的 commit 请求,则继续赞成该 TaskAttempt 的 commit 请求,即 CommitCoordinator 对该申请的处理是幂等的。

若该 TaskAttempt 成功,且 CommitCoordinator 以前已容许该 Task 的其它 Attempt 的 commit 请求,则直接拒绝当前 TaskAttempt 的 commit 请求
Coordinator handle request

OutputCommitCoordinator 为了实现上述功能,为每一个 ActiveStage 维护一个以下 StageState

private case class StageState(numPartitions: Int) {
  val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER)
  val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]()
  }

该数据结构中,保存了每一个 Task 被容许 commit 的 TaskAttempt。默认值均为 NO_AUTHORIZED_COMMITTER

同时,保存了每一个 Task 的全部失败的 Attempt

commitTask

当 TaskAttempt 被容许 commit 后,Task (本例因为使用 local model,所以由 task 线程执行)会经过以下方式 commitTask。

mapreduce.fileoutputcommitter.algorithm.version 的值为 1 (默认值)时,Task 将 taskAttemptPath 即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 重命令为 committedTaskPath 即 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
Commit task v1

mapreduce.fileoutputcommitter.algorithm.version 的值为 2,直接将taskAttemptPath 即 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 内的全部文件移动到 outputPath 即 ${output.dir.root}/
Commit task v2

commitJob

当全部 Task 都执行成功后,由 Driver (本例因为使用 local model,故由 main 线程执行)执行 FileOutputCommitter.commitJob

mapreduce.fileoutputcommitter.algorithm.version 的值为 1,则由 Driver 单线程遍历全部 committedTaskPath 即 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt},并将其下全部文件移动到 finalOutput 即 ${output.dir.root}
Commit job v1

mapreduce.fileoutputcommitter.algorithm.version 的值为 2,则无须移动任何文件。由于全部 Task 的输出文件已在 commitTask 内被移动到 finalOutput 即 ${output.dir.root}
Commit job v2

全部 commit 过的 Task 输出文件移动到 finalOutput 即 ${output.dir.root} 后,Driver 经过 cleanupJob 删除 ${output.dir.root}/_temporary/ 下全部内容
<img width="70%" style="margin: 0 auto;" src="http://www.jasongj.com/img/sp...; alt="Cleanup job"/>

recoverTask

上文所述的 commitTask 与 commitJob 机制,保证了一次 Application Attemp 中不一样 Task 的不一样 Attemp 在 commit 时的数据一致性

而当整个 Application retry 时,在以前的 Application Attemp 中已经成功 commit 的 Task 无须从新执行,其数据可直接恢复

恢复 Task 时,先获取上一次的 Application Attempt,以及对应的 committedTaskPath,即 ${output.dir.root}/_temporary/${preAppAttempt}/${taskAttempt}

mapreduce.fileoutputcommitter.algorithm.version 的值为 1,而且 preCommittedTaskPath 存在(说明在以前的 Application Attempt 中该 Task 已被 commit 过),则直接将 preCommittedTaskPath 重命名为 committedTaskPath

mapreduce.fileoutputcommitter.algorithm.version 的值为 2,无须恢复任何数据,由于在以前 Application Attempt 中 commit 过的 Task 的数据已经在 commitTask 中被移动到 ${output.dir.root}
Recover task

abortTask

停止 Task 时,由 Task 调用 FileOutputCommitter.abortTask 方法删除 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
Abort task

abortJob

停止 Job 由 Driver 调用 FileOutputCommitter.abortJob 方法完成。该方法经过 FileOutputCommitter.cleanupJob 方法删除 ${output.dir.root}/_temporary

总结

V1 vs. V2 committer 过程

V1 committer(即 mapreduce.fileoutputcommitter.algorithm.version 的值为 1),commit 过程以下

  • Task 线程将 TaskAttempt 数据写入 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  • commitTask 由 Task 线程将 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 移动到 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
  • commitJob 由 Driver 单线程依次将全部 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt} 移动到 ${output.dir.root},而后建立 _SUCCESS 标记文件
  • recoverTask 由 Task 线程将 ${output.dir.root}/_temporary/${preAppAttempt}/${preTaskAttempt} 移动到 ${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}

V2 committer(即 mapreduce.fileoutputcommitter.algorithm.version 的值为 2),commit 过程以下

  • Task 线程将 TaskAttempt 数据写入 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  • commitTask 由 Task 线程将 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 移动到 ${output.dir.root}
  • commitJob 建立 _SUCCESS 标记文件
  • recoverTask 无需任何操做

V1 vs. V2 committer 性能对比

V1 在 Job 执行结束后,在 Driver 端经过 commitJob 方法,单线程串行将全部 Task 的输出文件移动到输出根目录。移动以文件为单位,当 Task 个数较多(大 Job,或者小文件引发的大量小 Task),Name Node RPC 较慢时,该过程耗时较久。在实践中,可能所以发生全部 Task 均执行结束,但 Job 不结束的问题。甚至 commitJob 耗时比 全部 Task 执行时间还要长

而 V2 在 Task 结束后,由 Task 在 commitTask 方法内,将本身的数据文件移动到输出根目录。一方面,Task 结束时即移动文件,不需等待 Job 结束才移动文件,即文件移动更早发起,也更早结束。另外一方面,不一样 Task 间并行移动文件,极大缩短了整个 Job 内全部 Task 的文件移动耗时

V1 vs. V2 committer 一致性对比

V1 只有 Job 结束,才会将数据文件移动到输出根目录,才会对外可见。在此以前,全部文件均在 ${output.dir.root}/_temporary/${appAttempt} 及其子文件内,对外不可见。

当 commitJob 过程耗时较短时,其失败的可能性较小,可认为 V1 的 commit 过程是两阶段提交,要么全部 Task 都 commit 成功,要么都失败。

而因为上文提到的问题, commitJob 过程可能耗时较久,若是在此过程当中,Driver 失败,则可能发生部分 Task 数据被移动到 ${output.dir.root} 对外可见,部分 Task 的数据未及时移动,对外不可见的问题。此时发生了数据不一致性的问题

V2 当 Task 结束时,当即将数据移动到 ${output.dir.root},当即对外可见。若是 Application 执行过程当中失败了,已 commit 的 Task 数据仍然对外可见,而失败的 Task 数据或未被 commit 的 Task 数据对外不可见。也即 V2 更易发生数据一致性问题

相关文章
相关标签/搜索