Faith: you can do very little with it, but you can do nothing without it. — Samuel Butler
For Spark Streaming, an oversized batch causes the batches after it to pile up. How does the same situation affect a Structured Streaming job? This post gives a brief explanation.
The full title of this post would be: with a trigger set, how does a long-running query affect the submit time of the queries that follow it?
First, there are three kinds of trigger: OneTimeTrigger, ProcessingTime, and ContinuousTrigger. For what each of them means, see the explanation of triggers in the earlier "spark 集群优化" (Spark cluster tuning) post.
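For reference, here is a minimal sketch (my own example, not from the original post; the rate source, the interval values, and the checkpoint path are arbitrary) of how each of the three trigger types is set on a DataStreamWriter:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("trigger-demo").master("local[2]").getOrCreate()
val df = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

// OneTimeTrigger: process whatever is available in one batch, then stop
df.writeStream.format("console").trigger(Trigger.Once()).start()

// ProcessingTime: start a micro-batch every 10 seconds
df.writeStream.format("console").trigger(Trigger.ProcessingTime("10 seconds")).start()

// ContinuousTrigger: continuous processing with a 1-second checkpoint interval
df.writeStream.format("console")
  .option("checkpointLocation", "/tmp/continuous-trigger-demo-cp")  // example path
  .trigger(Trigger.Continuous("1 second"))
  .start()
```

In practice you would pick one of the three; they are shown together here only to illustrate the API.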
OneTimeTrigger executes the query only once and then finishes, so it has no effect on any later batch.
A screenshot of the code is shown below; it simply adds a sleep to the task on every partition:
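The screenshot itself is not reproduced here. As a rough stand-in (my own sketch; the rate source, the 10-second interval and the 30-second sleep are assumptions, not the original values), the idea is a ProcessingTime query whose tasks deliberately sleep longer than the trigger interval:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("slow-batch-demo").master("local[2]").getOrCreate()
import spark.implicits._

val query = spark.readStream
  .format("rate").option("rowsPerSecond", "5").load()
  .as[(java.sql.Timestamp, Long)]
  .mapPartitions { iter =>
    Thread.sleep(30 * 1000L)  // every task sleeps longer than the 10-second trigger interval
    iter
  }
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()
```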
A screenshot of the run:

A screenshot of the Jobs panel in the UI:

A screenshot of the SQL panel in the UI:

From the Submitted column in the two panel screenshots above, you can see that the submission time of each batch's query is determined by the end time of the query before it.
Let's analyze this from the source code.
StreamExecution is the handle to a query that executes continuously in the background as new data arrives. It manages the execution of a streaming Spark SQL query that runs in a separate thread. Unlike a standard query, a streaming query is executed repeatedly every time new data arrives at any Source present in the query plan. Whenever new data arrives, a QueryExecution is created and the results are committed transactionally to the given Sink.
It has two subclasses, shown in the screenshot below: MicroBatchExecution and ContinuousExecution.
The org.apache.spark.sql.streaming.StreamingQueryManager#createQuery method contains the following snippet:
    (sink, trigger) match {
      case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
        if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
          UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
        }
        // a ContinuousTrigger yields a ContinuousExecution
        new StreamingQueryWrapper(new ContinuousExecution(
          sparkSession,
          userSpecifiedName.orNull,
          checkpointLocation,
          analyzedPlan,
          v2Sink,
          trigger,
          triggerClock,
          outputMode,
          extraOptions,
          deleteCheckpointOnStop))
      case _ =>
        // a ProcessingTime trigger (or OneTimeTrigger) yields a MicroBatchExecution
        new StreamingQueryWrapper(new MicroBatchExecution(
          sparkSession,
          userSpecifiedName.orNull,
          checkpointLocation,
          analyzedPlan,
          sink,
          trigger,
          triggerClock,
          outputMode,
          extraOptions,
          deleteCheckpointOnStop))
    }
From this, the mapping between Trigger and the corresponding StreamExecution is:
| Trigger | StreamExecution |
|---|---|
| OneTimeTrigger | MicroBatchExecution |
| ProcessingTime | MicroBatchExecution |
| ContinuousTrigger | ContinuousExecution |
Also, the analyzedPlan constructor argument of StreamExecution is a LogicalPlan. In other words, the LogicalPlan has already been generated before the first query starts; at this point it is still effectively an unresolved plan, because the source data behind each node of the AST is not yet known, so the plan cannot be optimized yet.
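A quick way (my own addition, reusing the `spark` session from the sketches above) to see that the logical plan already exists before start() is called: explain(true) on a streaming DataFrame prints the parsed/analyzed/optimized plans, with the source still represented by a StreamingRelation leaf node rather than actual data:

```scala
val streamDf = spark.readStream.format("rate").load().selectExpr("value * 2 as doubled")
streamDf.explain(true)  // the plans show a StreamingRelation leaf; no query has been started yet
```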
The sinks that ContinuousExecution supports are currently limited: the sink must be an implementation of StreamWriteSupport, namely:
| sink | fully-qualified class name |
|---|---|
| console | org.apache.spark.sql.execution.streaming.ConsoleSinkProvider |
| kafka | org.apache.spark.sql.kafka010.KafkaSourceProvider |
| ForeachSink | org.apache.spark.sql.execution.streaming.sources.ForeachWriterProvider |
| MemorySinkV2 | org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 |
Otherwise the query falls into the MicroBatchExecution branch; but when the triggerExecutor member is initialized, only ProcessingTime and OneTimeTrigger are supported, not ContinuousTrigger, so the IllegalStateException shown in the triggerExecutor definition below is thrown.
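For example (my own sketch, not from the original post), a continuous query against the console sink, one of the supported StreamWriteSupport implementations above; a file sink such as parquet, by contrast, would take the MicroBatchExecution branch and fail as just described:

```scala
import org.apache.spark.sql.streaming.Trigger

// Supported: console sink + ContinuousTrigger -> ContinuousExecution
spark.readStream.format("rate").load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/continuous-console-demo-cp")  // example path
  .trigger(Trigger.Continuous("1 second"))
  .start()
```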
org.apache.spark.sql.streaming.StreamingQueryManager#startQuery contains the following snippet:
    try {
      // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
      // As it's provided by the user and can run arbitrary codes, we must not hold any lock here.
      // Otherwise, it's easy to cause dead-lock, or block too long if the user codes take a long
      // time to finish.
      query.streamingQuery.start()
    } catch {
      case e: Throwable =>
        activeQueriesLock.synchronized {
          activeQueries -= query.id
        }
        throw e
    }
Here query.streamingQuery is the StreamExecution, i.e. either a MicroBatchExecution or a ContinuousExecution.
StreamExecution's start method looks like this:
    /**
     * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
     * has been posted to all the listeners.
     */
    def start(): Unit = {
      logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
      queryExecutionThread.setDaemon(true)
      queryExecutionThread.start()
      startLatch.await()  // Wait until thread started and QueryStart event has been posted
    }
The queryExecutionThread member is declared as follows:
    /**
     * The thread that runs the micro-batches of this stream. Note that this thread must be
     * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
     * running `KafkaConsumer` may cause endless loop.
     */
    val queryExecutionThread: QueryExecutionThread =
      new QueryExecutionThread(s"stream execution thread for $prettyIdString") {
        override def run(): Unit = {
          // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
          // thread to this micro batch thread
          sparkSession.sparkContext.setCallSite(callSite)
          runStream()
        }
      }
QueryExecutionThread is a subclass of UninterruptibleThread, which in turn is a subclass of Thread, so QueryExecutionThread is a thread class. It runs the runStream method, whose key structure is:
    try {
      // preparation before running the stream query: post the QueryStartedEvent, count down the
      // start latch, set up the streaming configuration, and so on
      runActivatedStream(sparkSessionForStream)  // run the stream
    } catch {
      // exception handling
    } finally {
      // cleanup after the stream query finishes: stop the sources, post the stream-stopped event,
      // delete the checkpoint (if deleteCheckpointOnStop is enabled), and so on
    }
The doc comment of runActivatedStream reads: "Run the activated stream until stopped." It is an abstract method implemented by the subclasses.
The logic of MicroBatchExecution's runActivatedStream boils down to:
    triggerExecutor.execute(() => {
      // submit and run each batch query
    })
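Slightly expanded (a paraphrased outline of the MicroBatchExecution source, not a verbatim copy — only startTrigger and isActive are names that also appear elsewhere in this article), the closure passed to triggerExecutor roughly does this:

```scala
triggerExecutor.execute(() => {
  startTrigger()  // begin progress reporting for this trigger
  if (isActive) {
    // construct the next batch: fetch the latest available offsets from every source
    // and write them to the offset log (the write-ahead log)
    // if new data is available: build this batch's IncrementalExecution, submit the
    // Spark jobs, and record the finished batch in the commit log
  }
  isActive        // returning false terminates the trigger loop
})
```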
triggerExecutor is defined as follows:
    private val triggerExecutor = trigger match {
      case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
      case OneTimeTrigger => OneTimeExecutor()
      case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
    }
That is, with a ProcessingTime trigger, a ProcessingTimeExecutor is used to generate batch queries periodically; its execute method is:
    override def execute(triggerHandler: () => Boolean): Unit = {
      while (true) {
        val triggerTimeMs = clock.getTimeMillis
        val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
        val terminated = !triggerHandler()
        if (intervalMs > 0) {
          val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
          if (batchElapsedTimeMs > intervalMs) {
            notifyBatchFallingBehind(batchElapsedTimeMs)
          }
          if (terminated) {
            return
          }
          clock.waitTillTime(nextTriggerTimeMs)
        } else {
          if (terminated) {
            return
          }
        }
      }
    }
In pseudocode:
    def execute(triggerHandler: () => Boolean): Unit = {
      while (true) {
        // take the current time
        // compute the next batch's start time from the current time and the interval
        // run the query and obtain the flag that says whether the stream has terminated
        if (interval > 0) {
          // batch elapsed time = new current time - old current time
          // if (batch elapsed time > interval) notifyBatchFallingBehind  // currently just logs a warning
          // if (stream terminated flag is true) return  // break out of the while loop and leave the method
          // wait until the next batch's start time.
          // Clock.waitTillTime: the SystemClock subclass implements it with while + sleep(ms); the other
          // subclasses use while + wait(ms). The while loop guards against an external interrupt ending
          // the wait too early.
        } else {
          // if (stream terminated flag is true) return  // break out of the while loop and leave the method
        }
      }
    }
So, as long as the stream has not been stopped, the submission time of the next batch is: the end time of the current batch if the current batch ran longer than the interval; otherwise currentBatchStartTime / interval * interval + interval (integer division), i.e. the next interval boundary.
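A small self-contained sketch of that arithmetic. nextBatchTime itself is not shown above; the formula below (round the trigger start time up to the next interval boundary) is my reading of the Spark 2.x source, so treat it as an assumption:

```scala
object NextTriggerTimeDemo {
  // My reading of ProcessingTimeExecutor.nextBatchTime: the next multiple of the interval
  // strictly after `now`.
  def nextBatchTime(now: Long, intervalMs: Long): Long =
    now / intervalMs * intervalMs + intervalMs

  def main(args: Array[String]): Unit = {
    val intervalMs = 10000L  // a 10-second ProcessingTime trigger (example value)

    // Case 1: the batch finishes within the interval -> the next batch waits for the boundary.
    val start1 = 20000L            // batch triggered at t = 20s
    val end1   = start1 + 4000L    // batch takes 4s
    println(math.max(end1, nextBatchTime(start1, intervalMs)))  // 30000: next batch at t = 30s

    // Case 2: the batch overruns the interval -> the next batch is submitted immediately,
    // and notifyBatchFallingBehind logs a warning.
    val start2 = 30000L            // batch triggered at t = 30s
    val end2   = start2 + 25000L   // batch takes 25s
    println(math.max(end2, nextBatchTime(start2, intervalMs)))  // 55000: next batch at t = 55s
  }
}
```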
The source of ContinuousExecution's runActivatedStream is:
    override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
      val stateUpdate = new UnaryOperator[State] {
        override def apply(s: State) = s match {
          // If we ended the query to reconfigure, reset the state to active.
          case RECONFIGURING => ACTIVE
          case _ => s
        }
      }

      do {
        runContinuous(sparkSessionForStream)
      } while (state.updateAndGet(stateUpdate) == ACTIVE)
    }
And runContinuous looks like this:
    /**
     * Do a continuous run.
     * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
     */
    private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
      // A list of attributes that will need to be updated.
      val replacements = new ArrayBuffer[(Attribute, Attribute)]
      // Translate from continuous relation to the underlying data source.
      var nextSourceId = 0
      continuousSources = logicalPlan.collect {
        case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
          val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
          nextSourceId += 1
          dataSource.createContinuousReader(
            java.util.Optional.empty[StructType](),
            metadataPath,
            new DataSourceOptions(extraReaderOptions.asJava))
      }
      uniqueSources = continuousSources.distinct

      val offsets = getStartOffsets(sparkSessionForQuery)

      var insertedSourceId = 0
      val withNewSources = logicalPlan transform {
        case ContinuousExecutionRelation(source, options, output) =>
          val reader = continuousSources(insertedSourceId)
          insertedSourceId += 1
          val newOutput = reader.readSchema().toAttributes

          assert(output.size == newOutput.size,
            s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
              s"${Utils.truncatedString(newOutput, ",")}")
          replacements ++= output.zip(newOutput)

          val loggedOffset = offsets.offsets(0)
          val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
          reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
          StreamingDataSourceV2Relation(newOutput, source, options, reader)
      }

      // Rewire the plan to use the new attributes that were returned by the source.
      val replacementMap = AttributeMap(replacements)
      val triggerLogicalPlan = withNewSources transformAllExpressions {
        case a: Attribute if replacementMap.contains(a) =>
          replacementMap(a).withMetadata(a.metadata)
        case (_: CurrentTimestamp | _: CurrentDate) =>
          throw new IllegalStateException(
            "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
      }

      val writer = sink.createStreamWriter(
        s"$runId",
        triggerLogicalPlan.schema,
        outputMode,
        new DataSourceOptions(extraOptions.asJava))
      val withSink = WriteToContinuousDataSource(writer, triggerLogicalPlan)

      val reader = withSink.collect {
        case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
      }.head

      reportTimeTaken("queryPlanning") {
        lastExecution = new IncrementalExecution(
          sparkSessionForQuery,
          withSink,
          outputMode,
          checkpointFile("state"),
          runId,
          currentBatchId,
          offsetSeqMetadata)
        lastExecution.executedPlan // Force the lazy generation of execution plan
      }

      sparkSessionForQuery.sparkContext.setLocalProperty(
        StreamExecution.IS_CONTINUOUS_PROCESSING, true.toString)
      sparkSessionForQuery.sparkContext.setLocalProperty(
        ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
      // Add another random ID on top of the run ID, to distinguish epoch coordinators across
      // reconfigurations.
      val epochCoordinatorId = s"$runId--${UUID.randomUUID}"
      currentEpochCoordinatorId = epochCoordinatorId
      sparkSessionForQuery.sparkContext.setLocalProperty(
        ContinuousExecution.EPOCH_COORDINATOR_ID_KEY, epochCoordinatorId)
      sparkSessionForQuery.sparkContext.setLocalProperty(
        ContinuousExecution.EPOCH_INTERVAL_KEY,
        trigger.asInstanceOf[ContinuousTrigger].intervalMs.toString)

      // Use the parent Spark session for the endpoint since it's where this query ID is registered.
      val epochEndpoint = EpochCoordinatorRef.create(
        writer, reader, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
      val epochUpdateThread = new Thread(new Runnable {
        override def run: Unit = {
          try {
            triggerExecutor.execute(() => {
              startTrigger()

              if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
                if (queryExecutionThread.isAlive) {
                  queryExecutionThread.interrupt()
                }
                false
              } else if (isActive) {
                currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
                logInfo(s"New epoch $currentBatchId is starting.")
                true
              } else {
                false
              }
            })
          } catch {
            case _: InterruptedException =>
              // Cleanly stop the query.
              return
          }
        }
      }, s"epoch update thread for $prettyIdString")

      try {
        epochUpdateThread.setDaemon(true)
        epochUpdateThread.start()

        reportTimeTaken("runContinuous") {
          SQLExecution.withNewExecutionId(sparkSessionForQuery, lastExecution) {
            lastExecution.executedPlan.execute()
          }
        }
      } catch {
        case t: Throwable
            if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
          logInfo(s"Query $id ignoring exception from reconfiguring: $t")
          // interrupted by reconfiguration - swallow exception so we can restart the query
      } finally {
        // The above execution may finish before getting interrupted, for example, a Spark job having
        // 0 partitions will complete immediately. Then the interrupted status will sneak here.
        //
        // To handle this case, we do the two things here:
        //
        // 1. Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase
        // the waiting time of `stop` but should be minor because the operations here are very fast
        // (just sending an RPC message in the same process and stopping a very simple thread).
        // 2. Clear the interrupted status at the end so that it won't impact the `runContinuous`
        // call. We may clear the interrupted status set by `stop`, but it doesn't affect the query
        // termination because `runActivatedStream` will check `state` and exit accordingly.
        queryExecutionThread.runUninterruptibly {
          try {
            epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
          } finally {
            SparkEnv.get.rpcEnv.stop(epochEndpoint)
            epochUpdateThread.interrupt()
            epochUpdateThread.join()
            stopSources()
            // The following line must be the last line because it may fail if SparkContext is stopped
            sparkSession.sparkContext.cancelJobGroup(runId.toString)
          }
        }
        Thread.interrupted()
      }
    }
// TODO: pseudocode, to be tidied up in a follow-up.
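In the meantime, a rough outline of runContinuous, paraphrased from the source above:

```scala
// runContinuous, paraphrased (comments only, not meant to compile):
// 1. Collect the ContinuousExecutionRelation nodes from the logical plan, create a
//    ContinuousReader for each source, and read the start offsets from the offset log.
// 2. Rewrite the logical plan to use the readers' output attributes and wrap it in a
//    WriteToContinuousDataSource node on top of the sink's StreamWriter.
// 3. Build the IncrementalExecution ("queryPlanning") and set the continuous-processing
//    and epoch-related local properties on the SparkContext.
// 4. Create the EpochCoordinator RPC endpoint and start a daemon "epoch update thread":
//    inside triggerExecutor.execute it calls startTrigger(), checks whether the reader
//    needs reconfiguration (interrupting the query thread if so), and otherwise asks the
//    coordinator to increment the epoch. The ContinuousTrigger interval therefore drives
//    epoch increments, not job submission.
// 5. Run the single long-running continuous job via lastExecution.executedPlan.execute(),
//    which blocks until the query is stopped or reconfigured.
// 6. In the finally block, stop the epoch coordinator endpoint, the epoch update thread
//    and the sources, and cancel the job group.
```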