Spark内存模型详解

1 堆内和堆外内存规划

Spark执行器(Executor)的内存管理创建在 JVM 的内存管理之上,Spark 对 JVM 的空间(OnHeap+Off-heap)进行了更为详细的分配,以充分利用内存。同时,Spark 引入了Off-heap 内存模式,使之能够直接在工做节点的系统内存中开辟空间,进一步优化了内存的使用(能够理解为是独立于JVM托管的Heap以外利用c-style的malloc从os分配到的memory。因为再也不由JVM托管,经过高效的内存管理,能够避免JVM object overhead和Garbage collection的开销)。sql

运行于Executor中的Task同时可以使用JVM(OnHeap+Off-heap)和Off-heap两种模式的内存。shell

  • JVM OnHeap内存:大小由”--executor-memory”(即 spark.executor.memory)参数指定。Executor中运行的并发任务共享JVM堆内内存。
  • JVM OffHeap内存:大小由”spark.yarn.executor.memoryOverhead”参数指定,主要用于JVM自身,字符串, NIO Buffer等开销。
  • Off-heap模式:默认状况下Off-heap模式的内存并不启用,能够经过”spark.memory.offHeap.enabled”参数开启,并由spark.memory.offHeap.size指定堆外内存的大小(占用的空间划归JVM OffHeap内存)。

---备注:咱们如今未启用Off-heap模式的内存,所以,只介绍JVM模式的Executor内存管理。如下出现有Off-heap均为JVM中区别于Heap的内存。缓存

---OffHeap内存:存储通过序列化的二进制数据,Spark 能够直接操做系统堆外内存,减小了没必要要的内存开销,以及频繁的 GC 扫描和回收,提高了处理性能。堆外内存能够被精确地申请和释放,并且序列化的数据占用的空间能够被精确计算,因此相比堆内内存来讲下降了管理的难度,也下降了偏差。并发

2 Executor内存划分

2.1 Executor可用内存总量

                                                                                     Executor内存模型性能

如上图所示,Yarn集群管理模式中,Spark 以Executor Container的形式在NodeManager中运行,其可以使用的内存上限由“yarn.scheduler.maximum-allocation-mb” 指定, ---咱们能够称其为MonitorMemory测试

如前所述,Executor的内存由Heap内存和设定的Off-heap内存组成。优化

Heap: 由“spark.executor.memory” 指定, 如下称为ExecutorMemory
Off-heap: 由 “spark.yarn.executor.memoryOverhead” 指定, 如下称为MemoryOverheadspa

所以, 对现有Yarn集群,存在:操作系统

ExecutorMemory + MemoryOverhead <= MonitorMemory线程

若应用提交之时,指定的 ExecutorMemory与MemoryOverhead 之和大于 MonitorMemory,则会致使Executor申请失败;若运行过程当中,实际使用内存超过上限阈值,Executor进程会被Yarn终止掉(kill)。

2.2 Heap

"spark.executor.memory"指定的内存为JVM最大分配的堆内存("-xmx"),Spark为了更高效的使用这部份内存,对这部份内存进行了细分,下图(备注:此图源于互联网)对基于spark2(1.6+)对堆内存分配比例进行了描述:

                                                                                                                                                                                                               Heap内存模型

 

其中:

  1. Reserved Memory 保留内存,系统默认值为300,通常无需改动,不用关心此部份内存。 但若是Executor分配的内存小于 1.5 * 300 = 450M时,Executor将没法执行。
  2. Storage Memory 存储内存,用于存放广播数据及RDD缓存数据。由上图可知,Spark 2+中,初始状态下,Storage及Execution Memory均约占系统总内存的30%(1 * 0.6 * 0.5 = 0.3)。在UnifiedMemory管理中,这两部份内存能够相互借用,为了方便描述,咱们使用storageRegionSize来表示“spark.storage.storageFraction”。当计算内存不足时,能够改造storageRegionSize中未使用部分,且StorageMemory须要存储内存时也不可被抢占; 若实际StorageMemory使用量超过storageRegionSize,那么当计算内存不足时,能够改造(StorageMemory – storageRegionSize)部分,而storageRegionSize部分不可被抢占。

2.3 Java Off-heap (Memory Overhead)

Executor 中,另外一块内存为由“spark.yarn.executor.memoryOverhead”指定的Java Off-heap内存,此部份内存主要是建立Java Object时的额外开销,Native方法调用,线程栈, NIO Buffer等开销(Driect Buffer)。此部分为用户代码及Spark 不可操做的内存,不足时可经过调整参数解决, 无需过多关注。 具体须要调整的场景参见本文第4节。

3 任务内存管理(Task Memory Manager)

Executor中任务以线程的方式执行,各线程共享JVM的资源,任务之间的内存资源没有强隔离(任务没有专用的Heap区域)。所以,可能会出现这样的状况:先到达的任务可能占用较大的内存,然后到的任务因得不到足够的内存而挂起。

在Spark任务内存管理中,使用HashMap存储任务与其消耗内存的映射关系。每一个任务可占用的内存大小为潜在可以使用计算内存的1/2n – 1/n , 当剩余内存为小于1/2n时,任务将被挂起,直至有其余任务释放执行内存,而知足内存下限1/2n,任务被唤醒,其中n为当前Executor中活跃的任务数。

任务执行过程当中,若是须要更多的内存,则会进行申请,若是,存在空闲内存,则自动扩容成功,不然,将抛出OutOffMemroyError。

---备注:潜在可以使用计算内存为:初始计算内存+可抢占存储内存

4 内存调整方案

Executor中可同时运行的任务数由Executor分配的CPU的核数N 和每一个任务须要的CPU核心数C决定。其中:

  • N = spark.executor.cores
  • C = spark.task.cpus

Executor的最大任务并行度可表示为 ==TP = N / C==. 其中,C值与应用类型有关,大部分应用使用默认值1便可,所以,影响Executor中最大任务并行度的主要因素是N.

依据Task的内存使用特征,前文所述的Executor内存模型能够简单抽象为下图所示模型:

                                     Executor内存简化模型

 其中,Executor 向yarn申请的总内存可表示为: M = M1 + M2 

4.1 错误类型及调整方案

4.1.1 Executor OOM类错误 (错误代码 13七、143等)

该类错误通常是因为Heap(M2)已达上限,Task须要更多的内存,而又得不到足够的内存而致使。所以,解决方案要从增长每一个Task的内存使用量,知足任务需求 或 下降单个Task的内存消耗量,从而使现有内存能够知足任务运行需求两个角度出发。所以:

4.1.1.1 增长单个task的内存使用量
  • 增长最大Heap值, 即 上图中M2 的值,使每一个Task可以使用内存增长。
  • 下降Executor的可用Core的数量 N , 使Executor中同时运行的任务数减小,在总资源不变的状况下,使每一个Task得到的内存相对增长。
4.1.1.2 下降单个Task的内存消耗量

下降单个Task的内存消耗量可从配制方式和调整应用逻辑两个层面进行优化:

  • 配制方式:

          减小每一个Task处理的数据量,可下降Task的内存开销,在Spark中,每一个partition对应一个处理任务Task,所以,在数据总量必定的前提下,能够经过增长partition数量的方式来减小每一个Task处理的数据量,从而下降Task的内存开销。针对不一样的Spark应用类型,存在不一样的partition调整参数以下:

  • P = spark.default.parallism (非SQL应用)
  • P = spark.sql.shuffle.partition (SQL 应用)
  • P = mapred.reduce.tasks (HiveOnSpark)

经过增长P的值,可在必定程度上使Task现有内存知足任务运行
注: 当调整一个参数不能解决问题时,上述方案应进行协同调整

---备注:若应用shuffle阶段 spill严重,则能够经过调整“spark.shuffle.spill.numElementsForceSpillThreshold”的值,来限制spill使用的内存大小,好比设置(2000000),该值太大不足以解决OOM问题,若过小,则spill会太频繁,影响集群性能,所以,要依据负载类型进行合理伸缩(此处,可设法引入动态伸缩机制,待后续处理)。

  •  调整应用逻辑:

           Executor OOM 通常发生Shuffle阶段,该阶段需求计算内存较大,且应用逻辑对内存需求有较大影响,下面举例就行说明:       

  • groupByKey 转换为 reduceByKey

         通常状况下,groupByKey能实现的功能使用reduceByKey都可实现,而ReduceByKey存在Map端的合并,能够有效减小传输带宽占用及Reduce端内存消耗。

                                                                                                   选择合适的算子

  • data skew 预处理    

          Data Skew是指任务间处理的数据量存大较大的差别。
          如左图所示,key 为010的数据较多,当发生shuffle时,010所在分区存在大量数据,不只拖慢Job执行(Job的执行时间由最后完成的任务决定)。 并且致使010对应Task内存消耗过多,可能致使OOM. 而右图,通过预处理(加盐,此处仅为举例说明问题,解决方法不限于此)能够有效减小Data 

          Skew致使 的问题

 

                                                                                                         Data Skew预处理

---注:上述举例仅为说明调整应用逻辑能够在必定程序上解决OOM问题,解决方法不限于上述举例

4.1.2 Beyond…… memory, killed by yarn

出现该问题缘由是因为实际使用内存上限超过申请的内存上限而被Yarn终止掉了, 首先说明Yarn中Container内存监控机制:

  • Container进程的内存使用量:以Container进程为根的进程树中全部进程的内存使用总量。
  • Container被杀死的判断依据:进程树总内存(物理内存或虚拟内存)使用量超过向Yarn申请的内存上限值,则认为该Container使用内存超量,能够被“杀死”。

所以,对该异常的分析要从是否存在子进程两个角度出发。

a 不存在子进程

根据Container进程杀死的条件可知,在不存在子进程时,出现killed by yarn问题是于由Executor(JVM)进程自身内存超过向Yarn申请的内存总量M 所致。因为未出现4.1.1节所述的OOM异常,所以可断定其为 M1 (Overhead)不足, 依据Yarn内存使用状况有以下两种方案:

  • 若是,M未达到Yarn单个Container容许的上限时,可仅增长M1 ,从而增长M;若是,M达到Yarn单个Container容许的上限时,增长 M1, 下降 M2.

操做方法:在提交脚本中添加 --conf spark.yarn.executor.memoryOverhead=3072(或更大的值,好比4096等) --conf spark.executor.memory = 10g 或 更小的值,注意两者之各要小于Container监控内存量,不然伸请资源将被yarn拒绝。

  • 减小可用的Core的数量 N, 使并行任务数减小,从而减小Overhead开销

操做方法:在提交脚本中添加 --executor-cores=3 <比原来小的值> 或 --conf spark.executor.cores=3 <比原来小的值>

b 存在子进程

Spark 应用中Container以Executor(JVM进程)的形式存在,所以根进程为Executor对应的进程, 而Spark 应用向Yarn申请的总资源M = M1  + M 2 , 都是以Executor(JVM)进程(非进程树)可用资源的名义申请的。申请的资源并不是一次性全量分配给JVM使用,而是先为JVM分配初始值,随后内存不足时再按比率不断进行扩容,直致达到Container监控的最大内存使用量M 。当Executor中启动了子进程(调用shell等)时,子进程占用的内存(记为 S) 就被加入Container进程树,此时就会影响Executor实际可以使用内存资源(Executor进程实际可以使用资源为:M - S),然而启动JVM时设置的可用最大资源为M, 且JVM进程并不会感知Container中留给本身的使用量已被子进程占用,所以,当JVM使用量达到 M - S,还会继续开劈内存空间,这就会致使Executor进程树使用的总内存量大于M 而被Yarn 杀死。

典形场景有:PySpark(Spark已作内存限制,通常不会占用过大内存)、自定义Shell调用。其解决方案:

PySpark场景:

  • 若是,M未达到Yarn单个Container容许的上限时,可仅增长M1 ,从而增长M;若是,M达到Yarn单个Container容许的上限时,增长 M1, 下降 M2.
  • 减小可用的Core的数量 N, 使并行任务数减小,从而减小Overhead开销

自定义Shell 场景:(OverHead不足为假象)

  • 调整子进程可用内存量,(经过单机测试,内存控制在Container监控内存之内,且为Spark保留内存等留有空间)。操做方法同4.1.2<1>中所述
相关文章
相关标签/搜索