http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operationshtml
一些spark的特定操做将会触发被称为shuffle的事件。Shuffle是Spark用于从新分布数据的机制,这样能够在不一样的分区来分组。这一般涉及到在executor和机器之间进行拷贝数据,因此shuffle是一个很复杂而且消耗高的操做。web
为了了解shuffle期间发生了什么,咱们能够考虑reduceByKey操做做为例子。reduceByKey操做生成了一个新的RDD经过全部的单个键值组合为一个元组-关键字和针对与该关键字相关的全部值执行reduce函数的结果。这里的挑战不是全部的值对一个单独的键都在同一个分区上或者甚至说在一台机器上,而是它们必须被从新分布来计算结果。
在Spark,数据一般不会跨分区分布到特定操做的必要位置。在计算中,一个单独的任务将会在一个单独的分区上操做-然而为了组织全部的数据来被一个的单独reduceByKey 的reduce任务来执行,Spark须要来执行一个all-to-all操做。它必须读取全部分区来找到全部键的值,而后将它们带到一块儿跨分区来为每个键计算最终的结果---这个被称为shuffle。
尽管在每个分区中的新的shuffled数据的元素集是很重要的,一样分区本身的顺序也很重要,而元素之间的顺序就不是了。若是一个想要预测shuffle中的顺序数据那么可使用: 算法
能够引发一个shuffle 的操做包括:repartition 和 coalesce,ByKey的操做,除了counting以外的好比:groupByKey 和reduceByKey,以及join操做好比cogroup 和 join。apache
Shuffle是一个昂贵的操做由于它涉及到磁盘I/O,数据序列化和网络I/O。为了给shuffle组织数据,spark生成一系列任务-maps用于组织数据,以及一系列reduce任务来汇集它。这个命名系统来自于MapReduce并且并不直接和SparK的map,reduce操做有关。
在内部,单独的map任务的结果会被保存在内存中直到它们不适用。而后这些结果会被根据目标分区排序而且写向单一的文件。在reduce方面,任务读取相关的排序块。
必定的shuffle操做会消耗明显的数量的堆内存由于它们使用的是在内存中的数据结构来组织记录在传输以前或者以后。明显的,reduceByKey和AggregateByKey创造了这些结构在map阶段,以及 ‘Bykey的操做生成了它们在reduce阶段。当数据不能放进内存中时,Spark将会将这些表散落到硬盘中,会引发而外的磁盘I/O和增长垃圾回收次数。
Shuffle一样会生成大量的中间文件在磁盘中。从Spark1.3开始,这些文件被保存直到对应的RDDs再也不被使用以及已经被垃圾回收了。这样作是为了shuffle文件不须要被从新创造若是lineage被从新计算时。垃圾回收也许会发生只有在一段很长时间,若是这个应用保留了对RDD的引用或者若是GC没有频繁的发生。这意味着长期运行的spark任务也许会消耗大量的磁盘空间。这个零时的磁盘目录会被spark.local.dir参数所指定。
Shuffle行为能够被调整经过一系列的参数。能够参考 Spark Configuration Guide.‘Shuffle Behavior’章节。缓存
属性名称Property Name | 默认值Default | 含义Meaning |
---|---|---|
spark.reducer.maxSizeInFlight | 48m | 从每个reduce任务中同步获取的map输出的最大值。因为每个输出须要咱们创造一个缓存来接受它,这个表明了每一个任务的固定的内存开销,因此尽可能保证它较小除非你有不少内存。 |
spark.reducer.maxReqsInFlight | Int.MaxValue | 这个配置限制了任意给定点远程请求获取块数。当集群中的主机数量增长的时候,它也许会致使一个很是大数量的内部链接到一到多个节点,引发worker在负载下失败。经过容许它来限制获取请求的数量,这个状况也许会缓解 |
spark.reducer.maxBlocksInFlightPerAddress | Int.MaxValue | 这个配置限制了每个从给定端口里的的reduce任务能够获取的远程端口数量。当一个大量的block被一个给定的地址在一次单独获取或者同步获取所请求时,可能会冲垮服务的executor或者Node Manager。这个配置对于减小Node Manager的负载尤其有用当外部的shuffle是被容许的。你能够经过设定一个较低值来减轻这个状况。 |
spark.maxRemoteBlockSizeFetchToMem | Long.MaxValue | 远程的块将会被获取到磁盘中,当这个块的大小超过了这个配置的值在byte单位上。这个用于避免一个巨大的请求占据了太多的内存。咱们能够将这个配置为一个指定的值(好比,200M)。注意到这个配置将会影响到shuffle的获取以及远程块获取的块管理。对于容许了外部shuffle服务的用户,这个特性只会在外部shuffle服务版本高于Spark2。2时有效。 |
spark.shuffle.compress | true | 是否压缩map的输出文件,一般是一个好想法。压缩将会使用spark.io.compression.codec. |
spark.shuffle.file.buffer | 32k | 对每个shuffle文件输出流的在内存中的缓存大小,单位是KiB除非有其余的特别指定。这些缓存减小了硬盘查找和系统调用建立中间shuffle文件的过程。 |
spark.shuffle.io.maxRetries | 3 | (Netty only)最大自动重复尝试的次数若是这个值没有被设置为0.这个重试逻辑有助于稳定大型的shuffle在长时间的GC暂停或者暂时的网络链接问题上。 |
spark.shuffle.io.numConnectionsPerPeer | 1 | (Netty only) 节点之间的链接的重复使用为了减小大型集群中重复创建链接的状况。对于有不少硬盘和不多主机的集群,这个将会致使并发行不足以饱和全部硬盘,所以用户可能会考虑增长这个值。 |
spark.shuffle.io.preferDirectBufs | true | (Netty only) 堆外缓冲区在shuffle和缓存块转移期间被用于减小垃圾回收。对于对外缓存内存数量有限的环境,用户也许想要关掉这个来强迫全部的来自于Netty的分配都是在堆上。 |
spark.shuffle.io.retryWait | 5s | (Netty only) 在每一次重试直接须要等待多久。最大的延迟时间默认是15秒,maxRetries * retryWait. |
spark.shuffle.service.enabled | false | 容许外部shuffle服务。这个服务保存了经过executor所写的shuffle文件,这样这个executor能够安全的被移除。这个配置必须被容许若是spark.dynamicAllocation.enabled是“true”。这个外部的shuffle服务必须被启动。查看dynamic allocation configuration and setup documentation 来得到更多信息。 |
spark.shuffle.service.port | 7337 | 外部shuffle服务将会运行的端口。 |
spark.shuffle.service.index.cache.size | 100m | 缓存条目限制在指定的内存占用空间中,以字节为单位 |
spark.shuffle.maxChunksBeingTransferred | Long.MAX_VALUE | 在shuffle服务中同一时间最大容许传输的块数量。注意到新来的链接将会被关闭若是达到了最大数量。这个客户端将会尝试从新链接根据shuffle的重试配置(see spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait),若是这个限制也被达到了,那么这个任务将会失败。 |
spark.shuffle.sort.bypassMergeThreshold | 200 | (Advanced)在基于排序的shuffle管理中,避免合并排序数据若是这里没有map-side的聚合和这里最多有配置的这么多的reduce分区。 |
spark.shuffle.spill.compress | true | 是否压缩溢出的数据在shuffle期间 |
spark.shuffle.accurateBlockThreshold | 100 * 1024 * 1024 | 阀值是以bytes为单位,高于此值将准确记录HighlyCompressedMapStatus中的shuffle块的大小。这个用于帮助阻止OOM经过避免错误估计了shuffle块大小当获取了shuffle块时。 |
spark.shuffle.registration.timeout | 5000 | 注册外部shuffle服务的超时时间,单位是毫秒 |
spark.shuffle.registration.maxAttempts | 3 | 当注册外部shuffle服务失败的时候,咱们会重复尝试的最大次数 |
spark.io.encryption.enabled | false | 容许IO编码。目前支持全部的模式除了Mesos。当使用这个特性的时候,咱们推荐RPC编码。 |
spark.io.encryption.keySizeBits | 128 | IO编码的值大小单位为bit。支持的值有128,192和256. |
spark.io.encryption.keygen.algorithm | HmacSHA1 | 当生成一个IO编码键值时使用的算法。被支持的算法在Java Cryptography Architecture Standard Algorithm Name 文档的KeyGenerator章节中被描述。 |