Spark基本原理

Hadoop和Spark关系

Spark比Hadoop快的缘由:Hadoop在MapReduce后会将结果写入磁盘,第二次MapReduce再取出,Spark去除了两次运算间多余的IO消耗,直接将数据缓存在内存中。html

Spark运行原理

提交做业->启动Driver进程->申请资源,即Executor进程->做业代码分拆为stage执行->从上一次stage拉取所需key执行,直至任务完成->保存到Executor进程的内存或磁盘web

Spark基本运行原理

咱们使用spark-submit提交一个Spark做业以后,这个做业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不一样,Driver进程可能在本地启动,也可能在集群中某个工做节点上启动。而Driver进程要作的第一件事情,就是向集群管理器(能够是Spark Standalone集群,也能够是其余的资源管理集群,美团•大众点评使用的是YARN做为资源管理集群)申请运行Spark做业须要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据咱们为Spark做业设置的资源参数,在各个工做节点上,启动必定数量的Executor进程,每一个Executor进程都占有必定数量的内存和CPU core。缓存

在申请到了做业执行所需的资源以后,Driver进程就会开始调度和执行咱们编写的做业代码了。Driver进程会将咱们编写的Spark做业代码分拆为多个stage,每一个stage执行一部分代码片断,并为每一个stage建立一批Task,而后将这些Task分配到各个Executor进程中执行。Task是最小的计算单元,负责执行如出一辙的计算逻辑(也就是咱们本身编写的某个代码片断),只是每一个Task处理的数据不一样而已。一个stage的全部Task都执行完毕以后,会在各个节点本地的磁盘文件中写入计算中间结果,而后Driver就会调度运行下一个stage。下一个stage的Task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将咱们本身编写的代码逻辑所有执行完,而且计算完全部的数据,获得咱们想要的结果为止。网络

Spark是根据shuffle类算子来进行stage的划分。若是咱们的代码中执行了某个shuffle类算子(好比reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。能够大体理解为,shuffle算子执行以前的代码会被划分为一个stage,shuffle算子执行以及以后的代码会被划分为下一个stage。所以一个stage刚开始执行的时候,它的每一个Task可能都会从上一个stage的Task所在的节点,去经过网络传输拉取须要本身处理的全部key,而后对拉取到的全部相同的key使用咱们本身编写的算子函数执行聚合操做(好比reduceByKey()算子接收的函数),这个过程就是shuffle。多线程

当咱们在代码中执行了cache/persist等持久化操做时,根据咱们选择的持久化级别的不一样,每一个Task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。并发

所以Executor的内存主要分为三块:第一块是让Task执行咱们本身编写的代码时使用,默认是占Executor总内存的20%;第二块是让Task经过shuffle过程拉取了上一个stage的Task的输出后,进行聚合等操做时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。分布式

Task的执行速度是跟每一个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每一个Executor进程上分配到的多个Task,都是以每一个Task一条线程的方式,多线程并发运行的。若是CPU core数量比较充足,并且分配到的Task数量比较合理,那么一般来讲,能够比较快速和高效地执行完这些Task线程。ide

RDD

  • RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集;
  • RDD最重要的特性就是,提供了容错性,Spark能够重算
  • RDD存放在内存中,可是内存不足时能够写入磁盘缓存(弹性)
  1. RDD分布式是什么意思?函数

    一个RDD,在逻辑上抽象地表明了一个HDFS文件;它其实是被分为多个存放在spark不一样节点上的分区。好比说,RDD有900万数据。分为9个partition,9个分区。oop

  2. RDD弹性是什么意思,体如今哪一方面? 容错性体如今哪方面?

    a.RDD自动进行内存和磁盘之间权衡和切换的机制,就是RDD的弹性的特色所在。b.当它发现本身的数据丢失,会自动从本身来源的数据进行重计算,这一切对用户是彻底透明的。

shuffle和stage

shuffle是划分DAG中stage的标识,同时影响 Spark 执行速度的关键步骤。以下 DAG 流程图中,分别读取数据,通过处理后 join 2个 RDD 获得结果:

RDD 的Transformation函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操做.窄依赖跟宽依赖的区别是是否发生shuffle(洗牌) 操做.宽依赖会发生shuffle操做. 窄依赖是子 RDD的各个分片(partition)不依赖于其余分片,可以独立计算获得结果,宽依赖指 RDD 的各个分片会依赖于父RDD 的多个分片,因此会形成父 RDD 的各个分片在集群中从新分片。例子:

// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).
                Map(x => (x._1, x._2.toList.length))

第一个Map操做将 RDD 里的各个元素进行映射, RDD 的各个数据元素之间不存在依赖,能够在集群的各个内存中独立计算,也就是并行化,第二个 groupby以后的 Map 操做,为了计算相同 key 下的元素个数,须要把相同 key 的元素汇集到同一个 partition 下,因此形成了数据在内存中的从新分布,即 shuffle 操做.shuffle 操做是 spark中最耗时的操做,应尽可能避免没必要要的 shuffle.

开发调优

  • 避免建立重复的RDD
  • 尽量复用同一个RDD
  • 对屡次使用的RDD进行持久化

资源参数调优

num-executors

  • 参数说明:该参数用于设置Spark做业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽量按照你的设置来在集群的各个工做节点上,启动相应数量的Executor进程。这个参数很是之重要,若是不设置的话,默认只会给你启动少许的Executor进程,此时你的Spark做业的运行速度是很是慢的。
  • 参数调优建议:每一个Spark做业的运行通常设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都很差。设置的太少,没法充分利用集群资源;设置的太多的话,大部分队列可能没法给予充分的资源。

executor-memory

  • 参数说明:该参数用于设置每一个Executor进程的内存。Executor内存的大小,不少时候直接决定了Spark做业的性能,并且跟常见的JVM OOM异常,也有直接的关联。
  • 参数调优建议:每一个Executor进程的内存设置4G~8G较为合适。可是这只是一个参考值,具体的设置仍是得根据不一样部门的资源队列来定。能够看看本身团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,若是你是跟团队里其余人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你本身的Spark做业占用了队列全部的资源,致使别的同窗的做业没法运行。

executor-cores

  • 参数说明:该参数用于设置每一个Executor进程的CPU core数量。这个参数决定了每一个Executor进程并行执行task线程的能力。由于每一个CPU core同一时间只能执行一个task线程,所以每一个Executor进程的CPU core数量越多,越可以快速地执行完分配给本身的全部task线程。
  • 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。一样得根据不一样部门的资源队列来定,能够看看本身的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每一个Executor进程能够分配到几个CPU core。一样建议,若是是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其余同窗的做业运行。

driver-memory

  • 参数说明:该参数用于设置Driver进程的内存。
  • 参数调优建议:Driver的内存一般来讲不设置,或者设置1G左右应该就够了。惟一须要注意的一点是,若是须要使用collect算子将RDD的数据所有拉取到Driver上进行处理,那么必须确保Driver的内存足够大,不然会出现OOM内存溢出的问题。

spark.default.parallelism

  • 参数说明:该参数用于设置每一个stage的默认task数量。这个参数极为重要,若是不设置可能会直接影响你的Spark做业性能。
  • 参数调优建议:Spark做业的默认task数量为500~1000个较为合适。不少同窗常犯的一个错误就是不去设置这个参数,那么此时就会致使Spark本身根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。一般来讲,Spark默认设置的数量是偏少的(好比就几十个task),若是task数量偏少的话,就会致使你前面设置好的Executor的参数都前功尽弃。试想一下,不管你的Executor进程有多少个,内存和CPU有多大,可是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!所以Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,好比Executor的总CPU core数量为300个,那么设置1000个task是能够的,此时能够充分地利用Spark集群的资源。

spark.storage.memoryFraction

  • 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,能够用来保存持久化的RDD数据。根据你选择的不一样的持久化策略,若是内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
  • 参数调优建议:若是Spark做业中,有较多的RDD持久化操做,该参数的值能够适当提升一些,保证持久化的数据可以容纳在内存中。避免内存不够缓存全部的数据,致使数据只能写入磁盘中,下降了性能。可是若是Spark做业中的shuffle类操做比较多,而持久化操做比较少,那么这个参数的值适当下降一些比较合适。此外,若是发现做业因为频繁的gc致使运行缓慢(经过spark web ui能够观察到做业的gc耗时),意味着task执行用户代码的内存不够用,那么一样建议调低这个参数的值。

spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置shuffle过程当中一个task拉取到上个stage的task的输出后,进行聚合操做时可以使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操做。shuffle操做在进行聚合时,若是发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地下降性能。
  • 参数调优建议:若是Spark做业中的RDD持久化操做较少,shuffle操做较多时,建议下降持久化操做的内存占比,提升shuffle操做的内存占比比例,避免shuffle过程当中数据过多时内存不够用,必须溢写到磁盘上,下降了性能。此外,若是发现做业因为频繁的gc致使运行缓慢,意味着task执行用户代码的内存不够用,那么一样建议调低这个参数的值。
    ***

    http://blog.csdn.net/ap0810217/article/details/55195962
    https://tech.meituan.com/spark-tuning-basic.html
    http://blog.csdn.net/databatman/article/details/53023818