RateLimiter

Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便,并且十分高效。算法

RateLimiter使用

public void testAcquire() {
      RateLimiter limiter = RateLimiter.create(1);

      for(int i = 1; i < 10; i = i + 2 ) {
          double waitTime = limiter.acquire(i);
          System.out.println("cutTime=" + System.currentTimeMillis() + " acq:" + i + " waitTime:" + waitTime);
      }
  }

结果:缓存

cutTime=1535439657427 acq:1 waitTime:0.0
cutTime=1535439658431 acq:3 waitTime:0.997045
cutTime=1535439661429 acq:5 waitTime:2.993028
cutTime=1535439666426 acq:7 waitTime:4.995625
cutTime=1535439673426 acq:9 waitTime:6.999223

首先经过RateLimiter.create(1);建立一个限流器,参数表明每秒生成的令牌数,经过limiter.acquire(i);来以阻塞的方式获取令牌,固然也能够经过tryAcquire(int permits, long timeout, TimeUnit unit)来设置等待超时时间的方式获取令牌,若是超timeout为0,则表明非阻塞,获取不到当即返回。服务器

从输出来看,RateLimiter支持预消费,好比在acquire(5)时,等待时间是3秒,是上一个获取令牌时预消费了3个两排,固须要等待3*1秒,而后又预消费了5个令牌,以此类推网络

RateLimiter经过限制后面请求的等待时间,来支持必定程度的突发请求(预消费)less

 

源码注释中的一个例子,好比咱们有不少任务须要执行,可是咱们不但愿每秒超过两个任务执行,那么咱们就可使用RateLimiter:ide

final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
        rateLimiter.acquire(); // may wait
        executor.execute(task);
    }
}

另一个例子,假如咱们会产生一个数据流,而后咱们想以每秒5kb的速度发送出去.咱们能够每获取一个令牌(permit)就发送一个byte的数据,这样咱们就能够经过一个每秒5000个令牌的RateLimiter来实现:函数

final RateLimiter rateLimiter = RateLimiter.create(5000.0);
void submitPacket(byte[] packet) {
    rateLimiter.acquire(packet.length);
    networkService.send(packet);
}

另外,咱们也可使用非阻塞的形式达到降级运行的目的,即便用非阻塞的tryAcquire()方法:工具

if(limiter.tryAcquire()) { //未请求到limiter则当即返回false
    doSomething();
}else{
    doSomethingElse();
}

 

设计思路:ui

考虑一下RateLimiter是如何设计的,而且为何要这样设计.this

RateLimiter的主要功能就是提供一个稳定的速率,实现方式就是经过限制请求流入的速度,好比计算请求等待合适的时间阈值.

实现QPS速率的最简单的方式就是记住上一次请求的最后受权时间,而后保证1/QPS秒内不容许请求进入.好比QPS=5,若是咱们保证最后一个被受权请求以后的200ms的时间内没有请求被受权,那么咱们就达到了预期的速率.若是一个请求如今过来可是最后一个被受权请求是在100ms以前,那么咱们就要求当前这个请求等待100ms.按照这个思路,请求15个新令牌(许可证)就须要3秒.

有一点很重要:上面这个设计思路的RateLimiter记忆很是的浅,它的脑容量很是的小,只记得上一次被受权的请求的时间.若是RateLimiter的一个被受权请求q以前很长一段时间没有被使用会怎么样?这个RateLimiter会立马忘记过去这一段时间的利用不足,而只记得刚刚的请求q.

过去一段时间的利用不足意味着有过剩的资源是能够利用的.这种状况下,RateLimiter应该加把劲(speed up for a while)将这些过剩的资源利用起来.好比在向网络中发生数据的场景(限流),过去一段时间的利用不足可能意味着网卡缓冲区是空的,这种场景下,咱们是能够加速发送来将这些过程的资源利用起来.

另外一方面,过去一段时间的利用不足可能意味着处理请求的服务器对即将到来的请求是准备不足的(less ready for future requests),好比由于很长一段时间没有请求当前服务器的cache是陈旧的,进而致使即将到来的请求会触发一个昂贵的操做(好比从新刷新全量的缓存).

为了处理这种状况,RateLimiter中增长了一个维度的信息,就是过去一段时间的利用不足(past underutilization),代码中使用storedPermits变量表示.当没有利用不足这个变量为0,最大能达到maxStoredPermits(maxStoredPermits表示彻底没有利用).所以,请求的令牌可能从两个地方来:

1.过去剩余的令牌(stored permits, 可能没有)
2.现有的令牌(fresh permits,当前这段时间还没用完的令牌)

咱们将经过一个例子来解释它是如何工做的:

对一个每秒产生一个令牌的RateLimiter,每有一个没有使用令牌的一秒,咱们就将storedPermits加1,若是RateLimiter在10秒都没有使用,则storedPermits变成10.0.这个时候,一个请求到来并请求三个令牌(acquire(3)),咱们将从storedPermits中的令牌为其服务,storedPermits变为7.0.这个请求以后立马又有一个请求到来并请求10个令牌,咱们将从storedPermits剩余的7个令牌给这个请求,剩下还须要三个令牌,咱们将从RateLimiter新产生的令牌中获取.咱们已经知道,RateLimiter每秒新产生1个令牌,就是说上面这个请求还须要的3个请求就要求其等待3秒.

想象一个RateLimiter每秒产生一个令牌,如今彻底没有使用(处于初始状态),限制一个昂贵的请求acquire(100)过来.若是咱们选择让这个请求等待100秒再容许其执行,这显然很荒谬.咱们为何什么也不作而只是傻傻的等待100秒,一个更好的作法是容许这个请求当即执行(和acquire(1)没有区别),而后将随后到来的请求推迟到正确的时间点.这种策略,咱们容许这个昂贵的任务当即执行,并将随后到来的请求推迟100秒.这种策略就是让任务的执行和等待同时进行.

一个重要的结论:RateLimiter不会记最后一个请求,而是即下一个请求容许执行的时间.这也能够很直白的告诉咱们到达下一个调度时间点的时间间隔.而后定一个一段时间未使用的Ratelimiter也很简单:下一个调度时间点已通过去,这个时间点和如今时间的差就是Ratelimiter多久没有被使用,咱们会将这一段时间翻译成storedPermits.全部,若是每秒钟产生一个令牌(rate==1),而且正好每秒来一个请求,那么storedPermits就不会增加.

 

原理:

Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提高直到维持在一个稳定值) 两种模式实现思路相似,主要区别在等待时间的计算上,本篇重点介绍SmoothBursty

RateLimiter的建立

经过调用RateLimiter的create接口来建立实例,实际是调用的SmoothBuisty稳定模式建立的实例。

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

SmoothBursty中的两个构造参数含义:

  • SleepingStopwatch:guava中的一个时钟类实例,会经过这个来计算时间及令牌
  • maxBurstSeconds:官方解释,在ReteLimiter未使用时,最多保存几秒的令牌,默认是1

在解析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义

/**
 * The work (permits) of how many seconds can be saved up if this RateLimiter is unused?
 * 在RateLimiter未使用时,最多存储几秒的令牌
 * */
 final double maxBurstSeconds;
 

/**
 * The currently stored permits.
 * 当前存储令牌数
 */
double storedPermits;

/**
 * The maximum number of stored permits.
 * 最大存储令牌数 = maxBurstSeconds * stableIntervalMicros(见下文)
 */
double maxPermits;

/**
 * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
 * per second has a stable interval of 200ms.
 * 添加令牌时间间隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
 */
double stableIntervalMicros;

/**
 * The time when the next request (no matter its size) will be granted. After granting a request,
 * this is pushed further in the future. Large requests push this further than small requests.
 * 下一次请求能够获取令牌的起始时间
 * 因为RateLimiter容许预消费,上次请求预消费令牌后
 * 下次请求须要等待相应的时间到nextFreeTicketMicros时刻才能够获取令牌
 */
private long nextFreeTicketMicros = 0L; // could be either in the past or future

关键函数

  • setRate
public final void setRate(double permitsPerSecond) {
  checkArgument(
      permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
  synchronized (mutex()) {
    doSetRate(permitsPerSecond, stopwatch.readMicros());
  }
}

经过这个接口设置令牌通每秒生成令牌的数量,内部时间经过调用SmoothRateLimiterdoSetRate来实现

  • doSetRate  
final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

这里先经过调用resync生成令牌以及更新下一期令牌生成时间,而后更新stableIntervalMicros,最后又调用了SmoothBurstydoSetRate

  • resync
/**
 * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
 * 基于当前时间,更新下一次请求令牌的时间,以及当前存储的令牌(能够理解为生成令牌)
 */
void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}

根据令牌桶算法,桶中的令牌是持续生成存放的,有请求时须要先从桶中拿到令牌才能开始执行,谁来持续生成令牌存放呢?

一种解法是,开启一个定时任务,由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口须要分别对每一个用户作访问频率限制,假设系统中存在6W用户,则至多须要开启6W个定时任务来维持每一个桶中的令牌数,这样的开销是巨大的。

另外一种解法则是延迟计算,如上resync函数。该函数会在每次获取令牌以前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内能够生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只须要在获取令牌时计算一次便可。

  • SmoothBursty的doSetRate
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond;
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    // Double.POSITIVE_INFINITY 表明无穷啊
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}

桶中可存放的最大令牌数由maxBurstSeconds计算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。
该参数的做用在于,能够更为灵活地控制流量。如,某些接口限制为300次/20秒,某些接口限制为50次/45秒等。也就是流量不局限于qps

RateLimiter几个经常使用接口分析

在了解以上概念后,就很是容易理解RateLimiter暴露出来的接口

@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

/**
* 获取令牌,返回阻塞的时间
**/
@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

acquire函数主要用于获取permits个令牌,并计算须要等待多长时间,进而挂起等待,并将该值返回,主要经过reserve返回须要等待的时间,reserve中经过调用reserveAndGetWaitLength获取等待时间

/**
 * Reserves next ticket and returns the wait time that the caller must wait for.
 *
 * @return the required wait time, never negative
 */
final long reserveAndGetWaitLength(int permits, long nowMicros) {
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  return max(momentAvailable - nowMicros, 0);
}

最后调用了reserveEarliestAvailable

 

@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros;
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  double freshPermits = requiredPermits - storedPermitsToSpend;
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros);

  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
首先经过resync生成令牌以及同步nextFreeTicketMicros时间戳,freshPermits从令牌桶中获取令牌后还须要的令牌数量,经过storedPermitsToWaitTime计算出获取freshPermits还须要等待的时间,在稳定模式中,这里就是(long) (freshPermits * stableIntervalMicros) ,而后更新nextFreeTicketMicros以及storedPermits,此次获取令牌须要的等待到的时间点, reserveAndGetWaitLength返回须要等待的时间间隔。 从`reserveEarliestAvailable`能够看出RateLimiter的预消费原理,以及获取令牌的等待时间时间原理(能够解释示例结果),再获取令牌不足时,并无等待到令牌所有生成,而是更新了下次获取令牌时的nextFreeTicketMicros,从而影响的是下次获取令牌的等待时间。 `reserve`这里返回等待时间后,`acquire`经过调用`stopwatch.sleepMicrosUninterruptibly(microsToWait);`进行sleep操做,这里不一样于Thread.sleep(), 这个函数的sleep是uninterruptibly的,内部实现:
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
    //sleep 阻塞线程 内部经过Thread.sleep()
  boolean interrupted = false;
  try {
    long remainingNanos = unit.toNanos(sleepFor);
    long end = System.nanoTime() + remainingNanos;
    while (true) {
      try {
        // TimeUnit.sleep() treats negative timeouts just like zero.
        NANOSECONDS.sleep(remainingNanos);
        return;
      } catch (InterruptedException e) {
        interrupted = true;
        remainingNanos = end - System.nanoTime();
        //若是被interrupt能够继续,更新sleep时间,循环继续sleep
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
      //若是被打断过,sleep事后再真正中断线程
    }
  }
}
sleep以后,`acquire`返回sleep的时间,阻塞结束,获取到令牌。 

 

public boolean tryAcquire(int permits) {
  return tryAcquire(permits, 0, MICROSECONDS);
}

public boolean tryAcquire() {
  return tryAcquire(1, 0, MICROSECONDS);
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex()) {
    long nowMicros = stopwatch.readMicros();
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false;
    } else {
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

@Override
final long queryEarliestAvailable(long nowMicros) {
  return nextFreeTicketMicros;
}

tryAcquire函数能够尝试在timeout时间内获取令牌,若是能够则挂起等待相应时间并返回true,不然当即返回false
canAcquire用于判断timeout时间内是否能够获取令牌,经过判断当前时间+超时时间是否大于nextFreeTicketMicros 来决定是否可以拿到足够的令牌数,若是能够获取到,则过程同acquire,线程sleep等待,若是经过canAcquire在此超时时间内不能回去到令牌,则能够快速返回,不须要等待timeout后才知道可否获取到令牌。

相关文章
相关标签/搜索