统计一段时间内容许经过的请求数。好比 qps为100,即1s内容许经过的请求数100,每来一个请求计数器加1,超过100的请求拒绝、时间过1s后计数器清0,从新计数。这样限流比较暴力,若是前10ms 来了100个请求,那剩下的990ms只能眼睁睁看着请求被过滤掉,并不能平滑处理这些请求,容易出现常说的“突刺现象”。redis
计数器算法流程图以下算法
请求到来时先放入漏桶中,漏桶再以匀速放行请求,若是进来请求超出了漏桶的容量时,则拒绝请求,这样作虽然可以避免“突刺现象”,可是过于平滑并不能应对短时的突发流量。
具体实现可采起将到来的请求放入队列中,再另起线程从队列中匀速拿出请求放行。spring
假设有个桶,而且会以必定速率往桶中投放令牌,每次请求来时都要去桶中拿令牌,若是拿到则放行,拿不到则进行等待直至拿到令牌为止,好比以每秒100的速度往桶中投放令牌,令牌桶初始化一秒事后桶内有100个令牌,若是大量请求来时会当即消耗完100个令牌,其他请求进行等待,最终以匀速方式放行这些请求。此算法的好处在于既能应对短暂瞬时流量,又能够平滑处理请求。数组
令牌桶限流流程图缓存
核心限流类:org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter
核心方法以下:安全
public Mono<Response> isAllowed(String routeId, String id) { if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } Config routeConfig = loadConfiguration(routeId); //令牌桶平均投放速率 int replenishRate = routeConfig.getReplenishRate(); //桶容量 int burstCapacity = routeConfig.getBurstCapacity(); try { //获取限流key List<String> keys = getKeys(id); //组装lua脚本执行参数,第一个参数投放令牌速率、第二个参数桶的容量、第三个参数当前时间戳,第四个参数须要获取的令牌个数 List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1"); //经过lua脚本与redis交互获取令牌,返回数组,数组第一个元素表明是否获取成功(1成功0失败),第二个参数表明剩余令牌数 Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); //若是获取令牌异常,默认设置获取结果【一、-1】,顾默认获取令牌成功、剩余令牌-1,不作限流控制 return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) .reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }).map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft)); if (log.isDebugEnabled()) { log.debug("response: " + response); } return response; }); } catch (Exception e) { log.error("Error determining if user allowed from redis", e); } return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); }
gateway限流lua脚本实现以下:微信
lua脚本分析及备注以下:多线程
--令牌桶剩余令牌数key local tokens_key = KEYS[1] --令牌桶最后填充时间key local timestamp_key = KEYS[2] --往令牌桶投放令牌速率 local rate = tonumber(ARGV[1]) --令牌桶大小 local capacity = tonumber(ARGV[2]) --当前数据戳 local now = tonumber(ARGV[3]) --请求获取令牌数量 local requested = tonumber(ARGV[4]) --计算令牌桶填充满须要的时间 local fill_time = capacity/rate --保证时间充足 local ttl = math.floor(fill_time*2) --获取redis中剩余令牌数 local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then last_tokens = capacity end --获取redis中最后一次更新令牌的时间 local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end local delta = math.max(0, now-last_refreshed) --计算出须要更新redis里的令牌桶数量(经过 过去的时间间隔内须要投放的令牌数+桶剩余令牌) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 --消耗令牌后,从新计算出须要更新redis缓存里的令牌数 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end --互斥更新redis 里的剩余令牌数 redis.call("setex", tokens_key, ttl, new_tokens) --互斥更新redis 里的最新更新令牌时间 redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }
spring cloud gateway经过redis实现令牌桶算法的流程图以下并发
总结: 经过redis 实现令牌桶算法限流,支持集群限流、但限速有上限,毕竟和redis交互须要消耗较长时间,限流没加锁虽然能够提高网关吞吐量,但实际并非知足线程安全,且还存在一个问题,例如桶大小10,往桶投放令牌速率为100/1s,当桶内10令牌消耗完后,这时两个正常的请求q1 和q2同时进入网关,若是q1恰好拿到产生新的令牌放行,q2则须要再过10ms才能获取新的令牌,因为两个请求间隔很短<10ms,致使q2去桶中拿不到令牌而被拦截为超速请求,致使缘由gateway未对消耗完桶后的请求进行入队等待。app
测试
设置令牌桶大小为5,投放速率为10/s ,配置以下
server: port: 8081 spring: cloud: gateway: routes: - id: limit_route uri: http://localhost:19090 predicates: - After=2017-01-20T17:42:47.789-07:00[America/Denver] filters: - name: RequestRateLimiter args: key-resolver: '#{@uriKeyResolver}' redis-rate-limiter.replenishRate: 10 redis-rate-limiter.burstCapacity: 5 application: name: gateway-limiter
如今用jmeter模拟10个并发请求,查看可以正常经过的请求数有多少?
运行结果:
经过打印结果发现,10个请求被拦截了5个请求。在实际应该中,10个请求或许都是正常请求,并无超过10qps却被拦截。
zuul-ratelimt 支持memory、redis限流,经过计数器算法实现限流,即在窗口时间内消耗指定数量的令牌后限流,窗口时间刷新后从新指定消耗令牌数量为0。
核心代码以下:
public class RateLimitFilter extends ZuulFilter { public static final String LIMIT_HEADER = "X-RateLimit-Limit"; public static final String REMAINING_HEADER = "X-RateLimit-Remaining"; public static final String RESET_HEADER = "X-RateLimit-Reset"; private static final UrlPathHelper URL_PATH_HELPER = new UrlPathHelper(); private final RateLimiter rateLimiter; private final RateLimitProperties properties; private final RouteLocator routeLocator; private final RateLimitKeyGenerator rateLimitKeyGenerator; @Override public String filterType() { return "pre"; } @Override public int filterOrder() { return -1; } @Override public boolean shouldFilter() { return properties.isEnabled() && policy(route()).isPresent(); } public Object run() { final RequestContext ctx = RequestContext.getCurrentContext(); final HttpServletResponse response = ctx.getResponse(); final HttpServletRequest request = ctx.getRequest(); final Route route = route(); policy(route).ifPresent(policy -> //生成限流key final String key = rateLimitKeyGenerator.key(request, route, policy); //执行核心限流方法,返回剩余能够用令牌数,若是rate.remaining<0则已超出流量限制 final Rate rate = rateLimiter.consume(policy, key); response.setHeader(LIMIT_HEADER, policy.getLimit().toString()); response.setHeader(REMAINING_HEADER, String.valueOf(Math.max(rate.getRemaining(), 0))); response.setHeader(RESET_HEADER, rate.getReset().toString()); if (rate.getRemaining() < 0) { ctx.setResponseStatusCode(TOO_MANY_REQUESTS.value()); ctx.put("rateLimitExceeded", "true"); throw new ZuulRuntimeException(new ZuulException(TOO_MANY_REQUESTS.toString(), TOO_MANY_REQUESTS.value(), null)); } }); return null; } }
核心限流方法rateLimiter.consume(policy, key)代码以下:
public abstract class AbstractCacheRateLimiter implements RateLimiter { @Override public synchronized Rate consume(Policy policy, String key, Long requestTime) { final Long refreshInterval = policy.getRefreshInterval(); final Long quota = policy.getQuota() != null ? SECONDS.toMillis(policy.getQuota()) : null; final Rate rate = new Rate(key, policy.getLimit(), quota, null, null); calcRemainingLimit(policy.getLimit(), refreshInterval, requestTime, key, rate); return rate; } protected abstract void calcRemainingLimit(Long limit, Long refreshInterval, Long requestTime, String key, Rate rate); } @Slf4j @RequiredArgsConstructor @SuppressWarnings("unchecked") public class RedisRateLimiter extends AbstractCacheRateLimiter { private final RateLimiterErrorHandler rateLimiterErrorHandler; private final RedisTemplate redisTemplate; @Override protected void calcRemainingLimit(final Long limit, final Long refreshInterval, final Long requestTime, final String key, final Rate rate) { if (Objects.nonNull(limit)) { long usage = requestTime == null ? 1L : 0L; Long remaining = calcRemaining(limit, refreshInterval, usage, key, rate); rate.setRemaining(remaining); } } private Long calcRemaining(Long limit, Long refreshInterval, long usage, String key, Rate rate) { rate.setReset(SECONDS.toMillis(refreshInterval)); Long current = 0L; try { current = redisTemplate.opsForValue().increment(key, usage); // Redis returns the value of key after the increment, check for the first increment, and the expiration time is set if (current != null && current.equals(usage)) { handleExpiration(key, refreshInterval); } } catch (RuntimeException e) { String msg = "Failed retrieving rate for " + key + ", will return the current value"; rateLimiterErrorHandler.handleError(msg, e); } return Math.max(-1, limit - current); } private void handleExpiration(String key, Long refreshInterval) { try { this.redisTemplate.expire(key, refreshInterval, SECONDS); } catch (RuntimeException e) { String msg = "Failed retrieving expiration for " + key + ", will reset now"; rateLimiterErrorHandler.handleError(msg, e); } } }
总结: 你们有没有注意到限流方法前面加了synchronized 锁,虽然保证了线程安全,但这里会存在一个问题,若是此限流方法执行时间2ms,即持锁时间过长(主要是和redis交互耗时),会致使整个网关的吞吐量不会超过500qps,因此在用redis作限流时建议作分key锁,每一个限流key之间互不影响,即保证了限流的安全性,又提升了网关的吞吐量。用memory作限流不须要考虑这个问题,由于本地限流持锁时间足够短,便是执行限流方法是串行的,但也能够拥有很高的吞吐量,zuul-ratelimt限流算法采用计数器限流,顾都有一个通病,避免不了“突刺现象”。
Guava RateLimiter提供了令牌桶算法实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)实现,代码实现时序图以下:
测试
平滑预热限流,qps为10,看下去拿10个令牌依次耗时状况
/** * 平滑预热限流(SmoothWarmingUp) */ public class SmoothWarmingUp { public static void main(String[] args) { RateLimiter limiter = RateLimiter.create(10, 1000, TimeUnit.MILLISECONDS); for(int i = 0; i < 10;i++) { //获取一个令牌 System.out.println(limiter.acquire(1)); } } }
运行结果:返回线程等待的时间
平滑突发限流
/* 平滑突发限流(SmoothBursty) */ public class SmoothBurstyRateLimitTest { public static void main(String[] args) { //QPS = 5,每秒容许5个请求 RateLimiter limiter = RateLimiter.create(5); //limiter.acquire() 返回获取token的耗时,以秒为单位 System.out.println(limiter.acquire()); System.out.println(limiter.acquire()); System.out.println(limiter.acquire()); System.out.println(limiter.acquire()); System.out.println(limiter.acquire()); System.out.println(limiter.acquire()); } }
运行结果:
总结:Guava限流思路主要是经过计算下一个可用令牌的等待时间,去休眠线程,休眠结束后默认成功得到令牌,平滑预热算法SmoothWarmingUp也是相似,只是刚开始计算获取令牌的速率要比设定限流的速率底,最后再慢慢趋于限流速率。SmoothWarmingUp限流不适用中低频限流,在常规的应用限流中,好比咱们设定guava的限速为100qps,在同一个时间点来了q一、q二、q3三个正常请求,那么q1会被迫等待10ms,q2被迫等待20ms,q3被迫等待30ms,在除高并发的应用场景中是常常出现这种状况的,应用持续高并发情景并很少,只是在较短期内来了多个正常请求,却被迫等待必定时间,下降了请求的响应速度,在这种场景下算法显得过于平滑,仍是主要适用高并发应用场景 如秒杀场景等。SmoothBursty限流则不会,它有“桶”的概念,“桶”中令牌没拿完前是不会限速的,桶大小为限流速率大小,不支持自动调整桶大小。
自写限流目标:为了即保证限流算法线程安全,又能提升网关吞吐量,my-ratelimit网关限流采用分key锁,不一样key之间限流互不影响。为知足多业务场景,my-ratelimit支持了自定义限流维度、多维度限流、多维度自由排列组合限流、支持自选限流算法及仓库类型。
总结:my-ratelimit令牌桶限流算法核心思想:每来一个请求都会先作投放令牌操做,投放数量根据当前时间距离上次投放时间的时间段占1s的比例乘以限流速率limit计算而得,可能投放数量为0,投放完后再去桶中取令牌,若是取到了令牌则请求放行,若没有令牌则线程进入AQS同步队列中,直到有令牌产生再依次去唤醒队列中的线程来获取令牌。在实际的业务场景中,高频的时间段其实并很少,大都是低频的请求,为了尽量提升请求响应速度,知足低频“不限流”,高频平滑限流的指标,刚来的请求不会先入AQS同步队列中,而是先去拿令牌,当拿不到令牌时说明此时段流量比较大,再进入队列等待获取令牌达到平滑限流目的。另外在进来的请求前加了一个判断,则是若是等待队列大小已经到达了限流的速率limit大小了,则说明此时段请求已超速,顾直接拒绝请求。
为了测试限流算法自己耗时状况,先用单线程来测试,设置每秒产生10w的令牌,桶大小为1,平滑拿完这些令牌须要多少时间。
测试代码:
public static void singleCatfishLimit(long permitsRatelimit) throws InterruptedException { RateLimitPermit rateLimitPermit = RateLimitPermit.create(1,permitsRatelimit,1); int hastoken=0; long start =System.nanoTime(); for(int i=0 ; i < permitsRatelimit*1 ; i++){ if(rateLimitPermit.acquire()>=0){ hastoken++; } } System.out.println("catfishLimit use time:"+(NANOSECONDS.toMillis(System.nanoTime()-start-SECONDS.toNanos(1) ) ) + " ms" ); System.out.println("single thread hold Permit:"+hastoken); } public static void main(String[] args) throws Exception { singleCatfishLimit(100000); //guavaLimit(); //multCatfishLimit(2000,10000); }
运行结果:
说明:10w令牌平滑拿完用了115ms,平均每次限流逻辑执行时间1微秒左右,几乎能够忽略不计。
接下来测试多线程状况,设置并发请求2000个,限流qps为10000,测试用时
public static void multCatfishLimit(int threadCount ,long permitsRatelimit) throws InterruptedException { CountDownLatch countDownLatch=new CountDownLatch(threadCount); AtomicInteger hastoken= new AtomicInteger(0); CyclicBarrier cyclicBarrier= new CyclicBarrier(threadCount); RateLimitPermit rateLimitPermit = RateLimitPermit.create(1,permitsRatelimit,1); AtomicLong startTime= new AtomicLong(0); for (int i = 0; i < threadCount; i++) { Thread thread=new Thread(()->{ try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } startTime.compareAndSet(0,System.nanoTime()); for (int j = 0; j < 1; j++) { if( rateLimitPermit.acquire()>=0){ hastoken.incrementAndGet(); } } countDownLatch.countDown(); },"ratelimit-"+i); thread.start(); } countDownLatch.await(); System.out.println("catfishLimit use time:"+ (long)( NANOSECONDS.toMillis( System.nanoTime()-startTime.get() ) -Math.min(hastoken.get()*1.0/permitsRatelimit*1000L, threadCount*1.0/permitsRatelimit * SECONDS.toMillis(1)) )+" ms"); System.out.println("mult thread hold Permit:"+hastoken.get()); } public static void main(String[] args) throws Exception { singleCatfishLimit(100000); //guavaLimit(); multCatfishLimit(2000,10000); }
运行结果:
说明:看到结果会发现,qps为1w时,2000个线程去拿令牌用时127ms,这是为何呢,其实这里经过cyclicBarrier控制并发请求,请求数未到达2000时 进入wait,到达2000时才signalAll,这里换醒线程是有时间差的,很难经过程序控制多线程在同一个时间点同时执行,这里统计出的时间存在偏差。
测qps为100时,2000个请求同时去拿令牌耗时,能拿到多少令牌
public static void main(String[] args) throws Exception { // singleCatfishLimit(100000); //guavaLimit(); multCatfishLimit(2000,100); }
运行结果:
说明:2000个线程却拿到了155个令牌,拿令牌操做用时12ms,总用时1550ms+12ms,为啥没有精确拿到100个令牌,缘由仍是2000个线程未在同一个时间点执行拿令牌操做,经过打印线程唤醒时间发现,2000个线程被唤醒的最大时间差为566ms。
总结:该限流算法特别支持少许限流器,高并发限流,由于算法获取不到令牌会循环往桶投放令牌,若是限流器多致使N个循环投放令牌操做,增长cpu压力
算法流程图以下: