Reposting is welcome; please credit the source: 徽沪一郎.
This article mainly explains how the business logic of a task executed inside TaskRunner gets invoked, and also tries to make clear where a running task fetches its input data from, where the processed results are returned to, and how they are returned.
The local-cluster mode, also known as pseudo-distributed mode, can be launched with the following command:
MASTER=local-cluster[1,2,1024] bin/spark-shell
The three numbers [1,2,1024] denote the executor number, the core number, and the memory size respectively; the memory size should not be smaller than the default 512MB.
Step 1: Build a SparkConf from the initialization arguments, then create the SparkEnv from that SparkConf. SparkEnv mainly contains the following key components:
1. BlockManager
2. MapOutputTracker
3. ShuffleFetcher
4. ConnectionManager
private[spark] val env = SparkEnv.create(
  conf,
  "<driver>",
  conf.get("spark.driver.host"),
  conf.get("spark.driver.port").toInt,
  isDriver = true,
  isLocal = isLocal)
SparkEnv.set(env)
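Once registered with SparkEnv.set, the environment can be looked up anywhere in the process. A minimal illustrative sketch (the member names follow the usage shown later in this article; this is not a complete list of SparkEnv's fields):

// Sketch: other components retrieve the registered environment via SparkEnv.get
val env = SparkEnv.get
val blockManager = env.blockManager          // block storage and transfer
val mapOutputTracker = env.mapOutputTracker  // tracks where map outputs live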
Step 2: Create the TaskScheduler, choosing the appropriate SchedulerBackend according to Spark's run mode, and start the TaskScheduler. This step is crucial.
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
taskScheduler.start()
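createTaskScheduler decides which scheduler and SchedulerBackend to build by matching on the master URL. The following is a hypothetical, much simplified sketch of that dispatch (the real code uses regex extractors and instantiates the actual scheduler/backend objects; the descriptions here are indicative only):

// Hypothetical sketch of the dispatch on the master URL (not the real source).
def runMode(master: String): String = master match {
  case m if m == "local" || m.startsWith("local[") => "in-process local execution"
  case m if m.startsWith("local-cluster[")         => "pseudo-distributed: embedded standalone cluster"
  case m if m.startsWith("spark://")               => "standalone cluster via SparkDeploySchedulerBackend"
  case m if m.startsWith("mesos://")               => "Mesos-managed cluster"
  case _                                           => "other cluster managers, e.g. YARN"
}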
The purpose of TaskScheduler.start is to start the corresponding SchedulerBackend, and to start a timer that periodically checks for speculatable tasks.
override def start() {
  backend.start()

  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    import sc.env.actorSystem.dispatcher
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
          SPECULATION_INTERVAL milliseconds) {
      checkSpeculatableTasks()
    }
  }
}
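Speculative execution is disabled by default. As a hedged usage example (assuming a normal driver program rather than spark-shell), it can be switched on through the same spark.speculation key that start() reads above:

import org.apache.spark.{SparkConf, SparkContext}

// Turn on speculative re-launching of slow tasks; the key matches the one
// checked in TaskScheduler.start above.
val conf = new SparkConf()
  .setAppName("wordcount")
  .setMaster("local-cluster[1,2,1024]")
  .set("spark.speculation", "true")
val sc = new SparkContext(conf)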
Step 3: Create the DAGScheduler, passing the TaskScheduler instance built in the previous step as an argument, and start it.
@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()
Step 4: Start the web UI.
ui.start()
As before, let's use the simplest wordcount example to illustrate the RDD transformation process.
sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
This single short line of code actually triggers a fairly involved chain of RDD transformations. Below we explain each transformation step and its result in detail.
textFile first creates a HadoopRDD, which is then turned into a MappedRDD by a map operation. Running the statement above in spark-shell produces output that confirms this analysis.
scala> sc.textFile("README.md")
14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750
14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB)
14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took 277 ms
14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took 281 ms
res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:13
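For reference, this is roughly what textFile looks like in the SparkContext source of that era (a sketch; parameter names and defaults may differ slightly between versions). It builds a HadoopRDD through hadoopFile and then maps each (key, value) record to the line text, which is exactly why the RDD we get back is a MappedRDD:

// Sketch of SparkContext.textFile: hadoopFile yields a HadoopRDD,
// and the trailing map turns it into a MappedRDD[String].
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
    .map(pair => pair._2.toString)
}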
flatMap converts the MappedRDD into a FlatMappedRDD.
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = new FlatMappedRDD(this, sc.clean(f))
map uses each word to build the corresponding key-value pair; the FlatMappedRDD from the previous step is thereby turned into another MappedRDD.
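For comparison, the definition of map in the RDD.scala of that era is just as thin as the flatMap shown above (a sketch), which is why the result is once again a MappedRDD:

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))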
The operations used in steps 2 and 3 are all defined in RDD.scala, yet reduceByKey is nowhere to be seen in RDD.scala; its definition lives in the source file PairRDDFunctions.scala.
An attentive reader will surely ask: reduceByKey is not a member of MappedRDD, so how can it be called on one? Behind the scenes an implicit conversion kicks in and converts the MappedRDD into a PairRDDFunctions.
implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) = new PairRDDFunctions(rdd)
This kind of implicit conversion is a Scala language feature; if you want to learn more, search for "scala implicit method" and you will find plenty of articles covering it in depth.
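To make the mechanism concrete, here is a minimal, self-contained sketch of an implicit conversion; RichPair is a made-up class used purely for illustration, mirroring how rddToPairRDDFunctions pins reduceByKey onto RDD[(K, V)]:

// RichPair is hypothetical; it plays the role PairRDDFunctions plays for RDDs.
class RichPair(p: (String, Int)) {
  def describe: String = p._1 + " -> " + p._2
}

object ImplicitDemo {
  implicit def pairToRichPair(p: (String, Int)): RichPair = new RichPair(p)

  def main(args: Array[String]): Unit = {
    // describe is not a member of Tuple2; the compiler silently inserts
    // pairToRichPair(...), just as it inserts rddToPairRDDFunctions(...)
    // before reduceByKey is called on a MappedRDD.
    println(("spark", 1).describe)   // prints "spark -> 1"
  }
}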
Next, let's look at the definition of reduceByKey.
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
  reduceByKey(defaultPartitioner(self), func)
}

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializerClass: String = null): RDD[(K, C)] = {
  if (getKeyClass().isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else if (mapSideCombine) {
    val combined = self.mapPartitionsWithContext((context, iter) => {
      aggregator.combineValuesByKey(iter, context)
    }, preservesPartitioning = true)
    val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
      .setSerializer(serializerClass)
    partitioned.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // Don't apply map-side combiner.
    val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
    values.mapPartitionsWithContext((context, iter) => {
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  }
}
reduceByKey eventually calls combineByKey. Inside this function the PairRDDFunctions is turned into a ShuffledRDD, and once mapPartitionsWithContext is applied, the ShuffledRDD becomes a MapPartitionsRDD.
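combineByKey is also a public operation in its own right. A small usage sketch (assuming a running SparkContext named sc) that computes per-key averages makes the roles of its three function arguments explicit:

// createCombiner / mergeValue (map side) / mergeCombiners (reduce side)
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val sumCount = pairs.combineByKey(
  (v: Int) => (v, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))

val avg = sumCount.mapValues { case (sum, count) => sum.toDouble / count }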
The log output confirms our analysis.
res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at <console>:13
To sum up the whole RDD transformation chain:
HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffledRDD->MapPartitionsRDD
It is a rather long chain, and every one of these transformations happens before the job is even submitted.
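You can see the resulting lineage for yourself in spark-shell with toDebugString (a sketch; the exact class names printed vary a little between Spark versions):

val wc = sc.textFile("README.md")
           .flatMap(line => line.split(" "))
           .map(word => (word, 1))
           .reduceByKey(_ + _)
// Prints the chain of RDDs (MapPartitionsRDD, ShuffledRDD, MappedRDD,
// FlatMappedRDD, HadoopRDD, ...), confirming the summary above.
println(wc.toDebugString)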
Before analyzing the function call chain during task execution, let's also touch on something slightly more theoretical: why do the transformations applied to an RDD take this particular shape?
The answer has a mathematical flavor. From an abstract, theoretical point of view, any task can be reduced to "input -> processing -> output", where the input and output both correspond to datasets.
On top of this we can make a simple classification.
For the task submission process, see the second article in this series. This section focuses on how, while a task is running, the operations applied to the RDDs get invoked step by step.
Perhaps when looking at the definition of RDD.compute you still feel that f is never actually invoked. Take the compute definition of MappedRDD as an example:
override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(split, context).map(f)
Note that the place where it is easiest to get confused is the map call: this map is not RDD.map but the member function map defined on Scala's Iterator; see http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator.
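A minimal sketch in plain Scala shows why this matters: Iterator.map is lazy, so f is only applied when the iterator is actually consumed, which is exactly what happens when a task drains the iterator returned by compute:

val it = Iterator(1, 2, 3)
val mapped = it.map { x => println("applying f to " + x); x * 2 }
// Nothing has been printed yet: f has not run.
println(mapped.toList)   // f now runs element by element; prints List(2, 4, 6)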
The call stack captured while the wordcount job's ShuffleMapTask is running shows how iterator and compute are invoked in turn along the whole RDD chain:

at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:154)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
The compute path is fairly convoluted for a ShuffleMapTask, with quite a few detours, but much more direct for a ResultTask.
override def runTask(context: TaskContext): U = {
  metrics = Some(context.taskMetrics)
  try {
    func(context, rdd.iterator(split, context))
  } finally {
    context.executeOnCompleteCallbacks()
  }
}
From the analysis above we know that once the wordcount job is finally submitted, the DAGScheduler splits it into two stages: the first stage runs ShuffleMapTasks and the second runs a ResultTask.
So how does the ResultTask obtain the computation results of the ShuffleMapTasks? Briefly, the process is as follows.
Pseudo-code of BlockStoreShuffleFetcher's fetch function:
val blockManager = SparkEnv.get.blockManager

val startTime = System.currentTimeMillis
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
  shuffleId, reduceId, System.currentTimeMillis - startTime))

val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val itr = blockFetcherItr.flatMap(unpackBlock)
Note getServerStatuses and getMultiple in the code above: the former asks where the data lives, the latter goes and fetches the actual data.
For a detailed explanation of shuffle, see the article "详细探究Spark的shuffle实现": http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/