限流是对某一时间窗口内的请求数进行限制,保持系统的可用性和稳定性,防止因流量暴增而致使的系统运行缓慢或宕机。经常使用的限流算法有令牌桶和和漏桶,而Google开源项目Guava中的RateLimiter使用的就是令牌桶控制算法。html
在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流java
咱们常常在调别人的接口的时候会发现有限制,好比微信公众平台接口、百度API Store、聚合API等等这样的,对方会限制天天最多调多少次或者每分钟最多调多少次git
咱们本身在开发系统的时候也须要考虑到这些,好比咱们公司在上传商品的时候就作了限流,由于用户每一次上传商品,咱们须要将商品数据同到到美团、饿了么、京东、百度、自营等第三方平台,这个工做量是巨大,频繁操做会拖慢系统,故作限流。github
以上都是题外话,接下来咱们重点看一下令牌桶算法web
下面是从网上找的两张图来描述令牌桶算法:算法
https://github.com/google/guavaspring
RateLimiter的代码不长,注释加代码432行,看一下RateLimiter怎么用segmentfault
1 package com.cjs.example; 2 3 import com.google.common.util.concurrent.RateLimiter; 4 import org.springframework.web.bind.annotation.RequestMapping; 5 import org.springframework.web.bind.annotation.RestController; 6 7 import java.text.SimpleDateFormat; 8 import java.util.Date; 9 10 @RestController 11 public class HelloController { 12 13 private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); 14 15 private static final RateLimiter rateLimiter = RateLimiter.create(2); 16 17 /** 18 * tryAcquire尝试获取permit,默认超时时间是0,意思是拿不到就当即返回false 19 */ 20 @RequestMapping("/sayHello") 21 public String sayHello() { 22 if (rateLimiter.tryAcquire()) { // 一次拿1个 23 System.out.println(sdf.format(new Date())); 24 try { 25 Thread.sleep(500); 26 } catch (InterruptedException e) { 27 e.printStackTrace(); 28 } 29 }else { 30 System.out.println("limit"); 31 } 32 return "hello"; 33 } 34 35 /** 36 * acquire拿不到就等待,拿到为止 37 */ 38 @RequestMapping("/sayHi") 39 public String sayHi() { 40 rateLimiter.acquire(5); // 一次拿5个 41 System.out.println(sdf.format(new Date())); 42 return "hi"; 43 } 44 45 }
final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second" void submitTasks(List<Runnable> tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // may wait executor.execute(task); } }
SmoothBursty以稳定的速度生成permit缓存
SmoothWarmingUp是渐进式的生成,最终达到最大值趋于稳定安全
public abstract class RateLimiter { /** * 用给定的吞吐量(“permits per second”)建立一个RateLimiter。 * 一般是QPS */ public static RateLimiter create(double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } /** * 用给定的吞吐量(QPS)和一个预热期建立一个RateLimiter */ public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod); return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer()); } static RateLimiter create( double permitsPerSecond, long warmupPeriod, TimeUnit unit, double coldFactor, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } private final SleepingStopwatch stopwatch; // 锁 private volatile Object mutexDoNotUseDirectly; private Object mutex() { Object mutex = mutexDoNotUseDirectly; if (mutex == null) { synchronized (this) { mutex = mutexDoNotUseDirectly; if (mutex == null) { mutexDoNotUseDirectly = mutex = new Object(); } } } return mutex; } /** * 从RateLimiter中获取一个permit,阻塞直到请求能够得到为止 * @return 休眠的时间,单位是秒,若是没有被限制则是0.0 */ public double acquire() { return acquire(1); } /** * 从RateLimiter中获取指定数量的permits,阻塞直到请求能够得到为止 */ public double acquire(int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); } /** * 预约给定数量的permits以备未来使用 * 直到这些预约数量的permits能够被消费则返回逝去的微秒数 */ final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } private static void checkPermits(int permits) { checkArgument(permits > 0, "Requested permits (%s) must be positive", permits); } final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); } }
abstract class SmoothRateLimiter extends RateLimiter { /** The currently stored permits. */ double storedPermits; /** The maximum number of stored permits. */ double maxPermits; /** * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits * per second has a stable interval of 200ms. */ double stableIntervalMicros; /** * The time when the next request (no matter its size) will be granted. After granting a request, * this is pushed further in the future. Large requests push this further than small requests. */ private long nextFreeTicketMicros = 0L; // could be either in the past or future final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 本次能够获取到的permit数量 double freshPermits = requiredPermits - storedPermitsToSpend; // 差值,若是存储的permit大于本次须要的permit数量则此处是0,不然是一个正数 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 计算须要等待的时间(微秒) this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); this.storedPermits -= storedPermitsToSpend; // 减去本次消费的permit数 return returnValue; } void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { // 表示当前能够得到permit double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); // 计算这段时间能够生成多少个permit storedPermits = min(maxPermits, storedPermits + newPermits); // 若是超过maxPermit,则取maxPermit,不然取存储的permit+新生成的permit nextFreeTicketMicros = nowMicros; // 设置下一次能够得到permit的时间点为当前时间 } } }
RateLimiter实现的令牌桶算法,不只能够应对正常流量的限速,并且能够处理突发暴增的请求,实现平滑限流。
经过代码,咱们能够看到它能够预消费,怎么讲呢
nextFreeTicketMicros表示下一次请求得到permits的最先时间。每次受权一个请求之后,这个值会向后推移(PS:想象一下时间轴)即向将来推移。所以,大的请求会比小的请求推得更。这里的大小指的是获取permit的数量。这个应该很好理解,由于上一次请求获取的permit数越多,那么下一次再获取受权时更待的时候会更长,反之,若是上一次获取的少,那么时间向后推移的就少,下一次得到许可的时间更短。可见,都是有代价的。正所谓:要浪漫就要付出代价。
还要注意到一点,就是获取令牌和处理请求是两个动做,并且,并非每一次都获取一个,也不要想固然的认为一个请求获取一个permit(或者叫令牌),能够再看看前面那幅图
一个以纳秒为单位度量流逝时间的对象。它是一个相对时间,而不是绝对时间。
Stopwatch stopwatch = Stopwatch.createStarted(); System.out.println("hahah"); stopwatch.stop(); Duration duration = stopwatch.elapsed(); System.out.println(stopwatch);
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.
一个信号量维护了一系列permits。
每次调用acquire()方法获取permit,若是必要的话会阻塞直到有一个permit可用为止。
调用release()方法则会释放本身持有的permit,即用完了再还回去。
信号量限制的是并发访问临界资源的线程数。
漏桶的出水速度是恒定的,那么意味着若是瞬时大流量的话,将有大部分请求被丢弃掉(也就是所谓的溢出)。
生成令牌的速度是恒定的,而请求去拿令牌是没有速度限制的。这意味,面对瞬时大流量,该算法能够在短期内请求拿到大量令牌,并且拿令牌的过程并非消耗很大的事情。
最后,不管是对于令牌桶拿不到令牌被拒绝,仍是漏桶的水满了溢出,都是为了保证大部分流量的正常使用,而牺牲掉了少部分流量,这是合理的,若是由于极少部分流量须要保证的话,那么就可能致使系统达到极限而挂掉,得不偿失。
https://en.wikipedia.org/wiki/Little%27s_law
the long-term average number L of customers in a stationary system is equal to the long-term average effective arrival rate λ multiplied by the average time W that a customer spends in the system. Expressed algebraically the law is:
在一个固定系统中,顾客的长期平均数量L等于顾客的长期平均到达速率λ乘以顾客在系统中平均花费的时间W。用公式表示为:
虽然这看起来很容易,但这是一个很是显著的举世瞩目的结果,由于这种关系“不受到达过程的分布,服务分布,服务顺序,或其余任何因素的影响”。这个结果适用于任何系统,特别是适用于系统内的系统。惟一的要求是系统必须是稳定的非抢占式的。
假设有一个应用程序没有简单的方法来度量响应时间。若是系统的平均数量和吞吐量是已知的,那么平均响应时间就是:
mean response time = mean number in system / mean throughput
平均响应时间 = 系统的平均数量 / 平均吞吐量.
想象一下,一家小商店只有一个柜台和一个可供浏览的区域,每次只能有一我的在柜台,而且没有人不买东西就离开。
因此这个系统大体是:进入 --> 浏览 --> 柜台结帐 --> 离开
在一个稳定的系统中,人们进入商店的速度就是他们到达商店的速度(咱们叫作到达速度),它们离开的速度叫作离开速度。
相比之下,到达速度超过离开速度表明是一个不稳定的系统,这就会形成等待的顾客数量将逐渐增长到无穷大。
前面的小定律告诉咱们,商店的平均顾客数量L等于有效的到达速度λ乘以顾客在商店的平均停留时间W。用公式表示为:
假设,顾客以每小时10个的速度到达,而且平均停留时间是0.5小时。那么这就意味着,任意时间商店的平均顾客数量是5
如今假设商店正在考虑作更多的广告,把到达率提升到每小时20。商店必须准备好容纳平均10人,或者必须将每一个顾客在商店中的时间减小到0.25小时。商店能够经过更快地结账或者增长更多的柜台来达到后者的目的。
咱们能够把前面的小定律应用到商店系统中。例如,考虑柜台和在柜台前排的队。假设平均有2我的在柜台前排队,咱们知道顾客到达速度是每小时10,因此顾客平均必须停留时间为0.2小时。
这是单机(单进程)的限流,是JVM级别的的限流,全部的令牌生成都是在内存中,在分布式环境下不能直接这么用。
若是咱们能把permit放到Redis中就能够在分布式环境中用了。
https://blog.csdn.net/jek123456/article/details/77152571
http://www.javashuo.com/article/p-ggrsfxum-dq.html
http://www.javashuo.com/article/p-hyhmlwnk-km.html
https://blog.csdn.net/charleslei/article/details/53152883
https://www.jianshu.com/p/8f548e469bbe