做者javadoop,资深Java工程师。本文已获做者受权发布。java
原文连接https://www.javadoop.com/post/rate-limiternode
本文主要介绍关于流控的两部份内容。算法
第一部分介绍 Guava 中 RateLimiter 的源码,包括它的两种模式,目前网上大部分文章只分析简单的 SmoothBursty 模式,而没有分析带有预热的 SmoothWarmingUp。数据库
第二部分介绍 Sentinel 中流控的实现,本文不要求读者了解 Sentinel,这部份内容和 Sentinel 耦合很低,因此读者不须要有阅读压力。api
Sentinel 中流控设计是参考 Guava RateLimiter 的,因此阅读第二部份内容,须要有第一部份内容的背景。缓存
RateLimiter 基于漏桶算法,但它参考了令牌桶算法,这里不讨论流控算法,请自行查找资料。多线程
RateLimiter 的接口很是简单,它有两个静态方法用来实例化,实例化之后,咱们只须要关心 acquire 就好了,甚至都没有 release 操做。并发
// RateLimiter 接口列表:less
// 实例化的两种方式:
public static RateLimiter create(double permitsPerSecond){}
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit) {}
public double acquire() {}
public double acquire(int permits) {}
public boolean tryAcquire() {}
public boolean tryAcquire(int permits) {}
public boolean tryAcquire(long timeout, TimeUnit unit) {}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {}
public final double getRate() {}
public final void setRate(double permitsPerSecond) {}复制代码
RateLimiter 的做用是用来限流的,咱们知道 java 并发包中提供了 Semaphore,它也可以提供对资源使用进行控制,咱们看一下下面的代码:ide
// Semaphore
Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < 100; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
semaphore.acquireUninterruptibly(1);
try {
doSomething();
} finally {
semaphore.release();
}
}
});
}
复制代码
Semaphore 用来控制同时访问某个资源的并发数量,如上面的代码,咱们设置 100 个线程工做,可是咱们能作到最多只有 10 个线程能同时到 doSomething() 方法中。它控制的是并发数量。
而 RateLimiter 是用来控制访问资源的速率(rate)的,它强调的是控制速率。好比控制每秒只能有 100 个请求经过,好比容许每秒发送 1MB 的数据。
它的构造方法指定一个 permitsPerSecond 参数,表明每秒钟产生多少个 permits,这就是咱们的速率。
RateLimiter 容许预占将来的令牌,好比,每秒产生 5 个 permits,咱们能够单次请求 100 个,这样,紧接着的下一个请求须要等待大概 20 秒才能获取到 permits。
RateLimiter 目前只有一个子类,那就是抽象类 SmoothRateLimiter,SmoothRateLimiter 有两个实现类,也就是咱们这边要介绍的两种模式,咱们先简单介绍下 SmoothRateLimiter,而后后面分两个小节分别介绍它的两个实现类。
RateLimiter 做为抽象类,只有两个属性:
private final SleepingStopwatch stopwatch;
private volatile Object mutexDoNotUseDirectly;复制代码
stopwatch 很是重要,它用来“计时”,RateLimiter 把实例化的时间设置为 0 值,后续都是取相对时间,用微秒表示。
mutexDoNotUseDirectly 用来作锁,RateLimiter 依赖于 synchronized 来控制并发,因此咱们以后能够看到,各个属性甚至都没有用 volatile 修饰。
而后咱们来看 SmoothRateLimiter 的属性,分别表明什么意思。
// 当前还有多少 permits 没有被使用,被存下来的 permits 数量
double storedPermits;
// 最大容许缓存的 permits 数量,也就是 storedPermits 能达到的最大值
double maxPermits;
// 每隔多少时间产生一个 permit,
// 好比咱们构造方法中设置每秒 5 个,也就是每隔 200ms 一个,这里单位是微秒,也就是 200,000
double stableIntervalMicros;
// 下一次能够获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间,理解为时间戳吧
private long nextFreeTicketMicros = 0L; 复制代码
其实,看到这几个属性,咱们就能够大体猜一下它的内部实现了:
nextFreeTicketMicros 是一个很关键的属性。咱们每次获取 permits 的时候,先拿 storedPermits 的值,若是够,storedPermits 减去相应的值就能够了,若是不够,那么还须要将 nextFreeTicketMicros 往前推,表示我预占了接下来多少时间的量了。那么下一个请求来的时候,若是还没到 nextFreeTicketMicros 这个时间点,须要 sleep 到这个点再返回,固然也要将这个值再往前推。
你们在这里可能会有疑惑,由于时间是一直往前走的,因此 storedPermits 的信息多是不许确的,不过,只须要在关键的操做中同步一下,从新计算就行了。
咱们先从比较简单的 SmoothBursty 出发,来分析 RateLimiter 的源码,以后咱们再分析 SmoothWarmingUp。
Bursty 是突发的意思,它说的不是下面这个意思:咱们设置了 1k 每秒,而咱们能够一次性获取 5k 的 permits,这个场景表达的不是突发,而是在说预先占有了接下来几秒产生的 permits。
突发说的是,RateLimiter 会缓存必定数量的 permits 在池中,这样对于突发请求,能及时获得知足。想象一下咱们的某个接口,好久没有请求过来,忽然同时来了好几个请求,若是咱们没有缓存一些 permits 的话,不少线程就须要等待了。
SmoothBursty 默认缓存最多 1 秒钟的 permits,不能够修改。
RateLimiter 的静态构造方法:
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}复制代码
构造参数 permitsPerSecond 指定每秒钟能够产生多少个 permits。
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}复制代码
咱们看到,这里实例化的是 SmoothBursty 的实例,它的构造方法很简单,并且它只有一个属性 maxBurstSeconds,这里就不贴代码了。
构造函数指定了 maxBurstSeconds 为 1.0,也就是说,最多会缓存 1 秒钟,也就是 (1.0 * permitsPerSecond) 这么多个 permits 到池中。
这个 1.0 秒,关系到 storedPermits 和 maxPermits:
0 <= storedpermits="" <="maxPermits" =="" permitspersecond<="" p="">
咱们继续日后看 setRate 方法:
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}复制代码
setRate 这个方法是一个 public 方法,它能够用来调整速率。咱们这边继续跟的是初始化过程,可是你们提早知道这个方法是用来调整速率用的,对理解源码有很大的帮助。注意看,这里用了 synchronized 控制并发。
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 同步
resync(nowMicros);
// 计算属性 stableIntervalMicros
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}复制代码
resync 方法很简单,它用来调整 storedPermits 和 nextFreeTicketMicros。这就是咱们说的,在关键的节点,须要先更新一下 storedPermits 到正确的值。
void resync(long nowMicros) {
// 若是 nextFreeTicket 已通过掉了,想象一下很长时间都没有再次调用 limiter.acquire() 的场景
// 须要将 nextFreeTicket 设置为当前时间,从新计算 storedPermits
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}复制代码
coolDownIntervalMicros() 这个方法你们先不用关注,能够看到,在 SmoothBursty 类中的实现是直接返回了 stableIntervalMicros 的值,也就是咱们说的,每产生一个 permit 的时间长度。
固然了,细心的读者,可能会发现,此时的 stableIntervalMicros 其实没有设置,也就是说,上面发生了一次除以 0 值的操做,获得的 newPermits 实际上是一个无穷大。而 maxPermits 此时仍是 0 值,不过这里其实没有关系。
咱们回到前面一个方法,resync 同步之后,会设置 stableIntervalMicros 为一个正确的值,而后进入下面的方法:
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
// 这里计算了,maxPermits 为 1 秒产生的 permits
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// 由于 storedPermits 的值域变化了,须要等比例缩放
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}复制代码
上面这个方法,咱们要这么看,原来的 RateLimiter 是用某个 permitsPerSecond 值初始化的,如今咱们要调整这个频率。对于 maxPermits 来讲,是从新计算,而对于 storedPermits 来讲,是作等比例的缩放。
到此,构造方法就完成了,咱们获得了一个 RateLimiter 的实现类 SmoothBursty 的实例,可能上面的源码你仍是会有一些疑惑,不过也不要紧,继续往下看,可能你的不少疑惑就解开了。
接下来,咱们来分析 acquire 方法:
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}
@CanIgnoreReturnValue
public double acquire(int permits) {
// 预定,若是当前不能直接获取到 permits,须要等待
// 返回值表明须要 sleep 多久
long microsToWait = reserve(permits);
// sleep
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回 sleep 的时长
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}复制代码
咱们来看 reserve 方法:
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 返回 nextFreeTicketMicros
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 计算时长
return max(momentAvailable - nowMicros, 0);
}复制代码
继续往里看:
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 这里作一次同步,更新 storedPermits 和 nextFreeTicketMicros (若是须要)
resync(nowMicros);
// 返回值就是 nextFreeTicketMicros,注意刚刚已经作了 resync 了,此时它是最新的正确的值
long returnValue = nextFreeTicketMicros;
// storedPermits 中可使用多少个 permits
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// storedPermits 中不够的部分
double freshPermits = requiredPermits - storedPermitsToSpend;
// 为了这个不够的部分,须要等待多久时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0
+ (long) (freshPermits * stableIntervalMicros);
// 将 nextFreeTicketMicros 往前推
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// storedPermits 减去被拿走的部分
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
复制代码
咱们能够看到,获取 permits 的时候,实际上是获取了两部分,一部分来自于存量 storedPermits,存量不够的话,另外一部分来自于预占将来的 freshPermits。
这里提一个关键点吧,咱们看到,返回值是 nextFreeTicketMicros 的旧值,由于只要到这个时间点,就说明当次 acquire 能够成功返回了,而无论 storedPermits 够不够。若是 storedPermits 不够,会将 nextFreeTicketMicros 往前推必定的时间,预占了必定的量。
到这里,acquire 方法就分析完了,你们看到这里,逆着往前看就是了。应该说,SmoothBursty 的源码仍是很是简单的。
分析完了 SmoothBursty,咱们再来分析 SmoothWarmingUp 会简单一些。咱们说过,SmoothBursty 能够处理突发请求,由于它会缓存最多 1 秒的 permits,而待会咱们会看到 SmoothWarmingUp 彻底不一样的设计。
SmoothWarmingUp 适用于资源须要预热的场景,好比咱们的某个接口业务,须要使用到数据库链接,因为链接须要预热才能进入到最佳状态,若是咱们的系统长时间处于低负载或零负载状态(固然,应用刚启动也是同样的),链接池中的链接慢慢释放掉了,此时咱们认为链接池是冷的。
假设咱们的业务在稳定状态下,正常能够提供最大 1000 QPS 的访问,可是若是链接池是冷的,咱们就不能让 1000 个请求同时进来,由于这会拖垮咱们的系统,咱们应该有个预热升温的过程。
对应到 SmoothWarmingUp 中,若是系统处于低负载状态,storedPermits 会一直增长,当请求来的时候,咱们要从 storedPermits 中取 permits,最关键的点在于,从 storedPermits 中取 permits 的操做是比较耗时的,由于没有预热。
回顾一下前面介绍的 SmoothBursty,它从 storedPermits 中获取 permits 是不须要等待时间的,而这边洽洽相反,从 storedPermits 获取须要更多的时间,这是最大的不一样,先理解这一点,能帮助你更好地理解源码。
你们先有一些粗的概念,而后咱们来看下面这个图:
这个图不容易看懂,X 轴表明 storedPermits 的数量,Y 轴表明获取一个 permits 须要的时间。
假设指定 permitsPerSecond 为 10,那么 stableInterval 为 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍是写死的,用户不能修改)。也就是说,当达到 maxPermits 时,此时处于系统最冷的时候,获取一个 permit 须要 300ms,而若是 storedPermits 小于 thresholdPermits 的时候,只须要 100ms。
想象有一条垂直线 x=k,它与 X 轴的交点 k 表明当前 storedPermits 的数量:
预热时间是咱们在构造的时候指定的,图中梯形的面积就是预热时间,由于预热完成后,咱们能进入到一个稳定的速率中(stableInterval),下面咱们来计算出 thresholdPermits 和 maxPermits 的值。
有一个关键点,从 thresholdPermits 到 0 的时间,是从 maxPermits 到 thresholdPermits 时间的一半,也就是梯形的面积是长方形面积的 2 倍,梯形的面积是 warmupPeriod。
之因此长方形的面积是 warmupPeriod/2,是由于 coldFactor 是硬编码的 3。
梯形面积为 warmupPeriod,即:
warmupPeriod = 2 * stableInterval * thresholdPermits复制代码
由此,咱们得出 thresholdPermits 的值:
thresholdPermits = 0.5 * warmupPeriod / stableInterval复制代码
而后咱们根据梯形面积的计算公式:
warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)复制代码
得出 maxPermits 为:
maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)复制代码
这样,咱们就获得了 thresholdPermits 和 maxPermits 的值。
接下来,咱们来看一下冷却时间间隔,它指的是 storedPermits 中每一个 permit 的增加速度,也就是咱们前面说的 x=k 这条垂直线往右的移动速度,为了达到从 0 到 maxPermits 花费 warmupPeriodMicros 的时间,咱们将其定义为:
@Override
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}
贴一下代码,你们就知道了,在 resync 中用到的这个:
void resync(long nowMicros) {
if (nowMicros > nextFreeTicketMicros) {
// coolDownIntervalMicros 在这里使用
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
复制代码
基于上面的分析,咱们来看 SmoothWarmingUp 的其余源码。
首先,咱们来看它的 doSetRate 方法,有了前面的介绍,这个方法的源码很是简单:
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
// coldFactor 是固定的 3
double coldIntervalMicros = stableIntervalMicros * coldFactor;
// 这个公式咱们上面已经说了
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
// 这个公式咱们上面也已经说了
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
// 计算那条斜线的斜率。数学知识,对边 / 临边
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}复制代码
setRate 方法很是简单,接下来,咱们要分析的是 storedPermitsToWaitTime 方法,咱们回顾一下下面的代码:
这段代码是 acquire 方法的核心,waitMicros 由两部分组成,一部分是从 storedPermits 中获取花费的时间,一部分是等待 freshPermits 产生花费的时间。在 SmoothBursty 的实现中,从 storedPermits 中获取 permits 直接返回 0,不须要等待。
而在 SmoothWarmingUp 的实现中,因为须要预热,因此从 storedPermits 中取 permits 须要花费必定的时间,其实就是要计算下图中,阴影部分的面积。
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// 若是右边梯形部分有 permits,那么先从右边部分获取permits,计算梯形部分的阴影部分的面积
if (availablePermitsAboveThreshold > 0.0) {
// 从右边部分获取的 permits 数量
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// 梯形面积公式:(上底+下底)*高/2
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// 加上 长方形部分的阴影面积
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
// 对于给定的 x 值,计算 y 值
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
复制代码
到这里,SmoothWarmingUp 也已经说完了。
若是你们对于 Guava RateLimiter 还有什么疑惑,欢迎在留言区留言,对于 Sentinel 中的流控不感兴趣的读者,看到这里就能够结束了。
Sentinel 是阿里开源的流控、熔断工具,这里不作过多的介绍,感兴趣的读者请自行了解。
在 Sentinel 的流控中,咱们能够配置流控规则,主要是控制 QPS 和线程数,这里咱们不讨论控制线程数,控制线程数的代码不在咱们这里的讨论范围内,下面的介绍都是指控制 QPS。
RateLimiterController 很是简单,它经过使用 latestPassedTime 属性来记录最后一次经过的时间,而后根据规则中 QPS 的限制,计算当前请求是否能够经过。
举个很是简单的例子:设置 QPS 为 10,那么每 100 毫秒容许经过一个,经过计算当前时间是否已通过了上一个请求的经过时间 latestPassedTime 以后的 100 毫秒,来判断是否能够经过。假设才过了 50ms,那么须要当前线程再 sleep 50ms,而后才能够经过。若是同时有另外一个请求呢?那须要 sleep 150ms 才行。
public class RateLimiterController implements TrafficShapingController {
// 排队最大时长,默认 500ms
private final int maxQueueingTimeMs;
// QPS 设置的值
private final double count;
// 上一次请求经过的时间
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
// 一般 acquireCount 为 1,这里不用关心参数 prioritized
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
//
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// 计算每 2 个请求之间的间隔,好比 QPS 限制为 10,那么间隔就是 100ms
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
// 能够经过,设置 latestPassedTime 而后就返回 true 了
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// 不能够经过,须要等待
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 等待时长大于最大值,返回 false
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 将 latestPassedTime 往前推
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 须要 sleep 的时间
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
复制代码
这个策略仍是很是好理解的,简单粗暴,快速失败。
WarmUpController 用来防止突发流量迅速上升,致使系统负载严重太高,原本系统在稳定状态下能处理的,可是因为许多资源没有预热,致使这个时候处理不了了。好比,数据库须要创建链接、须要链接到远程服务等,这就是为何咱们须要预热。
啰嗦一句,这里不只仅指系统刚刚启动须要预热,对于长时间处于低负载的系统,突发流量也须要从新预热。
Guava 的 SmoothWarmingUp 是用来控制获取令牌的速率的,和这里的控制 QPS 仍是有一点区别,可是中心思想是同样的。咱们在看完源码之后再讨论它们的区别。
为了帮助你们理解源码,咱们这边先设定一个场景:QPS 设置为 100,预热时间设置为 10 秒。代码中使用 “【】” 表明根据这个场景计算出来的值。
public class WarmUpController implements TrafficShapingController {
// 阈值
protected double count;
// 3
private int coldFactor;
// 转折点的令牌数,和 Guava 的 thresholdPermits 一个意思
// [500]
protected int warningToken = 0;
// 最大的令牌数,和 Guava 的 maxPermits 一个意思
// [1000]
private int maxToken;
// 斜线斜率
// [1/25000]
protected double slope;
// 累积的令牌数,和 Guava 的 storedPermits 一个意思
protected AtomicLong storedTokens = new AtomicLong(0);
// 最后更新令牌的时间
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
// 下面的构造方法,和 Guava 中是差很少的,只不过 thresholdPermits 和 maxPermits 都换了个名字
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// warningToken 和 thresholdPermits 是同样的意思,计算结果实际上是同样的
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// 【warningToken = (10*100)/(3-1) = 500】
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// maxToken 和 maxPermits 是同样的意思,计算结果实际上是同样的
// maxPermits = thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval)
// 【maxToken = 500 + (2*10*100)/(1.0+3) = 1000】
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// 斜率计算
// slope
// slope = (coldIntervalMicros-stableIntervalMicros)/(maxPermits-thresholdPermits);
// 【slope = (3-1.0) / 100 / (1000-500) = 1/25000】
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Sentinel 的 QPS 统计使用的是滑动窗口
// 当前时间窗口的 QPS
long passQps = (long) node.passQps();
// 这里是上一个时间窗口的 QPS,这里的一个窗口跨度是1分钟
long previousQps = (long) node.previousPassQps();
// 同步。设置 storedTokens 和 lastFilledTime 到正确的值
syncToken(previousQps);
long restToken = storedTokens.get();
// 令牌数超过 warningToken,进入梯形区域
if (restToken >= warningToken) {
// 这里简单说一句,由于当前的令牌数超过了 warningToken 这个阈值,系统处于须要预热的阶段
// 经过计算当前获取一个令牌所需时间,计算其倒数便是当前系统的最大 QPS 容量
long aboveToken = restToken - warningToken;
// 这里计算警惕 QPS 值,就是当前状态下能达到的最高 QPS。
// (aboveToken * slope + 1.0 / count) 其实就是在当前状态下获取一个令牌所须要的时间
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
// 若是不会超过,那么经过,不然不经过
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
// count 是最高能达到的 QPS
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
protected void syncToken(long passQps) {
// 下面几行代码,说明在第一次进入新的 1 秒钟的时候,作同步
// 题外话:Sentinel 默认地,1 秒钟分为 2 个时间窗口,分别 500ms
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
// 令牌数量的旧值
long oldValue = storedTokens.get();
// 计算新的令牌数量,往下看
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 令牌数量上,减去上一分钟的 QPS,而后设置新值
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
// 更新令牌数
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 当前令牌数小于 warningToken,添加令牌
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
// 当前令牌数量处于梯形阶段,
// 若是当前经过的 QPS 大于 count/coldFactor,说明系统消耗令牌的速度,大于冷却速度
// 那么不须要添加令牌,不然须要添加令牌
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
}
复制代码
coolDownTokens 这个方法用来计算新的 token 数量,其实我也没有彻底理解做者的设计:
最后,咱们再简单说说 Guava 的 SmoothWarmingUp 和 Sentinel 的 WarmupController 的区别。
Guava 在于控制获取令牌的速率,它关心的是,获取 permits 须要多少时间,包括从 storedPermits 中获取,以及获取 freshPermits,以此推动 nextFreeTicketMicros 到将来的某个时间点。
而 Sentinel 在于控制 QPS,它用令牌数来标识当前系统处于什么状态,根据时间推动一直增长令牌,根据经过的 QPS 一直减小令牌。若是 QPS 持续降低,根据推演,能够发现 storedTokens 愈来愈多,而后越过 warningTokens 这个阈值,以后只有当 QPS 降低到 count/3 之后,令牌才会继续往上增加,一直到 maxTokens。
storedTokens 是以 “count 每秒”的增加率增加的,减小是以 前一分钟的 QPS 来减小的。其实这里我也有个疑问,为何增长令牌的时候考虑了时间,而减小的时候却不考虑时间因素,提了 issue,彷佛没人搭理。
注意,这个类继承自刚刚介绍的 WarmUpController,它的流控效果定义为排队等待。它的代码其实就是前面介绍的 RateLimiterController 加上 WarmUpController。
public class WarmUpRateLimiterController extends WarmUpController {
private final int timeoutInMs;
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
super(count, warmUpPeriodSec, coldFactor);
this.timeoutInMs = timeOutMs;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
long currentTime = TimeUtil.currentTimeMillis();
long restToken = storedTokens.get();
long costTime = 0;
long expectedTime = 0;
// 和 RateLimiterController 比较,区别主要就是这块代码
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// current interval = restToken*slope+1/count
double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
} else {
costTime = Math.round(1.0 * (acquireCount) / count * 1000);
}
expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
long waitTime = costTime + latestPassedTime.get() - currentTime;
if (waitTime > timeoutInMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > timeoutInMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}复制代码
这个代码很简单,就是 RateLimiter 中的代码,而后加入了预热的内容。
在 RateLimiter 中,单个请求的 costTime 是固定的,就是 1/count,好比设置 100 qps,那么 costTime 就是 10ms。
可是这边,加入了 WarmUp 的内容,就是说,经过令牌数量,来判断当前系统的 QPS 应该是多少,若是当前令牌数超过 warningTokens,那么系统的 QPS 容量已经低于咱们预设的 QPS,相应的,costTime 就会延长。
有段时间没写文章了,写得很差之处,欢迎指正。
关注做者公众号: