WordCount能够说是分布式数据处理框架的”Hello World”,咱们能够以它为例来剖析一个Spark Job的执行全过程。编程
咱们要执行的代码为:数组
只有一行,很简单也很经典的代码。这里的collect
做为一个action,将触发一个Job,如今咱们从源码开始剖析这个Job执行的所有过程。我此次读的源码是Spark 1.4.1的release版本。网络
为了方便描述,咱们把上面的代码先进行一下拆分,这样能够清晰的看到每一步生成的RDD及其依赖关系,并方便下面分析时进行引用:闭包
首先,collect调用了SparkContext上的runJob方法。这个方法是一个阻塞方法,会在Job完成以前一直阻塞等待,直到Job执行完成以后返回所得的结果:框架
RDD.collect
分布式
须要注意的是这里传入了一个函数,这个函数就是这个Job的要执行的任务。后面咱们能够看到,它将会被包装并序列化后发送到要执行它的executor上,并在要处理的RDD上的每一个分区上被调用执行。ide
SparkContext的runJob被调用以后,这个Job的信息被递传给了SparkContext持有的一个DAGScheduler上。DAGScheduler自己维护着一个消息队列,在收到这个Job以后,将给本身的消息队列发送一个JobSubmitted消息。这个消息中包含了新生成的一个JobId, 触发action的RDD,通过清理后的闭包函数,要处理的各个分区的在RDD中的索引,以及一些其余信息。函数
DAGScheduler的消息队列在收到JobSubmitted消息后,将触发调用handleJobSubmitted方法。在这个方法中,首先会根据这个触发action的RDD的依赖信息计算出这个Job的全部Stage。在这个WordCount中,咱们是在reduceByKey生成的shuffledRDD3(其生成的过程涉及到通用的combineByKey方法,具体能够参考这篇文章)上触发的action,因此咱们的ResultStage所对应的finalRDD就是shuffledRDD3,ResultStage所要执行的就是shuffledRDD3的全部分区。shuffledRDD3有一个ShuffleDependency,指向mapPartitionsRDD2,据此ShuffleDependency会生成一个ShuffleMapStage,它是ResultStage的父Stage。oop
在分析出全部的Stage以后,DAGScheduler会根据ResultStage建立出一个ActiveJob对象,用来表示这个活跃的Job。而后提交ResultStage,可是在真正执行这个Stage以前,先递归的判断它有没有父Stage,如有的话先提交它的父Stage,并将当前Stage加入等待队列;若没有父Stage,才会真正的开始执行这个Stage。等待队列中的Stage,会在父Stage都执行完成以后再被执行。post
由此能够看出,在一个Job中,Stage之间必须按序执行,后一个Stage的执行将依赖前一个Stage的结果。一个Job只会有一个ResultStage,而且这个ResultStage必定会是整个Job的最后一个Stage,因此ResultStage执行的结束也就标志着整个Job的结束。
按照以前的分析,咱们的Job一共有两个Stage,一个ShuffleMapStage,一个ResultStage,并将先执行ShuffleMapStage。在执行Stage的时候,会按此Stage对应的RDD的分区数量,对应每个分区建立一个Task。若是是ShuffleMapStage则建立ShuffleMapTask,若是是ResultStage则建立ResultTask。这些Task在后面将会被序列化后发到其余的executor上面去运行。
在这里分析一下每一个Task包含哪些信息
两种Task都会包含的信息有 (1)当前Stage对应的RDD对象(轻量级) (2)当前Stage的ID (3)要处理的那个分区信息(轻量级),以及该任务可能的最优执行位置(例如,对于hdfs上的文件,HadoopRDD中会记录其每个分区存储在集群的位置,并将这个位置经过依赖继承到其子RDD)除此以外,ShuffleMapTask还包含了对应的ShuffleDependency的对象(这其中实际上有分区的方法,数据合并的方法等计算时所需的信息);ResultTask还包含了当前这个Job最终要执行在每一个数据上的函数(在此状况下就是collect传给SparkContext的那个函数)。
在对每一个要处理的分区建立出各个Task以后,DAGScheduler会将同一个Stage的各个Task合并成一个TaskSet,并将其提交给TaskScheduler。至此,调度这些Task的工做就交给了TaskScheduler来进行。
TaskScheduler在收到这个TaskSet以后,首先为其建立一个TaskSetManager,这个TaskSetManager将辅助任务的调度。而后TaskScheduler将会调用SchedulerBackend上的reviveOffers方法去申请可用的资源。
SchedulerBackend是一个接口,它在不一样的部署模式下会有不一样的实现(实际上TaskScheduler也是这样)。SchedulerBackend的做用是调度和控制整个集群里面的资源(我是这么理解的,这里的资源指的是可用的executors),当reviveOffers方法被调用后,它会将当前可用的全部资源信息,经过调用TaskScheduler的resourceOffers提供给TaskScheduler(实际上这个过程是经过另外一个EndPoint类以消息队列的方式实现的,这样能够保证同时只会进行一个对资源的申请或释放过程)。
TaskScheduler在收到当前全部可用的资源信息后,会将这些资源信息按序提供给当前正在执行的多个TaskSet,每一个TaskSet再根据这些资源信息将当前能够执行的Task序列化后包装到一个TaskDescription对象中返回(这个TaskDescription对象中也包含了这个任务将要运行在哪一个executor上),最终经过TaskScheduler将全部当前的资源状况能够执行的Task对应的TaskDescription返回给SchedulerBackend。
SchedulerBackend这时才根据每一个TaskDescription将executors资源真正的分配给这些Task,并记录已分配掉的资源和剩余的资源,而后将TaskDescription中序列化后的Task经过网络(Spark使用akka框架)发送给它对应的executor。
集群中的executor在收到Task后,申请一个线程开始运行这个Task。这是整个Job中最核心的部分了,真正的计算都在这一步发生。首先将其反序列化,而后调用这个Task对象上的runTask方法。在这里对于ShuffleMapTask和ResultTask,runTask方法有着不一样的实现,并将返回不一样的内容。咱们分别来分别分析。
对于ShuffleMapTask,runTask首先获取对应的RDD和ShuffleDependency。在这里对应的RDD是mapPartitionsRDD2,ShuffleDependency中则有着合并的计算信息。而后调用RDD的iterator方法获取一个对应分区数据的迭代器。若是当前RDD分区的数据已经在以前计算过了,则会直接去内存或磁盘中获取,不然在此时就会调用mapPartitionsRDD2的compute方法,根据其依赖去计算它的分区数据。若是ShuffleDependency中的mapSideCombine标记为true,就会将iterator方法返回的分区数据在这里(也就是map端)进行合并(此时要求ShuffleDependency中的aggregator不为空,aggregator中包含了如何将数据进行合并的信息)。而后根据ShuffleDependency中的partitioner(默认是一个HashPartitioner)计算出每条数据在其结果端(就是shuffleRDD3中)的分区,并将其写入到本地磁盘中对应的文件中去(在这里写入方法有多种实现方式,1.4.1的版本默认是用了SortShuffleManager,还有的其余实现是HashShuffleManager和UnsafeShuffleManager,具体的实现方法在此处就不详说了)。当分区的每条数据都处理完后,runTask会返回一个MapStatus,这其中包含了一个BlockManagerId(标记了这个任务被执行的位置,也就是Map后的数据存储的位置)以及每一个结果分区(每一个reduceId)的数据的大小信息。最后这个MapStatus将经过网络发回给driver,dirver将其记录。
ShuffleMapTask.runTask
对于ResultTask,runTask首先也是获取对应的RDD和要在数据上执行的函数func。在这里对应的RDD应该是shuffleRDD3,而后调用RDD上的iterator获取这个分区的数据,并将其传入func函数中,将func函数的返回值做为runTask的返回值返回。过程看似简单,实际上在shuffleRDD3上调用iterator时就对应了shuffle的reduce端的合并。从shuffleRDD3的compute方法的实现能够看出,它的每一个分区的数据都要去执行了ShuffleMapTask的executor上面获取,因此会产生大量的网络流量和磁盘IO。这个过程就是MapReduce范式中的shuffle过程,这里面还有不少的细节我并无详述,可是这个过程十分关键,它的实现效率直接决定了分布式大数据处理的效率。
ResultTask.runTask
在runTask计算结束返回数据后,executor将其返回的数据进行序列化,而后根据序列化后数据的大小进行判断:若是数据大与某个值,就将其写入本地的内存或磁盘(若是内存不够),而后将数据的位置blockId和数据大小封装到一个IndirectTaskResult中,并将其序列化;若是数据不是很大,则直接将其封装入一个DirectTaskResult并进行序列化。最终将序列化后的DirectTaskResult或者IndirectTaskResult递传给executor上运行的一个ExecutorBackend上(经过statusUpdate方法)。
ExecutorBackend如上面的SchedulerBackend有着类似的功能(实际上,对于local模式,这两个类都由一个LocalBackend实现),将结果封入一个StatusUpdate消息透传给一个对应的EndPoint类,EndPoint类中收到这个消息后将该消息再经过网络发送给driver。
在driver端的SchedulerBackend收到这个StatusUpdate消息以后,将结果续传给TaskScheduler,并进行资源的释放,在释放资源后再调用一次reviveOffers,这样又能够重复上面所描述的过程,将释放出来的资源安排给其余的Task来执行。
TaskScheduler在收到任务结果后,将这个任务标记为结束,而后使用一个TaskResultGetter类来进行结果的解析。TaskResultGetter将结果反序列化,判断若是其是一个DirectTaskResult则直接抽取出其中的结果;若是是一个IndirectTaskResult则须要根据其中的blockId信息去对应的机器上拉取结果。最终都是将结果拉取到driver的内存中(这就是咱们最好不要在大数据集上执行相似collect的方法的缘由,它会将全部的数据拉入driver的内存中,形成大量的内存开销,甚至内存不足)。而后TaskResultGetter会将拉取到的结果递交给TaskScheduler,TaskScheduler再将此结果递交给DAGScheduler。
DAGScheduler在收到Task完成的消息后,先判断这完成的是一个什么任务。若是是一个ShuffleMapTask则须要将返回的结果(MapStatus)记录到driver中,并判断若是当前的ShuffleMapStage如果已经完成,则去提交下一个Stage。若是是一个ResultTask完成了, 则将其结果递交给JobWaiter,并标记这个任务以完成。
JobWaiter是DAGScheduler在最开始submitJob的时候建立的一个对象,用于阻塞等待任务的完成,并进行结果的处理。JobWaiter在每收到一个ResultTask的结果时,都将结果在resultHandler上执行。这个resultHandler则是由SparkContext传进来的一个函数,其做用是将数据放入一个数组中,这个数组最终将做为SparkContext.runJob方法的返回值,被最开始的collect方法接收而后返回。若JobWaiter收到了每一个ResultTask的结果,则表示整个Job已经完成,此时就中止阻塞等待,因而SparkContext.runJob返回一个结果的数组,并由collect接收后返回给用户程序。
至此,一个Spark的WordCount执行结束。
本文从源码的角度详细分析了一个Spark Job的整个执行、调度的过程,不过不少东西还只是浅尝辄止,并未彻底深刻。尽管如此,通过连续好几天的分析,我仍是以为收获颇丰,对Spark的实现原理有了更加深刻的理解,甚至对MapReduce的编程范式以及其shuffle过程也增长了很多理解。PS:其实从一开始我到分析结束都是没有作任何记录的,只由于一直只知其一;不知其二实在不知道如何来作记录,因此只是去查阅一些资料和使劲儿的阅读源码。在我自认为分析结束后,我才开始写这篇记录,可是在写的过程当中我才发现我分析的过程有一些并非很清晰,而后从新去看,才真正弄的比较清晰了。可见写博文是很重要的过程,不只是将学到的知识分享出来,并且对自身的知识也有很好的加固做用。