spark计算框架

首先明确一点:学计算框架主要就是学2部分:1.资源调度 2.任务调度
写一个spark程序包含加载配置文件,建立上下文,建立RDD , 调用RDD的算子,用户在算子中自定义的函数
map端:狭窄的理解是MapReduce中的map端,本质就是将数据变成你想要的形式,例如:按照空格切分,乘2等等操做。
shuffle : 分为shuffle write(临时存到本地磁盘)和shuffle read(从磁盘拉数据,同一个分区的拉到一个partition上)阶段,本质就是数据的规整,例如同一个分区的拉到一块。
reduce端:狭窄的理解是MapReduce中的reduce端,本质就是数据的聚合web

宽泛的理解2个stage之间,前面的能够说是map端,后面的stage能够理解为reduce端,中间正好须要shuffle过程,且shuffle过程须要再shuffle write阶段将数据暂时存到本地磁盘上。网络

spark专业术语:

任务相关的专业术语:
1.application:用户写的应用程序(包含2部分:Driver Program(运行应用的main()方法,建立spark上下文 )和Executor Program(用户在算子中自定义的函数))
2.job:一个action类算子触发执行的操做,有多少个action类算子就有多少个job,一个应用程序能够有多个job.
3.stage(阶段):一组任务(task)就是一个stage,例如MapReduce中一组的map task(一个切片对应一个map task),一个job中能够有有多个stage(根据宽依赖为分界线来划分的)
.4.task(任务:底层就是一个thread(线程)):在集群运行时最小的执行单元
集群相关的专业术语:
Master:资源管理的主节点
Worker:资源管理的从节点
Executor:执行任务的进程,运行在worker节点上,负责运行task,负责将数据存储到内存或磁盘,每一个application有多个独立的Executors
ThreadPool:线程池,存在与Executor进程中,task在线程池中运行数据结构

RDD的依赖关系

RDD依赖关系
RDD有5大特性:
1.一个RDD有多个partition组成。
2.每一个算子实质上做用于每一个partition上。
3.每一个RDD依赖其父RDD.
4.可选项 :分区器是做用于KV格式的RDD上
5.可选项:RDD会提供一系列的最佳的计算位置app

父RDD不知道其子RDD,可是子RDD知道的的全部父RDD框架

1.窄依赖:父RDD与子RDD,partition的关系是一对一,这种状况并无shuffle过程
例如:map(x=>x.split(" "))分布式

2.宽依赖 : 父RDD与子RDD,partition之间的关系是一对多,这种状况下通常都会致使shuffle数据规整的过程
例如:groupByKey()->相同key的二元组必定在同一个分区中,无参的状况下子RDD的分区数等于父RDD的分区数(也就是会先计算key的hash函数再与父RDD的分区数求余,因此最终的数据必定会散落在这几个分区中),固然你能够传入参数,这个参数用于锁定该子RDD有多少个分区,后面调优的时候会用到。
groupBy:根据指定的做为分组依据,同sortBy和sortByKeysvg

宽窄依赖的做用是:将job切割成多个stage.从祖先RDD开始找,若是是窄依赖继续往下找,以宽依赖为切割点,分为2个stage函数

那么为何要划分出stage呢?由于每一个stage中的RDD都是窄依赖,没有shuffle过程,且每一个partition都是一对一的关系,因此能够在后面以管道的形式使每一个partition上的task并行处理 (简单说就是为了是每一个task以管道的形式进行计算)spa

关于stage的一个结论:stage与stage之间是宽依赖,stage内部都是窄依赖线程

造成一个DAG(有向无环图)须要从最后一个RDD往前回溯:由于子RDD知道父RDD,可是父RDD不知道子RDD

DAG

RDD中不是存储的真实数据,而是存储的对数据处理的逻辑过程
对于KV格式的RDD应该说:存储的逻辑过程的返回类型是二元组类型咱们称为是KV格式的RDD

task的解读
每一个task做用于partition所在的block或副本所在的节点上(计算向数据移动,本地化能够大大减小网络传输),这里task的计算逻辑(也就是这个展开式),处理的结果并无落地(存到磁盘的意思),而是以管道的模式,一条一条数据的从partition(逻辑上的,数据存在block上)中读到内存,在内存中一直连续的执行,直到最后执行完这个task才可能会落地,一条接着一条的流式处理,一个task中的数据像流水线同样,多个task是并行计算的。

伪代码中的输出:一条filter的输出,一条map的输出,交替出现,而不是先将filter中的全部数据都打印出来,再打印map的数据。

从这里就能明显感受到spark计算框架比MapReduce计算框架的优点:基于内存迭代,不须要落地,不须要存储到磁盘,减小了磁盘IO,大大提升了效率。

在这里插入图片描述

几个问题:
1.stage中的task(管道模式)并行计算,何时会落地磁盘呢?
①若是是stage后面是action类算子
collect:将每一个管道中的输出结果收集到driver端的内存中
saveAsTextFile:将每一个管道中的输出结果保存到指定目录,能够是本地磁盘,也能够是hdfs中
count:将管道的计算结果统计记录数,返回给Driver
②若是是stage后面是stage
在shuffle write节点会写到本地磁盘暂时存储,由于内存中的数据不够稳定,为了防止reduce task拉取数据失败
2.spark在计算过程当中,是否是很是消耗内存?
不是,正常使用,由于管道是很细的不会致使内存过大,多个task并行运算,也是正常使用,可是若是使用控制类算子的 cache,就会消耗大量内存,由于若是一个rdd调用cache(),会将这个管道,开一个口,将数据复制一份放到内存中存储,方便下次运行,可是很是消耗内存。
3.RDD弹性分布式数据集,为何不存储数据,还依然叫数据集?
由于它有处理数据的能力,能够经过生活的例子来举例说明:例如:滴滴虽然每一年一直亏损,可是市值依然很高,由于他虽然没钱,但有创造钱的能力

对比一下spark和MapReduce的计算模式的差别:
mapreduce是1+1=2 2+1=3
spark是1+1+1=3

spark的任务调度过程:
spark的任务调度过程
1.首先编写一个Application(上面的这个程序缺乏一个action算子),一个spark应用程序是基于RDD来操做的,会先建立出相应的RDD对象,而后创建一个系统DAG(有向无环图)
2.DAGScheduler(有向无环图调度器)分割这个DAG,将其分割成多个stage,每一个stage中有一组的task,因此也叫TaskSet(任务集合),一个stage就是一个TaskSet
3.将TaskSet提交给TaskScheduler(任务调度器),经由集群管理者发送任务到worker节点运行,监控task,会重试失败的task和掉队的task,不可能无限重试,因此限制重试次数为3次,默认最大失败次数为4次,若是重试了3次仍是失败,此时TaskScheduler会向DAGScheduler汇报当前失败的task所在的stage失败,此时DAGScheduler收到汇报也会重试该stage,重试次数默认为4次,注意此时已经成功执行的task不须要再从新执行了,只须要提交失败的task就行,若是stage重试4次失败,说明这个job就完全失败了,job没有重试。
那么问题是发送到哪一个work节点呢?最好是存储节点(HDFS)包含计算节点(这里是spark集群),由于这样为了数据本地化。根据文件名就能够得到该文件的全部信息,根据文件名能够得到每个block的位置,以及block所在节点的ip等,而后就将task发送到该节点运行就行。
4.task放到work节点的executor进程中的线程池中运行
在这里插入图片描述

spark资源调度的方式

spark 使用粗粒度的资源调度

粗粒度的资源调度
在任务执行前申请到所需的全部资源,当全部 task 执行完毕后再释放资源
优势:task 直接使用已经申请好的资源,执行效率高
缺点:全部的 task 执行完毕才释放资源,可能致使集群资源浪费,例如只剩一个 task 迟迟不能结束,那么大量资源将被闲置

细粒度的资源调度
任务执行时,task 本身去申请资源,执行完毕后释放资源
优势:使集群资源得以充分利用
缺点:task 须要本身申请资源,执行效率低

spark on standalone 执行流程

1> worker 节点启动,向 master 汇报信息,该信息被存储在 workers 对象中,workers 底层使用 HashSet 数据结构,为了防止同一台 worker 节点在 master 中注册两次(worker 节点挂掉可是迅速恢复可能会致使此问题)

2> 在客户端提交任务,这里以客户端提交方式为例,首先客户端会启动 driver 进程,而后构建Spark Application的运行环境,建立 SparkContext 对象,这会建立并初始化 TaskScheduler 和 DAGScheduler 两个对象

3> 当两个对象建立完成后,TaskScheduler 会向 master 为 Application 申请资源, Application 的信息会注册在 master 上的 waitingApps 对象中,waitingApps 使用 ArrayBuffer 存储数据

4> 当 waitingApps 集合中的元素发生变化时会回调 schedule() 方法,这时 master 就知道有 Appliacation 在请求执行。master 会去读取 workers 来获取本身掌握的 worker 节点,而后在资源充足的 worker 节点上为 Appliacation 分配资源 -> 通知 worker 节点启动Executor 进程,Executor 进程启动时会在内部初始化一个线程池,用来执行 task

–master 采用轮循方式分配资源,确保整个集群的资源获得充分利用,并有利于后面分发 task 时实现数据本地化
–每个 worker 节点上默认为 Applacation 启动 1 个 Executor 进程,该 Executor 进程默认使用 1G 内存和该 worker 节点上空闲的全部的核
可经过在提交任务时使用 - -executor-cores 和 - -executor-memory 来手动指定每一个 Executor 使用的资源
–spark 采用粗粒度的资源调度,当全部 task 都执行完毕后,才进行资源回收

5> 当 Executor 成功启动后,会去向 TaskScheduler 反向注册,此时 TaskScheduler 就获得全部成功启动的 Executor 的信息

all


6> SparkContext 对象解析代码构建DAG(有向无环图)交给 DAGScheduler,每个 job 会构建一个DAG图,DAGScheduler 根据 DAG 中 RDD 的宽窄依赖将其切分红一个个 stage,每一个 stage 中包含一组 task,每一个 task 由于都是窄依赖,不会产生 shuffle,因此都是 pipeline(管道) 计算模式

7> DAGScheduler 将一个 stage 封装到一个 taskSet 中,传给 TaskScheduler,TaskScheduler拿到后遍历 taskSet ,获得一个个 task,解读其要计算的数据,而后调用 HDFS 的 API 获得数据所在的位置

8> 本着计算向数据靠拢的原则,TaskScheduler 将 task 分发到其所要计算的数据所在的节点的 Executor 进程中,task 最后会被封装到线程池里的一个线程中执行,task 执行的过程当中 TaskScheduler 会对其进行监控

9> 若是 task 执行失败,TaskScheduler 会进行重试,再次分发该 task ,最多重试3次;
若是 task 陷入挣扎而且 spark 开启了推测执行,TaskScheduler 会换一个节点分发陷入挣扎的 task,两个 task 谁先执行完就以谁的结果为准

陷入挣扎的断定标准:当75%的 task 已经执行完毕后,这时 TaskScheduler 每隔10ms会计算一次剩余 task 当前执行时间的中值 t,而后以 t 的1.5倍 为标准,未执行完的 task 当前执行时间若是大于 t*1.5 则该 task 被断定为陷入挣扎的 task

10> 若是3次重试后 task 依然执行失败,该 task 所在的 stage 就会被断定为失败,TaskScheduler 会向 DAGScheduler 反馈,DAGScheduler 会重试失败的 stage,最多重试4次,若是4次重试后该 stage 依然失败,则该 job 被断定为失败,程序停止

DAGScheduler 重试 stage 时只会重试 stage 中失败的 task

11> 当全部 task 成功执行完毕后或 job 失败,driver 会通知 master, master 会通知 worker kill 掉 Executor,完成资源回收