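When building high-concurrency systems, there are three main tools for protecting the system: caching, degradation, and rate limiting.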
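Caching
The purpose of caching is to speed up access and increase the system's processing capacity.
Degradation
When a service runs into problems or starts to affect the core flow, it is temporarily switched off and re-enabled once the traffic peak has passed or the problem has been fixed.
Rate limiting
Rate limiting protects the system by throttling concurrent accesses/requests, or by throttling the requests within a time window. Once the limit is reached, requests can be rejected, queued or made to wait, or degraded.
The leaky bucket algorithm is simple: water (requests) first flows into the bucket, the bucket leaks water at a fixed rate, and when water flows in too fast it simply overflows. The leaky bucket algorithm therefore enforces a hard cap on the transfer rate.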
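The leaky bucket described above can be sketched in a few lines. This is only an illustration, not code from Guava or from the original post: the class name `LeakyBucketSketch`, the method `tryAdd`, and the choice to pass time in explicitly (so the behaviour is deterministic) are all assumptions made for the example. It models the "meter" variant, where overflowing requests are dropped rather than queued.

```java
// Minimal leaky-bucket sketch (hypothetical names, illustrative only).
final class LeakyBucketSketch {
    private final double capacity;       // how much "water" the bucket can hold
    private final double leakPerSecond;  // constant outflow rate
    private double water = 0.0;          // current level
    private double lastSeconds = 0.0;    // time of the last update

    LeakyBucketSketch(double capacity, double leakPerSecond) {
        this.capacity = capacity;
        this.leakPerSecond = leakPerSecond;
    }

    // Returns true if the request fits into the bucket, false if it overflows.
    synchronized boolean tryAdd(double nowSeconds) {
        // Drain whatever has leaked out since the last call.
        double leaked = (nowSeconds - lastSeconds) * leakPerSecond;
        water = Math.max(0.0, water - leaked);
        lastSeconds = nowSeconds;
        if (water + 1.0 > capacity) {
            return false; // overflow: the request is dropped
        }
        water += 1.0;
        return true;
    }
}
```

With a capacity of 2 and a leak rate of 1 per second, two requests at t=0 are accepted and a third is dropped; one second later there is room for exactly one more.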
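Many scenarios need more than a cap on the average transfer rate: they also need to allow some degree of bursting. The leaky bucket is a poor fit there, and the token bucket algorithm is more suitable. As the figure shows, the system puts tokens into a bucket at a constant rate; a request must first take a token from the bucket before it is processed, and when no token is available the request is rejected.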
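For comparison, here is an equally small token-bucket sketch. The names (`TokenBucketSketch`, `tryTake`) are hypothetical, and two simplifications are assumed: the bucket starts full, and tokens are added lazily when a request arrives instead of by a background task.

```java
// Minimal token-bucket sketch (hypothetical names, illustrative only).
final class TokenBucketSketch {
    private final double capacity;         // maximum number of tokens held
    private final double tokensPerSecond;  // constant refill rate
    private double tokens;                 // tokens currently available
    private double lastSeconds = 0.0;      // time of the last refill

    TokenBucketSketch(double capacity, double tokensPerSecond) {
        this.capacity = capacity;
        this.tokensPerSecond = tokensPerSecond;
        this.tokens = capacity; // assumption: start full, so an initial burst is allowed
    }

    // Takes one token if available; returns false (request rejected) otherwise.
    synchronized boolean tryTake(double nowSeconds) {
        // Add the tokens produced since the last call, capped at capacity.
        double produced = (nowSeconds - lastSeconds) * tokensPerSecond;
        tokens = Math.min(capacity, tokens + produced);
        lastSeconds = nowSeconds;
        if (tokens < 1.0) {
            return false;
        }
        tokens -= 1.0;
        return true;
    }
}
```

Unlike the leaky bucket, a full token bucket lets `capacity` requests through at once, which is the burst tolerance the paragraph above refers to.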
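Guava, Google's open-source toolkit, provides a rate limiting utility class, RateLimiter. It implements rate limiting on top of the token bucket algorithm and is both convenient and efficient.
First, a quick look at how RateLimiter is used: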
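
    import com.google.common.util.concurrent.RateLimiter;

    public void testAcquire() {
        // One permit per second.
        RateLimiter limiter = RateLimiter.create(1);

        for (int i = 1; i < 10; i = i + 2) {
            // Blocks until i permits are granted; returns the time spent waiting, in seconds.
            double waitTime = limiter.acquire(i);
            System.out.println("cutTime=" + System.currentTimeMillis() + " acq:" + i + " waitTime:" + waitTime);
        }
    }
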
Output:

    cutTime=1535439657427 acq:1 waitTime:0.0
    cutTime=1535439658431 acq:3 waitTime:0.997045
    cutTime=1535439661429 acq:5 waitTime:2.993028
    cutTime=1535439666426 acq:7 waitTime:4.995625
    cutTime=1535439673426 acq:9 waitTime:6.999223

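RateLimiter.create(1) creates a rate limiter; the argument is the number of permits generated per second. limiter.acquire(i) acquires permits in a blocking fashion. Permits can also be acquired with a wait timeout through tryAcquire(int permits, long timeout, TimeUnit unit); a timeout of 0 means non-blocking, returning immediately if the permits cannot be obtained.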
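The output shows that RateLimiter supports pre-consumption. For example, acquire(5) waits 3 seconds because the previous call pre-consumed 3 permits, so it has to wait 3*1 seconds; it then pre-consumes 5 permits itself, and so on.
RateLimiter supports a certain amount of bursting (pre-consumption) by making later requests wait longer. Keep this in mind when using it; the implementation is analyzed later on.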
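The wait times in the output can be reproduced with a little arithmetic. The sketch below is not Guava code; `PreConsumeTimeline` and `waits` are hypothetical names, the clock is simulated, and it assumes the calls are issued back to back on an initially empty bucket (so no permits are ever saved up).

```java
// Reproduces the pre-consumption timeline with a simulated clock (illustrative only).
final class PreConsumeTimeline {
    // Returns how long (in seconds) each acquire(requests[i]) call has to wait.
    static double[] waits(double permitsPerSecond, int[] requests) {
        double now = 0.0;       // simulated clock, in seconds
        double nextFree = 0.0;  // earliest time the next request can be granted
        double[] result = new double[requests.length];
        for (int i = 0; i < requests.length; i++) {
            // A request only waits for the debt left by earlier requests.
            double wait = Math.max(nextFree - now, 0.0);
            result[i] = wait;
            now += wait; // the caller sleeps, so now >= nextFree afterwards
            // The cost of this request is pushed onto whoever comes next.
            nextFree = now + requests[i] / permitsPerSecond;
        }
        return result;
    }
}
```

Calling `PreConsumeTimeline.waits(1.0, new int[] {1, 3, 5, 7, 9})` yields waits of 0, 1, 3, 5 and 7 seconds, matching the measured values above up to timing noise.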
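Guava offers two rate limiting modes: a stable mode (SmoothBursty, where permits are generated at a constant rate) and a warm-up mode (SmoothWarmingUp, where the generation rate ramps up slowly until it settles at a stable value). The two are implemented along similar lines and differ mainly in how the wait time is computed. This article focuses on SmoothBursty.
Instances are created through RateLimiter's create method, which actually builds a SmoothBursty (stable mode) instance.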

    public static RateLimiter create(double permitsPerSecond) {
        return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
    }

    static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
        RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
        rateLimiter.setRate(permitsPerSecond);
        return rateLimiter;
    }

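The two constructor parameters of SmoothBursty are:
stopwatch: the time source; it is also used to sleep while waiting for permits (see readMicros and sleepMicrosUninterruptibly in the code below).
maxBurstSeconds: how many seconds' worth of permits can be saved up while the limiter is unused; create passes 1.0.
Before digging into how SmoothBursty works, here is what its key fields mean: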

    /**
     * The work (permits) of how many seconds can be saved up if this RateLimiter is unused?
     * I.e. the maximum number of seconds' worth of permits stored while idle.
     */
    final double maxBurstSeconds;

    /**
     * The currently stored permits.
     */
    double storedPermits;

    /**
     * The maximum number of stored permits.
     * maxPermits = maxBurstSeconds * permitsPerSecond (see doSetRate below).
     */
    double maxPermits;

    /**
     * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
     * per second has a stable interval of 200ms.
     * stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond (one second / permits per second).
     */
    double stableIntervalMicros;

    /**
     * The time when the next request (no matter its size) will be granted. After granting a request,
     * this is pushed further in the future. Large requests push this further than small requests.
     * Because RateLimiter allows pre-consumption, a request that follows a pre-consuming one
     * has to wait until nextFreeTicketMicros before it is granted.
     */
    private long nextFreeTicketMicros = 0L; // could be either in the past or future

Next, a few key functions.

    public final void setRate(double permitsPerSecond) {
        checkArgument(
            permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
        synchronized (mutex()) {
            doSetRate(permitsPerSecond, stopwatch.readMicros());
        }
    }

This method sets how many permits the bucket generates per second. Internally it delegates to SmoothRateLimiter's doSetRate:

    @Override
    final void doSetRate(double permitsPerSecond, long nowMicros) {
        resync(nowMicros);
        double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
        this.stableIntervalMicros = stableIntervalMicros;
        doSetRate(permitsPerSecond, stableIntervalMicros);
    }

It first calls resync to generate permits and update the time of the next grant, then updates stableIntervalMicros, and finally calls SmoothBursty's doSetRate. resync looks like this:

    /**
     * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
     * In other words: refreshes the time of the next grant and the number of stored permits
     * (this is effectively where permits are "generated").
     */
    void resync(long nowMicros) {
        // if nextFreeTicket is in the past, resync to now
        if (nowMicros > nextFreeTicketMicros) {
            double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
            storedPermits = min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMicros = nowMicros;
        }
    }

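In the token bucket algorithm, tokens are continuously generated and stored in the bucket, and a request must obtain tokens before it can proceed. So who keeps generating the tokens?
One approach is a scheduled task that keeps producing tokens. The problem is the resource cost: if an interface has to limit the access rate per user and the system has 60,000 users, up to 60,000 scheduled tasks are needed to maintain the token count of each bucket, which is a huge overhead.
The other approach is lazy computation, as in the resync function above. It is called before every attempt to acquire permits: if the current time is later than nextFreeTicketMicros, it computes how many permits could have been generated in that interval, adds them to the bucket, and updates the state. This way the computation happens only once per acquisition.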
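The lazy refill can be tried out in isolation. The sketch below mirrors the shape of resync but is not the Guava class itself: `LazyRefillSketch` is a hypothetical name, the clock value is passed in by the caller, and it assumes the cool-down interval equals stableIntervalMicros, as in stable mode.

```java
// Lazy refill in isolation (hypothetical class, illustrative only).
final class LazyRefillSketch {
    double storedPermits = 0.0;
    final double maxPermits;
    final double stableIntervalMicros;
    long nextFreeTicketMicros = 0L;

    LazyRefillSketch(double permitsPerSecond, double maxBurstSeconds) {
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        this.maxPermits = maxBurstSeconds * permitsPerSecond;
    }

    // No timer involved: permits are credited only when someone asks.
    void resync(long nowMicros) {
        if (nowMicros > nextFreeTicketMicros) {
            double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits); // never above the cap
            nextFreeTicketMicros = nowMicros;
        }
        // If nextFreeTicketMicros is in the future, earlier requests are still
        // being paid off and nothing is credited.
    }
}
```

At 5 permits per second, calling `resync(400_000L)` on a fresh instance credits 2 permits; a much later call is capped at maxPermits, which is 5 when maxBurstSeconds is 1.0.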

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
        double oldMaxPermits = this.maxPermits;
        maxPermits = maxBurstSeconds * permitsPerSecond;
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            // if we don't special-case this, we would get storedPermits == NaN, below
            // (Double.POSITIVE_INFINITY represents infinity)
            storedPermits = maxPermits;
        } else {
            storedPermits =
                (oldMaxPermits == 0.0)
                    ? 0.0 // initial state
                    : storedPermits * maxPermits / oldMaxPermits;
        }
    }

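The maximum number of permits the bucket can hold is derived from maxBurstSeconds: at most maxBurstSeconds seconds' worth of generated permits are stored.
This parameter allows traffic to be controlled more flexibly, for example 300 requests per 20 seconds for some interfaces and 50 requests per 45 seconds for others, so the limit is not tied to plain QPS. Note, however, that the create method shown earlier always passes 1.0 for maxBurstSeconds.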
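One detail of doSetRate is easy to miss: when the rate changes, the permits already stored are rescaled so that the bucket stays equally "full". A small stand-alone sketch of that rule follows; `RescaleSketch` and `rescale` are hypothetical names, not part of Guava.

```java
// Stored-permit rescaling on a rate change (hypothetical helper, illustrative only).
final class RescaleSketch {
    static double rescale(double storedPermits, double oldMaxPermits, double newMaxPermits) {
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            return newMaxPermits; // avoids infinity / infinity = NaN
        }
        if (oldMaxPermits == 0.0) {
            return 0.0; // initial state: nothing stored yet
        }
        // Keep the fill ratio: stored / oldMax == result / newMax.
        return storedPermits * newMaxPermits / oldMaxPermits;
    }
}
```

For instance, a bucket holding 2 of at most 4 permits holds 5 of at most 10 after the change.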
With these concepts in place, the methods RateLimiter exposes are easy to understand.

    @CanIgnoreReturnValue
    public double acquire() {
        return acquire(1);
    }

    /**
     * Acquires permits and returns the time spent blocked, in seconds.
     */
    @CanIgnoreReturnValue
    public double acquire(int permits) {
        long microsToWait = reserve(permits);
        stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return 1.0 * microsToWait / SECONDS.toMicros(1L);
    }

    final long reserve(int permits) {
        checkPermits(permits);
        synchronized (mutex()) {
            return reserveAndGetWaitLength(permits, stopwatch.readMicros());
        }
    }

acquire obtains permits permits: it computes how long to wait, sleeps for that long, and returns the value. The wait time comes from reserve, which in turn gets it from reserveAndGetWaitLength:

    /**
     * Reserves next ticket and returns the wait time that the caller must wait for.
     *
     * @return the required wait time, never negative
     */
    final long reserveAndGetWaitLength(int permits, long nowMicros) {
        long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
        return max(momentAvailable - nowMicros, 0);
    }

That finally calls reserveEarliestAvailable:

    @Override
    final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
        resync(nowMicros);
        long returnValue = nextFreeTicketMicros;
        double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
        double freshPermits = requiredPermits - storedPermitsToSpend;
        long waitMicros =
            storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
                + (long) (freshPermits * stableIntervalMicros);

        this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
        this.storedPermits -= storedPermitsToSpend;
        return returnValue;
    }

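The method first calls resync to generate permits and bring nextFreeTicketMicros up to date. freshPermits is the number of permits still missing after taking what is available from the bucket. The wait time is the sum of storedPermitsToWaitTime (the cost of spending stored permits, which is zero in stable mode) and the time needed to generate the fresh permits, so in stable mode it is simply (long) (freshPermits * stableIntervalMicros). The method then updates nextFreeTicketMicros and storedPermits and returns the previous nextFreeTicketMicros, which is the moment this request is granted; reserveAndGetWaitLength converts that into the interval to wait.

`reserveEarliestAvailable` shows how pre-consumption works and how the wait time is determined, which explains the example output. When there are not enough permits, the current request does not wait for all of them to be generated. Instead, nextFreeTicketMicros is pushed further into the future, so the cost shows up as the wait time of the next request.

After `reserve` returns the wait time, `acquire` sleeps by calling `stopwatch.sleepMicrosUninterruptibly(microsToWait);`. Unlike Thread.sleep(), this sleep is uninterruptible. It is implemented as: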

    public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
        // Blocks the thread; internally it sleeps via Thread.sleep().
        boolean interrupted = false;
        try {
            long remainingNanos = unit.toNanos(sleepFor);
            long end = System.nanoTime() + remainingNanos;
            while (true) {
                try {
                    // TimeUnit.sleep() treats negative timeouts just like zero.
                    NANOSECONDS.sleep(remainingNanos);
                    return;
                } catch (InterruptedException e) {
                    interrupted = true;
                    // If interrupted, recompute the remaining time and keep sleeping.
                    remainingNanos = end - System.nanoTime();
                }
            }
        } finally {
            if (interrupted) {
                // If an interrupt arrived during the sleep, restore the flag once the sleep is over.
                Thread.currentThread().interrupt();
            }
        }
    }

After the sleep, `acquire` returns the time slept; the blocking ends and the permits have been acquired.
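Putting the pieces together, the reservation logic for stable mode can be sketched without any sleeping. This is a simplified stand-in, not the Guava source: `ReserveSketch` is a hypothetical name, the current time is supplied by the caller, maxBurstSeconds is assumed to be 1.0, the cost of stored permits is taken as zero (stable mode), and plain addition replaces `LongMath.saturatedAdd`.

```java
// Stable-mode reservation logic with an explicit clock (hypothetical class, illustrative only).
final class ReserveSketch {
    private final double stableIntervalMicros;
    private final double maxPermits;
    private double storedPermits = 0.0;
    private long nextFreeTicketMicros = 0L;

    ReserveSketch(double permitsPerSecond) {
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        this.maxPermits = 1.0 * permitsPerSecond; // assumption: maxBurstSeconds = 1.0
    }

    // Returns how many microseconds the caller would have to sleep.
    long reserve(int permits, long nowMicros) {
        // Lazy refill, as in resync.
        if (nowMicros > nextFreeTicketMicros) {
            double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMicros = nowMicros;
        }
        long momentAvailable = nextFreeTicketMicros; // this request is granted here
        double spend = Math.min(permits, storedPermits);
        double fresh = permits - spend;
        // Stored permits are free; fresh permits are charged to the next caller.
        nextFreeTicketMicros += (long) (fresh * stableIntervalMicros);
        storedPermits -= spend;
        return Math.max(momentAvailable - nowMicros, 0L);
    }
}
```

At 2 permits per second, two back-to-back `reserve(1, 0L)` calls return 0 and 500000. After an idle stretch, `reserve(3, 3_000_000L)` returns 0 because two stored permits are spent and the third is pre-consumed, and the request that follows pays 500000 microseconds for it.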

    public boolean tryAcquire(int permits) {
        return tryAcquire(permits, 0, MICROSECONDS);
    }

    public boolean tryAcquire() {
        return tryAcquire(1, 0, MICROSECONDS);
    }

    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
        long timeoutMicros = max(unit.toMicros(timeout), 0);
        checkPermits(permits);
        long microsToWait;
        synchronized (mutex()) {
            long nowMicros = stopwatch.readMicros();
            if (!canAcquire(nowMicros, timeoutMicros)) {
                return false;
            } else {
                microsToWait = reserveAndGetWaitLength(permits, nowMicros);
            }
        }
        stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return true;
    }

    private boolean canAcquire(long nowMicros, long timeoutMicros) {
        return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
    }

    @Override
    final long queryEarliestAvailable(long nowMicros) {
        return nextFreeTicketMicros;
    }

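tryAcquire tries to acquire permits within the timeout: if that is possible it sleeps for the required time and returns true, otherwise it returns false immediately.
canAcquire decides whether the permits can be obtained within the timeout by checking that the current time plus the timeout is not earlier than nextFreeTicketMicros. The check only looks at when the next request will be granted, not at how many permits are requested. If it passes, the flow is the same as in acquire and the thread sleeps. If it fails, the call returns right away, so the caller does not have to wait out the timeout just to learn that the permits are unavailable.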
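The timeout check is a single comparison, which a stand-alone version makes easy to experiment with. `CanAcquireSketch` is a hypothetical wrapper; the state that canAcquire reads from the limiter is passed in as parameters here.

```java
// The timeout check as a pure function (hypothetical wrapper, illustrative only).
final class CanAcquireSketch {
    static boolean canAcquire(long nextFreeTicketMicros, long nowMicros, long timeoutMicros) {
        // Equivalent to: nowMicros + timeoutMicros >= nextFreeTicketMicros
        return nextFreeTicketMicros - timeoutMicros <= nowMicros;
    }
}
```

If the next grant is 3 seconds away, a timeout of 0 or of 2.999999 seconds fails fast, while a timeout of 3 seconds succeeds and the caller then sleeps for those 3 seconds.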
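Because SmoothBursty allows a certain amount of bursting, some people worry that a sudden surge of traffic could overwhelm the system. That calls for a limiter that smooths the rate, so that after a cold start the system gradually approaches the fixed average rate (starting slower and then converging on the configured rate). Guava provides SmoothWarmingUp for this purpose. It can be thought of as a leaky bucket algorithm, although it behaves differently in some special scenarios.
SmoothWarmingUp is created as follows: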

    public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
        checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
        return create(
            permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
    }

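permitsPerSecond is the number of permits added per second, and warmupPeriod is the time it takes to move from the cold-start rate to the average rate. The principle is broadly similar, so it is not analyzed here.
That covers the essentials of how the stable mode of Guava's RateLimiter is implemented. If you spot any mistakes in this article, corrections are welcome!
The analysis above is mainly based on https://segmentfault.com/a/11..., with some additional notes of my own.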