其余更多java基础文章:
java基础学习(目录)java
Spark的性能调优主要有如下几个方向:node
按照优化效果简单排序:算法
本系列主要讲解:shell
性能调优的王道,就是增长和分配更多的资源,性能和速度上的提高,是显而易见的;基本上,在必定范围以内,增长资源与性能的提高,是成正比的;写完了一个复杂的spark做业以后,进行性能调优的时候,首先第一步,我以为,就是要来调节最优的资源配置;在这个基础之上,若是说你的spark做业,可以分配的资源达到了你的能力范围的顶端以后,没法再分配更多的资源了,公司资源有限;那么才是考虑去作后面的这些性能调优的点。apache
在咱们在生产环境中,提交spark做业时,用的spark-submit shell脚本,里面调整对应的参数数组
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置executor的数量
--driver-memory 100m \ 配置driver的内存(影响不大)
--executor-memory 100m \ 配置每一个executor的内存大小
--executor-cores 3 \ 配置每一个executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
复制代码
若是executor数量比较少,那么,可以并行执行的task数量就比较少,就意味着,咱们的Application的并行执行的能力就很弱。
好比有3个executor,每一个executor有2个cpu core,那么同时可以并行执行的task,就是6个。6个执行完之后,再换下一批6个task。 增长了executor数量之后,那么,就意味着,可以并行执行的task数量,也就变多了。好比原先是6个,如今可能能够并行执行10个,甚至20个,100个。那么并行能力就比以前提高了数倍,数十倍。 相应的,性能(执行的速度),也能提高数倍~数十倍。缓存
增长每一个executor的cpu core,也是增长了执行的并行能力。本来20个executor,每一个才2个cpu core。可以并行执行的task数量,就是40个task。
如今每一个executor的cpu core,增长到了5个。可以并行执行的task数量,就是100个task。 执行的速度,提高了2.5倍。bash
增长了内存量之后,对性能的提高,有三点:网络
并行度:其实就是指的是,Spark做业中,各个stage的task数量,也就表明了Spark做业的在各个阶段(stage)的并行度。架构
假设,如今已经在spark-submit脚本里面,给咱们的spark做业分配了足够多的资源,好比50个executor,每一个executor有10G内存,每一个executor有3个cpu core。基本已经达到了集群或者yarn队列的资源上限。
task没有设置,或者设置的不多,好比就设置了,100个task。50个executor,每一个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,能够并行运行。可是你如今,只有100个task,平均分配一下,每一个executor分配到2个task,ok,那么同时在运行的task,只有100个,每一个executor只会并行运行2个task。每一个executor剩下的一个cpu core,就浪费掉了。
你的资源虽然分配足够了,可是问题是,并行度没有与资源相匹配,致使你分配下去的资源都浪费掉了。
应该是要设置的足够大,大到能够彻底合理的利用你的集群资源;好比上面的例子,总共集群有150个cpu core,能够并行运行150个task。那么就应该将你的Application的并行度,至少设置成150,才能彻底有效的利用你的集群资源,让150个task,并行执行;并且task增长到150个之后,便可以同时并行运行,还可让每一个task要处理的数据量变少;好比总共150G的数据要处理,若是是100个task,每一个task计算1.5G的数据;如今增长到150个task,能够并行运行,并且每一个task主要处理1G的数据就能够。
很简单的道理,只要合理设置并行度,就能够彻底充分利用你的集群计算资源,而且减小每一个task要处理的数据量,最终,就是提高你的整个Spark做业的性能和运行速度。
实际状况,与理想状况不一样的,有些task会运行的快一点,好比50s就完了,有些task,可能会慢一点,要1分半才运行完,因此若是你的task数量,恰好设置的跟cpu core数量相同,可能仍是会致使资源的浪费,由于,好比150个task,10个先运行完了,剩余140个还在运行,可是这个时候,有10个cpu core就空闲出来了,就致使了浪费。那若是task数量设置成cpu core总数的2~3倍,那么一个task运行完了之后,另外一个task立刻能够补上来,就尽可能让cpu core不要空闲,同时也是尽可能提高spark做业运行的效率和速度,提高性能。
spark.default.parallelism
SparkConf conf = new SparkConf()
.set("spark.default.parallelism", "500")
复制代码
如上图第一条DAG,默认状况下,屡次对一个RDD执行算子,去获取不一样的RDD;都会对这个RDD以及以前的父RDD,所有从新计算一次;因此在计算RDD3和RDD4的时候,前面的读取HDFS文件,而后对RDD1执行算子,获取 到RDD2会计算两遍。
这种状况,是绝对绝对,必定要避免的,一旦出现一个RDD重复计算的状况,就会致使性能急剧下降。好比,HDFS->RDD1-RDD2的时间是15分钟,那么此时就要走两遍,变成30分钟。
另一种状况,在上图第二条DAG中国,从一个RDD到几个不一样的RDD,算子和计算逻辑实际上是彻底同样的,结果由于人为的疏忽,计算了屡次,获取到了多个RDD。这个也是尽可能要避免的。
1. RDD架构重构与优化
尽可能去复用RDD,差很少的RDD,能够抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。
2. 公共RDD必定要实现持久化
对于要屡次计算和使用的公共RDD,必定要进行持久化。
持久化,也就是说,将RDD的数据缓存到内存中/磁盘中,(BlockManager),之后不管对这个RDD作多少次计算,那么都是直接取这个RDD的持久化的数据,好比从内存中或者磁盘中,直接提取一份数据。
3. 持久化,是能够进行序列化的
若是正常将数据持久化在内存中,那么可能会致使内存的占用过大,这样的话,也许,会致使OOM内存溢出。
当纯内存没法支撑公共RDD数据彻底存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每一个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减小内存的空间占用。
序列化的方式,惟一的缺点就是,在获取数据的时候,须要反序列化。
4. 为了数据的高可靠性,并且内存充足,可使用双副本机制,进行持久化
持久化的双副本机制,持久化后的一个副本,由于机器宕机了,副本丢了,就仍是得从新计算一次;持久化的每一个数据单元,存储一份副本,放在其余节点上面;从而进行容错;一个副本丢了,不用从新计算,还可使用另一份副本。
这种方式,仅仅针对你的内存资源极度充足
关于广播能够阅读 Spark学习(二)——RDD基础 中的共享变量。
task执行的算子中,使用了外部的变量,而后driver会把变量以task的形式发送到excutor端,每一个task都会获取一份变量的副本。若是有不少个task,就会有不少给excutor端携带不少个变量,若是这个变量很是大的时候,就可能会形成内存溢出。
好比,外部变量map是1M。总共,你前面调优都调的特好,资源给的到位,配合着资源,并行度调节的绝对到位,1000个task。大量task的确都在并行运行。
这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,经过网络传输到各个task中去,给task使用。总计有1G的数据,会经过网络传输。网络传输的开销,不容乐观啊!!!网络传输,也许就会消耗掉你的spark做业运行的总时间的一小部分。
map副本,传输到了各个task上以后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一会儿就耗费掉1G的内存。对性能会有什么影响呢?
没必要要的内存的消耗和占用,就致使了,你在进行RDD持久化到内存,也许就无法彻底在内存中放下;就只能写入磁盘,最后致使后续的操做在磁盘IO上消耗性能;
你的task在建立对象的时候,也许会发现堆内存放不下全部对象,也许就会致使频繁的垃圾回收器的回收,GC。GC的时候,必定是会致使工做线程中止,也就是致使Spark暂停工做那么一点时间。频繁GC的话,对Spark做业的运行的速度会有至关可观的影响。
Kryo序列化机制,一旦启用之后,会生效的几个地方:
SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
复制代码
Kryo之因此没有被做为默认的序列化类库的缘由,就要出现了:主要是由于Kryo要求,若是要达到它的最佳性能的话,那么就必定要注册你自定义的类(好比,你的算子函数中使用到了外部自定义类型的对象变量,这时,就要求必须注册你的类,不然Kryo达不到最佳性能)。
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
复制代码
fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的类库,提供了特殊类型的map、set、list和queue;
fastutil可以提供更小的内存占用,更快的存取速度;咱们使用fastutil提供的集合类,来替代本身平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,能够减少内存的占用,而且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素的值的时候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高性能快速的,以及实用的IO类,来处理二进制和文本类型的文件; fastutil最新版本要求Java 7以及以上版本;
Spark中应用fastutil的场景:
第一步:在pom.xml中引用fastutil的包
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
复制代码
第二步:List => IntList
IntList fastutilExtractList = new IntArrayList();
复制代码
可是呢,一般来讲,有时,事与愿违,可能task没有机会分配到它的数据所在的节点,为何呢,可能那个节点的计算资源和计算能力都满了;因此呢,这种时候,一般来讲,Spark会等待一段时间,默认状况下是3s钟(不是绝对的,还有不少种状况,对不一样的本地化级别,都会去等待),到最后,实在是等待不了了,就会选择一个比较差的本地化级别,好比说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,而后进行计算。
可是对于第二种状况,一般来讲,确定是要发生数据传输,task会经过其所在节点的BlockManager来获取数据,BlockManager发现本身本地没有数据,会经过一个getRemote()方法,经过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,经过网络传输回task所在节点。
对于咱们来讲,固然不但愿是相似于第二种状况的了。最好的,固然是task和数据在一个节点上,直接从本地executor的BlockManager中获取数据,纯内存,或者带一点磁盘IO;若是要经过网络传输数据的话,那么实在是,性能确定会降低的,大量网络传输,以及磁盘IO,都是性能的杀手。
观察日志,spark做业的运行日志,推荐你们在测试的时候,先用client模式,在本地就直接能够看到比较全的日志。 日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL 观察大部分task的数据本地化级别
若是大多都是PROCESS_LOCAL,那就不用调节了 若是是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长 调节完,应该是要反复调节,每次调节完之后,再来运行,观察日志 看看大部分的task的本地化级别有没有提高;看看,整个spark做业的运行时间有没有缩短
你别本末倒置,本地化级别却是提高了,可是由于大量的等待时长,spark做业的运行时间反而增长了,那就仍是不要调节了
new SparkConf()
.set("spark.locality.wait", "10")
复制代码
默认状况下,下面3个的等待时长,都是跟上面那个是同样的,都是3s