Spark性能优化

Spark性能优化

1)避免建立重复RDDweb

2)尽量复用同一个RDD数据库

3)对屡次使用的RDD进行持久化apache

4)尽可能避免使用shuffle类算子数组

5)使用map-side预聚合的shuffle操做缓存

6)使用高性能的算子性能优化

7)广播大变量网络

8)使用Kryo优化序列化性能数据结构

9)优化数据结构并发

10)资源参数调优ide

1)避免建立重复RDD

​ 对于同一份数据,只应该建立一个RDD,不能建立多个RDD来表明同一份数据。

2)尽量复用同一个RDD

​ 除了要避免在开发过程当中对一份彻底相同的数据建立多个RDD以外,在对不一样的数据执行算子 操做时还要尽量地复用一个RDD。好比说,有一个RDD的数据格式是key-value类型的,另 一个是单value类型的,这两个RDD的value数据是彻底同样的。那么此时咱们能够只使用key-value类型的那个RDD,由于其中已经包含了另外一个的数据。对于相似这种多个RDD的数据有重叠或者包含的状况,咱们应该尽可能复用一个RDD,这样能够尽量地减小RDD的数量,从而尽量减小算子执行的次数。

3)对屡次使用的RDD进行持久化
Spark中对于一个RDD执行屡次算子的默认原理是这样的:每次你对一个RDD执行一个算子操做时,都会从新从源头处计算一遍,计算出那个RDD来,而后再对这个RDD执行你的算子操做。所以对于这种状况,建议是:对屡次使用的RDD进行持久化,此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。之后每次对这个RDD进行算子操做时,都会直接从内存或磁盘中提取持久化的RDD数据,而后执行算子,而不会从源头处从新计算一遍这个RDD,再执行算子操做。

​ persist():手动选择持久化级别,并使用指定的方式进行持久化

Spark持久化级别
持久化级别 含义
MEMORY_ONLY 使用未序列化的Java对象格式,将数据保存在内存中。若是内存不够存放全部的数据,则数据可能就不会进行持久化。cache()使用的持久化策略
MEMORY_AND_DISK 使用未序列化的Java对象格式,优先尝试将数据保存在内存中,吐过内存不够存放全部的数据,会将数据写入磁盘文件中。
MEMORY_ONLY_SER 含义同MEMORY_ONLY,但会将RDD中的数据进行序列化,RDD的每一个partition会被序列化成一个字节数组。
MEMORY_AND_DISK_SER 含义同MEMORY_AND_DISK,但会将RDD中的数据进行序列化,RDD的每一个partition会被序列化成一个字节数组。
DISK_ONLY 使用未序列化的Java对象格式,将数据所有写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2...... 对于以上任意一种持久化策略,若是加上后缀_2,表明的是将每一个持久化的数据,都复制一份副本,并将副本保存到其余节点上。这种基于副本的持久化机制主要用于进行容错,加入某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可使用该数据在其余节点上的副本。
如何选择一种最合适的持久化策略

​ 默认状况下,性能最高的固然是MEMORY_ONLY,但前提是你的内存必须足够大,能够绰绰有余地存放下整个RDD的全部数据。不进行序列化与反序列化的数据的操做,避免了部分性能开销,对于这个RDD的后续算子操做,都是基于纯内存的数据的操做,不须要从磁盘文件中读取数据,性能很高。可是必需要注意的是,在实际的生产环境中,恐怕可以直接用这种策略的场景仍是有限的,若是RDD中数据比较多时,直接用这种持久化级别,会致使JVM的OOM内存溢出异常。

​ 若是使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每一个partition仅仅是一个字节数组而已,大大减小了对象数量,并下降了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。可是后续算子能够基于纯内存进行操做,所以性能整体仍是比较高的。此外,可能发生的问题同上,若是RDD中的数据量过多的话,仍是可能会致使OOM内存溢出的异常。

​ 若是纯内存的级别都没法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。由于既然到了这一步,就说明RDD的数据量很大,内存没法彻底放下。序列化后的数据比较少,能够节省内存和磁盘的空间开销。同时该策略会优先尽可能尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

​ 一般不建议使用DISK_ONLY和后缀为_2的级别,由于彻底基于磁盘文件进行数据的读写,会致使性能急剧下降,有时还不如从新计算一次全部RDD。后缀为_2的级别,必须将全部数据都复制一份副本,并发送到其余节点上,数据复制以及网络传输会致使较大的性能开销,除非是要求做业的高可用性,不然不建议使用。

4)尽可能避免使用shuffle类算子

​ 在Spark做业运行过程当中,最消耗性能的地方就是shuffle过程。Shuffle过程当中,各个节点上的相同key都会先写入本地磁盘文件中,而后其余节点须要经过网络传输拉去各个节点上的磁盘文件中的相同key。并且相同key都拉取到同一个节点进行聚合操做时,还有可能会由于一个节点上处理的key过多,致使内存不够存放,进而溢写到磁盘文件中。所以在shuffle过程当中,可能会发生大量的磁盘文件读写的IO操做,以及数据的网络传输操做。磁盘IO和网络数据传输是shuffle性能较差的主要缘由。

​ 所以在开发过程当中,要尽可能避免使用会致使shuffle的算子,尽可能使用map类的非shuffle算子,能够大大减小性能开销。

使用广播变量与map替代join

val rddOld = rdd1.join(rdd2)//会致使Shuffle操做

val rdd2Data = rdd2.collect()//将rdd2的数据收集回来
val rdd2DataBroadcast = sc.broadcast(rdd2Data)//将rdd2做为广播变量
val rddNew = rdd.map(rdd2DataBroadcast......)//对相同的key进行拼接

//每一个Executor存放一份广播变量
//建议将数据量比较少(几百M到一两个G)的rdd做为广播变量

5)使用map-side预聚合的shuffle操做

​ 若是由于业务须要,必定要使用shuffle操做,没法用map类的算子来替代,那么尽可能使用能够map-side预聚合的算子。所谓map-side预聚合,说的是在每一个节点本地对相同的key进行一次聚合操做,相似于MapReduce中的本地combiner。map-side预聚合以后,每一个节点本地就只会有一条相同的key,由于多条相同的key都被聚合起来了。其余节点在拉取全部节点上的相同key时,就会大大减小须要拉取的数据数量,从而也就减小了磁盘IO以及网络传输开销。一般来讲,在可能的状况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。由于reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每一个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来讲比较差。

6)使用高性能的算子

​ 1)groupByKey ==> reduceByKey / aggregateByKey //具体看第五点

​ 2)map ==> mapPartitions

​ mapPartitions类的算子,一次函数调用会处理一个partition全部的数据,而不是一次函数调用处理一条,性能相对来讲会高一些。可是有的时候,使用mapPartitions会出现OOM的问题。由于单次函数调用就要处理掉一个partition全部的数据,若是内存不够,垃圾回收是没法回收掉大多对象的,极可能出现OOM异常,因此使用这类操做时要慎重。

​ 3)foreach ==> foreachPartitions

​ 原理相似于mapPartitions。在实践中发现,foreachpartitions类的算子,对性能的提高仍是颇有帮助的。好比在foreach函数中,将RDD中全部数据写MySQL,那么若是是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会建立一个数据库链接,此时就势必会频繁地建立和销毁数据库链接,性能是很是低下;可是若是用foreachPartitions算子一次性处理一个partition的数据,那么对于每一个partition,只要建立一个数据库链接便可,而后执行批量插入操做,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能能够提高30%以上。

​ 4)使用filter以后进行coalesce操做

​ 一般对一个RDD执行filter算子过滤掉RDD中较多数据后(好比30%以上的数据),建议使用coalesce算子,手动减小RDD的partition数量,将RDD中的数据压缩到更少的partition中去。由于filter以后,RDD的每一个partition中都会有不少数据被过滤掉,此时若是照常进行后续的计算,其实每一个task处理的partition中的数据量并非不少,有一点资源浪费,并且此时处理的task越多,可能速度反而越慢。所以用coalesce减小partition数量,将RDD中的数据压缩到更少的partition以后,只要使用更少的task便可处理完全部的partition。在某些场景下,对于性能的提高会有必定的帮助。

​ 5)partition + sort ==> repartitionAndSortWithinPartitions

​ repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,若是须要在repartition重分区以后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。由于该算子能够一边进行重分区的shuffle操做,一边进行排序。shuffle与sort两个操做同时进行,比先shuffle再sort来讲,性能是要高的。

7)广播大变量

​ 有时在开发过程当中,会遇到须要在算子函数中使用外部变量的场景(尤为是大变量,100M以上的集合),那么久应该使用广播变量来提高性能。

​ 在算子函数中使用到外部变量时,默认状况下,Spark会将该变量复制多个副本,经过网络传输到task中,此时每一个task都有一个变量副本。若是变量自己比较大的话,那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存致使的频繁GC,都会极大地影响性能。所以对于上述状况,若是使用的外部变量比较大,建议使用Spark的广播变量,对该变量进行广播。广播后的变量,会保证每一个Executor的内存中,只驻留一份广播变量,而Executor中的task执行时共享该Executor中的那份共享副本。这样的话,能够大大减小变量副本的数量,从而减小网络传输的性能开销,并减小对Executor内存的占用开销,下降GC的频率。//代码看第四点

8)使用Kryo优化序列化性能

​ 在Spark中,主要有三个地方涉及了序列化:

​ 1)在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输
2)将自定义的类型做为RDD的泛型类型时。全部自定义类型对象,都会进行序列化,所以在这种状况下,也要求自定义的类型必须实现Serializable接口
3)使用可序列化的持久化策略时(好比MEMORY_ONLY_SER),Spark会将RDD中的每一个partition都序列化成一个大的字节数组

​ 对于这三种出现序列化的地方,咱们均可以经过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。可是Spark同时支持使用Kyro序列化库,Kryo序列化库的性能比Java序列化库的性能要高不少。Spark之因此默认没有使用Kryo做为序列化库,是由于Kyro要求最好要注册全部须要进行序列化的自定义类型,所以对于开发者来讲,这种方式比较麻烦。

​ 如下是使用Kryo的代码示例,咱们只要设置序列化类,再注册要序列化的自定义类型便可

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//设置序列化器为KyroSerializer
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))//注册要序列化的自定义类型

9)优化数据结构

​ Java中,有三种类型比较耗费内存:

​ 1)对象,每一个Java对象都有对象头、引用等额外的信息,所以比较占用内存空间

​ 2)字符串,每一个字符串内部都有一个字符数组以及长度等额外信息

​ 3)集合类型,由于集合类型内部一般会使用一些内部类来封装集合元素,好比Map.Entry

​ 所以Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽可能不要使用上述三种数据结构,尽可能使用字符串替代对象,使用原始类型替代字符串,使用数组替代集合类型,这样能够减小内存占用,从而下降GC频率,提高性能。

10)资源参数调优

num-executors
参数说明:该参数用于设置Spark做业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽量按照你的设置来在集群的各个工做节点上,启动相应数量的Executor进程。这个参数很是之重要,若是不设置的话,默认只会给你启动少许的Executor进程,此时你的Spark做业的运行速度是很是慢的。
参数调优建议:每一个Sparkk做业的运行通常设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都很差。设置的太少,没法充分利用集群资源;设置的太多的话,大部分队列可能没法给予充分的资源。
--------------------------------------------------------------------------------
executor-memory
参数说明:该参数用于设置每一个Executor进程的内存。Executor内存的大小,不少时候直接决定了
Spark做业的性能,并且跟常见的JVM OOM异常,也有直接的关联。
参数调优建议:每一个Executor进程的内存设置4G~8G较为合适。可是这只是一个参考值,具体的设置仍是得根据不一样部门的资源队列来定。能够看看本身团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,若是你是跟团队里其余人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你本身的Spark做业占用了队列全部的资源,致使别的同窗的做业没法运行。
--------------------------------------------------------------------------------
executor-cores
参数说明:该参数用于设置每一个Executor进程的CPU core数量。这个参数决定了每一个Executor进程并行执行task线程的能力。由于每一个CPU core同一时间只能执行一个task线程,所以每一个Executor进程的CPU core数量越多,越可以快速地执行完分配给本身的全部task线程。
参数调优建议:Executor的CPU core数量设置为2~4个较为合适。一样得根据不一样部门的资源队列来定,能够看看本身的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每一个Executor进程能够分配到几个CPU core。一样建议,若是是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其余同窗的做业运行。
--------------------------------------------------------------------------------
driver-memory
参数说明:该参数用于设置Driver进程的内存。
参数调优建议:Driver的内存一般来讲不设置,或者设置1G左右应该就够了。惟一须要注意的一点是,若是须要使用collect算子将RDD的数据所有拉取到Driver上进行处理,那么必须确保Driver的内存足够大,不然会出现OOM内存溢出的问题。
--------------------------------------------------------------------------------
spark.default.parallelism
参数说明:该参数用于设置每一个stage的默认task数量。这个参数极为重要,若是不设置可能会直接影响你的Spark做业性能。
参数调优建议:Spark做业的默认task数量为500~1000个较为合适。不少同窗常犯的一个错误就是不去设置这个参数,那么此时就会致使Spark本身根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。一般来讲,Spark默认设置的数量是偏少的(好比就几十个task),若是task数量偏少的话,就会致使你前面设置好的Executor的参数都前功尽弃。试想一下,不管你的Executor进程有多少个,内存和CPU有多大,可是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!所以Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,好比Executor的总CPU core数量为300个,那么设置1000个task是能够的,此时能够充分地利用Spark集群的资源。
--------------------------------------------------------------------------------
spark.storage.memoryFraction
参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,能够用来保存持久化的RDD数据。根据你选择的不一样的持久化策略,若是内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
参数调优建议:若是Spark做业中,有较多的RDD持久化操做,该参数的值能够适当提升一些,保证持久化的数据可以容纳在内存中。避免内存不够缓存全部的数据,致使数据只能写入磁盘中,下降了性能。可是若是Spark做业中的shuffle类操做比较多,而持久化操做比较少,那么这个参数的值适当下降一些比较合适。此外,若是发现做业因为频繁的gc致使运行缓慢(经过spark web ui能够观察到做业的gc耗时),意味着task执行用户代码的内存不够用,那么一样建议调低这个参数的值。
--------------------------------------------------------------------------------
spark.shuffle.memoryFraction
参数说明:该参数用于设置shuffle过程当中一个task拉取到上个stage的task的输出后,进行聚合操做时可以使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操做。shuffle操做在进行聚合时,若是发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地下降性能。
参数调优建议:若是Spark做业中的RDD持久化操做较少,shuffle操做较多时,建议下降持久化操做的内存占比,提升shuffle操做的内存占比比例,避免shuffle过程当中数据过多时内存不够用,必须溢写到磁盘上,下降了性能。此外,若是发现做业因为频繁的gc致使运行缓慢,意味着task执行用户代码的内存不够用,那么一样建议调低这个参数的值。

静态内存管理

​ Spark最初采用的静态内存管理机制。内存交给JVM去管理,存储内存、执行内存和其余内存的大小在Spark应用程序运行期间均为固定的。用户能够应用程序启动前进行配置。

堆内内存的分配

1)Storage内存区域:由spark.storage.memoryFraction控制(默认为0.6,占系统内存的60%)

2)Execution内存区域:由spark.shuffle.memoryFraction控制(默认为0.2,占系统内存的20%)

3)其余:取决于上面两部分的大小(默认为0.2,占系统内存的20%)

1)Storage内存区域:

​ 1)可用的Storage内存:用于缓存RDD数据和broadcast数据,由spark.storage.safetyFraction决定(默认为0.9,占Storage内存的90%)

​ 2)用于unroll:缓存iterator形式的Block数据,由spark.storage.unrollFraction决定(默认为0.2,占Storage内存的20%)

​ 3)预留:可预防OOM

2)Execution内存区域:

​ 1)可用的Execution内存:用于缓存在shuffle过程当中的中间数据,由spark.shuffle.safetyFraction控制(默认为0.8,占Execution内存的80%)

​ 2)预留:可预防OOM

3)其余:

​ 用户定义的数据结构或Spark内部元数据

可用的堆内内存的大小须要按照下面的方式计算:

​ 可用的存储内存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction

​ 可用的执行内存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

​ 其中systemMaxMemory取决于当前JVM堆内内存的大小,最后可用的存储内存或者执行内存要在此基础上与各自的memoryFraction参数和safetyFraction参数相乘得出。

堆外内存

​ 堆外内存的空间分配较为简单,存储内存,执行内存的大小一样是固定的。

1)Storage内存:由spark.memory.storageFraction控制(默认为0.5,占堆外可用内存的50%)

2)Execution内存:(默认为0.5,占堆外可用内存的50%)

总结

​ 静态内存管理机制实现起来较为简单,但若是开发人员不熟悉Spark的存储机制,或没有根据具体的数据规模和计算任务作相应的配置,很容易形成“一半海水,一半火焰”的局面,即存储内存和执行内存中的一方剩余大量的空间,而另外一方却早早被占满,不得不淘汰或移除旧的内容以存储新的内容。所以,出现了新的内存管理机制:统一内存管理。但出于兼容旧版本应用程序的目的,Spark仍然保留了它的实现。

资源参数调优

spark-submit \ 
 --master yarn-cluster \ 
 --num-executors 100 \ 
 --executor-memory 6G \ 
 --executor-cores 4 \ 
 --driver-memory 1G \ 
 --conf spark.default.parallelism=1000 \ 
 --conf spark.storage.memoryFraction=0.5 \ 
 --conf spark.shuffle.memoryFraction=0.3

统一内存管理

堆外内存

1)Storage内存:由spark.memory.storageFraction控制(默认为0.5,占堆外可用内存的50%)

2)Execution内存:(默认为0.5,占堆外可用内存的50%)

动态占用机制:与静态内存管理同样,当双方空间都被占满后,如有新增内容双方都须要将其存储到磁盘。可是若己方空间不足对方空间空余则可占用对方空间。

​ Storage占用对方的内存可被淘汰。

​ Execution占用对方的内存不可被淘汰,只能等待释放。

总结

​ 凭借统一内存管理机制,Spark在必定程度上提升了对内和堆外内存资源的利用率,下降了开发者维护Spark内存的难度,但并不意味着开发者能够高枕无忧。若是存储内存的空间太大或者说缓存的数据过多,反而会致使频繁的全量垃圾惠州,下降任务执行时的性能,由于缓存的RDD数据一般都是长期驻留内存的,因此要想充分发挥Spark的性能,须要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。

相关文章
相关标签/搜索