一. Spark做业原理
咱们使用spark-submit提交一个Spark做业以后,这个做业就会启动一个对应的Driver进程。该进程是向集群管理器(Yarn,K8s)申请运行Spark做业须要使用的资源,这里的资源指的就是Executor进程。
YARN集群管理器会根据咱们为Spark做业设置的资源参数,在各个工做节点上,启动必定数量的Executor进程,每一个Executor进程都占有必定数量的内存和CPU core。
在申请到了做业执行所需的资源以后,Driver进程就会开始调度和执行咱们编写的做业代码了。
Driver进程会将咱们编写的Spark做业代码分拆为多个stage,每一个stage执行一部分代码片断,并为每一个stage建立一批task,而后将这些task分配到各个Executor进程中执行。
task是最小的计算单元,负责执行如出一辙的计算逻辑(也就是咱们本身编写的某个代码片断),只是每一个task处理的数据不一样而已。
一个stage的全部task都执行完毕以后,会在各个节点本地的磁盘文件中写入计算中间结果,而后Driver就会调度运行下一个stage。
下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将咱们本身编写的代码逻辑所有执行完,而且计算完全部的数据,获得咱们想要的结果为止。
Spark是根据shuffle类算子来进行stage的划分。若是咱们的代码中执行了某个shuffle类算子(好比reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。
能够大体理解为,shuffle算子执行以前的代码会被划分为一个stage,shuffle算子执行以及以后的代码会被划分为下一个stage。
所以一个stage刚开始执行的时候,它的每一个task可能都会从上一个stage的task所在的节点,去经过网络传输拉取须要本身处理的全部key,而后对拉取到的全部相同的key使用咱们本身编写的算子函数执行聚合操做(好比reduceByKey()算子接收的函数)。这个过程就是shuffle。
当咱们在代码中执行了cache/persist等持久化操做时,根据咱们选择的持久化级别的不一样,每一个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。
所以Executor的内存主要分为三块:
第一块是让task执行咱们本身编写的代码时使用,默认是占Executor总内存的20%;
第二块是让task经过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操做时使用,默认也是占Executor总内存的20%;
第三块是让RDD持久化时使用,默认占Executor总内存的60%。
task的执行速度是跟每一个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每一个Executor进程上分配到的多个task,都是以每一个task一条线程的方式,多线程并发运行的。
若是CPU core数量比较充足,并且分配到的task数量比较合理,那么一般来讲,能够比较快速和高效地执行完这些task线程。
二.核心调优参数
num-executors:
该参数用于设置Spark做业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽量按照你的设置来在集群的各个工做节点上,启动相应数量的Executor进程。这个参数很是之重要,若是不设置的话,默认只会给你启动少许的Executor进程,此时你的Spark做业的运行速度是很是慢的。(建议50~100个左右的Executor进程)
executor-memory:
该参数用于设置每一个Executor进程的内存。Executor内存的大小,不少时候直接决定了Spark做业的性能,并且跟常见的JVM OOM异常,也有直接的关联。(根据做业大小不一样,建议设置4G~8G,num-executors乘以executor-memory,是不能超过队列的最大内存量的)
executor-cores:
该参数用于设置每一个Executor进程的CPU core数量。这个参数决定了每一个Executor进程并行执行task线程的能力。由于每一个CPU core同一时间只能执行一个task线程,所以每一个Executor进程的CPU core数量越多,越可以快速地执行完分配给本身的全部task线程。(建议设置为2~4个,且num-executors * executor-cores不要超过队列总CPU core的1/3~1/2)
driver-memory:
该参数用于设置Driver进程的内存(建议设置512M到1G)。
spark.default.parallelism:
该参数用于设置每一个stage的默认task数量。这个参数极为重要,若是不设置可能会直接影响你的Spark做业性能。(建议为50~500左右,缺省状况下Spark本身根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。Spark官网建议设置该参数为num-executors * executor-cores的2~3倍较为合适)
spark.storage.memoryFraction:
该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6(原则上是尽量保证数据可以所有在内存中,但若是发现做业发生频繁的GC,就该考虑是否调小)
spark.shuffle.memoryFraction:
该参数用于设置shuffle过程当中一个task拉取到上个stage的task的输出后,进行聚合操做时可以使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操做。shuffle操做在进行聚合时,若是发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地下降性能。(shuffle操做较多时,建议下降持久化操做的内存占比,提升shuffle操做的内存占比比例,避免shuffle过程当中数据过多时内存不够用,必须溢写到磁盘上,下降了性能)
微信扫描二维码,关注个人公众号