Spark调优

Spark调优

写在前面

对于调优, 我以为是没有放之四海而皆准的办法.html

不少时候, 调优显得没有必要, 即便不进行调优, 程序也可以顺利执行.java

在没有出现问题的时候, 不进行调优, 即便是在大数据中, 这也是我经常采用的原则.node

而且, 针对问题再进行调优, 每每是更为合适的.git

好比, 明明资源充足, 但程序运行依然很慢, 在这种状况下, 咱们非得采起 kryo序列化的方式去增长一点点运行速度. 而没有增长并行度.github

又或者有些时候, 减小没必要要的 shuffle操做, 是更好地方式.apache

没有哪一种调优手段是必须的, 且有效的.数组

这只是在帮助咱们去了解, 问题可能会出在哪些地方, 在代码无可变动的状况下, 又该如何调整, 以使程序顺利运行. 生搬硬套, 万万不可取.缓存

官方连接: Tuning Spark安全

中文连接: Spark 调优服务器

因为大多数 Spark 计算的内存性质,Spark 程序可能由集群中的任何资源: 如 CPU,网络带宽, 内存 致使瓶颈, 一般状况下,若是数据有合适的内存,瓶颈就是网络带宽,但有时您还须要进行一些调整,例如 以序列化形式存储 RDD 来减小内存的使用。

本指南将涵盖两个主要的主题:数据序列化,这对于良好的网络性能相当重要,而且还能够减小内存使用和内存优化。咱们选几个较小的主题进行展开。

数据序列化

序列化在任何分布式应用程序的性能中起着重要的做用。很慢的将对象序列化或消费大量字节的格式将会大大减慢计算速度。一般,这多是您优化 Spark 应用程序的第一件事。Spark 宗旨在于方便和性能之间取得一个平衡(容许您使用操做中的任何 Java 类型)。它提供了两种序列化库:

  • Java serialization: 默认状况下, Spark序列化对象使用的是 Java的 ObjectOutputStream, 只要你实现了 java.io.Serializable 接口, 就可以正常运行. 你也能够经过使用 java.io.Externalizable 来控制 对象中的哪一部分属性不须要进行序列化. Java序列化是至关灵活的, 但一般来讲也是很是慢的. 并致使不少类序列化后的值太大.

参考: Serializable和Externalizable浅析

  • Kryo serialization: Spark一样可使用 kryo(版本4)使得序列化对象变得更快, kryo的方式 通常要比 Java 自身的序列化速度快十倍以上, 但缺点是, 并不支持全部的类型的 序列化, 须要你在准备使用kryo的程序中注册 classses.

你能够切换到Kryo, 经过 sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 的方式. 经过这个序列化配置, 不只仅可以使 Kryo序列化使用在 shuffling操做中, 也能够在 序列化 RDDs 到硬盘时使用. 之因此在 Spark中没有将 Kryo当作默认实现方式的缘由是, 须要自身手动注册对应的class.

可是建议你在 任何的 网络密集型 application 中使用它. 从Spark 2.0.0以来, Spark在使用简单类型, 如 基本类型的数组, 或是String类型, 在这些数据类型作 shuffling操做时, 使用的内部方式都是 Kryo.

Spark 自动包含 Kryo 序列化器,用于 Twitter chill 中 AllScalaRegistrar 涵盖的许多经常使用的核心 Scala 类, 这意味着, 对于大多数常见类来讲, 你并不须要经过以下方式注册类:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

若是想要知道更多的 kryo细节, 能够参考kryo的官方, 官方提供了更加先进的注册方式, 如自定义 序列化 代码.

Kryo serialization

还有一点须要注意到的地方是:

若是你的对象太大, 你须要增大 spark.kryoserializer.buffer 配置.

可是在看过配置说明以后, 发现实际上须要更改的配置是 spark.kryoserializer.buffer.max 暂时尚未通过验证.

对于 spark.kryoserializer.buffer 配置而言, 默认值是 64k, 这个是 kryo buffer 的初始化值, 对于每一个worker中的每个核都只有一个 buffer. buffer 在须要的状况下最终会增加到 spark.kryoserializer.buffer.max.

而 spark.kryoserializer.buffer.max 则是 kryo buffer 的最大值, 默认是 64m, 必须大于任何一个你须要序列化的对象 同时 要须要 2048m, 当遇到 buffer limit exceeded exception时 说明你须要增长这个值了.

最后须要注意的是, 即便你并不注册 任何一个 classes, kryo依然能够工做, 可是 此时不得不 存储 类的全名称, 这是很大的浪费.

内存调优

内存调优有三个方面须要考虑:

  • 你的对象所使用的内存总量(你可能想要整个数据集都存储在内存中.)

  • 访问对象的成本

  • 垃圾收集的开销

通常来讲, 访问对象是很快速的, 可是很容易消耗比其字段中的 “raw” 数据多 2-5 倍的空间, 这是因为如下几个缘由:

  • 每一个不一样的 Java 对象都有一个 “object header”,它大约是 16 个字节,包含一个指向它的类的指针。对于一个数据不多的对象(好比说一个Int字段),这能够比数据大。

  • Java String 在原始字符串数据上具备大约 40 字节的开销(由于它们存储在 Char 数组中并保留额外的数据,例如长度),而且因为 UTF-16 的内部使用而将每一个字符存储为 两个 字节 String 编码。所以,一个 10 个字符的字符串能够容易地消耗 60 个字节。

  • 公共收集类,例如 HashMap 和 LinkedList,使用连接的数据结构,其中每一个条目(例如: Map.Entry)存在 “包装器” 对象。该对象不只具备 header,还包括指针(一般为 8 个字节)到列表中的下一个对象.

  • 原始类型的集合一般将它们存储为 “盒装” 对象,例如: java.lang.Integer.

本节将从 Spark 的内存管理概述开始,而后讨论用户能够采起的具体策略,以便在 application 中更有效地使用内存。具体来讲,咱们将描述如何肯定对象的内存使用状况,以及如何改进数据结构,或经过以序列化的格式存储数据。而后咱们将介绍调整 Spark 的缓存大小和 Java 垃圾回收器。

内存管理

Spark中的内存使用主要能够分为两大类, 执行 和 存储.

执行内存 主要指 用于计算中的 混洗(shuffle), 合并(join), 聚合(aggregation).

存储内存主要指 在集群中 缓存 和 传播内部数据 使用的内存.

在Spark中, 执行 和 存储 共享统一区域(M). 当没有 执行 内存在使用时, 存储便可获取全部的可用内存 反之亦然.

若是须要, 执行能够 驱逐 存储, 但这必须在 总的存储 内存的使用量 低于 阈值(R). 换句话说, R 描述了M 缓存块永远不会被驱逐 的 区域. 因为实现的复杂性, 存储永远 不会驱逐 执行.

该设计确保了几个理想的性能.

首先,不使用缓存的应用程序能够将整个空间用于执行,从而避免没必要要的磁盘泄漏。

第二,使用缓存的应用程序能够保留最小的存储空间(R),其中数据块不受驱逐。

最后,这种方法为各类工做负载提供了合理的开箱即用性能,而不须要用户内部如何分配内存的专业知识。

虽然提供了两条相关配置, 可是典型 用户 并不须要调整他们, 默认值已经可以知足大多数状况下的使用要求了.

  • spark.memory.fraction 描述中提到的 内存 M, 是指占用的 (JVM堆内存 - 300M) 的比率, 默认是0.6. 剩余的0.4保留用于用户数据结构,Spark中的内部元数据,而且在偶尔遇到异常大的记录的状况下保护OOM错误.

  • spark.memory.storageFraction 描述中所提到的R, 是指占用 M 的比率, 默认是0.5. R 是 M 缓存块中的缓存被执行驱逐的存储空间.

第二个参数 也就已经描述了, 存储 和 执行 各自须要占用的比例, 也即, 对于 shuffle join agg 并不存在的 程序中, 彻底能够将比例调低, 将内存供给 执行器使用. 默认是55开.

在 Jvm 的旧版本 或 长期支持的版本中, 应该指定 spark.memory.fraction 以适应 堆内存的大小.

肯定内存消耗

肯定数据集须要占用内存的大小的最佳方式是, 建立RDD, 放入缓存中, 而后查看 Web UI 的 Storage 页面.

为了估算特定对象的内存占用, 应该使用 SizeEstimator 的 estimate 方法, 这对于尝试使用不一样的数据布局以减小内存使用量 以及 肯定广播变量将在每一个执行程序堆上占用的空间量颇有用.

调整数据结构

减小内存消耗的第一种方法是避免添加开销的 Java 功能,例如基于指针的数据结构和包装对象。有几种方法能够作到这一点:

  • 将数据结构设计为偏好对象数组和原始类型,而不是标准的 Java 或 Scala 集合类(例如: HashMap)。该 fastutil 库提供方便的集合类基本类型是与 Java 标准库兼容。

    <dependency>
          <groupId>it.unimi.dsi</groupId>
          <artifactId>fastutil</artifactId>
          <version>8.3.0</version>
      </dependency>

fastUtil 官方连接

  • 尽量避免使用不少小对象和指针的嵌套结构. 好比Integer.

  • 考虑使用数字 ID 或枚举对象而不是 字符串形式的 键.

  • 若是您的 RAM 小于32 GB,请设置 JVM 标志 -XX:+UseCompressedOops,使指针为4个字节而不是8个字节. 你能够在 spark-env.sh中添加这个选项.

序列化RDD

尽管已经作了些许调整, 可是的对象仍然太大而没法有效存储,减小内存使用的一个更简单的方法是以序列化形式存储它们,使用 RDD 持久性 API 中的序列化 StorageLevel,例如: MEMORY_ONLY_SER, Spark 将会将每一个 RDD 分区存储为一个大字节数组。以序列化形式存储数据的惟一缺点是访问变得更慢,由于必须对每一个对象进行反序列化。若是你想以序列化形式缓存数据,强烈建议使用 Kryo,由于它致使比 Java 序列化更小的尺寸(并且确定比原 Java 对象)更小。

GC调优

当您的程序存储的 RDD 有很大的”流失”时,JVM 垃圾收集多是一个问题。(程序中一般没有问题,只读一次 RDD,而后在其上运行许多操做)。当 Java 须要驱逐旧对象为新的对象腾出空间时,须要跟踪全部 Java 对象并找到未使用的。要记住的要点是,垃圾收集的成本与 Java 对象的数量成正比,所以使用较少对象的数据结构(例如: Ints 数组,而不是 LinkedList)大大下降了此成本。一个更好的方法是如上所述以序列化形式持久化对象:如今每一个 RDD 分区只有一个对象(一个字节数组)。在尝试其余技术以前,若是 GC 是一个问题,首先要使用序列化缓存。

因为任务的工做内存(运行任务所需的空间)和缓存在节点上的 RDD 之间的干扰,GC 也多是一个问题。咱们将讨论如何控制分配给RDD缓存的空间来减轻这一点。

测量 GC 的影响

GC 调整的第一步是收集关于垃圾收集发生频率和GC花费的时间的统计信息。这能够经过添加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 到 Java 选项来完成.

至于如何添加到 Java选项, 能够参考: Spark集群-Standalone 模式

下次运行 Spark 做业时,每当发生垃圾回收时,都会看到在工做日志中打印的消息。请注意,这些日志将在你的集群的工做节点上(stdout 在其工做目录中的文件中),而不是你的 driver节点。

高级 GC 优化

GC并不是只是在 Spark项目中独有的, 而是放之Java而皆准的道理.

只要网上搜下, Java GC调优, 相信你会找到许许多多的资料, 我就不在这里详细描述了.

由于经过以前的描述, 已经懂得了如何估算使用内存, 查看特定对象的内存, 以及打印GC日志.

最后, 能够经过配置:

spark.executor.extraJavaOptions 来指定执行器的 GC 调整参数.

其余

并行度

相信只要稍微了解过Spark调优, 就会看到这个观点.

除非您为每一个操做设置足够高的并行度,不然群集将没法充分利用. Spark根据文件的大小自动设置要在每一个文件上运行的 “映射” 任务的数量(尽管您能够经过可选的参数来控制它SparkContext.textFile,等等),而且对于分布式的 reduce 操做(例如groupByKey和reduceByKey),它使用最大的父RDD的分区数。您能够将并行性级别做为第二个参数传递,或将config属性设置spark.default.parallelism为更改默认值。一般,咱们建议集群中每一个CPU内核执行2-3个任务.

减小任务的内存使用

有时并不是由于RDD没有足够的内存致使内存溢出, 而是由于 而是由于 某一个 任务的 工做集, 如在 groupByKey 的 reduce任务 太大了, Spark的 shuffle操做 (sortByKey, groupByKey, reduceByKey, join 等操做) 创建每一个任务中的哈希表来进行分组,而这每每是很大的. 这里最简单的解决方案是 增长并行级别, 以便每一个任务的输入集都更小。Spark 能够有效地支持短达 200ms 的任务,由于它能够将多个任务中的一个执行者 JVM 重用,而且任务启动成本低,所以您能够将并行级别安全地提升到比集群中的核心数量更多。

广播大的变量

使用 可用的广播功能 SparkContext 能够大大减小每一个序列化任务的大小,以及在群集上启动做业的成本。若是您的任务使用其中的驱动程序中的任何大对象(例如:静态查找表),请考虑将其变为广播变量。Spark 打印主机上每一个任务的序列化大小,所以您能够查看该任务以决定您的任务是否过大; 通常任务大于 20KB 大概值得优化。

数据本地化

数据本地化可能会对 Spark job 的性能产生重大影响。若是数据和在其上操做的代码在一块儿,则计算每每是快速的。但若是代码和数据分开,则必须移动到另外一个。一般,代码大小远小于数据,所以将数据代码从一个地方寄送到另外一个地方比一大块数据更快。Spark 围绕数据局部性的通常原则构建其调度。

数据本地化是指数据和代码处理有多近。根据数据的当前位置有几个地方级别。从最近到最远的顺序:

  • PROCESS_LOCAL 数据与运行代码在同一个 JVM 中。这是可能的最好的地方

  • NODE_LOCAL 数据在同一个节点上。示例可能在同一节点上的 HDFS 或同一节点上的另外一个执行程序中。这比 PROCESS_LOCAL 由于数据必须在进程之间移动慢一些

  • NO_PREF 数据从任何地方一样快速访问,而且没有本地偏好

  • RACK_LOCAL 数据位于同一机架上的服务器上。数据位于同一机架上的不一样服务器上,所以须要经过网络发送,一般经过单个交换机发送

  • ANY 数据在网络上的其余地方,而不在同一个机架中

Spark 喜欢将全部 task 安排在最佳的本地级别,但并不能作到永远如愿以偿。在任何空闲 executor 中没有未处理数据的状况下,Spark 将切换到较低的本地级别。

有两个选项:

  1. 等待繁忙的CPU释放以在同一服务器上的数据上启动任务

  2. 当即将数据移动到更远的地方启动新任务

Spark一般要作的是稍等一下,以期释放繁忙的CPU。

一旦超时到期,它将开始将数据从很远的地方移到空闲的CPU中。每一个级别之间的回退等待超时能够单独配置,也能够一块儿配置在一个参数中。

有关详细信息,请参见配置页面spark.locality上的 参数。若是您的任务很长而且位置不佳,则应该增长这些设置,可是默认设置一般效果很好。

参数有:

spark.locality.wait 默认值3秒, 级别会逐渐从 PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL 逐渐过渡. 采起的时间都是相同的.

能够分别指定三种类型对应的数据. spark.locality.wait.node, spark.locality.wait.process, spark.locality.wait.rack.

相关文章
相关标签/搜索