大数据面试题:大数据性能调优之分配更多资源
分配更多资源:性能调优的王道,就是增长和分配更多的资源,性能和速度上的提高,是显而易见的;基本上,在必定范围以内,增长资源与性能的提高,是成正比的;写完了一个复杂的spark做业以后,进行性能调优的时候,首先第一步,我以为,就是要来调节最优的资源配置;在这个基础之上,若是说你的spark做业,可以分配的资源达到了你的能力范围的顶端以后,没法再分配更多的资源了,公司资源有限;那么才是考虑去作后面的这些性能调优的点。
问题:
一、分配哪些资源?
二、在哪里分配这些资源?
三、为何多分配了这些资源之后,性能会获得提高?
答案:
一、分配哪些资源?executor、cpu per executor、memory per executor、driver memory
二、在哪里分配这些资源?在咱们在生产环境中,提交spark做业时,用的spark-submit shell脚本,里面调整对应的参数
/usr/local/spark/bin/spark-submit --class cn.spark.sparktest.core.WordCountCluster --num-executors 80 配置executor的数量
--driver-memory1024m 配置driver的内存(影响不大)
--executor-memory 512m 配置每一个executor的内存大小
--executor-cores 3 配置每一个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar 三、调节到多大,算是最大呢?
第一种,Spark Standalone,公司集群上,搭建了一套Spark集群,你内心应该清楚每台机器还可以给你使用的,大概有多少内存,多少cpu core;那么,设置的时候,就根据这个实际的状况,去调节每一个spark做业的资源分配。好比说你的每台机器可以给你使用4G内存,2个cpu core;20台机器;executor,20;4G内存,2个cpu core,平均每一个executor。
第二种,Yarn。资源队列。资源调度。应该去查看,你的spark做业,要提交到的资源队列,大概有多少资源?500G内存,100个cpu core;executor,50;10G内存,2个cpu core,平均每一个executor。
一个原则,你能使用的资源有多大,就尽可能去调节到最大的大小(executor的数量,几十个到上百个不等;executor内存;executor cpu core)
四、为何调节了资源之后,性能能够提高?面试
性能调优之调节spark并行度算法
并行度:其实就是指的是,Spark做业中,各个stage的task数量,也就表明了Spark做业的在各个阶段(stage)的并行度。
若是不调节并行度,致使并行度太低,会怎么样?
假设,如今已经在spark-submit脚本里面,给咱们的spark做业分配了足够多的资源,好比50个executor,每一个executor有10G内存,每一个executor有3个cpu core。基本已经达到了集群或者yarn队列的资源上限。
task没有设置,或者设置的不多,好比就设置了,100个task。50个executor,每一个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,能够并行运行。可是你如今,只有100个task,平均分配一下,每一个executor分配到2个task,ok,那么同时在运行的task,只有100个,每一个executor只会并行运行2个task。每一个executor剩下的一个cpu core,就浪费掉了。
你的资源虽然分配足够了,可是问题是,并行度没有与资源相匹配,致使你分配下去的资源都浪费掉了。
合理的并行度的设置,应该是要设置的足够大,大到能够彻底合理的利用你的集群资源;好比上面的例子,总共集群有150个cpu core,能够并行运行150个task。那么就应该将你的Application的并行度,至少设置成150,才能彻底有效的利用你的集群资源,让150个task,并行执行;并且task增长到150个之后,便可以同时并行运行,还可让每一个task要处理的数据量变少;好比总共150G的数据要处理,若是是100个task,每一个task计算1.5G的数据;如今增长到150个task,能够并行运行,并且每一个task主要处理1G的数据就能够。
很简单的道理,只要合理设置并行度,就能够彻底充分利用你的集群计算资源,而且减小每一个task要处理的数据量,最终,就是提高你的整个Spark做业的性能和运行速度。sql
一、task数量,至少设置成与Spark application的总cpu core数量相同(最理想状况,好比总共150个cpu core,分配了150个task,一块儿运行,差很少同一时间运行完毕)
二、官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,好比150个cpu core,基本要设置task数量为300~500;
实际状况,与理想状况不一样的,有些task会运行的快一点,好比50s就完了,有些task,可能会慢一点,要1分半才运行完,因此若是你的task数量,恰好设置的跟cpu core数量相同,可能仍是会致使资源的浪费,由于,好比150个task,10个先运行完了,剩余140个还在运行,可是这个时候,有10个cpu core就空闲出来了,就致使了浪费。那若是task数量设置成cpu core总数的2~3倍,那么一个task运行完了之后,另外一个task立刻能够补上来,就尽可能让cpu core不要空闲,同时也是尽可能提高spark做业运行的效率和速度,提高性能。
三、如何设置一个Spark Application的并行度?
spark.default.parallelism
SparkConf conf = new SparkConf()
.set("spark.default.parallelism", "300-450")shell
性能调优之RDD架构重构apache
第一,RDD架构重构与优化
尽可能去复用RDD,差很少的RDD,能够抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。
第二,公共RDD必定要实现持久化
对于要屡次计算和使用的公共RDD,必定要进行持久化。
持久化,也就是说,将RDD的数据缓存到内存中/磁盘中,(BlockManager),之后不管对这个RDD作多少次计算,那么都是直接取这个RDD的持久化的数据,好比从内存中或者磁盘中,直接提取一份数据。
第三,持久化,是能够进行序列化的
若是正常将数据持久化在内存中,那么可能会致使内存的占用过大,这样的话,也许,会致使OOM内存溢出。
当纯内存没法支撑公共RDD数据彻底存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每一个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减小内存的空间占用。
序列化的方式,惟一的缺点就是,在获取数据的时候,须要反序列化。
若是序列化纯内存方式,仍是致使OOM,内存溢出;就只能考虑磁盘的方式,
内存+磁盘的普通方式(无序列化)。
内存+磁盘,序列化
第四,为了数据的高可靠性,并且内存充足,可使用双副本机制,进行持久化
持久化的双副本机制,持久化后的一个副本,由于机器宕机了,副本丢了,就仍是得从新计算一次;持久化的每一个数据单元,存储一份副本,放在其余节点上面;从而进行容错;一个副本丢了,不用从新计算,还可使用另一份副本。
这种方式,仅仅针对你的内存资源极度充足数组
Spark性能调优之广播大变量。缓存
这种默认的,task执行的算子中,使用了外部的变量,每一个task都会获取一份变量的副本,有什么缺点呢?在什么状况下,会出现性能上的恶劣的影响呢?
map,自己是不小,存放数据的一个单位是Entry,还有可能会用链表的格式的来存放Entry链条。因此map是比较消耗内存的数据格式。
好比,map是1M。总共,你前面调优都调的特好,资源给的到位,配合着资源,并行度调节的绝对到位,1000个task。大量task的确都在并行运行。
这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,经过网络传输到各个task中去,给task使用。总计有1G的数据,会经过网络传输。网络传输的开销,不容乐观啊!!!网络传输,也许就会消耗掉你的spark做业运行的总时间的一小部分。
map副本,传输到了各个task上以后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一会儿就耗费掉1G的内存。对性能会有什么影响呢?
没必要要的内存的消耗和占用,就致使了,你在进行RDD持久化到内存,也许就无法彻底在内存中放下;就只能写入磁盘,最后致使后续的操做在磁盘IO上消耗性能;
你的task在建立对象的时候,也许会发现堆内存放不下全部对象,也许就会致使频繁的垃圾回收器的回收,GC。GC的时候,必定是会致使工做线程中止,也就是致使Spark暂停工做那么一点时间。频繁GC的话,对Spark做业的运行的速度会有至关可观的影响。网络
大数据面试题,spark性能调优之使用kryo序列化架构
默认状况下,Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化
这种默认序列化机制的好处在于,处理起来比较方便;也不须要咱们手动去作什么事情,只是,你在算子里面使用的变量,必须是实现Serializable接口的,可序列化便可。
可是缺点在于,默认的序列化机制的效率不高,序列化的速度比较慢;序列化之后的数据,占用的内存空间相对仍是比较大。
能够手动进行序列化格式的优化
Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。
因此Kryo序列化优化之后,可让网络传输的数据变少;在集群中耗费的内存资源大大减小。
Kryo序列化机制,一旦启用之后,会生效的几个地方:
一、算子函数中使用到的外部变量
二、持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER
三、shuffle
一、算子函数中使用到的外部变量,使用Kryo之后:优化网络传输的性能,能够优化集群中内存的占用和消耗
二、持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,建立的对象,就不至于频繁的占满内存,频繁发生GC。
三、shuffle:能够优化网络传输的性能
实现:
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
首先第一步,在SparkConf中设置一个属性,spark.serializer,org.apache.spark.serializer.KryoSerializer类;
Kryo之因此没有被做为默认的序列化类库的缘由,就要出现了:主要是由于Kryo要求,若是要达到它的最佳性能的话,那么就必定要注册你自定义的类(好比,你的算子函数中使用到了外部自定义类型的对象变量,这时,就要求必须注册你的类,不然Kryo达不到最佳性能)。
第二步,注册你使用到的,须要经过Kryo序列化的,一些自定义类,SparkConf.registerKryoClasses()
项目中的使用:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})app
大数据面试题:jvm调优之原理概述以及下降cache操做的内存占比
每一次放对象的时候,都是放入eden区域,和其中一个survivor区域;另一个survivor区域是空闲的。
当eden区域和一个survivor区域放满了之后(spark运行过程当中,产生的对象实在太多了),就会触发minor gc,小型垃圾回收。把再也不使用的对象,从内存中清空,给后面新建立的对象腾出来点儿地方。
清理掉了再也不使用的对象以后,那么也会将存活下来的对象(还要继续使用的),放入以前空闲的那一个survivor区域中。这里可能会出现一个问题。默认eden、survior1和survivor2的内存占比是8:1:1。问题是,若是存活下来的对象是1.5,一个survivor区域放不下。此时就可能经过JVM的担保机制(不一样JVM版本可能对应的行为),将多余的对象,直接放入老年代了。
若是你的JVM内存不够大的话,可能致使频繁的年轻代内存满溢,频繁的进行minor gc。频繁的minor gc会致使短期内,有些存活的对象,屡次垃圾回收都没有回收掉。会致使这种短声明周期(其实不必定是要长期使用的)对象,年龄过大,垃圾回收次数太多尚未回收到,跑到老年代。
老年代中,可能会由于内存不足,囤积一大堆,短生命周期的,原本应该在年轻代中的,可能立刻就要被回收掉的对象。此时,可能致使老年代频繁满溢。频繁进行full gc(全局/全面垃圾回收)。full gc就会去回收老年代中的对象。full gc因为这个算法的设计,是针对的是,老年代中的对象数量不多,满溢进行full gc的频率应该不多,所以采起了不太复杂,可是耗费性能和时间的垃圾回收算法。full gc很慢。
full gc / minor gc,不管是快,仍是慢,都会致使jvm的工做线程中止工做,stop the world。简而言之,就是说,gc的时候,spark中止工做了。等着垃圾回收结束。
内存不充足的时候,问题:
一、频繁minor gc,也会致使频繁spark中止工做
二、老年代囤积大量活跃对象(短生命周期的对象),致使频繁full gc,full gc时间很长,短则数十秒,长则数分钟,甚至数小时。可能致使spark长时间中止工做。
三、严重影响我们的spark的性能和运行的速度。
JVM调优的第一个点:下降cache操做的内存占比
spark中,堆内存又被划分红了两块儿,一起是专门用来给RDD的cache、persist操做进行RDD数据缓存用的;另一块儿,就是咱们刚才所说的,用来给spark算子函数的运行使用的,存放函数中本身建立的对象。
默认状况下,给RDD cache操做的内存占比,是0.6,60%的内存都给了cache操做了。可是问题是,若是某些状况下,cache不是那么的紧张,问题在于task算子函数中建立的对象过多,而后内存又不太大,致使了频繁的minor gc,甚至频繁full gc,致使spark频繁的中止工做。性能影响会很大。
那么就经过yarn的界面,去查看你的spark做业的运行统计,很简单,你们一层一层点击进去就好。能够看到每一个stage的运行状况,包括每一个task的运行时间、gc时间等等。若是发现gc太频繁,时间太长。此时就能够适当调价这个比例。
下降cache操做的内存占比,大不了用persist操做,选择将一部分缓存的RDD数据写入磁盘,或者序列化方式,配合Kryo序列化类,减小RDD缓存的内存占用;下降cache操做内存占比;对应的,算子函数的内存占比就提高了。这个时候,可能,就能够减小minor gc的频率,同时减小full gc的频率。对性能的提高是有必定的帮助的。
一句话,让task执行算子函数时,有更多的内存可使用。
spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
大数据面试题:shuffle调优之合并map端输出文件
开启了map端输出文件的合并机制以后:
第一个stage,同时就运行cpu core个task,好比cpu core是2个,并行运行2个task;每一个task都建立下一个stage的task数量个文件;
第一个stage,并行运行的2个task执行完之后;就会执行另外两个task;另外2个task不会再从新建立输出文件;而是复用以前的task建立的map端输出文件,将数据写入上一批task的输出文件中。
第二个stage,task在拉取数据的时候,就不会去拉取上一个stage每个task为本身建立的那份输出文件了;而是拉取少许的输出文件,每一个输出文件中,可能包含了多个task给本身的map端输出。
提醒一下(map端输出文件合并):
只有并行执行的task会去建立新的输出文件;下一批并行执行的task,就会去复用以前已有的输出文件;可是有一个例外,好比2个task并行在执行,可是此时又启动要执行2个task;那么这个时候的话,就没法去复用刚才的2个task建立的输出文件了;而是仍是只能去建立新的输出文件。
要实现输出文件的合并的效果,必须是一批task先执行,而后下一批task再执行,才能复用以前的输出文件;负责多批task同时起来执行,仍是作不到复用的。
大数据面试题:shuffle调优之调节map端内存缓冲与reduce端内存占比
spark.shuffle.file.buffer,默认32k
spark.shuffle.memoryFraction,0.2
reduce端task,在拉取到数据以后,会用hashmap的数据格式,来对各个key对应的values进行汇聚。
针对每一个key对应的values,执行咱们自定义的聚合函数的代码,好比_ + _(把全部values累加起来)
reduce task,在进行汇聚、聚合等操做的时候,实际上,使用的就是本身对应的executor的内存,executor(jvm进程,堆),默认executor内存中划分给reduce task进行聚合的比例,是0.2。
问题来了,由于比例是0.2,因此,理论上,颇有可能会出现,拉取过来的数据不少,那么在内存中,放不下;这个时候,默认的行为,就是说,将在内存放不下的数据,都spill(溢写)到磁盘文件中去。
原理说完以后,来看一下,默认状况下,不调优,可能会出现什么样的问题?
默认,map端内存缓冲是每一个task,32kb。
默认,reduce端聚合内存比例,是0.2,也就是20%。
若是map端的task,处理的数据量比较大,可是呢,你的内存缓冲大小是固定的。可能会出现什么样的状况?
每一个task就处理320kb,32kb,总共会向磁盘溢写320 / 32 = 10次。
每一个task处理32000kb,32kb,总共会向磁盘溢写32000 / 32 = 1000次。
在map task处理的数据量比较大的状况下,而你的task的内存缓冲默认是比较小的,32kb。可能会形成屡次的map端往磁盘文件的spill溢写操做,发生大量的磁盘IO,从而下降性能。
reduce端聚合内存,占比。默认是0.2。若是数据量比较大,reduce task拉取过来的数据不少,那么就会频繁发生reduce端聚合内存不够用,频繁发生spill操做,溢写到磁盘上去。并且最要命的是,磁盘上溢写的数据量越大,后面在进行聚合操做的时候,极可能会屡次读取磁盘中的数据,进行聚合。
默认不调优,在数据量比较大的状况下,可能频繁地发生reduce端的磁盘文件的读写。
这两个点之因此放在一块儿讲,是由于他们俩是有关联的。数据量变大,map端确定会出点问题;reduce端确定也会出点问题;出的问题是同样的,都是磁盘IO频繁,变多,影响性能。
调优:
在实际生产环境中,咱们在何时来调节两个参数?
若是发现shuffle 磁盘的write和read,很大。这个时候,就意味着最好调节一些shuffle的参数。进行调优。首先固然是考虑开启map端输出文件合并机制。
调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍,而后看看效果,64,128;spark.shuffle.memoryFraction,每次提升0.1,看看效果。
调节了之后,效果?map task内存缓冲变大了,减小spill到磁盘文件的次数;reduce端聚合内存变大了,减小spill到磁盘的次数,并且减小了后面聚合读取磁盘文件的数量。
大数据面试题:spark调优troubleshooting之控制shuffle reduce端缓存大小避免OOM
map端的task是不断的输出数据的,数据量多是很大的。
可是,其实reduce端的task,并非等到map端task将属于本身的那份数据所有写入磁盘文件以后,再去拉取的。map端写一点数据,reduce端task就会拉取一小部分数据,当即进行后面的聚合、算子函数的应用。
每次reduece可以拉取多少数据,就由buffer来决定。由于拉取过来的数据,都是先放在buffer中的。而后才用后面的executor分配的堆内存占比(0.2),hashmap,去进行后续的聚合、函数的执行。
reduce端缓冲(buffer),可能会出什么问题?
多是会出现,默认是48MB,也许大多数时候,reduce端task一边拉取一边计算,不必定一直都会拉满48M的数据。可能大多数时候,拉取个10M数据,就计算掉了。
大多数时候,也许不会出现什么问题。可是有的时候,map端的数据量特别大,而后写出的速度特别快。reduce端全部task,拉取的时候,所有达到本身的缓冲的最大极限值,缓冲,48M,所有填满。
这个时候,再加上你的reduce端执行的聚合函数的代码,可能会建立大量的对象。也许,一会儿,内存就撑不住了,就会OOM。reduce端的内存中,就会发生内存溢出的问题。
针对上述的可能出现的问题,咱们该怎么来解决呢?
这个时候,就应该减小reduce端task缓冲的大小。我宁愿多拉取几回,可是每次同时可以拉取到reduce端每一个task的数量,比较少,就不容易发生OOM内存溢出的问题。(好比,能够调节成12M)
在实际生产环境中,咱们都是碰到过这种问题的。这是典型的以性能换执行的原理。reduce端缓冲小了,不容易OOM了,可是,性能必定是有所降低的,你要拉取的次数就多了。就走更多的网络传输开销。
这种时候,只能采起牺牲性能的方式了,spark做业,首先,第一要义,就是必定要让它能够跑起来。而后才去考虑性能的调优。
再来讲说,reduce端缓冲大小的另一面,关于性能调优的一面:
我们假如说,你的Map端输出的数据量也不是特别大,而后你的整个application的资源也特别充足。200个executor、5个cpu core、10G内存。
其实能够尝试去增长这个reduce端缓冲大小的,好比从48M,变成96M。那么这样的话,每次reduce task可以拉取的数据量就很大。须要拉取的次数也就变少了。好比原先须要拉取100次,如今只要拉取50次就能够执行完了。
对网络传输性能开销的减小,以及reduce端聚合操做执行的次数的减小,都是有帮助的。
最终达到的效果,就应该是性能上的必定程度上的提高。
必定要注意,资源足够的时候,再去作这个事儿。
调节参数为
spark.reducer.maxSizeInFlight,48//调小变为
spark.reducer.maxSizeInFlight,24
大数据面试题:spark调优troubleshooting之解决JVM GC致使的shuffle文件拉取失败
有时会出现的一种状况,很是广泛,在spark的做业中;shuffle file not found。(spark做业中,很是很是常见的)并且,有的时候,它是偶尔才会出现的一种状况。有的时候,出现这种状况之后,会从新去提交stage、task。从新执行一遍,发现就行了。没有这种错误了。
log怎么看?用client模式去提交你的spark做业。好比standalone client;yarn client。一提交做业,直接能够在本地看到刷刷刷更新的log。
spark.shuffle.io.maxRetries 3
第一个参数,意思就是说,shuffle文件拉取的时候,若是没有拉取到(拉取失败),最多或重试几回(会从新拉取几回文件),默认是3次。
spark.shuffle.io.retryWait 5s
第二个参数,意思就是说,每一次重试拉取文件的时间间隔,默认是5s钟。
默认状况下,假如说第一个stage的executor正在进行漫长的full gc。第二个stage的executor尝试去拉取文件,结果没有拉取到,默认状况下,会反复重试拉取3次,每次间隔是五秒钟。最多只会等待3 * 5s = 15s。若是15s内,没有拉取到shuffle file。就会报出shuffle file not found。
针对这种状况,咱们彻底能够进行预备性的参数调节。增大上述两个参数的值,达到比较大的一个值,尽可能保证第二个stage的task,必定可以拉取到上一个stage的输出文件。避免报shuffle file not found。而后可能会从新提交stage和task去执行。那样反而对性能也很差。
spark.shuffle.io.maxRetries 60
spark.shuffle.io.retryWait 60s
最多能够忍受1个小时没有拉取到shuffle file。只是去设置一个最大的可能的值。full gc不可能1个小时都没结束吧。
这样呢,就能够尽可能避免由于gc致使的shuffle file not found,没法拉取到的问题。
大数据面试题:spark调优troubleshooting之解决yarn-cluster模式的jvm内存溢出没法执行问题
实践经验,碰到的yarn-cluster的问题:
有的时候,运行一些包含了spark sql的spark做业,可能会碰到yarn-client模式下,能够正常提交运行;yarn-cluster模式下,多是没法提交运行的,会报出JVM的PermGen(永久代)的内存溢出,OOM。
yarn-client模式下,driver是运行在本地机器上的,spark使用的JVM的PermGen的配置,是本地的spark-class文件(spark客户端是默认有配置的),JVM的永久代的大小是128M,这个是没有问题的;可是呢,在yarn-cluster模式下,driver是运行在yarn集群的某个节点上的,使用的是没有通过配置的默认设置(PermGen永久代大小),82M。
spark-sql,它的内部是要进行很复杂的SQL的语义解析、语法树的转换等等,特别复杂,在这种复杂的状况下,若是说你的sql自己特别复杂的话,极可能会比较致使性能的消耗,内存的消耗。可能对PermGen永久代的占用会比较大。
因此,此时,若是对永久代的占用需求,超过了82M的话,可是呢又在128M之内;就会出现如上所述的问题,yarn-client模式下,默认是128M,这个还能运行;若是在yarn-cluster模式下,默认是82M,就有问题了。会报出PermGen Out of Memory error log。
如何解决这种问题?
既然是JVM的PermGen永久代内存溢出,那么就是内存不够用。我们呢,就给yarn-cluster模式下的,driver的PermGen多设置一些。
spark-submit脚本中,加入如下配置便可:
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
这个就设置了driver永久代的大小,默认是128M,最大是256M。那么,这样的话,就能够基本保证你的spark做业不会出现上述的yarn-cluster模式致使的永久代内存溢出的问题。
大数据面试题:spark调优troubleshooting之checkpoint的使用
持久化,大多数时候,都是会正常工做的。可是就怕,有些时候,会出现意外。 好比说,缓存在内存中的数据,可能莫名其妙就丢失掉了。 或者说,存储在磁盘文件中的数据,莫名其妙就没了,文件被误删了。 出现上述状况的时候,接下来,若是要对这个RDD执行某些操做,可能会发现RDD的某个partition找不到了。 对消失的partition从新计算,计算完之后再缓存和使用。 有些时候,计算某个RDD,多是极其耗时的。可能RDD以前有大量的父RDD。那么若是你要从新计算一个partition,可能要从新计算以前全部的父RDD对应的partition。 这种状况下,就能够选择对这个RDD进行checkpoint,以防万一。进行checkpoint,就是说,会将RDD的数据,持久化一份到容错的文件系统上(好比hdfs)。 在对这个RDD进行计算的时候,若是发现它的缓存数据不见了。优先就是先找一下有没有checkpoint数据(到hdfs上面去找)。若是有的话,就使用checkpoint数据了。不至于说是去从新计算。 checkpoint,其实就是能够做为是cache的一个备胎。若是cache失效了,checkpoint就能够上来使用了。 checkpoint有利有弊,利在于,提升了spark做业的可靠性,一旦发生问题,仍是很可靠的,不用从新计算大量的rdd;可是弊在于,进行checkpoint操做的时候,也就是将rdd数据写入hdfs中的时候,仍是会消耗性能的。 checkpoint,用性能换可靠性。 checkpoint原理: 一、在代码中,用SparkContext,设置一个checkpoint目录,能够是一个容错文件系统的目录,好比hdfs; 二、在代码中,对须要进行checkpoint的rdd,执行RDD.checkpoint(); 三、RDDCheckpointData(spark内部的API),接管你的RDD,会标记为marked for checkpoint,准备进行checkpoint 四、你的job运行完以后,会调用一个finalRDD.doCheckpoint()方法,会顺着rdd lineage,回溯扫描,发现有标记为待checkpoint的rdd,就会进行二次标记,inProgressCheckpoint,正在接受checkpoint操做 五、job执行完以后,就会启动一个内部的新job,去将标记为inProgressCheckpoint的rdd的数据,都写入hdfs文件中。(备注,若是rdd以前cache过,会直接从缓存中获取数据,写入hdfs中;若是没有cache过,那么就会从新计算一遍这个rdd,再checkpoint) 六、将checkpoint过的rdd以前的依赖rdd,改为一个CheckpointRDD*,强制改变你的rdd的lineage。后面若是rdd的cache数据获取失败,直接会经过它的上游CheckpointRDD,去容错的文件系统,好比hdfs,中,获取checkpoint的数据。