缓存,降级和限流是大型分布式系统中的三把利剑。目前限流主要有漏桶和令牌桶两种算法。java
漏桶算法的示意图以下:git
漏桶算法能够将系统处理请求限定到恒定的速率,当请求过载时,漏桶将直接溢出。漏桶算法假定了系统处理请求的速率是恒定的,可是在现实环境中,每每咱们的系统处理请求的速率不是恒定的。漏桶算法没法解决系统突发流量的状况。github
令牌桶算法相对漏桶算法的优点在于能够处理系统的突发流量,其算法示意图以下所示:算法
令牌桶有必定的容量(capacity),后台服务向令牌桶中以恒定的速率放入令牌(token),当令牌桶中的令牌数量超过capacity以后,多余的令牌直接丢弃。当一个请求进来时,须要从桶中拿到N个令牌,若是可以拿到则继续后面的处理流程,若是拿不到,则当前线程能够选择阻塞等待桶中的令牌数量够本次请求的数量或者不等待直接返回失败。segmentfault
Guava RateLimiter是一个谷歌提供的限流工具,RateLimiter基于令牌桶算法,能够有效限定单个JVM实例上某个接口的流量。缓存
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class RateLimiterExample {
public static void main(String[] args) throws InterruptedException {
// qps设置为5,表明一秒钟只容许处理五个并发请求
RateLimiter rateLimiter = RateLimiter.create(5);
ExecutorService executorService = Executors.newFixedThreadPool(5);
int nTasks = 10;
CountDownLatch countDownLatch = new CountDownLatch(nTasks);
long start = System.currentTimeMillis();
for (int i = 0; i < nTasks; i++) {
final int j = i;
executorService.submit(() -> {
rateLimiter.acquire(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + " gets job " + j + " done");
countDownLatch.countDown();
});
}
executorService.shutdown();
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("10 jobs gets done by 5 threads concurrently in " + (end - start) + " milliseconds");
}
}
复制代码
输出结果:bash
pool-1-thread-1 gets job 0 done
pool-1-thread-2 gets job 1 done
pool-1-thread-3 gets job 2 done
pool-1-thread-4 gets job 3 done
pool-1-thread-5 gets job 4 done
pool-1-thread-6 gets job 5 done
pool-1-thread-7 gets job 6 done
pool-1-thread-8 gets job 7 done
pool-1-thread-9 gets job 8 done
pool-1-thread-10 gets job 9 done
10 jobs gets done by 5 threads concurrently in 2805 milliseconds
复制代码
上面例子中咱们提交10个工做任务,每一个任务大概耗时1000微秒,开启10个线程,而且使用RateLimiter设置了qps为5,一秒内只容许五个并发请求被处理,虽然有10个线程,可是咱们设置了qps为5,一秒以内只能有五个并发请求。咱们预期的总耗时大概是2000微秒左右,结果为2805和预期的差很少。并发
RateLimiter基于令牌桶算法,它的核心思想主要有:框架
acquire(20)
,则不须要等待20秒钟,由于令牌桶中已经有10个空闲的令牌。SmoothRateLimiter 类中的 storedPermits 就是用来表示当前令牌桶中的空闲令牌数。RateLimiter主要的类的类图以下所示:分布式
RateLimiter 是一个抽象类,SmoothRateLimiter 继承自 RateLimiter,不过 SmoothRateLimiter 仍然是一个抽象类,SmoothBursty 和 SmoothWarmingUp 才是具体的实现类。
SmoothRateLimiter 是抽象类,其定义了一些关键的参数,咱们先来看一下这些参数:
/**
* The currently stored permits.
*/
double storedPermits;
/**
* The maximum number of stored permits.
*/
double maxPermits;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
*/
double stableIntervalMicros;
/**
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
*/
private long nextFreeTicketMicros = 0L; // could be either in the past or future
复制代码
storedPermits 代表当前令牌桶中有多少令牌。maxPermits 表示令牌桶最大令牌数目,storedPermits 的取值范围为:[0, maxPermits]。stableIntervalMicros 等于 1/qps
,它表明系统在稳按期间,两次请求之间间隔的微秒数。例如:若是咱们设置的 qps 为5,则 stableIntervalMicros 为200ms。nextFreeTicketMicros 表示系统处理完当前请求后,下一次请求被许可的最短微秒数,若是在这以前有请求进来,则必须等待。
当咱们设置了 qps 以后,须要计算某一段时间系统可以生成的令牌数目,那么怎么计算呢?一种方式是开启一个后台任务去作,可是这样代价未免有点大。RateLimiter 中采起的是惰性计算方式:在每次请求进来的时候先去计算上次请求和本次请求之间应该生成多少个令牌。
RateLimiter 中提供了建立 SmoothBursty 的方法:
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); // maxBurstSeconds 用于计算 maxPermits
rateLimiter.setRate(permitsPerSecond); // 设置生成令牌的速率
return rateLimiter;
}
复制代码
SmoothBursty 的 maxBurstSeconds 构造函数参数主要用于计算 maxPermits :maxPermits = maxBurstSeconds * permitsPerSecond;
。
咱们再看一下 setRate 的方法,RateLimiter 中 setRate 方法最终后调用 doSetRate 方法,doSetRate 是一个抽象方法,SmoothRateLimiter 抽象类中覆写了 RateLimiter 的 doSetRate 方法:
// SmoothRateLimiter类中的doSetRate方法,覆写了 RateLimiter 类中的 doSetRate 方法,此方法再委托下面的 doSetRate 方法作处理。
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
// SmoothBursty 和 SmoothWarmingUp 类中覆写此方法
abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);
// SmoothBursty 中对 doSetRate的实现
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } } 复制代码
SmoothRateLimiter 类的 doSetRate方法中咱们着重看一下 resync 这个方法:
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
复制代码
resync 方法就是 RateLimiter 中惰性计算 的实现。每一次请求来的时候,都会调用到这个方法。这个方法的过程大体以下:
1 / QPS
。coolDownIntervalMicros 方法在 SmoothWarmingUp 中的计算方式为warmupPeriodMicros / maxPermits
,warmupPeriodMicros 是 SmoothWarmingUp 的“预热”时间。tryAcquire 方法用于尝试获取若干个 permit,此方法不会等待,若是获取失败则直接返回失败。canAcquire 方法用于判断当前的请求可否经过:
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
if (!canAcquire(nowMicros, timeoutMicros)) { // 首先判断当前超时时间以内请求可否被知足,不能知足的话直接返回失败
return false;
} else {
microsToWait = reserveAndGetWaitLength(permits, nowMicros); // 计算本次请求须要等待的时间,核心方法
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
final long queryEarliestAvailable(long nowMicros) {
return nextFreeTicketMicros;
}
复制代码
canAcquire 方法逻辑比较简单,就是看 nextFreeTicketMicros 减去 timeoutMicros 是否小于等于 nowMicros。若是当前需求能被知足,则继续往下走。
接着会调用 SmoothRateLimiter 类的 reserveEarliestAvailable 方法,该方法返回当前请求须要等待的时间。改方法在 acquire 方法中也会用到,咱们来着重分析这个方法。
// 计算本次请求须要等待的时间
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); // 本次请求和上次请求之间间隔的时间是否应该有新的令牌生成,若是有则更新 storedPermits
long returnValue = nextFreeTicketMicros;
// 本次请求的令牌数 requiredPermits 由两个部分组成:storedPermits 和 freshPermits,storedPermits 是令牌桶中已有的令牌
// freshPermits 是须要新生成的令牌数
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
// 分别计算从两个部分拿走的令牌各自须要等待的时间,而后总和做为本次请求须要等待的时间,SmoothBursty 中从 storedPermits 拿走的部分不须要等待时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 更新 nextFreeTicketMicros,这里更新的实际上是下一次请求的时间,是一种“预消费”
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 更新 storedPermits
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
/**
* Translates a specified portion of our currently stored permits which we want to spend/acquire,
* into a throttling time. Conceptually, this evaluates the integral of the underlying function we
* use, for the range of [(storedPermits - permitsToTake), storedPermits].
*
* <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
*/
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
复制代码
上面的代码是 SmoothRateLimiter 中的具体实现。其主要有如下步骤:
acquire 方法没有等待超时的概念,会一直阻塞直到知足本次请求。
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
abstract long reserveEarliestAvailable(int permits, long nowMicros);
复制代码
acquire 方法最终仍是经过 reserveEarliestAvailable 方法来计算本次请求须要等待的时间。这个方法上面已经分析过了,这里就再也不过多阐述。
SmoothWarmingUp 相对 SmoothBursty 来讲主要区别在于 storedPermitsToWaitTime 方法。其余部分原理和 SmoothBursty 相似。
SmoothWarmingUp 是 SmoothRateLimiter 的子类,它相对于 SmoothRateLimiter 多了几个属性:
static final class SmoothWarmingUp extends SmoothRateLimiter {
private final long warmupPeriodMicros;
/**
* The slope of the line from the stable interval (when permits == 0), to the cold interval
* (when permits == maxPermits)
*/
private double slope;
private double thresholdPermits;
private double coldFactor;
...
}
复制代码
这四个参数都是和 SmoothWarmingUp 的“热身”(warmup)机制相关。warmup 能够用以下的图来表示:
* ^ throttling
* |
* cold + /
* interval | /.
* | / .
* | / . ← "warmup period" is the area of the trapezoid between
* | / . thresholdPermits and maxPermits
* | / .
* | / .
* | / .
* stable +----------/ WARM .
* interval | . UP .
* | . PERIOD.
* | . .
* 0 +----------+-------+--------------→ storedPermits
* 0 thresholdPermits maxPermits
复制代码
上图中横坐标是当前令牌桶中的令牌 storedPermits,前面说过 SmoothWarmingUp 将 storedPermits 分为两个区间:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。纵坐标是请求的间隔时间,stableInterval 就是 1 / QPS
,例如设置的 QPS 为5,则 stableInterval 就是200ms,coldInterval = stableInterval * coldFactor
,这里的 coldFactor "hard-code"写死的是3。
当系统进入 cold 阶段时,图像会向右移,直到 storedPermits 等于 maxPermits;当系统请求增多,图像会像左移动,直到 storedPermits 为0。
上面"矩形+梯形"图像的面积就是 waitMicros 也便是本次请求须要等待的时间。计算过程在 SmoothWarmingUp 类的 storedPermitsToWaitTime 方法中覆写:
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) { // 若是当前 storedPermits 超过 availablePermitsAboveThreshold 则计算从 超过部分拿令牌所须要的时间(图中的 WARM UP PERIOD)
// WARM UP PERIOD 部分计算的方法,这部分是一个梯形,梯形的面积计算公式是 “(上底 + 下底) * 高 / 2”
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// TODO(cpovirk): Figure out a good name for this variable.
double length = permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0); // 计算出从 WARM UP PERIOD 拿走令牌的时间
permitsToTake -= permitsAboveThresholdToTake; // 剩余的令牌从 stable 部分拿
}
// measuring the integral on the left part of the function (the horizontal line)
micros += (stableIntervalMicros * permitsToTake); // stable 部分令牌获取花费的时间
return micros;
}
// WARM UP PERIOD 部分 获取相应令牌所对应的的时间
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
复制代码
SmoothWarmingUp 类中 storedPermitsToWaitTime 方法将 permitsToTake 分为两部分,一部分从 WARM UP PERIOD 部分拿,这部分是一个梯形,面积计算就是(上底 + 下底)* 高 / 2。另外一部分从 stable 部分拿,它是一个长方形,面积就是 长 * 宽。最后返回两个部分的时间总和。