标签(空格分隔): Sparkgit
先回顾一下WordCount的过程:web
sc.textFile("README.rd").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_)
val rawFile = sc.textFile("README.rd")
HadoopRDD
--> MappedRDD
;val splittedText = rawFile.flatMap(line => line.split(" "))
MappedRDD
--> FlatMappedRDD
;val wordCount = splittedText.map(word => (word, 1))
FlatMappedRDD
-- > MappedRDD
;val reduceJob = wordCount.reduceByKey(_+_)
reduceByKey
不是MappedRDD
的方法。reduceJob.foreach(println)
ClosureCleaner
的主要功能当Scala在建立一个闭包时,须要先断定那些变量会被闭包所使用并将这些须要使用的变量存储在闭包以内。可是有时会捕捉太多没必要要的变量,形成带宽浪费和资源浪费,ClosureCleaner
则能够移除这些没必要要的外部变量。编程
常常会遇到Task Not Serializable
错误,产生没法序列化的缘由就是在RDD的操做中引用了没法序列化的变量。设计模式
做业的提交过程主要涉及Driver和Executor两个节点。
在Driver中主要解决一下问题:api
(对于WordCount程序来讲,一直到foreach()阶段才会被提交,分析,执行!!)浏览器
Spark中的RDD之间的依赖分为窄依赖和宽依赖。缓存
将会致使窄依赖的Transformation
有:闭包
将会致使宽依赖的Transformation
有:并发
Scheduler会计算RDD之间的依赖关系,将拥有持续窄依赖的RDD归并到同一个Stage中,而宽依赖则做为划分不一样Stage的判断标准。其中,handleJobSubmitted
和submitStage
主要负责依赖性分析,对其处理逻辑作进一步的分析。app
handleJobSubmitted
-- 生成finalStage
并产生ActiveJob
finalStage = new Stage(finalRDD, partitions.size, None, jobId, callSite); //生成finalStage val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) //根据finalStage产生ActiveJob
newStage
-- 建立一个新的Stageprivate def newStage(rdd:RDD[_], numTasks:Int, shuffleDep:Option[shuffleDependency[_,_,_]], jobId:Int, callSite:CallSite) : Stage = { val stage = new Stage(id,rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) } //参数含义:id -- Stage的序号,数字越大,优先级越高 //rdd:Rdd[_] -- 归属本Stage的最后一个rdd //numTasks -- 建立的Task数目,等于父rdd的输出Partition的数目 //parents -- 父Stage列表
也就是说,在建立Stage的时候,已经清楚该Stage须要从多少不一样的Partition读入数据,并写出到多少个不一样的Partition中,即输入与输出的个数已经明确。
submitStage
-- 递归完成所依赖的Stage而后提交1) 所依赖的Stage是否都已经完成,若是没有则先执行所依赖的Stage;
2) 若是所依赖的Stage已经完成,则提交自身所处的Stage。
private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if(jobId.isDefined) { .... //依次处理所依赖的没有完成的Stage } else { abortStage(stage, "No active job for stage " + stage.id) //提交自身的Stage } }
getMissingParentStage
-- 经过图的遍历,找出依赖的全部父Stageprivate def getMissingParentStage(stage: Stage) : List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] }
Stage的划分是如何肯定的呢? -- 重要的判断依据是是否存在ShuffleDependency
,若是有则建立一个新的Stage。
如何判断是否存在ShuffleDependency
呢? -- 取决于RDD的转换。ShuffledRDD, CoGroupedRDD, SubtractedRDD
都会返回ShuffleDependency
。
getDependencies
-- 对于所建立的RDD,明确其Dependency类型override def getDependencies: Seq[Dependency[_]] = { List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine)) }
Stage划分完毕就会明确如下内容:
1) 产生的Stage须要从多少个Partition中读取数据;
2) 产生的Stage会生成多少个Partition -- 决定须要产生多少不一样的Task;
3) 产生的Stage是否属于ShuffleMap类型 -- 决定生成的Task类型。
Spark中共分2种不一样的Task:ShuffleMap和ResultTask。
在做业提交及执行期间,Spark会产生大量的消息交互,那么这些信息如何进行交互的呢?
HelloWorld in Akka:
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class HelloActor extends Actor { def receive = { case "hello" => println("hello back at you") case _ => println("huh?") } } object Main extends App { val system = ActorSystem("HelloSystem") //default Actor constructor val helloActor = System.actorOf(Props[HelloActor], name = "helloactor") helloActor ! "hello" helloActor ! "dias" }
注意:
!
;receive
函数来处理接收到的消息。ShuffleMapTask(Map)
和ResultTask(Reduce)
两种;isShuffleMap
标记肯定Task的类型,若是标记为True则建立shuffleMapTask
,不然建立ResultTask
;submitMissingTasks
负责建立新的Task(根据isShuffleMap
标志来肯定是哪一种Task,而后肯定Stage的输出和输出Partition);makeOffers
-- 处理DriverActor接收到的消息信号TaskschedulerImpl
发送ReviveOffers
消息给DriverActor
,DriverActor
接收到消息后,调用makeOffers
处理消息;
def makeOffers() { launchTasks(scheduler.resourceOffers( executorHost.toArray.map{case(id, host) => new WorkerOffer(id, host, freeCores(id))})) }
makeOffers的处理逻辑为:
resourceOffers
-- 任务分发SchedulerBackend
-- 将新建立的Task分发给ExecutorLaunchTasks
-- 发送指令TaskDescription
-- 完成序列化launchTask
//CoarseGrainedSchedulerBackend.launchTasks def launchTasks(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { val tr = new TaskRunner(context, taskId, serializedTask) runningTasks.put(taskId, tr) threadPool.execute(tr) }
TaskRunner
-- 反序列化updateDependencies
-- 解决依赖性问题TaskRunner会启动一个新的线程,如何在run中调用用户本身定义的处理函数呢?做用于RDD上的Operation是如何真正起做用的呢?
TaskRunner.run |_Task.run |_Task.runTask |_RDD.iterator |_RDD.computeOrReadCheckpoint |_RDD.compute
Task在执行的时候,会产生大量的数据交互,这些数据能够分红3种不一样的类型:
1)状态相关,如StatusUpdate;
2)中间结果;
3)计算相关的数据Metrics Data.
ShuffleMapTask和ResultTask返回的结果有什么不一样:
ShuffleMapTask
须要返回MapStatus,而ResultTask
只须要告知是否已经成功完成执行;ScheduleBack
接收到Executor发送过来的StatusUpdate;ScheduleBackend
接收到StatusUpdate以后:若是任务已经成功处理,则将其从监视列表中删除。若是整个做业都完成,将占用的资源释放;TaskSchedulerImpl
将当前顺利完成的任务放入完成队列,同时取出下一个等待运行的Task;DAGSchedule
中的handleTaskCompletion
,会针对ResultTask和ShuffleMapTask区别对待结果:DAGSchedule
会发出TaskSucced来通知对整个做业执行状况感兴趣的监听者出于容错性及效率方面的考虑,有时须要将中间结果进行持久化保存,能够方便后面再次利用到该RDD时不须要从新计算。
中间结果的存储有两种方式:Checkpoint 和 Cache
当用户在使用Spark时,不管对Spark Cluster的运行状况仍是Spark Application运行时的一些细节,但愿可以可视化的观察。
浏览器输入:http://localhost:8080
Http Server是如何启动的,网页中显示的数据是从哪里获得的?
1) Spark用到的Http Server是Jetty,用Java编写,可以嵌入到用户程序中执行,不用想Tomcat或JBoss那样须要本身独立的JVM进程。
2) SparkUI在SparkContext初始化时建立。
//Initial the spark UI, registering all asociated listeners private[spark] val ui = new SparkUI(this) ui.bind() //bind()函数真正启动JettyServer
3) SparkListener持续监听Stage和Task相关事件的发生,并进行数据更新(典型的观察者设计模式)。
测量模块是不可或缺的,经过测量数据来感知系统的运行状况。在Spark中,由MetricsSystem来担任这个任务。
Instance:
表示谁在使用MetricSystem -- Master,Worker,Executor,Client Driver;Source:
表示数据源;Sinks:
数据目的地:在WordCount程序中,在JobTracker提交以后,被DAGScheduler分为两个Stage:ShuffleMapTask和ResultTask。ShuffleMapTask的输出数据是ResultTask的输入。
ShuffleMapTask.runTask ---| |-->ShuffledRDD.compute ---| | | | V-Store V-Store
那么问题来了,ShuffleMapTask的计算结果是如何被ResultTask得到的呢?
1)ShuffleMapTask将计算的状态(不是具体的计算数值)包装为MapStatus返回给DAGScheduler;
2)DAGScheduler将MapStatus保存到MapOutputTrackerMaster中;
3)ResultTask在调用ShuffledRDD时会利用BlockStoreShuffleFetcher中的fetch方法获取数据:
a. 首先要咨询MapOutputTrackerMaster所要获取数据的location;
b. 根据返回的结果调用BlockManager.getMultiple获取到真正的数据。
其中,MapStatus的结构如上图所示,由blockmanager_id 和 byteSize
构成,blockmanager_id
表示计算的中间结果数据实际存储在哪一个BlockManager,byteSize
表示不一样reduceid所要读取的数据的大小。
写入过程:
ShuffleMapTask.runTask HashShuffleWriter.write BlockObjectWriter.write
HashShuffleWriter.write
主要完成两件事情:
<hello, 1>
和<hello, 1>
都要写入的话,要先生成<hello, 2>
,再进行后续的写入工做;<k, val>
写入哪个文件中。(shuffle_id, map_id, reduce_id)
决定,;ShuffledRDD的compute函数式读取ShuffleMapTask计算结果的触点。
ShuffleRDD.compute()
-- 触发读取ShuffleMapTask的计算结果override def compute(split:Partition, context:TaskContext) : Iterator[P] = { val dep = dependencies.head.asInstanceOf[ShuffleDependency[K,V,C]] SparkEnv.get.shuffleManager.getReader().**read()**.asInstanceOf[Iterator[P]] //getReader()返回HashShuffleReader ...... }
HashShuffleReader.read()
override def read() : Iterator[Product2[K,C]] = { val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, Serializer.getSerializer(dep.serializer)) ..... }
BlockStoreShuffleFetcher.fetch()
BlockStoreShuffleFetcher
须要解决的问题:
如何根据得到的MapStatus取相应的BlockManager获取数据。
若是所要获取的文件落在本地,则调用getLocal读取;不然发送请求到远端BlockManager。
Spark内存的消耗。
Spark对内存的要求较高,在ShuffleMapTask和ResultTask中,因为须要先将计算结果保存在内存,而后写入磁盘,若是每一个数据分区的数据很大则会消耗大量的内存。
在Spark运行过程当中,能够将结果显示地保存下来,那么若是想获取缓存中的数据该怎么办?
CacheManager
:RDD在进行计算转换的时候,经过CacheManager来获取数据,并经过CacheManager来存储计算结果;BlockManager
:CacheManager在读取和存储数据的时候主要依赖BlockManager来操做,它决定数据是从内存仍是磁盘读取数据;MemoryStore
:负责将数据保存在或从内存中读取数据;DiskStore
:复杂将数据保存在或从内存中读取数据;BlockManagerWorker
:数据写入本地的MemoryStore或DiskStore是一个同步操做,为了保证容错性还须要将数据复制到其余节点,由BlockManagerWorker异步完成数据复制操做;ConnectionManager
:负责与其余计算节点创建链接,并负责数据的发送和接收;BlockManagerMaster
:该模块只运行在Driver Application所在的Executor,功能是负责记录下全部BlockId存储在哪一个SlaveWorker上。每一个存储子模块有SparkEnv来建立,建立过程在SparkEnv.create中完成。
① RDD.iterator是与Storage子系统交互的入口;
② CacheManager.getOrCompute调用BlockManager中的put接口来写入数据;
③ 数据优先写入到MemoryStore,若是内存已满,则将最近使用次数较少的数据写入磁盘;
④ 通知BlockManagerMaster有新的数据写入,在BlockManagerMaster中保存元数据;
⑤ 若是数据备份数目大于1,则将写入的数据与其余Slave Worker同步。
BlockManager.get()
,先尝试从本地获取,若是所要获取的内容不在本地,则发起远程获取。getRemote -> doGetRemote
;Spark优先将计算结果存储到内存中,当内存不足的时候,写到外部磁盘,究竟是怎样作的呢?
Tachyon以Master/Worker的方式组织集群,由Master负责管理、维护文件系统,文件数据存储在Worker节点中。
在最新的Spark中,Storage子系统引入了TachyonStore,在内存中实现了HDFS文件系统的接口,主要目的是尽量的利用内存来做为数据持久层,避免过多的磁盘读写操做。