Spark学习——性能调优(一)

其余更多java基础文章:
java基础学习(目录)java


Spark的性能调优主要有如下几个方向:node

  • 常规性能调优:分配资源、并行度、RDD架构与缓存等
  • JVM调优(Java虚拟机):JVM相关的参数,一般状况下,若是你的硬件配置、基础的JVM的配置,都ok的话,JVM一般不会形成太严重的性能问题;反而更多的是,在troubleshooting中,JVM占了很重要的地位;JVM形成线上的spark做业的运行报错,甚至失败(好比OOM)。
  • shuffle调优(至关重要):spark在执行groupByKey、reduceByKey等操做时的,shuffle环节的调优。这个很重要。shuffle调优,其实对spark做业的性能的影响,是至关之高!!!经验:在spark做业的运行过程当中,只要一牵扯到有shuffle的操做,基本上shuffle操做的性能消耗,要占到整个spark做业的50%~90%。10%用来运行map等操做,90%耗费在两个shuffle操做。groupByKey、countByKey。
  • spark操做调优(spark算子调优,比较重要):groupByKey,countByKey或aggregateByKey来重构实现。有些算子的性能,是比其余一些算子的性能要高的。foreachPartition替代foreach。若是一旦遇到合适的状况,效果仍是不错的。

按照优化效果简单排序:算法

  1. 分配资源、并行度、RDD架构与缓存
  2. shuffle调优
  3. spark算子调优
  4. JVM调优、广播大变量、Kryo、fastUtil

本系列主要讲解:shell

  • 性能调优
    • 分配更多资源
    • 调节并行度
    • 重构RDD架构及持久化RDD
    • 广播大变量
    • Kryo序列化
    • fastUtil优化
    • 调节数据本地化等待时长
  • JVM调优
    • 下降cache操做的内存占比
    • 调节executor堆外内存
    • 调节链接等待时长
  • Shuffle调优
    • 合并map端输出文件
    • 调节map端内存缓存和reduce端内存占比
    • SortShuffleManager调优
  • 算子调优
    • 使用MapPartition提高性能
    • filter事后使用coalesce减小分区数量
    • 使用foreachPartition优化
    • 使用repatition解决Spark SQL低并行度
    • 使用reduceByKey本地聚合

分配更多资源

性能调优的王道,就是增长和分配更多的资源,性能和速度上的提高,是显而易见的;基本上,在必定范围以内,增长资源与性能的提高,是成正比的;写完了一个复杂的spark做业以后,进行性能调优的时候,首先第一步,我以为,就是要来调节最优的资源配置;在这个基础之上,若是说你的spark做业,可以分配的资源达到了你的能力范围的顶端以后,没法再分配更多的资源了,公司资源有限;那么才是考虑去作后面的这些性能调优的点。apache

分配哪些资源?

  • executor数量
  • 每一个executor的cpu core数量
  • 每一个executor的内存大小
  • driver内存大小

在哪里分配这些资源?

在咱们在生产环境中,提交spark做业时,用的spark-submit shell脚本,里面调整对应的参数数组

/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \  配置executor的数量
--driver-memory 100m \  配置driver的内存(影响不大)
--executor-memory 100m \  配置每一个executor的内存大小
--executor-cores 3 \  配置每一个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
复制代码

为何多分配了这些资源之后,性能会获得提高?

如上图所示,Driver中的SparkContext,DAGScheduler,TaskScheduler,会将咱们的算子,切割成大量的task,提交到Application的executor上面去执行。假设咱们最后切割出了100个task任务。

增长executor

若是executor数量比较少,那么,可以并行执行的task数量就比较少,就意味着,咱们的Application的并行执行的能力就很弱。
好比有3个executor,每一个executor有2个cpu core,那么同时可以并行执行的task,就是6个。6个执行完之后,再换下一批6个task。 增长了executor数量之后,那么,就意味着,可以并行执行的task数量,也就变多了。好比原先是6个,如今可能能够并行执行10个,甚至20个,100个。那么并行能力就比以前提高了数倍,数十倍。 相应的,性能(执行的速度),也能提高数倍~数十倍。缓存

增长每一个executor的cpu core

增长每一个executor的cpu core,也是增长了执行的并行能力。本来20个executor,每一个才2个cpu core。可以并行执行的task数量,就是40个task。
如今每一个executor的cpu core,增长到了5个。可以并行执行的task数量,就是100个task。 执行的速度,提高了2.5倍。bash

增长每一个executor的内存量。

增长了内存量之后,对性能的提高,有三点:网络

  1. 若是须要对RDD进行cache,那么更多的内存,就能够缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减小了磁盘IO。
  2. 对于shuffle操做,reduce端,会须要内存来存放拉取的数据并进行聚合。若是内存不够,也会写入磁盘。若是给executor分配更多内存之后,就有更少的数据,须要写入磁盘,甚至不须要写入磁盘。减小了磁盘IO,提高了性能。
  3. 对于task的执行,可能会建立不少对象。若是内存比较小,可能会频繁致使JVM堆内存满了,而后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大之后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。

调节并行度

并行度:其实就是指的是,Spark做业中,各个stage的task数量,也就表明了Spark做业的在各个阶段(stage)的并行度。架构

若是并行度太低,会怎么样?

假设,如今已经在spark-submit脚本里面,给咱们的spark做业分配了足够多的资源,好比50个executor,每一个executor有10G内存,每一个executor有3个cpu core。基本已经达到了集群或者yarn队列的资源上限。

task没有设置,或者设置的不多,好比就设置了,100个task。50个executor,每一个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,能够并行运行。可是你如今,只有100个task,平均分配一下,每一个executor分配到2个task,ok,那么同时在运行的task,只有100个,每一个executor只会并行运行2个task。每一个executor剩下的一个cpu core,就浪费掉了。

你的资源虽然分配足够了,可是问题是,并行度没有与资源相匹配,致使你分配下去的资源都浪费掉了。

合理的并行度的设置

应该是要设置的足够大,大到能够彻底合理的利用你的集群资源;好比上面的例子,总共集群有150个cpu core,能够并行运行150个task。那么就应该将你的Application的并行度,至少设置成150,才能彻底有效的利用你的集群资源,让150个task,并行执行;并且task增长到150个之后,便可以同时并行运行,还可让每一个task要处理的数据量变少;好比总共150G的数据要处理,若是是100个task,每一个task计算1.5G的数据;如今增长到150个task,能够并行运行,并且每一个task主要处理1G的数据就能够。

很简单的道理,只要合理设置并行度,就能够彻底充分利用你的集群计算资源,而且减小每一个task要处理的数据量,最终,就是提高你的整个Spark做业的性能和运行速度。

  1. task数量,至少设置成与Spark application的总cpu core数量相同(最理想状况,好比总共150个cpu core,分配了150个task,一块儿运行,差很少同一时间运行完毕)
  2. 官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,好比150个cpu core,基本要设置task数量为300~500;

实际状况,与理想状况不一样的,有些task会运行的快一点,好比50s就完了,有些task,可能会慢一点,要1分半才运行完,因此若是你的task数量,恰好设置的跟cpu core数量相同,可能仍是会致使资源的浪费,由于,好比150个task,10个先运行完了,剩余140个还在运行,可是这个时候,有10个cpu core就空闲出来了,就致使了浪费。那若是task数量设置成cpu core总数的2~3倍,那么一个task运行完了之后,另外一个task立刻能够补上来,就尽可能让cpu core不要空闲,同时也是尽可能提高spark做业运行的效率和速度,提高性能。

如何设置一个Spark Application的并行度?

spark.default.parallelism 
SparkConf conf = new SparkConf()
  .set("spark.default.parallelism", "500")
复制代码

重构RDD架构和持久化RDD

  • 如上图第一条DAG,默认状况下,屡次对一个RDD执行算子,去获取不一样的RDD;都会对这个RDD以及以前的父RDD,所有从新计算一次;因此在计算RDD3和RDD4的时候,前面的读取HDFS文件,而后对RDD1执行算子,获取 到RDD2会计算两遍。
    这种状况,是绝对绝对,必定要避免的,一旦出现一个RDD重复计算的状况,就会致使性能急剧下降。好比,HDFS->RDD1-RDD2的时间是15分钟,那么此时就要走两遍,变成30分钟。

  • 另一种状况,在上图第二条DAG中国,从一个RDD到几个不一样的RDD,算子和计算逻辑实际上是彻底同样的,结果由于人为的疏忽,计算了屡次,获取到了多个RDD。这个也是尽可能要避免的。

如何重构RDD架构

  • 1. RDD架构重构与优化
    尽可能去复用RDD,差很少的RDD,能够抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。

  • 2. 公共RDD必定要实现持久化
    对于要屡次计算和使用的公共RDD,必定要进行持久化。
    持久化,也就是说,将RDD的数据缓存到内存中/磁盘中,(BlockManager),之后不管对这个RDD作多少次计算,那么都是直接取这个RDD的持久化的数据,好比从内存中或者磁盘中,直接提取一份数据。

  • 3. 持久化,是能够进行序列化的
    若是正常将数据持久化在内存中,那么可能会致使内存的占用过大,这样的话,也许,会致使OOM内存溢出。
    当纯内存没法支撑公共RDD数据彻底存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每一个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减小内存的空间占用。
    序列化的方式,惟一的缺点就是,在获取数据的时候,须要反序列化。

  • 4. 为了数据的高可靠性,并且内存充足,可使用双副本机制,进行持久化
    持久化的双副本机制,持久化后的一个副本,由于机器宕机了,副本丢了,就仍是得从新计算一次;持久化的每一个数据单元,存储一份副本,放在其余节点上面;从而进行容错;一个副本丢了,不用从新计算,还可使用另一份副本。
    这种方式,仅仅针对你的内存资源极度充足

广播大变量

关于广播能够阅读 Spark学习(二)——RDD基础 中的共享变量。

为何要使用广播大变量

task执行的算子中,使用了外部的变量,而后driver会把变量以task的形式发送到excutor端,每一个task都会获取一份变量的副本。若是有不少个task,就会有不少给excutor端携带不少个变量,若是这个变量很是大的时候,就可能会形成内存溢出。

好比,外部变量map是1M。总共,你前面调优都调的特好,资源给的到位,配合着资源,并行度调节的绝对到位,1000个task。大量task的确都在并行运行。
这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,经过网络传输到各个task中去,给task使用。总计有1G的数据,会经过网络传输。网络传输的开销,不容乐观啊!!!网络传输,也许就会消耗掉你的spark做业运行的总时间的一小部分。
map副本,传输到了各个task上以后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一会儿就耗费掉1G的内存。对性能会有什么影响呢?
没必要要的内存的消耗和占用,就致使了,你在进行RDD持久化到内存,也许就无法彻底在内存中放下;就只能写入磁盘,最后致使后续的操做在磁盘IO上消耗性能;
你的task在建立对象的时候,也许会发现堆内存放不下全部对象,也许就会致使频繁的垃圾回收器的回收,GC。GC的时候,必定是会致使工做线程中止,也就是致使Spark暂停工做那么一点时间。频繁GC的话,对Spark做业的运行的速度会有至关可观的影响。

广播大变量原理

  • 广播变量,初始的时候,就在Drvier上有一份副本。
  • task在运行的时候,想要使用广播变量中的数据,此时首先会在本身本地的Executor对应的BlockManager中,尝试获取变量副本;若是本地没有,那么就从Driver远程拉取变量副本,并保存在本地的BlockManager中;此后这个executor上的task,都会直接使用本地的BlockManager中的副本。
  • executor的BlockManager除了从driver上拉取,也可能从其余节点的BlockManager上拉取变量副本,距离越近越好。

使用Kryo序列化

  • 默认状况下,Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化。这种默认序列化机制的好处在于,处理起来比较方便;也不须要咱们手动去作什么事情,只是,你在算子里面使用的变量,必须是实现Serializable接口的,可序列化便可。
  • 可是缺点在于,默认的序列化机制的效率不高,序列化的速度比较慢;序列化之后的数据,占用的内存空间相对仍是比较大。 能够手动进行序列化格式的优化,Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。因此Kryo序列化优化之后,可让网络传输的数据变少;在集群中耗费的内存资源大大减小。

Kryo序列化机制,一旦启用之后,会生效的几个地方:

  1. 算子函数中使用到的外部变量
    算子函数中使用到的外部变量,使用Kryo之后:优化网络传输的性能,能够优化集群中内存的占用和消耗
  2. 持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER等
    持久化RDD,优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,建立的对象,就不至于频繁的占满内存,频繁发生GC。
  3. shuffle
    能够优化网络传输的性能

如何使用Kryo

  • 首先第一步,在SparkConf中设置一个属性,spark.serializer,org.apache.spark.serializer.KryoSerializer类;
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
复制代码

Kryo之因此没有被做为默认的序列化类库的缘由,就要出现了:主要是由于Kryo要求,若是要达到它的最佳性能的话,那么就必定要注册你自定义的类(好比,你的算子函数中使用到了外部自定义类型的对象变量,这时,就要求必须注册你的类,不然Kryo达不到最佳性能)。

  • 第二步,注册你使用到的,须要经过Kryo序列化的一些自定义
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
复制代码

使用fastUtil优化数据格式

fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;
fastutil可以提供更小的内存占用,更快的存取速度;咱们使用fastutil提供的集合类,来替代本身平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,能够减少内存的占用,而且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及实用的IO类,来处理二进制和文本类型的文件; fastutil最新版本要求Java 7以及以上版本;

使用场景

Spark中应用fastutil的场景:

  1. 若是算子函数使用了外部变量;那么第一,你可使用Broadcast广播变量优化;第二,可使用Kryo序列化类库,提高序列化性能和效率;第三,若是外部变量是某种比较大的集合,那么能够考虑使用fastutil改写外部变量,首先从源头上就减小内存的占用,经过广播变量进一步减小内存占用,再经过Kryo序列化类库进一步减小内存占用。
  2. 在你的算子函数里,也就是task要执行的计算逻辑里面,若是有逻辑中,出现,要建立比较大的Map、List等集合,可能会占用较大的内存空间,并且可能涉及到消耗性能的遍历、存取等集合操做;那么此时,能够考虑将这些集合类型使用fastutil类库重写,使用了fastutil集合类之后,就能够在必定程度上,减小task建立出来的集合类型的内存占用。避免executor内存频繁占满,频繁唤起GC,致使性能降低。

fastutil的使用

第一步:在pom.xml中引用fastutil的包

<dependency>
    <groupId>fastutil</groupId>
    <artifactId>fastutil</artifactId>
    <version>5.0.9</version>
</dependency>
复制代码

第二步:List => IntList

IntList fastutilExtractList = new IntArrayList();
复制代码

调节数据本地化时长

Spark在Driver上,对Application的每个stage的task,进行分配以前,都会计算出每一个task要计算的是哪一个分片数据,RDD的某个partition;Spark的task分配算法,优先,会但愿每一个task正好分配到它要计算的数据所在的节点,这样的话,就不用在网络间传输数据;

可是呢,一般来讲,有时,事与愿违,可能task没有机会分配到它的数据所在的节点,为何呢,可能那个节点的计算资源和计算能力都满了;因此呢,这种时候,一般来讲,Spark会等待一段时间,默认状况下是3s钟(不是绝对的,还有不少种状况,对不一样的本地化级别,都会去等待),到最后,实在是等待不了了,就会选择一个比较差的本地化级别,好比说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,而后进行计算。

可是对于第二种状况,一般来讲,确定是要发生数据传输,task会经过其所在节点的BlockManager来获取数据,BlockManager发现本身本地没有数据,会经过一个getRemote()方法,经过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,经过网络传输回task所在节点。

对于咱们来讲,固然不但愿是相似于第二种状况的了。最好的,固然是task和数据在一个节点上,直接从本地executor的BlockManager中获取数据,纯内存,或者带一点磁盘IO;若是要经过网络传输数据的话,那么实在是,性能确定会降低的,大量网络传输,以及磁盘IO,都是性能的杀手。

何时要调节这个参数?

观察日志,spark做业的运行日志,推荐你们在测试的时候,先用client模式,在本地就直接能够看到比较全的日志。 日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL 观察大部分task的数据本地化级别

若是大多都是PROCESS_LOCAL,那就不用调节了 若是是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长 调节完,应该是要反复调节,每次调节完之后,再来运行,观察日志 看看大部分的task的本地化级别有没有提高;看看,整个spark做业的运行时间有没有缩短

你别本末倒置,本地化级别却是提高了,可是由于大量的等待时长,spark做业的运行时间反而增长了,那就仍是不要调节了

怎么调节?

new SparkConf()
  .set("spark.locality.wait", "10")
复制代码
  • spark.locality.wait,默认是3s;6s,10s

默认状况下,下面3个的等待时长,都是跟上面那个是同样的,都是3s

  • spark.locality.wait.process
  • spark.locality.wait.node
  • spark.locality.wait.rack
相关文章
相关标签/搜索