sparkContext建立还没完呢,紧接着前两天,咱们继续探索。。做死。。。缓存
紧接着前几天咱们继续SparkContext的建立:app
接下来从这里咱们能够看到,spark开始加载hadoop的配置信息,第二张图中 new出来的Configuration正是hadoop的Configuration。同时,将全部sparkConf中全部以spark.hadoop.开头的属性都复制到了Hadoop的Configuration.同时又将spark.buffer.size复制为Hadoop的Configuration的配置的Io.file.buffer.size.随之加载相关jar包。再下来,咱们能够看到:oop
咱们能够看到,将全部的executor的环境变量加载于_executorMemory以及executorEnvs,后续应该在注册executor时进行调用。随之建立_taskScheduler:spa
那么咱们深刻看下createTaskScheduler的过程:3d
这里能够看到,它干了不少变态的事情,那么先说下,什么是TaskScheduler呢?TaskScheduler负责任务的提交,而且请求集群管理器对任务调度。TaskScheduler也能够看作任务调度的客户端。那么createTaskScheduler会根据master的配置(master match),匹配部署模式,利用反射建立yarn-cluster(本例图中为local及yarn-cluster),随之initialize了CoarseGrainedSchedulerBackend。(之后再深刻了解CoarseGrainedSchedulerBackend)blog
代码中能够看到,建立了TaskSchedulerImpl,它是什么呢?事件
它从SparkConf中读取配置信息,包括每一个任务分配的CPU数,失败task重试次数(可经过spark.task.maxFailures来配置),多久推测执行一次spark.speculation.interval(固然是在spark.speculation为true的状况下生效)等等。这里还有个调度模式,调度模式分为FIFO和FAIR两种,经过修改参数spark.scheduler.mode来改变。 最终建立TaskResultGetter,它的做用是对executor中的task的执行结果进行处理。hadoop
随之,开始建立DAG。DAGScheduler主要用于在任务正式交给TaskSchedulerImpl提交以前作一些准备工做。建立job,将DAG中的RDD划分到不一样的Stage,提交Stage,等等。部署
咱们继续深刻看下它的建立过程。get
从这些变量中,咱们能够看到,DAG是将全部jobId,stageId等信息之间的关系,以及缓存的RDD的partition位置等。好比getCacheLocs、getShuffleMapStage、getParentStagesAndId、newOrUsedShuffleStage。下来,经过applicationId注册并建立executor.
中间省略一万字(实际上是没看懂),下来建立并启动ExecutorAllocationManager,它是干吗的呢?
ExecutorAllocationManager是对全部的已分配的Executor进行管理。默认状况下不会建立ExecutorAllocationManager,能够修改属性spark.dynamicAllocation.enabled为true来建立。ExecutorAllocationManager能够设置动态分配最小Executor数量、动态分配最大Executor数量,每一个Executor能够运行的Task数量等配置信息。(这个还真要试一下,没有配置过)ExecutorAllocationListener经过监听listenerBus里的事件、动态添加、删除exeuctor,经过Thread不断添加Executor,遍历Executor,将超时的Executor杀掉并移除。
参考文献:《深刻理解Spark核心思想与源码解析》