限流原理解读之guava中的RateLimiter

RateLimiter有两种新建的方式算法

  1. 建立Bursty方式数据库

  2. 建立WarmingUp方式缓存

如下源码来自 guava-17.0安全

Bursty

//初始化
RateLimiter r = RateLimiter.create(1); 
//不阻塞
r.tryAcquire();
//阻塞
r.acquire()
复制代码

RateLimiter.create作了两件事情建立Bursty对象和设置了速率,至次初始化过程结束bash

RateLimiter rateLimiter = new Bursty(ticker, 1.0 /* maxBurstSeconds */); //ticker默认使用本身定义的
rateLimiter.setRate(permitsPerSecond);
复制代码
  1. 新建Bursty对象。它指定的是可以存储的最大时间是多长,好比设置的时间是1s,那么假设容许每秒钟发放的令牌数量为2,能存储的最大量为2;
  2. setRate。 内部经过私有锁来保证速率的修改是线程安全的
    synchronized (mutex) {
      //1:查看当前的时间是否比预计下次可发放令牌的时间要大,若是大,更新下次可发放令牌的时间为当前时间
      resync(readSafeMicros());
      //2:计算两次发放令牌之间的时间间隔,好比1s中须要发放5个,那它就是 200000.0微秒
      double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
      this.stableIntervalMicros = stableIntervalMicros;
      //3:设置maxPermits和storedPermits
      doSetRate(permitsPerSecond, stableIntervalMicros);
    }
    复制代码

resync源码网络

private void resync(long nowMicros) {
  // 查看当前的时间是否比预计下次可发放令牌的时间要大,若是大,更新下次可发放令牌的时间为当前时间
  if (nowMicros > nextFreeTicketMicros) {
    storedPermits = Math.min(maxPermits,
        storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
    nextFreeTicketMicros = nowMicros;
  }
}
复制代码

doSetRate源码多线程

@Overridevoid doSetRate(double permitsPerSecond, double stableIntervalMicros) {
 double oldMaxPermits = this.maxPermits;
 maxPermits = maxBurstSeconds * permitsPerSecond;
 storedPermits = (oldMaxPermits == 0.0)
     ? 0.0 // 初始条件存储的是没有
    storedPermits * maxPermits / oldMaxPermits;
}
复制代码

在整个的初始化过程当中,关键信息是:并发

  • nextFreeTicketMicros 预计下次发放令牌的时间, stableIntervalMicros 两次发放令牌之间的时间间隔ide

  • maxPermits 最大能存储的令牌的数量 storedPermits 已经存储的令牌数函数

为何是nextFreeTicketMicros?

最简单的维持QPS速率的方式就是记住最后一次请求的时间,而后确保再次有请求过来的时候,已经通过了 1/QPS 秒。好比QPS是5 次/秒,只须要确保两次请求时间通过了200ms便可,若是恰好在100ms到达,就会再等待100ms,也就是说,若是一次性须要15个令牌,须要的时间为为3s。可是对于一个长时间没有请求的系统,这样的的设计方式有必定的不合理之处。考虑一个场景:若是一个RateLimiter,每秒产生1个令牌,它一直没有使用过,忽然来了一个须要100个令牌的请求,选择等待100s再执行这个请求,显得不太明智,更好的处理方式为当即执行它,而后把接下来的请求推迟100s。

于是RateLimiter自己并不记下最后一次请求的时间,而是记下下一次指望运行的时间(nextFreeTicketMicros)。

这种方式带来的一个好处是,能够去判断等待的超时时间是否大于下次运行的时间,以使得可以执行,若是等待的超时时间过短,就能当即返回。

为何会有一个标记表明存储了多少令牌?

一样的考虑长时间没有使用的场景。若是长时间没有请求,忽然间来了,这个时候是否应该立马放行这些请求?长时间没有使用可能意味着两件事:

  1. 不少资源是存在空闲的状况,好比说网络请求长时间没有,它的缓冲区颇有多是空的,此时是能够加速传输,提升它的利用率
  2. 一些时候,瞬间的爆发会致使溢出,好比说服务上的缓存过时了,须要去查询库,这个花销是很是“昂贵”的,过多的请求会致使数据库撑不住

RateLimiter就使用storedPermits来给过去请求的不充分程度建模。它的存储规则以下: 假设RateLimiter每秒产生一个令牌,每过去一秒若是没有请求,RateLimter也就没有消费,就使storedPermits增加1。假设10s以内都没有请求过来,storedPermits就变成了10(假设maxPermits>10),此时若是要获取3个令牌,会使用storedPermits来中的令牌来处理,而后它的值变为了7,片刻以后,若是调用了acquire(10),部分的会从storedPermits拿到7个权限,剩余的3个则须要从新产生。

总的来讲RateLimiter提供了一个storedPermits变量,当资源利用充分的时候,它就是0,最大能够增加到 maxStoredPermits。请求所需的令牌来自于两个地方:stored permits(空闲时存储的令牌)和fresh permits(现有的令牌)

怎么衡量从storedPermits中获取令牌这个过程?

一样假设每秒RateLimiter只生产一个令牌,正常状况下,若是一次来了3个请求,整个过程会持续3秒钟。考虑到长时间没有请求的场景:

  1. 资源空闲。这种时候系统是能承受住必定量的请求的,固然但愿在承受范围以内可以更快的提供请求,也就是说,若是有存储令牌,相比新产生令牌,此时但愿可以更快的获取令牌,也就是此时从存储令牌中获取令牌的时间消耗要比产生新令牌要少,从而更快相应请求
  2. 瞬时流量过大。这时候就不但愿过快的消耗存储的令牌,但愿它可以相比产生新的令牌的时间消耗大些,从而可以使请求相对平缓。

分析可知,针对不一样的场景,须要对获取storedPermits作不一样的处理,Ratelimiter的实现方式就是 storedPermitsToWaitTime 函数,它创建了从storedPermits中获取令牌和时间花销的模型函数,而衡量时间的花销就是经过对模型函数进行积分计算,好比原来存储了10个令牌,如今须要拿3个令牌,还剩余7个,那么所须要的时间花销就是该函数从7-10区间中的积分。

这种方式保证了任何获取令牌方式所须要的时间都是同样的,比如 每次拿一个和先拿两个再拿一个,从时间上来说并无分别。

storedPermitsToWaitTime实现原理

storedPermits自己是用来衡量没有使用的时间的。在没有使用令牌的时候存储,存储的速率(单位时间内存储的令牌的个数)是 每没用1次就存储1次: rate=permites/time 。也就是说 1 / rate = time / permits,那么可获得 (1/rate)*permits 就能够来衡量时间花销。

选取(1/rate)做为基准线

  • 若是选取一条在它之上的线,就作到了比从fresh permits中获取要慢;
  • 若是在基准线之下,则是比从fresh permits中获取要快;
  • 恰好是基准线,那么从storedPermits中获取和新产生的速率如出一辙;

Bursty的storedPermitsToWaitTime函数实现

long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  return 0L;
}
复制代码

它直接返回了0,也就是在基准线之下,获取storedPermits的速率比新产生要快,当即可以拿到存储的量

WarmingUp

//初始化
RateLimiter r =RateLimiter.create(1,1,TimeUnit.SECONDS);
//不阻塞
r.tryAcquire();
//阻塞
r.acquire()
复制代码

create方法建立了WarmingUp对象,并这只了对应的速率

RateLimiter rateLimiter = new WarmingUp(ticker, warmupPeriod, unit);
rateLimiter.setRate(permitsPerSecond);
复制代码

相比Bursty,它多了个参数warmupPeroid,它会以提供的unit为时间单位,转换成微秒存储。setRate相似于Bursty,只是在doSetRate提供不一样的实现

void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = maxPermits;
  //1:最大的存储个数为须要预热的时间除以两个请求的时间间隔,好比设定预热时间为1s,每秒有5个请求,那么最大的存储个数为1000ms/200ms=5个
  maxPermits = warmupPeriodMicros / stableIntervalMicros; 
  //2:计算最大存储permits的一半
  halfPermits = maxPermits / 2.0;
  //3:初始化稳定时间间隔的3倍做为冷却时间间隔
  double coldIntervalMicros = stableIntervalMicros * 3.0;
  //4:设置基准线的斜率
  slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    storedPermits = 0.0;
  } else {
    storedPermits = (oldMaxPermits == 0.0)
        ? maxPermits // 初始条件下,认为就是存储满的,以达到缓慢消费的效果
        : storedPermits * maxPermits / oldMaxPermits;
  }
}
复制代码

在这个过程当中能够看到Warmup方式新增了一个halfPermits的设计,以及经过公式 slope=(coldIntervalMicros-stableIntervalMicros)/halfPermits,他们在函数 storedPermitsToWaitTime中获得了运用

@Overridelong storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  //1:计算储存的令牌中超过了最大令牌一半的数量
  double availablePermitsAboveHalf = storedPermits - halfPermits;
  long micros = 0;
  // 计算超过一半的部分所须要的时间花销(对于函数来讲,就是积分计算)
  if (availablePermitsAboveHalf > 0.0) {
    double permitsAboveHalfToTake = Math.min(availablePermitsAboveHalf, permitsToTake);
    micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)
        + permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);
    permitsToTake -= permitsAboveHalfToTake;
  }
  // 计算函数的还没有超过一半的部分所须要的时间花销
  micros += (stableIntervalMicros * permitsToTake);
  return micros;
}

private double permitsToTime(double permits) {
  return stableIntervalMicros + permits * slope;
}
复制代码

WarmingUp的设计理念

WarmingUp对时间花销衡量方式为下图

*          ^ throttling
   *          |
   * 3*stable +                  /
   * interval |                 /.
   *  (cold)  |                / .
   *          |               /  .   <-- "warmup period" is the area of the trapezoid between
   * 2*stable +              /   .       halfPermits and maxPermits
   * interval |             /    .
   *          |            /     .
   *          |           /      .
   *   stable +----------/  WARM . }
   * interval |          .   UP  . } <-- this rectangle (from 0 to maxPermits, and
   *          |          . PERIOD. }     height == stableInterval) defines the cooldown period,
   *          |          .       . }     and we want cooldownPeriod == warmupPeriod
   *          |---------------------------------> storedPermits
   *              (halfPermits) (maxPermits)
复制代码

横轴表示存储的令牌个数,纵轴表示时间,这样函数的积分就能够表示所要消耗的时间。
在程序刚开始运行的时候,warmingup方式会存满全部的令牌,而根据从存储令牌中的获取方式,能够实现从存储最大令牌中到降到一半令牌所须要的时间为存储同量令牌时间的2倍,从而使得刚开始的时候发放令牌的速度比较慢,等消耗一半以后,获取的速率和生产的速率一致,从而也就实现了一个‘热身’的概念

从storedPermits中获取令牌所须要的时间,它分为两部分,以maxPetmits的一半为分割点

  • storedPermits <= halfPermits 的时候,存储和消费storedPermits的速率与产生的速率如出一辙

  • storedPermits>halfPermits, 存储storePermites所须要的时间和产生的速率保持一致,可是消费storePermites从maxPermits到halfPermits所须要的时间为从halfPermits增加到maxPermits所须要时间的2被,也就是比新令牌产生要慢 为何在分隔点计算还有斜率方面选了3倍和一半的位置 对函数作积分计算(图形面积),恰好能够保证,超过一半的部分,若是要拿掉一半的存储令牌所须要的时间刚好是存储一样量(或者说是新令牌产生)时间花销的两倍,对应场景若是过了很长一段时间没有使用(存储的令牌会达到maxPermits),刚开始能接收请求的速率相对比较慢,而后再增加到稳定的消费速率

关键在于存储的速率是和新令牌产生的速率同样,可是消费的速率,当存储的超过一半时,会慢于新令牌产生的速率,小于一半则速率是同样的

TryAcquire

它会尝试去获取令牌,若是没法获取就当即返回,不然再超时时间以内返回给定的令牌。 源码以下

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  //1:使用微秒来转换超时时间
  long timeoutMicros = unit.toMicros(timeout);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex) {
    long nowMicros = readSafeMicros();
    //2.1:若是下次可以获取令牌的时间超过超时时间范围,立马返回;
    if (nextFreeTicketMicros > nowMicros + timeoutMicros) {
      return false;
    } else {
      //2.2:获取须要等待的时间,本次获取的时间确定不会超时
      microsToWait = reserveNextTicket(permits, nowMicros);
    }
  }
  //3:实行等待
  ticker.sleepMicrosUninterruptibly(microsToWait);
  return true;
}
复制代码

第一次运行的时候,nextFreeTicketMicros是建立时候的时间,一定小于当前时间,因此第一次确定会放过,容许执行,只是须要计算要等待的时间。

private long reserveNextTicket(double requiredPermits, long nowMicros) {
  //1:若是下次能够获取令牌的时间在过去,更新
  resync(nowMicros);
  //2:计算距离下次获取令牌须要的时间,若是nextFreeTikcetMicros>nowMicros,这个时间段一定在超时时间以内,假如入超时时间是0,那么一定是microsToNextFreeTicket趋近于0,也就是立马可以放行;
  long microsToNextFreeTicket = Math.max(0, nextFreeTicketMicros - nowMicros);
  //3:计算须要消耗的存储的令牌
  double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits);
  //4:计算须要新产生的令牌
  double freshPermits = requiredPermits - storedPermitsToSpend;
  //5:计算消耗存储令牌所须要的时间和新产生令牌所须要的时间。对于Bursty来说,消耗存储的令牌所须要时间为0,WarmingUp方式则是须要根据不一样的场景有不一样的结果
  long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
      + (long) (freshPermits * stableIntervalMicros);
  //6:下次可以获取令牌的时间,须要延迟当前已经等待的时间,也就是说,若是立马有请求过来会放行,可是这个等待时间将会影响后续的请求访问,也就是说,此次的请求若是当前的特别的多,下一次可以请求的可以容许的时间一定会有很长的延迟
  this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
  //7:扣除消耗的存储令牌
  this.storedPermits -= storedPermitsToSpend;
  //8:返回本次要获取令牌所须要的时间,它确定不会超过超时时间
  return microsToNextFreeTicket;
}
复制代码

Acquire

它会阻塞知道容许放行,返回值为阻塞的时长 源码以下

public double acquire(int permits) {
  long microsToWait = reserve(permits); //也就是调用reserveNextTicket
  ticker.sleepMicrosUninterruptibly(microsToWait); //阻塞住须要等待的时长
  return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
}
复制代码

TryAcquire 运行案例

程序设置10个线程,使得并发数为10,模拟线上的场景,任务内容以下

class MyTask implements Runnable{
    private CountDownLatch latch;
    private RateLimiter limiter;

    public MyTask(CountDownLatch latch, RateLimiter limiter) {
        this.latch = latch;
        this.limiter = limiter;
    }

    @Override    public void run() {
        try {
            //使得线程同时触发            
           latch.await();
            System.out.println("time "+System.currentTimeMillis()+"ms :"+limiter.tryAcquire());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
复制代码

Bursty-TryAcquire

这里设置限制每秒的流量为5,也就是说第一次请求事后,下次请求须要等200ms

RateLimiter r =RateLimiter.create(5);
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);

for (int i=0;i<10;i++)
{

    service.submit(new MyTask(latch, r));
    latch.countDown();
    System.out.println("countdown:" + latch.getCount());
}
System.out.println("countdown over");
service.shutdown();
复制代码

结果以下

countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
countdown:0
countdown over
time 1538487195698ms :true
time 1538487195699ms :false
time 1538487195699ms :false
time 1538487195699ms :false
time 1538487195699ms :false
time 1538487195699ms :false
time 1538487195699ms :false
time 1538487195698ms :false
time 1538487195698ms :false
time 1538487195699ms :false
复制代码

若是使得线程等待401ms,那么程序会存储的令牌为2个

注意刚开始存储的时候,不是慢的,这里的存储量是慢慢增加,而且可以立马拿到

RateLimiter r =RateLimiter.create(5);
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);

for (int i=0;i<10;i++)
{

    service.submit(new MyTask(latch, r));
    if (i==9){
        TimeUnit.MILLISECONDS.sleep(401);
        System.out.println("sleep 10 seconds over");
    }
    latch.countDown();
    System.out.println("countdown:" + latch.getCount());
}
System.out.println("countdown over");
service.shutdown();
复制代码

运行结果恰好容许3个运行

countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
sleep 10 seconds over
countdown:0
countdown over
time 1538487297981ms :true
time 1538487297981ms :false
time 1538487297981ms :false
time 1538487297981ms :false
time 1538487297981ms :true
time 1538487297981ms :true
time 1538487297981ms :false
time 1538487297981ms :false
time 1538487297981ms :false
time 1538487297981ms :false
复制代码

若是等待时间超过1秒,容许放行的流量也不会超过6个,存储的令牌+第一个令牌

RateLimiter r =RateLimiter.create(5);
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);

for (int i=0;i<10;i++)
{

    service.submit(new MyTask(latch, r));
    if (i==9){
        TimeUnit.MILLISECONDS.sleep(1001);
        System.out.println("sleep 10 seconds over");
    }
    latch.countDown();
    System.out.println("countdown:" + latch.getCount());
}
System.out.println("countdown over");
service.shutdown();
复制代码

结果为

countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
sleep 10 seconds over
countdown:0
countdown over
time 1538487514780ms :true
time 1538487514780ms :true
time 1538487514780ms :true
time 1538487514780ms :false
time 1538487514780ms :true
time 1538487514780ms :false
time 1538487514780ms :false
time 1538487514780ms :false
time 1538487514780ms :true
time 1538487514780ms :true
复制代码

WarmingUp-TryAcquire

使用warmingUp的方式因为默认已经存储满了令牌,那么,它在第一次请求执行完以后,必须等待必定的时间才会让下一次请求开始,而这个请求放行的时间则是会超过存储所须要的时间

注意这里的不一样,默认是存储满的,也就是刚开始的消费要慢不少

RateLimiter r =RateLimiter.create(5,1,TimeUnit.SECONDS);
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);

for (int i=0;i<10;i++)
{

    service.submit(new MyTask(latch, r));
    latch.countDown();
    System.out.println("countdown:" + latch.getCount());
}
System.out.println("countdown over");
service.shutdown();
复制代码

运行结果以下

countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
countdown:0
countdown over
time 1538487677462ms :true
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
time 1538487677462ms :false
复制代码

Acquire运行案例

所须要的task源码以下

class MyTask implements Runnable{
    private CountDownLatch latch;
    private RateLimiter limiter;
    private long start;

    public MyTask(CountDownLatch latch, RateLimiter limiter,long start) {
        this.latch = latch;
        this.limiter = limiter;
       this.start=start;
    }

    @Override    public void run() {
        try {
            //使得线程同时触发            
           latch.await();
            System.out.printf("result:"+limiter.acquire(2));
            System.out.println(" time "+(System.currentTimeMillis()-start)+"ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
复制代码

Busty-Acquire

Acquire会阻塞运行的结果,并且会提早消费

RateLimiter r =RateLimiter.create(1); 
r.acquire();
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms");
r.acquire();
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms");
r.acquire(3);
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms");
r.acquire();
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms");
复制代码

第一次会立马运行,而后由于请求了一次,下次发放令牌的时间日后迁移,获取的令牌越多,下次可以运行须要等待的时间越长

运行结果为

time cost:0ms
time cost:1005ms
time cost:2004ms
time cost:5001ms
复制代码

在多线程背景运行以下

RateLimiter r =RateLimiter.create(1);
long start=System.currentTimeMillis();
r.acquire(3);
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms");
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);

for (int i=0;i<10;i++)
{

    service.submit(new MyTask(latch, r,start));
    latch.countDown();
    System.out.println("countdown:" + latch.getCount());
}
System.out.println("countdown over");
service.shutdown();
复制代码

结果以下

time cost:1ms
countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
countdown:0
countdown over
result:2.995732 time 3024ms
result:4.995725 time 5006ms
result:6.995719 time 7007ms
result:8.995716 time 9006ms
result:10.995698 time 11004ms
result:12.995572 time 13006ms
result:14.995555 time 15007ms
result:16.995543 time 17005ms
result:18.995516 time 19005ms
result:20.995463 time 21005ms
复制代码

WarmingUp-acquire

warmingUp经过acquire的方式获取的令牌,一样会被按照同步的方式获取

RateLimiter r =RateLimiter.create(1,1,TimeUnit.SECONDS);
long start=System.currentTimeMillis();
r.acquire(3);
System.out.println("time cost:"+(System.currentTimeMillis()-start)+"ms”); ExecutorService service = Executors.newFixedThreadPool(10); CountDownLatch latch = new CountDownLatch(10); for (int i=0;i<10;i++) { service.submit(new MyTask(latch, r,start)); latch.countDown(); System.out.println("countdown:" + latch.getCount()); } System.out.println("countdown over"); service.shutdown(); 复制代码

结果以下

time cost:0ms
countdown:9
countdown:8
countdown:7
countdown:6
countdown:5
countdown:4
countdown:3
countdown:2
countdown:1
countdown:0
countdown over
result:3.496859 time 3521ms
result:5.496854 time 5506ms
result:7.49685 time 7505ms
result:9.496835 time 9504ms
result:11.496821 time 11505ms
result:13.496807 time 13502ms
result:15.496793 time 15504ms
result:17.496778 time 17506ms
result:19.496707 time 19506ms
result:21.496699 time 21506ms
复制代码

RateLimiter自己实现的就是一个令牌桶算法

相关文章
相关标签/搜索