The impact of large batches on Structured Streaming jobs

 Faith: you can do nothing with it, but you can do nothing without it. -- Samuel Butler

Preface

In Spark Streaming, a large batch causes subsequent batches to back up. How does this play out for Structured Streaming jobs? This article gives a brief explanation.

More precisely, the topic of this article is: with a trigger configured, how does a long-running query affect the submit time of subsequent queries?

Trigger types

There are three trigger types: OneTimeTrigger, ProcessingTime, and ContinuousTrigger. For an explanation of all three, see the trigger section of the earlier "spark 集群优化" (Spark cluster optimization) post.
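
As a quick reference, here is a minimal sketch (not from the original post) of how each trigger type is set on a DataStreamWriter; the rate source, the console sink and the interval values are illustrative assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerExamples {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("trigger-examples").getOrCreate()
    val df = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

    // OneTimeTrigger: run a single batch and stop.
    //   df.writeStream.format("console").trigger(Trigger.Once()).start()
    // ContinuousTrigger: continuous processing with a 1-second checkpoint interval
    // (only a limited set of sources/sinks is supported, see the Note section below).
    //   df.writeStream.format("console").trigger(Trigger.Continuous("1 second")).start()

    // ProcessingTime: submit a micro-batch every 10 seconds (handled by MicroBatchExecution).
    val query = df.writeStream
      .format("console")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()
    query.awaitTermination()
  }
}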

With OneTimeTrigger: effect of a long-running query on the submit time of subsequent queries

A OneTimeTrigger executes a single query and then stops, so it has no effect on subsequent batches.

With a ProcessingTime trigger: effect of a long-running query on the submit time of subsequent queries

Setting a sleep longer than the trigger interval

The code (shown as a screenshot below) adds a sleep to the task on every partition:
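
Since the exact code is only available as a screenshot, here is a minimal reconstruction of the idea (assumptions: a rate source, a 60-second sleep per task, a 10-second trigger interval):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SlowBatchExperiment {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("slow-batch").getOrCreate()
    import spark.implicits._

    val df = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

    // Thread.sleep inside mapPartitions makes every task, and therefore every batch,
    // take far longer than the 10-second trigger interval.
    val slowed = df.as[(java.sql.Timestamp, Long)].mapPartitions { iter =>
      Thread.sleep(60 * 1000L)
      iter
    }

    slowed.writeStream
      .format("console")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()
      .awaitTermination()
  }
}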

Observed behavior

Screenshots of the run are shown below:

The Jobs tab of the UI:

The SQL tab of the UI:

From the Submitted column in the two panels above, you can see that the submit time of each batch's query is determined by the end time of the preceding query.

With ContinuousTrigger: effect of a long-running query on the submit time of subsequent queries

Source code analysis

Let's now look at this from the source code.

The role of StreamExecution

StreamExecution is a handle to a query that executes continually in the background as new data arrives. It manages the execution of a streaming Spark SQL query that runs in a separate thread. Unlike a standard query, a streaming query executes repeatedly whenever new data arrives at any Source present in the query plan. Each time new data arrives, a QueryExecution is created and the results are committed transactionally to the given Sink.

It has two subclasses, MicroBatchExecution and ContinuousExecution, shown in the screenshot below:

The mapping between Trigger and StreamExecution

The org.apache.spark.sql.streaming.StreamingQueryManager#createQuery method contains the following code fragment:

(sink, trigger) match {
  case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
    }
    // A ContinuousTrigger (with a StreamWriteSupport sink) yields a ContinuousExecution
    new StreamingQueryWrapper(new ContinuousExecution(
      sparkSession,
      userSpecifiedName.orNull,
      checkpointLocation,
      analyzedPlan,
      v2Sink,
      trigger,
      triggerClock,
      outputMode,
      extraOptions,
      deleteCheckpointOnStop))
  case _ =>
    // Otherwise (e.g. ProcessingTime or OneTimeTrigger) a MicroBatchExecution is used
    new StreamingQueryWrapper(new MicroBatchExecution(
      sparkSession,
      userSpecifiedName.orNull,
      checkpointLocation,
      analyzedPlan,
      sink,
      trigger,
      triggerClock,
      outputMode,
      extraOptions,
      deleteCheckpointOnStop))
}

From this, the mapping between Trigger and the corresponding StreamExecution is:

Trigger              StreamExecution
OneTimeTrigger       MicroBatchExecution
ProcessingTime       MicroBatchExecution
ContinuousTrigger    ContinuousExecution

In addition, the analyzedPlan constructor argument of StreamExecution is a LogicalPlan, which means the LogicalPlan is already generated before the first query starts. At this point, however, the streaming sources in the plan are still unresolved placeholders: the concrete data each source will provide for a batch is not yet known, so the plan cannot be optimized and executed yet.
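
A small sketch (not from the original post; the local master and the rate source are illustrative) showing that the analyzed LogicalPlan already exists before any query is started, with the streaming source still appearing as a placeholder relation:

import org.apache.spark.sql.SparkSession

object AnalyzedPlanInspection {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("analyzed-plan-inspection")
      .getOrCreate()

    // No query has been started yet, but the analyzed LogicalPlan is already available;
    // the source shows up as a streaming relation placeholder that only gets bound to
    // concrete data once the query actually runs.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .select("timestamp", "value")

    println(df.queryExecution.analyzed)

    spark.stop()
  }
}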

Note

ContinuousExecution currently supports only a limited set of sinks: the sink must be an implementation of StreamWriteSupport, namely:

sink           class full name
console        org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
kafka          org.apache.spark.sql.kafka010.KafkaSourceProvider
ForeachSink    org.apache.spark.sql.execution.streaming.sources.ForeachWriterProvider
MemorySinkV2   org.apache.spark.sql.execution.streaming.sources.MemorySinkV2

Otherwise the query falls into the MicroBatchExecution branch; but when the triggerExecutor field is initialized, only ProcessingTime and OneTimeTrigger are supported, not ContinuousTrigger, and an IllegalStateException("Unknown type of trigger: ...") is thrown (see the triggerExecutor definition below).
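
Based on the match expression above, here is a hedged example of a query that should hit this path: a parquet file sink is not a StreamWriteSupport, so even with Trigger.Continuous the query falls into the MicroBatchExecution branch and should fail while triggerExecutor is being initialized (the paths below are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousWithUnsupportedSink {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("continuous-bad-sink").getOrCreate()

    val df = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

    // Expected to throw IllegalStateException("Unknown type of trigger: ContinuousTrigger(...)"),
    // because the parquet (file) sink only supports micro-batch execution.
    df.writeStream
      .format("parquet")
      .option("path", "/tmp/continuous-bad-sink/out")
      .option("checkpointLocation", "/tmp/continuous-bad-sink/ckpt")
      .trigger(Trigger.Continuous("1 second"))
      .start()
      .awaitTermination()
  }
}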

Executing the StreamExecution

org.apache.spark.sql.streaming.StreamingQueryManager#startQuery contains the following code fragment:

try {
  // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
  // As it's provided by the user and can run arbitrary codes, we must not hold any lock here.
  // Otherwise, it's easy to cause dead-lock, or block too long if the user codes take a long
  // time to finish.
  query.streamingQuery.start()
} catch {
  case e: Throwable =>
    activeQueriesLock.synchronized {
      activeQueries -= query.id
    }
    throw e
}

Here query.streamingQuery is the StreamExecution, i.e. either a MicroBatchExecution or a ContinuousExecution.

StreamExecution's start method is as follows:

/**
 * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
 * has been posted to all the listeners.
 */
def start(): Unit = {
  logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
  queryExecutionThread.setDaemon(true)
  queryExecutionThread.start()
  startLatch.await()  // Wait until thread started and QueryStart event has been posted
}

The queryExecutionThread field is declared as follows:

/**
 * The thread that runs the micro-batches of this stream. Note that this thread must be
 * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
 * running `KafkaConsumer` may cause endless loop.
 */
val queryExecutionThread: QueryExecutionThread =
  new QueryExecutionThread(s"stream execution thread for $prettyIdString") {
    override def run(): Unit = {
      // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
      // thread to this micro batch thread
      sparkSession.sparkContext.setCallSite(callSite)
      runStream()
    }
  }

QueryExecutionThread is a subclass of UninterruptibleThread, which in turn is a subclass of Thread, so QueryExecutionThread is a thread class. It runs the runStream method, whose key structure is:

try {
  // Preparation for the stream query: post the QueryStartedEvent, count down the start latch,
  // apply the streaming-specific SparkSession configuration, and so on.
  runActivatedStream(sparkSessionForStream)  // run the stream
} catch {
  // exception handling
} finally {
  // Tear-down once the stream query finishes: stop the sources, post the stream-stopped event,
  // delete the checkpoint if deleteCheckpointOnStop is enabled, and so on.
}

runActivatedStream ("Run the activated stream until stopped.") is an abstract method implemented by the subclasses.

MicroBatchExecution's implementation of runActivatedStream

The logic of MicroBatchExecution's runActivatedStream can be summarized as:

triggerExecutor.execute(() => {
  // submit and run the query for this trigger (one batch)
})

The triggerExecutor field is defined as follows:

private val triggerExecutor = trigger match {
    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
    case OneTimeTrigger => OneTimeExecutor()
    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}

In other words, with ProcessingTime a ProcessingTimeExecutor is used to generate batch queries periodically; its execute method is as follows:

override def execute(triggerHandler: () => Boolean): Unit = {
  while (true) {
    val triggerTimeMs = clock.getTimeMillis
    val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
    val terminated = !triggerHandler()
    if (intervalMs > 0) {
      val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
      if (batchElapsedTimeMs > intervalMs) {
        notifyBatchFallingBehind(batchElapsedTimeMs)
      }
      if (terminated) {
        return
      }
      clock.waitTillTime(nextTriggerTimeMs)
    } else {
      if (terminated) {
        return
      }
    }
  }
}

In pseudocode:

def execute(triggerHandler: () => Boolean): Unit = {
  while (true) {
    // record the current time as this trigger's start time
    // compute the next trigger time from the start time and the interval
    // run the query for this trigger and obtain the flag telling whether the stream has terminated
    if (interval > 0) {
      // batch elapsed time = current time - trigger start time
      // if (batch elapsed time > interval) notifyBatchFallingBehind  // currently only logs a warning
      // if (stream terminated) return  // exit the while loop and the method
      // wait until the next trigger time; Clock.waitTillTime is implemented in SystemClock with
      // while + sleep(ms) and in the other subclasses with while + wait(ms) -- the while loop
      // guards against an external interrupt cutting the wait short
    } else {
      // if (stream terminated) return  // exit the while loop and the method
    }
  }
}

That is, as long as the stream has not stopped, the submit time of the next batch is: if the current batch's elapsed time exceeds the interval, the end time of the current batch; otherwise currentBatchStartTime / interval * interval + interval, i.e. the next multiple of the interval after the current batch started.
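
A small standalone sketch of this rule (plain Scala, not Spark code; the 10-second interval and the batch durations are made-up examples, and nextBatchTime uses the formula above):

object SubmitTimeExample {
  val intervalMs = 10000L  // trigger interval: 10 seconds

  // Next multiple of the interval after `now` (the formula used in the rule above).
  def nextBatchTime(now: Long): Long = now / intervalMs * intervalMs + intervalMs

  def main(args: Array[String]): Unit = {
    // Fast batch: starts at t = 0 ms and runs for 3 s. Elapsed < interval, so the next batch
    // waits until nextBatchTime(0) = 10000 ms.
    val fastStart = 0L
    val fastEnd = fastStart + 3000L
    println(s"fast batch -> next submit at ${math.max(fastEnd, nextBatchTime(fastStart))} ms")  // 10000

    // Slow batch: starts at t = 0 ms and runs for 23 s. Elapsed > interval, so a
    // "falling behind" warning is logged and the next batch is submitted immediately at 23000 ms.
    val slowStart = 0L
    val slowEnd = slowStart + 23000L
    println(s"slow batch -> next submit at ${math.max(slowEnd, nextBatchTime(slowStart))} ms")  // 23000
  }
}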

ContinuousExecution's implementation of runActivatedStream

The source is as follows:

override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
  val stateUpdate = new UnaryOperator[State] {
    override def apply(s: State) = s match {
      // If we ended the query to reconfigure, reset the state to active.
      case RECONFIGURING => ACTIVE
      case _ => s
    }
  }

  do {
    runContinuous(sparkSessionForStream)
  } while (state.updateAndGet(stateUpdate) == ACTIVE)
}

The source of the runContinuous method it calls is:

  /**
   * Do a continuous run.
   * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
   */
  private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
    // A list of attributes that will need to be updated.
    val replacements = new ArrayBuffer[(Attribute, Attribute)]
    // Translate from continuous relation to the underlying data source.
    var nextSourceId = 0
    continuousSources = logicalPlan.collect {
      case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
        val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
        nextSourceId += 1

        dataSource.createContinuousReader(
          java.util.Optional.empty[StructType](),
          metadataPath,
          new DataSourceOptions(extraReaderOptions.asJava))
    }
    uniqueSources = continuousSources.distinct

    val offsets = getStartOffsets(sparkSessionForQuery)

    var insertedSourceId = 0
    val withNewSources = logicalPlan transform {
      case ContinuousExecutionRelation(source, options, output) =>
        val reader = continuousSources(insertedSourceId)
        insertedSourceId += 1
        val newOutput = reader.readSchema().toAttributes

        assert(output.size == newOutput.size,
          s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
            s"${Utils.truncatedString(newOutput, ",")}")
        replacements ++= output.zip(newOutput)

        val loggedOffset = offsets.offsets(0)
        val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
        reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
        StreamingDataSourceV2Relation(newOutput, source, options, reader)
    }

    // Rewire the plan to use the new attributes that were returned by the source.
    val replacementMap = AttributeMap(replacements)
    val triggerLogicalPlan = withNewSources transformAllExpressions {
      case a: Attribute if replacementMap.contains(a) =>
        replacementMap(a).withMetadata(a.metadata)
      case (_: CurrentTimestamp | _: CurrentDate) =>
        throw new IllegalStateException(
          "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
    }

    val writer = sink.createStreamWriter(
      s"$runId",
      triggerLogicalPlan.schema,
      outputMode,
      new DataSourceOptions(extraOptions.asJava))
    val withSink = WriteToContinuousDataSource(writer, triggerLogicalPlan)

    val reader = withSink.collect {
      case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
    }.head

    reportTimeTaken("queryPlanning") {
      lastExecution = new IncrementalExecution(
        sparkSessionForQuery,
        withSink,
        outputMode,
        checkpointFile("state"),
        runId,
        currentBatchId,
        offsetSeqMetadata)
      lastExecution.executedPlan // Force the lazy generation of execution plan
    }

    sparkSessionForQuery.sparkContext.setLocalProperty(
      StreamExecution.IS_CONTINUOUS_PROCESSING, true.toString)
    sparkSessionForQuery.sparkContext.setLocalProperty(
      ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
    // Add another random ID on top of the run ID, to distinguish epoch coordinators across
    // reconfigurations.
    val epochCoordinatorId = s"$runId--${UUID.randomUUID}"
    currentEpochCoordinatorId = epochCoordinatorId
    sparkSessionForQuery.sparkContext.setLocalProperty(
      ContinuousExecution.EPOCH_COORDINATOR_ID_KEY, epochCoordinatorId)
    sparkSessionForQuery.sparkContext.setLocalProperty(
      ContinuousExecution.EPOCH_INTERVAL_KEY,
      trigger.asInstanceOf[ContinuousTrigger].intervalMs.toString)

    // Use the parent Spark session for the endpoint since it's where this query ID is registered.
    val epochEndpoint =
      EpochCoordinatorRef.create(
        writer, reader, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
    val epochUpdateThread = new Thread(new Runnable {
      override def run: Unit = {
        try {
          triggerExecutor.execute(() => {
            startTrigger()

            if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
              if (queryExecutionThread.isAlive) {
                queryExecutionThread.interrupt()
              }
              false
            } else if (isActive) {
              currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
              logInfo(s"New epoch $currentBatchId is starting.")
              true
            } else {
              false
            }
          })
        } catch {
          case _: InterruptedException =>
            // Cleanly stop the query.
            return
        }
      }
    }, s"epoch update thread for $prettyIdString")

    try {
      epochUpdateThread.setDaemon(true)
      epochUpdateThread.start()

      reportTimeTaken("runContinuous") {
        SQLExecution.withNewExecutionId(sparkSessionForQuery, lastExecution) {
          lastExecution.executedPlan.execute()
        }
      }
    } catch {
      case t: Throwable
          if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
        logInfo(s"Query $id ignoring exception from reconfiguring: $t")
        // interrupted by reconfiguration - swallow exception so we can restart the query
    } finally {
      // The above execution may finish before getting interrupted, for example, a Spark job having
      // 0 partitions will complete immediately. Then the interrupted status will sneak here.
      //
      // To handle this case, we do the two things here:
      //
      // 1. Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase
      //    the waiting time of `stop` but should be minor because the operations here are very fast
      //    (just sending an RPC message in the same process and stopping a very simple thread).
      // 2. Clear the interrupted status at the end so that it won't impact the `runContinuous`
      //    call. We may clear the interrupted status set by `stop`, but it doesn't affect the query
      //    termination because `runActivatedStream` will check `state` and exit accordingly.
      queryExecutionThread.runUninterruptibly {
        try {
          epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
        } finally {
          SparkEnv.get.rpcEnv.stop(epochEndpoint)
          epochUpdateThread.interrupt()
          epochUpdateThread.join()
          stopSources()
          // The following line must be the last line because it may fail if SparkContext is stopped
          sparkSession.sparkContext.cancelJobGroup(runId.toString)
        }
      }
      Thread.interrupted()
    }
  }

 // TODO: pseudocode for runContinuous, to be organized in a follow-up
