01 sparkAPI-阅读总结-TuningSpark

TuningSparkhtml

调整Spark

因为大多数Spark计算的内存特性,Spark程序可能会受到群集中任何资源的瓶颈:CPU,网络带宽或内存。大多数状况下,若是数据适合内存,瓶颈就是网络带宽,但有时候,您还须要进行一些调整,例如 以序列化形式存储RDD,以减小内存使用。本指南将介绍两个主要主题:数据序列化,这对于良好的网络性能相当重要,还能够减小内存使用和内存调整。咱们还草拟了几个较小的主题。java

数据序列化

序列化在任何分布式应用程序的性能中起着重要做用。将对象序列化或消耗大量字节的速度慢的格式将大大减慢计算速度。一般,这将是您应该优化Spark应用程序的第一件事。Spark旨在在便利性(容许您使用操做中的任何Java类型)和性能之间取得平衡。它提供了两个序列化库:git

  • Java序列化:默认状况下,Spark使用Java ObjectOutputStream框架序列化对象,而且能够与您建立的任何实现的类一块儿使用java.io.Serializable您还能够经过扩展来更紧密地控制序列化的性能 java.io.ExternalizableJava序列化是灵活的,但一般很慢,并致使许多类的大型序列化格式。
  • Kryo序列化:Spark还可使用Kryo库(版本4)更快地序列化对象。Kryo比Java序列化(一般高达10倍)明显更快,更紧凑,但不支持全部Serializable类型,而且须要您提早注册您将在程序中使用的类以得到最佳性能。

您能够经过使用SparkConf初始化做业 并调用来切换到使用Kryo conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")此设置配置序列化程序,不只用于在工做节点之间混洗数据,还用于将RDD序列化到磁盘。Kryo不是默认值的惟一缘由是由于自定义注册要求,但咱们建议在任何网络密集型应用程序中尝试它。从Spark 2.0.0开始,咱们在使用简单类型,简单类型数组或字符串类型对RDD进行混洗时,内部使用Kryo序列化程序。github

Spark自动包含Kryo序列化程序,用于来自Twitter chill的AllScalaRegistrar中涵盖的许多经常使用核心Scala类apache

要使用Kryo注册本身的自定义类,请使用该registerKryoClasses方法。api

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

所述KRYO文档描述了更先进的注册选项,如添加自定义序列的代码。数组

若是对象很大,则可能还须要增长spark.kryoserializer.buffer 配置此值必须足够大才能容纳要序列化最大对象。缓存

最后,若是你没有注册你的自定义类,Kryo仍然会工做,但它必须存储每一个对象的完整类名,这是浪费。安全

内存调整

有三个方面的考虑在调整内存使用:该的存储你的对象所使用的(你可能但愿你的整个数据集,以适应在内存中),则成本访问这些对象,而且开销垃圾收集(若是你有高成交对象的条款)。服务器

默认状况下,Java对象访问速度很快,但与其字段中的“原始”数据相比,能够轻松占用2-5倍的空间。这是因为如下几个缘由:

  • 每一个不一样的Java对象都有一个“对象头”,大约16个字节,并包含诸如指向其类的指针之类的信息。对于包含很是少数据的对象(好比一个Int字段),这可能比数据大。
  • Java String在原始字符串数据上有大约40字节的开销(由于它们将它存储在Char的数组中并保留额外的数据,例如长度),而且因为UTF-16的内部使用而将每一个字符存储为两个字节String编码。所以,10个字符的字符串很容易消耗60个字节。
  • 公共集合类,例如HashMapLinkedList,使用连接数据结构,其中每一个条目都有一个“包装”对象(例如Map.Entry)。此对象不只具备标题,还具备指向列表中下一个对象的指针(一般为8个字节)。
  • 原始类型的集合一般将它们存储为“盒装”对象,例如java.lang.Integer

本节将首先概述Spark中的内存管理,而后讨论用户能够采起的具体策略,以便在他/她的应用程序中更有效地使用内存。特别是,咱们将描述如何肯定对象的内存使用状况,以及如何经过更改数据结构或以序列化格式存储数据来改进它。而后咱们将介绍调整Spark的缓存大小和Java垃圾收集器。

内存管理概述

Spark中的内存使用大体属于如下两种类别之一:执行和存储。执行内存是指用于在随机,链接,排序和聚合中进行计算的内存,而存储内存是指用于在集群中缓存和传播内部数据的内存。在Spark中,执行和存储共享一个统一的区域(M)。当没有使用执行内存时,存储能够获取全部可用内存,反之亦然。若有必要,执行能够驱逐存储,但仅限于总存储内存使用量低于某个阈值(R)。换句话说,R描述了M高速缓存块从未被驱逐的子区域。因为实施的复杂性,存储可能不会驱逐执行。

该设计确保了几种理想的特性。首先,不使用缓存的应用程序可使用整个空间执行,从而避免没必要要的磁盘溢出。其次,使用缓存的应用程序能够保留最小的存储空间(R),其中数据块不受驱逐。最后,这种方法为各类工做负载提供了合理的开箱即用性能,而无需用户内部划份内存的专业知识。

虽然有两种相关配置,但典型用户不须要调整它们,由于默认值适用于大多数工做负载:

  • spark.memory.fraction表示大小M为(JVM堆空间 - 300MB)的一小部分(默认值为0.6)。其他的空间(40%)保留用于用户数据结构,Spark中的内部元数据,以及在稀疏和异常大的记录的状况下防止OOM错误。
  • spark.memory.storageFraction将大小表示RM(默认值0.5)的一小部分。 RM缓存块不受执行驱逐的存储空间

spark.memory.fraction应该设置值,以便在JVM的旧版或“终身”代中温馨地适应这个堆空间量。有关详细信息,请参阅下面的高级GC调整讨论。

肯定内存消耗

肯定数据集所需内存消耗量的最佳方法是建立RDD,将其放入缓存中,而后查看Web UI中的“存储”页面。该页面将告诉您RDD占用多少内存。

为了估计特定对象的内存消耗,使用SizeEstimatorestimate方法。这对于尝试使用不一样的数据布局来调整内存使用状况以及肯定广播变量在每一个执行程序堆上占用的空间量很是有用。

调整数据结构

减小内存消耗的第一种方法是避免增长开销的Java功能,例如基于指针的数据结构和包装器对象。作这件事有不少种方法:

  1. 设计您的数据结构以优先选择对象数组和基本类型,而不是标准的Java或Scala集合类(例如HashMap)。fastutil 库提供方便的集合类基本类型是与Java标准库兼容。
  2. 尽量避免使用包含大量小对象和指针的嵌套结构。
  3. 考虑使用数字ID或枚举对象而不是键的字符串。
  4. 若是RAM少于32 GB,请设置JVM标志-XX:+UseCompressedOops以使指针为四个字节而不是八个字节。您能够添加这些选项 spark-env.sh

序列化RDD存储

尽管进行了这种调整,可是当对象仍然太大而没法有效存储时,减小内存使用的一种更简单的方法是使用RDD持久性API中的序列化StorageLevels以序列化形式存储它们,例如而后,Spark将每一个RDD分区存储为一个大字节数组。因为必须动态地反序列化每一个对象,所以以序列化形式存储数据的惟一缺点是访问时间较慢。若是您但愿以序列化形式缓存数据,咱们强烈建议使用Kryo,由于它致使比Java序列化(固然比原始Java对象)小得多的尺寸。MEMORY_ONLY_SER

垃圾收集调整

当您根据程序存储的RDD进行大量“流失”时,JVM垃圾回收可能会出现问题。(在读取RDD一次而后在其上运行许多操做的程序中一般不会出现问题。)当Java须要逐出旧对象以便为新对象腾出空间时,它须要遍历全部Java对象并查找未使用的。这里要记住的要点是垃圾收集的成本与Java对象的数量成正比,所以使用具备较少对象的数据结构(例如,Ints而不是a 的数组LinkedList)大大下降了这种成本。更好的方法是以序列化形式保存对象,如上所述:如今只有一个每一个RDD分区的对象(一个字节数组)。在尝试其余技术以前,首先要尝试GC是一个问题是使用序列化缓存

因为任务的工做内存(运行任务所需的空间量)与节点上缓存的RDD之间的干扰,GC也多是一个问题。咱们将讨论如何控制分配给RDD缓存的空间以缓解这种状况。

测量GC的影响

GC调优的第一步是收集有关垃圾收集发生频率和GC使用时间的统计信息。这能够经过添加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStampsJava选项来完成(有关将Java选项传递给Spark做业的信息,请参阅配置指南。)下次运行Spark做业时,每次发生垃圾收集时,您都会看到工做日志中打印的消息。请注意,这些日志将位于群集的工做节点上(stdout位于其工做目录中的文件中),而不是位于驱动程序上。

高级GC调整

为了进一步调整垃圾收集,咱们首先须要了解JVM中有关内存管理的一些基本信息:

  • Java堆空间分为Young和Old两个区域。Young表明意味着持有短命的物体,而老一代则用于生命周期较长的物体。

  • Young代进一步分为三个区域[Eden,Survivor1,Survivor2]。

  • 垃圾收集过程的简化描述:当Eden已满时,在Eden上运行次要GC,并将从Eden和Survivor1中存活的对象复制到Survivor2。幸存者地区被交换。若是对象足够大或Survivor2已满,则将其移至Old。最后,当Old接近完整时,将调用完整的GC。

Spark中GC调整的目标是确保只有长寿命的RDD存储在Old代中,而且Young代的大小足以存储短时间对象。这将有助于避免完整的GC收集在任务执行期间建立的临时对象。可能有用的一些步骤是:

  • 经过收集GC统计数据来检查是否有太多垃圾收集。若是在任务完成以前屡次调用完整的GC,则意味着没有足够的内存可用于执行任务。

  • 若是有太屡次要集合但没有不少主要的GC,那么为Eden分配更多内存将会有所帮助。您能够将Eden的大小设置为高估每一个任务所需的内存量。若是肯定Eden的大小E,则可使用该选项设置Young代的大小-Xmn=4/3*E(按比例增长4/3也是为了解释幸存者地区使用的空间。)

  • 在打印的GC统计信息中,若是OldGen接近满,则经过下降来减小用于缓存的内存量spark.memory.fraction缓存更少的对象比减慢任务执行速度更好。或者,考虑减少Young代的尺寸。-Xmn若是您按上述设置,这意味着下降若是没有,请尝试更改JVM NewRatio参数的值许多JVM将此默认为2,这意味着旧一代占据堆的2/3。它应该足够大,使得该分数超过spark.memory.fraction

  • 尝试使用G1GC垃圾收集器-XX:+UseG1GC在垃圾收集成为瓶颈的某些状况下,它能够提升性能。请注意,大执行人堆大小,可能重要的是增长了G1区域大小 与-XX:G1HeapRegionSize

  • 例如,若是您的任务是从HDFS读取数据,则可使用从HDFS读取的数据块的大小来估计任务使用的内存量。请注意,解压缩块的大小一般是块大小的2或3倍。所以,若是咱们但愿有3或4个任务的工做空间,而且HDFS块大小为128 MB,咱们能够估计Eden的大小4*3*128MB

  • 监视垃圾收集所用频率和时间如何随新设置而变化。

咱们的经验代表,GC调整的效果取决于您的应用程序和可用内存量。更多的微调选项在线描述,但在较高的水平,管理完整的GC如何常常发生能够减小开销帮助。

能够经过设置spark.executor.extraJavaOptions做业的配置来指定执行程序的GC调整标志

其余考虑因素

并行程度

除非您为每一个操做设置足够高的并行度,不然将没法充分利用群集。Spark根据其大小自动设置要在每一个文件上运行的“map”任务的数量(尽管能够经过可选参数来控制它SparkContext.textFile等),而且对于分布式“reduce”操做,例如groupByKeyreduceByKey,它使用最大的父级RDD的分区数量。您能够将并行级别做为第二个参数传递(请参阅spark.PairRDDFunctions文档),或者设置config属性spark.default.parallelism以更改默认值。一般,咱们建议群集中每一个CPU核心有2-3个任务。

减小任务的内存使用状况

有时候,你会获得一个OutOfMemoryError,不是由于你的RDD不适合内存,而是由于你的一个任务的工做集,好比其中一个reduce任务groupByKey,太大了。斯巴克的整理操做(sortByKeygroupByKeyreduceByKeyjoin,等)创建每一个任务中的哈希表来进行分组,而这每每是大的。这里最简单的解决方法是 增长并行度,以便每一个任务的输入集更小。Spark能够有效地支持短至200毫秒的任务,由于它能够在多个任务中重用一个执行程序JVM,而且它具备较低的任务启动成本,所以您能够安全地将并行度提升到超过群集中的核心数。

广播大变量

使用 可用广播功能SparkContext能够大大减小每一个序列化任务的大小,以及经过群集启动做业的成本。若是您的任务使用其中的驱动程序中的任何大对象(例如静态查找表),请考虑将其转换为广播变量。Spark打印主服务器上每一个任务的序列化大小,所以您能够查看它以肯定您的任务是否过大; 通常来讲,大于约20 KB的任务可能值得优化。

数据位置

数据位置可能会对Spark做业的性能产生重大影响。若是数据和在其上运行的代码在一块儿,那么计算每每很快。可是若是代码和数据是分开的,那么必须移动到另外一个。一般,将序列化代码从一个地方运送到另外一个地方比一块数据更快,由于代码大小比数据小得多。Spark围绕数据局部性的通常原则构建其调度。

数据位置是数据与处理它的代码的接近程度。根据数据的当前位置,有多个级别的位置。从最近到最远的顺序:

  • PROCESS_LOCAL数据与正在运行的代码位于同一JVM中。这是最好的地方
  • NODE_LOCAL数据在同一节点上。示例可能位于同一节点上的HDFS中,也可能位于同一节点上的另外一个执行程序中。这比PROCESS_LOCAL由于数据必须在进程之间传输要慢一些
  • NO_PREF 能够从任何地方快速访问数据,而且没有位置偏好
  • RACK_LOCAL数据位于同一机架的服务器上。数据位于同一机架上的不一样服务器上,所以须要经过网络发送,一般经过单个交换机
  • ANY 数据在网络上的其余位置,而不在同一个机架中

Spark喜欢将全部任务安排在最好的局部性级别,但这不老是可能的。在任何空闲执行器上没有未处理数据的状况下,Spark切换到较低的局域级别。有两种选择:a)等待繁忙的CPU释放出来,在同一服务器上的数据上启动任务;b)当即在须要将数据移动到那里的较远的地方启动新任务。.

Spark一般会作的是等待繁忙的CPU释放的但愿。一旦超时到期,它就开始将数据从远处移动到空闲CPU。每一个级别之间的回退等待超时能够单独配置,也能够在一个参数中一块儿配置; 有关详细信息,请参阅配置页面spark.locality的 参数若是您的任务很长而且看不到位置,则应该增长这些设置,但默认状况一般颇有效。

摘要

这是一个简短的指南,指出在调优Spark应用程序时应该了解的主要问题 - 最重要的是,数据序列化和内存调整。对于大多数程序,切换到Kryo序列化并以序列化形式保存数据将解决最多见的性能问题。请随时在 Spark邮件列表中询问有关其余调优最佳作法的信息。

相关文章
相关标签/搜索