因为大多数 Spark 计算的内存性质, Spark 程序可能由集群中的任何资源( CPU ,网络带宽或内存)致使瓶颈。 一般状况下,若是数据有合适的内存,瓶颈就是网络带宽,但有时您还须要进行一些调整,例如 以序列化形式存储 RDD 来减小内存的使用。 本指南将涵盖两个主要的主题:数据序列化,这对于良好的网络性能相当重要,而且还能够减小内存使用和内存优化。 咱们选几个较小的主题进行展开。html
序列化在任何分布式应用程序的性能中起着重要的做用。 很慢的将对象序列化或消费大量字节的格式将会大大减慢计算速度。 一般,这多是您优化 Spark 应用程序的第一件事。 Spark 宗旨在于方便和性能之间取得一个平衡(容许您使用操做中的任何 Java 类型)。 它提供了两种序列化库:java
ObjectOutputStream
框架的 Spark 序列化对象,而且能够与您建立的任何实现 java.io.Serializable
的类一块儿使用。 您还能够经过扩展 java.io.Externalizable
来更紧密地控制序列化的性能。 Java 序列化是灵活的,但一般至关缓慢,并致使许多类的大型序列化格式。Serializable
类型,而且须要先注册您将在程序中使用的类以得到最佳性能。您能够经过使用 SparkConf 初始化做业 并进行调用来切换到使用 Kryo conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
。此设置配置用于不只在工做节点之间进行洗牌数据的串行器,并且还将 RDD 序列化到磁盘。 Kryo 不是默认的惟一缘由是由于自定义注册要求,但咱们建议您尝试在任何网络密集型应用程序。自从 Spark 2.0.0 以来,咱们在使用简单类型,简单类型的数组或字符串类型对RDD进行混洗时,内部使用 Kryo serializer 。git
Spark 自动包含 Kryo 序列化器,用于 Twitter chill 中 AllScalaRegistrar 涵盖的许多经常使用的核心 Scala 类。github
要使用 Kryo 注册本身的自定义类,请使用该 registerKryoClasses
方法。apache
val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)
所述 Kryo 文档 描述了更先进的注册选项,如添加自定义序列的代码。api
若是您的对象很大,您可能还须要增长 spark.kryoserializer.buffer
配置。该值须要足够大才能容纳您将序列化的最大对象。数组
最后,若是您没有注册自定义类, Kryo 仍然能够工做,但它必须存储每一个对象的完整类名称,这是浪费的。缓存
有三个方面的考虑在调整内存使用:该量的存储你的对象所使用的(你可能但愿你的整个数据集,以适应在内存中),则成本访问这些对象,而且开销垃圾收集(若是你有高成交物品条款)。安全
默认状况下, Java 对象能够快速访问,但能够轻松地消耗比其字段中的 “raw” 数据多2-5倍的空间。这是因为如下几个缘由:服务器
Int
字段),这能够比数据大。String
在原始字符串数据上具备大约40字节的开销(由于它们存储在 Char
数组中并保留额外的数据,例如长度),而且因为 UTF-16 的内部使用而将每一个字符存储为两个字节 String
编码。所以,一个10个字符的字符串能够容易地消耗60个字节。HashMap
和 LinkedList
,使用连接的数据结构,其中每一个条目(例如: Map.Entry
)存在”包装器”对象。该对象不只具备 header ,还包括指针(一般为8个字节)到列表中的下一个对象。java.lang.Integer
。本节将从 Spark 的内存管理概述开始,而后讨论用户能够采起的具体策略,以便在他/她的应用程序中更有效地使用内存。具体来讲,咱们将描述如何肯定对象的内存使用状况,以及如何改进数据结构,或经过以串行格式存储数据。而后咱们将介绍调整 Spark 的缓存大小和 Java 垃圾回收器。
Spark 中的内存使用大部分属于两类:执行和存储。执行存储器是指用于以混洗,链接,排序和聚合计算的存储器,而存储内存是指用于在集群中缓存和传播内部数据的内存。在 Spark 中,执行和存储共享一个统一的区域(M)。当没有使用执行存储器时,存储器能够获取全部可用的存储器,反之亦然。若是须要,执行能够驱逐存储,但只有在总存储内存使用量低于某个阈值(R)以前。换句话说, R
描述 M
缓存块永远不会被驱逐的区域。因为实施的复杂性,存储不得驱逐执行。
该设计确保了几个理想的性能。首先,不使用缓存的应用程序能够将整个空间用于执行,从而避免没必要要的磁盘泄漏。第二,使用缓存的应用程序能够保留最小的存储空间(R),其中数据块不受驱逐。最后,这种方法为各类工做负载提供了合理的开箱即用性能,而不须要用户内部如何分配内存的专业知识。
虽然有两种相关配置,但典型用户不须要调整它们,由于默认值适用于大多数工做负载:
spark.memory.fraction
表示大小 M
(JVM堆空间 - 300MB)(默认为0.6)的一小部分。剩余的空间(40%)保留用于用户数据结构,Spark中的内部元数据,而且在稀疏和异常大的记录的状况下保护OOM错误。spark.memory.storageFraction
表示大小 R
为 M
(默认为0.5)的一小部分。 R
是 M
缓存块中的缓存被执行驱逐的存储空间。spark.memory.fraction
应该设置值,以便在 JVM 的旧版或”终身”版本中温馨地适应这一堆堆空间。有关详细信息,请参阅下面高级 GC 调优的讨论。
大小数据集所需的内存消耗量的最佳方式是建立 RDD ,将其放入缓存中,并查看 Web UI 中的“存储”页面。该页面将告诉您 RDD 占用多少内存。
为了估计特定对象的内存消耗,使用 SizeEstimator
的 estimate
方法这是用于与不一样的数据布局试验修剪内存使用状况,以及肯定的空间的广播变量将占据每一个执行器堆的量是有用的。
减小内存消耗的第一种方法是避免添加开销的 Java 功能,例如基于指针的数据结构和包装对象。有几种方法能够作到这一点:
HashMap
)。该 fastutil 库提供方便的集合类基本类型是与 Java 标准库兼容。-XX:+UseCompressedOops
,使指针为4个字节而不是8个字节。您能够添加这些选项 spark-env.sh
当您的对象仍然太大而没法有效存储,尽管这种调整,减小内存使用的一个更简单的方法是以序列化形式存储它们,使用 RDD 持久性 API 中的序列化 StorageLevel ,例如: MEMORY_ONLY_SER
。 Spark 将会将每一个 RDD 分区存储为一个大字节数组。以序列化形式存储数据的惟一缺点是访问时间较短,由于必须对每一个对象进行反序列化。若是您想以序列化形式缓存数据,咱们强烈建议使用 Kryo ,由于它致使比 Java 序列化更小的尺寸(并且确定比原 Java 对象)更小。
当您的程序存储的 RDD 有很大的”流失”时, JVM 垃圾收集多是一个问题。(程序中一般没有问题,只读一次 RDD ,而后在其上运行许多操做)。 当 Java 须要驱逐旧对象为新的对象腾出空间时,须要跟踪全部 Java 对象并找到未使用的。要记住的要点是,垃圾收集的成本与 Java 对象的数量成正比,所以使用较少对象的数据结构(例如: Ints
数组,而不是 LinkedList
)大大下降了此成本。 一个更好的方法是如上所述以序列化形式持久化对象:如今每一个 RDD 分区只有一个对象(一个字节数组)。 在尝试其余技术以前,若是 GC 是一个问题,首先要使用序列化缓存。
因为任务的工做记忆(运行任务所需的空间)和缓存在节点上的 RDD 之间的干扰, GC 也多是一个问题。咱们将讨论如何控制分配给RDD缓存的空间来减轻这一点。
测量 GC 的影响
GC 调整的第一步是收集关于垃圾收集发生频率和GC花费的时间的统计信息。这能够经过添加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
到 Java 选项来完成。(有关将 Java 选项传递给 Spark 做业的信息,请参阅配置指南)下次运行 Spark 做业时,每当发生垃圾回收时,都会看到在工做日志中打印的消息。请注意,这些日志将在您的群集的工做节点上( stdout
在其工做目录中的文件中),而不是您的驱动程序。
高级 GC 优化
为了进一步调整垃圾收集,咱们首先须要了解一些关于 JVM 内存管理的基本信息:
Java堆空间分为两个区域 Young 和 Old 。 Young 一代的目的是持有短命的物体,而 Old 一代的目标是使用寿命更长的物体。
Young 一代进一步分为三个区域[ Eden , Survivor1 , Survivor2 ]。
垃圾收集过程的简化说明:当 Eden 已满时, Eden 上运行了一个小型 GC ,并将 Eden 和 Survivor1 中存在的对象复制到 Survivor2 。幸存者地区被交换。若是一个对象足够老,或者 Survivor2 已满,则会移动到 Old 。最后,当 Old 接近满时,一个完整的 GC 被调用。
Spark 中 GC 调优的目的是确保只有长寿命的 RDD 存储在 Old 版本中,而且 Young 版本的大小足够存储短命期的对象。这将有助于避免使用完整的 GC 来收集任务执行期间建立的临时对象。可能有用的一些步骤是:
经过收集 GC 统计信息来检查垃圾收集是否太多。若是在任务完成以前屡次调用完整的 GC ,这意味着没有足够的可用于执行任务的内存。
若是过小的集合太多,而不是不少主要的 GC ,为 Eden 分配更多的内存将会有所帮助。您能够将 Eden 的大小设置为对每一个任务须要多少内存的估计。若是肯定 Eden 的大小 E
,那么您可使用该选项设置年轻一代的大小 -Xmn=4/3*E
。(按比例增长4/3是考虑幸存者地区使用的空间。)
在打印的 GC 统计信息中,若是 OldGen 接近于满,则经过下降减小用于缓存的内存量 spark.memory.fraction
; 缓存较少的对象比减慢任务执行更好。或者,考虑减小年轻一代的大小。这意味着 -Xmn
若是您将其设置为如上所述下降。若是没有,请尝试更改 JVM NewRatio
参数的值。许多 JVM 默认为2,这意味着 Old 版本占据堆栈的2/3。它应该足够大,使得该分数超过 spark.memory.fraction
。
尝试使用 G1GC 垃圾回收器 -XX:+UseG1GC
。在垃圾收集是瓶颈的一些状况下,它能够提升性能. 请注意,对于大型 excutor 的堆大小,经过设置 -XX:G1HeapRegionSize 参数来增长 G1 区域的大小 是很是重要的
例如,若是您的任务是从 HDFS 读取数据,则可使用从 HDFS 读取的数据块的大小来估计任务使用的内存量。请注意,解压缩块的大小一般是块大小的2或3倍。因此若是咱们但愿有3或4个任务的工做空间,而 HDFS 块的大小是128MB,咱们能够估计 Eden 的大小4*3*128MB
。
监控垃圾收集的频率和时间如何随着新设置的变化而变化。
咱们的经验代表, GC 调整的效果取决于您的应用程序和可用的内存量。有更多的优化选项 在线描述,但在较高的水平,管理完整的 GC 如何常常发生能够减小开销帮助。
能够经过spark.executor.extraJavaOptions
在做业的配置中设置来指定执行器的 GC 调整标志。
集群不会被充分利用,除非您将每一个操做的并行级别设置得足够高。自动星火设置的 “地图” 任务的数量根据其大小对每一个文件运行(尽管你能够经过可选的参数来控制它 SparkContext.textFile
,等等),以及用于分布式”减小”操做,如: groupByKey
和 reduceByKey
,它采用了最大父 RDD 的分区数。您能够将并行级别做为第二个参数传递(请参阅 spark.PairRDDFunctions
文档),或者将 config 属性设置 spark.default.parallelism
为更改默认值。通常来讲,咱们建议您的群集中每一个 CPU 内核有2-3个任务。
有时,您将获得一个 OutOfMemoryError ,由于您的 RDD 不适合内存,而是由于您的其中一个任务的工做集(如其中一个 reduce 任务groupByKey
)太大。 Spark 的 shuffle 操做(sortByKey
, groupByKey
, reduceByKey
, join
,等)创建每一个任务中的哈希表来进行分组,而这每每是很大的。这里最简单的解决方案是增长并行级别,以便每一个任务的输入集都更小。 Spark 能够有效地支持短达200 ms 的任务,由于它能够将多个任务中的一个执行者JVM重用,而且任务启动成本低,所以您能够将并行级别安全地提升到集群中的核心数量。
使用 可用的广播功能 SparkContext
能够大大减小每一个序列化任务的大小,以及在群集上启动做业的成本。若是您的任务使用其中的驱动程序中的任何大对象(例如:静态查找表),请考虑将其变为广播变量。 Spark 打印主机上每一个任务的序列化大小,所以您能够查看该任务以决定您的任务是否过大; 通常任务大于20 KB大概值得优化。
数据本地化可能会对 Spark job 的性能产生重大影响。若是数据和在其上操做的代码在一块儿,则计算每每是快速的。但若是代码和数据分开,则必须移动到另外一个。一般,代码大小远小于数据,所以将数据代码从一个地方寄送到另外一个地方比一大块数据更快。 Spark 围绕数据局部性的通常原则构建其调度。
数据本地化是指数据和代码处理有多近。根据数据的当前位置有几个地方级别。从最近到最远的顺序:
PROCESS_LOCAL
数据与运行代码在同一个 JVM 中。这是可能的最好的地方NODE_LOCAL
数据在同一个节点上。示例可能在同一节点上的 HDFS 或同一节点上的另外一个执行程序中。这比 PROCESS_LOCAL
由于数据必须在进程之间移动慢一些NO_PREF
数据从任何地方一样快速访问,而且没有本地偏好RACK_LOCAL
数据位于同一机架上的服务器上。数据位于同一机架上的不一样服务器上,所以须要经过网络发送,一般经过单个交换机发送ANY
数据在网络上的其余地方,而不在同一个机架中Spark 喜欢将全部 task 安排在最佳的本地级别,但这并不老是可能的。在任何空闲 executor 中没有未处理数据的状况下, Spark 将切换到较低的本地级别。有两个选项: a )等待一个繁忙的 CPU 释放在相同服务器上的数据上启动任务,或者 b )当即在更远的地方启动一个新的任务,须要在那里移动数据。
Spark 一般作的是等待一个繁忙的 CPU 释放的但愿。一旦超时,它将开始将数据从远处移动到可用的 CPU 。每一个级别之间的回退等待超时能够在一个参数中单独配置或所有配置; 有关详细信息,请参阅配置页面 spark.locality
上的 参数。若是您的 task 很长,而且本地化差,您应该增长这些设置,但默认值一般会很好。
这是一个简短的指南,指出调整 Spark 应用程序时应该了解的主要问题 - 最重要的是数据序列化和内存调整。对于大多数程序,以序列化形式切换到 Kryo 序列化和持久化数据将会解决大多数常见的性能问题。随时在 Spark 邮件列表中询问有关其余调优最佳作法的信息。