下面这些关于Spark的性能调优项,有的是来自官方的,有的是来自别的的工程师,有的则是我本身总结的。node
基本概念和原则程序员
首先,要搞清楚Spark的几个基本概念和原则,不然系统的性能调优无从谈起:算法
每一台host上面能够并行N个worker,每个worker下面能够并行M个executor,task们会被分配到executor上面去执行。Stage指的是一组并行运行的task,stage内部是不能出现shuffle的,由于shuffle的就像篱笆同样阻止了并行task的运行,遇到shuffle就意味着到了stage的边界。缓存
CPU的core数量,每一个executor能够占用一个或多个core,能够经过观察CPU的使用率变化来了解计算资源的使用状况,例如,很常见的一种浪费是一个executor占用了多个core,可是总的CPU使用率却不高(由于一个executor并不总能充分利用多核的能力),这个时候能够考虑让么个executor占用更少的core,同时worker下面增长更多的executor,或者一台host上面增长更多的worker来增长并行执行的executor的数量,从而增长CPU利用率。可是增长executor的时候须要考虑好内存消耗的控制,以避免出现Out of Memory的状况。网络
partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值过小了会致使每片数据量太大,致使内存压力,或者诸多executor的计算能力没法利用充分;可是若是太大了则会致使分片太多,执行效率下降。在执行action类型操做的时候(好比各类reduce操做),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操做的时候,默认返回数据的paritition数量(而在进行map类操做的时候,partition数量一般取自parent RDD中较大的一个,并且也不会涉及shuffle,所以这个parallelism的参数没有影响)。因此说,这两个概念密切相关,都是涉及到数据分片的,做用方式实际上是统一的。经过spark.default.parallelism能够设置默认的分片数量,而不少RDD的操做均可以指定一个partition参数来显式控制具体的分片数量。数据结构
上面这两条原理上看起来很简单,可是却很是重要,根据硬件和任务的状况选择不一样的取值。想要取一个放之四海而皆准的配置是不现实的。看这样几个例子:(1)实践中跑的EMR Spark job,有的特别慢,查看CPU利用率很低,咱们就尝试减小每一个executor占用CPU core的数量,增长并行的executor数量,同时配合增长分片,总体上增长了CPU的利用率,加快数据处理速度。(2)发现某job很容易发生内存溢出,咱们就增大分片数量,从而减小了每片数据的规模,同时还减小并行的executor数量,这样相同的内存资源分配给数量更少的executor,至关于增长了每一个task的内存分配,这样运行速度可能慢了些,可是总比OOM强。(3)数据量特别少,有大量的小文件生成,就减小文件分片,不必建立那么多task,这种状况,若是只是最原始的input比较小,通常都能被注意到;可是,若是是在运算过程当中,好比应用某个reduceBy或者某个filter之后,数据大量减小,这种低效状况就不多被留意到。app
最后再补充一点,随着参数和配置的变化,性能的瓶颈是变化的,在分析问题的时候不要忘记。例如在每台机器上部署的executor数量增长的时候,性能一开始是增长的,同时也观察到CPU的平均使用率在增长;可是随着单台机器上的executor愈来愈多,性能降低了,由于随着executor的数量增长,被分配到每一个executor的内存数量减少,在内存里直接操做的愈来愈少,spill over到磁盘上的数据愈来愈多,天然性能就变差了。框架
下面给这样一个直观的例子,当前总的cpu利用率并不高:ide
可是通过根据上述原则的的调整以后,能够显著发现cpu总利用率增长了:工具
其次,涉及性能调优咱们常常要改配置,在Spark里面有三种常见的配置方式,虽然有些参数的配置是能够互相替代,可是做为最佳实践,仍是须要遵循不一样的情形下使用不一样的配置:
设置环境变量,这种方式主要用于和环境、硬件相关的配置;
命令行参数,这种方式主要用于不一样次的运行会发生变化的参数,用双横线开头;
代码里面(好比Scala)显式设置(SparkConf对象),这种配置一般是application级别的配置,通常不改变。
举一个配置的具体例子。Node、worker和executor之间的比例调整。咱们常常须要调整并行的executor的数量,那么简单说有两种方式:
一个是调整并行的worker的数量,好比,SPARK_WORKER_INSTANCES能够设置每一个node的worker的数量,可是在改变这个参数的时候,好比改为2,必定要相应设置SPARK_WORKER_CORES的值,让每一个worker使用原有一半的core,这样才能让两个worker一同工做;
另外一个是调整worker内executor的数量,咱们是在YARN框架下采用这个调整来实现executor数量改变的,一种典型办法是,一个host只跑一个worker,而后配置spark.executor.cores为host上CPU core的N分之一,同时也设置spark.executor.memory为host上分配给Spark计算内存的N分之一,这样这个host上就可以启动N个executor。
有的配置在不一样的MR框架/工具下是不同的,好比YARN下有的参数的默认取值就不一样,这点须要注意。
明确这些基础的事情之后,再来一项一项看性能调优的要点。
内存
Memory Tuning,Java对象会占用原始数据2~5倍甚至更多的空间。最好的检测对象内存消耗的办法就是建立RDD,而后放到cache里面去,而后在UI上面看storage的变化;固然也可使用SizeEstimator来估算。使用-XX:+UseCompressedOops选项能够压缩指针(8字节变成4字节)。在调用collect等等API的时候也要当心——大块数据往内存拷贝的时候内心要清楚。内存要留一些给操做系统,好比20%,这里面也包括了OS的buffer cache,若是预留得太少了,会见到这样的错误:
Required executor memory (235520+23552 MB) is above the max threshold (241664 MB) of this cluster! Please increase the value of ‘yarn.scheduler.maximum-allocation-mb’.
或者干脆就没有这样的错误,可是依然有由于内存不足致使的问题,有的会有警告,好比这个:
16/01/13 23:54:48 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
Reduce Task的内存使用。在某些状况下reduce task特别消耗内存,好比当shuffle出现的时候,好比sortByKey、groupByKey、reduceByKey和join等,要在内存里面创建一个巨大的hash table。其中一个解决办法是增大level of parallelism,这样每一个task的输入规模就相应减少。
注意原始input的大小,有不少操做始终都是须要某类全集数据在内存里面完成的,那么并不是拼命增长parallelism和partition的值就能够把内存占用减得很是小的。咱们遇到过某些性能低下甚至OOM的问题,是改变这两个参数所难以缓解的。可是能够经过增长每台机器的内存,或者增长机器的数量均可以直接或间接增长内存总量来解决。
在选择EC2机器类型的时候,要明确瓶颈(能够借由测试来明确),好比咱们遇到的状况就是使用r3.8 xlarge和c3.8 xlarge选择的问题,运算能力至关,前者比后者贵50%,可是内存是后者的5倍。
CPU
Level of Parallelism。指定它之后,在进行reduce类型操做的时候,默认partition的数量就被指定了。这个参数在实际工程中一般是必不可少的,通常都要根据input和每一个executor内存的大小来肯定。设置level of parallelism或者属性spark.default.parallelism来改变并行级别,一般来讲,每个CPU核能够分配2~3个task。
CPU core的访问模式是共享仍是独占。即CPU核是被同一host上的executor共享仍是瓜分并独占。好比YARN环境,一台机器上共有32个CPU core的资源,同时部署了两个executor,总内存是50G,那么一种方式是配置spark.executor.cores为16,spark.executor.memory为20G,这样因为内存的限制,这台机器上会部署两个executor,每一个都使用20G内存,而且各使用独占的16个CPU core资源;而若是把spark.executor.cores配置为32,那么依然会部署两个executor,可是两者会共享这32个core。根据个人测试,独占模式的性能要略好与共享模式。同时,独占模式也是Spark官方文档上推荐的方式。
GC调优。打印GC信息:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps。默认60%的executor内存能够被用来做为RDD的缓存,所以只有40%的内存能够被用来做为对象建立的空间,这一点能够经过设置spark.storage.memoryFraction改变。若是有不少小对象建立,可是这些对象在不彻底GC的过程当中就能够回收,那么增大Eden区会有必定帮助。若是有任务从HDFS拷贝数据,内存消耗有一个简单的估算公式——好比HDFS的block size是64MB,工做区内有4个task拷贝数据,而解压缩一个block要增大3倍大小,那么内存消耗就是:4*3*64MB。另外,工做中遇到过这样的一个问题:GC默认状况下有一个限制,默认是GC时间不能超过2%的CPU时间,可是若是大量对象建立(在Spark里很容易出现,代码模式就是一个RDD转下一个RDD),就会致使大量的GC时间,从而出现OutOfMemoryError: GC overhead limit exceeded,能够经过设置-XX:-UseGCOverheadLimit关掉它。
序列化和传输
Data Serialization,默认使用的是Java Serialization,这个程序员最熟悉,可是性能、空间表现都比较差。还有一个选项是Kryo Serialization,更快,压缩率也更高,可是并不是支持任意类的序列化。在Spark UI上可以看到序列化占用总时间开销的比例,若是这个比例高的话能够考虑优化内存使用和序列化。
Broadcasting Large Variables。在task使用静态大对象的时候,能够把它broadcast出去。Spark会打印序列化后的大小,一般来讲若是它超过20KB就值得这么作。有一种常见情形是,一个大表join一个小表,把小表broadcast后,大表的数据就不须要在各个node之间疯跑,安安静静地呆在本地等小表broadcast过来就行了。
Data Locality。数据和代码要放到一块儿才能处理,一般代码总比数据要小一些,所以把代码送到各处会更快。Data Locality是数据和处理的代码在屋里空间上接近的程度:PROCESS_LOCAL(同一个JVM)、NODE_LOCAL(同一个node,好比数据在HDFS上,可是和代码在同一个node)、NO_PREF、RACK_LOCAL(不在同一个server,但在同一个机架)、ANY。固然优先级从高到低,可是若是在空闲的executor上面没有未处理数据了,那么就有两个选择:(1)要么等现在繁忙的CPU闲下来处理尽量“本地”的数据,(1)要么就不等直接启动task去处理相对远程的数据。默认当这种状况发生Spark会等一下子(spark.locality),即策略(1),若是繁忙的CPU停不下来,就会执行策略(2)。
代码里对大对象的引用。在task里面引用大对象的时候要当心,由于它会随着task序列化到每一个节点上去,引起性能问题。只要序列化的过程不抛出异常,引用对象序列化的问题事实上不多被人重视。若是,这个大对象确实是须要的,那么就不如干脆把它变成RDD好了。绝大多数时候,对于大对象的序列化行为,是不知不觉发生的,或者说是预期以外的,好比在咱们的项目中有这样一段代码:
1
2
3
|
rdd.map(r
=
> {
println(BackfillTypeIndex)
})
|
其实呢,它等价于这样:
1
2
3
|
rdd.map(r
=
> {
println(
this
.BackfillTypeIndex)
})
|
对于这样的问题,一种最直接的解决方法就是:
1
2
|
val
dereferencedVariable
=
this
.BackfillTypeIndex
rdd.map(r
=
> println(dereferencedVariable))
// "this" is not serialized
|
相关地,注解@transient用来标识某变量不要被序列化,这对于将大对象从序列化的陷阱中排除掉是颇有用的。另外,注意class之间的继承层级关系,有时候一个小的case class可能来自一棵大树。
文件读写
文件存储和读取的优化。好比对于一些case而言,若是只须要某几列,使用rcfile和parquet这样的格式会大大减小文件读取成本。再有就是存储文件到S3上或者HDFS上,能够根据状况选择更合适的格式,好比压缩率更高的格式。另外,特别是对于shuffle特别多的状况,考虑留下必定量的额外内存给操做系统做为操做系统的buffer cache,好比总共50G的内存,JVM最多分配到40G多一点。
文件分片。好比在S3上面就支持文件以分片形式存放,后缀是partXX。使用coalesce方法来设置分红多少片,这个调整成并行级别或者其整数倍能够提升读写性能。可是过高过低都很差,过低了无法充分利用S3并行读写的能力,过高了则是小文件太多,预处理、合并、链接创建等等都是时间开销啊,读写还容易超过throttle。
任务
Spark的Speculation。经过设置spark.speculation等几个相关选项,可让Spark在发现某些task执行特别慢的时候,能够在不等待完成的状况下被从新执行,最后相同的task只要有一个执行完了,那么最快执行完的那个结果就会被采纳。
减小Shuffle。其实Spark的计算每每很快,可是大量开销都花在网络和IO上面,而shuffle就是一个典型。举个例子,若是(k, v1) join (k, v2) => (k, v3),那么,这种状况其实Spark是优化得很是好的,由于须要join的都在一个node的一个partition里面,join很快完成,结果也是在同一个node(这一系列操做能够被放在同一个stage里面)。可是若是数据结构被设计为(obj1) join (obj2) => (obj3),而其中的join条件为obj1.column1 == obj2.column1,这个时候每每就被迫shuffle了,由于再也不有同一个key使得数据在同一个node上的强保证。在必定要shuffle的状况下,尽量减小shuffle前的数据规模,好比这个避免groupByKey的例子。下面这个比较的图片来自Spark Summit 2013的一个演讲,讲的是同一件事情:
Repartition。运算过程当中数据量时大时小,选择合适的partition数量关系重大,若是太多partition就致使有不少小任务和空任务产生;若是太少则致使运算资源无法充分利用,必要时候可使用repartition来调整,不过它也不是没有代价的,其中一个最主要代价就是shuffle。再有一个常见问题是数据大小差别太大,这种状况主要是数据的partition的key其实取值并不均匀形成的(默认使用HashPartitioner),须要改进这一点,好比重写hash算法。测试的时候想知道partition的数量能够调用rdd.partitions().size()获知。
Task时间分布。关注Spark UI,在Stage的详情页面上,能够看获得shuffle写的总开销,GC时间,当前方法栈,还有task的时间花费。若是你发现task的时间花费分布太散,就是说有的花费时间很长,有的很短,这就说明计算分布不均,须要从新审视数据分片、key的hash、task内部的计算逻辑等等,瓶颈出如今耗时长的task上面。
重用资源。有的资源申请开销巨大,并且每每至关有限,好比创建链接,能够考虑在partition创建的时候就建立好(好比使用mapPartition方法),这样对于每一个partition内的每一个元素的操做,就只要重用这个链接就行了,不须要从新创建链接。
可供参考的文档:官方调优文档Tuning Spark,Spark配置的官方文档,Spark Programming Guide,JVMGC调优文档,JVM性能调优文档,How-to: Tune Your Apache Spark Jobs part-1 & part-2。