多易教育: Spark内存管理之堆内/堆外内存前世此生详解

1.前言

在执行Spark的应用程序时,Spark集群会启动Driver和Executor两种JVM进程,前者为主控进程,负责建立Spark上下文,提交Spark做业(Job),并将做业转化为计算任务(Task),在各个Executor进程间协调任务的调度,后者负责在工做节点上执行具体的计算任务,并将结果返回给Driver,同时为须要持久化的RDD提供存储功能。因为Driver的内存管理相对来讲较为简单,本文主要对Executor的内存管理进行分析,下文中的Spark内存均特指Executor的内存。apache

2.堆内和堆外内存

做为一个JVM进程,Executor的内存管理创建在JVM的内存管理之上,Spark对JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark引入了堆外(Off-heap)内存,使之能够直接在工做节点的系统内存中开辟空间,进一步优化了内存的使用。缓存

2.1 堆内内存(On-heap Memory)

堆内内存的大小,由Spark应用程序启动时的–executor-memory或spark.executor.memory参数配置。Executor内运行的并发任务共享JVM堆内内存,这些任务在缓存RDD和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行Shuffle时占用的内存被规划为执行(Execution)内存,剩余的部分不作特殊规划,那些Spark内部的对象实例,或者用户定义的Spark应用程序中的对象实例,均占用剩余的空间。不一样的管理模式下,这三部分占用的空间大小各不相同(下面第2小节介绍)。并发

2.1.1 堆内内存的申请与释放性能

Spark对堆内内存的管理是一种逻辑上的“规划式”的管理,由于对象实例占用内存的申请和释放都由JVM完成,Spark只能在申请后和释放前记录这些内存:测试

申请内存优化

Spark在代码中new一个对象实例
JVM从堆内内存分配空间,建立对象并返回对象引用
Spark保存该对象的引用,记录该对象占用的内存

释放内存ui

Spark记录该对象释放的内存,删除该对象的引用
等待JVM的垃圾回收机制释放该对象占用的堆内内存

2.1.2 堆内内存优缺点分析spa

咱们知道,堆内内存采用JVM来进行管理。而JVM的对象能够以序列化的方式存储,序列化的过程是将对象转换为二进制字节流,本质上能够理解为将非连续空间的链式存储转化为连续空间或块存储,在访问时则须要进行序列化的逆过程——反序列化,将字节流转化为对象,序列化的方式能够节省存储空间,但增长了存储和读取时候的计算开销。操作系统

对于Spark中序列化的对象,因为是字节流的形式,其占用的内存大小可直接计算。
对于Spark中非序列化的对象,其占用的内存是经过周期性地采样近似估算而得,即并非每次新增的数据项都会计算一次占用的内存大小。这种方法:code

  • 下降了时间开销可是有可能偏差较大,致使某一时刻的实际内存有可能远远超出预期
  • 此外,在被Spark标记为释放的对象实例,颇有可能在实际上并无被JVM回收,致使实际可用的内存小于Spark记录的可用内存。因此Spark并不能准确记录实际可用的堆内内存,从而也就没法彻底避免内存溢出(OOM, Out of Memory)的异常。

虽然不能精准控制堆内内存的申请和释放,但Spark经过对存储内存和执行内存各自独立的规划管理,能够决定是否要在存储内存里缓存新的RDD,以及是否为新的任务分配执行内存,在必定程度上能够提高内存的利用率,减小异常的出现。

2.1.3 堆内内存分区(静态方式,弃)

在静态内存管理机制下,存储内存、执行内存和其余内存三部分的大小在Spark应用程序运行期间是固定的,但用户能够在应用程序启动前进行配置,堆内内存的分配如图所示:

能够看到,可用的堆内内存的大小须要按照下面的方式计算:

可用的存储内存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
可用的执行内存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

其中systemMaxMemory取决于当前JVM堆内内存的大小,最后可用的执行内存或者存储内存要在此基础上与各自的memoryFraction参数和safetyFraction参数相乘得出。上述计算公式中的两个safetyFraction参数,其意义在于在逻辑上预留出1-safetyFraction这么一块保险区域,下降因实际内存超出当前预设范围而致使OOM的风险(上文提到,对于非序列化对象的内存采样估算会产生偏差)。值得注意的是,这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时Spark并无区别对待,和“其它内存”同样交给了JVM去管理。

2.1.4 堆内内存分区(统一方式,现)

默认状况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大体能够分为如下四大块:

分区 说明
Execution 内存 主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程当中的临时数据
Storage 内存 主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据
用户内存(User Memory) 主要用于存储 RDD 转换操做所须要的数据,例如 RDD 依赖等信息
预留内存(Reserved Memory) 系统预留内存,会用来存储Spark内部对象

整个 Executor 端堆内内存若是用图来表示的话,能够归纳以下:

咱们对上图进行如下说明:

  • systemMemory = Runtime.getRuntime.maxMemory,其实就是经过参数 spark.executor.memory 或 –executor-memory 配置的。
  • reservedMemory 在 Spark 2.2.1 中是写死的,其值等于 300MB,这个值是不能修改的(若是在测试环境下,咱们能够经过 spark.testing.reservedMemory 参数进行修改);
  • usableMemory = systemMemory – reservedMemory,这个就是 Spark 可用内存;
  • 关于动态占用机制,因为统一内存管理方式中堆内堆外内存的管理均基于此机制,因此单独提出来说解。参见文本第三节。

2.2 堆外内存(Off-heap Memory)

为了进一步优化内存的使用以及提升Shuffle时排序的效率,Spark引入了堆外(Off-heap)内存,使之能够直接在工做节点的系统内存中开辟空间,存储通过序列化的二进制数据。除了没有other空间,堆外内存与堆内内存的划分方式相同,全部运行中的并发任务共享存储内存和执行内存。

Spark 1.6 开始引入了Off-heap memory(详见SPARK-11389)。这种模式不在 JVM 内申请内存,而是调用 Java 的 unsafe 相关 API 进行诸如 C 语言里面的 malloc() 直接向操做系统申请内存。因为这种方式不通过 JVM 内存管理,因此能够避免频繁的 GC,这种内存申请的缺点是必须本身编写内存申请和释放的逻辑。

2.2.2 堆外内存的优缺点

利用JDK Unsafe API(从Spark 2.0开始,在管理堆外的存储内存时再也不基于Tachyon,而是与堆外的执行内存同样,基于JDK Unsafe API实现[3]),Spark能够直接操做系统堆外内存,减小了没必要要的内存开销,以及频繁的GC扫描和回收,提高了处理性能。堆外内存能够被精确地申请和释放,并且序列化的数据占用的空间能够被精确计算,因此相比堆内内存来讲下降了管理的难度,也下降了偏差。

2.2.3 堆外内存分区(静态方式,弃)

堆外的空间分配较为简单,存储内存、执行内存的大小一样是固定的

可用的执行内存和存储内存占用的空间大小直接由参数spark.memory.storageFraction决定,因为堆外内存占用的空间能够被精确计算,因此无需再设定保险区域。

静态内存管理机制实现起来较为简单,但若是用户不熟悉Spark的存储机制,或没有根据具体的数据规模和计算任务或作相应的配置,很容易形成“一半海水,一半火焰”的局面,即存储内存和执行内存中的一方剩余大量的空间,而另外一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。因为新的内存管理机制的出现,这种方式目前已经不多有开发者使用,出于兼容旧版本的应用程序的目的,Spark仍然保留了它的实现。

2.2.4 堆外内存分区(统一方式,现)

相比堆内内存,堆外内存只区分 Execution 内存和 Storage 内存,其内存分布以下图所示:

关于动态占用机制,因为统一内存管理方式中堆内堆外内存的管理均基于此机制,因此单独提出来说解。参见文本第三节。

3. 动态占用机制–Execution&&Storage

细心的同窗确定看到上面两张图中的 Execution 内存和 Storage 内存之间存在一条虚线,这是为何呢?

在 Spark 1.5 以前,Execution 内存和 Storage 内存分配是静态的,换句话说就是若是 Execution 内存不足,即便 Storage 内存有很大空闲程序也是没法利用到的;反之亦然。这就致使咱们很难进行内存的调优工做,咱们必须很是清楚地了解 Execution 和 Storage 两块区域的内存分布。

而目前 Execution 内存和 Storage 内存能够互相共享的。也就是说,若是 Execution 内存不足,而 Storage 内存有空闲,那么 Execution 能够从 Storage 中申请空间;反之亦然。因此上图中的虚线表明 Execution 内存和 Storage 内存是能够随着运做动态调整的,这样能够有效地利用内存资源。Execution 内存和 Storage 内存之间的动态调整能够归纳以下:

3.1 动态调整策略

具体的实现逻辑以下:

  • 程序提交的时候咱们都会设定基本的 Execution 内存和 Storage 内存区域(经过 spark.memory.storageFraction 参数设置);
  • 在程序运行时,双方的空间都不足时,则存储到硬盘;将内存中的块存储到磁盘的策略是按照 LRU 规则(Least Recently Used)进行的。若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
  • Execution 内存的空间被对方占用后,可以让对方将占用的部分转存到硬盘,而后”归还”借用的空间
  • Storage 内存的空间被对方占用后,目前的实现是没法让对方”归还”,由于须要考虑 Shuffle 过程当中的不少因素,实现起来较为复杂;并且 Shuffle 过程产生的文件在后面必定会被使用到,而 Cache 在内存的数据不必定在后面使用。

注意,上面说的借用对方的内存须要借用方和被借用方的内存类型都同样,都是堆内内存或者都是堆外内存,不存在堆内内存不够去借用堆外内存的空间。

4. Task内存申请流程

为了更好地使用使用内存,Executor 内运行的 Task 之间共享着 Execution 内存。具体的,Spark 内部维护了一个 HashMap 用于记录每一个 Task 占用的内存:

  • 当 Task 须要在 Execution 内存区域申请 numBytes 内存,其先判断 HashMap 里面是否维护着这个 Task 的内存使用状况,若是没有,则将这个 Task 内存使用置为0,而且以 TaskId 为 key,内存使用为 value 加入到 HashMap 里面。
  • 以后为这个 Task 申请 numBytes 内存,若是 Execution 内存区域正好有大于 numBytes 的空闲内存,则在 HashMap 里面将当前 Task 使用的内存加上 numBytes,而后返回;若是当前 Execution 内存区域没法申请到每一个 Task 最小可申请的内存,则当前 Task 被阻塞,直到有其余任务释放了足够的执行内存,该任务才能够被唤醒。
  • 每一个 Task 可使用 Execution 内存大小范围为 1/2N ~ 1/N,其中 N 为当前 Executor 内正在运行的 Task 个数。
  • 一个 Task 可以运行必须申请到最小内存为 (1/2N * Execution 内存);当 N = 1 的时候,Task 可使用所有的 Execution 内存。好比若是 Execution 内存大小为 10GB,当前 Executor 内正在运行的 Task 个数为5,则该 Task 能够申请的内存范围为 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的范围。

5. 内存分配示例

为了更好的理解上面堆内内存和堆外内存的使用状况,这里给出一个简单的例子

5.1 只用了堆内内存

如今咱们提交的 Spark 做业关于内存的配置以下:

--executor-memory 18g

因为没有设置 spark.memory.fraction 和 spark.memory.storageFraction 参数,咱们能够看到 Spark UI 关于 Storage Memory 的显示以下:

 

上图很清楚地看到 Storage Memory 的可用内存是 10.1GB,这个数是咋来的呢?根据前面的规则,咱们能够得出如下的计算:

systemMemory = spark.executor.memory
reservedMemory = 300MB
usableMemory = systemMemory - reservedMemory
 
StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction

若是咱们把数据代进去,得出如下的结果:

systemMemory = 18Gb = 19327352832 字节
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 19327352832 - 314572800 = 19012780032
 
StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction
= 19012780032 * 0.6 * 0.5 = 5703834009.6 = 5.312109375GB

不对啊,和上面的 10.1GB 对不上啊。为何呢?这是由于 Spark UI 上面显示的 Storage Memory 可用内存其实等于 Execution 内存和 Storage 内存之和,也就是 usableMemory * spark.memory.fraction:

StorageMemory= usableMemory * spark.memory.fraction
= 19012780032 * 0.6 = 11407668019.2 = 10.62421GB

仍是不对,这是由于咱们虽然设置了 –executor-memory 18g,可是 Spark 的 Executor 端经过 Runtime.getRuntime.maxMemory 拿到的内存其实没这么大,只有 17179869184 字节,因此 systemMemory = 17179869184,而后计算的数据以下:

systemMemory = 17179869184 字节
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
 
StorageMemory= usableMemory * spark.memory.fraction
= 16865296384 * 0.6 = 9.42421875 GB

咱们经过将上面的 16865296384 * 0.6 字节除于 1024 * 1024 * 1024 转换成 9.42421875 GB,和 UI 上显示的仍是对不上,这是由于 Spark UI 是经过除于 1000 * 1000 * 1000 将字节转换成 GB,以下:

systemMemory = 17179869184 字节
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
 
StorageMemory= usableMemory * spark.memory.fraction
= 16865296384 * 0.6 字节 = 16865296384 * 0.6 / (1000 * 1000 * 1000) = 10.1GB

如今终于对上了。

具体将字节转换成 GB 的计算逻辑以下(core 模块下面的 /core/src/main/resources/org/apache/spark/ui/static/utils.js):

function formatBytes(bytes, type) {
    if (type !== 'display') return bytes;
    if (bytes == 0) return '0.0 B';
    var k = 1000;
    var dm = 1;
    var sizes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
    var i = Math.floor(Math.log(bytes) / Math.log(k));
    return parseFloat((bytes / Math.pow(k, i)).toFixed(dm)) + ' ' + sizes[i];
}

咱们设置了 –executor-memory 18g,可是 Spark 的 Executor 端经过 Runtime.getRuntime.maxMemory 拿到的内存其实没这么大,只有 17179869184 字节,这个数据是怎么计算的?
Runtime.getRuntime.maxMemory 是程序可以使用的最大内存,其值会比实际配置的执行器内存的值小。这是由于内存分配池的堆部分分为 Eden,Survivor 和 Tenured 三部分空间,而这里面一共包含了两个 Survivor 区域,而这两个 Survivor 区域在任什么时候候咱们只能用到其中一个,因此咱们可使用下面的公式进行描述:

ExecutorMemory = Eden + 2 * Survivor + Tenured
Runtime.getRuntime.maxMemory = Eden + Survivor + Tenured

上面的 17179869184 字节可能由于你的 GC 配置不同获得的数据不同,可是上面的计算公式是同样的。

5.2 堆内内存+堆外内存

如今若是咱们启用了堆外内存,状况咋样呢?咱们的内存相关配置以下:

spark.executor.memory           18g
spark.memory.offHeap.enabled    true
spark.memory.offHeap.size       10737418240

从上面能够看出,堆外内存为 10GB,如今 Spark UI 上面显示的 Storage Memory 可用内存为 20.9GB,以下:

其实 Spark UI 上面显示的 Storage Memory 可用内存等于堆内内存和堆外内存之和,计算公式以下:

堆内:

systemMemory = 17179869184 字节
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
 
totalOnHeapStorageMemory = usableMemory * spark.memory.fraction
= 16865296384 * 0.6 = 10119177830

堆外

totalOffHeapStorageMemory = spark.memory.offHeap.size = 10737418240

StorageMemory = totalOnHeapStorageMemory + totalOffHeapStorageMemory
= (10119177830 + 10737418240) 字节
= (20856596070 / (1000 * 1000 * 1000)) GB
= 20.9 GB

几个问题

1. 再也不细分unroll,统一为storage

MemoryManager在storage内存中细分了unroll,静态内存管理的实现划分了unroll这部份内存,并设置了比例。统一内存管理再也不细分unroll,统一为storage。

2. 为何设置300M预留内存

统一内存管理最第一版本other这部份内存没有固定值300M设置,而是和静态内存管理类似,设置的百分比,最第一版本占25%。百分比设置在实际使用中出现了问题,若给定的内存较低时,例如1G,会致使OOM,具体讨论参考这里Make unified memory management work with small heaps,所以,other这部份内存作了修改,先划出300M内存。

spark.memory.fraction由0.75 降至 0.6

spark.memory.fraction最第一版本的值是0.75,不少分析统一内存管理这块的文章也是这么介绍的,一样的,在使用中发现这个值设置的偏高,致使了gc时间过长,spark 2.0版本将其调整为0.6,详细谈论参见Reduce spark.memory.fraction default to avoid overrunning old gen in JVM default config。

总结

以下表:

内存类别 区域划分 管理方式 优缺点
on-heap – Execution Memory
– Storage Memory
– User Memory
– Reserved Memory
使用JVM管理  
off-heap – Execution Memory
– Storage Memory
手动管理,不通过JVM 能够避免频繁的 GC 可是必须本身编写内存申请和释放的逻辑
相关文章
相关标签/搜索