Spark 性能优化:代码中经常使用的几个调整点

前面三篇文章的三种优化方式调整到位以后会让整个 Spark 做业执行速度有很是明显的提高。html

除此以外咱们还有不少其它性能优化的手段,但在和前面三种方式比较,正常状况下提高没有那么大。java

1,使用广播变量

在 task 执行算子函数运算的时候,若是要用到外部变量,这种时候须要使用广播变量。由于,若是不使用广播变量,那么所使用的外部变量会在每一个 task 里会得到一份变量的副本,后续传输到各个 worker 节点去计算的时候一方面会加大不少的网络传输,另外一方面在 task 计算的时候会占用太大的内存空间(甚至会用到磁盘空间),读写消耗的 IO 也会不少。算法

使用广播变量后,初始的时候,只会在 Driver 上有一份副本。当 task 计算的时候,须要使用到广播变量中的数据时,首先会在本地的 executor 对应的 blockManager 中尝试获取,若是这里没有,就会从 driver 远程拉取变量副本(也可能会从距离最近的其它节点上的 blockManager 上拉取数据),并保持在本地的 blockManager 中,此后这个 executor 上的全部 task 会直接使用这份数据。shell

2,使用 kryo 序列化

Spark 内部默认使用的序列化方式是 Java 的序列化方式。和 kryo 的序列化相比,Java 的序列化方式,速度慢,序列化的数据占用空间相对仍是很大,因此使用 kryo 序列化在必定程度上提高性能。数据库

使用 kryo 序列化能优化到的几个地方:算子函数中用到的外部变量,持久化 RDD 时候使用 StorageLevel.MEMORY_ONLY_SER 这个级别时,shuffle 过程。apache

使用方法:缓存

SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(xxxx)
复制代码

3,使用 fastutil 工具类

fastutil 是扩展了 Java 标准集合框架的类库。他可以提供更小的内存占用和存取速度。在 Spark 内使用到集合存储数据的地方能够尽可能使用 fastutil 提供的一些集合。性能优化

4,调整数据本地化等待时长

Spark 在 Driver 上分配每一个 stage 的 task 去 worker 节点以前,都会计算出每一个 task 要计算的是哪一个 partition 数据。这里 Spark 会有本身的一套 task 分配算法,优先但愿的是 task 恰好分配到其全部处理数据的那个节点,能够省去数据网络传输的性能消耗。网络

但,若是那个节点上的计算能力或者其余资源已经满了,这时候,Spark 会等待一段时间,若是说在这段时间内这个几点计算能力或者资源释放了,可以知足这个 task 的资源需求的话,就会继续分配到这个节点,若是这段时间内仍是老样子,那么久得分配到其它节点上了。框架

这个等待时间是经过 spark.locality.wait 这个配置参数设置。

5,JVM 调优

a,下降 cache 操做的内存占比: Spark 集群的每一个 worker 节点上 executor 都是运行在各自的 JVM 中。这个 JVM 的内存会被划分红两块,一块是用来给 RDD 的 cache、persist 的数据作缓存的,另外一块是存储各个算子中函数运算中产生的对象的。这个比例默认是 0.6,也就是只有 40% 的内存空间是给各个算子计算用的,若是计算中产生的数据量过大会频繁触发 minor gc,甚至会触发大量的 full gc,很吃性能。因此在 RDD 缓存使用不多的状况下能够调节下这个参数。

调节参数为:spark.storage.memoryFraction

b,调整 JVM 进程中除了 堆内存之外的内存空间: 有时候,若是你的spark做业处理的数据量特别特别大,几亿数据量。而后spark做业一运行,时不时的报错,shuffle file cannot find,executor、task lost,out of memory。这时候就要考虑设置这个参数了。

调节参数:--conf spark.yarn.executor.memoryOverhead 这个是配置在 shell 脚本里的。

c,调整链接等待时长: 碰到一种状况,没有任何规律:某某file。一串file id。uuid(dsfsfd-2342vs—sdf--sdfsd)。not found。file lost。这种状况颇有可能就是某个 worker 上的 executor 拉取另外节点上的数据因为长时间没拉到,超过了超时等待时间,爆出的错误。

调节参数:--conf spark.core.connection.ack.wait.timeout 配置在 shell 脚本里的。

关于堆外内存和 JVM 内存分配的东西能够参考这两偏文章:

www.ibm.com/developerwo…

blog.csdn.net/baolibin528…

6,shuffle 调优

a,合并 map 端输出文件: shuffle 阶段,前一个 stage 会为后一个 stage 每一个 task 准备一份 map 端的输出文件,供后一个 stage 拉取。若是后一个 stage 的 task 数量不少,那么前一个 stage 产生的 map 输出文件特别多,这时候须要开启 map 端文件合并,可以减小不少文件,有助于 shuffle 阶段的性能提高。

调节参数:new SparkConf().set("spark.shuffle.consolidateFiles", "true")

b,调整 map 端内存缓冲和 reduce 端的内存占比: map 端缓冲内存大小默认是 32k,reduce 端用来缓存的内存占比默认是 0.2。

调节参数为:spark.shuffle.file.buffer spark.shuffle.memoryFraction

c,HashShuffleManager 和 SortShuffleManager: 参考:my.oschina.net/hblt147/blo…

7,算子调优

MapPartitions提高Map类操做性能、filter事后使用coalesce减小分区数量、foreachPartition优化写数据库性能、repartition解决Spark SQL低并行度的性能问、reduceByKey 在 shuffle 操做时会在 map 端进行一次本地 combine,性能比 groupByKey 要好不少,因此能用 reduceByKey 的地方尽可能用 reduceByKey。

相关文章
相关标签/搜索