在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流算法
缓存
缓存的目的是提高系统访问速度和增大系统处理容量降级
降级是当服务出现问题或者影响到核心流程时,须要暂时屏蔽掉,待高峰或者问题解决后再打开限流
限流的目的是经过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则能够拒绝服务、排队或等待、降级等处理漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以必定的速度出水,当水流入速度过大会直接溢出,能够看出漏桶算法能强行限制数据的传输速率。spring
对于不少应用场景来讲,除了要求可以限制数据的平均传输速率外,还要求容许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而若是请求须要被处理,则须要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。express
Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方便,并且十分高效。缓存
Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提高直到维持在一个稳定值) 两种模式实现思路相似,主要区别在等待时间的计算上,本篇重点介绍SmoothBursty安全
public static RateLimiter create(double permitsPerSecond) { /* * 默认的RateLimiter配置能够保存最多一秒钟的未使用许可证 */ return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond); }
RateLimiter是一个抽象类,SmoothBursty是其子类SmoothRateLimiter的子类,其两个构造参数含义以下服务器
@VisibleForTesting static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); //根据每秒向桶中放入令牌的数量来设置当前存储令牌数 rateLimiter.setRate(permitsPerSecond); return rateLimiter; }
public final void setRate(double permitsPerSecond) { //若是每秒向桶中放入令牌的数量(permitsPerSecond)大于0且为数字,经过检查,不然抛出参数异常 checkArgument( permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); //对每一个线程进行互斥,创建互斥对象的锁定 synchronized (mutex()) { //由各项参数更新当前存储令牌数 doSetRate(permitsPerSecond, stopwatch.readMicros()); } }
public static void checkArgument(boolean expression, @Nullable Object errorMessage) { if (!expression) { throw new IllegalArgumentException(String.valueOf(errorMessage)); } }
private volatile Object mutexDoNotUseDirectly; //线程安全的互斥对象
private Object mutex() { Object mutex = mutexDoNotUseDirectly; if (mutex == null) { synchronized (this) { mutex = mutexDoNotUseDirectly; if (mutex == null) { mutexDoNotUseDirectly = mutex = new Object(); } } } return mutex; }
在SmoothBursty中并发
@Override final void doSetRate(double permitsPerSecond, long nowMicros) { //若当前时间晚于nextFreeTicketMicros,则计算该段时间内能够生成多少令牌,将生成的令牌加入令牌桶中并更新数据 resync(nowMicros); //更新添加1个令牌的时间间隔(单位微妙)为1000000微妙(1秒)除以每秒放入令牌桶中的数量 double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; //将令牌桶中能够存储令牌的时间参数加上更新当前能够存储的令牌数 doSetRate(permitsPerSecond, stableIntervalMicros); }
private long nextFreeTicketMicros = 0L; //下一次请求能够获取令牌的起始时间
double storedPermits; //当前存储令牌数
double maxPermits; //最大存储令牌数 = maxBurstSeconds * stableIntervalMicros
double stableIntervalMicros; //添加令牌时间间隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
final double maxBurstSeconds; //在RateLimiter未使用时,最多存储几秒的令牌
private void resync(long nowMicros) { //若是当前时间大于下一次请求能够获取令牌的起始时间 if (nowMicros > nextFreeTicketMicros) { //比较最大存储令牌数和当前存储的令牌数加上如今要增长的令牌数的大小,小的那个赋给当年存储令牌数,即增长令牌数与当前令牌数之和不能大于最大令牌数 storedPermits = min(maxPermits, storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros); //将当前时间赋给下一次请求能够获取的起始时间 nextFreeTicketMicros = nowMicros; } }
@Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { //将最大存储令牌数存入临时副本 double oldMaxPermits = this.maxPermits; //更新最大存储令牌数为存放令牌的秒数乘以每秒向桶中放入的令牌数 maxPermits = maxBurstSeconds * permitsPerSecond; //若是最大存储令牌数的临时副本为正无穷大 if (oldMaxPermits == Double.POSITIVE_INFINITY) { //更新当前存储令牌数为最大存储令牌数 storedPermits = maxPermits; } else { //若是最大存储令牌数的临时副本不为正无穷大 //若是最大存储令牌数的临时副本为0,则更新当前存储令牌数为0,不然 //更新当前存储令牌数为当前存储令牌数乘以最大存储令牌数除以最大存储令牌数的临时副本数 storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } }
咱们再来看一下RateLimiter的tryAcquire方法app
public boolean tryAcquire(long timeout, TimeUnit unit) { //尝试在timeout时间内获取令牌,若是能够则挂起(睡眠)等待相应时间并返回true,不然当即返回false return tryAcquire(1, timeout, unit); }
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { //取等待时间的微妙数与0比较取大值赋给超时时间 long timeoutMicros = max(unit.toMicros(timeout), 0); //若是检查时间>0,经过检查,此处为1 checkPermits(permits); long microsToWait; //创建互斥对象加锁互斥 synchronized (mutex()) { //获取当前时间 long nowMicros = stopwatch.readMicros(); //若是下一次请求能够获取令牌的起始时间减去等待时间大于当前时间 if (!canAcquire(nowMicros, timeoutMicros)) { return false; //返回false } else { //若是下一次请求能够获取令牌的起始时间减去等待时间小于等于当前时间 //获取下一次请求能够获取令牌的起始时间减去当前时间的值与0之间的大值并刷新各参数(下一次请求能够获取令牌的起始时间、当前存储令牌数) microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } //线程休眠microsToWait时间 stopwatch.sleepMicrosUninterruptibly(microsToWait); //返回true return true; }
private static int checkPermits(int permits) { checkArgument(permits > 0, "Requested permits (%s) must be positive", permits); return permits; }
final Stopwatch stopwatch = Stopwatch.createStarted();
@Override long readMicros() { return stopwatch.elapsed(MICROSECONDS); }
private boolean canAcquire(long nowMicros, long timeoutMicros) { //返回下一次请求能够获取令牌的起始时间减去等待时间是否小于等于当前时间 return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; }
final long reserveAndGetWaitLength(int permits, long nowMicros) { //获取下一次请求能够获取令牌的起始时间并更新各参数(下一次请求能够获取令牌的起始时间、当前存储令牌数) long momentAvailable = reserveEarliestAvailable(permits, nowMicros); //返回下一次请求能够获取令牌的起始时间减去当前时间的值与0之间的大值 return max(momentAvailable - nowMicros, 0); }
@Override void sleepMicrosUninterruptibly(long micros) { if (micros > 0) { Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS); } }
在SmoothBursty中ide
@Override final long queryEarliestAvailable(long nowMicros) { //返回下一次请求能够获取令牌的起始时间 return nextFreeTicketMicros; }
@Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { //若当前时间晚于nextFreeTicketMicros,则计算该段时间内能够生成多少令牌,将生成的令牌加入令牌桶中并更新数据 resync(nowMicros); //获取下一次请求能够获取令牌的起始时间 long returnValue = nextFreeTicketMicros; //在容许的请求数(这里为1)和当前存储令牌数间取小值赋给容许消费的存储令牌数(storedPermitsToSpend) double storedPermitsToSpend = min(requiredPermits, this.storedPermits); //将容许的请求数减去容许消费的存储令牌数赋给容许刷新数(freshPermits) double freshPermits = requiredPermits - storedPermitsToSpend; //将容许刷新数乘以添加令牌时间间隔赋给等待微妙数(waitMicros) long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); //更新下一次请求能够获取令牌的起始时间为下一次请求能够获取令牌的起始时间加上等待微妙数 this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros; //更新当前存储令牌数为当前存储令牌数减去容许消费的存储令牌数 this.storedPermits -= storedPermitsToSpend; return returnValue; }
@Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { return 0L; }
在Uninterruptibles中spring-boot
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) { //定义是否已中断为false boolean interrupted = false; try { //将下一次请求能够获取令牌的起始时间减去当前时间的值转化为纳秒定义为remainingNanos long remainingNanos = unit.toNanos(sleepFor); //将系统的纳秒值加上该转化值为end long end = System.nanoTime() + remainingNanos; while (true) { try { //线程休眠remainingNanos时间 NANOSECONDS.sleep(remainingNanos); return; } catch (InterruptedException e) { //若是发生中断异常,将是否已中断更新为true interrupted = true; //更新remainingNanos为end减去系统的纳秒值,并进入下一轮循环 remainingNanos = end - System.nanoTime(); } } } finally { //若是发生中断异常 if (interrupted) { //当前线程中断 Thread.currentThread().interrupt(); } } }
源码分析就是这些了,如今咱们来看一下Guava RateLimiter的应用,在APO中拦截Controller,并进行限流
在pom中添加
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency>
标签
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface LxRateLimit { /** * * @return */ String value() default ""; /** * 每秒向桶中放入令牌的数量 默认最大即不作限流 * @return */ double perSecond() default Double.MAX_VALUE; /** * 获取令牌的等待时间 默认0 * @return */ int timeOut() default 0; /** * 超时时间单位 * @return */ TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS; }
AOP类
@Slf4j @Aspect @Component public class LxRateLimitAspect { private RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE); /** * 带有指定注解切入 */ @ResponseBody @Around(value = "@annotation(com.guanjian.annotation.LxRateLimit)") public Object aroundNotice(ProceedingJoinPoint pjp) throws Throwable { log.info("拦截到了{}方法...", pjp.getSignature().getName()); Signature signature = pjp.getSignature(); MethodSignature methodSignature = (MethodSignature)signature; //获取目标方法 Method targetMethod = methodSignature.getMethod(); if (targetMethod.isAnnotationPresent(LxRateLimit.class)) { //获取目标方法的@LxRateLimit注解 LxRateLimit lxRateLimit = targetMethod.getAnnotation(LxRateLimit.class); rateLimiter.setRate(lxRateLimit.perSecond()); if (!rateLimiter.tryAcquire(lxRateLimit.timeOut(), lxRateLimit.timeOutUnit())) return "服务器繁忙,请稍后再试!"; } return pjp.proceed(); } }
Controller
@RestController public class AnnotationTestController { @GetMapping("/testannotation") @LxRateLimit(perSecond = 2000.0, timeOut = 500) //此处限速为2000qps public String testAnnotation() { return "get token success"; } }
咱们先在Controller中将@LxRateLimit(perSecond = 2000.0, timeOut = 500)注释掉
运行Jmeter进行压测
咱们启用500线程压测
压测结果
吞吐量为7867.8qps,此时是不限速的
如今咱们恢复Controller中的@LxRateLimit(perSecond = 2000.0, timeOut = 500)
吞吐量为2067.7qps
系统日志能够看到大量的拦截
2019-05-26 21:24:33.370 INFO 11092 --- [o-8080-exec-176] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.370 INFO 11092 --- [io-8080-exec-27] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.374 INFO 11092 --- [o-8080-exec-128] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.374 INFO 11092 --- [o-8080-exec-191] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.374 INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.377 INFO 11092 --- [io-8080-exec-36] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.379 INFO 11092 --- [o-8080-exec-123] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.379 INFO 11092 --- [io-8080-exec-61] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.380 INFO 11092 --- [io-8080-exec-19] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.382 INFO 11092 --- [io-8080-exec-77] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法... 2019-05-26 21:24:33.384 INFO 11092 --- [io-8080-exec-23] com.guanjian.aop.LxRateLimitAspect : 拦截到了testAnnotation方法...