spark性能调优

第一:提升并行度

并行度就是Spark做业中,各个stage的task数量,也就表明了Spark做业的在各个阶段(stage)的并行度。

若是不调节并行度,致使并行度太低,会怎么样?
假设,如今已经在spark-submit脚本里面,给咱们的spark做业分配了足够多的资源,好比50个executor,每一个executor有10G内存,每一个executor有3个cpu core。基本已经达到了集群或者yarn队列的资源上限。
task没有设置,或者设置的不多,好比就设置了,100个task。50个executor,每一个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,能够并行运行。可是你如今,只有100个task,平均分配一下,每一个executor分配到2个task,ok,那么同时在运行的task,只有100个,每一个executor只会并行运行2个task。每一个executor剩下的一个cpu core,就浪费掉了。
合理的并行度的设置,应该是要设置的足够大,大到能够彻底合理的利用你的集群资源;好比上面的例子,总共集群有150个cpu core,能够并行运行150个task。那么就应该将你的Application的并行度,至少设置成150,才能彻底有效的利用你的集群资源,让150个task,并行执行;并且task增长到150个之后,便可以同时并行运行,还可让每一个task要处理的数据量变少;好比总共150G的数据要处理,若是是100个task,每一个task计算1.5G的数据;如今增长到150个task,能够并行运行,并且每一个task主要处理1G的数据就能够。
一、task数量,至少设置成与Spark application的总cpu core数量相同(最理想状况,好比总共150个cpu core,分配了150个task,一块儿运行,差很少同一时间运行完毕)。
二、官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,好比150个cpu core,基本要设置task数量为300~500。

由于实际状况,与理想状况不一样的,有些task会运行的快一点,好比50s就完了,有些task,可能会慢一点,要1分半才运行完,因此若是你的task数量,恰好设置的跟cpu core数量相同,可能仍是会致使资源的浪费,由于,好比150个task,10个先运行完了,剩余140个还在运行,可是这个时候,有10个cpu core就空闲出来了,就致使了浪费。那若是task数量设置成cpu core总数的2~3倍,那么一个task运行完了之后,另外一个task立刻能够补上来,就尽可能让cpu core不要空闲,同时也是尽可能提高spark做业运行的效率和速度,提高性能。
三、如何设置一个Spark Application的并行度?
spark.default.parallelism
SparkConf conf = new SparkConf()
conf.set("spark.default.parallelism", "500")
 

第二,资源分配优化node

Spark的分配资源主要就是 executor、cpu per executor、memory per executor、driver memory 等的调节,在咱们在生产环境中,提交spark做业时,用的spark-submit shell脚本,里面调整对应的参数:算法

/usr/local/spark/bin/spark-submit \shell

--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置executor的数量
--driver-memory 100m \ 配置driver的内存(影响不大)
--executor-memory 100m \ 配置每一个executor的内存大小
--executor-cores 3 \ 配置每一个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

首先要了解你的机子的资源,多大的内存,多少个cpu core,就根据这个实际状况去设置,能使用多少资源,就尽可能去调节到最大的大小(executor的数量,几十个到上百个不等;executor内存;executor cpu core)。
Spark Standalone 模式下,若是每台机器可用内存是4G,2个cpu core,20台机器,那能够设置:
20个executor;每一个executor4G内存2个cpu core。
yarn 模式下,根据spark要提交的资源队列资源来考虑,若是所在队列资源为500G内存,100个cpu core,那能够设置:
50个executor;每一个executor10G内存2个cpu core。
 
调节资源后,SparkContext,DAGScheduler,TaskScheduler,会将咱们的算子,切割成大量的task,提交到Application的executor上面去执行。
增长每一个executor的cpu core,也是增长了执行的并行能力。本来20个executor,每一个才2个cpu core。可以并行执行的task数量,就是40个task。
如今每一个executor的cpu core,增长到了5个。可以并行执行的task数量,就是100个task。执行的速度,提高了2.5倍。
若是executor数量比较少,那么,可以并行执行的task数量就比较少,就意味着,咱们的Application的并行执行的能力就很弱。
好比有3个executor,每一个executor有2个cpu core,那么同时可以并行执行的task,就是6个。6个执行完之后,再换下一批6个task。
增长了executor数量之后,那么,就意味着,可以并行执行的task数量,也就变多了。好比原先是6个,如今可能能够并行执行10个,甚至20个,100个。那么并行能力就比以前提高了数倍,数十倍。相应的,性能(执行的速度),也能提高数倍~数十倍。
增长每一个executor的内存量。增长了内存量之后,对性能的提高,有两点:
一、若是须要对RDD进行cache,那么更多的内存,就能够缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减小了磁盘IO。
二、对于shuffle操做,reduce端,会须要内存来存放拉取的数据并进行聚合。若是内存不够,也会写入磁盘。若是给executor分配更多内存之后,就有更少的数据,须要写入磁盘,甚至不须要写入磁盘。减小了磁盘IO,提高了性能。
三、对于task的执行,可能会建立不少对象。若是内存比较小,可能会频繁致使JVM堆内存满了,而后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大之后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。
 

第三,RDD持久化或缓存apache

当第一次对RDD2执行算子,获取RDD3的时候,就会从RDD1开始计算,就是读取HDFS文件,而后对RDD1执行算子,获取到RDD2,而后再计算,获得RDD3数组

默认状况下,屡次对一个RDD执行算子,去获取不一样的RDD;都会对这个RDD以及以前的父RDD,所有从新计算一次;读取HDFS->RDD1->RDD2-RDD4
这种状况,是绝对绝对,必定要避免的,一旦出现一个RDD重复计算的状况,就会致使性能急剧下降。缓存

好比,HDFS->RDD1-RDD2的时间是15分钟,那么此时就要走两遍,变成30分钟网络

 另一种状况,从一个RDD到几个不一样的RDD,算子和计算逻辑实际上是彻底同样的,结果由于人为的疏忽,计算了屡次,获取到了多个RDD。session

因此,建议采用如下方法能够优化:数据结构

 

第一,RDD架构重构与优化
尽可能去复用RDD,差很少的RDD,能够抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。架构

 

第二,公共RDD必定要实现持久化

持久化,也就是说,将RDD的数据缓存到内存中/磁盘中,(BlockManager),之后不管对这个RDD作多少次计算,那么都是直接取这个RDD的持久化的数据,好比从内存中或者磁盘中,直接提取一份数据。

 

第三,持久化,是能够进行序列化的

若是正常将数据持久化在内存中,那么可能会致使内存的占用过大,这样的话,也许,会致使OOM内存溢出。

当纯内存没法支撑公共RDD数据彻底存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每一个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减小内存的空间占用。

序列化的方式,惟一的缺点就是,在获取数据的时候,须要反序列化。

若是序列化纯内存方式,仍是致使OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。内存+磁盘,序列化。

 

第四,为了数据的高可靠性,并且内存充足,可使用双副本机制,进行持久化

持久化的双副本机制,持久化后的一个副本,由于机器宕机了,副本丢了,就仍是得从新计算一次;持久化的每一个数据单元,存储一份副本,放在其余节点上面;从而进行容错;一个副本丢了,不用从新计算,还可使用另一份副本。这种方式,仅仅针对你的内存资源极度充足。

  

sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());

/**
* 持久化,很简单,就是对RDD调用persist()方法,并传入一个持久化级别
*
* 若是是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就能够用cache()方法来替代
* StorageLevel.MEMORY_ONLY_SER(),第二选择
* StorageLevel.MEMORY_AND_DISK(),第三选择
* StorageLevel.MEMORY_AND_DISK_SER(),第四选择
* StorageLevel.DISK_ONLY(),第五选择
*
* 若是内存充足,要使用双副本高可靠机制
* 选择后缀带_2的策略
* StorageLevel.MEMORY_ONLY_2()
*
*/
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
 

第四:使用广播变量

广播变量, 其实就是SparkContext的broadcast()方法,传入你要广播的变量,便可
final Broadcast<Map<String, Map<String, IntList>>> broadcast = sc.broadcast(fastutilDateHourExtractMap);
使用广播变量的时候,直接调用广播变量(Broadcast类型)的value() / getValue() ,能够获取到以前封装的广播变量
Map<String, Map<String, IntList>> dateHourExtractMap = broadcast .value();

像随机抽取的map,1M,举例。还算小的。若是你是从哪一个表里面读取了一些维度数据,比方说,全部商品品类的信息,在某个算子函数中要使用到。100M。1000个task。100G的数据,网络传输。集群瞬间由于这个缘由消耗掉100G的内存。
这种默认的,task执行的算子中,使用了外部的变量,每一个task都会获取一份变量的副本,有什么缺点呢?在什么状况下,会出现性能上的恶劣的影响呢?map,自己是不小,存放数据的一个单位是Entry,还有可能会用链表的格式的来存放Entry链条。因此map是比较消耗内存的数据格式。
好比,map是1M。总共,你前面调优都调的特好,资源给的到位,配合着资源,并行度调节的绝对到位,1000个task。大量task的确都在并行运行。这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,经过网络传输到各个task中去,给task使用。总计有1G的数据,会经过网络传输。网络传输的开销,不容乐观啊!!!网络传输,也许就会消耗掉你的spark做业运行的总时间的一小部分。
map副本,传输到了各个task上以后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一会儿就耗费掉1G的内存。对性能会有什么影响呢?没必要要的内存的消耗和占用,就致使了,你在进行RDD持久化到内存,也许就无法彻底在内存中放下;就只能写入磁盘,最后致使后续的操做在磁盘IO上消耗性能;
你的task在建立对象的时候,也许会发现堆内存放不下全部对象,也许就会致使频繁的垃圾回收器的回收,GC。GC的时候,必定是会致使工做线程中止,也就是致使Spark暂停工做那么一点时间。频繁GC的话,对Spark做业的运行的速度会有至关可观的影响。

若是说,task使用大变量(1m~100m),明知道会致使性能出现恶劣的影响。那么咱们怎么来解决呢?
广播,Broadcast,将大变量广播出去。而不是直接使用。
 
广播变量的好处,不是每一个task一份变量副本,而是变成每一个节点的executor才一份副本。这样的话,就可让变量产生的副本大大减小。
广播变量,初始的时候,就在Drvier上有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在本身本地的Executor对应的
BlockManager中,尝试获取变量副本;若是本地没有,BlockManager,也许会从远程的Driver上面去获取变量副本;也有可能从距离比较近的其余
节点的Executor的BlockManager上去获取,并保存在本地的BlockManager中;BlockManager负责管理某个Executor对应的内存和磁盘上的数据,
此后这个executor上的task,都会直接使用本地的BlockManager中的副本。

好比,50个executor,1000个task。一个map,10M:
默认状况下,1000个task,1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。
若是使用了广播变量。50个execurtor,50个副本。500M的数据,网络传输,并且不必定都是从Driver传输到每一个节点,还多是就近从最近的
节点的executor的bockmanager上拉取变量副本,网络传输速度大大增长;500M的内存消耗。
 

第五:使用Kryo序列化

在SparkConf中设置一个属性,spark.serializer,org.apache.spark.serializer.KryoSerializer类;注册你使用到的,须要经过Kryo序列化的,
一些自定义类,SparkConf.registerKryoClasses()
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
Kryo之因此没有被做为默认的序列化类库的缘由,就要出现了:主要是由于Kryo要求,若是要达到它的最佳性能的话,那么就必定要注册你自定义的类(好比,你的算子函数中使用到了外部自定义类型的对象变量,这时,就要求必须注册你的类,不然Kryo达不到最佳性能)。

当使用了序列化的持久化级别时,在将每一个RDD partition序列化成一个大的字节数组时,就会使用Kryo进一步优化序列化的效率和性能。默认状况下,Spark内部是使用Java的序列化机制,ObjectOutputStream / ObjectInputStream,对象输入输出流机制,来进行序列化。
这种默认序列化机制的好处在于,处理起来比较方便;也不须要咱们手动去作什么事情,只是,你在算子里面使用的变量,必须是实现Serializable接口的,可序列化便可。可是缺点在于,默认的序列化机制的效率不高,序列化的速度比较慢;序列化之后的数据,占用的内存空间相对仍是比较大。
Spark支持使用Kryo序列化机制。Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10
 
 
。因此Kryo序列化优化之后,可让网络传输的数据变少;在集群中耗费的内存资源大大减小。在进行stage间的task的shuffle操做时,节点与节点之
间的task会互相大量经过网络拉取和传输文件,此时,这些数据既然经过网络传输,也是可能要序列化的,就会使用Kryo。
 
Kryo序列化机制,一旦启用之后,会生效的几个地方:
一、算子函数中使用到的外部变量,使用Kryo之后:优化网络传输的性能,能够优化集群中内存的占用和消耗
二、持久化RDD,StorageLevel.MEMORY_ONLY_SER优化内存的占用和消耗;持久化RDD占用的内存越少,task执行的时候,建立的对象,就
不至于频繁的占满内存,频繁发生GC。
三、shuffle:能够优化网络传输的性能。
 

第六:数据本地化

本地化级别

PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好

NODE_LOCAL:节点本地化,代码和数据在同一个节点中;好比说,数据做为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不一样executor中;数据须要在进程间进行传输
NO_PREF:对于task来讲,数据从哪里获取都同样,没有好坏之分
RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上;数据须要经过网络在节点之间进行传输
ANY:数据和task可能在集群中的任何地方,并且不在一个机架中,性能最差


Spark.locality.wait,默认是3s

 

Spark在Driver上,对Application的每个stage的task,进行分配以前,都会计算出每一个task要计算的是哪一个分片数据,RDD的某个partition;Spark的task分配算法,优先,会但愿每一个task正好分配到它要计算的数据所在的节点,这样的话,就不用在网络间传输数据;

可是可能task没有机会分配到它的数据所在的节点,由于可能那个节点的计算资源和计算能力都满了;因此呢,这种时候,一般来讲,Spark会等待一段时间,默认状况下是3s钟(不是绝对的,还有不少种状况,对不一样的本地化级别,都会去等待),到最后,实在是等待不了了,就会选择一个比较差的本地化级别,好比说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,而后进行计算。

可是对于第二种状况,一般来讲,确定是要发生数据传输,task会经过其所在节点的BlockManager来获取数据,BlockManager发现本身本地没有数据,会经过一个getRemote()方法,经过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,经过网络传输回task所在节点。

对于咱们来讲,固然不但愿是相似于第二种状况的了。最好的,固然是task和数据在一个节点上,直接从本地executor的BlockManager中获取数据,纯内存,或者带一点磁盘IO;若是要经过网络传输数据的话,那么实在是,性能确定会降低的,大量网络传输,以及磁盘IO,都是性能的杀手。

 

时候要调节这个参数?

观察日志,spark做业的运行日志,推荐你们在测试的时候,先用client模式,在本地就直接能够看到比较全的日志。
日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL,观察大部分task的数据本地化级别。

若是大多都是PROCESS_LOCAL,那就不用调节了
若是是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长
调节完,应该是要反复调节,每次调节完之后,再来运行,观察日志
看看大部分的task的本地化级别有没有提高;看看,整个spark做业的运行时间有没有缩短

 

可是注意别本末倒置,本地化级别却是提高了,可是由于大量的等待时长,spark做业的运行时间反而增长了,那就仍是不要调节了。

spark.locality.wait,默认是3s;能够改为6s,10s

默认状况下,下面3个的等待时长,都是跟上面那个是同样的,都是3sspark.locality.wait.processspark.locality.wait.nodespark.locality.wait.racknew SparkConf()  .set("spark.locality.wait", "10")

相关文章
相关标签/搜索