Spark Streaming源码解析之DAG定义

此文是从思惟导图中导出稍做调整后生成的,思惟脑图对代码浏览支持不是很好,为了更好阅读体验,文中涉及到的源码都是删除掉没必要要的代码后的伪代码,如需获取更好阅读体验可下载脑图配合阅读:git

此博文共分为四个部分:github

  1. imgDAG定义
  2. imgJob动态生成
  3. img数据的产生与导入
  4. img容错

img

img

1. DStream

img

1.1. RDD

img

DStream和RDD关系:app

DStream is a continuous sequence of RDDs:
generatedRDDs=new HashMap[Time,RDD[T]]()

1.1.1. 存储

存储格式socket

DStream内部经过一个HashMap的变量generatedRDD来记录生成的RDD:ide

private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

其中 :函数

- key: time是生成当前batch的时间戳oop

- value: 生成的RDD实例this

每个不一样的 DStream 实例,都有一个本身的 generatedRDD,即每一个转换操做的结果都会保留spa

1.1.2. 获取

img

1.1.2.1. getOrCompute

img

  1. 从rdd的map中获取:generatedRDDs.get(time).orElsescala

  2. map中没有则计算:val newRDD=compute(time)

  3. 将计算的newRDD放入map中:generatedRDDs.put(time, newRDD)

其中compute方法有如下特色:

  • 不一样DStream的计算方式不一样

  • inputStream会对接对应数据源的API

  • transformStream会从父依赖中去获取RDD并进行转换得新的DStream

compute方法实现:

class ReceiverInputDStream{

 override def compute(validTime: Time): Option[RDD[T]] = {

    val blockRDD = {
 
      if (validTime < graph.startTime) {

        
        // If this is called for any time before the start time of the context,

        // then this returns an empty RDD. This may happen when recovering from a

        // driver failure without any write ahead log to recover pre-failure data.

        new BlockRDD[T](ssc.sc, Array.empty)

      } else {

        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream

        // for this batch

        val receiverTracker = ssc.scheduler.receiverTracker

        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

 
        // Register the input blocks information into InputInfoTracker

        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)

        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

 
        // Create the BlockRDD

        createBlockRDD(validTime, blockInfos)

      }

    }

    Some(blockRDD)

  }

1.1.3. 生成

RDD主要分为如下三个过程:InputStream -> TransFormationStream -> OutputStream

img

1.1.3.1. InputStream

inputstream包括FileInputStream,KafkaInputStream等等

img

1.1.3.1.1. FileInputStream

FileInputStream的生成步骤:

img

  1. 找到新产生的文件:val newFiles = findNewFiles(validTime.milliseconds)

  2. 将newFiles转换为RDDs:val rdds=filesToRDD(newFiles)

2.1. 遍历文件列表获取生成RDD: val fileRDDs=files.map(file=>newAPIHadoop(file))

2.2. 将每一个文件的RDD进行合并并返回:return new UnionRDD(fileRDDs)

  1. 返回生成的rdds

1.1.3.2. TransformationStream

img

RDD的转换实现:

  1. 获取parent DStream:val parentDs=parent.getOrCompute(validTime)
  2. 执行转换函数并返回转换结果:return parentDs.map(mapFunc)

转换类的DStream实现特色

  • 传入parent DStream和转换函数

  • compute方法中从parent DStream中获取DStream并对其做用转换函数

private[streaming]

class MappedDStream[T: ClassTag, U: ClassTag] (
   parent: DStream[T],
   mapFunc: T => U
 ) extends DStream[U](parent.ssc) {

 override def dependencies: List[DStream[_]] = List(parent)
 override def slideDuration: Duration = parent.slideDuration
 override def compute(validTime: Time): Option[RDD[U]] = {
   parent.getOrCompute(validTime).map(_.map[U](mapFunc))
 }
}

不一样DStream的getOrCompute方法实现:

  • FilteredDStream:parent.getOrCompute(validTime).map(_.filter(filterFunc)
  • FlatMapValuedDStream:parent.getOrCompute(validTime).map(_.flatMapValues[U](flatMapValueFunc)
  • MappedDStream:parent.getOrCompute(validTime).map(_.map[U](mapFunc))

在最开始, DStream 的 transformation 的 API 设计与 RDD 的 transformation 设计保持了一致,就使得,每个 dStreamA.transformation() 获得的新 dStreamB 能将 dStreamA.transformation() 操做完美复制为每一个 batch 的 rddA.transformation() 操做。这也就是 DStream 可以做为 RDD 模板,在每一个 batch 里实例化 RDD 的根本缘由。

1.1.3.3. OutputDStream

OutputDStream的操做最后都转换到ForEachDStream(),ForeachDStream中会生成Job并返回。

伪代码

def generateJob(time:Time){
	val jobFunc=()=>crateRDD{
		foreachFunc(rdd,time)
	}
	Some(new Job(time,jobFunc))
}

源码

private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration
  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time) {
          ssc.sparkContext.setCallSite(creationSite)
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
        case None => None
    }
  }
}

经过对output stream节点进行遍历,就能够获得全部上游依赖的DStream,直至找到没有父依赖的inputStream。

1.2. 特征

DStream基本属性:

  • 父依赖: dependencies: List[DStream[_]]

  • 时间间隔:slideDuration:Duration

  • 生成RDD的函数:compute

1.3. 实现类

DStream的实现类可分为三种:输入,转换和输出

img

DStream之间的转换相似于RDD之间的转换,对于wordCount的例子,实现代码:

val lines=ssc.socketTextStream(ip,port)
val worlds=lines.flatMap(_.split("_"))
val pairs=words.map(word=>(word,1))
val wordCounts=pairs.reduceByKey(_+_)
wordCounts.print()

每一个函数的返回对象用具体实现代替:

val lines=new SocketInputDStream(ip,port)
val words=new FlatMappedDStream(lines,_.split("_"))
val pairs=new MappedDStream(words,word=>(word,1))
val wordCounts=new ShuffledDStream(pairs,_+_)
new ForeachDStream(wordCounts,cnt=>cnt.print())

1.3.1. ForeachDStream

DStream的实现分为两种,transformation和output

不一样的转换操做有其对应的DStream实现,全部的output操做只对应于ForeachDStream

1.3.2. Transformed DStream

img

1.3.3. InputDStream

img

2. DStreamGraph

img

2.1 DAG分类

  • 逻辑DAG: 经过transformation操做正向生成

  • 物理DAG: 惰性求值的缘由,在遇到output操做时根据dependency逆向宽度优先遍历求值。

2.2 DAG生成

DStreamGraph属性

inputStreams=new ArrayBuffer[InputDStream[_]]()
 outputStreams=new ArrayBuffer[DStream[_]]()

DAG实现过程

​ 经过对output stream节点进行遍历,就能够获得全部上游依赖的DStream,直至找到没有父依赖的inputStream。

​ sparkStreaming 记录整个DStream DAG的方式就是经过一个DStreamGraph 实例记录了到全部output stream节点的引用

generateJobs

def generateJobs(time: Time): Seq[Job] = {
      val jobs = this.synchronized {
      outputStreams.flatMap { 
		outputStream =>
        val jobOption = 
			// 调用了foreachDStream来生成每一个job
			outputStream.generateJob(time)   jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
	// 返回生成的Job列表
    jobs
  }

脑图制做参考https://github.com/lw-lin/CoolplaySpark

完整脑图连接地址https://sustblog.oss-cn-beijing.aliyuncs.com/blog/2018/spark/srccode/spark-streaming-all.png

相关文章
相关标签/搜索