spark参数调优

Spark 性能调优参数总结html

一、Shuffle 相关node

Shuffle 操做大概是对Spark 性能影响最大的步骤之一(由于可能涉及到排序,磁盘IO,网算法

络IO 等众多CPU 或IO 密集的操做),这也是为何在Spark 1.1 的代码中对整个Shuffleapache

框架代码进行了重构,将Shuffle 相关读写操做抽象封装到Pluggable 的Shuffle Manager网络

中,便于试验和实现不一样的Shuffle 功能模块。例如为了解决Hash Based 的Shuffle Manager数据结构

在文件读写效率方面的问题而实现的Sort Base 的Shuffle Manager。并发

 

①spark.shuffle.managerapp

用来配置所使用的Shuffle Manager,目前可选的Shuffle Manager 包括默认的框架

org.apache.spark.shuffle.sort.HashShuffleManager(配置参数值为hash)和新的oop

org.apache.spark.shuffle.sort.SortShuffleManager(配置参数值为sort)。

这两个ShuffleManager 如何选择呢,首先须要了解他们在实现方式上的区别。

 

HashShuffleManager,故名思义也就是在Shuffle 的过程当中写数据时不作排序操做,只是将

数据根据Hash 的结果,将各个Reduce 分区的数据写到各自的磁盘文件中。带来的问题就

是若是Reduce 分区的数量比较大的话,将会产生大量的磁盘文件。若是文件数量特别巨大,

对文件读写的性能会带来比较大的影响,此外因为同时打开的文件句柄数量众多,序列化,

以及压缩等操做须要分配的临时内存空间也可能会迅速膨胀到没法接受的地步,对内存的使

用和GC 带来很大的压力,在Executor 内存比较小的状况下尤其突出,例如Spark on Yarn

模式。

 

SortShuffleManager,是1.1 版本以后实现的一个试验性(也就是一些功能和接口还在开发

演变中)的ShuffleManager,它在写入分区数据的时候,首先会根据实际状况对数据采用

不一样的方式进行排序操做,底线是至少按照Reduce 分区Partition 进行排序,这样来至于

同一个Map 任务Shuffle 到不一样的Reduce 分区中去的全部数据均可以写入到同一个外部磁

盘文件中去,用简单的Offset 标志不一样Reduce 分区的数据在这个文件中的偏移量。这样

一个Map 任务就只须要生成一个shuffle 文件,从而避免了上述HashShuffleManager 可能

遇到的文件数量巨大的问题

 

二者的性能比较,取决于内存,排序,文件操做等因素的综合影响。

对于不须要进行排序的Shuffle 操做来讲,如repartition 等,若是文件数量不是特别巨大,

HashShuffleManager 面临的内存问题不大,而SortShuffleManager 须要额外的根据

Partition 进行排序,显然HashShuffleManager 的效率会更高。

 

而对于原本就须要在Map 端进行排序的Shuffle 操做来讲,如ReduceByKey 等,使用

HashShuffleManager 虽然在写数据时不排序,但在其它的步骤中仍然须要排序,而

SortShuffleManager 则能够将写数据和排序两个工做合并在一块儿执行,所以即便不考虑

HashShuffleManager 的内存使用问题,SortShuffleManager 依旧可能更快。

 

②spark.shuffle.sort.bypassMergeThreshold

这个参数仅适用于SortShuffleManager,如前所述,SortShuffleManager 在处理不须要排

序的Shuffle 操做时,因为排序带来性能的降低。这个参数决定了在这种状况下,当Reduce

分区的数量小于多少的时候,在SortShuffleManager 内部不使用Merge Sort 的方式处理数

据,而是与Hash Shuffle 相似,直接将分区文件写入单独的文件,不一样的是,在最后一步

仍是会将这些文件合并成一个单独的文件。这样经过去除Sort 步骤来加快处理速度,代价

是须要并发打开多个文件,因此内存消耗量增长,本质上是相对HashShuffleMananger 一

个折衷方案。这个参数的默认值是200 个分区,若是内存GC 问题严重,能够下降这个值。

 

spark.shuffle.consolidateFiles

这个配置参数仅适用于HashShuffleMananger 的实现,一样是为了解决生成过多文件的问

题,采用的方式是在不一样批次运行的Map 任务之间重用Shuffle 输出文件,也就是说合并

的是不一样批次的Map 任务的输出数据,可是每一个Map 任务所须要的文件仍是取决于Reduce

分区的数量,所以,它并不减小同时打开的输出文件的数量,所以对内存使用量的减小并没

有帮助。只是HashShuffleManager 里的一个折中的解决方案。

须要注意的是,这部分的代码实现尽管原理上说很简单,可是涉及到底层具体的文件系统的

实现和限制等因素,例如在并发访问等方面,须要处理的细节不少,所以一直存在着这样那

样的bug 或者问题,致使在例如EXT3 上使用时,特定状况下性能反而可能降低,所以从

Spark 0.8 的代码开始,一直尚未被标志为Stable,不是默认采用的方式。此外由于并不

减小同时打开的输出文件的数量,所以对性能具体能带来多大的改善也取决于具体的文件数

量的状况。因此即便你面临着Shuffle 文件数量巨大的问题,这个配置参数是否使用,在什

么版本中可使用,也最好仍是实际测试之后再决定。

③spark.shuffle.spill

shuffle 的过程当中,若是涉及到排序,聚合等操做,势必会须要在内存中维护一些数据结构,

进而占用额外的内存。若是内存不够用怎么办,那只有两条路能够走,一就是out of memory

出错了,二就是将部分数据临时写到外部存储设备中去,最后再合并到最终的Shuffle 输出

文件中去。

这里spark.shuffle.spill 决定是否Spill 到外部存储设备(默认打开),若是你的内存足够使

用,或者数据集足够小,固然也就不须要Spill,毕竟Spill 带来了额外的磁盘操做。

spark.shuffle.memoryFraction / spark.shuffle.safetyFraction

在启用Spill 的状况下,spark.shuffle.memoryFraction(1.1 后默认为0.2)决定了当Shuffle

过程当中使用的内存达到总内存多少比例的时候开始Spill。

经过spark.shuffle.memoryFraction 能够调整Spill 的触发条件,即Shuffle 占用内存的大小,

进而调整Spill 的频率和GC 的行为。总的来讲,若是Spill 太过频繁,能够适当增长

spark.shuffle.memoryFraction 的大小,增长用于Shuffle 的内存,减小Spill 的次数。固然

这样一来为了不内存溢出,对应的可能须要减小RDD cache 占用的内存,即减少

spark.storage.memoryFraction 的值,这样RDD cache 的容量减小,有可能带来性能影响,

所以须要综合考虑。

因为Shuffle 数据的大小是估算出来的,一来为了下降开销,并非每增长一个数据项都完

整的估算一次,二来估算也会有偏差,因此实际暂用的内存可能比估算值要大,这里

spark.shuffle.safetyFraction(默认为0.8)用来做为一个保险系数,下降实际Shuffle 使用

的内存阀值,增长必定的缓冲,下降实际内存占用超过用户配置值的几率。

 

④spark.shuffle.spill.compress / spark.shuffle.compress

这两个配置参数都是用来设置Shuffle 过程当中是否使用压缩算法对Shuffle 数据进行压缩,

前者针对Spill 的中间数据,后者针对最终的shuffle 输出文件,默认都是True

理论上说,spark.shuffle.compress 设置为True 一般都是合理的,由于若是使用千兆如下

的网卡,网络带宽每每最容易成为瓶颈。此外,目前的Spark 任务调度实现中,以Shuffle

划分Stage,下一个Stage 的任务是要等待上一个Stage 的任务所有完成之后才能开始执

行,因此shuffle 数据的传输和CPU 计算任务之间一般不会重叠,这样Shuffle 数据传输量

的大小和所需的时间就直接影响到了整个任务的完成速度。可是压缩也是要消耗大量的

CPU 资源的,因此打开压缩选项会增长Map 任务的执行时间,所以若是在CPU 负载的影

响远大于磁盘和网络带宽的影响的场合下,也可能将spark.shuffle.compress 设置为False

才是最佳的方案

对于spark.shuffle.spill.compress 而言,状况相似,可是spill 数据不会被发送到网络中,仅

仅是临时写入本地磁盘,并且在一个任务中同时须要执行压缩和解压缩两个步骤,因此对

CPU 负载的影响会更大一些,而磁盘带宽(若是标配12HDD 的话)可能每每不会成为Spark

应用的主要问题,因此这个参数相对而言,或许更有机会须要设置为False。

总之,Shuffle 过程当中数据是否应该压缩,取决于CPU/DISK/NETWORK 的实际能力和负载,

应该综合考虑。

 

二、Storage 相关配置参数

spark.local.dir

Spark 用于写中间数据,如RDD Cache,Shuffle,Spill 等数据的位置,那么有什么能够注

意的呢。

首先,最基本的固然是咱们能够配置多个路径(用逗号分隔)到多个磁盘上增长总体IO 带

宽。

其次,目前的实现中,Spark 是经过对文件名采用hash 算法分布到多个路径下的目录中去,

若是你的存储设备有快有慢,好比SSD+HDD 混合使用,那么你能够经过在SSD 上配置更

多的目录路径来增大它被Spark 使用的比例,从而更好地利用SSD 的IO 带宽能力。固然

这只是一种变通的方法,终极解决方案仍是应该像目前HDFS 的实现方向同样,让Spark

可以感知具体的存储设备类型,针对性的使用。

须要注意的是,在Spark 1.0 之后,SPARK_LOCAL_DIRS (Standalone, Mesos) or

LOCAL_DIRS (YARN)参数会覆盖这个配置。好比Spark On YARN 的时候,Spark Executor

的本地路径依赖于Yarn 的配置,而不取决于这个参数。

spark.executor.memory

Executor 内存的大小,和性能自己固然并无直接的关系,可是几乎全部运行时性能相关

的内容都或多或少间接和内存大小相关。这个参数最终会被设置到Executor 的JVM 的heap

尺寸上,对应的就是Xmx 和Xms 的值

理论上Executor 内存固然是多多益善,可是实际受机器配置,以及运行环境,资源共享,

JVM GC 效率等因素的影响,仍是有可能须要为它设置一个合理的大小。多大算合理,要看

实际状况

Executor 的内存基本上是Executor 内部全部任务共享的,而每一个Executor 上能够支持的

任务的数量取决于Executor 所管理的CPU Core 资源的多少,所以你须要了解每一个任务的

数据规模的大小,从而推算出每一个Executor 大体须要多少内存便可知足基本的需求。

如何知道每一个任务所需内存的大小呢,这个很难统一的衡量,由于除了数据集自己的开销,

还包括算法所需各类临时内存空间的使用,而根据具体的代码算法等不一样,临时内存空间的

开销也不一样。可是数据集自己的大小,对最终所需内存的大小仍是有必定的参考意义的。

一般来讲每一个分区的数据集在内存中的大小,多是其在磁盘上源数据大小的若干倍(不考

虑源数据压缩,Java 对象相对于原始裸数据也还要算上用于管理数据的数据结构的额外开

销),须要准确的知道大小的话,能够将RDD cache 在内存中,从BlockManager 的Log

输出能够看到每一个Cache 分区的大小(其实也是估算出来的,并不彻底准确)

如: BlockManagerInfo: Added rdd_0_1 on disk on sr438:41134 (size: 495.3 MB)

反过来讲,若是你的Executor 的数量和内存大小受机器物理配置影响相对固定,那么你就

须要合理规划每一个分区任务的数据规模,例如采用更多的分区,用增长任务数量(进而须要

更多的批次来运算全部的任务)的方式来减少每一个任务所需处理的数据大小。

spark.storage.memoryFraction

如前面所说spark.executor.memory 决定了每一个Executor 可用内存的大小,而

spark.storage.memoryFraction 则决定了在这部份内存中有多少能够用于Memory Store 管

理RDD Cache 数据,剩下的内存用来保证任务运行时各类其它内存空间的须要。

 

spark.executor.memoryFraction 默认值为0.6,官方文档建议这个比值不要超过JVM Old

Gen 区域的比值。这也很容易理解,由于RDD Cache 数据一般都是长期驻留内存的,理论

上也就是说最终会被转移到Old Gen 区域(若是该RDD 尚未被删除的话),若是这部分

数据容许的尺寸太大,势必把Old Gen 区域占满,形成频繁的FULL GC。

如何调整这个比值,取决于你的应用对数据的使用模式和数据的规模,粗略的来讲,若是频

繁发生Full GC,能够考虑下降这个比值,这样RDD Cache 可用的内存空间减小(剩下的

部分Cache 数据就须要经过Disk Store 写到磁盘上了),会带来必定的性能损失,可是腾

出更多的内存空间用于执行任务,减小Full GC 发生的次数,反而可能改善程序运行的总体

性能

spark.streaming.blockInterval

这个参数用来设置Spark Streaming 里Stream Receiver 生成Block 的时间间隔,默认为

200ms。具体的行为表现是具体的Receiver 所接收的数据,每隔这里设定的时间间隔,就

从Buffer 中生成一个StreamBlock 放进队列,等待进一步被存储到BlockManager 中供后

续计算过程使用。理论上来讲,为了每一个Streaming Batch 间隔里的数据是均匀的,这个

时间间隔固然应该能被Batch 的间隔时间长度所整除。整体来讲,若是内存大小够用,

Streaming 的数据来得及处理,这个blockInterval 时间间隔的影响不大,固然,若是数据

Cache Level 是Memory+Ser,即作了序列化处理,那么BlockInterval 的大小会影响序列化

后数据块的大小,对于Java 的GC 的行为会有一些影响。

此外spark.streaming.blockQueueSize 决定了在StreamBlock 被存储到BlockMananger 之

前,队列中最多能够容纳多少个StreamBlock。默认为10,由于这个队列Poll 的时间间隔

是100ms,因此若是CPU 不是特别繁忙的话,基本上应该没有问题。

压缩和序列化相关

spark.serializer

默认为org.apache.spark.serializer.JavaSerializer, 可选

org.apache.spark.serializer.KryoSerializer, 实际上只要是org.apache.spark.serializer 的子

类就能够了,不过若是只是应用,大概你不会本身去实现一个的。

序列化对于spark 应用的性能来讲,仍是有很大影响的,在特定的数据格式的状况

下,KryoSerializer 的性能能够达到JavaSerializer 的10 倍以上,固然放到整个Spark 程序中

来考量,比重就没有那么大了,可是以Wordcount 为例,一般也很容易达到30%以上的性能

提高。而对于一些Int 之类的基本类型数据,性能的提高就几乎能够忽略了。KryoSerializer

依赖Twitter 的Chill 库来实现,相对于JavaSerializer,主要的问题在于不是全部的Java

Serializable 对象都能支持。

须要注意的是,这里可配的Serializer 针对的对象是Shuffle 数据,以及RDD Cache 等场

合,而Spark Task 的序列化是经过spark.closure.serializer 来配置,可是目前只支持

JavaSerializer,因此等于无法配置啦。

更多Kryo 序列化相关优化配置,能够参

考http://spark.apache.org/docs/latest/tuning.html#data-serialization 一节

spark.rdd.compress

这个参数决定了RDD Cache 的过程当中,RDD 数据在序列化以后是否进一步进行压缩再储

存到内存或磁盘上。固然是为了进一步减少Cache 数据的尺寸,对于Cache 在磁盘上而言,

绝对大小大概没有太大关系,主要是考虑Disk 的IO 带宽。而对于Cache 在内存中,那主

要就是考虑尺寸的影响,是否可以Cache 更多的数据,是否能减少Cache 数据对GC 形成

的压力等。

这二者,前者一般不会是主要问题,尤为是在RDD Cache 自己的目的就是追求速度,减小

重算步骤,用IO 换CPU 的状况下。然后者,GC 问题固然是须要考量的,数据量小,占用

空间少,GC 的问题大概会减轻,可是是否真的须要走到RDD Cache 压缩这一步,或许用

其它方式来解决可能更加有效。

因此这个值默认是关闭的,可是若是在磁盘IO 的确成为问题或者GC 问题真的没有其它更

好的解决办法的时候,能够考虑启用RDD 压缩。

spark.broadcast.compress

是否对Broadcast 的数据进行压缩,默认值为True。

Broadcast 机制是用来减小运行每一个Task 时,所须要发送给TASK 的RDD 所使用到的相

关数据的尺寸,一个Executor 只须要在第一个Task 启动时,得到一份Broadcast 数据,

以后的Task 都从本地的BlockManager 中获取相关数据。在1.1 版本之后的代码中,RDD

自己也改成以Broadcast 的形式发送给Executor(以前的实现RDD 自己是随每一个任务发送

的),所以基本上不太须要显式的决定哪些数据须要broadcast 了。

由于Broadcast 的数据须要经过网络发送,而在Executor 端又须要存储在本地

BlockMananger 中,加上最新的实现,默认RDD 经过Boradcast 机制发送,所以大大增长

了Broadcast 变量的比重,因此经过压缩减少尺寸,来减小网络传输开销和内存占用,一般

都是有利于提升总体性能的。

什么状况可能不压缩更好呢,大体上我的以为一样仍是在网络带宽和内存不是问题的时候,

若是Driver 端CPU 资源很成问题(毕竟压缩的动做基本都在Driver 端执行),那或许有调

整的必要。

spark.io.compression.codec

RDD Cache 和Shuffle 数据压缩所采用的算法Codec,默认值曾经是使用LZF 做为默认

Codec,最近由于LZF 的内存开销的问题,默认的Codec 已经改成Snappy。

LZF 和Snappy 相比较,前者压缩率比较高(固然要看具体数据内容了,一般要高20%左

右),可是除了内存问题之外,CPU 代价也大一些(大概也差20%~50%?)

在用于Shuffle 数据的场合下,内存方面,应该主要是在使用HashShuffleManager 的时候

有可能成为问题,由于若是Reduce 分区数量巨大,须要同时打开大量的压缩数据流用于写

文件,进而在Codec 方面须要大量的buffer。可是若是使用SortShuffleManager,因为shuffle

文件数量大大减小,不会产生大量的压缩数据流,因此内存开销大概不会成为主要问题。

剩下的就是CPU 和压缩率的权衡取舍,和前面同样,取决于CPU/网络/磁盘的能力和负载,

我的认为CPU 一般更容易成为瓶颈。因此要调整性能,要不不压缩,要不使用Snappy 可

能性大一些?

对于RDD Cache 的场合来讲,绝大多数场合都是内存操做或者本地IO,因此CPU 负载的

问题可能比IO 的问题更加突出,这也是为何spark.rdd.compress 自己默认为不压缩,

若是要压缩,大概也是Snappy 合适一些?

 

schedule 调度相关

大概会是你针对本身的集群第一步就会配置的参数,这里多少就其内部机制作一些解释。

 

三、spark.cores.max

一个集群最重要的参数之一,固然就是CPU 计算资源的数量。spark.cores.max 这个参数

决定了在Standalone 和Mesos 模式下,一个Spark 应用程序所能申请的CPU Core 的数

量。若是你没有并发跑多个Spark 应用程序的需求,那么能够不须要设置这个参数,默认

会使用spark.deploy.defaultCores 的值(而spark.deploy.defaultCores 的值默认为Int.Max,

也就是不限制的意思)从而应用程序可使用全部当前能够得到的CPU 资源。

针对这个参数须要注意的是,这个参数对Yarn 模式不起做用,YARN 模式下,资源由Yarn

统一调度管理,一个应用启动时所申请的CPU 资源的数量由另外两个直接配置Executor

的数量和每一个Executor 中core 数量的参数决定。

SPARK_EXECUTOR_INSTANCES/SPARK_EXECUTOR_CORES

--num-executors / --executor-cores

(历史缘由形成,不一样运行模式下的一些启动参数我的认为还有待进一步整合)

此外,在Standalone 模式等后台分配CPU 资源时,目前的实现中,在spark.cores.max

容许的范围内,基本上是优先从每一个Worker 中申请所能获得的最大数量的CPU core 给每

个Executor,所以若是人工限制了所申请的Max Core 的数量小于Standalone 和Mesos

模式所管理的CPU 数量,可能发生应用只运行在集群中部分节点上的状况(由于部分节点

所能提供的最大CPU 资源数量已经知足应用的要求),而不是平均分布在集群中。一般这

不会是太大的问题,可是若是涉及数据本地性的场合,有可能就会带来必定的必须进行远程

数据读取的状况发生。理论上,这个问题能够经过两种途径解决:一是Standalone 和Mesos

的资源管理模块自动根据节点资源状况,均匀分配和启动Executor,二是和Yarn 模式同样,

容许用户指定和限制每一个Executor 的Core 的数量。

spark.task.cpus

这个参数在字面上的意思就是分配给每一个任务的CPU 的数量,默认为1。实际上,这个参

数并不能真的控制每一个任务实际运行时所使用的CPU 的数量,好比你能够经过在任务内部

建立额外的工做线程来使用更多的CPU(至少目前为止,未来任务的执行环境是否能经过

LXC 等技术来控制还很差说)。它所发挥的做用,只是在做业调度时,每分配出一个任务

时,对已使用的CPU 资源进行计数。也就是说只是理论上用来统计资源的使用状况,便于

安排调度。所以,若是你指望经过修改这个参数来加快任务的运行,那仍是赶忙换个思路吧。

这个参数的意义,我的以为仍是在你真的在任务内部本身经过任何手段,占用了更多的CPU

资源时,让调度行为更加准确的一个辅助手段。

spark.scheduler.mode

这个参数决定了单个Spark 应用内部调度的时候使用FIFO 模式仍是Fair 模式。是的,你

没有看错,这个参数只管理一个Spark 应用内部的多个没有依赖关系的Job 做业的调度策

略。

若是你须要的是多个Spark 应用之间的调度策略,那么在Standalone 模式下,这取决于每

个应用所申请和得到的CPU 资源的数量(暂时没有得到资源的应用就Pending 在那里了),

基本上就是FIFO 形式的,谁先申请和得到资源,谁就占用资源直到完成。而在Yarn 模式

下,则多个Spark 应用间的调度策略由Yarn 本身的策略配置文件所决定。

那么这个内部的调度逻辑有什么用呢?若是你的Spark 应用是经过服务的形式,为多个用

户提交做业的话,那么能够经过配置Fair 模式相关参数来调整不一样用户做业的调度和资源

分配优先级。

spark.locality.wait

spark.locality.wait 和spark.locality.wait.process,spark.locality.wait.node,

spark.locality.wait.rack 这几个参数影响了任务分配时的本地性策略的相关细节。

Spark 中任务的处理须要考虑所涉及的数据的本地性的场合,基本就两种,一是数据的来源

是HadoopRDD; 二是RDD 的数据来源来自于RDD Cache(即由CacheManager 从

BlockManager 中读取,或者Streaming 数据源RDD)。其它状况下,若是不涉及shuffle

操做的RDD,不构成划分Stage 和Task 的基准,不存在判断Locality 本地性的问题,而

若是是ShuffleRDD,其本地性始终为No Prefer,所以其实也无所谓Locality。

在理想的状况下,任务固然是分配在能够从本地读取数据的节点上时(同一个JVM 内部或

同一台物理机器内部)的运行时性能最佳。可是每一个任务的执行速度没法准确估计,因此很

难在事先得到全局最优的执行策略,当Spark 应用获得一个计算资源的时候,若是没有可

以知足最佳本地性需求的任务能够运行时,是退而求其次,运行一个本地性条件稍差一点的

任务呢,仍是继续等待下一个可用的计算资源已指望它能更好的匹配任务的本地性呢?

这几个参数一块儿决定了Spark 任务调度在获得分配任务时,选择暂时不分配任务,而是等

待得到知足进程内部/节点内部/机架内部这样的不一样层次的本地性资源的最长等待时间。默

认都是3000 毫秒。

基本上,若是你的任务数量较大和单个任务运行时间比较长的状况下,单个任务是否在数据

本地运行,代价区别可能比较显著,若是数据本地性不理想,那么调大这些参数对于性能优

化可能会有必定的好处。反之若是等待的代价超过带来的收益,那就不要考虑了。

特别值得注意的是:在处理应用刚启动后提交的第一批任务时,因为看成业调度模块开始工

做时,处理任务的Executors 可能尚未彻底注册完毕,所以一部分的任务会被放置到No

Prefer 的队列中,这部分任务的优先级仅次于数据本地性知足Process 级别的任务,从而

被优先分配到非本地节点执行,若是的确没有Executors 在对应的节点上运行,或者的确是

No Prefer 的任务(如shuffleRDD),这样作确实是比较优化的选择,可是这里的实际状况

只是这部分Executors 还没来得及注册上而已。这种状况下,即便加大本节中这几个参数的

数值也没有帮助。针对这个状况,有一些已经完成的和正在进行中的PR 经过例如动态调整

No Prefer 队列,监控节点注册比例等等方式试图来给出更加智能的解决方案。不过,你也

能够根据自身集群的启动状况,经过在建立SparkContext 以后,主动Sleep 几秒的方式来

简单的解决这个问题。

spark.speculation

spark.speculation 以及spark.speculation.interval, spark.speculation.quantile,

spark.speculation.multiplier 等参数调整Speculation 行为的具体细节,Speculation 是在任

务调度的时候,若是没有适合当前本地性要求的任务可供运行,将跑得慢的任务在空闲计算

资源上再度调度的行为,这些参数调整这些行为的频率和判断指标,默认是不使用

Speculation 的。

一般来讲很难正确的判断是否须要Speculation,能真正发挥Speculation 用处的场合,往

往是某些节点因为运行环境缘由,好比CPU 资源因为某种缘由被占用,磁盘损坏致使IO

缓慢形成任务执行速度异常的状况,固然前提是你的分区任务不存在仅能被执行一次,或者

不能同时执行多个拷贝等状况。Speculation 任务参照的指标一般是其它任务的执行时间,

而实际的任务可能因为分区数据尺寸不均匀,原本就会有时间差别,加上必定的调度和IO

的随机性,因此若是一致性指标定得过严,Speculation 可能并不能真的发现问题,反而增

加了没必要要的任务开销,定得过宽,大概又基本至关于没用。

我的以为,若是你的集群规模比较大,运行环境复杂,的确可能常常发生执行异常,加上数

据分区尺寸差别不大,为了程序运行时间的稳定性,那么能够考虑仔细调整这些参数。不然

仍是考虑如何排除形成任务执行速度异常的因数比较靠铺一些。

相关文章
相关标签/搜索