最近在参与一个识别热点数据的需求开发。其中涉及了限流算法相关的内容。因此这里记录一下本身了解的各类限流算法,以及各个限流算法的实现。java
限流算法的应用场景很是普遍,好比经过限流来确保下游配置较差的应用不会被上游应用的大量请求击穿,不管是HTTP请求仍是RPC请求,从而使得服务保持稳定。限流也一样能够用于客户端,好比当咱们须要从微博上爬取数据时,咱们须要在请求中携带token从而经过微博的网关验证。可是微博为了防止服务被单个客户端大量访问,每每会在服务端进行限流,好比多是一个token一个小时只能发起1000次请求。可是爬虫发出的请求一般远远不止这个量级。因此在客户端进行限流能够确保咱们的token不会失效或是查封。面试
限流算法能够从多种角度分类,好比按照处理方式分为两种,一种是在超出限定流量以后会拒绝多余的访问,另外一种是超出限定流量以后,只是报警或者是记录日志,访问仍然正常进行。算法
目前比较常见的限流算法有如下几种:api
本文主要记录一下固定窗口和滑动窗口。令牌桶算法在谷歌的开源guava包中有实现,下次再开一篇文章分享一下。文中错误的地方欢迎指出!若是guava中实现了滑动窗口算法也请告诉我,急需,目前没有找到orz。安全
这是限流算法中最暴力的一种想法。既然咱们但愿某个API在一分钟内只能固定被访问N次(多是出于安全考虑,也多是出于服务器资源的考虑),那么咱们就能够直接统计这一分钟开始对API的访问次数,若是访问次数超过了限定值,则抛弃后续的访问。直到下一分钟开始,再开放对API的访问。服务器
全部的暴力算法的共同点都是容易实现,而固定窗口限流的缺点也一样很明显。假设如今有一个恶意用户在上一分钟的最后一秒和下一分钟的第一秒疯狂的冲击API。按照固定窗口的限流规则,这些请求都可以访问成功,可是在这一秒内,服务将承受超过规定值的访问冲击(这个规定值极可能是服务器可以承受的最大负载),从而致使服务没法稳定提供。并且由于用户在这一秒内耗光了上一分钟和下一分钟的访问定额,从而致使别的用户没法享受正常的服务,对于服务提供方来讲是彻底不能接收的。微信
这里本身作了一个简单的实现:多线程
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class FixedWindowRateLimiter implements RateLimiter, Runnable { private static final int DEFAULT_ALLOWED_VISIT_PER_SECOND = 5; private final int maxVisitPerSecond; private AtomicInteger count; FixedWindowRateLimiter(){ this.maxVisitPerSecond = DEFAULT_ALLOWED_VISIT_PER_SECOND; this.count = new AtomicInteger(); } FixedWindowRateLimiter(int maxVisitPerSecond) { this.maxVisitPerSecond = maxVisitPerSecond; this.count = new AtomicInteger(); } @Override public boolean isOverLimit() { return currentQPS() > maxVisitPerSecond; } @Override public int currentQPS() { return count.get(); } @Override public boolean visit() { count.incrementAndGet(); System.out.print(isOverLimit()); return isOverLimit(); } @Override public void run() { System.out.println(this.currentQPS()); count.set(0); } public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); FixedWindowRateLimiter rateLimiter = new FixedWindowRateLimiter(); scheduledExecutorService.scheduleAtFixedRate(rateLimiter, 0, 1, TimeUnit.SECONDS); new Thread(new Runnable() { @Override public void run() { while(true) { rateLimiter.visit(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while(true) { rateLimiter.visit(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
其中RateLimiter是一个通用的接口,后面的其它限流算法也会实现该接口:dom
public interface RateLimiter { boolean isOverLimit(); int currentQPS(); boolean visit(); }
也能够不使用多线程的方式实现,更加简单高效:ide
public class FixedWindowRateLimiterWithoutMultiThread implements RateLimiter { private Long lastVisitAt = System.currentTimeMillis(); private static final int DEFAULT_ALLOWED_VISIT_PER_SECOND = 5; private final int maxVisitPerSecond; private AtomicInteger count; public FixedWindowRateLimiterWithoutMultiThread(int maxVisitPerSecond){ this.maxVisitPerSecond = maxVisitPerSecond; this.count = new AtomicInteger(); } public FixedWindowRateLimiterWithoutMultiThread() { this(DEFAULT_ALLOWED_VISIT_PER_SECOND); } @Override public boolean isOverLimit() { return count.get() > maxVisitPerSecond; } @Override public int currentQPS() { return count.get(); } @Override public boolean visit() { long now = System.currentTimeMillis(); synchronized (lastVisitAt) { if (now - lastVisitAt > 1000) { lastVisitAt = now; System.out.println(currentQPS()); count.set(1); } } count.incrementAndGet(); return isOverLimit(); } public static void main(String[] args) { RateLimiter rateLimiter = new FixedWindowRateLimiterWithoutMultiThread(); new Thread(new Runnable() { @Override public void run() { while(true) { rateLimiter.visit(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while(true) { rateLimiter.visit(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
固定窗口就像是滑动窗口的一个特例。滑动窗口将固定窗口再等分为多个小的窗口,每一次对一个小的窗口进行流量控制。这种方法能够很好的解决以前的临界问题。
这里找的网上一个图,假设咱们将1s划分为4个窗口,则每一个窗口对应250ms。假设恶意用户仍是在上一秒的最后一刻和下一秒的第一刻冲击服务,按照滑动窗口的原理,此时统计上一秒的最后750毫秒和下一秒的前250毫秒,这种方式可以判断出用户的访问依旧超过了1s的访问数量,所以依然会阻拦用户的访问。
使用定时任务实现的滑动窗口代码以下:
public class SlidingWindowRateLimiter implements RateLimiter, Runnable{ private final long maxVisitPerSecond; private static final int DEFAULT_BLOCK = 10; private final int block; private final AtomicLong[] countPerBlock; private AtomicLong count; private volatile int index; public SlidingWindowRateLimiter(int block, long maxVisitPerSecond) { this.block = block; this.maxVisitPerSecond = maxVisitPerSecond; countPerBlock = new AtomicLong[block]; for (int i = 0 ; i< block ; i++) { countPerBlock[i] = new AtomicLong(); } count = new AtomicLong(0); } public SlidingWindowRateLimiter() { this(DEFAULT_BLOCK, DEFAULT_ALLOWED_VISIT_PER_SECOND); } @Override public boolean isOverLimit() { return currentQPS() > maxVisitPerSecond; } @Override public long currentQPS() { return count.get(); } @Override public boolean visit() { countPerBlock[index].incrementAndGet(); count.incrementAndGet(); return isOverLimit(); } @Override public void run() { System.out.println(isOverLimit()); System.out.println(currentQPS()); System.out.println("index:" + index); index = (index + 1) % block; long val = countPerBlock[index].getAndSet(0); count.addAndGet(-val); } public static void main(String[] args) { SlidingWindowRateLimiter slidingWindowRateLimiter = new SlidingWindowRateLimiter(10, 1000); ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); scheduledExecutorService.scheduleAtFixedRate(slidingWindowRateLimiter, 100, 100, TimeUnit.MILLISECONDS); new Thread(new Runnable() { @Override public void run() { while (true) { slidingWindowRateLimiter.visit(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while (true) { slidingWindowRateLimiter.visit(); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
Protect Your API Resources with Rate Limiting
想要了解更多开发技术,面试教程以及互联网公司内推,欢迎关注个人微信公众号!将会不按期的发放福利哦~