SparkSession结构化流处理最后实际上是经过DataSet的writeStream触发执行的。这点与传统的spark sql方式是不同的。writeStream会找到StreamingQueryManager的startQuery方法,而后一步步到MicroBatchExecution和ContinuousExecution。java
核心点:MicroBatchExecution和ContinuousExecution里面会对StreamingRelationV2进行转换,转换成StreamingDataSourceV2Relation。而MicroBatchExecution和ContinuousExecution只有在StreamingQueryManager的createQuery方法中才会被使用到。那么这个StreamingQueryManager的createQuery方法会在哪里被使用到呢?跟踪代码会发现是DataStreamWriter中调用StreamingQueryManager的startQuery方法进而调用到createQuery方法的。sql
而DataStreamWriter是Dataset的writeStream建立的。session
【以上说的是写入流的过程】。框架
关键类:BaseSessionStateBuilder,里面有analyzer的定义。ide
protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: new FallBackFileSourceV2(session) +: DataSourceResolution(conf, this.catalogManager) +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = new DetectAmbiguousSelfJoin(conf) +: PreprocessTableCreation(session) +: PreprocessTableInsertion(conf) +: DataSourceAnalysis(conf) +: customPostHocResolutionRules override val extendedCheckRules: Seq[LogicalPlan => Unit] = PreWriteCheck +: PreReadCheck +: HiveOnlyCheck +: TableCapabilityCheck +: customCheckRules }
这里没有特别须要关注的,先忽略。post
DataSourceV2是指spark中V2版本的结构化流处理引擎框架。这里说的逻辑计划就是StreamingDataSourceV2Relation,对应的物理计划分红两类:MicroBatchScanExec和ContinuousScanExec,二者的应用场景从取名上就能够分辨出来,一个是微批处理模式;另外一个则是连续流模式。ui
咱们先从物理计划开始解析。this
这两个物理计划基于同一个父类:DataSourceV2ScanExecBase,先看看父类的代码:spa
关键代码:code
override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") inputRDD.map { r => numOutputRows += 1 r } }
子类须要重写inputRDD。
两种重要的checkpoint属性:
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
offsetLog是当前读取到哪一个offset了,commitLog是当前处理到哪一个Offset了。这两个Log很是重要,合在一块儿保证了Exactly-once语义。
好了,先看看MicroBatchScanExec是怎么重写inputRDD的。
override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start, end) override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory() override lazy val inputRDD: RDD[InternalRow] = { new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar) }
有三个地方,第一个是重写Seq[InputPartition],调用stream的planInputPartitions方法,注意下这里的stream类型是MicroBatchStream;第二个是重写readerFactory,得到读取器工厂类;第三个重写是inputRDD,建立DataSourceRDD做为inputRDD,而前两步重写的Seq[InputPartition]和readerFactory做为DataSourceRDD的构造参数。
这里首先大概看下DataSourceRDD的功能是什么。
DataSourceRDD这个类的代码很短,很容易看清楚。最重要的就是compute方法,先给出所有代码:
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { val inputPartition = castPartition(split).inputPartition val reader: PartitionReader[_] = if (columnarReads) { partitionReaderFactory.createColumnarReader(inputPartition) } else { partitionReaderFactory.createReader(inputPartition) } context.addTaskCompletionListener[Unit](_ => reader.close()) val iter = new Iterator[Any] { private[this] var valuePrepared = false override def hasNext: Boolean = { if (!valuePrepared) { valuePrepared = reader.next() } valuePrepared } override def next(): Any = { if (!hasNext) { throw new java.util.NoSuchElementException("End of stream") } valuePrepared = false reader.get() } } // TODO: SPARK-25083 remove the type erasure hack in data source scan new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]]) }
先根据读取器工厂类建立一个PartitionReader,而后调用PartitionReader的get方法获取数据。就是这么简单了!
最后再看下ContinuousScanExec的定义。
override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start) override lazy val readerFactory: ContinuousPartitionReaderFactory = { stream.createContinuousReaderFactory() } override lazy val inputRDD: RDD[InternalRow] = { EpochCoordinatorRef.get( sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), sparkContext.env) .askSync[Unit](SetReaderPartitions(partitions.size)) new ContinuousDataSourceRDD( sparkContext, sqlContext.conf.continuousStreamingExecutorQueueSize, sqlContext.conf.continuousStreamingExecutorPollIntervalMs, partitions, schema, readerFactory.asInstanceOf[ContinuousPartitionReaderFactory]) }
和微批处理模式MicroBatchScanExec相似,也有三个地方重写,第一个是重写Seq[InputPartition],调用stream的planInputPartitions方法,注意下这里的stream类型是ContinuousStream;第二个是重写readerFactory,得到读取器工厂类ContinuousPartitionReaderFactory;第三个重写是inputRDD,建立ContinuousDataSourceRDD做为inputRDD,而前两步重写的Seq[InputPartition]和readerFactory做为ContinuousDataSourceRDD的构造参数。
这里首先大概看下ContinuousDataSourceRDD的功能是什么。
ContinuousDataSourceRDD的代码和DataSourceRDD的基本差很少,直接看源码吧,这里就不细说了,也没啥好细说的,显得啰里啰唆。
对于Kafka来讲,ContinuousDataSourceRDD和DataSourceRDD其实最终是同样的,具体代码能够看工程:spark-sql-kafka-0-10里的代码。