Spark Streaming Backpressure分析

一、为何引入Backpressure架构

                默认状况下,Spark Streaming经过Receiver以生产者生产数据的速率接收数据,计算过程当中会出现batch processing time > batch interval的状况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能彻底处理当前接收速率接收的数据。若是这种状况持续过长的时间,会形成数据在内存中堆积,致使Receiver所在Executor内存溢出等问题(若是设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟)。Spark 1.5之前版本,用户若是要限制Receiver的数据接收速率,能够经过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然能够经过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。好比:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会形成资源利用率降低等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming从v1.5开始引入反压机制(back-pressure),经过动态控制数据接收速率来适配集群数据处理能力。并发

二、Backpressureasync

                Spark Streaming Backpressure:  根据JobScheduler反馈做业的执行信息来动态调整Receiver数据接收率。经过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。ide

2.1 Streaming架构以下图所示(详见Streaming数据接收过程文档和Streaming 源码解析)oop

2.2 BackPressure执行过程以下图所示:post

  在原架构的基础上加上一个新的组件RateController,这个组件负责监听“OnBatchCompleted”事件,而后从中抽取processingDelay 及schedulingDelay信息.  Estimator依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream将rate经过ReceiverTracker与ReceiverSupervisorImpl转发给BlockGenerator(继承自RateLimiter).this

三、BackPressure 源码解析url

3.1 RateController类体系spa

                RateController 继承自StreamingListener. 用于处理BatchCompleted事件。核心代码为:线程

**
 * A StreamingListener that receives batch completion updates, and maintains * an estimate of the speed at which this stream should ingest messages, * given an estimate computation from a `RateEstimator` */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) extends StreamingListener with Serializable { …… …… /** * Compute the new rate limit and publish it asynchronously. */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = Future[Unit] { val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay) newRate.foreach { s => rateLimit.set(s.toLong) publish(getLatestRate()) } } def getLatestRate(): Long = rateLimit.get() override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { val elements = batchCompleted.batchInfo.streamIdToInputInfo for { processingEnd <- batchCompleted.batchInfo.processingEndTime workDelay <- batchCompleted.batchInfo.processingDelay waitDelay <- batchCompleted.batchInfo.schedulingDelay elems <- elements.get(streamUID).map(_.numRecords) } computeAndPublish(processingEnd, elems, workDelay, waitDelay) } } 

3.2 RateController的注册

                JobScheduler启动时会抽取在DStreamGraph中注册的全部InputDstream中的rateController,并向ListenerBus注册监听. 此部分代码以下:

def start(): Unit = synchronized { if (eventLoop != null) return // scheduler has already been started
 logDebug("Starting JobScheduler") eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") { override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) } eventLoop.start() // attach rate controllers of input streams to receive batch completion updates
   for { inputDStream <- ssc.graph.getInputStreams rateController <- inputDStream.rateController } ssc.addStreamingListener(rateController) listenerBus.start() receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) receiverTracker.start() jobGenerator.start() logInfo("Started JobScheduler") }

3.3 BackPressure执行过程分析

                BackPressure 执行过程分为BatchCompleted事件触发时机和事件处理两个过程

3.3.1 BatchCompleted触发过程

                对BatchedCompleted的分析,应该从JobGenerator入手,由于BatchedCompleted是批次处理结束的标志,也就是JobGenerator产生的做业执行完成时触发的,所以进行做业执行分析。

                Streaming 应用中JobGenerator每一个Batch Interval都会为应用中的每一个Output Stream创建一个Job, 该批次中的全部Job组成一个JobSet.使用JobScheduler的submitJobSet进行批量Job提交。此部分代码结构以下所示

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) { // Set the SparkEnv in this thread, so that job generation code can access the environment // Example: BlockRDDs are created in this thread, and it needs to access BlockManager // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env) // Checkpoint all RDDs marked for checkpointing to ensure their lineages are // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
 } match { case Success(jobs) => val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) }

 其中,sumitJobSet会建立固定数量的后台线程(具体由“spark.streaming.concurrentJobs”指定),去处理Job Set中的Job. 具体实现逻辑为:

def submitJobSet(jobSet: JobSet) { if (jobSet.jobs.isEmpty) { logInfo("No jobs added for time " + jobSet.time) } else { listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) logInfo("Added jobs for time " + jobSet.time) } }

其中JobHandler用于执行Job及处理Job执行结果信息。当Job执行完成时会产生JobCompleted事件. JobHandler的具体逻辑以下面代码所示:

private def handleJobCompletion(job: Job, completedTime: Long) { val jobSet = jobSets.get(job.time) jobSet.handleJobCompletion(job) job.setEndTime(completedTime) listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo)) logInfo("Finished job " + job.id + " from job set of time " + jobSet.time) if (jobSet.hasCompleted) { jobSets.remove(jobSet.time) jobGenerator.onBatchCompletion(jobSet.time) logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( jobSet.totalDelay / 1000.0, jobSet.time.toString, jobSet.processingDelay / 1000.0 )) listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo)) } job.result match { case Failure(e) => reportError("Error running job " + job, e) case _ => } }

3.3.二、BatchCompleted事件处理过程

                StreamingListenerBus将事件转交给具体的StreamingListener,所以BatchCompleted将交由RateController进行处理。RateController接到BatchCompleted事件后将调用onBatchCompleted对事件进行处理。

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { val elements = batchCompleted.batchInfo.streamIdToInputInfo for { processingEnd <- batchCompleted.batchInfo.processingEndTime workDelay <- batchCompleted.batchInfo.processingDelay waitDelay <- batchCompleted.batchInfo.schedulingDelay elems <- elements.get(streamUID).map(_.numRecords) } computeAndPublish(processingEnd, elems, workDelay, waitDelay) }

onBatchCompleted会从完成的任务中抽取任务的执行延迟和调度延迟,而后用这两个参数用RateEstimator(目前存在惟一实现PIDRateEstimator,proportional-integral-derivative (PID) controller, PID控制器)估算出新的rate并发布。代码以下:

/** * Compute the new rate limit and publish it asynchronously. */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = Future[Unit] { val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay) newRate.foreach { s => rateLimit.set(s.toLong) publish(getLatestRate()) } }

其中publish()由RateController的子类ReceiverRateController来定义。具体逻辑以下(ReceiverInputDStream中定义):

 /** * A RateController that sends the new rate to receivers, via the receiver tracker. */
  private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator) extends RateController(id, estimator) { override def publish(rate: Long): Unit = ssc.scheduler.receiverTracker.sendRateUpdate(id, rate) }

publish的功能为新生成的rate 借助ReceiverTracker进行转发。ReceiverTracker将rate包装成UpdateReceiverRateLimit事交ReceiverTrackerEndpoint

/** Update a receiver's maximum ingestion rate */ def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized { if (isTrackerStarted) { endpoint.send(UpdateReceiverRateLimit(streamUID, newRate)) } }

ReceiverTrackerEndpoint接到消息后,其将会从receiverTrackingInfos列表中获取Receiver注册时使用的endpoint(实为ReceiverSupervisorImpl),再将rate包装成UpdateLimit发送至endpoint.其接到信息后,使用updateRate更新BlockGenerators(RateLimiter子类),来计算出一个固定的令牌间隔。

/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint( "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint { override val rpcEnv: RpcEnv = env.rpcEnv override def receive: PartialFunction[Any, Unit] = { case StopReceiver => logInfo("Received stop signal") ReceiverSupervisorImpl.this.stop("Stopped by driver", None) case CleanupOldBlocks(threshTime) => logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) case UpdateRateLimit(eps) => logInfo(s"Received a new rate limit: $eps.") registeredBlockGenerators.asScala.foreach { bg => bg.updateRate(eps) } } })

其中RateLimiter的updateRate实现以下:

/** * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that. * * @param newRate A new rate in events per second. It has no effect if it's 0 or negative. */
 private[receiver] def updateRate(newRate: Long): Unit =
   if (newRate > 0) { if (maxRateLimit > 0) { rateLimiter.setRate(newRate.min(maxRateLimit)) } else { rateLimiter.setRate(newRate) } }

 setRate的实现以下:

public final void setRate(double permitsPerSecond) { Preconditions.checkArgument(permitsPerSecond > 0.0
        && !Double.isNaN(permitsPerSecond), "rate must be positive"); synchronized (mutex) { resync(readSafeMicros()); double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;  //固定间隔
      this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } }

到此,backpressure反压机制调整rate结束。

4.流量控制点

  当Receiver开始接收数据时,会经过supervisor.pushSingle()方法将接收的数据存入currentBuffer等待BlockGenerator定时将数据取走,包装成block. 在将数据存放入currentBuffer之时,要获取许可(令牌)。若是获取到许可就能够将数据存入buffer, 不然将被阻塞,进而阻塞Receiver从数据源拉取数据。

/** * Push a single data item into the buffer. */ def addData(data: Any): Unit = { if (state == Active) { waitToPush() //获取令牌
 synchronized { if (state == Active) { currentBuffer += data } else { throw new SparkException( "Cannot add data as BlockGenerator has not been started or has been stopped") } } } else { throw new SparkException( "Cannot add data as BlockGenerator has not been started or has been stopped") } }

   其令牌投放采用令牌桶机制进行, 原理以下图所示:

令牌桶机制: 大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。若是令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中能够保存的最大令牌数永远不会超过桶的大小。当进行某操做时须要令牌时会从令牌桶中取出相应的令牌数,若是获取到则继续操做,不然阻塞。用完以后不用放回。

  Streaming 数据流被Receiver接收后,按行解析后存入iterator中。而后逐个存入Buffer,在存入buffer时会先获取token,若是没有token存在,则阻塞;若是获取到则将数据存入buffer.  而后等价后续生成block操做。

相关文章
相关标签/搜索