Spark Job Execution Architecture Explained

[TOC]


1 Introduction

This is a reorganization of a rough sketch given to me some time ago by old li (a senior big data engineer at Baidu), redrawn with ProcessOn so that it is much clearer. Note that it applies to versions below Spark 2.x: before 2.x the underlying communication was based on Akka Actor, whereas afterwards it moved to an RPC-based implementation. (I had recently planned to read through the Spark 2.x source code, but the systems at my company are all built on Spark 1.x and were only recently upgraded to Spark 1.6.3, so rather than churn, I will first get Spark 1.x thoroughly understood. That will not hold back deeper study later, since the ideas carry over.)

In addition, the diagram here depicts Spark standalone mode; other modes (such as Spark on YARN) will be covered later.

2 Execution Architecture Diagram and Explanation

The architecture diagram is shown below:

(Diagram: Spark job execution architecture)

The steps are explained below:

  • 1. Start the Spark cluster: running the start-all.sh script launches the master node and the worker nodes, i.e., a master process and the corresponding worker processes;
  • 2. Once a worker has started, it sends its registration information to the master process (this step is based on the Akka Actor event-driven model);
  • 3. After a worker has registered with the master successfully, it keeps sending heartbeats to the master so that the master can tell whether the worker is still alive (based on the Akka Actor event-driven model);
  • 4. The driver submits a job to the Spark cluster through the spark-submit script, requesting resources from the master node (based on the Akka Actor event-driven model);
  • 5. After the master receives the job request submitted by the driver, it assigns work to the worker nodes, i.e., it tells them to launch the corresponding executor processes;
  • 6. When a worker node receives the master's request to launch an executor, it starts the executor process and reports the successful launch back to the master; the executor is now ready to accept tasks;
  • 7. Once an executor process has started successfully, it reversely registers with the driver process, telling the driver who can accept tasks and run the Spark job (based on the Akka Actor event-driven model);
  • 8. Having received these registrations, the driver knows whom to send the Spark job to; the cluster now holds a set of executor processes serving this driver exclusively;
  • 9. The key components of SparkContext come into play: DAGScheduler and TaskScheduler. The DAGScheduler splits the job into stages at wide (shuffle) dependencies and assembles a TaskSet of tasks for each stage (each task contains our serialized Spark transformations); the TaskSet is then handed to the TaskScheduler, which dispatches the tasks to the appropriate executors;
  • 10. When an executor process receives a task set from the driver, it deserializes the tasks, wraps each one in a TaskRunner thread, and submits them to its local thread pool, which schedules the execution of our job. A minimal driver-program sketch illustrating this flow follows the list.
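As a concrete illustration of the flow above, here is a minimal Spark 1.x driver-program sketch for standalone mode (the master URL, app name, and HDFS path are placeholders, not taken from the original article):

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // Building the SparkContext triggers resource negotiation with the master (steps 4-8 above)
    val conf = new SparkConf().setAppName("WordCount").setMaster("spark://master:7077")
    val sc = new SparkContext(conf)
    // Transformations are only recorded here; reduceByKey introduces a wide (shuffle) dependency,
    // so the DAGScheduler will split the job into two stages (step 9 above)
    val counts = sc.textFile("hdfs://localhost:9000/hello")
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // The action triggers the DAGScheduler/TaskScheduler path and sends tasks to the executors (steps 9-10)
    counts.collect().foreach(println)
    sc.stop()
  }
}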

3 Questions and Answers

1. Why does the driver send task sets to the executors?

The cost of moving data is far higher than the cost of moving computation. In big data computing, whether it is Spark or MapReduce, one principle is followed: move the computation, not the data.

2. Since the actual computation is performed on the workers' executors, why does the driver submit the Spark job to the master rather than directly to the workers?

A simple example illustrates this. Suppose the cluster has 8 cores and 8 GB of memory in total (two worker nodes with identical resources, so 4 cores and 4 GB per worker), and the submitted Spark job needs 4 cores and 6 GB of memory. If the driver had to pick a single worker, which one could satisfy the request? Obviously neither, so the master is needed to allocate resources sensibly across the cluster; this is distributed computing, no longer the traditional single-node computation.
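To make this concrete, here is a hedged sketch of how such a resource request can be expressed in Spark 1.x standalone mode through SparkConf (the master URL is a placeholder; the values mirror the example above):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")      // placeholder standalone master URL
  .setAppName("ResourceDemo")
  .set("spark.cores.max", "4")           // total cores requested across the whole cluster
  .set("spark.executor.memory", "3g")    // per-executor memory: two 3g executors cover the 6 GB total
val sc = new SparkContext(conf)

With these settings the master can satisfy the request by spreading it over both workers (2 cores and 3 GB on each), which no single worker could do on its own.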

4 The Number of Tasks

How many tasks will the executor processes run to process an RDD? That depends on the RDD's number of partitions; see the official documentation:

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.

Reference: http://spark.apache.org/docs/1.6.3/programming-guide.html#resilient-distributed-datasets-rdds

Of course, when creating an RDD from an external dataset with sc.textFile, you can also specify the number of partitions. This means that when your dataset is large, raising the partition count appropriately can increase parallelism, subject to the size of your Spark cluster or Hadoop cluster (in YARN mode).

Let's test the effect of specifying the number of partitions when creating an RDD with sc.parallelize and with sc.textFile.

4.1 Parallelized Collections

Create the RDD:

scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:21

Run an action and observe the log output:

scala> rdd.count()
19/01/10 11:07:09 INFO spark.SparkContext: Starting job: count at <console>:24
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Got job 21 (count at <console>:24) with 3 output partitions
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Final stage: ResultStage 21 (count at <console>:24)
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Missing parents: List()
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Submitting ResultStage 21 (ParallelCollectionRDD[12] at parallelize at <console>:21), which has no missing parents
19/01/10 11:07:09 INFO storage.MemoryStore: Block broadcast_25 stored as values in memory (estimated size 1096.0 B, free 941.6 KB)
19/01/10 11:07:09 INFO storage.MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 804.0 B, free 942.4 KB)
19/01/10 11:07:09 INFO storage.BlockManagerInfo: Added broadcast_25_piece0 in memory on localhost:58709 (size: 804.0 B, free: 511.0 MB)
19/01/10 11:07:09 INFO spark.SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 21 (ParallelCollectionRDD[12] at parallelize at <console>:21)
19/01/10 11:07:09 INFO scheduler.TaskSchedulerImpl: Adding task set 21.0 with 3 tasks
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 21.0 (TID 65, localhost, partition 0,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 21.0 (TID 66, localhost, partition 1,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 21.0 (TID 67, localhost, partition 2,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO executor.Executor: Running task 1.0 in stage 21.0 (TID 66)
19/01/10 11:07:09 INFO executor.Executor: Running task 0.0 in stage 21.0 (TID 65)
19/01/10 11:07:09 INFO executor.Executor: Finished task 0.0 in stage 21.0 (TID 65). 953 bytes result sent to driver
19/01/10 11:07:09 INFO executor.Executor: Running task 2.0 in stage 21.0 (TID 67)
19/01/10 11:07:09 INFO executor.Executor: Finished task 2.0 in stage 21.0 (TID 67). 953 bytes result sent to driver
19/01/10 11:07:09 INFO executor.Executor: Finished task 1.0 in stage 21.0 (TID 66). 953 bytes result sent to driver
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 21.0 (TID 65) in 5 ms on localhost (1/3)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 21.0 (TID 67) in 4 ms on localhost (2/3)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 21.0 (TID 66) in 6 ms on localhost (3/3)
19/01/10 11:07:09 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks have all completed, from pool
19/01/10 11:07:09 INFO scheduler.DAGScheduler: ResultStage 21 (count at <console>:24) finished in 0.006 s
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Job 21 finished: count at <console>:24, took 0.015381 s
res25: Long = 6

As you can see, because the RDD was created with 3 partitions, the action launches 3 tasks on the executor, namely task 0.0, task 1.0, and task 2.0.
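You can also confirm the partition count directly in the shell without running a job; a quick check (the output shown is what one would expect here, not captured from the original session):

scala> rdd.partitions.size
res26: Int = 3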

4.2 External Datasets

Create the RDD:

scala> val rdd = sc.textFile("/hello", 2)
rdd: org.apache.spark.rdd.RDD[String] = /hello MapPartitionsRDD[18] at textFile at <console>:21

Run an action and observe the log output:

scala> rdd.count()
19/01/10 11:14:01 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/10 11:14:01 INFO spark.SparkContext: Starting job: count at <console>:24
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Got job 24 (count at <console>:24) with 2 output partitions
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 24 (count at <console>:24)
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Missing parents: List()
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Submitting ResultStage 24 (/hello MapPartitionsRDD[18] at textFile at <console>:21), which has no missing parents
19/01/10 11:14:01 INFO storage.MemoryStore: Block broadcast_31 stored as values in memory (estimated size 2.9 KB, free 1638.6 KB)
19/01/10 11:14:01 INFO storage.MemoryStore: Block broadcast_31_piece0 stored as bytes in memory (estimated size 1764.0 B, free 1640.3 KB)
19/01/10 11:14:01 INFO storage.BlockManagerInfo: Added broadcast_31_piece0 in memory on localhost:58709 (size: 1764.0 B, free: 511.0 MB)
19/01/10 11:14:01 INFO spark.SparkContext: Created broadcast 31 from broadcast at DAGScheduler.scala:1006
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 24 (/hello MapPartitionsRDD[18] at textFile at <console>:21)
19/01/10 11:14:01 INFO scheduler.TaskSchedulerImpl: Adding task set 24.0 with 2 tasks
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 24.0 (TID 76, localhost, partition 0,ANY, 2129 bytes)
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 24.0 (TID 77, localhost, partition 1,ANY, 2129 bytes)
19/01/10 11:14:01 INFO executor.Executor: Running task 0.0 in stage 24.0 (TID 76)
19/01/10 11:14:01 INFO executor.Executor: Running task 1.0 in stage 24.0 (TID 77)
19/01/10 11:14:01 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/hello:0+31
19/01/10 11:14:01 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/hello:31+31
19/01/10 11:14:01 INFO executor.Executor: Finished task 0.0 in stage 24.0 (TID 76). 2137 bytes result sent to driver
19/01/10 11:14:01 INFO executor.Executor: Finished task 1.0 in stage 24.0 (TID 77). 2137 bytes result sent to driver
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 24.0 (TID 76) in 23 ms on localhost (1/2)
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 24.0 (TID 77) in 23 ms on localhost (2/2)
19/01/10 11:14:01 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 24.0, whose tasks have all completed, from pool
19/01/10 11:14:01 INFO scheduler.DAGScheduler: ResultStage 24 (count at <console>:24) finished in 0.023 s
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Job 24 finished: count at <console>:24, took 0.035795 s
res29: Long = 6

The effect is the same as before, except that here the file is read from HDFS and the number of partitions is set to 2; the two HDFS input splits in the log (hello:0+31 and hello:31+31) correspond exactly to these 2 partitions.
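If you need more parallelism after an RDD has already been created, you can also repartition it; a brief sketch (the target partition count is illustrative):

// repartition increases the partition count (and incurs a shuffle); coalesce can reduce it without a full shuffle
val repartitioned = rdd.repartition(4)
println(repartitioned.partitions.size)   // expected to print 4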
