1.尽量复用同一个RDDmysql
2.对重复使用的RDD进行持久化sql
3.对DAG过长的计算增长chekpoint(检查点)机制,将文件最好保存在HDFS中(多副本)shell
4.选择一种合适的持久化策略数据库
默认状况下,性能最高的固然是MEMORY_ONLY,但前提是你的内存必须足够足够大,能够绰绰有余地存放下整个RDD的全部数据。由于不进行序列化与反序列化操做,就避免了这部分的性能开销;对这个RDD的后续算子操做,都是基于纯内存中的数据的操做,不须要从磁盘文件中读取数据,性能也很高;并且不须要复制一份数据副本,并远程传送到其余节点上。可是这里必需要注意的是,在实际的生产环境中,恐怕可以直接用这种策略的场景仍是有限的,若是RDD中数据比较多时(好比几十亿),直接用这种持久化级别,会致使JVM的OOM内存溢出异常。apache
若是使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每一个partition仅仅是一个字节数组而已,大大减小了对象数量,并下降了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。可是后续算子能够基于纯内存进行操做,所以性能整体仍是比较高的。此外,可能发生的问题同上,若是RDD中的数据量过多的话,仍是可能会致使OOM内存溢出的异常。数组
若是纯内存的级别都没法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。由于既然到了这一步,就说明RDD的数据量很大,内存没法彻底放下。序列化后的数据比较少,能够节省内存和磁盘的空间开销。同时该策略会优先尽可能尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。缓存
一般不建议使用DISK_ONLY和后缀为_2的级别:由于彻底基于磁盘文件进行数据的读写,会致使性能急剧下降,有时还不如从新计算一次全部RDD。后缀为_2的级别,必须将全部数据都复制一份副本,并发送到其余节点上,数据复制以及网络传输会致使较大的性能开销,除非是要求做业的高可用性,不然不建议使用。网络
5.更改程序shufflle的序列化方式为Kryo并发
方法一:修改spark-defaults.conf配置文件 设置: spark.serializer org.apache.spark.serializer.KryoSerializer 注意:用空格隔开 方法二:启动spark-shell或者spark-submit时配置 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 方法三:在代码中 val conf = new SparkConf() conf.set(“spark.serializer”,“org.apache.spark.serializer.KryoSerializer”)
6.测试的时候可使用collect,可是生产环境上不可使用。Driver端会爆炸ide
7.开启推迟执行机制
能够设置spark.speculation true
开启后,spark会检测执行较慢的Task,并复制这个Task在其余节点运行,最后哪一个节点先运行完,就用其结果,而后将慢Task 杀死
8.配置多临时文件目录
spark.local.dir参数。当shuffle、归并排序(sort、merge)时都会产生临时文件。这些临时文件都在这个指定的目录下。那这个文件夹有不少临时文件,若是都发生读写操做,有的线程在读这个文件,有的线程在往这个文件里写,磁盘I/O性能就很是低。
能够建立多个文件夹,每一个文件夹都对应一个真实的硬盘。假如原来是3个程序同时读写一个硬盘,效率确定低,如今让三个程序分别读取3个磁盘,这样冲突减小,效率就提升了。这样就有效提升外部文件读和写的效率。怎么配置呢?只须要在这个配置时配置多个路径就能够。中间用逗号分隔。
spark.local.dir=/home/tmp,/home/tmp2
而后须要把每一个目录挂载到不一样的磁盘上
9.进行join时,能够将小表广播出去
10.有些状况下,RDD操做使用MapPartitions替代map
好比JdisPool,mysql链接池的建立和销毁操做
map方法对RDD的每一条记录逐一操做。mapPartitions是对RDD里的每一个分区操做
rdd.map{ x=>conn=getDBConn.conn;write(x.toString);conn close;}
这样频繁的连接、断开数据库,效率差。
rdd.mapPartitions{(record:=>conn.getDBConn;for(item<-recorders;write(item.toString);conn close;}
这样就一次连接一次断开,中间批量操做,效率提高。
参数配置调优
spark.reducer.maxSizeInFlight:默认值:48m 该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次可以拉取多少数据。调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比96m),从而减小拉取数据的次数,也就能够减小网络传输的次数,进而提高性能。在实践中发现,合理调节该参数,性能会有1%~5%的提高。
spark.shuffle.io.maxRetries:默认值:3 shuffle read task从shuffle write task所在节点拉取属于本身的数据时,若是由于网络异常致使拉取失败,是会自动进行重试的。该参数就表明了能够重试的最大次数。若是在指定次数以内拉取仍是没有成功,就可能会致使做业执行失败。
调优建议:对于那些包含了特别耗时的shuffle操做的做业,建议增长重试最大次数(好比60次),以免因为JVM的full gc或者网络不稳定等因素致使的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数能够大幅度提高稳定性。
spark.shuffle.io.retryWait :默认值:5s参数说明:具体解释同上,该参数表明了每次重试拉取数据的等待间隔,默认是5s。建议加大间隔时长(好比60s),以增长shuffle操做的稳定性。
spark.shuffle.manager:默认值:sort 该参数用于设置ShuffleManager的类型。Spark 1.5之后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默认选项,可是Spark 1.2以及以后的版本默认都是SortShuffleManager了。tungsten-sort与sort相似,可是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:因为SortShuffleManager默认会对数据进行排序,所以若是你的业务逻辑中须要该排序机制的话,则使用默认的SortShuffleManager就能够;而若是你的业务逻辑不须要对数据进行排序,那么建议参考后面的几个参数调优,经过bypass机制或优化的HashShuffleManager来避免排序操做,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,由于以前发现了一些相应的bug
spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,若是shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程当中不会进行排序操做,而是直接按照未经优化的HashShuffleManager的方式去写数据,可是最后会将每一个task产生的全部临时磁盘文件都合并成一个文件,并会建立单独的索引文件。调优建议:当你使用SortShuffleManager时,若是的确不须要排序操做,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减小了排序的性能开销。可是这种方式下,依然会产生大量的磁盘文件,所以shuffle write性能有待提升。
spark.shuffle.consolidateFiles:默认值:false 参数说明:若是使用HashShuffleManager,该参数有效。若是设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的状况下,这种方法能够极大地减小磁盘IO开销,提高性能。调优建议:若是的确不须要SortShuffleManager的排序机制,那么除了使用bypass机制,还能够尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。