并行度:其实就是指的是,Spark做业中,各个stage的task数量,也就表明了Spark做业的在各个阶段(stage)的并行度。java
假设,如今已经在spark-submit脚本里面,给咱们的spark做业分配了足够多的资源,好比50个executor,每一个executor有10G内存,每一个executor有3个cpu core。基本已经达到了集群或者yarn队列的资源上限。
可是task没有设置,或者设置的不多,好比就设置了,100个task。50个executor,每一个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,能够并行运行。可是你如今,只有100个task,平均分配一下,每一个executor分配到2个task,ok,那么同时在运行的task,只有100个,每一个executor只会并行运行2个task。每一个executor剩下的一个cpu core,就浪费掉了。git
官方推荐,task数量设置成spark application总cpu core数量的2~3倍,好比150个cpu core,基本要设置task数量为300~500github
如何设置一个Spark Application的并行度算法
SparkConf conf = new SparkConf() .set("spark.default.parallelism", "500")
RDD架构重构与优化
尽可能去复用RDD,差很少的RDD,能够抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。apache
公共RDD必定要实现持久化
对于要屡次计算和使用的公共RDD,必定要进行持久化。数组
持久化,是能够进行序列化的
若是正常将数据持久化在内存中,那么可能会致使内存的占用过大,这样的话,也许,会致使OOM内存溢出。
当纯内存没法支撑公共RDD数据彻底存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每一个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减小内存的空间占用。
若是序列化纯内存方式,仍是致使OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。
缺点:在获取数据的时候须要反序列化缓存
数据的高可靠性,
在内存资源很充沛的状况下,能够持久化一个副本网络
而每一个task在处理变量的时候,都会拷贝一份变量的副本,若是变量很大的话,就会耗费不少内存。这时能够采用广播变量的方式,把这个变量广播出去,由于广播变量只在每一个节点的Executor才一份副本
广播变量在初始的时候,就只在Driver上有一份。task在运行的时候,想要使用广播变量中的数据,此时首先会在本身本地的Executor对应的BlockManager中,尝试获取变量副本;若是本地没有,那么就从Driver远程拉取变量副本,并保存在本地的BlockManager中;此后这个executor上的task,都会直接使用本地的BlockManager中的副本。
executor的BlockManager除了从driver上拉取,也可能从其余节点的BlockManager上拉取变量副本,总之越近越好。session
默认状况下,Spark内部是使用Java的序列化机制,ObjectOutputStream/ObjectInputStream,对象输入输出流机制,来进行序列化。
优势:处理起来比较方便,只是在算子里面使用的变量,必须是实现Serializable接口的。
缺点:默认的序列化机制的效率不高,序列化的速度比较慢;序列化之后的数据,占用的内存空间相对仍是比较大。
Kryo序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。
Kryo序列化机制,一旦启用之后,会生效的几个地方:架构
使用Kryo序列化步骤:
fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;
fastutil可以提供更小的内存占用,更快的存取速度;咱们使用fastutil提供的集合类,来替代本身平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,能够减少内存的占用,而且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及实用的IO类,来处理二进制和文本类型的文件
fastutil的每一种集合类型,都实现了对应的Java中的标准接口(好比fastutil的map,实现了Java的Map接口),所以能够直接放入已有系统的任何代码中。
fastutil还提供了一些JDK标准类库中没有的额外功能(好比双向迭代器)。
fastutil除了对象和原始类型为元素的集合,fastutil也提供引用类型的支持,可是对引用类型是使用等于号(=)进行比较的,而不是equals()方法。
若是算子函数使用了外部变量;那么第一,你可使用Broadcast广播变量优化;第二,可使用Kryo序列化类库,提高序列化性能和效率;第三,若是外部变量是某种比较大的集合,那么能够考虑使用fastutil改写外部变量,首先从源头上就减小内存的占用,经过广播变量进一步减小内存占用,再经过Kryo序列化类库进一步减小内存占用。
在你的算子函数里,若是要建立比较大的Map、List等集合,可能会占用较大的内存空间,并且可能涉及到消耗性能的遍历、存取等集合操做;那么此时,能够考虑将这些集合类型使用fastutil类库重写,使用了fastutil集合类之后,就能够在必定程度上,减小task建立出来的集合类型的内存占用。避免executor内存频繁占满,频繁唤起GC,致使性能降低。
<dependency> <groupId>it.unimi.dsi</groupId> <artifactId>fastutil</artifactId> <version>7.0.6</version> </dependency>
UserVisitSessionAnalyzeSpark.java中831行有示例。
PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL > ANY
Spark要对任务(task)进行分配的时候, 会计算出每一个task要计算的是哪一个分片的数据(partition),Spark的task分配算法,会按照上面的顺序来进行分配。
可能PROCESS_LOCAL节点的计算资源和计算能力都满了;Spark会等待一段时间,默认状况下是3s钟(不是绝对的,还有不少种状况,对不一样的本地化级别,都会去等待),到最后,就会选择一个比较差的本地化级别,好比说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,而后进行计算。
观察日志,spark做业的运行日志,先用client模式,在本地就直接能够看到比较全的日志。日志里面会显示,starting task...,PROCESS LOCAL、NODE LOCAL
若是是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长。调节完,应该是要反复调节,每次调节完之后,再来运行,观察日志
spark.locality.wait
, 3s, 6s, 10s...
spark中,堆内存又被划分红了两块儿,一起是专门用来给RDD的cache、persist操做进行RDD数据缓存用的;另一块儿,用来给spark算子函数的运行使用的,存放函数中本身建立的对象。
默认状况下,给RDD cache操做的内存占比,是0.6,60%的内存都给了cache操做了。可是问题是,若是某些状况下,cache不是那么的紧张,问题在于task算子函数中建立的对象过多,而后内存又不太大,致使了频繁的minor gc,甚至频繁full gc,致使spark频繁的中止工做。性能影响会很大。
能够经过spark ui,若是是spark on yarn的话,那么就经过yarn的界面,去查看你的spark做业的运行统计。能够看到每一个stage的运行状况,包括每一个task的运行时间、gc时间等等。若是发现gc太频繁,时间太长。此时就能够适当调价这个比例。
下降cache操做的内存占比,大不了用persist操做,选择将一部分缓存的RDD数据写入磁盘,或者序列化方式,配合Kryo序列化类,减小RDD缓存的内存占用;下降cache操做内存占比;对应的,算子函数的内存占比就提高了。这个时候,可能,就能够减小minor gc的频率,同时减小full gc的频率。对性能的提高是有必定的帮助的。
spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
有时候,若是你的spark做业处理的数据量特别特别大,几亿数据量;而后spark做业一运行,时不时的报错,shuffle file cannot find,executor、task lost,out of memory(内存溢出)
多是说executor的堆外内存不太够用,致使executor在运行的过程当中,可能会内存溢出;而后可能致使后续的stage的task在运行的时候,可能要从一些executor中去拉取shuffle map output文件,
可是executor可能已经挂掉了,关联的block manager也没有了;因此可能会报shuffle output file not found;resubmitting task;executor lost;spark做业完全崩溃。
--conf spark.yarn.executor.memoryOverhead=2048
spark-submit脚本里面,去用--conf的方式,去添加配置; 切记,不是在你的spark做业代码中,用new SparkConf().set()这种方式去设置,不要这样去设置,是没有用的!必定要在spark-submit脚本中去设置。
默认状况下,这个堆外内存上限大概是300多M;一般项目,真正处理大数据的时候,这里都会出现问题,致使spark做业反复崩溃,没法运行;此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G
若是Executor远程从另外一个Executor中拉取数据的时候,那个Executor正好在gc,此时呢,没法创建网络链接,会卡住;spark默认的网络链接的超时时长,是60s;若是卡住60s都没法创建链接的话,那么就宣告失败了。
碰到某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。颇有多是有那份数据的executor在jvm gc。因此拉取数据的时候,创建不了链接。而后超过默认60s之后,直接宣告失败。
--conf spark.core.connection.ack.wait.timeout=300
spark-submit脚本,切记,不是在new SparkConf().set()这种方式来设置的。一般来讲,能够避免部分的偶尔出现的某某文件拉取失败,某某文件lost
《北风网Spark项目实战》
github: https://github.com/yangtong123/StudySpark