spark-性能调优

问题:
一、分配哪些资源?
二、在哪里分配这些资源?
三、为何多分配了这些资源之后,性能会获得提高?shell

  • 分配哪些资源?executor、cpu per executor、memory per executor、driver memory
  • 在哪里分配这些资源?在咱们在生产环境中,提交spark做业时,用的spark-submit shell脚本,里面调整对应的参数
    /usr/local/spark/bin/spark-submit \
    --class cn.spark.sparktest.core.WordCountCluster \
    --num-executors 3 \  配置executor的数量
    --driver-memory 100m \  配置driver的内存(影响很大)
    --executor-memory 100m \  配置每一个executor的内存大小
    --executor-cores 3 \  配置每一个executor的cpu core数量
    /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

     

  • 调节到多大,算是最大呢?
    • 第一种,Spark Standalone,公司集群上,搭建了一套Spark集群,你内心应该清楚每台机器还可以给你使用的,大概有多少内存,多少cpu core;那么,设置的时候,就根据这个实际的状况,去调节每一个spark做业的资源分配。好比说你的每台机器可以给你使用4G内存,2个cpu core;20台机器;executor,20;平均每一个executor:4G内存,2个cpu core。数组

    • 第二种,Yarn。资源队列。资源调度。应该去查看,你的spark做业,要提交到的资源队列,大概有多少资源?500G内存,100个cpu core;executor,50;平均每一个executor:10G内存,2个cpu core。缓存

    • 设置队列名称:spark.yarn.queue defaultbash

    • 一个原则,你能使用的资源有多大,就尽可能去调节到最大的大小(executor的数量,几十个到上百个不等;executor内存;executor cpu core)架构

  • 为何调节了资源之后,性能能够提高?app

    • 增长executor:性能

      若是executor数量比较少,那么,可以并行执行的task数量就比较少,就意味着,咱们的Application的并行执行的能力就很弱。好比有3个executor,每一个executor有2个cpu core,那么同时可以并行执行的task,就是6个。6个执行完之后,再换下一批6个task。增长了executor数量之后,那么,就意味着,可以并行执行的task数量,也就变多了。好比原先是6个,如今可能能够并行执行10个,甚至20个,100个。那么并行能力就比以前提高了数倍,数十倍。相应的,性能(执行的速度),也能提高数倍~数十倍。优化

      有时候数据量比较少,增长大量的task反而性能会下降,为何?(想一想就明白了,你用多了,别人用的就少了。。。。)spa

    • 增长每一个executor的cpu core:scala

      也是增长了执行的并行能力。本来20个executor,每一个才2个cpu core。可以并行执行的task数量,就是40个task。如今每一个executor的cpu core,增长到了5个。可以并行执行的task数量,就是100个task。执行的速度,提高了2.5倍。

      SparkContext,DAGScheduler,TaskScheduler,会将咱们的算子,切割成大量的task,
      提交到Application的executor上面去执行。

    • 增长每一个executor的内存量:

      增长了内存量之后,对性能的提高,有三点:
      一、若是须要对RDD进行cache,那么更多的内存,就能够缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减小了磁盘IO。
      二、对于shuffle操做,reduce端,会须要内存来存放拉取的数据并进行聚合。若是内存不够,也会写入磁盘。若是给executor分配更多内存之后,就有更少的数据,须要写入磁盘,
      甚至不须要写入磁盘。减小了磁盘IO,提高了性能。
      三、对于task的执行,可能会建立不少对象。若是内存比较小,可能会频繁致使JVM堆内存满了,而后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大之后,带来更少的GC,垃圾回收,避免了速度变慢,速度变快了。

Spark并行度指的是什么?

Spark做业,Application,Jobs,action(collect)触发一个job,1个job;每一个job拆成多个stage,
发生shuffle的时候,会拆分出一个stage,reduceByKey。

stage0
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)

stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()

reduceByKey,stage0的task,在最后,执行到reduceByKey的时候,会为每一个stage1的task,都建立一份文件(也多是合并在少许的文件里面);每一个stage1的task,会去各个节点上的各个task建立的属于本身的那一份文件里面,拉取数据;每一个stage1的task,拉取到的数据,必定是相同key对应的数据。对相同的key,对应的values,才能去执行咱们自定义的function操做(_ + _)

并行度:其实就是指的是,Spark做业中,各个stage的task数量,也就表明了Spark做业的在各个阶段(stage)的并行度。

若是不调节并行度,致使并行度太低,会怎么样?

  • task没有设置,或者设置的不多,好比就设置了,100个task。50个executor,每一个executor有3个cpu core,也就是说,你的Application任何一个stage运行的时候,都有总数在150个cpu core,能够并行运行。可是你如今,只有100个task,平均分配一下,每一个executor分配到2个task,ok,那么同时在运行的task,只有100个,每一个executor只会并行运行2个task。每一个executor剩下的一个cpu core,就浪费掉了。
  • 你的资源虽然分配足够了,可是问题是,并行度没有与资源相匹配,致使你分配下去的资源都浪费掉了。合理的并行度的设置,应该是要设置的足够大,大到能够彻底合理的利用你的集群资源;好比上面的例子,总共集群有150个cpu core,能够并行运行150个task。那么就应该将你的Application的并行度,至少设置成150,才能彻底有效的利用你的集群资源,让150个task,并行执行;并且task增长到150个之后,便可以同时并行运行,还可让每一个task要处理的数据量变少;好比总共150G的数据要处理,若是是100个task,每一个task计算1.5G的数据;如今增长到150个task,能够并行运行,并且每一个task主要处理1G的数据就能够。
  • 很简单的道理,只要合理设置并行度,就能够彻底充分利用你的集群计算资源,而且减小每一个task要处理的数据量,最终,就是提高你的整个Spark做业的性能和运行速度。
    1. task数量,至少设置成与Spark application的总cpu core数量相同(最理想状况,好比总共150个cpu core,分配了150个task,一块儿运行,差很少同一时间运行完毕)

    2. 官方是推荐,task数量,设置成spark application总cpu core数量的2~3倍,好比150个cpu core,基本要设置task数量为300~500;实际状况,与理想状况不一样的,有些task会运行的快一点,好比50s就完了,有些task,可能会慢一点,要1分半才运行完,因此若是你的task数量,恰好设置的跟cpu core数量相同,可能仍是会致使资源的浪费,由于,好比150个task,10个先运行完了,剩余140个还在运行,可是这个时候,有10个cpu core就空闲出来了,就致使了浪费。那若是task数量设置成cpu core总数的2~3倍,那么一个task运行完了之后,另外一个task立刻能够补上来,就尽可能让cpu core不要空闲,同时也是尽可能提高spark做业运行的效率和速度,提高性能。

    3. 如何设置一个Spark Application的并行度?

      spark.default.parallelism 
      SparkConf conf = new SparkConf().set("spark.default.parallelism", "500")

       

默认状况下,屡次对一个RDD执行算子,去获取不一样的RDD;都会对这个RDD以及以前的父RDD,所有从新计算一次;读取HDFS->RDD1->RDD2-RDD4这种状况,是绝对绝对,必定要避免的,一旦出现一个RDD重复计算的状况,就会致使性能急剧下降。好比,HDFS->RDD1-RDD2的时间是15分钟,那么此时就要走两遍,变成30分钟

  1. RDD架构重构与优化尽可能去复用RDD,差很少的RDD,能够抽取称为一个共同的RDD,供后面的RDD计算时,反复使用。
  2. 公共RDD必定要实现持久化。就比如北方吃饺子,现包现煮。你人来了,要点一盘饺子。馅料+饺子皮+水->包好的饺子,对包好的饺子去煮,煮开了之后,才有你须要的熟的,热腾腾的饺子。现实生活中,饺子现包现煮,固然是最好的了。可是Spark中,RDD要去“现包现煮”,那就是一场致命的灾难。对于要屡次计算和使用的公共RDD,必定要进行持久化。持久化,也就是说,将RDD的数据缓存到内存中/磁盘中,(BlockManager),之后不管对这个RDD作多少次计算,那么都是直接取这个RDD的持久化的数据,好比从内存中或者磁盘中,直接提取一份数据。
  3. 持久化,是能够进行序列化的若是正常将数据持久化在内存中,那么可能会致使内存的占用过大,这样的话,也许,会致使OOM内存溢出。当纯内存没法支撑公共RDD数据彻底存放的时候,就优先考虑,使用序列化的方式在纯内存中存储。将RDD的每一个partition的数据,序列化成一个大的字节数组,就一个对象;序列化后,大大减小内存的空间占用。序列化的方式,惟一的缺点就是,在获取数据的时候,须要反序列化。若是序列化纯内存方式,仍是致使OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。内存+磁盘,序列化。
  4. 为了数据的高可靠性,并且内存充足,可使用双副本机制,进行持久化持久化的双副本机制,持久化后的一个副本,由于机器宕机了,副本丢了,就仍是得从新计算一次;持久化的每一个数据单元,存储一份副本,放在其余节点上面;从而进行容错;一个副本丢了,不用从新计算,还可使用另一份副本。这种方式,仅仅针对你的内存资源极度充足.

持久化,很简单,就是对RDD调用persist()方法,并传入一个持久化级别

  • 若是是persist(StorageLevel.MEMORY_ONLY()),纯内存,无序列化,那么就能够用cache()方法来替代
    • StorageLevel.MEMORY_ONLY_SER(),第二选择
    • StorageLevel.MEMORY_AND_DISK(),第三选择
    • StorageLevel.MEMORY_AND_DISK_SER(),第四选择
    • StorageLevel.DISK_ONLY(),第五选择
  • 若是内存充足,要使用双副本高可靠机制,选择后缀带_2的策略
    • StorageLevel.MEMORY_ONLY_2()
相关文章
相关标签/搜索