Spark性能调优

1、分配资源
最大调节分配资源:
经常使用的资源调度模式有Spark Standalone和Spark On Yarn。好比说你的每台机器可以给你使用60G内存,10个cpu core,20台机器。那么executor的数量是20。平均每一个executor所能分配60G内存和10个cpu core。
1.增长executor数量:提高了并行度
2.增长每一个executor的CPU Core:提升了并行度
3.增长每一个executor的内存量:
对于RDD的cache,减小了磁盘IO
对于shuffle操做的reduce端:减小了磁盘IO
对于task的执行,可能会建立不少对象,内存小的话会频繁致使JVM堆内存满了,就会频繁的进行GC。
2、executor、task、cpu Core数量关系调优
1.并行度
task数量,至少设置与Spark Application的总CPU Core 数量相同。官方推荐,task数量,设置成spark application总CPU core数量的2-3倍。
假如集群有50个executor,每一个executor10个G内存,每一个executor有3个CPU core。150个CPU Core最多并行执行150个task
3、RDD架构重构以及RDD持久化
一、RDD架构重构
复用RDD,抽取共同的RDD
二、公共RDD的持久化
将RDD的数据缓存到内存中/磁盘中(BlockManager
三、持久化的序列化
减小内存开销
四、内存充足,双副本持久化
4、广播变量
一台节点有一个或者多个Executor,一个Executor有多个Task(task数量由Executor里的Cpu Core数量一对一决定)
广播变量:
将固定的,只读的数据变量提早广播给各个Executor,该Executor上的各个Task再从所在节点的BlockManager获取广播变量,若是本地的Executor没有就从Driver远程拉取变量副本,并保存到本地的BlockManager中.解决了每次将固定的,只读的数据用Driver广播到各个Task上的繁重且效率低下的问题.
使用方法:
1.调用SparkContext.broadcast建立一个Broadcast[T]对象.任何序列化的类型均可以这么实现
2.经过 .value 获取对象的值
3.变量只会被发送到各个节点一次,应该做为只读值处理(修改这个值不会影响到别的节点)
5、使用Kryo序列化
默认Spark内部的序列化机制:
Java的ObjectOutputStream/ObjectInputStream:
优势: 使用方便,实现Serializable接口便可
缺点: 效率低,序列化速度慢,序列化后的数据相对大
Spark支持使用Kryo序列化机制:
比Java速度快
序列化后的数据小 Java的十分之一大小
因此网络传输的数据变少,在集群中耗费的内存资源大大减小
使用Kryo后影响的几个地方
1.优化网络传输的性能,优化集群中内存占用和消耗
2.持久化RDD,优化内存的占用和消耗.持久化RDD的内存占用的内存越小,task执行的时候建立的对象,就不至于频繁的占满内存,频繁发生GC
3.shuffle 优化网络传输的性能
Kryo的使用:
Spark中Scala语法的Kryo使用:
//建立一个集合存储输入输出,字典文件,停用词库
val Array(inputPath,outputPath,dicPath,stopwords,date) = args
val conf = new SparkConf()
.setAppName(s"${this.getClass.getName}").setMaster("local[*]")
//使用Kryo序列胡
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val sQLContext = new SQLContext(sc)
//spark 1.6 版本时候默认的压缩方式仍是snappy,2.0以后才默认snappy
sQLContext.setConf("spark.sql.parquet.compression.codec","snappy")
6、使用fastutil优化数据格式
Fastutil:(最新版本要求Java7及以上版本)
概念:
是扩展了Java标准的集合框架(Map,List,Set,HashMap,ArrayList,HashSet)的类库,提供了特殊类型的map,set,list和queue
优点:
代替JDK的原生的集合框架,占内存小,存取速度快
提供了64位的array,set,list的集合列表,实用的IO类,来处理二进制和文本类型的文件
每种集合类型都实现了对应的java中的标准接口
支持双向迭代器
支持引用类型 可是对引用类型是使用等于号(=)进行比较的,而不是equals()方法
应用:
1.算子函数使用了外部变量:
使用fastutil改写外部变量,首先从源头上就减小内存的占用,经过广播变量进一步减小内存占用,再经过Kryo序列化类库进一步减小内存占用
2.在task任务执行的计算逻辑里(算子函数),若是逻辑中有要建立较大的Map,List集合,可使用fastutil类库重写,减小了task建立出来的集合类型的内存占用,避免executor内存频繁占满,频繁唤起GC,致使性能降低.
fastutil调优的说明:
广播变量、Kryo序列化类库、fastutil,都是以前所说的,对于性能来讲,相似于一种调味品,烤鸡,原本就很好吃了,而后加了一点特质的孜然麻辣粉调料,就更加好吃了一点。分配资源、并行度、RDD架构与持久化,这三个就是烤鸡。broadcast、kryo、fastutil,相似于调料。
好比:通过一些调优以后30分钟的任务,通过broadcast,kryo,fastutil调优多是29,28或者25,通过shuffle调优15分,groupbykey用reducebykey改写,执行本地聚合也许就只有10分钟,利用资源更大的YARN队列,1分钟.
fastutil使用:
在pom.xml中引用fastutil的包
<dependency>
    <groupId>fastutil</groupId>
    <artifactId>fastutil</artifactId>
    <version>5.0.9</version>
</dependency>
List<Integer> 至关于 IntList
基本都是相似于IntList的格式,前缀就是集合的元素类型。特殊的就是Map,Int2IntMap,表明了key-value映射的元素类型。除此以外,还支持object、reference
7、调节数据本地化等待时长
task的locality的五种方式:
PROCESS_LOCAL:进程本地化
NODE_LOCAL:节点本地化
NO_PREF:对于task来讲,数据从哪里获取都同样,没有优点之分
RACK_LOCAL:机架本地化
ANY:数据和task可能在集群中的任何地方,并且再也不一个机架中,性能最差.
Spark的任务调度:
Spark在Driver上,对Application的每个stage的task进行分配以前都会计算出每一个task要计算的是哪一个分片数据.
首先会优先将task正好分配到他要计算的数据所在的节点,这样就避免了网络传输数据.
可是可能会遇到那个节点的计算资源和计算能力都满了.这种时候,Spark默认等待3秒(不是绝对,还有不少种状况,对于不一样的本地化级别,都会去等待),到最后实在等不下去,就会选择一个差的本地化级别.
对于第二种差的状况,确定会发生数据传输,task会经过其所在的节点的blockManager来获取数据,BlockManager发现本身本地没有数据,会经过一个getRemote()方法,经过TransferService(网络传输组件)从数据所在节点的BlockManager中,获取数据.
适当的调节本地化等待时长:
观察Spark做业的运行日志,看数据本地化级别的信息,若是大可能是PROCESS_LOCALL,就不用调节,若大可能是NODE_LOCAL,ANY就须要调节,反复的看日志,以及做业运行时间,不要一味地提高本地化级别形成了本末倒置.
调节方式:
spark.locality.wait ,默认是3s。6s,10s
默认状况下,下面3个的等待时长,都是跟上面那个是同样的,都是3s
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
new SparkConf().set("spark.locality.wait", "10")
相关文章
相关标签/搜索