鉴于 Spark 基于内存计算这一天性,如下集群资源可能会形成 Spark 程序的瓶颈:java
CPU,带宽和内存。一般状况下,若是内存足够的状况下,瓶颈只可能出如今网络带apache
宽方面;但有时,你也须要作一些例如序列化优化来下降内存使用率。这份指导主要数组
集中于两方面:数据序列化,这是充分提高网络表现和下降内存消耗、内存优化的关缓存
键;咱们也会简要阐述一些小技巧。服务器
数据序列化网络
序列化在任何分布式应用的运行中扮演了重要的角色。采用那些序列化慢的格式、或数据结构
者消费巨量字节时将会严重拖慢计算效率。一般状况下,调整数据的序列化方式是你app
优化 Spark 程序时首先须要作的事。Spark 程序试图在简洁(循序你在代码中使用任框架
何 Java 的数据类型)和效率之间取得一种平衡。Spark 提供了两种序列化库。分布式
Java
serialization:默认状况下,Spark 序列话一个对象时使用 Java 自带的
ObjectOutputStream 框架,对于任何实现了 java.io.Serializable 接口的类都
有效。有也能够经过继承 java.io.Externalizable 来自定义你的序列化过程。
Java serialization 是灵活的,但一般至关缓慢而且致使不少类的序列化格式很
臃肿。
·
Kryo
serialization:
Spark 也可使用更快的序列化类库 Kryo
library
(version 2)来序列化对象。相比 Java serialization,Kryo 具备更快和更加紧凑
(一般提供 10 倍于 Java 序列化的效率)的优点。但对于全部可序列化的类型
不是所有都支持,所以为了更好的效率,你须要提早为你的程序注册这些类。
又能够经过设置初始化时的 SparkConf 和调用
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")来切换到
Kryo 模式。这项配置不只会在工做节点进行数据混洗时用到 Kryo 序列化,并且在将
RDD 序列化到硬盘时也会使用到 kryo。Kryo 之因此没有成为默认设置是由于使用者
须要自行注册一些类,可是咱们建议在一些网络密集型应用中尝试使用 kryo 序列化。
从 Spark2.0.0 开始,当对于简单类型,简单类型数组和字符串类型的 RDD 进行混洗
时,Spark 已经使用 Kryo 进行了内部整合。
Spark 已经自动对不少经常使用的核心 Scala 类(包含于 AllScalaRegistrar,位于
Twitter chill library)进行了 Kryo 序列化注册。
要使自定义类应用 Kryo 注册,你须要 registerKryoClasses 方法:
[java] view plain copy
1. SparkConf sparkConf = new SparkConf().setAppName("Kryo");
2. Class<?>[] classs={MyClass.class,YourClass.class};
3. sparkConf.registerKryoClasses(classs);
4. JavaSparkContext sc = new JavaSparkContext(sparkConf);
kryo 文档描述了进阶注册选项,例如添加自定义序列化编码。
若是你的对象很大,你可能须要增长 spark.kryoserializer.buffer。这个值须要足够
大,以致于可以容纳下被序列化的最大的对象。
若是你不注册你的自定义类,Kryo 仍然会执行下去,但它将会存储每一个对象的类全
名,这真是一种浪费。
内存优化
在优化内存使用率时主要有三方面可考虑的因素:你建立的对象占用的内存量(你也
许想将整个数据据装进内存),访问这些数据的代价和累计回收的开销(当你的对象
在内存中具备较高轮换率时)
默认状况下,访问 Java 对象是快速的。但一不留神就会消耗 2 到 5 倍的空间用于存储
对象中的原始属性变量,这主要是出于如下缘由:
·
每一个不一样的 Java 对象拥有一个“对象头”,这个“对象头”占用 16 字节,包
含指向所属类型的指针信息。对于一个包含不多数据的对象(例如一个 Int 属
性),这些“对象头”信息占用的内存空间可能比数据自己更大。
Java 的 String 对象包含将近 40 字节的开销用于描述这些原始字符串数据(因
为他们将其存储与一个字符数组并保存了额外的信息,例如字符串长度),同
时每一个字符使用两个字节存储(UTF-16)。所以一个 10 个字符的 String 对
象,轻轻松松就能消耗 60 字节的空间。
·
常见的集合类,例如 HashMap 和 LinkedList,使用了链式结构,针对每一个实
体(Map.Entry)都对应一个包装对象。这个对象不只包含了“头信息”,而
且存储了指向下一个对象的指针。
原始基本数据类型的集合对象在存储每个基本类型时仍是用了包装类对象,
例如 java.lang.Integer
这一部分将首先简要概述一下 Spark 的内存管理,而后列举一些特殊的策略,来帮助
你在优化你的应用时采起更高效的方式。咱们将着重描述如何肯定对象的内存占用和
如何改变数据结构和序列化方式来下降内存占用。而后,咱们会介绍如何优化 Spark
的缓存大小和 Java 垃圾回收。
内存管理概览
Spark 的内存使用大体可划分两类:执行和存储。执行存储指的是计算(shuffles,
join,sorts 和 aggregations)时用的内存,而存储内存指的是用于缓存和在集群内
部传播的数据。在 Spark 中,执行和存储共享统一的区域(M)。当执行模块没有占
用内存时,存储模块能够获取所有内存(统一区域),反之亦然。执行模块能够驱逐
存储模块,当且仅当所有内存使用落到某个设定的阈值时(R)。换句话说,R 从 M
划分出一个亚区,这个亚区的缓存不可被驱逐。出于实现的复杂性的考虑,存储模块
没法驱逐执行模块。
这个设计确保了一些吸引人的特性。一、若是应用不使用缓存的话,计算模块可使用
整个内存空间进行计算,排除没必要要的硬盘溢写。二、若是一个应用能够经过 R 预约一
个最低的存储空间用于缓存,那这些缓存对于驱逐是免疫的。三、这提供了可靠的开箱
即用的方式来应对不一样的工做,即使你不是一个对内部存储分配了如指掌的专家。
尽管 Spark 提供了两个相关的配置项,但大部分用户并不须要去调整它们,由于配置
项的默认值已经足够应对大多数工做任务。
·
spark.memory.fraction:表明了上文中的 M,表示内存占用(JVM
heap
space-300MB)比率(默认值 0.6)。剩余的 40%的空间主要用来存储数据
结构、内部元数据并预防由稀疏、大记录引起 OOM。
spark.memory.storageFraction:表示上文提到的 R,表示从 M 中划分出 R
大小的一个区域(默认值 0.5),这个被划分出的区域中的缓存数据块对于计
算模块的驱逐是免疫的。
spark.memory.fraction 值的配置应当使得 JVM 中的堆内存与老代和永久代的空间相
协调。具体配置见下文 GC 优化调整细节。
判断内存消耗
判断一个数据集到底消耗多少内存的最佳方式是:将数据集加载到 RDD 并将其缓存下
来,而后去 Spark Web UI 查看“Storage”页面。这个页面将告诉你,你的 RDD 正
在申请多大的内存。
要预估某个指定对象的内存消耗时,请使用 SizeEstimator 的 estimate 方法,这是对
于哪些想试验一下如何经过改变数据类型来消减内存和判断某个广播变量将在每一个执
行器申请多大内存的朋友来讲是个好工具。
优化数据结构
下降内存消耗的首要方法就是避免使用添加额外开销的 Java 特征,例如基于指针的数
据结构和包装对象。具体小贴士以下:
1. 使用对象数组和原始类型来构造你的数据结构,而不是使用标准的 Java 和
Scala 集合类(例如 HashMap)。fastutil 库提供了针对原始类型的便捷的集
合类,这些类兼容 Java 标准库。
2. 避免使用包含过多小对象和指针的嵌套结构。
3. 考虑使用数字和枚举对象代替字符串做为键值。
4. 若是你使用的随机内存少于 32G,设置 JVM 的标志-
XX:+UseCompressedOops 来使引用只占用 4 字节而不是 8 字节。同窗你可
以在 spark-env.sh 中添加这个配置项哦
序列化 RDD 存储
当你的对象太大以致于以上优化均被无视的状况下,有一个副更简单的药能够拯救你
的对象,那就是将它存储为序列化格式来下降内存使用。经过使用序列化级别来将
RDD 持久化,例如 MEMORY_ONLY_SER。随后,Spark 将 RDD 的每一个分区存储成
一个个字节数组。这粒药丸只有一个反作用,那就是访问这些序列化的数据是须要多
耗费些时间,由于在读取前须要先反序列化这些数据。若是你以为你的 Spark 程序需
要吃药的话,咱们强烈建议你使用 Kryo 这一序列化格式来缓存你的数据,由于相比
Java 自带的序列化方式,Kryo 可让你的对象更瘦(这就是抽脂和整容流行的原
因)。
垃圾回收优化
当你的程序存储的 RDD 须要频繁轮换时,JVM 垃圾回收可能会出现问题。(当对一
个 RDD 仅读取一次,而后在其上进行屡次操做时并不会带来问题)当 Java 须要回收
老对象占用的空间时,它将扫描你全部的对象来找到其中不被使用的。须要指出的一
点是,垃圾回收的消耗和你的 Java 对象个数成正比,所以你所应用的数据结构拥有的
对象越少越好(例如使用 int 数组代替 LinkedList)。一个更好的方法是使用序列化
格式来持久化你的对象,如上所述:一旦序列化后,每一个 RDD 将只对应一个对象(一
个字节数组)。因此当存在 GC 问题时,在尝试其余技巧前,你首先要作的是使用序
列化的缓存技术。
因为工做节点上任务工做内存和 RDD 缓存之间的冲突也会致使 GC 问题。咱们将会讨
论如何分配空间去存储 RDD 缓存来缓解这个问题。
测算 GC 的影响
第一步是收集关于垃圾处理的频率和 GC 消耗时间的统计数据。这个能够经过添加如
下 Java 选项-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 来实
现。
[java] view plain copy
1. ./bin/spark-submit --name "My app" --master local[4] --
conf spark.eventLog.enabled=false
2.
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -
XX:+PrintGCTimeStamps" myApp.jar
下次 Spark 启动时,每次 GC 的操做日志将会打印出来。须要注意的是这些日志将出
如今你的集群工做节点上(在工做目录的标准输出文件里),而不是你的驱动程序
里。
GC 进阶优化
为了准备更深刻的垃圾回收优化,咱们先要理解一些关于 JVM 内存管理的基本知识:
·
Java Heap 空间被分红了两个区域 Young 和 Old。Young 代主要保存短生命
周期的对象,而 Old 代用于保存具备长生命周期的对象。
·
·
Young 代又被进一步划分为三个区[Eden,Survivor1,Survivor2]
简述一下内存碎片整理步骤:当 Eden 满了,一个小型的 GC 被触发,Eden
和 Survivor1 中幸存的仍被使用的对象被复制到 Survivor2。Survivor1 和
Survivor2 区域进行交换。当一个对象生存的时间足够长或者 Survivor2 满
了,它被转移到 Old 代。最终当 Old 空间快满时,一个全面的 GC 被召唤。
GC 优化的目的是使 Spark 保证只有长生存周期的 RDDs 才会被存储在 Old
代,而且 Young 代设计为知足存储短生命周期的对象。
这将帮你避免全面 GC 去收集 Spark 运行期间产生的临时对象。一些实用的小技巧如
下:
·
首先检查 GC 日志中是否有过于频繁的 GC。若是在一个任务完成前,全量 GC
被唤醒了屡次,它意味着对于执行任务来讲没有分配足够的内存。
若是有太多的小型垃圾收集但全量 GC 出现并很少,给 Eden 分配更多的内存
会颇有帮助。你能够为每一个任务设置为一个高于其所需内存的值。假设 Eden
代的内存需求量为 E,你将能够设置 Young 代的内存为-Xmn=4/3*E。(这一
设置一样也会致使 survivor 区同时扩张)
·
在 GC 打印的日志中,若是 OldGen 接近满时,能够经过下降
spark.memory.fraction 来减小用于缓存的空间。更好的方式是缓存更少的对
象而不是下降做业执行时间。一个可选的方案是减小 Young 代的规模。若是
你设置了“-Xmn”,能够下降-Xmn。若是没有设置,能够尝试改变 JVM 的
NewRatio 参数。不少 JVM 的 NewRation 默认值是 2,这意味着 Old 代申请
2/3 的堆空间。它的值应在足够大以致能够超过 spark.memory.fraction。
尝试使用 G1GC 垃圾收集选项:-XX:+UseG1GC。当 GC 存在瓶颈时,采用这
一选项在某些状况下能够提高性能。当执行器的堆空间比较大时,提高 G1
region size(-XX:G1HeapRegionSize)是一种重要的选择。
·
·
若是你的任务须要从 HDFS 系统读取数据,能够经过估计 HDFS 文件的大小来
预估任务所需的内存量。须要注意的是解压后的块大小是原大小的 2 到 3 倍。
所以咱们须要设置 3 到 4 倍的工做空间用于做业执行,例如 HDFS 的块大小为
128MB,咱们须要预估 Eden 的大小为 4*3*128MB。
·
监控在新变化和设置生效后,GC 的频率和耗费的事件。
咱们的经验建议是,GC 优化的成效依赖与你的应用和可用内存的多少。网上也有许多
优化策略,可是须要更深的知识基础,例如经过控制全量 GC 发生的频率来降少总开
销。
经过设置 spark.executor.extraJavaOptions 能够实现对执行器中 GC 的优化调整。
其余的考虑
并行度
若是合理分配每一个操做的并行度,将极大的发挥集群的优点。Spark 按照文件的大小
自动设置 map 任务数来处理每一个文件(固然你能够经过设置 SparkContext.textFile
的可选参数来控制并行程度),而且对于分布式的 reduce 操做,例如 groupByKey
和 reduceByKey 他们使用父 RDD 的最大分区数来设置并行数。你也能够经过传递第
二个参数来控制并行级别(参考 spark.PairRDDFunctions 文档)或者设置启动时的
并行级别来改变默认值(spark.default.parallelism),咱们建议你为每一个 CPU 分配
2-3 个并行任务。
Reduce 任务的内存分配
有时,你会收到一个 OOM 的错误,由于你的 RDD 超出了内存的大小,这有多是
由于某个任务的工做集太大形成的,例如 groupByKey 这个 reduce 任务。Spark 的
混洗操做(例如 sortByKey,groupByKey,reduceByKey,join 等)在每一个子任务中构造
并维护了一个哈希表,来完成归类操做,一般状况下此表很大。最简单的修复方法是
提高并行程度,以便使每一个任务的输入足够小。Spark 能够高效的支持短如 200ms 的
任务,由于它能够跨多任务重用执行器的 JVM,而且它的任务加载和启动开销很是
小,所以你能够放心的增长并行度,甚至能够设置比集群核心总数更多的并行任务
数。
广播大变量
使用 SparkContext 的广播功能极大的减小每一个序列化人物的大小,而且下降集群中
任务加载的开销。若是你的任务使用任何来自驱动器的大对象(例如一个静态查找
表),应该考虑将这个大对象加载到广播变量中去。Spark 能够打印序列化的任务的
大小,所以你能够经过查看输出来判断你的做业是否太大了;一般状况下,一个大于
20KB 的对象是值得放进广播变量中来进行优化的。
数据存放位置
数据的存放位置对于 Spark 做业的执行效率具备重要影响。若是数据和操做它的代码
在一块儿的话,计算将是很是高效的。可是,若是代码和数据是分离的,一方须要移动
到另外一方那里。显而易见的是移动代码比移动数据高效的多,由于代码的字节数源小
于数据。Spark 是基于这一常规原则来构建她的数据存放策略的。
数据本地化是使得数据和在它之上的操做距离更近。这里列举了一些存放数据的级
别,这些级别的划分是基于数据存储位置的,按照由近及远的顺序。
·
·
PROCESS_LOCAL:数据和运行的代码同时在 JVM 中。这是最好的存储方
式。
NODE_LOCAL:数据和代码在同一个节点上。实例在同一个节点的 HDFS
中,或者在相同节点的另外一个执行器里。这是比 PROCESS_LOCAL 稍慢的级
别,由于数据须要在进程间传递。
·
·
NO_PREF:从任何地方访问数据都很快,没有位置偏好。
RACK_LOCAL:数据和代码存在与同一个机架的服务器中。数据在同一个机架
的不一样服务中,所以须要依靠网络来传递这一数据,通常只需通过一个交换
器。
·
ANY:数据存储在同一网络环境,但不在同一机架上。
Spark 程序理想状态是调度全部的子任务都处于最佳的存储策略之下,但这一般只是
理想。
(海量Spark, hadoop资源共享请点击: 498835267)