checkpoint在spark中主要有两块应用:一块是在spark core中对RDD作checkpoint,能够切断作checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;另一块是应用在spark streaming中,使用checkpoint用来保存DStreamGraph以及相关配置信息,以便在Driver崩溃重启的时候可以接着以前进度继续进行处理(如以前waiting batch的job会在重启后继续处理)。apache
本文主要将详细分析checkpoint在以上两种场景的读写过程。app
使用checkpoint对RDD作快照大致以下:socket
sc.setCheckpointDir(checkpointDir.toString)
val rdd = sc.makeRDD(1 to 20, numSlices = 1)
rdd.checkpoint()
首先,设置checkpoint的目录(通常是hdfs目录),这个目录用来将RDD相关的数据(包括每一个partition实际数据,以及partitioner(若是有的话))。而后在RDD上调用checkpoint的方法便可。函数
能够看到checkpoint使用很是简单,设置checkpoint目录,而后调用RDD的checkpoint方法。针对checkpoint的写入流程,主要有如下四个问题:ui
Q1:RDD中的数据是何时写入的?是在rdd调用checkpoint方法时候吗?spa
Q2:在作checkpoint的时候,具体写入了哪些数据到HDFS了?scala
Q3:在对RDD作完checkpoint之后,对作RDD的本省又作了哪些收尾工做?rest
Q4:实际过程当中,使用RDD作checkpoint的时候须要注意什么问题?code
弄清楚了以上四个问题,我想对checkpoint的写过程也就基本清楚了。接下来将一一回答上面提出的问题。对象
A1:首先看一下RDD中checkpoint方法,能够看到在该方法中是只是新建了一个ReliableRDDCheckpintData的对象,并无作实际的写入工做。实际触发写入的时机是在runJob生成改RDD后,调用RDD的doCheckpoint方法来作的。
A2:在经历调用RDD.doCheckpoint → RDDCheckpintData.checkpoint → ReliableRDDCheckpintData.doCheckpoint → ReliableRDDCheckpintData.writeRDDToCheckpointDirectory后,在writeRDDToCheckpointDirectory方法中能够看到:将做为一个单独的任务(RunJob)将RDD中每一个parition的数据依次写入到checkpoint目录(writePartitionToCheckpointFile),此外若是该RDD中的partitioner若是不为空,则也会将该对象序列化后存储到checkpoint目录。因此,在作checkpoint的时候,写入的hdfs中的数据主要包括:RDD中每一个parition的实际数据,以及可能的partitioner对象(writePartitionerToCheckpointDir)。
A3:在写完checkpoint数据到hdfs之后,将会调用rdd的markCheckpoined方法,主要斩断该rdd的对上游的依赖,以及将paritions置空等操做。
A4:经过A1,A2能够知道,在RDD计算完毕后,会再次经过RunJob将每一个partition数据保存到HDFS。这样RDD将会计算两次,因此为了不此类状况,最好将RDD进行cache。即1.1中rdd的推荐使用方法以下:
sc.setCheckpointDir(checkpointDir.toString)
val rdd = sc.makeRDD(1 to 20, numSlices = 1)
rdd.cache()rdd.checkpoint()
在作完checkpoint后,获取原来RDD的依赖以及partitions数据都将从CheckpointRDD中获取。也就是说获取原来rdd中每一个partition数据以及partitioner等对象,都将转移到CheckPointRDD中。
在CheckPointRDD的一个具体实现ReliableRDDCheckpintRDD中的compute方法中能够看到,将会从hdfs的checkpoint目录中恢复以前写入的partition数据。而partitioner对象(若是有)也会从以前写入hdfs的paritioner对象恢复。
总的来讲,checkpoint读取过程是比较简单的。
在streaming中使用checkpoint主要包含如下两点:设置checkpoint目录,初始化StreamingContext时调用getOrCreate方法,即当checkpoint目录没有数据时,则新建streamingContext实例,而且设置checkpoint目录,不然从checkpoint目录中读取相关配置和数据建立streamingcontext。
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc }
// Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
一样,针对streaming中checkpoint的写流程,主要有如下三个问题,并对此作相关解释。
Q1:streaming中checkpoint是在什么时候作的?
A1:在spark streaming中,jobGenerator会按期生成任务(jobGenerator.generateJobs)。在任务生成后将会调用doCheckpoint方法对系统作checkpoint。此外,在当前批次任务结束,清理metadata(jobGenerator.clearMetadata)时,也会调用doCheckpoint方法。
Q2:在streaming checkpoint过程当中,具体都写入了哪些数据到checkpoint目录?
A2: 作checkpoint的主要逻辑基本都在JobGenerator.doCheckpoint方法中。
在该方法中,首先更新当前时间段须要作checkpoint RDD的相关信息,如在DirectKafkaInputDStream中,将已经生成的RDD信息的时间,topic,partition,offset等相关信息进行更新。
其次,经过checkpointWriter将Checkpoint对象写入到checkpoint目录中(CheckPoint.write → CheckpointWriteHandle)。至此,咱们清楚了,写入到checkpoint目录的数据其实就是Checkpoint对象。
Checkpoint主要包含的信息以下:
val master = ssc.sc.master
val framework = ssc.sc.appName
val jars = ssc.sc.jars
val graph = ssc.graph
val checkpointDir = ssc.checkpointDir
val checkpointDuration = ssc.checkpointDuration
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
val sparkConfPairs = ssc.conf.getAll
具体包括相关配置信息,checkpoint目录,DStreamGraph等。对于DStreamGraph,主要包含InputDstream以及outputStream等相关信息,从而咱们能够看出定义应用相关的计算函数也被序列化保存到checkpoint目录中了。
Q3: streaming checkpoint都有哪些坑?
A3:
从A2中能够看到,应用定义的计算函数也被序列化到checkpoint目录,当应用代码发生改变时,此时就无法从checkpoint恢复。我的感受这是checkpoint在生产环境使用中碰到的最大障碍。
另外,当从checkpoint目录恢复streamingContext时,配置信息啥的也都是从checkpoint读取的(只有不多的一部分配置是reload的,具体见读流程),当重启任务时,新改变的配置就可能不生效,致使很奇怪的问题。
此外,broadcast变量在checkpoint中使用也受到限制(SPARK-5206)。
在spark streaming任务从checkpoint恢复streamingContext时,将会触发对以前保存的checkpoint对象的读取动做。在StreamingContext的getOrCreate方法中,经过checkpoint.read方法从checkpoint目录中恢复以前保存的Checkpoint对象。一旦该对象存在,将使用Checkpoint建立streamingContext。于此同时,在StreamingContext中DStreamGraph的恢复借助以前保存的对象,而且调用restoreCheckpointData恢复以前生成而未计算的RDD,从而接着以前的进度进行数据处理。
另外须要注意的时,如下配置信息在使用checkpoint建立streamingContext时,这些配置信息是从新加载的。
val propertiesToReload = List(
"spark.yarn.app.id",
"spark.yarn.app.attemptId",
"spark.driver.host",
"spark.driver.bindAddress",
"spark.driver.port",
"spark.master",
"spark.yarn.jars",
"spark.yarn.keytab",
"spark.yarn.principal",
"spark.yarn.credentials.file",
"spark.yarn.credentials.renewalTime",
"spark.yarn.credentials.updateTime",
"spark.ui.filters",
"spark.mesos.driver.frameworkId")
本文主要分析了checkpoint在spark core和streaming读写的基本过程,而且指出了在checkpoint使用中碰到一些坑。对于spark streaming,我的认为checkpoint在生产环境不适宜使用。