目录
1.2 Shuffle分为shuffle和sortShuffle
之前说RDD宽窄依赖,划分stage时说到一个RDD是否有多个子RDD判断其是窄依赖还是宽依赖,这其中宽依赖:一个RDD有多个子RDD,这个过程中有shuffle过程;
Stage中pipeline计算模型时提到数据什么时候落地:shuffle或者checkpoint
我们之前说到reduceByKey会有shuffle过程,那我们从reduceByKey为例开始
reduceByKey会将一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key, value>的形式,这样每一个key对应一个聚合起来的value
每一个key对应的value不一定都在一个partition中,也不太可能在同一个节点上,因为RDD是分布式的弹性的数据集,他的partition很可能分布在各个节点上
shuffle 普通机制运行原理图
一个executor中的每个task都有属于自己的buffer缓冲区
执行流程
产生的小文件的个数:M(map task) * R(reduce task )
问题:
产生磁盘小文件太多
- 写磁盘文件的对象多
- 拉取数据读磁盘文件对象多
- 创建对象多,容易造成gc,gc还不满足内存使用,就会OOM
OOM问题:
- Driver端回收RDD数据
- Executor 端创建对象非常多,可能会有OOM(0.2 task内存)
- Executor 端拉取shuffle数据,如果5个task一次拉取的数据量在Executor0.2的shuffle内存中放不下
- Executor端对RDD进行缓存或者广播变量的RDD数据量比较大(0.6内存)
怎么优化解决问题?
优化后的HashShuffleManager 运行原理图
一个executor中的task共享一个buffer缓冲区
产生的小文件的个数:C(core) * R(reduce task)
其他的和普通机制一样
SortShuffleManager普通运行机制原理图
执行流程:
产生的小文件的个数 2*M(map task)
SortShuffleManager bypass运行机制原理图
产生的小文件个数 2 * M(map task)
与普通运行机制相比,少了排序功能;
Bypass运行机制的处罚条件:当Shuffle reduce task个数小于spark.shuffle.sort.bypassMergeThreshold (默认200)参数的值,会开启bypass机制。
shuffle过程中的一些参数:
参照官网:http://spark.apache.org/docs/2.2.0/configuration.html#shuffle-behavior
MapOutputTracker:磁盘小文件
MapOutputTrackerMaster(Driver端)
MapOutputTrackerWorker(Executor端)
BlockManager:块管理者
BlockManagerMaster(Driver端):
DiskStore:管理磁盘数据
MemoryStroe:管理内存数据
ConnectionManager:负责连接其他BlockManager
BlockTransferService:负责拉取数据
BlockManagerSlaves(Executor端):
DiskStore:管理磁盘数据
MemoryStroe:管理内存数据
ConnectionManager:负责连接其他BlockManager
BlockTransferService:负责拉取数据
寻址过程:
shuffle参数调优 提取码:hym5
或者访问该博客:https://www.cnblogs.com/arachis/p/Spark_Shuffle.html
在spark1.6之前,使用静态管理机制
图片来源于:http://www.javashuo.com/article/p-vrnktyar-de.html
静态管理机制--堆内
静态内存管理图示——堆外
在spark1.6之后,使用统一内存管理机制
统一内存管理--堆内
统一内存管理--堆外
内存分为3部分:
storage:缓存 60% * 50%
execution:shuffle,join等运行 60% * 50%
other: spark内部的数据运行; 保护oom 40%
动态占用机制
collect方法,如果数据量太大,直接报错OOM。
官方网站:http://spark.apache.org/docs/2.2.0/configuration.html#memory-management