问题:
一、分配哪些资源?
二、在哪里分配这些资源?
三、为何多分配了这些资源之后,性能会获得提高?shell
/usr/local/spark/bin/spark-submit \ --class cn.spark.sparktest.core.WordCountCluster \ --num-executors 3 \ 配置executor的数量 --driver-memory 100m \ 配置driver的内存(影响很大) --executor-memory 100m \ 配置每一个executor的内存大小 --executor-cores 3 \ 配置每一个executor的cpu core数量 /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
第一种,Spark Standalone,公司集群上,搭建了一套Spark集群,你内心应该清楚每台机器还可以给你使用的,大概有多少内存,多少cpu core;那么,设置的时候,就根据这个实际的状况,去调节每一个spark做业的资源分配。好比说你的每台机器可以给你使用4G内存,2个cpu core;20台机器;executor,20;平均每一个executor:4G内存,2个cpu core。数组
第二种,Yarn。资源队列。资源调度。应该去查看,你的spark做业,要提交到的资源队列,大概有多少资源?500G内存,100个cpu core;executor,50;平均每一个executor:10G内存,2个cpu core。缓存
设置队列名称:spark.yarn.queue defaultbash
一个原则,你能使用的资源有多大,就尽可能去调节到最大的大小(executor的数量,几十个到上百个不等;executor内存;executor cpu core)架构
为何调节了资源之后,性能能够提高?app
增长executor:性能
若是executor数量比较少,那么,可以并行执行的task数量就比较少,就意味着,咱们的Application的并行执行的能力就很弱。好比有3个executor,每一个executor有2个cpu core,那么同时可以并行执行的task,就是6个。6个执行完之后,再换下一批6个task。增长了executor数量之后,那么,就意味着,可以并行执行的task数量,也就变多了。好比原先是6个,如今可能能够并行执行10个,甚至20个,100个。那么并行能力就比以前提高了数倍,数十倍。相应的,性能(执行的速度),也能提高数倍~数十倍。优化
有时候数据量比较少,增长大量的task反而性能会下降,为何?(想一想就明白了,你用多了,别人用的就少了。。。。)spa
增长每一个executor的cpu core:scala
也是增长了执行的并行能力。本来20个executor,每一个才2个cpu core。可以并行执行的task数量,就是40个task。如今每一个executor的cpu core,增长到了5个。可以并行执行的task数量,就是100个task。执行的速度,提高了2.5倍。
SparkContext,DAGScheduler,TaskScheduler,会将咱们的算子,切割成大量的task,
提交到Application的executor上面去执行。
增长每一个executor的内存量:
增长了内存量之后,对性能的提高,有三点:
一、若是须要对RDD进行cache,那么更多的内存,就能够缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减小了磁盘IO。
二、对于shuffle操做,reduce端,会须要内存来存放拉取的数据并进行聚合。若是内存不够,也会写入磁盘。若是给executor分配更多内存之后,就有更少的数据,须要写入磁盘,
甚至不须要写入磁盘。减小了磁盘IO,提高了性能。
三、对于task的执行,可能会建立不少对象。若是内存比较小,可能会频繁致使JVM堆内存满了,而后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大之后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。
Spark并行度指的是什么?
Spark做业,Application,Jobs,action(collect)触发一个job,1个job;每一个job拆成多个stage,
发生shuffle的时候,会拆分出一个stage,reduceByKey。
stage0 val lines = sc.textFile("hdfs://") val words = lines.flatMap(_.split(" ")) val pairs = words.map((_,1)) val wordCount = pairs.reduceByKey(_ + _) stage1 val wordCount = pairs.reduceByKey(_ + _) wordCount.collect()
reduceByKey,stage0的task,在最后,执行到reduceByKey的时候,会为每一个stage1的task,都建立一份文件(也多是合并在少许的文件里面);每一个stage1的task,会去各个节点上的各个task建立的属于本身的那一份文件里面,拉取数据;每一个stage1的task,拉取到的数据,必定是相同key对应的数据。对相同的key,对应的values,才能去执行咱们自定义的function操做(_ + _)
并行度:其实就是指的是,Spark做业中,各个stage的task数量,也就表明了Spark做业的在各个阶段(stage)的并行度。
若是不调节并行度,致使并行度太低,会怎么样?
task数量,至少设置成与Spark application的总cpu core数量相同(最理想状况,好比总共150个cpu core,分配了150个task,一块儿运行,差很少同一时间运行完毕)
官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,好比150个cpu core,基本要设置task数量为300~500;实际状况,与理想状况不一样的,有些task会运行的快一点,好比50s就完了,有些task,可能会慢一点,要1分半才运行完,因此若是你的task数量,恰好设置的跟cpu core数量相同,可能仍是会致使资源的浪费,由于,好比150个task,10个先运行完了,剩余140个还在运行,可是这个时候,有10个cpu core就空闲出来了,就致使了浪费。那若是task数量设置成cpu core总数的2~3倍,那么一个task运行完了之后,另外一个task立刻能够补上来,就尽可能让cpu core不要空闲,同时也是尽可能提高spark做业运行的效率和速度,提高性能。
如何设置一个Spark Application的并行度?
spark.default.parallelism SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")
默认状况下,屡次对一个RDD执行算子,去获取不一样的RDD;都会对这个RDD以及以前的父RDD,所有从新计算一次;读取HDFS->RDD1->RDD2-RDD4这种状况,是绝对绝对,必定要避免的,一旦出现一个RDD重复计算的状况,就会致使性能急剧下降。好比,HDFS->RDD1-RDD2的时间是15分钟,那么此时就要走两遍,变成30分钟
持久化,很简单,就是对RDD调用persist()方法,并传入一个持久化级别