Hadoop 和 Spark 的关系
Spark 运算比 Hadoop 的 MapReduce 框架快的缘由是由于 Hadoop 在一次 MapReduce 运算以后,会将数据的运算结果从内存写入到磁盘中,第二次 Mapredue 运算时在从磁盘中读取数据,因此其瓶颈在2次运算间的多余 IO 消耗. Spark 则是将数据一直缓存在内存中,直到计算获得最后的结果,再将结果写入到磁盘,因此屡次运算的状况下, Spark 是比较快的. 其优化了迭代式工做负载html
Hadoop的局限 |
Spark的改进 |
|
-
- 经过使用RDD的统一抽象,实现数据处理逻辑的代码很是简洁
|
|
-
- 经过RDD提供了不少转换和动做,实现了不少基本操做,如Sort, Join等
|
-
- 一个Job只有Map和Reduce两个阶段,复杂的程序须要大量的Job来完成,且Job之间的依赖关系须要开发者自行管理
|
-
- 一个Job能够包含RDD的多个转换操做,在调度时能够生成多个阶段(Stage),并且若是多个map操做的RDD的分区不变,是能够放在同一个Task中进行
|
|
-
- RDD的转换支持流式API,提供处理逻辑的总体视图
|
-
- 对迭代式数据处理性能比较差,Reduce与下一步Map之间的中间结果只能存放在HDFS中
|
-
- 经过内存缓存数据,可大大提升迭代式计算的性能,内存不足时能够溢出到本地磁盘,而不是HDFS
|
-
- ReduceTask须要等待全部MapTask都完成后才能够开始
|
-
- 分区相同的转换构成流水线放在一个Task中运行,分区不一样的转换须要Shuffle,被划分到不一样的Stage中,须要等待前面的Stage完成后才能够开始
|
-
- 时延高,只适用Batch数据处理,对于交互式数据处理和实时数据处理的支持不够
|
-
- 经过将流拆成小的batch提供Discretized Stream处理流数据
|
Spark 的主要特色还包括:编程
-
- (1)提供 Cache 机制来支持须要反复迭代计算或者屡次数据共享,减小数据读取的 IO 开销;
- (2)提供了一套支持 DAG 图的分布式并行计算的编程框架,减小屡次计算之间中间结果写到 Hdfs 的开销;
- (3)使用多线程池模型减小 Task 启动开稍, shuffle 过程当中避免没必要要的 sort 操做并减小磁盘 IO 操做。(Hadoop 的 Map 和 reduce 之间的 shuffle 须要 sort)
Spark 系统架构
明确相关术语缓存
- Application: Appliction都是指用户编写的Spark应用程序,其中包括一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码
- Driver: Spark中的Driver即运行上述Application的main函数并建立SparkContext,建立SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中有SparkContext负责与ClusterManager通讯,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭,一般用SparkContext表明Driver
- Executor: 某个Application运行在worker节点上的一个进程, 该进程负责运行某些Task, 而且负责将数据存到内存或磁盘上,每一个Application都有各自独立的一批Executor, 在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutor Backend。一个CoarseGrainedExecutor Backend有且仅有一个Executor对象, 负责将Task包装成taskRunner,并从线程池中抽取一个空闲线程运行Task, 这个每个oarseGrainedExecutor Backend能并行运行Task的数量取决与分配给它的cpu个数
- Cluter Manager:指的是在集群上获取资源的外部服务。目前有三种类型
-
- Standalon : spark原生的资源管理,由Master负责资源的分配
- Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架
- Hadoop Yarn: 主要是指Yarn中的ResourceManager
- Worker: 集群中任何能够运行Application代码的节点,在Standalone模式中指的是经过slave文件配置的Worker节点,在Spark on Yarn模式下就是NoteManager节点
- Task: 被送到某个Executor上的工做单元,但hadoopMR中的MapTask和ReduceTask概念同样,是运行Application的基本单位,多个Task组成一个Stage,而Task的调度和管理等是由TaskScheduler负责
- Job: 包含多个Task组成的并行计算,每每由Spark Action触发生成, 一个Application中每每会产生多个Job
- Stage: 每一个Job会被拆分红多组Task, 做为一个TaskSet, 其名称为Stage,Stage的划分和调度是有DAGScheduler来负责的,Stage有非最终的Stage(Shuffle Map Stage)和最终的Stage(Result Stage)两种,Stage的边界就是发生shuffle的地方
- DAGScheduler: 根据Job构建基于Stage的DAG(Directed Acyclic Graph有向无环图),并提交Stage给TASkScheduler。 其划分Stage的依据是RDD之间的依赖的关系找出开销最小的调度方法,以下图

- TASKSedulter: 将TaskSET提交给worker运行,每一个Executor运行什么Task就是在此处分配的. TaskScheduler维护全部TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余状况分配相应的Task。另外TaskScheduler还维护着全部Task的运行标签,重试失败的Task。下图展现了TaskScheduler的做用

- 在不一样运行模式中任务调度器具体为:
-
- Spark on Standalone模式为TaskScheduler
- YARN-Client模式为YarnClientClusterScheduler
- YARN-Cluster模式为YarnClusterScheduler
- 将这些术语串起来的运行层次图以下:

- Job=多个stage,Stage=多个同种task, Task分为ShuffleMapTask和ResultTask,Dependency分为ShuffleDependency和NarrowDependency
整个 Spark 集群中,分为 Master 节点与 worker 节点,,其中 Master 节点负责将串行任务变成可并行执行的任务集Tasks, 同时还负责出错问题处理等,而 Worker 节点负责执行任务.
Driver 的功能是建立 SparkContext, 负责执行用户写的 Application 的 main 函数进程,Application 就是用户写的程序.
不一样的模式可能会将 Driver 调度到不一样的节点上执行.集群管理模式里, local 通常用于本地调试.
每一个 Worker 上存在一个或多个 Executor 进程,该对象拥有一个线程池,每一个线程负责一个 Task 任务的执行.根据 Executor 上 CPU-core 的数量,其每一个时间能够并行多个 跟 core 同样数量的 Task.Task 任务即为具体执行的 Spark 程序的任务. 网络

- spark运行流程图以下:

- 构建Spark Application的运行环境,启动SparkContext
- SparkContext向资源管理器(能够是Standalone,Mesos,Yarn)申请运行Executor资源,并启动StandaloneExecutorbackend,
- Executor向SparkContext申请Task
- SparkContext将应用程序分发给Executor
- SparkContext构建成DAG图,将DAG图分解成Stage、将Taskset发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行
- Task在Executor上运行,运行完释放全部资源
Spark运行特色:多线程
- 每一个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行Task。这种Application隔离机制是有优点的,不管是从调度角度看(每一个Driver调度他本身的任务),仍是从运行角度看(来自不一样Application的Task运行在不一样JVM中),固然这样意味着Spark Application不能跨应用程序共享数据,除非将数据写入外部存储系统
- Spark与资源管理器无关,只要可以获取executor进程,并能保持相互通讯就能够了
- 提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack里,由于Spark Application运行过程当中SparkContext和Executor之间有大量的信息交换
- Task采用了数据本地性和推测执行的优化机制
Spark做业基本运行原理

详细原理见上图。架构
咱们使用spark-submit提交一个Spark做业以后,这个做业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不一样,Driver进程可能在本地启动,也可能在集群中某个工做节点上启动。Driver进程自己会根据咱们设置的参数,占有必定数量的内存和CPU core。而Driver进程要作的第一件事情,就是向集群管理器(YARN或者其余资源管理集群)申请运行Spark做业须要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据咱们为Spark做业设置的资源参数,在各个工做节点上,启动必定数量的Executor进程,每一个Executor进程都占有必定数量的内存和CPU core。并发
在申请到了做业执行所需的资源以后,Driver进程就会开始调度和执行咱们编写的做业代码了。Driver进程会将咱们编写的Spark做业代码分拆为多个stage,每一个stage执行一部分代码片断,并为每一个stage建立一批task,而后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行如出一辙的计算逻辑(也就是咱们本身编写的某个代码片断),只是每一个task处理的数据不一样而已。一个stage的全部task都执行完毕以后,会在各个节点本地的磁盘文件中写入计算中间结果,而后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将咱们本身编写的代码逻辑所有执行完,而且计算完全部的数据,获得咱们想要的结果为止。框架
Spark是根据shuffle类算子来进行stage的划分。若是咱们的代码中执行了某个shuffle类算子(好比reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。能够大体理解为,shuffle算子执行以前的代码会被划分为一个stage,shuffle算子执行以及以后的代码会被划分为下一个stage。所以一个stage刚开始执行的时候,它的每一个task可能都会从上一个stage的task所在的节点,去经过网络传输拉取须要本身处理的全部key,而后对拉取到的全部相同的key使用咱们本身编写的算子函数执行聚合操做(好比reduceByKey()算子接收的函数)。这个过程就是shuffle。分布式
当咱们在代码中执行了cache/persist等持久化操做时,根据咱们选择的持久化级别的不一样,每一个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。函数
所以Executor的内存主要分为三块:第一块是让task执行咱们本身编写的代码时使用,默认是占Executor总内存的20%;第二块是让task经过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操做时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。
task的执行速度是跟每一个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每一个Executor进程上分配到的多个task,都是以每一个task一条线程的方式,多线程并发运行的。若是CPU core数量比较充足,并且分配到的task数量比较合理,那么一般来讲,能够比较快速和高效地执行完这些task线程。
以上就是Spark做业的基本运行原理的说明
Refer