spark学习(五):shuffle以及内存管理机制

目录

1. shuffle详解

1.1 那么到底什么时shufffle?

reduceByKey的含义?

问题:

如何聚合?

1.2 Shuffle分为shuffle和sortShuffle

1.2.1 shuffle普通机制

1.2.2 shuffle合并机制

1.2.3 SortShuffle普通运行机制

1.3 shuffle文件寻址

1.4 shuffle调优

2. spark的内存管理机制

2.1 静态内存管理机制

2.2 统一内存管理机制

2.3 spark关于内存分配的参数


1. shuffle详解

之前说RDD宽窄依赖,划分stage时说到一个RDD是否有多个子RDD判断其是窄依赖还是宽依赖,这其中宽依赖:一个RDD有多个子RDD,这个过程中有shuffle过程;

Stage中pipeline计算模型时提到数据什么时候落地:shuffle或者checkpoint

1.1 那么到底什么时shufffle?

我们之前说到reduceByKey会有shuffle过程,那我们从reduceByKey为例开始

reduceByKey的含义?

reduceByKey会将一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key, value>的形式,这样每一个key对应一个聚合起来的value

问题:

每一个key对应的value不一定都在一个partition中,也不太可能在同一个节点上,因为RDD是分布式的弹性的数据集,他的partition很可能分布在各个节点上

如何聚合?

  • Shuffle Write:上一个stage的每个map task就必须保证将自己处理的当前分区中的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件
  • Shuffle Read:reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key多对应的value都会汇聚到同一个节点上去处理和聚合

1.2 Shuffle分为shuffle和sortShuffle

1.2.1 shuffle普通机制

shuffle 普通机制运行原理图

一个executor中的每个task都有属于自己的buffer缓冲区

执行流程

  1. Map task 处理完数据后,将结果写入到buffer缓冲区(与reduce task数量一致),每个buffer默认32k
  2. 满了之后溢写到磁盘,每个buffer对应一个磁盘小文件。
  3. Reduce task到不同的节点去拉取数据自己的数据

产生的小文件的个数:M(map task) * R(reduce task )

问题:

产生磁盘小文件太多

  1. 写磁盘文件的对象多
  2. 拉取数据读磁盘文件对象多
  3. 创建对象多,容易造成gc,gc还不满足内存使用,就会OOM

OOM问题:

  1. Driver端回收RDD数据
  2. Executor 端创建对象非常多,可能会有OOM(0.2 task内存)
  3. Executor 端拉取shuffle数据,如果5个task一次拉取的数据量在Executor0.2的shuffle内存中放不下
  4. Executor端对RDD进行缓存或者广播变量的RDD数据量比较大(0.6内存)

怎么优化解决问题?

1.2.2 shuffle合并机制

优化后的HashShuffleManager 运行原理图

一个executor中的task共享一个buffer缓冲区

产生的小文件的个数:C(core) * R(reduce task)

其他的和普通机制一样

1.2.3 SortShuffle普通运行机制

SortShuffleManager普通运行机制原理图

 执行流程:

  1. map task 将处理的结果写入一个5M的内存结构中
  2. SortShuffle中会估算这个内存结构大小,当下一次结果放不下时,会申请2*估计-当前
  3. 如果申请的到内存,继续往数据结构中写数据,如果申请不到,溢写磁盘,每批次是1万条溢写,溢写过程中会有排序。
  4. 溢写的数据在磁盘上最终形成两个文件:一个索引文件一个数据文件
  5. reduce 拉取数据首先解析索引文件,再去拉取数据

产生的小文件的个数 2*M(map task)

SortShuffleManager bypass运行机制原理图

产生的小文件个数 2 * M(map task)

与普通运行机制相比,少了排序功能;

Bypass运行机制的处罚条件:当Shuffle reduce task个数小于spark.shuffle.sort.bypassMergeThreshold (默认200)参数的值,会开启bypass机制。

1.3 shuffle文件寻址

  1. map task处理完的数据,将结果和数据位置封装到MapStatus对象中,通过MapOutputTrackerWorker汇报给Driver中的MapOutputTrackerMaster。Driver中掌握了数据位置。
  2. reduce  端处理数据,首先向本进程中的MapOutputTrackerWorker要磁盘文件位置,再向Driver中的MapOutputTrackerMaster要磁盘数据位置,Driver返回磁盘数据位置。   
  3. reduce 拿到数据位置之后,通过BlockManager中的ConnectionManager连接数据所在的节点,连接上之后,通过BlockManager中的BlockTransferService拉取数据         
  4. BlockTransferService拉取数据默认启动5个task,这5个task默认一次拉取的数据量不能超过48M。
  5. 拉取过来的数据放在Executor端的shuffle聚合内存中(spark.shuffle.memeoryFraction 0.2)。
  6. 如果5个task一次拉取的数据放不到shuffle内存中会有OOM,如果放下一次,不会有OOM,以后放不下的会放磁盘。

shuffle过程中的一些参数:

参照官网:http://spark.apache.org/docs/2.2.0/configuration.html#shuffle-behavior

MapOutputTracker:磁盘小文件
            MapOutputTrackerMaster(Driver端)                
            MapOutputTrackerWorker(Executor端)
BlockManager:块管理者
            BlockManagerMaster(Driver端):
                DiskStore:管理磁盘数据
                MemoryStroe:管理内存数据
                ConnectionManager:负责连接其他BlockManager
                BlockTransferService:负责拉取数据
            BlockManagerSlaves(Executor端):
                DiskStore:管理磁盘数据
                MemoryStroe:管理内存数据
                ConnectionManager:负责连接其他BlockManager
                BlockTransferService:负责拉取数据
寻址过程:

1.4 shuffle调优

shuffle参数调优 提取码:hym5 

或者访问该博客:https://www.cnblogs.com/arachis/p/Spark_Shuffle.html

2. spark的内存管理机制

2.1 静态内存管理机制

在spark1.6之前,使用静态管理机制

图片来源于:http://www.javashuo.com/article/p-vrnktyar-de.html

静态管理机制--堆内

静态内存管理图示——堆外

2.2 统一内存管理机制

在spark1.6之后,使用统一内存管理机制

统一内存管理--堆内

统一内存管理--堆外

内存分为3部分:

storage:缓存  60% * 50%

execution:shuffle,join等运行  60% * 50%

other: spark内部的数据运行; 保护oom  40%

动态占用机制

  1. 如果双方的内存都是要完了,直接溢出磁盘。
  2. Storage占用的execution的内存,可以被Execution剔除。
  3. execution占用了Storage的内存,不能被剔除,直到exection占用的内存释放掉。

collect方法,如果数据量太大,直接报错OOM。

2.3 spark关于内存分配的参数

官方网站:http://spark.apache.org/docs/2.2.0/configuration.html#memory-management