[TOC]
This is a reorganization of a sketch that old li (a senior big data engineer at Baidu) gave me some time ago, redrawn with ProcessOn to make it clearer. Note that it describes versions before Spark 2.x: in those versions the underlying communication was based on Akka actors, whereas later versions switched to RPC. (I had originally planned to read through the Spark 2.x source code, but our company's systems are all built on Spark 1.x and were only recently upgraded to Spark 1.6.3, so instead of churning I will first master Spark 1.x thoroughly. That will not hinder deeper study later, since the ideas carry over.)

Also, the diagram here shows Spark standalone mode; other modes (such as Spark on YARN) will be written up later.

The diagram is as follows:

Explanation:
- A Spark cluster is started by running the start-all.sh script, which brings up the master node and the worker nodes, i.e., launches one master process and the corresponding worker processes.
- Once a worker starts, it sends its registration information to the master process (this exchange is based on the Akka actor event-driven model).
- After registering successfully, each worker keeps sending heartbeats to the master, which uses them to monitor whether the worker node is still alive (again based on the Akka actor event-driven model).
- The driver submits a job to the Spark cluster via the spark-submit script, requesting resources from the master node (Akka actor event-driven model).
- When the master receives the driver's job request, it assigns work to the worker nodes; in effect, it tells them to launch the corresponding executor processes.
- When a worker receives the launch-executor instruction from the master, it starts the executor process and reports the successful start back to the master; the executor is then in a state where it can accept tasks.
- Once an executor process has started successfully, it registers itself back with the driver process (reverse registration), telling the driver who can accept tasks and execute the Spark job (Akka actor event-driven model).
- With these registrations in hand, the driver knows whom to send the Spark job to, and the cluster now has a group of independent executor processes serving that driver.
- The key components of the SparkContext come into play: the DAGScheduler and the TaskScheduler. The DAGScheduler splits the job into stages at wide dependencies and, for each stage, assembles a batch of tasks into a TaskSet (each task contains our serialized Spark transformations). The TaskSet is then handed to the TaskScheduler, which dispatches the tasks to the appropriate executors.
- When an executor process receives a TaskSet from the driver, it deserializes the tasks, wraps each one in a thread called a TaskRunner, and places them in its local thread pool, where execution of our job is scheduled.

1. Why send the TaskSet to the executor?
Moving data costs far more than moving computation. In big data computing, whether in Spark or in MapReduce, one principle always holds: move the computation, not the data.
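This principle is easy to demonstrate outside Spark. Below is a minimal, hypothetical Python sketch (not Spark code; the `Worker` class and `compute` function are invented for illustration): instead of pulling a large dataset to the caller, we serialize a small function and ship it to where the data lives, which is essentially what a task's serialized transformations achieve.

```python
import pickle

class Worker:
    """A toy 'worker' that holds a large local dataset."""
    def __init__(self, data):
        self.data = data  # stays put; never shipped over the network

    def run_task(self, serialized_func):
        # Deserialize the shipped computation and run it on the local data.
        func = pickle.loads(serialized_func)
        return func(self.data)

def compute(data):
    # Our 'transformation': a module-level function so pickle can serialize it.
    return sum(x * 2 for x in data)

worker = Worker(list(range(1_000_000)))

# Ship the computation (a few dozen bytes), not the million-element dataset.
task = pickle.dumps(compute)
result = worker.run_task(task)

print(len(task))   # the shipped payload is tiny compared to the dataset
print(result)
```

The asymmetry is stark: the serialized function is a handful of bytes, while the dataset it operates on would be megabytes on the wire.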
2. The final computation is done on the workers' executors, so why does the driver submit the Spark job to the master rather than directly to a worker?
A simple example illustrates the problem. Suppose the cluster has 8 cores and 8 GB of memory in total (two worker nodes with identical resources, so 4 cores and 4 GB each), and the submitted Spark job needs 4 cores and 6 GB of memory. If the driver had to pick a worker, which one could handle the job? Clearly neither, so the master is needed to allocate resources sensibly across the whole cluster. The computation here is distributed computing, no longer the traditional single-node computation of the past.
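The point can be sketched with a toy scheduler (hypothetical Python; this is not how the standalone master is actually implemented): no single worker can satisfy the request above, but a cluster-wide view shows the resources do exist in aggregate.

```python
# Toy model: each worker advertises (cores, memory_gb).
workers = {"worker-1": (4, 4), "worker-2": (4, 4)}

def fits_on_single_worker(req_cores, req_mem):
    """Could any one worker alone satisfy the request?"""
    return any(c >= req_cores and m >= req_mem for c, m in workers.values())

def master_can_allocate(req_cores, req_mem):
    """The master's cluster-wide view: total resources across all workers."""
    total_cores = sum(c for c, _ in workers.values())
    total_mem = sum(m for _, m in workers.values())
    return total_cores >= req_cores and total_mem >= req_mem

print(fits_on_single_worker(4, 6))  # False: no single worker has 6 GB
print(master_can_allocate(4, 6))    # True: 8 cores / 8 GB in total
```

In reality the master must also place concrete executor processes, so aggregate capacity alone is not sufficient either; the sketch only shows why a per-worker view fails and a cluster-wide view is required.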
3. How many tasks will an executor process run to process an RDD? That depends on the RDD's number of partitions. See the official documentation:
> One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
Reference: http://spark.apache.org/docs/1.6.3/programming-guide.html#resilient-distributed-datasets-rdds
Of course, when creating an RDD from an external dataset with sc.textFile, you can also specify the number of partitions. This means that when your dataset is large, raising the partition count appropriately can increase parallelism, though that in turn depends on the size of your Spark cluster, or of your Hadoop cluster in YARN mode.
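Following the rule of thumb quoted above (2-4 partitions per CPU), a hypothetical helper for picking a starting partition count might look like this. The function name and the default factor of 3 are my own choices, not a Spark API:

```python
def suggested_partitions(total_cores, per_core=3):
    """Rule of thumb from the Spark docs: 2-4 partitions per CPU core."""
    if not 2 <= per_core <= 4:
        raise ValueError("the docs suggest 2-4 partitions per core")
    return total_cores * per_core

# e.g. the 8-core cluster from the earlier example:
print(suggested_partitions(8))      # 24
print(suggested_partitions(8, 2))   # 16
```

This is only a starting point; skewed data or very small files can justify deviating from it.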
Let us test what specifying the partition count does when creating an RDD with sc.parallelize and with sc.textFile.
Create an RDD:
```
scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:21
```
Run an action and watch the log output:
```
scala> rdd.count()
19/01/10 11:07:09 INFO spark.SparkContext: Starting job: count at <console>:24
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Got job 21 (count at <console>:24) with 3 output partitions
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Final stage: ResultStage 21 (count at <console>:24)
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Missing parents: List()
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Submitting ResultStage 21 (ParallelCollectionRDD[12] at parallelize at <console>:21), which has no missing parents
19/01/10 11:07:09 INFO storage.MemoryStore: Block broadcast_25 stored as values in memory (estimated size 1096.0 B, free 941.6 KB)
19/01/10 11:07:09 INFO storage.MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 804.0 B, free 942.4 KB)
19/01/10 11:07:09 INFO storage.BlockManagerInfo: Added broadcast_25_piece0 in memory on localhost:58709 (size: 804.0 B, free: 511.0 MB)
19/01/10 11:07:09 INFO spark.SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 21 (ParallelCollectionRDD[12] at parallelize at <console>:21)
19/01/10 11:07:09 INFO scheduler.TaskSchedulerImpl: Adding task set 21.0 with 3 tasks
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 21.0 (TID 65, localhost, partition 0,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 21.0 (TID 66, localhost, partition 1,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 21.0 (TID 67, localhost, partition 2,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO executor.Executor: Running task 1.0 in stage 21.0 (TID 66)
19/01/10 11:07:09 INFO executor.Executor: Running task 0.0 in stage 21.0 (TID 65)
19/01/10 11:07:09 INFO executor.Executor: Finished task 0.0 in stage 21.0 (TID 65). 953 bytes result sent to driver
19/01/10 11:07:09 INFO executor.Executor: Running task 2.0 in stage 21.0 (TID 67)
19/01/10 11:07:09 INFO executor.Executor: Finished task 2.0 in stage 21.0 (TID 67). 953 bytes result sent to driver
19/01/10 11:07:09 INFO executor.Executor: Finished task 1.0 in stage 21.0 (TID 66). 953 bytes result sent to driver
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 21.0 (TID 65) in 5 ms on localhost (1/3)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 21.0 (TID 67) in 4 ms on localhost (2/3)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 21.0 (TID 66) in 6 ms on localhost (3/3)
19/01/10 11:07:09 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks have all completed, from pool
19/01/10 11:07:09 INFO scheduler.DAGScheduler: ResultStage 21 (count at <console>:24) finished in 0.006 s
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Job 21 finished: count at <console>:24, took 0.015381 s
res25: Long = 6
```
As you can see, because the RDD's partition count was set to 3 above, three tasks are allocated on the executor when the action runs: task 0.0, task 1.0, and task 2.0.
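The partition-to-task mapping can be approximated outside Spark. The sketch below mimics how ParallelCollectionRDD slices a sequence into numSlices contiguous, roughly equal partitions, with one task then launched per slice. It is a simplified reimplementation for illustration, not Spark's actual code:

```python
def slice_collection(data, num_slices):
    """Cut data into num_slices contiguous, roughly equal partitions."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

partitions = slice_collection([1, 2, 3, 4, 5, 6], 3)
print(partitions)        # [[1, 2], [3, 4], [5, 6]]
print(len(partitions))   # 3 partitions -> 3 tasks, matching the log above
```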
Create an RDD:
```
scala> val rdd = sc.textFile("/hello", 2)
rdd: org.apache.spark.rdd.RDD[String] = /hello MapPartitionsRDD[18] at textFile at <console>:21
```
Run an action and watch the log output:
```
scala> rdd.count()
19/01/10 11:14:01 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/10 11:14:01 INFO spark.SparkContext: Starting job: count at <console>:24
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Got job 24 (count at <console>:24) with 2 output partitions
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 24 (count at <console>:24)
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Missing parents: List()
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Submitting ResultStage 24 (/hello MapPartitionsRDD[18] at textFile at <console>:21), which has no missing parents
19/01/10 11:14:01 INFO storage.MemoryStore: Block broadcast_31 stored as values in memory (estimated size 2.9 KB, free 1638.6 KB)
19/01/10 11:14:01 INFO storage.MemoryStore: Block broadcast_31_piece0 stored as bytes in memory (estimated size 1764.0 B, free 1640.3 KB)
19/01/10 11:14:01 INFO storage.BlockManagerInfo: Added broadcast_31_piece0 in memory on localhost:58709 (size: 1764.0 B, free: 511.0 MB)
19/01/10 11:14:01 INFO spark.SparkContext: Created broadcast 31 from broadcast at DAGScheduler.scala:1006
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 24 (/hello MapPartitionsRDD[18] at textFile at <console>:21)
19/01/10 11:14:01 INFO scheduler.TaskSchedulerImpl: Adding task set 24.0 with 2 tasks
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 24.0 (TID 76, localhost, partition 0,ANY, 2129 bytes)
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 24.0 (TID 77, localhost, partition 1,ANY, 2129 bytes)
19/01/10 11:14:01 INFO executor.Executor: Running task 0.0 in stage 24.0 (TID 76)
19/01/10 11:14:01 INFO executor.Executor: Running task 1.0 in stage 24.0 (TID 77)
19/01/10 11:14:01 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/hello:0+31
19/01/10 11:14:01 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/hello:31+31
19/01/10 11:14:01 INFO executor.Executor: Finished task 0.0 in stage 24.0 (TID 76). 2137 bytes result sent to driver
19/01/10 11:14:01 INFO executor.Executor: Finished task 1.0 in stage 24.0 (TID 77). 2137 bytes result sent to driver
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 24.0 (TID 76) in 23 ms on localhost (1/2)
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 24.0 (TID 77) in 23 ms on localhost (2/2)
19/01/10 11:14:01 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 24.0, whose tasks have all completed, from pool
19/01/10 11:14:01 INFO scheduler.DAGScheduler: ResultStage 24 (count at <console>:24) finished in 0.023 s
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Job 24 finished: count at <console>:24, took 0.035795 s
res29: Long = 6
```
The effect is the same as before; the only differences are that the file is read from HDFS and the partition count is set to 2.
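The two Input split lines in the log (hdfs://localhost:9000/hello:0+31 and :31+31) show how the 62-byte file was cut in half. A simplified sketch of the split arithmetic used by Hadoop's FileInputFormat (ignoring details such as the 1.1 slop factor for the final split) looks like this:

```python
def compute_splits(total_size, num_splits, block_size=64 * 1024 * 1024, min_size=1):
    """Simplified FileInputFormat split computation:
    split_size = max(min_size, min(total_size // num_splits, block_size))."""
    goal_size = total_size // num_splits
    split_size = max(min_size, min(goal_size, block_size))
    splits, offset = [], 0
    while offset < total_size:
        length = min(split_size, total_size - offset)
        splits.append((offset, length))  # (start offset, length) per split
        offset += length
    return splits

# The 62-byte /hello file requested with minPartitions = 2:
print(compute_splits(62, 2))  # [(0, 31), (31, 31)], matching the log
```

Each split then becomes one partition of the HadoopRDD, and hence one task, which is why the job above ran exactly 2 tasks.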