spark 调优

1.尽量复用同一个RDDmysql

2.对重复使用的RDD进行持久化sql

3.对DAG过长的计算增长chekpoint(检查点)机制,将文件最好保存在HDFS中(多副本)shell

4.选择一种合适的持久化策略数据库

  • 默认状况下,性能最高的固然是MEMORY_ONLY,但前提是你的内存必须足够足够大,能够绰绰有余地存放下整个RDD的全部数据。由于不进行序列化与反序列化操做,就避免了这部分的性能开销;对这个RDD的后续算子操做,都是基于纯内存中的数据的操做,不须要从磁盘文件中读取数据,性能也很高;并且不须要复制一份数据副本,并远程传送到其余节点上。可是这里必需要注意的是,在实际的生产环境中,恐怕可以直接用这种策略的场景仍是有限的,若是RDD中数据比较多时(好比几十亿),直接用这种持久化级别,会致使JVM的OOM内存溢出异常。apache

  • 若是使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每一个partition仅仅是一个字节数组而已,大大减小了对象数量,并下降了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。可是后续算子能够基于纯内存进行操做,所以性能整体仍是比较高的。此外,可能发生的问题同上,若是RDD中的数据量过多的话,仍是可能会致使OOM内存溢出的异常。数组

  • 若是纯内存的级别都没法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。由于既然到了这一步,就说明RDD的数据量很大,内存没法彻底放下。序列化后的数据比较少,能够节省内存和磁盘的空间开销。同时该策略会优先尽可能尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。缓存

  • 一般不建议使用DISK_ONLY和后缀为_2的级别:由于彻底基于磁盘文件进行数据的读写,会致使性能急剧下降,有时还不如从新计算一次全部RDD。后缀为_2的级别,必须将全部数据都复制一份副本,并发送到其余节点上,数据复制以及网络传输会致使较大的性能开销,除非是要求做业的高可用性,不然不建议使用。网络

5.更改程序shufflle的序列化方式为Kryo并发

方法一:修改spark-defaults.conf配置文件
设置:
spark.serializer  org.apache.spark.serializer.KryoSerializer
注意:用空格隔开
方法二:启动spark-shell或者spark-submit时配置
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
方法三:在代码中
val conf = new SparkConf()
conf.set(“spark.serializer”,“org.apache.spark.serializer.KryoSerializer”)

6.测试的时候可使用collect,可是生产环境上不可使用。Driver端会爆炸ide

7.开启推迟执行机制

能够设置spark.speculation  true

开启后,spark会检测执行较慢的Task,并复制这个Task在其余节点运行,最后哪一个节点先运行完,就用其结果,而后将慢Task 杀死

8.配置多临时文件目录

spark.local.dir参数。当shuffle、归并排序(sort、merge)时都会产生临时文件。这些临时文件都在这个指定的目录下。那这个文件夹有不少临时文件,若是都发生读写操做,有的线程在读这个文件,有的线程在往这个文件里写,磁盘I/O性能就很是低。

能够建立多个文件夹每一个文件夹都对应一个真实的硬盘。假如原来是3个程序同时读写一个硬盘,效率确定低,如今让三个程序分别读取3个磁盘,这样冲突减小,效率就提升了。这样就有效提升外部文件读和写的效率。怎么配置呢?只须要在这个配置时配置多个路径就能够。中间用逗号分隔。

spark.local.dir=/home/tmp,/home/tmp2

而后须要把每一个目录挂载到不一样的磁盘上

9.进行join时,能够将小表广播出去 

10.有些状况下,RDD操做使用MapPartitions替代map

好比JdisPool,mysql链接池的建立和销毁操做

map方法对RDD的每一条记录逐一操做。mapPartitions是对RDD里的每一个分区操做

rdd.map{ x=>conn=getDBConn.conn;write(x.toString);conn close;}

这样频繁的连接、断开数据库,效率差。

rdd.mapPartitions{(record:=>conn.getDBConn;for(item<-recorders;write(item.toString);conn close;}

这样就一次连接一次断开,中间批量操做,效率提高。

 

参数配置调优

  • num-executors:该参数必定会被设置,Yarn 会按照 Driver 的申请去最终为当前的 Application 生产指定个数的 Executors,实际生产环境下应该分配80个左右 Executors 会比较合适呢。
  • executor-memory:这个定义了每一个 Executor 的内存,它与 JVM OOM 紧密相关,不少时候甚至决定了 Spark 运行的性能。实际生产环境下建义是 8G 左右,不少时候 Spark 运行在 Yarn 上,内存占用量不要超过 Yarn 的内存资源的 50%。
  • executor-cores:决定了在 Executors 中可以并行执行的 Tasks 的个数。实际生产环境下应该分配4个左右,通常状况下不要超过 Yarn 队列中 Cores 总数量的 50%。
  • driver-memory:默应是 1G
  • spark.default.parallelizm:并行度问题,若是不设置这个参数,Spark 会跟据 HDFS 中 Block 的个数去设置这一个数量,原理是默应每一个 Block 会对应一个 Task,默应状况下,若是数据量不是太多就不能够充份利用 executor 设置的资源,就会浪费了资源。建义设置为 100个,最好 700个左右。Spark官方的建义是每个 Core 负责 2-3 个 Task。 
  • spark.storage.memoryFraction:默应占用 60%,若是计算比较依赖于历史数据则能够调高该参数,当若是计算比较依赖 Shuffle 的话则须要下降该比例。
  • spark.shuffle.memoryFraction:默应占用 20%,若是计算比较依赖 Shuffle 的话则须要调高该比例。
  • spark.shuffle.file.buffer:默认值:32k  参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件以前,会先写入buffer缓冲中,待缓冲写满以后,才会溢写到磁盘。调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比64k),从而减小shuffle write过程当中溢写磁盘文件的次数,也就能够减小磁盘IO次数,进而提高性能。在实践中发现,合理调节该参数,性能会有1%~5%的提高。
  • spark.reducer.maxSizeInFlight:默认值:48m 该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次可以拉取多少数据。调优建议:若是做业可用的内存资源较为充足的话,能够适当增长这个参数的大小(好比96m),从而减小拉取数据的次数,也就能够减小网络传输的次数,进而提高性能。在实践中发现,合理调节该参数,性能会有1%~5%的提高。

  • spark.shuffle.io.maxRetries:默认值:3  shuffle read task从shuffle write task所在节点拉取属于本身的数据时,若是由于网络异常致使拉取失败,是会自动进行重试的。该参数就表明了能够重试的最大次数。若是在指定次数以内拉取仍是没有成功,就可能会致使做业执行失败。
    调优建议:对于那些包含了特别耗时的shuffle操做的做业,建议增长重试最大次数(好比60次),以免因为JVM的full gc或者网络不稳定等因素致使的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数能够大幅度提高稳定性。

  • spark.shuffle.io.retryWait :默认值:5s参数说明:具体解释同上,该参数表明了每次重试拉取数据的等待间隔,默认是5s。建议加大间隔时长(好比60s),以增长shuffle操做的稳定性。

  • spark.shuffle.manager:默认值:sort  该参数用于设置ShuffleManager的类型。Spark 1.5之后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默认选项,可是Spark 1.2以及以后的版本默认都是SortShuffleManager了。tungsten-sort与sort相似,可是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
    调优建议:因为SortShuffleManager默认会对数据进行排序,所以若是你的业务逻辑中须要该排序机制的话,则使用默认的SortShuffleManager就能够;而若是你的业务逻辑不须要对数据进行排序,那么建议参考后面的几个参数调优,经过bypass机制或优化的HashShuffleManager来避免排序操做,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,由于以前发现了一些相应的bug

  • spark.shuffle.sort.bypassMergeThreshold

    默认值:200
    参数说明:当ShuffleManager为SortShuffleManager时,若是shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程当中不会进行排序操做,而是直接按照未经优化的HashShuffleManager的方式去写数据,可是最后会将每一个task产生的全部临时磁盘文件都合并成一个文件,并会建立单独的索引文件。调优建议:当你使用SortShuffleManager时,若是的确不须要排序操做,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减小了排序的性能开销。可是这种方式下,依然会产生大量的磁盘文件,所以shuffle write性能有待提升。

  • spark.shuffle.consolidateFiles:默认值:false 参数说明:若是使用HashShuffleManager,该参数有效。若是设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的状况下,这种方法能够极大地减小磁盘IO开销,提高性能。调优建议:若是的确不须要SortShuffleManager的排序机制,那么除了使用bypass机制,还能够尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

相关文章
相关标签/搜索