任何系统的性能都有一个上限,当并发量超过这个上限以后,可能会对系统形成毁灭性地打击。所以在任什么时候刻咱们都必须保护系统的并发请求数量不能超过某个阈值,限流就是为了完成这一目的。node
Guava RateLimiter 是一个谷歌提供的限流工具,RateLimiter 基于令牌桶算法实现,说明文档见:http://ifeve.com/guava-rateli...
使用示例以下:nginx
@Test public void rateLimiterTest() { //建立一个RateLimiter,指定每秒放0.5个令牌(2秒放1个令牌) val rateLimiter = RateLimiter.create(0.5); int[] a = {1,6,2}; for(int i = 0; i < a.length; ++i) { //acquire(x) 从RateLimiter获取x个令牌,该方法会被阻塞直到获取到请求 System.out.println(System.currentTimeMillis() + " acq " + a[i] + ": wait " + rateLimiter.acquire(a[i]) + "s"); } }
输出结果以下:git
1552389443244 acq 1: wait 0.0s 1552389443245 acq 6: wait 1.998468s 1552389445249 acq 2: wait 11.99443s
从输出结果能够看出,RateLimiter 具备预消费的能力:github
即:RateLimiter 经过限制后面请求的等待时间,来支持必定程度的突发请求 (预消费)算法
RateLimiter 有两种限流模式,一种为稳定模式 (SmoothBursty: 令牌生成速度恒定),一种为渐进模式 (SmoothWarmingUp: 令牌生成速度缓慢提高直到维持在一个稳定值)。缓存
RateLimiter 核心思想主要有:
响应本次请求以后,动态计算下一次能够服务的时间,若是下一次请求在这个时间以前则须要进行等待。SmoothRateLimiter 类中的 nextFreeTicketMicros 属性表示下一次能够响应的时间。例如,若是咱们设置 QPS 为 1,本次请求处理完以后,那么下一次最先的可以响应请求的时间一秒钟以后。
RateLimiter 的子类 SmoothBursty 支持处理突发流量请求,例如,咱们设置 QPS 为 1,在十秒钟以内没有请求,那么令牌桶中会有 10 个(假设设置的最大令牌数大于 10)空闲令牌,若是下一次请求是 acquire(20) ,则不须要等待 20 秒钟,由于令牌桶中已经有 10 个空闲的令牌。SmoothRateLimiter 类中的 storedPermits 就是用来表示当前令牌桶中的空闲令牌数。
SmoothWarmingUp 提出一种 “热身模型” 和 “冷却期” 的概念后面会详细介绍并发
SmoothRateLimiter 主要属性
SmoothRateLimiter 是抽象类,其定义了一些关键的参数,咱们先来看一下这些参数:dom
接下来看一下 SmoothBursty 中几个重要的方法ide
create(double permitsPerSecond) 根据指定的 QPS 数值建立 RateLimiter,底层调用方法以下:
SmoothBursty 的 maxBurstSeconds 构造函数参数主要用于计算 maxPermits :maxPermits = maxBurstSeconds * permitsPerSecond;。函数
@VisibleForTesting static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) { //1.建立SmoothBursty限流器 RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); //2.设置限流速率 rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
再看 setRate 的方法,RateLimiter 中 setRate 方法最终后调用 doSetRate 方法,doSetRate 是一个抽象方法,SmoothRateLimiter 抽象类中覆盖了 RateLimiter 的 doSetRate 方法:
//// SmoothRateLimiter类中的doSetRate方法,覆盖了 RateLimiter 类中的 doSetRate 方法,此方法再委托下面的 doSetRate 方法作处理。 @Override final void doSetRate(double permitsPerSecond, long nowMicros) { resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); }
实现以下
@Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; //设置最大令牌数 if (oldMaxPermits == Double.POSITIVE_INFINITY) { storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 : storedPermits * maxPermits / oldMaxPermits; } }
acquire(int permits) 从 RateLimiter 获取 x 个令牌,该方法会被阻塞直到获取到请求;主要作了三件事
public double acquire(int permits) { long microsToWait = reserve(permits);//1.获取当前请求须要等待的时间(惰性计算 ) stopwatch.sleepMicrosUninterruptibly(microsToWait); //2.sleep microsToWait 时间窗口 return 1.0 * microsToWait / SECONDS.toMicros(1L);//3.返回microsToWait对应的秒级时间 } final long reserve(int permits) { checkPermits(permits); //检查参数是否>0 synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); //计算须要等待的时间 } }
该方法返回须要等待的时间,是 RateLimiter 的核心接口
RateLimiter 支持突发流量的本质就是,将当前须要的令牌数量 requiredPermits 拆分红 storedPermitsToSpend(持有令牌中可用的数量)和 freshPermits(须要预支的令牌数量);分别计算须要等待的时间,而后更新 nextFreeTicketMicros 下次获取令牌的时间
什么意思呢?举个例子:
当前 RateLimiter 持有 4 个令牌,当前请求须要 6 个令牌;则 6 个令牌中 4 个是能够从持有的令牌中直接获取,而另外两个须要预支的令牌则须要单独计算时间;
伪代码:getReqWaitTime(6) = getWaitTime(4) + getFreshWait(6 - 4)
而在 SmoothBursty 模式中, getWaitTime(4) 是能够直接获取的,即 time=0;getFreshWait(6 - 4) 则等于 freshPermits stableIntervalMicros (预支令牌数 生成一个令牌须要的时间)
@Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); //1.根据当前时间和预计下一秒时间判断有无新令牌产生,有则更新持有令牌数storedPermits 和 下次请求时间nextFreeTicketMicros long returnValue = nextFreeTicketMicros; //2.如下两句,根据请求须要的令牌数requiredPermits和storedPermits当前持有的令牌数storedPermits分别计算 持有令牌中可用的数量storedPermitsToSpend和须要预支的令牌数量freshPermits double storedPermitsToSpend = min(requiredPermits, this.storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)//3.分别计算storedPermitsToSpend和freshPermits的等待时间 + (long) (freshPermits * stableIntervalMicros); try { this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros); //4.更新nextFreeTicketMicros } catch (ArithmeticException e) { this.nextFreeTicketMicros = Long.MAX_VALUE; } this.storedPermits -= storedPermitsToSpend; //4.更新storedPermits return returnValue; }
WarmingUp 是 RateLimiter 的另外一种实例不一样于 SmoothBursty ,它存在一个 “热身” 的概念。即:若是当前系统处于 “ 冷却期”( 即一段时间没有获取令牌,即:当前持有的令牌数量大于某个阈值),则下一次获取令牌须要等待的时间比 SmoothBursty 模式下的线性时间要大,而且逐步降低到一个稳定的数值。
大体原理:将 storedPermits 分红两个区间值:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。当请求进来时,若是当前系统处于 "cold" 的冷却期状态,从 [thresholdPermits, maxPermits] 区间去拿令牌,所须要等待的时间会长于从区间 [0, thresholdPermits) 拿相同令牌所须要等待的时间。当请求增多,storedPermits 减小到 thresholdPermits 如下时,此时拿令牌所须要等待的时间趋于稳定。这也就是所谓 “热身” 的过程。
反应到代码上,和 SmoothBursty 的不一样有两点
其余部分原理相似。
WarmingUp 模式的限流器使用示例以下:
@Test public void rateLimiterTest2() throws InterruptedException { val rateLimiter = RateLimiter.create(5, 4000, TimeUnit.MILLISECONDS);//预热模式,设置预热时间和QPS,即在正式acquire前,限流器已经持有5*4=20个令牌 for(int i = 1; i < 50; i++) { System.out.println(System.currentTimeMillis() + " acq " + i + ": wait " + rateLimiter.acquire() + "s"); if(i == 15) { Thread.sleep(2000); System.out.println(System.currentTimeMillis() + " acq " + 15 + ": wait " + rateLimiter.acquire() + "s"); } } }
输出结果以下:
1552395652026 acq 1: wait 0.0s 1552395652028 acq 2: wait 0.578357s 1552395652612 acq 3: wait 0.533835s 1552395653151 acq 4: wait 0.495191s 1552395653649 acq 5: wait 0.457239s 1552395654110 acq 6: wait 0.41631s 1552395654528 acq 7: wait 0.377524s 1552395654912 acq 8: wait 0.334018s 1552395655248 acq 9: wait 0.298249s 1552395655550 acq 10: wait 0.256165s 1552395655808 acq 11: wait 0.217752s 1552395656028 acq 12: wait 0.197672s 1552395656231 acq 13: wait 0.19451s 1552395656429 acq 14: wait 0.196465s 1552395656630 acq 15: wait 0.195714s 1552395658834 acq 15: wait 0.0s 1552395658834 acq 16: wait 0.34158s 1552395659180 acq 17: wait 0.296628s 1552395659482 acq 18: wait 0.256914s 1552395659744 acq 19: wait 0.216517s 1552395659965 acq 20: wait 0.195077s 1552395660164 acq 21: wait 0.195953s 1552395660365 acq 22: wait 0.195196s 1552395660564 acq 23: wait 0.196015s 1552395660764 acq 24: wait 0.195972s
从输出结果能够看出,RateLimiter 具备预消费的能力:
SmoothWarmingUp 是 SmoothRateLimiter 的子类,它相对于 SmoothRateLimiter 多了几个属性:
SmoothRateLimiter 类的注释文档中有对预热模型的详细解释
横坐标:是当前令牌桶中的令牌 storedPermits,前面说过 SmoothWarmingUp 将 storedPermits 分为两个区间:[0, thresholdPermits) 和 [thresholdPermits, maxPermits]。
纵坐标:请求的间隔时间,stableInterval 就是 1 / QPS,例如设置的 QPS 为 5,则 stableInterval 就是 200ms,coldInterval = stableInterval * coldFactor,这里的 coldFactor 硬编码写死的是 3。
当系统请求增多,图像会像左移动,直到 storedPermits 为 0。等待一段时间后,随着令牌的生成当系统进入 cold 阶段时,图像会向右移,直到 storedPermits 等于 maxPermits。
create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
根据指定的 QPS 和预热期来建立 RateLimiter,在这段预热时间内,RateLimiter 每秒分配的许可数会平稳地增加直到预热期结束时达到其最大速率。
@VisibleForTesting static RateLimiter create( SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit, double coldFactor) { RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);//1.建立SmoothWarmingUp限流器 rateLimiter.setRate(permitsPerSecond);//2.设置限流速率 return rateLimiter; } public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod); return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond, warmupPeriod, unit, 3.0); } SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) { super(stopwatch); this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); //1.设置预热时间 this.coldFactor = coldFactor;//3.设置coldFactor为3 } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = maxPermits; double coldIntervalMicros = stableIntervalMicros * coldFactor; //1.设置冷却期等待时间数值coldIntervalMicros thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;//2.设置冷却期的阈值,thresholdPermits等于预热期产生令牌数的一半 maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);//3.设置持有令牌的最大值,为thresholdPermits的2倍 slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);//4.设置预热区的斜率;纵坐标之差/横坐标之差 if (oldMaxPermits == Double.POSITIVE_INFINITY) { storedPermits = 0.0; } else { storedPermits = (oldMaxPermits == 0.0) ? maxPermits : storedPermits * maxPermits / oldMaxPermits; } }
前面说到,SmoothWarmingUp 和 SmoothBursty 的一个重要区别就在于 “获取当前令牌中可用令牌的等待时间”storedPermitsToWaitTime 方法, 而 “获取预支令牌的等待时间” 和以前一致。
@Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { double availablePermitsAboveThreshold = storedPermits - thresholdPermits;//1.获取当前持有令牌数和阈值的差值availablePermitsAboveThreshold long micros = 0; if (availablePermitsAboveThreshold > 0.0) {//2.若是availablePermitsAboveThreshold>0,即当前持有令牌数>阈值,即到达冷区期;计算等待时间 double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);//3.计算WARM UP PERIOD部分计算的方法,这部分是一个梯形,梯形的面积计算公式是 “(上底 + 下底) * 高 / 2” micros = (long) (permitsAboveThresholdToTake * (permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)) / 2.0); permitsToTake -= permitsAboveThresholdToTake;//4.剩余的令牌从 stable部分拿 } micros += (stableIntervalMicros * permitsToTake);//5.stable 部分令牌获取花费的时间 return micros; }
如何理解这个方法?
举个例子:
建立限流器时 create(5, 4000, TimeUnit.MILLISECONDS);预热了 20 个令牌
场景 1:
当前持有 20 个令牌,请求一个令牌;须要等待的时间为:
场景 2:
当前持有 18 个令牌,请求 1 个令牌;须要等待的时间为:
场景 3:
当前持有 20 个令牌,一次性请求 11 个令牌;须要等待的时间为:
场景 4:
当前持有 10 个令牌,一次性请求 1 个令牌;须要等待的时间为:
小结:
总结一下 SmoothWarmingUp 和 SmoothBursty 的建立和使用令牌的过程:
SmoothBursty
SmoothWarmingUp
最后
SmoothWarmingUp 和 SmoothBursty 的最大区别就在于,“获取已持有令牌中可用令牌的等待时间” 不一样,SmoothBursty 是直接返回的,SmoothWarmingUp 则是基于 “热身模型” 和 “冷却期”(即一段时间没有获取令牌,衡量指标:当前持有的令牌数量大于某个阈值)的机制进行动态调整(冷却期按照梯形区域返回,不然按照矩形区域返回)
预支令牌的等待时间算法一致,waitTime = 预支令牌数量 * 生成一个令牌须要的时间(1/QPS)
SmoothWarmingUp 为系统提供一种冷启动的可能,例如:某系统底层使用缓存中间件,假如没有 “热身”,突发流量极可能形成缓存击穿等问题;WarmingUp 让系统应对突发流量有一个 “渐进准备资源” 的过程
Rhino 使用的令牌桶的平滑限流,即 WarmingUp 模式:Rhino C++ SDK 说明文档
nginx 有两个限流模块,从 github 上 clone 代码,位置在 nginx/src/http/modules 目录下:
二者都是按照 IP 或者域名限制的
本次调研仅聚焦其限流原理,相关配置参考: limit_req 官方说明 limit_conn 官方说明
ngx_http_limit_req_module 限流核心思想:
通俗来说:就是建立一个令牌的时间,只能接收并处理一个请求,其余的排队或者直接丢弃
用户可能同时配置若干限流,所以对于 HTTP 请求,nginx 须要遍历全部限流策略,判断是否须要限流;
ngx_http_limit_req_lookup 方法实现了漏桶算法,方法返回 3 种结果:
//limit,限流策略;hash,记录key的hash值;data,记录key的数据内容;len,记录key的数据长度;ep,待处理请求数目;account,是不是最后一条限流策略 static ngx_int_t ngx_http_limit_req_lookup(ngx_http_limit_req_limit_t *limit, ngx_uint_t hash, u_char *data, size_t len, ngx_uint_t *ep, ngx_uint_t account) { //红黑树查找指定界定,sentinel表明红黑树的NULL节点 while (node != sentinel) { if (hash < node->key) { node = node->left; continue; } if (hash > node->key) { node = node->right; continue; } //hash值相等,比较数据是否相等 lr = (ngx_http_limit_req_node_t *) &node->color; rc = ngx_memn2cmp(data, lr->data, len, (size_t) lr->len); //查找到 if (rc == 0) { ngx_queue_remove(&lr->queue); ngx_queue_insert_head(&ctx->sh->queue, &lr->queue); //将记录移动到LRU队列头部 ms = (ngx_msec_int_t) (now - lr->last); //当前时间减去上次访问时间 if (ms < -60000) { ms = 1; } else if (ms < 0) { ms = 0; } //漏桶算法 excess = lr->excess - ctx->rate * ms / 1000 + 1000; //待处理请求书-限流速率*时间段+1个请求(速率,请求数等都乘以1000了) if (excess < 0) { excess = 0; } *ep = excess; //待处理数目超过burst(等待队列大小),返回NGX_BUSY拒绝请求(没有配置burst时,值为0) if ((ngx_uint_t) excess > limit->burst) { return NGX_BUSY; } if (account) { //若是是最后一条限流策略,则更新上次访问时间,待处理请求数目,返回NGX_OK lr->excess = excess; lr->last = now; return NGX_OK; } //访问次数递增 lr->count++; ctx->node = lr; return NGX_AGAIN; //非最后一条限流策略,返回NGX_AGAIN,继续校验下一条限流策略 } node = (rc < 0) ? node->left : node->right; } //假如没有查找到节点,须要新建一条记录 *ep = 0; size = offsetof(ngx_rbtree_node_t, color) + offsetof(ngx_http_limit_req_node_t, data) + len; //尝试淘汰记录(LRU) ngx_http_limit_req_expire(ctx, 1); node = ngx_slab_alloc_locked(ctx->shpool, size);//分配空间 if (node == NULL) { //空间不足,分配失败 ngx_http_limit_req_expire(ctx, 0); //强制淘汰记录 node = ngx_slab_alloc_locked(ctx->shpool, size); //分配空间 if (node == NULL) { //分配失败,返回NGX_ERROR return NGX_ERROR; } } node->key = hash; //赋值 lr = (ngx_http_limit_req_node_t *) &node->color; lr->len = (u_char) len; lr->excess = 0; ngx_memcpy(lr->data, data, len); ngx_rbtree_insert(&ctx->sh->rbtree, node); //插入记录到红黑树与LRU队列 ngx_queue_insert_head(&ctx->sh->queue, &lr->queue); if (account) { //若是是最后一条限流策略,则更新上次访问时间,待处理请求数目,返回NGX_OK lr->last = now; lr->count = 0; return NGX_OK; } lr->last = 0; lr->count = 1; ctx->node = lr; return NGX_AGAIN; //非最后一条限流策略,返回NGX_AGAIN,继续校验下一条限流策略 }
当一个新请求进入 Nginx 的限流流程大体以下:
解释一下相关的变量:
excess:积压等待处理的请求数量(也就是桶中积压的令牌数量) 乘 1000(nginx 计算的时候单位换算乘 1000)
ctx->rate:限流的速率乘 1000(例如:设置当前的 IP 限流速率为 5 / 秒,则 rate 等于 5000;乘 1000 是 Nginx 内部的单位换算)
ms 是当前请求和上次成功请求时间的差值,单位毫秒
怎么理解这个表达式呢?
假设场景:
ms / 1000 的意思是本次请求在 1s 中的占比,ctx->rate * ms / 1000 意思是这段时间能够流过的请求数
1000 表明当次请求,即为 1(nginx 计算的时候单位换算乘 1000)
lr->excess - ctx->rate * ms / 1000 + 1000:的意思就是 当前积压令牌数 = 上次积压令牌数 - 这段时间能够产生的令牌数 + 本次请求(1 个令牌)
漏斗的本质:当 excess > limit->burst;即积压令牌 excess > 桶的最大容量,拒绝当前请求
举个例子,假设:
lr->excess 初始化为 0*1000
burst(桶最大容量)为 0*1000
令牌产生周期为 T,请求以下图所示
limit_conn 模块用来限制某个 IP 的并发链接数。它的实现与 limit_req 模块相似,总体逻辑和实现更为简单。limit_conn 模块也将某个 IP 的信息存储在红黑树的节点中。
涉及两个核心方法:
源代码详见:https://github.com/nginx/ngin...
conn_handler 方法处理请求的大体流程以下:
请求处理完成后,对当前节点链接数减 1,若当前节点链接数减至 0,析构当前节点,回收内存
ngx_http_limit_req_module 源码分析
ngx_http_limit_conn_module 源码分析