spark性能优化指南

在大数据计算领域,Spark已经成为了愈来愈流行、愈来愈受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各类不一样类型的计算操做,应用范围与前景很是普遍。在美团•大众点评,已经有不少同窗在各类项目中尝试使用Spark。大多数同窗(包括笔者在内),最初开始尝试使用Spark的缘由很简单,主要就是为了让大数据计算做业的执行速度更快、性能更高。web

然而,经过Spark开发出高性能的大数据计算做业,并非那么简单的。若是没有对Spark做业进行合理的调优,Spark做业的执行速度可能会很慢,这样就彻底体现不出Spark做为一种快速大数据计算引擎的优点来。所以,想要用好Spark,就必须对其进行合理的性能优化。数组

Spark的性能调优其实是由不少部分组成的,不是调节几个参数就能够立竿见影提高做业性能的。咱们须要根据不一样的业务场景以及数据状况,对Spark做业进行综合性的分析,而后进行多个方面的调节和优化,才能得到最佳性能。缓存

笔者根据以前的Spark做业开发经验以及实践积累,总结出了一套Spark做业的性能优化方案。整套方案主要分为开发调优、资源调优、数据倾斜调优、shuffle调优几个部分。开发调优和资源调优是全部Spark做业都须要注意和遵循的一些基本原则,是高性能Spark做业的基础;数据倾斜调优,主要讲解了一套完整的用来解决Spark做业数据倾斜的解决方案;shuffle调优,面向的是对Spark的原理有较深层次掌握和研究的同窗,主要讲解了如何对Spark做业的shuffle运行过程以及细节进行调优。性能优化

本文做为Spark性能优化指南的基础篇,主要讲解开发调优以及资源调优。网络

开发调优
调优概述
Spark性能优化的第一步,就是要在开发Spark做业的过程当中注意和应用一些性能优化的基本原则。开发调优,就是要让你们了解如下一些Spark基本开发原则,包括:RDD lineage设计、算子的合理使用、特殊操做的优化等。在开发过程当中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到本身的Spark做业中。并发

原则一:避免建立重复的RDD
一般来讲,咱们在开发一个Spark做业时,首先是基于某个数据源(好比Hive表或HDFS文件)建立一个初始的RDD;接着对这个RDD执行某个算子操做,而后获得下一个RDD;以此类推,循环往复,直到计算出最终咱们须要的结果。在这个过程当中,多个RDD会经过不一样的算子操做(好比map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。机器学习

咱们在开发过程当中要注意:对于同一份数据,只应该建立一个RDD,不能建立多个RDD来表明同一份数据。ide

一些Spark初学者在刚开始开发Spark做业时,或者是有经验的工程师在开发RDD lineage极其冗长的Spark做业时,可能会忘了本身以前对于某一份数据已经建立过一个RDD了,从而致使对于同一份数据,建立了多个RDD。这就意味着,咱们的Spark做业会进行屡次重复计算来建立多个表明相同数据的RDD,进而增长了做业的性能开销。svg

一个简单的例子函数

// 须要对名为“hello.txt”的HDFS文件进行一次map操做,再进行一次reduce操做。也就是说,须要对一份数据执行两次算子操做。

// 错误的作法:对于同一份数据执行屡次算子操做时,建立多个RDD。
// 这里执行了两次textFile方法,针对同一个HDFS文件,建立了两个RDD出来,而后分别对每一个RDD都执行了一个算子操做。
// 这种状况下,Spark须要从HDFS上两次加载hello.txt文件的内容,并建立两个单独的RDD;第二次加载HDFS文件以及建立RDD的性能开销,很明显是白白浪费掉的。
val rdd1 = sc.textFile(“hdfs://192.168.0.1:9000/hello.txt”)
rdd1.map(…)
val rdd2 = sc.textFile(“hdfs://192.168.0.1:9000/hello.txt”)
rdd2.reduce(…)

// 正确的用法:对于一份数据执行屡次算子操做时,只使用一个RDD。
// 这种写法很明显比上一种写法要好多了,由于咱们对于同一份数据只建立了一个RDD,而后对这一个RDD执行了屡次算子操做。
// 可是要注意到这里为止优化尚未结束,因为rdd1被执行了两次算子操做,第二次执行reduce操做的时候,还会再次从源头处从新计算一次rdd1的数据,所以仍是会有重复计算的性能开销。
// 要完全解决这个问题,必须结合“原则三:对屡次使用的RDD进行持久化”,才能保证一个RDD被屡次使用时只被计算一次。
val rdd1 = sc.textFile(“hdfs://192.168.0.1:9000/hello.txt”)
rdd1.map(…)
rdd1.reduce(…)
原则二:尽量复用同一个RDD
除了要避免在开发过程当中对一份彻底相同的数据建立多个RDD以外,在对不一样的数据执行算子操做时还要尽量地复用一个RDD。好比说,有一个RDD的数据格式是key-value类型的,另外一个是单value类型的,这两个RDD的value数据是彻底同样的。那么此时咱们能够只使用key-value类型的那个RDD,由于其中已经包含了另外一个的数据。对于相似这种多个RDD的数据有重叠或者包含的状况,咱们应该尽可能复用一个RDD,这样能够尽量地减小RDD的数量,从而尽量减小算子执行的次数。

一个简单的例子

// 错误的作法。

// 有一个<Long, String>格式的RDD,即rdd1。
// 接着因为业务须要,对rdd1执行了一个map操做,建立了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = …
JavaRDD rdd2 = rdd1.map(…)

// 分别对rdd1和rdd2执行了不一样的算子操做。
rdd1.reduceByKey(…)
rdd2.map(…)

// 正确的作法。

// 上面这个case中,其实rdd1和rdd2的区别无非就是数据格式不一样而已,rdd2的数据彻底就是rdd1的子集而已,却建立了两个rdd,并对两个rdd都执行了一次算子操做。
// 此时会由于对rdd1执行map算子来建立rdd2,而多执行一次算子操做,进而增长性能开销。

// 其实在这种状况下彻底能够复用同一个RDD。
// 咱们可使用rdd1,既作reduceByKey操做,也作map操做。
// 在进行第二个map操做时,只使用每一个数据的tuple._2,也就是rdd1中的value值,便可。
JavaPairRDD<Long, String> rdd1 = …
rdd1.reduceByKey(…)
rdd1.map(tuple._2…)

// 第二种方式相较于第一种方式而言,很明显减小了一次rdd2的计算开销。
// 可是到这里为止,优化尚未结束,对rdd1咱们仍是执行了两次算子操做,rdd1实际上仍是会被计算两次。
// 所以还须要配合“原则三:对屡次使用的RDD进行持久化”进行使用,才能保证一个RDD被屡次使用时只被计算一次。
原则三:对屡次使用的RDD进行持久化
当你在Spark代码中屡次对一个RDD作了算子操做后,恭喜,你已经实现Spark做业第一步的优化了,也就是尽量复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行屡次算子操做时,这个RDD自己仅仅被计算一次。

Spark中对于一个RDD执行屡次算子的默认原理是这样的:每次你对一个RDD执行一个算子操做时,都会从新从源头处计算一遍,计算出那个RDD来,而后再对这个RDD执行你的算子操做。这种方式的性能是不好的。

所以对于这种状况,咱们的建议是:对屡次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。之后每次对这个RDD进行算子操做时,都会直接从内存或磁盘中提取持久化的RDD数据,而后执行算子,而不会从源头处从新计算一遍这个RDD,再执行算子操做。

对屡次使用的RDD进行持久化的代码示例

// 若是要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()便可。

// 正确的作法。
// cache()方法表示:使用非序列化的方式将RDD中的数据所有尝试持久化到内存中。
// 此时再对rdd1执行两次算子操做时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。
// 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
val rdd1 = sc.textFile(“hdfs://192.168.0.1:9000/hello.txt”).cache()
rdd1.map(…)
rdd1.reduce(…)

// persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
// 好比说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。
// 并且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每一个partition都会序列化成一个大的字节数组,而后再持久化到内存或磁盘中。
// 序列化的方式能够减小持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。
val rdd1 = sc.textFile(“hdfs://192.168.0.1:9000/hello.txt”).persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(…)
rdd1.reduce(…)
对于persist()方法而言,咱们能够根据不一样的业务场景选择不一样的持久化级别。
Spark的持久化级别

持久化级别 含义解释
MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。若是内存不够存放全部的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操做时,那些没有被持久化的数据,须要从源头处从新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
MEMORY_AND_DISK 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。若是内存不够存放全部的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER 基本含义同MEMORY_ONLY。惟一的区别是,会将RDD中的数据进行序列化,RDD的每一个partition会被序列化成一个字节数组。这种方式更加节省内存,从而能够避免持久化的数据占用过多内存致使频繁GC。
MEMORY_AND_DISK_SER 基本含义同MEMORY_AND_DISK。惟一的区别是,会将RDD中的数据进行序列化,RDD的每一个partition会被序列化成一个字节数组。这种方式更加节省内存,从而能够避免持久化的数据占用过多内存致使频繁GC。
DISK_ONLY 使用未序列化的Java对象格式,将数据所有写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. 对于上述任意一种持久化策略,若是加上后缀_2,表明的是将每一个持久化的数据,都复制一份副本,并将副本保存到其余节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可使用该数据在其余节点上的副本。若是没有副本的话,就只能将这些数据从源头处从新计算一遍了。
如何选择一种最合适的持久化策略

默认状况下,性能最高的固然是MEMORY_ONLY,但前提是你的内存必须足够足够大,能够绰绰有余地存放下整个RDD的全部数据。由于不进行序列化与反序列化操做,就避免了这部分的性能开销;对这个RDD的后续算子操做,都是基于纯内存中的数据的操做,不须要从磁盘文件中读取数据,性能也很高;并且不须要复制一份数据副本,并远程传送到其余节点上。可是这里必需要注意的是,在实际的生产环境中,恐怕可以直接用这种策略的场景仍是有限的,若是RDD中数据比较多时(好比几十亿),直接用这种持久化级别,会致使JVM的OOM内存溢出异常。

若是使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每一个partition仅仅是一个字节数组而已,大大减小了对象数量,并下降了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。可是后续算子能够基于纯内存进行操做,所以性能整体仍是比较高的。此外,可能发生的问题同上,若是RDD中的数据量过多的话,仍是可能会致使OOM内存溢出的异常。

若是纯内存的级别都没法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。由于既然到了这一步,就说明RDD的数据量很大,内存没法彻底放下。序列化后的数据比较少,能够节省内存和磁盘的空间开销。同时该策略会优先尽可能尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

一般不建议使用DISK_ONLY和后缀为_2的级别:由于彻底基于磁盘文件进行数据的读写,会致使性能急剧下降,有时还不如从新计算一次全部RDD。后缀为_2的级别,必须将全部数据都复制一份副本,并发送到其余节点上,数据复制以及网络传输会致使较大的性能开销,除非是要求做业的高可用性,不然不建议使用。

原则四:尽可能避免使用shuffle类算子
若是有可能的话,要尽可能避免使用shuffle类算子。由于Spark做业运行过程当中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来讲,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操做。好比reduceByKey、join等算子,都会触发shuffle操做。

shuffle过程当中,各个节点上的相同key都会先写入本地磁盘文件中,而后其余节点须要经过网络传输拉取各个节点上的磁盘文件中的相同key。并且相同key都拉取到同一个节点进行聚合操做时,还有可能会由于一个节点上处理的key过多,致使内存不够存放,进而溢写到磁盘文件中。所以在shuffle过程当中,可能会发生大量的磁盘文件读写的IO操做,以及数据的网络传输操做。磁盘IO和网络数据传输也是shuffle性能较差的主要缘由。

所以在咱们的开发过程当中,能避免则尽量避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽可能使用map类的非shuffle算子。这样的话,没有shuffle操做或者仅有较少shuffle操做的Spark做业,能够大大减小性能开销。

Broadcast与map进行join代码示例

// 传统的join操做会致使shuffle操做。
// 由于两个RDD中,相同的key都须要经过网络拉取到一个节点上,由一个task进行join操做。
val rdd3 = rdd1.join(rdd2)

// Broadcast+map的join操做,不会致使shuffle操做。
// 使用Broadcast将一个数据量较小的RDD做为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,能够从rdd2DataBroadcast中,获取rdd2的全部数据。
// 而后进行遍历,若是发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就断定能够进行join。
// 此时就能够根据本身须要的方式,将rdd1当前数据与rdd2中能够链接的数据,拼接在一块儿(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast…)

// 注意,以上操做,建议仅仅在rdd2的数据量比较少(好比几百M,或者一两G)的状况下使用。
// 由于每一个Executor的内存中,都会驻留一份rdd2的全量数据。
原则五:使用map-side预聚合的shuffle操做
若是由于业务须要,必定要使用shuffle操做,没法用map类的算子来替代,那么尽可能使用能够map-side预聚合的算子。

所谓的map-side预聚合,说的是在每一个节点本地对相同的key进行一次聚合操做,相似于MapReduce中的本地combiner。map-side预聚合以后,每一个节点本地就只会有一条相同的key,由于多条相同的key都被聚合起来了。其余节点在拉取全部节点上的相同key时,就会大大减小须要拉取的数据数量,从而也就减小了磁盘IO以及网络传输开销。一般来讲,在可能的状况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。由于reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每一个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来讲比较差。

好比,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,能够看到,没有进行任何本地聚合时,全部数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,能够看到,每一个节点本地的相同key数据,都进行了预聚合,而后才传输到其余节点上进行全局聚合。