此文是从思惟导图中导出稍做调整后生成的,思惟脑图对代码浏览支持不是很好,为了更好阅读体验,文中涉及到的源码都是删除掉没必要要的代码后的伪代码,如需获取更好阅读体验可下载脑图配合阅读:git
此博文共分为四个部分:github
DStream和RDD关系:app
DStream is a continuous sequence of RDDs: generatedRDDs=new HashMap[Time,RDD[T]]()
存储格式socket
DStream内部经过一个HashMap的变量generatedRDD来记录生成的RDD:ide
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
其中 :函数
- key: time是生成当前batch的时间戳oop
- value: 生成的RDD实例this
每个不一样的 DStream 实例,都有一个本身的 generatedRDD,即每一个转换操做的结果都会保留spa
从rdd的map中获取:generatedRDDs.get(time).orElsescala
map中没有则计算:val newRDD=compute(time)
将计算的newRDD放入map中:generatedRDDs.put(time, newRDD)
其中compute方法有如下特色:
不一样DStream的计算方式不一样
inputStream会对接对应数据源的API
transformStream会从父依赖中去获取RDD并进行转换得新的DStream
compute方法实现:
class ReceiverInputDStream{ override def compute(validTime: Time): Option[RDD[T]] = { val blockRDD = { if (validTime < graph.startTime) { // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // driver failure without any write ahead log to recover pre-failure data. new BlockRDD[T](ssc.sc, Array.empty) } else { // Otherwise, ask the tracker for all the blocks that have been allocated to this stream // for this batch val receiverTracker = ssc.scheduler.receiverTracker val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) // Register the input blocks information into InputInfoTracker val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) // Create the BlockRDD createBlockRDD(validTime, blockInfos) } } Some(blockRDD) }
RDD主要分为如下三个过程:InputStream -> TransFormationStream -> OutputStream
inputstream包括FileInputStream,KafkaInputStream等等
FileInputStream的生成步骤:
找到新产生的文件:val newFiles = findNewFiles(validTime.milliseconds)
将newFiles转换为RDDs:val rdds=filesToRDD(newFiles)
2.1. 遍历文件列表获取生成RDD: val fileRDDs=files.map(file=>newAPIHadoop(file))
2.2. 将每一个文件的RDD进行合并并返回:return new UnionRDD(fileRDDs)
RDD的转换实现:
转换类的DStream实现特色:
传入parent DStream和转换函数
compute方法中从parent DStream中获取DStream并对其做用转换函数
private[streaming] class MappedDStream[T: ClassTag, U: ClassTag] ( parent: DStream[T], mapFunc: T => U ) extends DStream[U](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.map[U](mapFunc)) } }
不一样DStream的getOrCompute方法实现:
parent.getOrCompute(validTime).map(_.filter(filterFunc)
parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
在最开始, DStream 的 transformation 的 API 设计与 RDD 的 transformation 设计保持了一致,就使得,每个 dStreamA.transformation() 获得的新 dStreamB 能将 dStreamA.transformation() 操做完美复制为每一个 batch 的 rddA.transformation() 操做。这也就是 DStream 可以做为 RDD 模板,在每一个 batch 里实例化 RDD 的根本缘由。
OutputDStream的操做最后都转换到ForEachDStream(),ForeachDStream中会生成Job并返回。
伪代码
def generateJob(time:Time){ val jobFunc=()=>crateRDD{ foreachFunc(rdd,time) } Some(new Job(time,jobFunc)) }
源码
private[streaming] class ForEachDStream[T: ClassTag] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit ) extends DStream[Unit](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[Unit]] = None override def generateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match { case Some(rdd) => val jobFunc = () => createRDDWithLocalProperties(time) { ssc.sparkContext.setCallSite(creationSite) foreachFunc(rdd, time) } Some(new Job(time, jobFunc)) case None => None } } }
经过对output stream节点进行遍历,就能够获得全部上游依赖的DStream,直至找到没有父依赖的inputStream。
DStream基本属性:
父依赖: dependencies: List[DStream[_]]
时间间隔:slideDuration:Duration
生成RDD的函数:compute
DStream的实现类可分为三种:输入,转换和输出
DStream之间的转换相似于RDD之间的转换,对于wordCount的例子,实现代码:
val lines=ssc.socketTextStream(ip,port) val worlds=lines.flatMap(_.split("_")) val pairs=words.map(word=>(word,1)) val wordCounts=pairs.reduceByKey(_+_) wordCounts.print()
每一个函数的返回对象用具体实现代替:
val lines=new SocketInputDStream(ip,port) val words=new FlatMappedDStream(lines,_.split("_")) val pairs=new MappedDStream(words,word=>(word,1)) val wordCounts=new ShuffledDStream(pairs,_+_) new ForeachDStream(wordCounts,cnt=>cnt.print())
DStream的实现分为两种,transformation和output
不一样的转换操做有其对应的DStream实现,全部的output操做只对应于ForeachDStream
逻辑DAG: 经过transformation操做正向生成
物理DAG: 惰性求值的缘由,在遇到output操做时根据dependency逆向宽度优先遍历求值。
DStreamGraph属性
inputStreams=new ArrayBuffer[InputDStream[_]]() outputStreams=new ArrayBuffer[DStream[_]]()
DAG实现过程
经过对output stream节点进行遍历,就能够获得全部上游依赖的DStream,直至找到没有父依赖的inputStream。
sparkStreaming 记录整个DStream DAG的方式就是经过一个DStreamGraph 实例记录了到全部output stream节点的引用
generateJobs
def generateJobs(time: Time): Seq[Job] = { val jobs = this.synchronized { outputStreams.flatMap { outputStream => val jobOption = // 调用了foreachDStream来生成每一个job outputStream.generateJob(time) jobOption.foreach(_.setCallSite(outputStream.creationSite)) jobOption } } // 返回生成的Job列表 jobs }
脑图制做参考:https://github.com/lw-lin/CoolplaySpark
完整脑图连接地址:https://sustblog.oss-cn-beijing.aliyuncs.com/blog/2018/spark/srccode/spark-streaming-all.png