PIDRateEstimator是Spark Streaming用来实现backpressure的关键组件。html
看了一些博客文章,感受对它的解释都没有说到要点,仍是本身来研究一下比较好。app
首先,须要搞清楚的一个问题是Spark Streaming的backpressure是想让系统达到怎么样的一种状态。这个问题不明确,PIDRateEstimator的做用就搞不清楚。ide
首先,backpressure这套机制是系统(由应用程序和物理资源组成的总体)的内在性质对Spark Streaming的吞吐量的限制,而并不是是某种优化。能够认为,在固定的资源下(CPU、内存、IO),Spark Streaming程序存在吞吐量的上限。优化
放在非micro-batch的状况下考虑,这意味着存在一个最大处理速度,RateEstimator认为这个速度的单位为records/second (不过实际上,每条消息的处理所耗的时间可能差异很大,因此这个速度的单位用records/second其实是可能并不合适,多是一种过分的简化)。this
放在Spark Streaming的micro-batch的状况下,因为调度器每隔batch duration的时间间隔生成一个micro-batch,这个吞吐率的上限意味着每一个batch总的消息数量存在上限。若是给每一个batch分配率的消息总数超过这个上限,每秒处理消息条数是不变的,只会使得batch的处理时间延长,这样对于系统没有什么好处,反而因为每一个batch太大而可能致使OOM。spa
当达到这个最大处理速度时,表现就是batch duration等于batch的计算阶段所花的时间,也就是batch duration == batch processing time。3d
那么backpressure的目标,就是使得系统达到上边这个状态(这个并不是彻底对,下面的分析会给出具体的状态)。它不会使得系统的累积未处理的数据减小,也不会使得系统的吞吐率提升(在不引发OOM,以及不计算GC的开销的状况下,当processing time > batch duration时,系统的吞吐量已经达到最高)。而只是使得系统的实际吞吐量稳定在最大吞吐量(除非你手动设置的rate的最大值小于最大吞吐量)code
首先,要明确PID控制器的做用。orm
引用一篇blog的说法:htm
PID控制器是一个在工业控制应用中常见的反馈回路部件。
这个控制器把收集到的数据和一个参考值进行比较,而后把这个差异用于计算新的输入值,
这个新的输入值的目的是可让系统的数据达到或者保持在参考值。
PID控制器能够根据历史数据和差异的出现率来调整输入值,使系统更加准确而稳定。
重点在于它的目的是调整输入,比而使得系统的某个咱们关注的目标指标到目标值。
PID的控制输出的公式为
这里u(t)为PID的输出。
SP是setpoint, 就是参考值
PV是 process variable, 也就是测量值。
A PID controller continuously calculates an error value e(t) as the difference between a desired setpoint (SP) and a measured process variable (PV) and applies a correction based on proportional, integral, and derivative terms (denoted P, I, and D respectively), hence the name.
首先,看下RateEstimator的compute方法的定义
private[streaming] trait RateEstimator extends Serializable { /** * Computes the number of records the stream attached to this `RateEstimator` * should ingest per second, given an update on the size and completion * times of the latest batch. * * @param time The timestamp of the current batch interval that just finished * @param elements The number of records that were processed in this batch * @param processingDelay The time in ms that took for the job to complete * @param schedulingDelay The time in ms that the job spent in the scheduling queue */ def compute( time: Long, elements: Long, processingDelay: Long, schedulingDelay: Long): Option[Double] }
看下参数的含义
PIDRateEstimator是获取当前这个结束的batch的数据,而后估计下一个batch的rate(注意,下一个batch并不必定跟当前结束的batch是连续两个batch,可能会有积压未处理的batch)。
PIDRateEstimator对于PID控制器里的"error"这个值是这么计算的:
// in seconds, should be close to batchDuration
val delaySinceUpdate = (time - latestTime).toDouble / 1000
// in elements/second
val processingRate = numElements.toDouble / processingDelay * 1000
// In our system `error` is the difference between the desired rate and the measured rate
// based on the latest batch information. We consider the desired rate to be latest rate,
// which is what this estimator calculated for the previous batch.
// in elements/second
val error = latestRate - processingRate
val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
// in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate
val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(minRate)
这里的latestRate是指PID控制器为上一个batch,也就是当前结束的batch,在生成这个batch的时候估计的处理速度。
因此上边代码中,latestRate就是参考值, processingRate就是测量值。
这里为何如此计算我仍是没搞清楚,由于latestRate是一个变化的值,不知道这样在数学上会对后边的积分、微分项的含义形成什么影响。
能够推导出来当batchDuration = processingDelay时候,这里的error为零。
推导过程为:
latestRate实际上等于numElements / batchDuration,由于numElements是上次生成job时根据这个latestRate(也就是当时的estimated rate)算出来的。
那么 error = (numElements / batchDuaration) - (numElements/processingDelay) 这里的processingDelay就是processing time
因此,当processingDelay等于batchDuration时候,error为零。
可是error为零时,PID的输出不必定为零,由于须要考虑到历史偏差和偏差的变化。这里刚结束的batch可能并不是生成后就当即被执行,而是在调度队列里排了一会队,因此仍是须要考虑schedulingDelay,它反应了历史偏差。
当PID输出为0时,newRate就等于latestRate,此时系统达到了稳定状态,error为零,historicalError和dError都为0。
这意味着:
这就是整个RateEstimator,也就是backpressure想要系统达到的状态。
这里能够定性地分析一下达到稳定状态的过程:
能够看出来这个PIDRateEstimator并不是是广泛最优的,由于它的假设是系统的动态特定不随时间变化,可是实际上若是没有颇有效的资源隔离,系统对于Spark Streaming程度来说,其资源是随时间变化的,并且在某些时间可能发生剧烈的变化。此时,此时RateEstimator应该作出更剧烈的变化来应对,好比经过动态调整各个部分的系数。
若是用户对本身的系统有深的了解,好比当资源和负载是周期性变化时,那就能够定制更合适的RateEstimator,好比考虑到天天同比的流量变化来调整estimatedRate。