一段程序只能完成功能是没有用的,只能可以稳定、高效率地运行才是生成环境所须要的。html
本篇记录了Spark各个角度的调优技巧,以备不时之需。算法
额。。。从最基本的开始讲,可能一些刚接触Spark的人不是很清楚Spark的一些参数变量到底要配置在哪里。数据库
能够经过三种方式配置参数,任选其一皆可。apache
值得一提的是一个略显诡异的现象,有些参数在spark-env.sh中配置并不起做用,反而要在程序中设置才有效果。markdown
Spark的参数不少,一些默认的设置能够参考官网推荐的配置参数:/docs/latest/configuration.html网络
能够经过如下几种方式来观察Spark集群的状态和相关性能问题:app
前景交代完毕,下面进入正题:分布式
一、小分区合并的问题函数
因为程序中过分使用filter算子或者使用不当,都会形成大量的小分区出现。
由于每次过滤获得的结果只有原来数据集的一小部分,而这些量很小的数据一样会以必定的分区数并行化分配到各个节点中执行。工具
带来的问题就是:任务处理的数据量很小,反复地切换任务所消耗的资源反而会带来很大的系统开销。
解决方案:使用重分区函数coalesce进行数据紧缩、减小分区数并设置shuffle=true保证任务是并行计算的
减小分区数,虽然意味着并行度下降,可是相对比以前的大量小任务过分切换的消耗,倒是比较值得的。
这里也能够直接使用repartition重分区函数进行操做,由于其底层使用的是coalesce并设置Shuffle=true
二、数据倾斜问题
这是一个生产环境中常常遇到的问题,典型的场景是:大量的数据被分配到小部分节点计算,而其余大部分节点却只计算小部分数据。
问题产生的缘由有不少,可能且不所有包括:
可选的解决方案有:
还有一种场景是任务执行速度倾斜问题:集群中其余节点都计算完毕了,可是只有少数几个节点死活运行不完。(其实这和上面的那个场景是差很少的)
解决方案:
三、并行度调整
官方推荐每一个CPU CORE分配2-3个任务。
Spark会根据文件大小默认配置Map阶段的任务数,因此咱们可以自行调整的就是Reduce阶段的分区数了。
四、DAG调度执行优化
DAG图是Spark计算的基本依赖,因此建议:
尽量地在Transformation算子中完成对数据的计算,由于过多的Action算子会产生不少多余的Shuffle,在划分DAG图时会造成众多Stage。
一、大任务分发问题
Spark采用Akka的Actor模型来进行消息传递,包括数据、jar包和相关文件等。
而Akka消息通讯传递默认的容量最大为10M,一旦传递的消息超过这个限制就会出现这样的错误:
Worker任务失败后Master上会打印“Lost TID:”
根据这个信息找到对应的Worker节点后查看SparkHome/work/目录下的日志,查看Serialized size of result是否超过10M,就能够知道是否是Akka这边的问题了。
一旦确认是Akka通讯容量限制以后,就能够经过配置spark.akka.frameSize控制Akka通讯消息的最大容量。
二、Broadcast在调优场景的使用
Broadcast广播,主要是用于共享Spark每一个Task都会用到的一些只读变量。
对于那些每一个Task都会用到的变量来讲,若是每一个Task都为这些变量分配内存空间显然会使用不少多余的资源,使用广播能够有效的避免这个问题,广播以后,这些变量仅仅会在每台机器上保存一份,有Task须要使用时就到本身的机器上读取就ok。
官方推荐,Task大于20k时可使用,能够在控制台上看Task的大小。
三、Collect结果过大的问题
大量数据时将数据存储在HDFS上或者其余,不是大量数据,可是超出Akka传输的Buffer大小,经过配置spark.akka.frameSize调整。
一、经过序列化手段优化
序列化以前说过,好处多多,因此是推荐能用就用,Spark上的序列化方式有几种,具体的能够参考官方文档。
这里只简单介绍一下Kryo。
配置参数的时候使用spark.serializer=”org.apache.spark.serializer.KryoSerializer”配置
自定义定义能够被Kryo序列化的类的步骤:
二、经过压缩手段优化
Spark的Job大体能够分为两种:
对于I/O密集型的Job,能压缩就压缩,由于读磁盘的时候数据压缩了,占用空间小了,读取速度不就快了。
对于CPU密集型的Job,看具体CPU使用状况再作决定,由于使用压缩是须要消耗一些CPU资源的,若是当前CPU已经超负荷了,再使用压缩反而拔苗助长。
Spark支持两种压缩算法:
一些压缩相关的参数配置:
一、对外部资源的批处理操做
如操做数据库时,每一个分区的数据应该一块儿执行一次批处理,而不是一条数据写一次,即map=>mapPartition。
二、reduce和reduceByKey
reduce:内部调用了runJob方法,是一个action操做。
reduceByKey:内部只是调用了combineBykey,是Transformation操做。
大量的数据操做时,reduce汇总全部数据到主节点会有性能瓶颈,将数据转换为Key-Value的形式使用reduceByKey实现逻辑,会作相似mr程序中的Combiner的操做,Transformation操做分布式进行。
三、Shuffle操做符的内存使用
使用会触发Shuffle过程的操做符时,操做的数据集合太大形成OOM,每一个任务执行过程当中会在各自的内存建立Hash表来进行数据分组。
能够解决的方案可能有: