Spark内核泛指Spark的核心运行机制,包括Spark核心组件的运行机制、Spark任务调度机制、Spark内存管理机制、Spark核心功能的运行原理等,熟练掌握Spark内核原理,可以帮助咱们更好地完成Spark代码设计,并可以帮助咱们准确锁定项目运行过程当中出现的问题的症结所在。html
Spark驱动器节点,用于执行Spark任务中的main方法,负责实际代码的执行工做。Driver在Spark做业执行时主要负责:node
Spark Executor节点是一个JVM进程,负责在 Spark 做业中运行具体任务,任务彼此之间相互独立。Spark 应用启动时,Executor节点被同时启动,而且始终伴随着整个 Spark 应用的生命周期而存在。若是有Executor节点发生了故障或崩溃,Spark 应用也能够继续执行,会将出错节点上的任务调度到其余Executor节点上继续运行。面试
Executor有两个核心功能:算法
1. 负责运行组成Spark应用的任务,并将结果返回给Driver进程;编程
2. 它们经过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在Executor进程内的,所以任务能够在运行时充分利用缓存数据加速运算。json
图1-1 Spark核心运行流程api
图1-1为Spark通用运行流程,不论Spark以何种模式进行部署,任务提交后,都会先启动Driver进程,随后Driver进程向集群管理器注册应用程序,以后集群管理器根据此任务的配置文件分配Executor并启动,当Driver所需的资源所有知足后,Driver开始执行main函数,Spark查询为懒执行,当执行到action算子时开始反向推算,根据宽依赖进行stage的划分,随后每个stage对应一个taskset,taskset中有多个task,根据本地化原则,task会被分发到指定的Executor去执行,在任务执行的过程当中,Executor也会不断与Driver进行通讯,报告任务运行状况。数组
Spark支持3种集群管理器(Cluster Manager),分别为:缓存
实际上,除了上述这些通用的集群管理器外,Spark内部也提供了一些方便用户测试和学习的简单集群部署模式。因为在实际工厂环境下使用的绝大多数的集群管理器是Hadoop YARN,所以咱们关注的重点是Hadoop YARN模式下的Spark集群部署。服务器
Spark的运行模式取决于传递给SparkContext的MASTER环境变量的值,个别模式还须要辅助的程序接口来配合使用,目前支持的Master字符串及URL包括:
表2-1 Spark运行模式配置
Master URL |
Meaning |
local |
在本地运行,只有一个工做进程,无并行计算能力。 |
local[K] |
在本地运行,有K个工做进程,一般设置K为机器的CPU核心数量。 |
local[*] |
在本地运行,工做进程数量等于机器的CPU核心数量。 |
spark://HOST:PORT |
以Standalone模式运行,这是Spark自身提供的集群运行模式,默认端口号: 7077。详细文档见:Spark standalone cluster。 |
mesos://HOST:PORT |
在Mesos集群上运行,Driver进程和Worker进程运行在Mesos集群上,部署模式必须使用固定值:--deploy-mode cluster。详细文档见:MesosClusterDispatcher. |
yarn-client |
在Yarn集群上运行,Driver进程在本地,Work进程在Yarn集群上,部署模式必须使用固定值:--deploy-mode client。Yarn集群地址必须在HADOOP_CONF_DIR or YARN_CONF_DIR变量里定义。 |
yarn-cluster |
在Yarn集群上运行,Driver进程在Yarn集群上,Work进程也在Yarn集群上,部署模式必须使用固定值:--deploy-mode cluster。Yarn集群地址必须在HADOOP_CONF_DIR or YARN_CONF_DIR变量里定义。 |
用户在提交任务给Spark处理时,如下两个参数共同决定了Spark的运行方式。
· –master MASTER_URL :决定了Spark任务提交给哪一种集群处理。
· –deploy-mode DEPLOY_MODE:决定了Driver的运行方式,可选值为Client或者Cluster。
Standalone集群有四个重要组成部分,分别是:
1) Driver:是一个进程,咱们编写的Spark应用程序就运行在Driver上,由Driver进程执行;
2) Master:是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责;
3) Worker:是一个进程,一个Worker运行在集群中的一台服务器上,主要负责两个职责,一个是用本身的内存存储RDD的某个或某些partition;另外一个是启动其余进程和线程(Executor),对RDD上的partition进行并行的处理和计算。
4) Executor:是一个进程,一个Worker上能够运行多个Executor,Executor经过启动多个线程(task)来执行对RDD的partition进行并行计算,也就是执行咱们对RDD定义的例如map、flatMap、reduce等算子操做。
图2-1 Standalone Client模式
在Standalone Client模式下,Driver在任务提交的本地机器上运行,Driver启动后向Master注册应用程序,Master根据submit脚本的资源需求找到内部资源至少能够启动一个Executor的全部Worker,而后在这些Worker之间分配Executor,Worker上的Executor启动后会向Driver反向注册,全部的Executor注册完成后,Driver开始执行main函数,以后执行到Action算子时,开始划分stage,每一个stage生成对应的taskSet,以后将task分发到各个Executor上执行。
图2-2 Standalone Cluster模式
在Standalone Cluster模式下,任务提交后,Master会找到一个Worker启动Driver进程, Driver启动后向Master注册应用程序,Master根据submit脚本的资源需求找到内部资源至少能够启动一个Executor的全部Worker,而后在这些Worker之间分配Executor,Worker上的Executor启动后会向Driver反向注册,全部的Executor注册完成后,Driver开始执行main函数,以后执行到Action算子时,开始划分stage,每一个stage生成对应的taskSet,以后将task分发到各个Executor上执行。
注意,Standalone的两种模式下(client/Cluster),Master在接到Driver注册Spark应用程序的请求后,会获取其所管理的剩余资源可以启动一个Executor的全部Worker,而后在这些Worker之间分发Executor,此时的分发只考虑Worker上的资源是否足够使用,直到当前应用程序所需的全部Executor都分配完毕,Executor反向注册完毕后,Driver开始执行main程序。
图2-3 YARN Client模式
在YARN Client模式下,Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通信申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster的功能至关于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存。
ResourceManager接到ApplicationMaster的资源申请后会分配container,而后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor所有注册完成后Driver开始执行main函数,以后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每一个stage生成对应的taskSet,以后将task分发到各个Executor上执行。
图2-4 YARN Cluster模式
在YARN Cluster模式下,任务提交后会和ResourceManager通信申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。
Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,而后在合适的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor所有注册完成后Driver开始执行main函数,以后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每一个stage生成对应的taskSet,以后将task分发到各个Executor上执行。
Spark2.x版本使用Netty通信框架做为内部通信组件。spark 基于Netty新的rpc框架借鉴了Akka中的设计,它是基于Actor模型,以下图所示:
图4-1 Actor模型
Spark通信框架中各个组件(Client/Master/Worker)能够认为是一个个独立的实体,各个实体之间经过消息来进行通讯。具体各个组件之间的关系图以下:
图4-2 Spark通信架构
Endpoint(Client/Master/Worker)有1个InBox和N个OutBox(N>=1,N取决于当前Endpoint与多少其余的Endpoint进行通讯,一个与其通信的其余Endpoint对应一个OutBox),Endpoint接收到的消息被写入InBox,发送出去的消息写入OutBox并被发送到其余Endpoint的InBox中。
Spark通讯架构以下图所示:
图 4-3 Spark通信架构
1) RpcEndpoint:RPC端点,Spark针对每一个节点(Client/Master/Worker)都称之为一个Rpc端点,且都实现RpcEndpoint接口,内部根据不一样端点的需求,设计不一样的消息和不一样的业务处理,若是须要发送(询问)则调用Dispatcher;
2) RpcEnv:RPC上下文环境,每一个RPC端点运行时依赖的上下文环境称为RpcEnv;
3) Dispatcher:消息分发器,针对于RPC端点须要发送消息或者从远程RPC接收到的消息,分发至对应的指令收件箱/发件箱。若是指令接收方是本身则存入收件箱,若是指令接收方不是本身,则放入发件箱;
4) Inbox:指令消息收件箱,一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部ReceiverQueue中,另外Dispatcher建立时会启动一个单独线程进行轮询ReceiverQueue,进行收件箱消息消费;
5) RpcEndpointRef:RpcEndpointRef是对远程RpcEndpoint的一个引用。当咱们须要向一个具体的RpcEndpoint发送消息时,通常咱们须要获取到该RpcEndpoint的引用,而后经过该应用发送消息。
6) OutBox:指令消息发件箱,对于当前RpcEndpoint来讲,一个目标RpcEndpoint对应一个发件箱,若是向多个目标RpcEndpoint发送信息,则有多个OutBox。当消息放入Outbox后,紧接着经过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行;
7) RpcAddress:表示远程的RpcEndpointRef的地址,Host + Port。
8) TransportClient:Netty通讯客户端,一个OutBox对应一个TransportClient,TransportClient不断轮询OutBox,根据OutBox消息的receiver信息,请求对应的远程TransportServer;
9) TransportServer:Netty通讯服务端,一个RpcEndpoint对应一个TransportServer,接受远程消息后调用Dispatcher分发消息至对应收发件箱;
根据上面的分析,Spark通讯架构的高层视图以下图所示:
图4-4 Spark通讯框架高层视图
在Spark中由SparkContext负责与集群进行通信、资源的申请以及任务的分配和监控等。当Worker节点中的Executor运行完毕Task后,Driver同时负责将SparkContext关闭。一般也可使用SparkContext来表明驱动程序(Driver)。
图4-1 SparkContext与集群交互
SparkContext是用户通往Spark集群的惟一入口,能够用来在Spark集群中建立RDD、累加器和广播变量。SparkContext也是整个Spark应用程序中相当重要的一个对象,能够说是整个Application运行调度的核心(不包括资源调度)。
SparkContext的核心做用是初始化Spark应用程序运行所需的核心组件,包括高层调度器(DAGScheduler)、底层调度器(TaskScheduler)和调度器的通讯终端(SchedulerBackend),同时还会负责Spark程序向Cluster Manager的注册等。
图4-2 SparkContext初始化组件
在实际的编码过程当中,咱们会先建立SparkConf实例,并对SparkConf的属性进行自定义设置,随后,将SparkConf做为SparkContext类的惟一构造参数传入来完成SparkContext实例对象的建立。
SparkContext在实例化的过程当中会初始化DAGScheduler、TaskScheduler和SchedulerBackend,当RDD的action算子触发了做业(Job)后,SparkContext会调用DAGScheduler根据宽窄依赖将Job划分红几个小的阶段(Stage),TaskScheduler会调度每一个Stage的任务(Task),另外,SchedulerBackend负责申请和管理集群为当前Application分配的计算资源(即Executor)。
若是咱们将Spark Application比做汽车,那么SparkContext就是汽车的引擎,而SparkConf就是引擎的配置参数。
下图描述了Spark-On-Yarn模式下在任务调度期间,ApplicationMaster、Driver以及Executor内部模块的交互过程:
图4-2 Spark组件交互过程
Driver初始化SparkContext过程当中,会分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并启动SchedulerBackend以及HeartbeatReceiver。SchedulerBackend经过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor执行。HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活情况,并通知到TaskScheduler。
在工厂环境下,Spark集群的部署方式通常为YARN-Cluster模式,以后的内核分析内容中咱们默认集群的部署方式为YARN-Cluster模式。
在上一章中咱们讲解了Spark YARN-Cluster模式下的任务提交流程,以下图所示:
图5-1 YARN-Cluster任务提交流程
下面的时序图清晰地说明了一个Spark应用程序从提交到运行的完整流程:
图5-2 Spark任务提交时序图
提交一个Spark应用程序,首先经过Client向ResourceManager请求启动一个Application,同时检查是否有足够的资源知足Application的需求,若是资源条件知足,则准备ApplicationMaster的启动上下文,交给ResourceManager,并循环监控Application状态。
当提交的资源队列中有资源时,ResourceManager会在某个NodeManager上启动ApplicationMaster进程,ApplicationMaster会单独启动Driver后台线程,当Driver启动后,ApplicationMaster会经过本地的RPC链接Driver,并开始向ResourceManager申请Container资源运行Executor进程(一个Executor对应与一个Container),当ResourceManager返回Container资源,ApplicationMaster则在对应的Container上启动Executor。
Driver线程主要是初始化SparkContext对象,准备运行所需的上下文,而后一方面保持与ApplicationMaster的RPC链接,经过ApplicationMaster申请资源,另外一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。
当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor进程,Executor进程起来后,会向Driver反向注册,注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver。
从上述时序图可知,Client只负责提交Application并监控Application的状态。对于Spark的任务调度主要是集中在两个方面: 资源申请和任务分发,其主要是经过ApplicationMaster、Driver以及Executor之间来完成。
当Driver起来后,Driver则会根据用户程序逻辑准备任务,并根据Executor资源状况逐步分发任务。在详细阐述任务调度前,首先说明下Spark里的几个概念。一个Spark应用程序包括Job、Stage以及Task三个概念:
l Job是以Action方法为界,遇到一个Action方法则触发一个Job;
l Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle作一次划分;
l Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。
Spark的任务调度整体来讲分两路进行,一路是Stage级的调度,一路是Task级的调度,整体调度流程以下图所示:
图5-3 Spark任务调度概览
Spark RDD经过其Transactions操做,造成了RDD血缘关系图,即DAG,最后经过Action的调用,触发Job并调度执行。DAGScheduler负责Stage级的调度,主要是将job切分红若干Stages,并将每一个Stage打包成TaskSet交给TaskScheduler调度。TaskScheduler负责Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,调度过程当中SchedulerBackend负责提供可用资源,其中SchedulerBackend有多种实现,分别对接不一样的资源管理系统。
Spark的任务调度是从DAG切割开始,主要是由DAGScheduler来完成。当遇到一个Action操做后就会触发一个Job的计算,并交给DAGScheduler来提交,下图是涉及到Job提交的相关方法调用流程图。
图5-5 Job提交调用栈
Job由最终的RDD和Action方法封装而成,SparkContext将Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是,由最终的RDD不断经过依赖回溯判断父依赖是不是宽依赖,即以Shuffle为界,划分Stage,窄依赖的RDD之间被划分到同一个Stage中,能够进行pipeline式的计算,如上图紫色流程部分。划分的Stages分两类,一类叫作ResultStage,为DAG最下游的Stage,由Action方法决定,另外一类叫作ShuffleMapStage,为下游Stage准备数据,下面看一个简单的例子WordCount。
图5-6 WordCount实例
Job由saveAsTextFile触发,该Job由RDD-3和saveAsTextFile方法组成,根据RDD之间的依赖关系从RDD-3开始回溯搜索,直到没有依赖的RDD-0,在回溯搜索过程当中,RDD-3依赖RDD-2,而且是宽依赖,因此在RDD-2和RDD-3之间划分Stage,RDD-3被划到最后一个Stage,即ResultStage中,RDD-2依赖RDD-1,RDD-1依赖RDD-0,这些依赖都是窄依赖,因此将RDD-0、RDD-1和RDD-2划分到同一个Stage,即ShuffleMapStage中,实际执行的时候,数据记录会一鼓作气地执行RDD-0到RDD-2的转化。不难看出,其本质上是一个深度优先搜索算法。
一个Stage是否被提交,须要判断它的父Stage是否执行,只有在父Stage执行完毕才能提交当前Stage,若是一个Stage没有父Stage,那么从该Stage开始提交。Stage提交时会将Task信息(分区信息以及方法等)序列化并被打包成TaskSet交给TaskScheduler,一个Partition对应一个Task,另外一方面TaskScheduler会监控Stage的运行状态,只有Executor丢失或者Task因为Fetch失败才须要从新提交失败的Stage以调度运行失败的任务,其余类型的Task失败会在TaskScheduler的调度过程当中重试。
相对来讲DAGScheduler作的事情较为简单,仅仅是在Stage层面上划分DAG,提交Stage并监控相关状态信息。TaskScheduler则相对较为复杂,下面详细阐述其细节。
Spark Task的调度是由TaskScheduler来完成,由前文可知,DAGScheduler将Stage打包到TaskSet交给TaskScheduler,TaskScheduler会将TaskSet封装为TaskSetManager加入到调度队列中,TaskSetManager结构以下图所示。
图5-7 TaskManager结构
TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。
TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态,因此说SchedulerBackend是管“粮食”的,同时它在启动后会按期地去“询问”TaskScheduler有没有任务要运行,也就是说,它会按期地“问”TaskScheduler“我有这么余量,你要不要啊”,TaskScheduler在SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行,大体方法调用流程以下图所示:
图5-9 task调度流程
图3-7中,将TaskSetManager加入rootPool调度池中以后,调用SchedulerBackend的riviveOffers方法给driverEndpoint发送ReviveOffer消息;driverEndpoint收到ReviveOffer消息后调用makeOffers方法,过滤出活跃状态的Executor(这些Executor都是任务启动时反向注册到Driver的Executor),而后将Executor封装成WorkerOffer对象;准备好计算资源(WorkerOffer)后,taskScheduler基于这些资源调用resourceOffer在Executor上分配task。
前面讲到,TaskScheduler会先把DAGScheduler给过来的TaskSet封装成TaskSetManager扔到任务队列里,而后再从任务队列里按照必定的规则把它们取出来在SchedulerBackend给过来的Executor上运行。这个调度过程实际上仍是比较粗粒度的,是面向TaskSetManager的。
TaskScheduler是以树的方式来管理任务队列,树中的节点类型为Schdulable,叶子节点为TaskSetManager,非叶子节点为Pool,下图是它们之间的继承关系。
图5-10 任务队列继承关系
调度队列的层次结构以下图所示:
图5-11 FIFO调度策略内存结构
TaskScheduler支持两种调度策略,一种是FIFO,也是默认的调度策略,另外一种是FAIR。在TaskScheduler初始化过程当中会实例化rootPool,表示树的根节点,是Pool类型。
FIFO调度策略执行步骤以下:
1) 对s1和s2两个Schedulable的优先级(Schedulable类的一个属性,记为priority,值越小,优先级越高);
2) 若是两个Schedulable的优先级相同,则对s1,s2所属的Stage的身份进行标识进行比较(Schedulable类的一个属性,记为priority,值越小,优先级越高);
3) 若是比较的结果小于0,则优先调度s1,不然优先调度s2。
图5-12 FIFO调度策略内存结构
FAIR调度策略的树结构以下图所示:
图5-13 FAIR调度策略内存结构
FAIR模式中有一个rootPool和多个子Pool,各个子Pool中存储着全部待分配的TaskSetMagager。
能够经过在Properties中指定spark.scheduler.pool属性,指定调度池中的某个调度池做为TaskSetManager的父调度池,若是根调度池不存在此属性值对应的调度池,会建立以此属性值为名称的调度池做为TaskSetManager的父调度池,并将此调度池做为根调度池的子调度池。
在FAIR模式中,须要先对子Pool进行排序,再对子Pool里面的TaskSetMagager进行排序,由于Pool和TaskSetMagager都继承了Schedulable特质,所以使用相同的排序算法。
排序过程的比较是基于Fair-share来比较的,每一个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值,minShare值以及weight值。
注意,minShare、weight的值均在公平调度配置文件fairscheduler.xml中被指定,调度池在构建阶段会读取此文件的相关配置。
1) 若是A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,那么B排在A前面;(runningTasks比minShare小的先执行)
2) 若是A、B对象的runningTasks都小于它们的minShare,那么就比较runningTasks与minShare的比值(minShare使用率),谁小谁排前面;(minShare使用率低的先执行)
3) 若是A、B对象的runningTasks都大于它们的minShare,那么就比较runningTasks与weight的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行)
4) 若是上述比较均相等,则比较名字。
总体上来讲就是经过minShare和weight这两个参数控制比较过程,能够作到让minShare使用率和权重使用率少(实际运行task比例较少)的先运行。
FAIR模式排序完成后,全部的TaskSetManager被放入一个ArrayBuffer里,以后依次被取出并发送给Executor执行。
从调度队列中拿到TaskSetManager后,因为TaskSetManager封装了一个Stage的全部Task,并负责管理调度这些Task,那么接下来的工做就是TaskSetManager按照必定的规则一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。
DAGScheduler切割Job,划分Stage, 经过调用submitStage来提交一个Stage对应的tasks,submitStage会调用submitMissingTasks,submitMissingTasks 肯定每一个须要计算的 task 的preferredLocations,经过调用getPreferrdeLocations()获得partition 的优先位置,因为一个partition对应一个task,此partition的优先位置就是task的优先位置,对于要提交到TaskScheduler的TaskSet中的每个task,该task优先位置与其对应的partition对应的优先位置一致。
从调度队列中拿到TaskSetManager后,那么接下来的工做就是TaskSetManager按照必定的规则一个个取出task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。前面也提到,TaskSetManager封装了一个Stage的全部task,并负责管理调度这些task。
根据每一个task的优先位置,肯定task的Locality级别,Locality一共有五种,优先级由高到低顺序:
表5-1 Spark本地化等级
名称 |
解析 |
PROCESS_LOCAL |
进程本地化,task和数据在同一个Executor中,性能最好。 |
NODE_LOCAL |
节点本地化,task和数据在同一个节点中,可是task和数据不在同一个Executor中,数据须要在进程间进行传输。 |
RACK_LOCAL |
机架本地化,task和数据在同一个机架的两个节点上,数据须要经过网络在节点之间进行传输。 |
NO_PREF |
对于task来讲,从哪里获取都同样,没有好坏之分。 |
ANY |
task和数据能够在集群的任何地方,并且不在一个机架中,性能最差。 |
在调度执行时,Spark调度老是会尽可能让每一个task以最高的本地性级别来启动,当一个task以X本地性级别启动,可是该本地性级别对应的全部节点都没有空闲资源而启动失败,此时并不会立刻下降本地性级别启动而是在某个时间长度内再次以X本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。
能够经过调大每一个类别的最大容忍延迟时间,在等待阶段对应的Executor可能就会有相应的资源去执行此task,这就在在必定程度上提到了运行性能。
除了选择合适的Task调度运行外,还须要监控Task的执行状态,前面也提到,与外部打交道的是SchedulerBackend,Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend则告诉TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager,并通知到该TaskSetManager,这样TaskSetManager就知道Task的失败与成功状态,对于失败的Task,会记录它失败的次数,若是失败次数尚未超过最大重试次数,那么就把它放回待调度的Task池子中,不然整个Application失败。
在记录Task失败次数过程当中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到必定的容错做用。黑名单记录Task上一次失败所在的Executor Id和Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个Task了。
图6-1 ShuffleMapStage与FinalStage
在划分stage时,最后一个stage称为FinalStage,它本质上是一个ResultStage对象,前面的全部stage被称为ShuffleMapStage。
ShuffleMapStage的结束伴随着shuffle文件的写磁盘。
ResultStage基本上对应代码中的action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job的运行结束。
1. map端task个数的肯定
Shuffle过程当中的task个数由RDD分区数决定,而RDD的分区个数与参数spark.default.parallelism有密切关系。
在Yarn Cluster模式下,若是没有手动设置spark.default.parallelism ,则有:
Others: total number of cores on all executor nodes or 2, whichever is larger.
spark.default.parallelism = max(全部executor使用的core总数, 2)
若是进行了手动配置,则:
spark.default.parallelism = 配置值
还有一个重要的配置:
The maximum number of bytes to pack into a single partition when reading files.
spark.files.maxPartitionBytes = 128 M (默认)
表明着rdd的一个分区能存放数据的最大字节数,若是一个400MB的文件,只分了两个区,则在action时会发生错误。
当一个spark应用程序执行时,生成sparkContext,同时会生成两个参数,由上面获得的spark.default.parallelism推导出这两个参数的值:
sc.defaultParallelism = spark.default.parallelism
sc.defaultMinPartitions = min(spark.default.parallelism,2)
当以上参数肯定后,就能够推算RDD分区数目了。
① 经过scala 集合方式parallelize生成的RDD
val rdd = sc.parallelize(1 to 10)
这种方式下,若是在parallelize操做时没有指定分区数,则有:
rdd的分区数 = sc.defaultParallelism
② 在本地文件系统经过textFile方式生成的RDD
val rdd = sc.textFile(“path/file”)
rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
③ 在HDFS文件系统生成的RDD
rdd的分区数 = max(HDFS文件的Block数目, sc.defaultMinPartitions)
④ 从HBase数据表获取数据并转换为RDD
rdd的分区数 = Table的region个数
⑤ 经过获取json(或者parquet等等)文件转换成的DataFrame
rdd的分区数 = 该文件在文件系统中存放的Block数目
⑥ Spark Streaming获取Kafka消息对应的分区数
基于Receiver:
在Receiver的方式中,Spark中的partition和kafka中的partition并非相关的,因此若是咱们加大每一个topic的partition数量,
仅仅是增长线程来处理由单一Receiver消费的主题。可是这并无增长Spark在处理数据上的并行度。
基于DirectDStream:
Spark会建立跟Kafka partition同样多的RDD partition,而且会并行从Kafka中读取数据,因此在Kafka partition和RDD partition之间,有一个一对一的映射关系。
2. reduce端task个数的肯定
Reduce端进行数据的聚合,一部分聚合算子能够手动指定reducetask的并行度,若是没有指定,则以map端的最后一个RDD的分区数做为其分区数,那么分区数就决定了reduce端的task的个数。
根据stage的划分咱们知道,map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task会先执行,那么后执行的reduce task如何知道从哪里去拉取map task落盘后的数据呢?
reduce端的数据拉取过程以下:
如下的讨论都假设每一个Executor有1个CPU core。
1. 未经优化的HashShuffleManager
shuffle write阶段,主要就是在一个stage结束计算以后,为了下一个stage能够执行shuffle类的算子(好比reduceByKey),而将每一个task处理的数据按key进行“划分”。所谓“划分”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘以前,会先将数据写入内存缓冲中,当内存缓冲填满以后,才会溢写到磁盘文件中去。
下一个stage的task有多少个,当前stage的每一个task就要建立多少份磁盘文件。好比下一个stage总共有100个task,那么当前stage的每一个task都要建立100份磁盘文件。若是当前stage有50个task,总共有10个Executor,每一个Executor执行5个task,那么每一个Executor上总共就要建立500个磁盘文件,全部Executor上会建立5000个磁盘文件。因而可知,未经优化的shuffle write操做所产生的磁盘文件的数量是极其惊人的。
shuffle read阶段,一般就是一个stage刚开始时要作的事情。此时该stage的每个task就须要将上一个stage的计算结果中的全部相同key,从各个节点上经过网络都拉取到本身所在的节点上,而后进行key的聚合或链接等操做。因为shuffle write的过程当中,map task给下游stage的每一个reduce task都建立了一个磁盘文件,所以shuffle read的过程当中,每一个reduce task只要从上游stage的全部map task所在节点上,拉取属于本身的那一个磁盘文件便可。
shuffle read的拉取过程是一边拉取一边进行聚合的。每一个shuffle read task都会有一个本身的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,而后经过内存中的一个Map进行聚合等操做。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操做。以此类推,直到最后将全部数据到拉取完,并获得最终的结果。
未优化的HashShuffleManager工做原理如图1-7所示:
图6-2 未优化的HashShuffleManager工做原理
2. 优化后的HashShuffleManager
为了优化HashShuffleManager咱们能够设置一个参数,spark.shuffle. consolidateFiles,该参数默认值为false,将其设置为true便可开启优化机制,一般来讲,若是咱们使用HashShuffleManager,那么都建议开启这个选项。
开启consolidate机制以后,在shuffle write过程当中,task就不是为下游stage的每一个task建立一个磁盘文件了,此时会出现shuffleFileGroup的概念,每一个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就能够并行执行多少个task。而第一批并行执行的每一个task都会建立一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用以前已有的shuffleFileGroup,包括其中的磁盘文件,也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。所以,consolidate机制容许不一样的task复用同一批磁盘文件,这样就能够有效将多个task的磁盘文件进行必定程度上的合并,从而大幅度减小磁盘文件的数量,进而提高shuffle write的性能。
假设第二个stage有100个task,第一个stage有50个task,总共仍是有10个Executor(Executor CPU个数为1),每一个Executor执行5个task。那么本来使用未经优化的HashShuffleManager时,每一个Executor会产生500个磁盘文件,全部Executor会产生5000个磁盘文件的。可是此时通过优化以后,每一个Executor建立的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量,也就是说,每一个Executor此时只会建立100个磁盘文件,全部Executor只会建立1000个磁盘文件。
优化后的HashShuffleManager工做原理如图1-8所示:
图6-3 优化后的HashShuffleManager工做原理d
SortShuffleManager的运行机制主要分红两种,一种是普通运行机制,另外一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort. bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。
在该模式下,数据会先写入一个内存数据结构中,此时根据不一样的shuffle算子,可能选用不一样的数据结构。若是是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边经过Map进行聚合,一边写入内存;若是是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构以后,就会判断一下,是否达到了某个临界阈值。若是达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,而后清空内存数据结构。
在溢写到磁盘文件以前,会先根据key对内存数据结构中已有的数据进行排序。排序事后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是经过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢以后再一次写入磁盘文件中,这样能够减小磁盘IO次数,提高性能。
一个task将全部数据写入内存数据结构的过程当中,会发生屡次磁盘溢写操做,也就会产生多个临时文件。最后会将以前全部的临时磁盘文件都进行合并,这就是merge过程,此时会将以前全部临时磁盘文件中的数据读取出来,而后依次写入最终的磁盘文件之中。此外,因为一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,所以还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager因为有一个磁盘文件merge的过程,所以大大减小了文件数量。好比第一个stage有50个task,总共有10个Executor,每一个Executor执行5个task,而第二个stage有100个task。因为每一个task最终只有一个磁盘文件,所以此时每一个Executor上只有5个磁盘文件,全部Executor只有50个磁盘文件。
普通运行机制的SortShuffleManager工做原理如图1-9所示:
图6-4 普通运行机制的SortShuffleManager工做原理
bypass运行机制的触发条件以下:
l shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
l 不是聚合类的shuffle算子。
此时,每一个task会为每一个下游task都建立一个临时磁盘文件,并将数据按key进行hash而后根据key的hash值,将key写入对应的磁盘文件之中。固然,写入磁盘文件时也是先写入内存缓冲,缓冲写满以后再溢写到磁盘文件的。最后,一样会将全部临时磁盘文件都合并成一个磁盘文件,并建立一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是如出一辙的,由于都要建立数量惊人的磁盘文件,只是在最后会作一个磁盘文件的合并而已。所以少许的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来讲,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不一样在于:第一,磁盘写机制不一样;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程当中,不须要进行数据的排序操做,也就节省掉了这部分的性能开销。
普通运行机制的SortShuffleManager工做原理如图1-10所示:
图6-5 bypass运行机制的SortShuffleManager工做原理
在执行Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责建立 Spark 上下文,提交 Spark 做业(Job),并将做业转化为计算任务(Task),在各个 Executor 进程间协调任务的调度,后者负责在工做节点上执行具体的计算任务,并将结果返回给 Driver,同时为须要持久化的 RDD 提供存储功能。因为 Driver 的内存管理相对来讲较为简单,本节主要对 Executor 的内存管理进行分析,下文中的 Spark 内存均特指 Executor 的内存。
做为一个 JVM 进程,Executor 的内存管理创建在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之能够直接在工做节点的系统内存中开辟空间,进一步优化了内存的使用。
堆内内存受到JVM统一管理,堆外内存是直接向操做系统进行内存的申请和释放。
图7-1 Executor堆内与堆外内存
堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不作特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不一样的管理模式下,这三部分占用的空间大小各不相同。
Spark 对堆内内存的管理是一种逻辑上的”规划式”的管理,由于对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存,咱们来看其具体流程:
申请内存流程以下:
释放内存流程以下:
1. Spark记录该对象释放的内存,删除该对象的引用;
2. 等待JVM的垃圾回收机制释放该对象占用的堆内内存。
咱们知道,JVM 的对象能够以序列化的方式存储,序列化的过程是将对象转换为二进制字节流,本质上能够理解为将非连续空间的链式存储转化为连续空间或块存储,在访问时则须要进行序列化的逆过程——反序列化,将字节流转化为对象,序列化的方式能够节省存储空间,但增长了存储和读取时候的计算开销。
对于 Spark 中序列化的对象,因为是字节流的形式,其占用的内存大小可直接计算,而对于非序列化的对象,其占用的内存是经过周期性地采样近似估算而得,即并非每次新增的数据项都会计算一次占用的内存大小,这种方法下降了时间开销可是有可能偏差较大,致使某一时刻的实际内存有可能远远超出预期。此外,在被 Spark 标记为释放的对象实例,颇有可能在实际上并无被 JVM 回收,致使实际可用的内存小于 Spark 记录的可用内存。因此 Spark 并不能准确记录实际可用的堆内内存,从而也就没法彻底避免内存溢出(OOM, Out of Memory)的异常。
虽然不能精准控制堆内内存的申请和释放,但 Spark 经过对存储内存和执行内存各自独立的规划管理,能够决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在必定程度上能够提高内存的利用率,减小异常的出现。
为了进一步优化内存的使用以及提升 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之能够直接在工做节点的系统内存中开辟空间,存储通过序列化的二进制数据。
堆外内存意味着把内存对象分配在Java虚拟机的堆之外的内存,这些内存直接受操做系统管理(而不是虚拟机)。这样作的结果就是能保持一个较小的堆,以减小垃圾收集对应用的影响。
利用 JDK Unsafe API(从 Spark 2.0 开始,在管理堆外的存储内存时再也不基于 Tachyon,而是与堆外的执行内存同样,基于 JDK Unsafe API 实现),Spark 能够直接操做系统堆外内存,减小了没必要要的内存开销,以及频繁的 GC 扫描和回收,提高了处理性能。堆外内存能够被精确地申请和释放(堆外内存之因此可以被精确的申请和释放,是因为内存的申请和释放再也不经过JVM机制,而是直接向操做系统申请,JVM对于内存的清理是没法准确指定时间点的,所以没法实现精确的释放),并且序列化的数据占用的空间能够被精确计算,因此相比堆内内存来讲下降了管理的难度,也下降了偏差。
在默认状况下堆外内存并不启用,可经过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,全部运行中的并发任务共享存储内存和执行内存。
(*该部份内存主要用于程序的共享库、Perm Space、线程Stack和一些Memory mapping等, 或者类C方式allocate object)
1. 静态内存管理
在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其余内存的大小在 Spark 应用程序运行期间均为固定的,但用户能够应用程序启动前进行配置,堆内内存的分配如图 2 所示:
图7-2 静态内存管理——堆内内存
能够看到,可用的堆内内存的大小须要按照代码清单1-1的方式计算:
代码清单1-1 堆内内存计算公式
可用的存储内存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safety Fraction
可用的执行内存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safety Fraction
其中 systemMaxMemory 取决于当前 JVM 堆内内存的大小,最后可用的执行内存或者存储内存要在此基础上与各自的 memoryFraction 参数和 safetyFraction 参数相乘得出。上述计算公式中的两个 safetyFraction 参数,其意义在于在逻辑上预留出 1-safetyFraction 这么一块保险区域,下降因实际内存超出当前预设范围而致使 OOM 的风险(上文提到,对于非序列化对象的内存采样估算会产生偏差)。值得注意的是,这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时 Spark 并无区别对待,和”其它内存”同样交给了 JVM 去管理。
Storage内存和Execution内存都有预留空间,目的是防止OOM,由于Spark堆内内存大小的记录是不许确的,须要留出保险区域。
堆外的空间分配较为简单,只有存储内存和执行内存,如图1-3所示。可用的执行内存和存储内存占用的空间大小直接由参数spark.memory.storageFraction 决定,因为堆外内存占用的空间能够被精确计算,因此无需再设定保险区域。
图7-3 静态内存管理
静态内存管理机制实现起来较为简单,但若是用户不熟悉 Spark 的存储机制,或没有根据具体的数据规模和计算任务或作相应的配置,很容易形成”一半海水,一半火焰”的局面,即存储内存和执行内存中的一方剩余大量的空间,而另外一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。因为新的内存管理机制的出现,这种方式目前已经不多有开发者使用,出于兼容旧版本的应用程序的目的,Spark 仍然保留了它的实现。
Spark 1.6 以后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,能够动态占用对方的空闲区域,统一内存管理的堆内内存结构如图 1-4所示:
图7-4 统一内存管理——堆内内存
统一内存管理的堆外内存结构如图 1-5所示:
图7-5 统一内存管理——堆外内存
其中最重要的优化在于动态占用机制,其规则以下:
3. 执行内存的空间被对方占用后,可以让对方将占用的部分转存到硬盘,而后”归还”借用的空间;
4. 存储内存的空间被对方占用后,没法让对方”归还”,由于须要考虑 Shuffle 过程当中的不少因素,实现起来较为复杂。
统一内存管理的动态占用机制如图 1-6所示:
图7-6 同一内存管理——动态占用机制
凭借统一内存管理机制,Spark 在必定程度上提升了堆内和堆外内存资源的利用率,下降了开发者维护 Spark 内存的难度,但并不意味着开发者能够高枕无忧。若是存储内存的空间太大或者说缓存的数据过多,反而会致使频繁的全量垃圾回收,下降任务执行时的性能,由于缓存的 RDD 数据一般都是长期驻留内存的。因此要想充分发挥 Spark 的性能,须要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。
弹性分布式数据集(RDD)做为 Spark 最根本的数据抽象,是只读的分区记录(Partition)的集合,只能基于在稳定物理存储中的数据集上建立,或者在其余已有的 RDD 上执行转换(Transformation)操做产生一个新的 RDD。转换后的 RDD 与原始的 RDD 之间产生的依赖关系,构成了血统(Lineage)。凭借血统,Spark 保证了每个 RDD 均可以被从新恢复。但 RDD 的全部转换都是惰性的,即只有当一个返回结果给 Driver 的行动(Action)发生时,Spark 才会建立任务读取 RDD,而后真正触发转换的执行。
Task 在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,若是没有则须要检查 Checkpoint 或按照血统从新计算。因此若是一个 RDD 上要执行屡次行动,能够在第一次行动中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在后面的行动时提高计算速度。
事实上,cache 方法是使用默认的 MEMORY_ONLY 的存储级别将 RDD 持久化到内存,故缓存是一种特殊的持久化。 堆内和堆外存储内存的设计,即可以对缓存 RDD 时使用的内存作统一的规划和管理。
RDD 的持久化由 Spark 的 Storage 模块负责,实现了 RDD 与物理存储的解耦合。Storage 模块负责管理 Spark 在计算过程当中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时 Driver 端和 Executor 端的 Storage 模块构成了主从式的架构,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。
Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每一个 Partition 通过处理后惟一对应一个 Block(BlockId 的格式为 rdd_RDD-ID_PARTITION-ID )。Driver端的Master 负责整个 Spark 应用程序的 Block 的元数据信息的管理和维护,而Executor端的 Slave 须要将 Block 的更新等状态上报到 Master,同时接收 Master 的命令,例如新增或删除一个 RDD。
图7-7 Storage模块示意图
在对 RDD 持久化时,Spark 规定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 种不一样的存储级别 ,而存储级别是如下 5 个变量的组合:
代码清单5-1 resourceOffer代码
class StorageLevel private(
private var _useDisk: Boolean, //磁盘
private var _useMemory: Boolean, //这里实际上是指堆内内存
private var _useOffHeap: Boolean, //堆外内存
private var _deserialized: Boolean, //是否为非序列化
private var _replication: Int = 1 //副本个数
)
Spark中7种存储级别以下:
表5-1 Spark持久化级别
持久化级别 |
含义 |
MEMORY_ONLY |
以非序列化的Java对象的方式持久化在JVM内存中。若是内存没法彻底存储RDD全部的partition,那么那些没有持久化的partition就会在下一次须要使用它们的时候,从新被计算 |
MEMORY_AND_DISK |
同上,可是当某些partition没法存储在内存中时,会持久化到磁盘中。下次须要使用这些partition时,须要从磁盘上读取 |
MEMORY_ONLY_SER |
同MEMORY_ONLY,可是会使用Java序列化方式,将Java对象序列化后进行持久化。能够减小内存开销,可是须要进行反序列化,所以会加大CPU开销 |
MEMORY_AND_DISK_SER |
同MEMORY_AND_DISK,可是使用序列化方式持久化Java对象 |
DISK_ONLY |
使用非序列化Java对象的方式持久化,彻底存储到磁盘上 |
MEMORY_ONLY_2 MEMORY_AND_DISK_2 等等 |
若是是尾部加了2的持久化级别,表示将持久化数据复用一份,保存到其余节点,从而在数据丢失时,不须要再次计算,只须要使用备份数据便可 |
经过对数据结构的分析,能够看出存储级别从三个维度定义了 RDD 的 Partition(同时也就是 Block)的存储方式:
1) 存储位置:磁盘/堆内内存/堆外内存。如 MEMORY_AND_DISK 是同时在磁盘和堆内内存上存储,实现了冗余备份。OFF_HEAP 则是只在堆外内存存储,目前选择堆外内存时不能同时存储到其余位置。
2) 存储形式:Block 缓存到存储内存后,是否为非序列化的形式。如 MEMORY_ONLY 是非序列化方式存储,OFF_HEAP 是序列化方式存储。
3) 副本数量:大于 1 时须要远程冗余备份到其余节点。如 DISK_ONLY_2 须要远程备份 1 个副本。
RDD 在缓存到存储内存以前,Partition 中的数据通常以迭代器(Iterator)的数据结构来访问,这是 Scala 语言中一种遍历数据集合的方法。经过 Iterator 能够获取分区中每一条序列化或者非序列化的数据项(Record),这些 Record 的对象实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不一样 Record 的存储空间并不连续。
RDD 在缓存到存储内存以后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。将Partition由不连续的存储空间转换为连续存储空间的过程,Spark称之为"展开"(Unroll)。
Block 有序列化和非序列化两种存储格式,具体以哪一种方式取决于该 RDD 的存储级别。非序列化的 Block 以一种 DeserializedMemoryEntry 的数据结构定义,用一个数组存储全部的对象实例,序列化的 Block 则以 SerializedMemoryEntry的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每一个 Executor 的 Storage 模块用一个链式 Map 结构(LinkedHashMap)来管理堆内和堆外存储内存中全部的 Block 对象的实例,对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。
由于不能保证存储空间能够一次容纳 Iterator 中的全部数据,当前的计算任务在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间足够时能够继续进行。
对于序列化的 Partition,其所需的 Unroll 空间能够直接累加计算,一次申请。
对于非序列化的 Partition 则要在遍历 Record 的过程当中依次申请,即每读取一条 Record,采样估算其所需的 Unroll 空间并进行申请,空间不足时能够中断,释放已占用的 Unroll 空间。
若是最终 Unroll 成功,当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存储空间,以下图所示。
图7-8 Spark Unroll
在静态内存管理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的,统一内存管理时则没有对 Unroll 空间进行特别区分,当存储空间不足时会根据动态占用机制进行处理。
因为同一个 Executor 的全部的计算任务共享有限的存储内存空间,当有新的 Block 须要缓存可是剩余空间不足且没法动态占用时,就要对 LinkedHashMap 中的旧 Block 进行淘汰(Eviction),而被淘汰的 Block 若是其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),不然直接删除该 Block。
存储内存的淘汰规则为:
l 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存;
l 新旧 Block 不能属于同一个 RDD,避免循环淘汰;
l 旧 Block 所属 RDD 不能处于被读状态,避免引起一致性问题;
l 遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到知足新 Block 所需的空间。其中 LRU 是 LinkedHashMap 的特性。
落盘的流程则比较简单,若是其存储级别符合_useDisk 为 true 的条件,再根据其_deserialized 判断是不是非序列化的形式,如果则对其进行序列化,最后将数据存储到磁盘,在 Storage 模块中更新其信息。
执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照必定规则对 RDD 数据从新分区的过程,咱们来看 Shuffle 的 Write 和 Read 两阶段对执行内存的使用:
l Shuffle Write
在 map 端会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
l Shuffle Read
1) 在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内执行空间。
2) 若是须要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间。
在 ExternalSorter 和 Aggregator 中,Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据,但在 Shuffle 过程当中全部数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到必定程度,没法再从 MemoryManager 申请到新的执行内存时,Spark 就会将其所有内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并(Merge)。
Spark 的存储内存和执行内存有着大相径庭的管理方式:对于存储内存来讲,Spark 用一个 LinkedHashMap 来集中管理全部的 Block,Block 由须要缓存的 RDD 的 Partition 转化而成;而对于执行内存,Spark 用 AppendOnlyMap 来存储 Shuffle 过程当中的数据,在 Tungsten 排序中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制。
BlockManager是整个Spark底层负责数据存储与管理的一个组件,Driver和Executor的全部数据都由对应的BlockManager进行管理。
Driver上有BlockManagerMaster,负责对各个节点上的BlockManager内部管理的数据的元数据进行维护,好比block的增删改等操做,都会在这里维护好元数据的变动。
每一个节点都有一个BlockManager,每一个BlockManager建立以后,第一件事即便去向BlockManagerMaster进行注册,此时BlockManagerMaster会为其长难句对应的BlockManagerInfo。
BlockManager运行原理以下图所示:
图8-1 BlockManager原理
BlockManagerMaster与BlockManager的关系很是像NameNode与DataNode的关系,BlockManagerMaster中保存中BlockManager内部管理数据的元数据,进行维护,当BlockManager进行Block增删改等操做时,都会在BlockManagerMaster中进行元数据的变动,这与NameNode维护DataNode的元数据信息,DataNode中数据发生变化时NameNode中的元数据信息也会相应变化是一致的。
每一个节点上都有一个BlockManager,BlockManager中有3个很是重要的组件:
每一个BlockManager建立以后,作的第一件事就是想BlockManagerMaster进行注册,此时BlockManagerMaster会为其建立对应的BlockManagerInfo。
使用BlockManager进行写操做时,好比说,RDD运行过程当中的一些中间数据,或者咱们手动指定了persist(),会优先将数据写入内存中,若是内存大小不够,会使用本身的算法,将内存中的部分数据写入磁盘;此外,若是persist()指定了要replica,那么会使用BlockTransferService将数据replicate一份到其余节点的BlockManager上去。
使用BlockManager进行读操做时,好比说,shuffleRead操做,若是能从本地读取,就利用DiskStore或者MemoryStore从本地读取数据,可是本地没有数据的话,那么会用BlockTransferService与有数据的BlockManager创建链接,而后用BlockTransferService从远程BlockManager读取数据;例如,shuffle Read操做中,颇有可能要拉取的数据在本地没有,那么此时就会到远程有数据的节点上,找那个节点的BlockManager来拉取须要的数据。
只要使用BlockManager执行了数据增删改的操做,那么必须将Block的BlockStatus上报到BlockManagerMaster,在BlockManagerMaster上会对指定BlockManager的BlockManagerInfo内部的BlockStatus进行增删改操做,从而达到元数据的维护功能。
Spark一个很是重要的特性就是共享变量。
默认状况下,若是在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每一个task中,此时每一个task只能操做本身的那份变量副本。若是多个task想要共享某个变量,那么这种方式是作不到的。
Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另外一种是Accumulator(累加变量)。Broadcast Variable会将用到的变量,仅仅为每一个节点拷贝一份,即每一个Executor拷贝一份,更大的用途是优化性能,减小网络传输以及内存损耗。Accumulator则能够让多个task共同操做一份变量,主要能够进行累加操做。Broadcast Variable是共享读变量,task不能去修改它,而Accumulator可让多个task操做一个变量。
广播变量容许编程者在每一个Executor上保留外部数据的只读变量,而不是给每一个任务发送一个副本。
每一个task都会保存一份它所使用的外部变量的副本,当一个Executor上的多个task都使用一个大型外部变量时,对于Executor内存的消耗是很是大的,所以,咱们能够将大型外部变量封装为广播变量,此时一个Executor保存一个变量副本,此Executor上的全部task共用此变量,再也不是一个task单独保存一个副本,这在必定程度上下降了Spark任务的内存占用。
图8-2 task使用外部变量
图8-3 使用广播变量
Spark还尝试使用高效的广播算法分发广播变量,以下降通讯成本。
Spark提供的Broadcast Variable是只读的,而且在每一个Executor上只会有一个副本,而不会为每一个task都拷贝一份副本,所以,它的最大做用,就是减小变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗。此外,Spark内部也使用了高效的广播算法来减小网络消耗。
能够经过调用SparkContext的broadcast()方法来针对每一个变量建立广播变量。而后在算子的函数内,使用到广播变量时,每一个Executor只会拷贝一份副本了,每一个task可使用广播变量的value()方法获取值。
在任务运行时,Executor并不获取广播变量,当task执行到 使用广播变量的代码时,会向Executor的内存中请求广播变量,以下图所示:
图8-4 task向Executor请求广播变量
以后Executor会经过BlockManager向Driver拉取广播变量,而后提供给task进行使用,以下图所示:
图8-5 Executor从Driver拉取广播变量
广播大变量是Spark中经常使用的基础优化方法,经过减小内存占用实现任务执行性能的提高。
累加器(accumulator):Accumulator是仅仅被相关操做累加的变量,所以能够在并行中被有效地支持。它们可用于实现计数器(如MapReduce)或总和计数。
Accumulator是存在于Driver端的,集群上运行的task进行Accumulator的累加,随后把值发到Driver端,在Driver端汇总(Spark UI在SparkContext建立时被建立,即在Driver端被建立,所以它能够读取Accumulator的数值),因为Accumulator存在于Driver端,从节点读取不到Accumulator的数值。
Spark提供的Accumulator主要用于多个节点对一个变量进行共享性的操做。Accumulator只提供了累加的功能,可是却给咱们提供了多个task对于同一个变量并行操做的功能,可是task只能对Accumulator进行累加操做,不能读取它的值,只有Driver程序能够读取Accumulator的值。
Accumulator的底层原理以下图所示:
图8-6 累加器原理
Spark的内核原理对于更好的使用Spark完成开发任务有着很是重要的做用,同时Spark的内核知识也是面试过程当中常常被问到的知识点。
在本课程的学习中,咱们对Spark的部署模式、通讯架构、任务调度机制、Shuffle过程、内存管理机制以及Spark的核心组件进行了详细分析,这些内容都是Spark最为重要的架构原理,但愿在以后的学习中你们能够不断深化对于Spark内核架构的理解,在更高的层次上去使用Spark技术框架。