Spark结构化流处理机制(2)

容错机制

  端到端的有且仅有一次保证,是结构化流设计的关键目标之一.session

  结构化流设计了 Structured Streaming sources,sinks等等,来跟踪确切的处理进度,并让其重启或重运行来处理任何故障socket

  streaming source是相似kafka的偏移量(offsets)来跟踪流的读取位置.执行引擎使用检查点(checkpoint)和预写日志(write ahead logs)来记录每一个执行其的偏移范围值ide

  streaming sinks 是设计用来保证处理的幂等性ui

  这样,依靠可回放的数据源(streaming source)和处理幂等(streaming sinks),结构流来作到任何故障下的端到端的有且仅有一次保证spa

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

其中,spark是SparkSession,lines是DataFrame,DataFrame就是Dataset[Row]。设计

DataSet日志

看看Dataset的触发因子的代码实现,好比foreach操做:code

def foreach(f: T => Unit): Unit = withNewRDDExecutionId {

    rdd.foreach(f)

  }



 private def withNewRDDExecutionId[U](body: => U): U = {

    SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {

      rddQueryExecution.executedPlan.foreach { plan =>

        plan.resetMetrics()

      }

      body

    }

  }

接着看:orm

 def withNewExecutionId[T](

      sparkSession: SparkSession,

      queryExecution: QueryExecution,

      name: Option[String] = None)(body: => T): T = {

    val sc = sparkSession.sparkContext

    val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)

    val executionId = SQLExecution.nextExecutionId

    sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)

    executionIdToQueryExecution.put(executionId, queryExecution)

    try {     

      withSQLConfPropagated(sparkSession) {       

        try {         

          body

        } catch {         

        } finally {         

        }

      }

    } finally {

      executionIdToQueryExecution.remove(executionId)

      sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)

    }

  }

执行的真正代码就是 queryExecution: QueryExecution。 对象

@transient private lazy val rddQueryExecution: QueryExecution = {

    val deserialized = CatalystSerde.deserialize[T](logicalPlan)

    sparkSession.sessionState.executePlan(deserialized)

  }

看到了看到了,是sessionState.executePlan执行logicalPlan而获得了QueryExecution

这里的sessionState.executePlan其实就是建立了一个QueryExecution对象。而后执行QueryExecution的executedPlan方法获得SparkPlan这个物理计划。怎么生成的呢?

lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {

    SparkSession.setActiveSession(sparkSession)   

    planner.plan(ReturnAnswer(optimizedPlan.clone())).next()

  }

经过planner.plan方法生成。

planner是SparkPlanner。在BaseSessionStateBuilder类中定义。

protected def planner: SparkPlanner = {

    new SparkPlanner(session.sparkContext, conf, experimentalMethods) {

      override def extraPlanningStrategies: Seq[Strategy] =

        super.extraPlanningStrategies ++ customPlanningStrategies

    }

  }

SparkPlanner类

SparkPlanner对LogicalPlan执行各类策略,返回对应的SparkPlan。好比对于流应用来讲,有这样的策略:DataSourceV2Strategy。

典型的几个逻辑计划到物理计划的映射关系以下:

StreamingDataSourceV2Relation-》ContinuousScanExec

StreamingDataSourceV2Relation-》MicroBatchScanExec

前一种对应与Offset没有endOffset的状况,后一种对应于有endOffset的状况。前一种是没有结束的连续流,后一种是有区间的微批处理流。

前一种的时延能够达到1ms,后一种的时延只能达到100ms。

【代码】:

case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined =>

      val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]

      val scanExec = MicroBatchScanExec(

        r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)

      val withProjection = if (scanExec.supportsColumnar) {

        scanExec

      } else {

        // Add a Project here to make sure we produce unsafe rows.

        ProjectExec(r.output, scanExec)

      }

      withProjection :: Nil

    case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>

      val continuousStream = r.stream.asInstanceOf[ContinuousStream]

      val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)

      val withProjection = if (scanExec.supportsColumnar) {

        scanExec

      } else {

        // Add a Project here to make sure we produce unsafe rows.

        ProjectExec(r.output, scanExec)

      }

      withProjection :: Nil
相关文章
相关标签/搜索