Spark(十二)--性能调优篇

一段程序只能完成功能是没有用的,只能可以稳定、高效率地运行才是生成环境所须要的。html

本篇记录了Spark各个角度的调优技巧,以备不时之需。算法

1、配置参数的方式和观察性能的方式

额。。。从最基本的开始讲,可能一些刚接触Spark的人不是很清楚Spark的一些参数变量到底要配置在哪里。数据库

能够经过三种方式配置参数,任选其一皆可。apache

  1. spark-env.sh文件中配置:最近常使用的配置方式,格式能够参考其中的一些官方保留的配置。
  2. 程序中经过SparkConf配置:经过SparkConf对象set方法设置键值对,比较直观。
  3. 程序中经过System.setProperty配置:和方法二差很少。

值得一提的是一个略显诡异的现象,有些参数在spark-env.sh中配置并不起做用,反而要在程序中设置才有效果。markdown

Spark的参数不少,一些默认的设置能够参考官网推荐的配置参数:/docs/latest/configuration.html网络

能够经过如下几种方式来观察Spark集群的状态和相关性能问题:app

  1. Web UI:即8088端口进入的UI界面。
  2. Driver程序日志:根据程序提交方式的不一样到指定的节点上观察Driver程序日志。
  3. logs文件夹下的日志:Spark集群的大部分信息都会记录在这里。
  4. works文件夹下的日志:主要记录Work节点的信息。
  5. Profiler工具:没有使用过。

前景交代完毕,下面进入正题:分布式

2、调度与分区优化

一、小分区合并的问题函数

因为程序中过分使用filter算子或者使用不当,都会形成大量的小分区出现。
由于每次过滤获得的结果只有原来数据集的一小部分,而这些量很小的数据一样会以必定的分区数并行化分配到各个节点中执行。工具

带来的问题就是:任务处理的数据量很小,反复地切换任务所消耗的资源反而会带来很大的系统开销。

解决方案:使用重分区函数coalesce进行数据紧缩、减小分区数并设置shuffle=true保证任务是并行计算的

减小分区数,虽然意味着并行度下降,可是相对比以前的大量小任务过分切换的消耗,倒是比较值得的。

这里也能够直接使用repartition重分区函数进行操做,由于其底层使用的是coalesce并设置Shuffle=true

二、数据倾斜问题

这是一个生产环境中常常遇到的问题,典型的场景是:大量的数据被分配到小部分节点计算,而其余大部分节点却只计算小部分数据。

问题产生的缘由有不少,可能且不所有包括:

  • key的数据分布不均匀
  • 业务数据自己缘由
  • 结构化表设计问题
  • 某些SQL语句会形成数据倾斜

可选的解决方案有:

  1. 增大任务数,减小分区数量:这种方法和解决小分区问题相似。
  2. 对特殊的key进行处理,如空值等:直接过滤掉空值的key以避免对任务产生干扰。
  3. 使用广播:小数据量直接广播,大数据量先拆分以后再进行广播。

还有一种场景是任务执行速度倾斜问题:集群中其余节点都计算完毕了,可是只有少数几个节点死活运行不完。(其实这和上面的那个场景是差很少的)

解决方案:

  • 设置spark.speculation=true将执行事件过长的节点去掉,从新分配任务
  • spark.speculation.interval用来设置执行间隔

三、并行度调整

官方推荐每一个CPU CORE分配2-3个任务。

  • 任务数太多:并行度过高,产生大量的任务启动和切换开销。
  • 任务数过低:并行度太低,没法发挥集群并行计算能力,任务执行慢

Spark会根据文件大小默认配置Map阶段的任务数,因此咱们可以自行调整的就是Reduce阶段的分区数了。

  • reduceByKey等操做时经过numPartitions参数进行分区数量配置。
  • 经过spark.default.parallelism进行默认分区数配置。

四、DAG调度执行优化

DAG图是Spark计算的基本依赖,因此建议:

  1. 同一个Stage尽可能容纳更多地算子,防止多余的Shuffle。
  2. 复用已经cache的数据。

尽量地在Transformation算子中完成对数据的计算,由于过多的Action算子会产生不少多余的Shuffle,在划分DAG图时会造成众多Stage。

3、网络传输优化

一、大任务分发问题

Spark采用Akka的Actor模型来进行消息传递,包括数据、jar包和相关文件等。

而Akka消息通讯传递默认的容量最大为10M,一旦传递的消息超过这个限制就会出现这样的错误:

Worker任务失败后Master上会打印“Lost TID:”

根据这个信息找到对应的Worker节点后查看SparkHome/work/目录下的日志,查看Serialized size of result是否超过10M,就能够知道是否是Akka这边的问题了。

一旦确认是Akka通讯容量限制以后,就能够经过配置spark.akka.frameSize控制Akka通讯消息的最大容量。

二、Broadcast在调优场景的使用

Broadcast广播,主要是用于共享Spark每一个Task都会用到的一些只读变量。

对于那些每一个Task都会用到的变量来讲,若是每一个Task都为这些变量分配内存空间显然会使用不少多余的资源,使用广播能够有效的避免这个问题,广播以后,这些变量仅仅会在每台机器上保存一份,有Task须要使用时就到本身的机器上读取就ok。

官方推荐,Task大于20k时可使用,能够在控制台上看Task的大小。

三、Collect结果过大的问题

大量数据时将数据存储在HDFS上或者其余,不是大量数据,可是超出Akka传输的Buffer大小,经过配置spark.akka.frameSize调整。

4、序列化与压缩

一、经过序列化手段优化

序列化以前说过,好处多多,因此是推荐能用就用,Spark上的序列化方式有几种,具体的能够参考官方文档。

这里只简单介绍一下Kryo。

配置参数的时候使用spark.serializer=”org.apache.spark.serializer.KryoSerializer”配置

自定义定义能够被Kryo序列化的类的步骤:

  1. 自定义类extends KryoRegistrator
  2. 设置序列化方式conf.set(“spark.serializer”,”org.apache.spark.serializer.KryoSerializer”)
  3. conf.set(“spark.kyro.registrator”,”自定义的class”)
  4. 若是对象占用空间大,须要增长Kryo的缓冲区则配置spark.kryoserializer.buffer.mb上值默认为2M

二、经过压缩手段优化

Spark的Job大体能够分为两种:

  • I/O密集型:即存在大量读取磁盘的操做。
  • CPU密集型:即存在大量的数据计算,使用CPU资源较多。

对于I/O密集型的Job,能压缩就压缩,由于读磁盘的时候数据压缩了,占用空间小了,读取速度不就快了。

对于CPU密集型的Job,看具体CPU使用状况再作决定,由于使用压缩是须要消耗一些CPU资源的,若是当前CPU已经超负荷了,再使用压缩反而拔苗助长。

Spark支持两种压缩算法:

  • LZF:高压缩比
  • Snappy:高速度

一些压缩相关的参数配置:

  1. spark.broadcast.compress:推荐为true
  2. spark.rdd.compress:默认为false,看状况配置,压缩花费一些时间,可是能够节省大量内存空间
  3. spark.io.compression.codec:org.apache.spark.io.LZFCompressionCodec根据状况选择压缩算法
  4. spark.io.compressions.snappy.block.size:设置Snappy压缩的块大小

5、其余优化方式

一、对外部资源的批处理操做

如操做数据库时,每一个分区的数据应该一块儿执行一次批处理,而不是一条数据写一次,即map=>mapPartition。

二、reduce和reduceByKey

reduce:内部调用了runJob方法,是一个action操做。
reduceByKey:内部只是调用了combineBykey,是Transformation操做。

大量的数据操做时,reduce汇总全部数据到主节点会有性能瓶颈,将数据转换为Key-Value的形式使用reduceByKey实现逻辑,会作相似mr程序中的Combiner的操做,Transformation操做分布式进行。

三、Shuffle操做符的内存使用

使用会触发Shuffle过程的操做符时,操做的数据集合太大形成OOM,每一个任务执行过程当中会在各自的内存建立Hash表来进行数据分组。

能够解决的方案可能有:

  • 增长并行度即分区数能够适当解决问题
  • 能够将任务数量扩展到超过集群总体的CPU core数
相关文章
相关标签/搜索