高并发系统限流-漏桶算法和令牌桶算法

高并发系统限流-漏桶算法和令牌桶算法html

 

 参考:java

http://www.cnblogs.com/LBSer/p/4083131.htmlnode

https://blog.csdn.net/scorpio3k/article/details/53103239linux

https://www.cnblogs.com/clds/p/5850070.htmlnginx

http://jinnianshilongnian.iteye.com/blog/2305117git

http://iamzhongyong.iteye.com/blog/1742829github

 

 

 

1、问题描述  

  某天A君忽然发现本身的接口请求量忽然涨到以前的10倍,没多久该接口几乎不可以使用,并引起连锁反应致使整个系统崩溃。如何应对这种状况呢?生活给了咱们答案:好比老式电闸都安装了保险丝,一旦有人使用超大功率的设备,保险丝就会烧断以保护各个电器不被强电流给烧坏。同理咱们的接口也须要安装上“保险丝”,以防止非预期的请求对系统压力过大而引发的系统瘫痪,当流量过大时,能够采起拒绝或者引流等机制。 web

2、经常使用的限流算法

      经常使用的限流算法有两种:漏桶算法和令牌桶算法redis

      漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以必定的速度出水,当水流入速度过大会直接溢出,能够看出漏桶算法能强行限制数据的传输速率。算法

图1 漏桶算法示意图

      对于不少应用场景来讲,除了要求可以限制数据的平均传输速率外,还要求容许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。如图2所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而若是请求须要被处理,则须要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

图2 令牌桶算法示意图

3、限流工具类RateLimiter

   Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法来完成限流,很是易于使用。RateLimiter类的接口描述请参考:RateLimiter接口描述,具体使用请参考:RateLimiter使用实践

      下面是主要源码:

复制代码
public double acquire() {
        return acquire(1);
    }

 public double acquire(int permits) {
        checkPermits(permits);  //检查参数是否合法(是否大于0)
        long microsToWait;
        synchronized (mutex) { //应对并发状况须要同步
            microsToWait = reserveNextTicket(permits, readSafeMicros()); //得到须要等待的时间 
        }
        ticker.sleepMicrosUninterruptibly(microsToWait); //等待,当未达到限制时,microsToWait为0
        return 1.0 * microsToWait / TimeUnit.SECONDS.toMicros(1L);
    }

private long reserveNextTicket(double requiredPermits, long nowMicros) {
        resync(nowMicros); //补充令牌
        long microsToNextFreeTicket = nextFreeTicketMicros - nowMicros;
        double storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits); //获取此次请求消耗的令牌数目
        double freshPermits = requiredPermits - storedPermitsToSpend;

        long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
                + (long) (freshPermits * stableIntervalMicros); 

        this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
        this.storedPermits -= storedPermitsToSpend; // 减去消耗的令牌
        return microsToNextFreeTicket;
    }

private void resync(long nowMicros) {
        // if nextFreeTicket is in the past, resync to now
        if (nowMicros > nextFreeTicketMicros) {
            storedPermits = Math.min(maxPermits,
                    storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
            nextFreeTicketMicros = nowMicros;
        }
    }
复制代码

 

 

 

 

 

 

 

服务治理---限流(令牌桶算法)

一、最近在写一个分布式服务的框架,对于分布式服务的框架来讲,除了远程调用,还要进行服务的治理

当进行促销的时候,全部的资源都用来完成重要的业务,就好比双11的时候,主要的业务就是让用户查询商品,以及购买支付,

此时,金币查询、积分查询等业务就是次要的,所以要对这些服务进行服务的降级,典型的服务降级算法是采用令牌桶算法,

所以在写框架的时候去研究了一下令牌桶算法

 

二、在实施QOS策略时,能够将用户的数据限制在特定的带宽,当用户的流量超过额定带宽时,超过的带宽将采起其它方式来处理。

要衡量流量是否超过额定的带宽,网络设备并非采用单纯的数字加减法来决定的,也就是说,好比带宽为100K,而用户发来

的流量为110K,网络设备并非靠110K减去100K等于10K,就认为用户超过流量10K。网络设备衡量流量是否超过额定带宽,

须要使用令牌桶算法来计算。下面详细介绍令牌桶算法机制:

    当网络设备衡量流量是否超过额定带宽时,须要查看令牌桶,而令牌桶中会放置必定数量的令牌,一个令牌容许接口发送

  或接收1bit数据(有时是1 Byte数据),当接口经过1bit数据后,同时也要从桶中移除一个令牌。当桶里没有令牌的时候,任何流

  量都被视为超过额定带宽,只有当桶中有令牌时,数据才能够经过接口。令牌桶中的令牌不只仅能够被移除,一样也能够往里添加,

  因此为了保证接口随时有数据经过,就必须不停地往桶里加令牌,因而可知,往桶里加令牌的速度,就决定了数据经过接口的速度。

  所以,咱们经过控制往令牌桶里加令牌的速度从而控制用户流量的带宽。而设置的这个用户传输数据的速率被称为承诺信息速率(CIR),

  一般以秒为单位。好比咱们设置用户的带宽为1000  bit每秒,只要保证每秒钟往桶里添加1000个令牌便可。

 

三、举例:

    将CIR设置为8000  bit/s,那么就必须每秒将8000个令牌放入桶中,当接口有数据经过时,就从桶中移除相应的令牌,每经过1  bit,

  就从桶中移除1个令牌。当桶里没有令牌的时候,任何流量都被视为超出额定带宽,而超出的流量就要采起额外动做。每秒钟往桶里加的令牌

  就决定了用户流量的速率,这个速率就是CIR,可是每秒钟须要往桶里加的令牌总数,并非一次性加完的,一次性加进的令牌数量被称为Burst  size(Bc),

  若是Bc只是CIR的一半,那么很明显每秒钟就须要往桶里加两次令牌,每次加的数量老是Bc的数量。还有就是加令牌的时间,Time interval(Tc),

  Tc表示多久该往桶里加一次令牌,而这个时间并不能手工设置,由于这个时间能够靠CIR和Bc的关系计算获得,  Bc/ CIR= Tc。

 

四、令牌桶算法图例

      

    a. 按特定的速率向令牌桶投放令牌

    b. 根据预设的匹配规则先对报文进行分类,不符合匹配规则的报文不须要通过令牌桶的处理,直接发送;

    c. 符合匹配规则的报文,则须要令牌桶进行处理。当桶中有足够的令牌则报文能够被继续发送下去,同时令牌桶中的令牌 量按报文的长度作相应的减小;

    d. 当令牌桶中的令牌不足时,报文将不能被发送,只有等到桶中生成了新的令牌,报文才能够发送。这就能够限制报文的流量只能是小于等于令牌生成的速度,达到限制流量的目的。

 

五、Java参考代码:

复制代码
package com.netease.datastream.util.flowcontrol;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;


/**
 * <pre>
 * Created by inter12 on 15-3-18.
 * </pre>
 */
public class TokenBucket {

    // 默认桶大小个数 即最大瞬间流量是64M
    private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;

    // 一个桶的单位是1字节
    private int everyTokenSize = 1;

    // 瞬间最大流量
    private int maxFlowRate;

    // 平均流量
    private int avgFlowRate;

    // 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 *
    // 1024 * 64
    private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(
            DEFAULT_BUCKET_SIZE);

    private ScheduledExecutorService scheduledExecutorService = Executors
            .newSingleThreadScheduledExecutor();

    private volatile boolean isStart = false;

    private ReentrantLock lock = new ReentrantLock(true);

    private static final byte A_CHAR = 'a';

    public TokenBucket() {
    }

    public TokenBucket(int maxFlowRate, int avgFlowRate) {
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
        this.everyTokenSize = everyTokenSize;
        this.maxFlowRate = maxFlowRate;
        this.avgFlowRate = avgFlowRate;
    }

    public void addTokens(Integer tokenNum) {

        // 如果桶已经满了,就再也不家如新的令牌
        for (int i = 0; i < tokenNum; i++) {
            tokenQueue.offer(Byte.valueOf(A_CHAR));
        }
    }

    public TokenBucket build() {

        start();
        return this;
    }

    /**
     * 获取足够的令牌个数
     * 
     * @return
     */
    public boolean getTokens(byte[] dataSize) {

//        Preconditions.checkNotNull(dataSize);
//        Preconditions.checkArgument(isStart,
//                "please invoke start method first !");

        int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数

        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
            if (!result) {
                return false;
            }

            int tokenCount = 0;
            for (int i = 0; i < needTokenNum; i++) {
                Byte poll = tokenQueue.poll();
                if (poll != null) {
                    tokenCount++;
                }
            }

            return tokenCount == needTokenNum;
        } finally {
            lock.unlock();
        }
    }

    public void start() {

        // 初始化桶队列大小
        if (maxFlowRate != 0) {
            tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
        }

        // 初始化令牌生产者
        TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
        scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1,
                TimeUnit.SECONDS);
        isStart = true;

    }

    public void stop() {
        isStart = false;
        scheduledExecutorService.shutdown();
    }

    public boolean isStarted() {
        return isStart;
    }

    class TokenProducer implements Runnable {

        private int avgFlowRate;
        private TokenBucket tokenBucket;

        public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
            this.avgFlowRate = avgFlowRate;
            this.tokenBucket = tokenBucket;
        }

        @Override
        public void run() {
            tokenBucket.addTokens(avgFlowRate);
        }
    }

    public static TokenBucket newBuilder() {
        return new TokenBucket();
    }

    public TokenBucket everyTokenSize(int everyTokenSize) {
        this.everyTokenSize = everyTokenSize;
        return this;
    }

    public TokenBucket maxFlowRate(int maxFlowRate) {
        this.maxFlowRate = maxFlowRate;
        return this;
    }

    public TokenBucket avgFlowRate(int avgFlowRate) {
        this.avgFlowRate = avgFlowRate;
        return this;
    }

    private String stringCopy(String data, int copyNum) {

        StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);

        for (int i = 0; i < copyNum; i++) {
            sbuilder.append(data);
        }

        return sbuilder.toString();

    }

    public static void main(String[] args) throws IOException,
            InterruptedException {

        tokenTest();
    }

    private static void arrayTest() {
        ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(
                10);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        tokenQueue.offer(1);
        System.out.println(tokenQueue.size());
        System.out.println(tokenQueue.remainingCapacity());
    }

    private static void tokenTest() throws InterruptedException, IOException {
        TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512)
                .maxFlowRate(1024).build();

        BufferedWriter bufferedWriter = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream("D:/ds_test")));
        String data = "xxxx";// 四个字节
        for (int i = 1; i <= 1000; i++) {

            Random random = new Random();
            int i1 = random.nextInt(100);
            boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data,
                    i1).getBytes());
            TimeUnit.MILLISECONDS.sleep(100);
            if (tokens) {
                bufferedWriter.write("token pass --- index:" + i1);
                System.out.println("token pass --- index:" + i1);
            } else {
                bufferedWriter.write("token rejuect --- index" + i1);
                System.out.println("token rejuect --- index" + i1);
            }

            bufferedWriter.newLine();
            bufferedWriter.flush();
        }

        bufferedWriter.close();
    }

}
复制代码

 

 

 

 

 

 

高并发系统限流中的漏桶算法和令牌桶算法,经过流量整形和速率限制提高稳定性

在大数据量高并发访问时,常常会出现服务或接口面对暴涨的请求而不可用的状况,甚至引起连锁反映致使整个系统崩溃。此时你须要使用的技术手段之一就是限流,当请求达到必定的并发数或速率,就进行等待、排队、降级、拒绝服务等。在限流时,常见的两种算法是漏桶和令牌桶算法算法,本文即对相关内容进行重点介绍。

 

1、漏桶和令牌桶算法的概念

漏桶算法(Leaky Bucket):主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,经过它,突发流量能够被整形以便为网络提供一个稳定的流量。漏桶算法的示意图以下:


请求先进入到漏桶里,漏桶以必定的速度出水,当水请求过大会直接溢出,能够看出漏桶算法能强行限制数据的传输速率。

 

令牌桶算法(Token Bucket):是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型状况下,令牌桶算法用来控制发送到网络上的数据的数目,并容许突发数据的发送。令牌桶算法示意图以下所示:


大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。若是令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中能够保存的最大令牌数永远不会超过桶的大小。

 

2、两种算法的区别

二者主要区别在于“漏桶算法”可以强行限制数据的传输速率,而“令牌桶算法”在可以限制数据的平均传输速率外,还容许某种程度的突发传输。在“令牌桶算法”中,只要令牌桶中存在令牌,那么就容许突发地传输数据直到达到用户配置的门限,因此它适合于具备突发特性的流量。

 

3、使用Guava的RateLimiter进行限流控制

Guava是google提供的java扩展类库,其中的限流工具类RateLimiter采用的就是令牌桶算法。RateLimiter 从概念上来说,速率限制器会在可配置的速率下分配许可证,若是必要的话,每一个acquire() 会阻塞当前线程直到许可证可用后获取该许可证,一旦获取到许可证,不须要再释放许可证。通俗的讲RateLimiter会按照必定的频率往桶里扔令牌,线程拿到令牌才能执行,好比你但愿本身的应用程序QPS不要超过1000,那么RateLimiter设置1000的速率后,就会每秒往桶里扔1000个令牌。例如咱们须要处理一个任务列表,但咱们不但愿每秒的任务提交超过两个,此时能够采用以下方式:


有一点很重要,那就是请求的许可数历来不会影响到请求自己的限制(调用acquire(1) 和调用acquire(1000) 将获得相同的限制效果,若是存在这样的调用的话),但会影响下一次请求的限制,也就是说,若是一个高开销的任务抵达一个空闲的RateLimiter,它会被立刻许可,可是下一个请求会经历额外的限制,从而来偿付高开销任务。注意:RateLimiter 并不提供公平性的保证。

 

4、使用Semphore进行并发流控

Java 并发库的Semaphore 能够很轻松完成信号量控制,Semaphore能够控制某个资源可被同时访问的个数,经过 acquire() 获取一个许可,若是没有就等待,而 release() 释放一个许可。单个信号量的Semaphore对象能够实现互斥锁的功能,而且能够是由一个线程得到了“锁”,再由另外一个线程释放“锁”,这可应用于死锁恢复的一些场合。下面的Demo中申明了一个只有5个许可的Semaphore,而有20个线程要访问这个资源,经过acquire()和release()获取和释放访问许可:



最后:进行限流控制还能够有不少种方法,针对不一样的场景各有优劣,例如经过AtomicLong计数器控制、使用MQ消息队列进行流量消峰等等。

 

 

 

 

 

 

接口限流算法总结

背景

曾经在一个大神的博客里看到这样一句话:在开发高并发系统时,有三把利器用来保护系统:缓存、降级和限流。那么何为限流呢?顾名思义,限流就是限制流量,就像你宽带包了1个G的流量,用完了就没了。经过限流,咱们能够很好地控制系统的qps,从而达到保护系统的目的。本篇文章将会介绍一下经常使用的限流算法以及他们各自的特色。

算法介绍

计数器法

计 数器法是限流算法里最简单也是最容易实现的一种算法。好比咱们规定,对于A接口来讲,咱们1分钟的访问次数不能超过100个。那么咱们能够这么作:在一开 始的时候,咱们能够设置一个计数器counter,每当一个请求过来的时候,counter就加1,若是counter的值大于100而且该请求与第一个 请求的间隔时间还在1分钟以内,那么说明请求数过多;若是该请求与第一个请求的间隔时间大于1分钟,且counter的值还在限流范围内,那么就重置 counter,具体算法的示意图以下:

2016-09-01_20:31:28.jpg

具体的伪代码以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CounterDemo {
public long timeStamp = getNowTime();
public int reqCount = 0;
public final int limit = 100; // 时间窗口内最大请求数
public final long interval = 1000; // 时间窗口ms
public boolean grant() {
long now = getNowTime();
if (now < timeStamp + interval) {
// 在时间窗口内
reqCount++;
// 判断当前时间窗口内是否超过最大请求控制数
return reqCount <= limit;
}
else {
timeStamp = now;
// 超时后重置
reqCount =  1;
return true;
}
}
}

这个算法虽然简单,可是有一个十分致命的问题,那就是临界问题,咱们看下图:

2016-09-01_20:35:21.jpg

从上图中咱们能够看到,假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,而且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。咱们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户经过在时间窗口的重置节点处突发请求, 能够瞬间超过咱们的速率限制。用户有可能经过算法的这个漏洞,瞬间压垮咱们的应用。

聪明的朋友可能已经看出来了,刚才的问题实际上是由于咱们统计的精度过低。那么如何很好地处理这个问题呢?或者说,如何将临界问题的影响下降呢?咱们能够看下面的滑动窗口算法。

滑动窗口

滑动窗口,又称rolling window。为了解决这个问题,咱们引入了滑动窗口算法。若是学过TCP网络协议的话,那么必定对滑动窗口这个名词不会陌生。下面这张图,很好地解释了滑动窗口算法:

2016-09-01_20:42:46.jpg

在上图中,整个红色的矩形框表示一个时间窗口,在咱们的例子中,一个时间窗口就是一分钟。而后咱们将时间窗口进行划分,好比图中,咱们就将滑动窗口 划成了6格,因此每格表明的是10秒钟。每过10秒钟,咱们的时间窗口就会往右滑动一格。每个格子都有本身独立的计数器counter,好比当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。

那么滑动窗口怎么解决刚才的临界问题的呢?咱们能够看上图,0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在橘黄色的格 子中。当时间到达1:00时,咱们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是200个,超过了限定的100个,因此此时可以检测出来触 发了限流。

我再来回顾一下刚才的计数器算法,咱们能够发现,计数器算法其实就是滑动窗口算法。只是它没有对时间窗口作进一步地划分,因此只有1格。

因而可知,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

漏桶算法

漏桶算法,又称leaky bucket。为了理解漏桶算法,咱们看一下维基百科上的对于该算法的示意图:

2016-09-02_09:57:32.jpg

从图中咱们能够看到,整个算法其实十分简单。首先,咱们有一个固定容量的桶,有水流进来,也有水流出去。对于流进来的水来讲,咱们没法预计一共有多 少水会流进来,也没法预计水流的速度。可是对于流出去的水来讲,这个桶能够固定水流出的速率。并且,当桶满了以后,多余的水将会溢出。

咱们将算法中的水换成实际应用中的请求,咱们能够看到漏桶算法天生就限制了请求的速度。当使用了漏桶算法,咱们能够保证接口会以一个常速速率来处理请求。因此漏桶算法天生不会出现临界问题。具体的伪代码实现以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class LeakyDemo {
public long timeStamp = getNowTime();
public int capacity; // 桶的容量
public int rate; // 水漏出的速度
public int water; // 当前水量(当前累积请求数)
public boolean grant() {
long now = getNowTime();
water = max( 0, water - (now - timeStamp) * rate); // 先执行漏水,计算剩余水量
timeStamp = now;
if ((water + 1) < capacity) {
// 尝试加水,而且水还未满
water +=  1;
return true;
}
else {
// 水满,拒绝加水
return false;
}
}
}

令牌桶算法

令牌桶算法,又称token bucket。为了理解该算法,咱们再来看一下维基百科上对该算法的示意图:

2016-09-02_10:10:24.jpg

从图中咱们能够看到,令牌桶算法比漏桶算法稍显复杂。首先,咱们有一个固定容量的桶,桶里存放着令牌(token)。桶一开始是空的,token以 一个固定的速率r往桶里填充,直到达到桶的容量,多余的令牌将会被丢弃。每当一个请求过来时,就会尝试从桶里移除一个令牌,若是没有令牌的话,请求没法通 过。

具体的伪代码实现以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TokenBucketDemo {
public long timeStamp = getNowTime();
public int capacity; // 桶的容量
public int rate; // 令牌放入速度
public int tokens; // 当前令牌数量
public boolean grant() {
long now = getNowTime();
// 先添加令牌
tokens = min(capacity, tokens + (now - timeStamp) * rate);
timeStamp = now;
if (tokens < 1) {
// 若不到1个令牌,则拒绝
return false;
}
else {
// 还有令牌,领取令牌
tokens -=  1;
return true;
}
}
}

相关变种

若仔细研究算法,咱们会发现咱们默认从桶里移除令牌是不须要耗费时间的。若是给移除令牌设置一个延时时间,那么实际上又采用了漏桶算法的思路。Google的guava库下的SmoothWarmingUp类就采用了这个思路。

临界问题

我 们再来考虑一下临界问题的场景。在0:59秒的时候,因为桶内积满了100个token,因此这100个请求能够瞬间经过。可是因为token是以较低的 速率填充的,因此在1:00的时候,桶内的token数量不可能达到100个,那么此时不可能再有100个请求经过。因此令牌桶算法能够很好地解决临界问 题。下图比较了计数器(左)和令牌桶算法(右)在临界点的速率变化。咱们能够看到虽然令牌桶算法容许突发速率,可是下一个突发速率必需要等桶内有足够的 token后才能发生:

2016-09-02_14:40:58.jpg

总结

计数器 VS 滑动窗口

计数器算法是最简单的算法,能够当作是滑动窗口的低精度实现。滑动窗口因为须要存储多份的计数器(每个格子存一份),因此滑动窗口在实现上须要更多的存储空间。也就是说,若是滑动窗口的精度越高,须要的存储空间就越大。

漏桶算法 VS 令牌桶算法

漏桶算法和令牌桶算法最明显的区别是令牌桶算法容许流量必定程度的突发。由于默认的令牌桶算法,取走token是不须要耗费时间的,也就是说,假设桶内有100个token时,那么能够瞬间容许100个请求经过。

令牌桶算法因为实现简单,且容许某些流量的突发,对用户友好,因此被业界采用地较多。固然咱们须要具体状况具体分析,只有最合适的算法,没有最优的算法。

 

 

 

 

聊聊高并发系统之限流特技

在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。缓存的目的是提高系统访问速度和增大系统能处理的容量,可谓是抗高并发流量的银弹;而降级是当服务出问题或者影响到核心流程的性能则须要暂时屏蔽掉,待高峰或者问题解决后再打开;而有些场景并不能用缓存和降级来解决,好比稀缺资源(秒杀、抢购)、写服务(如评论、下单)、频繁的复杂查询(评论的最后几页),所以需有一种手段来限制这些场景的并发/请求量,即限流。

 

限流的目的是经过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则能够拒绝服务(定向到错误页或告知资源没有了)、排队或等待(好比秒杀、评论、下单)、降级(返回兜底数据或默认数据,如商品详情页库存默认有货)。

 

通常开发高并发系统常见的限流有:限制总并发数(好比数据库链接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发链接数)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其余还有如限制远程接口调用速率、限制MQ的消费速率。另外还能够根据网络链接数、网络流量、CPU或内存负载等来限流。

 

先有缓存这个银弹,后有限流来应对61八、双十一高并发流量,在处理高并发问题上能够说是如虎添翼,不用担忧瞬间流量致使系统挂掉或雪崩,最终作到有损服务而不是不服务;限流须要评估好,不可乱用,不然会正常流量出现一些奇怪的问题而致使用户抱怨。

 

在实际应用时也不要太纠结算法问题,由于一些限流算法实现是同样的只是描述不同;具体使用哪一种限流技术仍是要根据实际场景来选择,不要一味去找最佳模式,白猫黑猫能解决问题的就是好猫。

 

因在实际工做中遇到过许多人来问如何进行限流,所以本文会详细介绍各类限流手段。那么接下来咱们从限流算法、应用级限流、分布式限流、接入层限流来详细学习下限流技术手段。

 

限流算法

常见的限流算法有:令牌桶、漏桶。计数器也能够进行粗暴限流实现。

 

令牌桶算法

令牌桶算法是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。令牌桶算法的描述以下:

  • 假设限制2r/s,则按照500毫秒的固定速率往桶中添加令牌;

  • 桶中最多存放b个令牌,当桶满时,新添加的令牌被丢弃或拒绝;

  • 当一个n个字节大小的数据包到达,将从桶中删除n个令牌,接着数据包被发送到网络上;

  • 若是桶中的令牌不足n个,则不会删除令牌,且该数据包将被限流(要么丢弃,要么缓冲区等待)。

 



 

漏桶算法

漏桶做为计量工具(The Leaky Bucket Algorithm as a Meter)时,能够用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述以下:

  • 一个固定容量的漏桶,按照常量固定速率流出水滴;

  • 若是桶是空的,则不需流出水滴;

  • 能够以任意速率流入水滴到漏桶;

  • 若是流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。



 

令牌桶和漏桶对比:

  • 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理须要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;

  • 漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;

  • 令牌桶限制的是平均流入速率(容许突发请求,只要有令牌就能够处理,支持一次拿3个令牌,4个令牌),并容许必定程度突发流量;

  • 漏桶限制的是常量流出速率(即流出速率是一个固定常量值,好比都是1的速率流出,而不能一次是1,下次又是2),从而平滑突发流入速率;

  • 令牌桶容许必定程度的突发,而漏桶主要目的是平滑流入速率;

  • 两个算法实现能够同样,可是方向是相反的,对于相同的参数获得的限流效果是同样的。

 

另外有时候咱们还使用计数器来进行限流,主要用来限制总并发数,好比数据库链接池、线程池、秒杀的并发数;只要全局总请求数或者必定时间段的总请求数设定的阀值则进行限流,是简单粗暴的总数量限流,而不是平均速率限流。

 

到此基本的算法就介绍完了,接下来咱们首先看看应用级限流。

 

 

应用级限流

 

限流总并发/链接/请求数

对于一个应用系统来讲必定会有极限并发/请求数,即总有一个TPS/QPS阀值,若是超了阀值则系统就会不响应用户请求或响应的很是慢,所以咱们最好进行过载保护,防止大量请求涌入击垮系统。

若是你使用过Tomcat,其Connector 其中一种配置有以下几个参数:

acceptCount:若是Tomcat的线程都忙于响应,新来的链接会进入队列排队,若是超出排队大小,则拒绝链接;

maxConnections: 瞬时最大链接数,超出的会排队等待;

maxThreads:Tomcat能启动用来处理请求的最大线程数,若是请求处理量一直远远大于最大线程数则可能会僵死。

详细的配置请参考官方文档。另外如Mysql(如max_connections)、Redis(如tcp-backlog)都会有相似的限制链接数的配置。

 

限流总资源数

若是有的资源是稀缺资源(如数据库链接、线程),并且可能有多个系统都会去使用它,那么须要限制应用;可使用池化技术来限制总资源数:链接池、线程池。好比分配给每一个应用的数据库链接是100,那么本应用最多可使用100个资源,超出了能够等待或者抛异常。

 

限流某个接口的总并发/请求数

若是接口可能会有突发访问状况,但又担忧访问量太大形成崩溃,如抢购业务;这个时候就须要限制这个接口的总并发/请求数总请求数了;由于粒度比较细,能够为每一个接口都设置相应的阀值。可使用Java中的AtomicLong进行限流:

try {
if(atomic.incrementAndGet() > 限流数) {
//拒绝请求
  }
//处理请求
} finally {
atomic.decrementAndGet();
}

适合对业务无损的服务或者须要过载保护的服务进行限流,如抢购业务,超出了大小要么让用户排队,要么告诉用户没货了,对用户来讲是能够接受的。而一些开放平台也会限制用户调用某个接口的试用请求量,也能够用这种计数器方式实现。这种方式也是简单粗暴的限流,没有平滑处理,须要根据实际状况选择使用;

 

限流某个接口的时间窗请求数

即一个时间窗口内的请求数,如想限制某个接口/服务每秒/每分钟/天天的请求数/调用量。如一些基础服务会被不少其余系统调用,好比商品详情页服务会调用基础商品服务调用,可是怕由于更新量比较大将基础服务打挂,这时咱们要对每秒/每分钟的调用量进行限速;一种实现方式以下所示:

LoadingCache<Long, AtomicLong> counter =
        CacheBuilder.newBuilder()
                .expireAfterWrite(2, TimeUnit.SECONDS)
                .build(new CacheLoader<Long, AtomicLong>() {
                    @Override
                    public AtomicLong load(Long seconds) throws Exception {
                        return new AtomicLong(0);
                    }
                });
long limit = 1000;
while(true) {
    //获得当前秒
   
long currentSeconds = System.currentTimeMillis() / 1000;
    if(counter.get(currentSeconds).incrementAndGet() > limit) {
        System.out.println("限流了:" + currentSeconds);
        continue;
    }
    //业务处理
}

 咱们使用Guava的Cache来存储计数器,过时时间设置为2秒(保证1秒内的计数器是有的),而后咱们获取当前时间戳而后取秒数来做为KEY进行计数统计和限流,这种方式也是简单粗暴,刚才说的场景够用了。

 

平滑限流某个接口的请求数

以前的限流方式都不能很好地应对突发请求,即瞬间请求可能都被容许从而致使一些问题;所以在一些场景中须要对突发请求进行整形,整形为平均速率请求处理(好比5r/s,则每隔200毫秒处理一个请求,平滑了速率)。这个时候有两种算法知足咱们的场景:令牌桶和漏桶算法。Guava框架提供了令牌桶算法实现,可直接拿来使用。

Guava RateLimiter提供了令牌桶算法实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)实现。

 

SmoothBursty

RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());

   将获得相似以下的输出:

  0.0

  0.198239

  0.196083

  0.200609

  0.199599

  0.19961

一、RateLimiter.create(5) 表示桶容量为5且每秒新增5个令牌,即每隔200毫秒新增一个令牌;

二、limiter.acquire()表示消费一个令牌,若是当前桶中有足够令牌则成功(返回值为0),若是桶中没有令牌则暂停一段时间,好比发令牌间隔是200毫秒,则等待200毫秒后再去消费令牌(如上测试用例返回的为0.198239,差很少等待了200毫秒桶中才有令牌可用),这种实现将突发请求速率平均为了固定请求速率。

 

再看一个突发示例:

RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire(5));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));

将获得相似以下的输出:

0.0

0.98745

0.183553

0.199909

limiter.acquire(5)表示桶的容量为5且每秒新增5个令牌,令牌桶算法容许必定程度的突发,因此能够一次性消费5个令牌,但接下来的limiter.acquire(1)将等待差很少1秒桶中才能有令牌,且接下来的请求也整形为固定速率了。

RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire(10));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));

将获得相似以下的输出:

0.0

1.997428

0.192273

0.200616

同上边的例子相似,第一秒突发了10个请求,令牌桶算法也容许了这种突发(容许消费将来的令牌),但接下来的limiter.acquire(1)将等待差很少2秒桶中才能有令牌,且接下来的请求也整形为固定速率了。


接下来再看一个突发的例子:

RateLimiter limiter = RateLimiter.create(2);
System.out.println(limiter.acquire());
Thread.sleep(2000L);
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());

将获得相似以下的输出:

0.0

0.0

0.0

0.0

0.499876

0.495799

一、建立了一个桶容量为2且每秒新增2个令牌;

二、首先调用limiter.acquire()消费一个令牌,此时令牌桶能够知足(返回值为0);

三、而后线程暂停2秒,接下来的两个limiter.acquire()都能消费到令牌,第三个limiter.acquire()也一样消费到了令牌,到第四个时就须要等待500毫秒了。

此处能够看到咱们设置的桶容量为2(即容许的突发量),这是由于SmoothBursty中有一个参数:最大突发秒数(maxBurstSeconds)默认值是1s,突发量/桶容量=速率*maxBurstSeconds,因此本示例桶容量/突发量为2,例子中前两个是消费了以前积攒的突发量,而第三个开始就是正常计算的了。令牌桶算法容许将一段时间内没有消费的令牌暂存到令牌桶中,留待将来使用,并容许将来请求的这种突发。

 

SmoothBursty经过平均速率和最后一次新增令牌的时间计算出下次新增令牌的时间的,另外须要一个桶暂存一段时间内没有使用的令牌(便可以突发的令牌数)。另外RateLimiter还提供了tryAcquire方法来进行无阻塞或可超时的令牌消费。

 

由于SmoothBursty容许必定程度的突发,会有人担忧若是容许这种突发,假设忽然间来了很大的流量,那么系统极可能扛不住这种突发。所以须要一种平滑速率的限流工具,从而系统冷启动后慢慢的趋于平均固定速率(即刚开始速率小一些,而后慢慢趋于咱们设置的固定速率)。Guava也提供了SmoothWarmingUp来实现这种需求,其能够认为是漏桶算法,可是在某些特殊场景又不太同样。

 

SmoothWarmingUp建立方式:RateLimiter.create(doublepermitsPerSecond, long warmupPeriod, TimeUnit unit)

permitsPerSecond表示每秒新增的令牌数,warmupPeriod表示在从冷启动速率过渡到平均速率的时间间隔。

 

示例以下:

RateLimiter limiter = RateLimiter.create(5, 1000, TimeUnit.MILLISECONDS);
for(int i = 1; i < 5;i++) {
    System.out.println(limiter.acquire());
}
Thread.sleep(1000L);
for(int i = 1; i < 5;i++) {
    System.out.println(limiter.acquire());
}

将获得相似以下的输出:

0.0

0.51767

0.357814

0.219992

0.199984

0.0

0.360826

0.220166

0.199723

0.199555

速率是梯形上升速率的,也就是说冷启动时会以一个比较大的速率慢慢到平均速率;而后趋于平均速率(梯形降低到平均速率)。能够经过调节warmupPeriod参数实现一开始就是平滑固定速率。

 

到此应用级限流的一些方法就介绍完了。假设将应用部署到多台机器,应用级限流方式只是单应用内的请求限流,不能进行全局限流。所以咱们须要分布式限流和接入层限流来解决这个问题。

 

分布式限流

分布式限流最关键的是要将限流服务作成原子化,而解决方案可使使用redis+lua或者nginx+lua技术进行实现,经过这两种技术能够实现的高并发和高性能。

首先咱们来使用redis+lua实现时间窗内某个接口的请求数限流,实现了该功能后能够改造为限流总并发/请求数和限制总资源数。Lua自己就是一种编程语言,也可使用它实现复杂的令牌桶或漏桶算法。

 

redis+lua实现中的lua脚本:

local key = KEYS[1] --限流KEY(一秒一个)
local limit = tonumber(ARGV[1]) --限流大小
local current = tonumber(redis.call("INCRBY", key, "1")) --请求数+1
if current > limit then --若是超出限流大小
    return 0
elseif current == 1 then --只有第一次访问须要设置2秒的过时时间
    redis.call("expire", key,"2")
end
return 1

如上操做因是在一个lua脚本中,又因Redis是单线程模型,所以是线程安全的。如上方式有一个缺点就是当达到限流大小后仍是会递增的,能够改形成以下方式实现:

local key = KEYS[1] --限流KEY(一秒一个)
local limit = tonumber(ARGV[1]) --限流大小
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then --若是超出限流大小
    return 0
else --请求数+1,并设置2秒过时
    redis.call("INCRBY", key,"1")
    redis.call("expire", key,"2")
    return 1
end

以下是Java中判断是否须要限流的代码:

public static boolean acquire() throws Exception {
String luaScript = Files.toString(new File("limit.lua"), Charset.defaultCharset());
Jedis jedis = new Jedis("192.168.147.52", 6379);
String key = "ip:" + System.currentTimeMillis()/ 1000; //此处将当前时间戳取秒数
Stringlimit = "3"; //限流大小
return (Long)jedis.eval(luaScript,Lists.newArrayList(key), Lists.newArrayList(limit)) == 1;
}

由于Redis的限制(Lua中有写操做不能使用带随机性质的读操做,如TIME)不能在Redis Lua中使用TIME获取时间戳,所以只好从应用获取而后传入,在某些极端状况下(机器时钟不许的状况下),限流会存在一些小问题。

 

使用Nginx+Lua实现的Lua脚本:

local locks = require "resty.lock"
local function acquire()
    local lock =locks:new("locks")
    local elapsed, err =lock:lock("limit_key") --互斥锁
    local limit_counter =ngx.shared.limit_counter --计数器
    local key = "ip:" ..os.time()
    local limit = 5 --限流大小
    local current =limit_counter:get(key)

    if current ~= nil and current + 1> limit then --若是超出限流大小
        lock:unlock()
        return 0
    end
    if current == nil then
        limit_counter:set(key, 1, 1) --第一次须要设置过时时间,设置key的值为1,过时时间为1秒
    else
        limit_counter:incr(key, 1) --第二次开始加1便可
    end
    lock:unlock()
    return 1
end
ngx.print(acquire())

实现中咱们须要使用lua-resty-lock互斥锁模块来解决原子性问题(在实际工程中使用时请考虑获取锁的超时问题),并使用ngx.shared.DICT共享字典来实现计数器。若是须要限流则返回0,不然返回1。使用时须要先定义两个共享字典(分别用来存放锁和计数器数据):

 

Java代码   收藏代码
  1. http {  
  2.     ……  
  3.     lua_shared_dict locks 10m;  
  4.     lua_shared_dict limit_counter 10m;  
  5. }  

有人会纠结若是应用并发量很是大那么redis或者nginx是否是能抗得住;不过这个问题要从多方面考虑:你的流量是否是真的有这么大,是否是能够经过一致性哈希将分布式限流进行分片,是否是能够当并发量太大降级为应用级限流;对策很是多,能够根据实际状况调节;像在京东使用Redis+Lua来限流抢购流量,通常流量是没有问题的。

 

对于分布式限流目前遇到的场景是业务上的限流,而不是流量入口的限流;流量入口限流应该在接入层完成,而接入层笔者通常使用Nginx。

 

接入层限流

接入层一般指请求流量的入口,该层的主要目的有:负载均衡、非法请求过滤、请求聚合、缓存、降级、限流、A/B测试、服务质量监控等等,能够参考笔者写的《使用Nginx+Lua(OpenResty)开发高性能Web应用》。

 

对于Nginx接入层限流可使用Nginx自带了两个模块:链接数限流模块ngx_http_limit_conn_module和漏桶算法实现的请求限流模块ngx_http_limit_req_module。还可使用OpenResty提供的Lua限流模块lua-resty-limit-traffic进行更复杂的限流场景。

 

limit_conn用来对某个KEY对应的总的网络链接数进行限流,能够按照如IP、域名维度进行限流。limit_req用来对某个KEY对应的请求的平均速率进行限流,并有两种用法:平滑模式(delay)和容许突发模式(nodelay)。

 

ngx_http_limit_conn_module

limit_conn是对某个KEY对应的总的网络链接数进行限流。能够按照IP来限制IP维度的总链接数,或者按照服务域名来限制某个域名的总链接数。可是记住不是每个请求链接都会被计数器统计,只有那些被Nginx处理的且已经读取了整个请求头的请求链接才会被计数器统计。

 

配置示例:

http {
    limit_conn_zone$binary_remote_addr zone=addr:10m; 
    limit_conn_log_level error; 
    limit_conn_status 503;
    ...
    server {
    ...
    location /limit {
        limit_conn addr 1;
    }

limit_conn:要配置存放KEY和计数器的共享内存区域和指定KEY的最大链接数;此处指定的最大链接数是1,表示Nginx最多同时并发处理1个链接;

limit_conn_zone:用来配置限流KEY、及存放KEY对应信息的共享内存区域大小;此处的KEY是“$binary_remote_addr”其表示IP地址,也可使用如$server_name做为KEY来限制域名级别的最大链接数;

limit_conn_status:配置被限流后返回的状态码,默认返回503;

limit_conn_log_level:配置记录被限流后的日志级别,默认error级别。

 

limit_conn的主要执行过程以下所示:

一、请求进入后首先判断当前limit_conn_zone中相应KEY的链接数是否超出了配置的最大链接数;

2.一、若是超过了配置的最大大小,则被限流,返回limit_conn_status定义的错误状态码;

2.二、不然相应KEY的链接数加1,并注册请求处理完成的回调函数;

三、进行请求处理;

四、在结束请求阶段会调用注册的回调函数对相应KEY的链接数减1。

 

limt_conn能够限流某个KEY的总并发/请求数,KEY能够根据须要变化。

 

按照IP限制并发链接数配置示例:

首先定义IP维度的限流区域:

limit_conn_zone $binary_remote_addrzone=perip:10m;

 

接着在要限流的location中添加限流逻辑:

location /limit {
    limit_conn perip 2;
    echo "123";
}

即容许每一个IP最大并发链接数为2。

 

使用AB测试工具进行测试,并发数为5个,总的请求数为5个:

ab -n 5 -c 5 http://localhost/limit

  

将获得以下access.log输出:

[08/Jun/2016:20:10:51+0800] [1465373451.802] 200

[08/Jun/2016:20:10:51+0800] [1465373451.803] 200

[08/Jun/2016:20:10:51 +0800][1465373451.803] 503

[08/Jun/2016:20:10:51 +0800][1465373451.803] 503

[08/Jun/2016:20:10:51 +0800][1465373451.803] 503

 

此处咱们把access log格式设置为log_format main  '[$time_local] [$msec] $status';分别是“日期 日期秒/毫秒值 响应状态码”。

 

若是被限流了,则在error.log中会看到相似以下的内容:

2016/06/08 20:10:51 [error] 5662#0: *5limiting connections by zone "perip", client: 127.0.0.1, server: _,request: "GET /limit HTTP/1.0", host: "localhost"

 

按照域名限制并发链接数配置示例:

首先定义域名维度的限流区域:

limit_conn_zone $ server_name zone=perserver:10m;

 

接着在要限流的location中添加限流逻辑:

location /limit {
    limit_conn perserver 2;
    echo "123";
}

即容许每一个域名最大并发请求链接数为2;这样配置能够实现服务器最大链接数限制。

 

ngx_http_limit_req_module

limit_req是漏桶算法实现,用于对指定KEY对应的请求进行限流,好比按照IP维度限制请求速率。

 

配置示例:

http {
    limit_req_zone $binary_remote_addr zone=one:10m rate=1r/s;
    limit_conn_log_level error;
    limit_conn_status 503;
    ...
    server {
    ...
    location /limit {
        limit_req zone=one burst=5 nodelay;
    }

limit_req:配置限流区域、桶容量(突发容量,默认0)、是否延迟模式(默认延迟);

limit_req_zone:配置限流KEY、及存放KEY对应信息的共享内存区域大小、固定请求速率;此处指定的KEY是“$binary_remote_addr”表示IP地址;固定请求速率使用rate参数配置,支持10r/s和60r/m,即每秒10个请求和每分钟60个请求,不过最终都会转换为每秒的固定请求速率(10r/s为每100毫秒处理一个请求;60r/m,即每1000毫秒处理一个请求)。

limit_conn_status:配置被限流后返回的状态码,默认返回503;

limit_conn_log_level:配置记录被限流后的日志级别,默认error级别。

 

limit_req的主要执行过程以下所示:

一、请求进入后首先判断最后一次请求时间相对于当前时间(第一次是0)是否须要限流,若是须要限流则执行步骤2,不然执行步骤3;

2.一、若是没有配置桶容量(burst),则桶容量为0;按照固定速率处理请求;若是请求被限流,则直接返回相应的错误码(默认503);

2.二、若是配置了桶容量(burst>0)且延迟模式(没有配置nodelay);若是桶满了,则新进入的请求被限流;若是没有满则请求会以固定平均速率被处理(按照固定速率并根据须要延迟处理请求,延迟使用休眠实现);

2.三、若是配置了桶容量(burst>0)且非延迟模式(配置了nodelay);不会按照固定速率处理请求,而是容许突发处理请求;若是桶满了,则请求被限流,直接返回相应的错误码;

三、若是没有被限流,则正常处理请求;

四、Nginx会在相应时机进行选择一些(3个节点)限流KEY进行过时处理,进行内存回收。

 

场景2.1测试

首先定义IP维度的限流区域:

limit_req_zone $binary_remote_addrzone=test:10m rate=500r/s;

限制为每秒500个请求,固定平均速率为2毫秒一个请求。

 

接着在要限流的location中添加限流逻辑:

location /limit {
    limit_req zone=test;
    echo "123";
}

即桶容量为0(burst默认为0),且延迟模式。

 

使用AB测试工具进行测试,并发数为2个,总的请求数为10个:

ab -n 10 -c 2 http://localhost/limit

                 

将获得以下access.log输出:

[08/Jun/2016:20:25:56+0800] [1465381556.410] 200

[08/Jun/2016:20:25:56 +0800][1465381556.410] 503

[08/Jun/2016:20:25:56 +0800][1465381556.411] 503

[08/Jun/2016:20:25:56+0800] [1465381556.411] 200

[08/Jun/2016:20:25:56 +0800][1465381556.412] 503

[08/Jun/2016:20:25:56 +0800][1465381556.412] 503

 

虽然每秒容许500个请求,可是由于桶容量为0,因此流入的请求要么被处理要么被限流,没法延迟处理;另外平均速率在2毫秒左右,好比1465381556.410和1465381556.411被处理了;有朋友会说这固定平均速率不是1毫秒嘛,其实这是由于实现算法没那么精准形成的。

 

若是被限流在error.log中会看到以下内容:

2016/06/08 20:25:56 [error] 6130#0: *1962limiting requests, excess: 1.000 by zone "test", client: 127.0.0.1,server: _, request: "GET /limit HTTP/1.0", host:"localhost"

 

若是被延迟了在error.log(日志级别要INFO级别)中会看到以下内容:

2016/06/10 09:05:23 [warn] 9766#0: *97021delaying request, excess: 0.368, by zone "test", client: 127.0.0.1,server: _, request: "GET /limit HTTP/1.0", host:"localhost"

 

场景2.2测试

首先定义IP维度的限流区域:

limit_req_zone $binary_remote_addr zone=test:10m rate=2r/s;

为了方便测试设置速率为每秒2个请求,即固定平均速率是500毫秒一个请求。

 

接着在要限流的location中添加限流逻辑:

location /limit {
    limit_req zone=test burst=3;
    echo "123";
}

固定平均速率为500毫秒一个请求,通容量为3,若是桶满了新的请求被限流,不然能够进入桶中排队并等待(实现延迟模式)。

 

为了看出限流效果咱们写了一个req.sh脚本:

ab -c 6 -n 6 http://localhost/limit
sleep 0.3
ab -c 6 -n 6 http://localhost/limit

首先进行6个并发请求6次URL,而后休眠300毫秒,而后再进行6个并发请求6次URL;中间休眠目的是为了能跨越2秒看到效果,若是看不到以下的效果能够调节休眠时间。

 

将获得以下access.log输出:

[09/Jun/2016:08:46:43+0800] [1465433203.959] 200

[09/Jun/2016:08:46:43 +0800][1465433203.959] 503

[09/Jun/2016:08:46:43 +0800][1465433203.960] 503

[09/Jun/2016:08:46:44+0800] [1465433204.450] 200

[09/Jun/2016:08:46:44+0800] [1465433204.950] 200

[09/Jun/2016:08:46:45 +0800][1465433205.453] 200

 

[09/Jun/2016:08:46:45 +0800][1465433205.766] 503

[09/Jun/2016:08:46:45 +0800][1465433205.766] 503

[09/Jun/2016:08:46:45 +0800][1465433205.767] 503

[09/Jun/2016:08:46:45+0800] [1465433205.950] 200

[09/Jun/2016:08:46:46+0800] [1465433206.451] 200

[09/Jun/2016:08:46:46+0800] [1465433206.952] 200



 

桶容量为3,即桶中在时间窗口内最多流入3个请求,且按照2r/s的固定速率处理请求(即每隔500毫秒处理一个请求);桶计算时间窗口(1.5秒)=速率(2r/s)/桶容量(3),也就是说在这个时间窗口内桶最多暂存3个请求。所以咱们要以当前时间往前推1.5秒和1秒来计算时间窗口内的总请求数;另外由于默认是延迟模式,因此时间窗内的请求要被暂存到桶中,并以固定平均速率处理请求:

第一轮:有4个请求处理成功了,按照漏桶桶容量应该最多3个才对;这是由于计算算法的问题,第一次计算因没有参考值,因此第一次计算后,后续的计算才能有参考值,所以第一次成功能够忽略;这个问题影响很小能够忽略;并且按照固定500毫秒的速率处理请求。

第二轮:由于第一轮请求是突发来的,差很少都在1465433203.959时间点,只是由于漏桶将速率进行了平滑变成了固定平均速率(每500毫秒一个请求);而第二轮计算时间应基于1465433203.959;而第二轮突发请求差很少都在1465433205.766时间点,所以计算桶容量的时间窗口应基于1465433203.959和1465433205.766来计算,计算结果为1465433205.766这个时间点漏桶为空了,能够流入桶中3个请求,其余请求被拒绝;又由于第一轮最后一次处理时间是1465433205.453,因此第二轮第一个请求被延迟到了1465433205.950。这里也要注意固定平均速率只是在配置的速率左右,存在计算精度问题,会有一些误差。

 

若是桶容量改成1(burst=1),执行req.sh脚本能够看到以下输出:

09/Jun/2016:09:04:30+0800] [1465434270.362] 200

[09/Jun/2016:09:04:30 +0800][1465434270.371] 503

[09/Jun/2016:09:04:30 +0800] [1465434270.372]503

[09/Jun/2016:09:04:30 +0800][1465434270.372] 503

[09/Jun/2016:09:04:30 +0800][1465434270.372] 503

[09/Jun/2016:09:04:30+0800] [1465434270.864] 200

 

[09/Jun/2016:09:04:31 +0800][1465434271.178] 503

[09/Jun/2016:09:04:31 +0800][1465434271.178] 503

[09/Jun/2016:09:04:31 +0800][1465434271.178] 503

[09/Jun/2016:09:04:31 +0800][1465434271.178] 503

[09/Jun/2016:09:04:31 +0800][1465434271.179] 503

[09/Jun/2016:09:04:31+0800] [1465434271.366] 200

桶容量为1,按照每1000毫秒一个请求的固定平均速率处理请求。

 

场景2.3测试

首先定义IP维度的限流区域:

limit_req_zone $binary_remote_addr zone=test:10m rate=2r/s;

为了方便测试配置为每秒2个请求,固定平均速率是500毫秒一个请求。

 

接着在要限流的location中添加限流逻辑:

location /limit {
    limit_req zone=test burst=3 nodelay;
    echo "123";
}

桶容量为3,若是桶满了直接拒绝新请求,且每秒2最多两个请求,桶按照固定500毫秒的速率以nodelay模式处理请求。

 

为了看到限流效果咱们写了一个req.sh脚本:

ab -c 6 -n 6 http://localhost/limit
sleep 1
ab -c 6 -n 6 http://localhost/limit
sleep 0.3
ab -c 6 -n 6 http://localhost/limit
sleep 0.3
ab -c 6 -n 6 http://localhost/limit
sleep 0.3
ab -c 6 -n 6 http://localhost/limit
sleep 2
ab -c 6 -n 6 http://localhost/limit

 

将获得相似以下access.log输出:

[09/Jun/2016:14:30:11+0800] [1465453811.754] 200

[09/Jun/2016:14:30:11+0800] [1465453811.755] 200

[09/Jun/2016:14:30:11+0800] [1465453811.755] 200

[09/Jun/2016:14:30:11+0800] [1465453811.759] 200

[09/Jun/2016:14:30:11 +0800][1465453811.759] 503

[09/Jun/2016:14:30:11 +0800][1465453811.759] 503

 

[09/Jun/2016:14:30:12+0800] [1465453812.776] 200

[09/Jun/2016:14:30:12+0800] [1465453812.776] 200

[09/Jun/2016:14:30:12 +0800][1465453812.776] 503

[09/Jun/2016:14:30:12 +0800][1465453812.777] 503

[09/Jun/2016:14:30:12 +0800][1465453812.777] 503

[09/Jun/2016:14:30:12 +0800][1465453812.777] 503

 

[09/Jun/2016:14:30:13 +0800] [1465453813.095]503

[09/Jun/2016:14:30:13 +0800][1465453813.097] 503

[09/Jun/2016:14:30:13 +0800][1465453813.097] 503

[09/Jun/2016:14:30:13 +0800][1465453813.097] 503

[09/Jun/2016:14:30:13 +0800][1465453813.097] 503

[09/Jun/2016:14:30:13 +0800][1465453813.098] 503

 

[09/Jun/2016:14:30:13+0800] [1465453813.425] 200

[09/Jun/2016:14:30:13 +0800][1465453813.425] 503

[09/Jun/2016:14:30:13 +0800][1465453813.425] 503

[09/Jun/2016:14:30:13 +0800][1465453813.426] 503

[09/Jun/2016:14:30:13 +0800][1465453813.426] 503

[09/Jun/2016:14:30:13 +0800][1465453813.426] 503

 

[09/Jun/2016:14:30:13+0800] [1465453813.754] 200

[09/Jun/2016:14:30:13 +0800][1465453813.755] 503

[09/Jun/2016:14:30:13 +0800][1465453813.755] 503

[09/Jun/2016:14:30:13 +0800][1465453813.756] 503

[09/Jun/2016:14:30:13 +0800][1465453813.756] 503

[09/Jun/2016:14:30:13 +0800][1465453813.756] 503

 

[09/Jun/2016:14:30:15+0800] [1465453815.278] 200

[09/Jun/2016:14:30:15+0800] [1465453815.278] 200

[09/Jun/2016:14:30:15+0800] [1465453815.278] 200

[09/Jun/2016:14:30:15 +0800][1465453815.278] 503

[09/Jun/2016:14:30:15 +0800][1465453815.279] 503

[09/Jun/2016:14:30:15 +0800][1465453815.279] 503

 

[09/Jun/2016:14:30:17+0800] [1465453817.300] 200

[09/Jun/2016:14:30:17+0800] [1465453817.300] 200

[09/Jun/2016:14:30:17+0800] [1465453817.300] 200

[09/Jun/2016:14:30:17+0800] [1465453817.301] 200

[09/Jun/2016:14:30:17 +0800][1465453817.301] 503

[09/Jun/2016:14:30:17 +0800][1465453817.301] 503



 

桶容量为3(,即桶中在时间窗口内最多流入3个请求,且按照2r/s的固定速率处理请求(即每隔500毫秒处理一个请求);桶计算时间窗口(1.5秒)=速率(2r/s)/桶容量(3),也就是说在这个时间窗口内桶最多暂存3个请求。所以咱们要以当前时间往前推1.5秒和1秒来计算时间窗口内的总请求数;另外由于配置了nodelay,是非延迟模式,因此容许时间窗内突发请求的;另外从本示例会看出两个问题:

第一轮和第七轮:有4个请求处理成功了;这是由于计算算法的问题,本示例是若是2秒内没有请求,而后接着忽然来了不少请求,第一次计算的结果将是不正确的;这个问题影响很小能够忽略;

第五轮:1.0秒计算出来是3个请求;此处也是因计算精度的问题,也就是说limit_req实现的算法不是很是精准的,假设此处当作相对于2.75的话,1.0秒内只有1次请求,因此仍是容许1次请求的。

 

若是限流出错了,能够配置错误页面:

proxy_intercept_errors on;
recursive_error_pages on;
error_page 503 //www.jd.com/error.aspx;

limit_conn_zone/limit_req_zone定义的内存不足,则后续的请求将一直被限流,因此须要根据需求设置好相应的内存大小。

 

此处的限流都是单Nginx的,假设咱们接入层有多个nginx,此处就存在和应用级限流相同的问题;那如何处理呢?一种解决办法:创建一个负载均衡层将按照限流KEY进行一致性哈希算法将请求哈希到接入层Nginx上,从而相同KEY的将打到同一台接入层Nginx上;另外一种解决方案就是使用Nginx+Lua(OpenResty)调用分布式限流逻辑实现。

 

lua-resty-limit-traffic

以前介绍的两个模块使用上比较简单,指定KEY、指定限流速率等就能够了,若是咱们想根据实际状况变化KEY、变化速率、变化桶大小等这种动态特性,使用标准模块就很难去实现了,所以咱们须要一种可编程来解决咱们问题;而OpenResty提供了lua限流模块lua-resty-limit-traffic,经过它能够按照更复杂的业务逻辑进行动态限流处理了。其提供了limit.conn和limit.req实现,算法与nginx limit_conn和limit_req是同样的。

 

此处咱们来实现ngx_http_limit_req_module中的【场景2.2测试】,不要忘记下载lua-resty-limit-traffic模块并添加到OpenResty的lualib中。

 

配置用来存放限流用的共享字典:

lua_shared_dict limit_req_store 100m;

 

如下是实现【场景2.2测试】的限流代码limit_req.lua:

local limit_req = require "resty.limit.req"
local rate = 2 --固定平均速率 2r/s
local burst = 3 --桶容量
local error_status = 503
local nodelay = false --是否须要不延迟处理
local lim, err = limit_req.new("limit_req_store", rate, burst)
if not lim then --没定义共享字典
    ngx.exit(error_status)
end
local key = ngx.var.binary_remote_addr --IP维度的限流
--流入请求,若是请求须要被延迟则delay > 0
local delay, err = lim:incoming(key, true)
if not delay and err == "rejected" then --超出桶大小了
    ngx.exit(error_status)
end
if delay > 0 then --根据须要决定是延迟或者不延迟处理
    if nodelay then
        --直接突发处理了
    else
        ngx.sleep(delay) --延迟处理
    end
end

即限流逻辑再nginx access阶段被访问,若是不被限流继续后续流程;若是须要被限流要么sleep一段时间继续后续流程,要么返回相应的状态码拒绝请求。

 

在分布式限流中咱们使用了简单的Nginx+Lua进行分布式限流,有了这个模块也可使用这个模块来实现分布式限流。

 

另外在使用Nginx+Lua时也能够获取ngx.var.connections_active进行过载保护,即若是当前活跃链接数超过阈值进行限流保护。

if tonumber(ngx.var.connections_active) >= tonumber(limit) then
    //限流
end

 

nginx也提供了limit_rate用来对流量限速,如limit_rate 50k,表示限制下载速度为50k。

 

到此笔者在工做中涉及的限流用法就介绍完,这些算法中有些容许突发,有些会整形为平滑,有些计算算法简单粗暴;其中令牌桶算法和漏桶算法实现上是相似的,只是表述的方向不太同样,对于业务来讲没必要刻意去区分它们;所以须要根据实际场景来决定如何限流,最好的算法不必定是最适用的。

 

参考资料

https://en.wikipedia.org/wiki/Token_bucket

https://en.wikipedia.org/wiki/Leaky_bucket

http://redis.io/commands/incr

http://nginx.org/en/docs/http/ngx_http_limit_req_module.html

http://nginx.org/en/docs/http/ngx_http_limit_conn_module.html

https://github.com/openresty/lua-resty-limit-traffic

 

http://nginx.org/en/docs/http/ngx_http_core_module.html#limit_rate

 

 

 

 

 

系统监控和流控-java应用

目前系统的监控方面,linux机器,能够定时的获取cpu、load、IO、网络等状况,统计以后,若是超过阀值,便可报警。

web的请求,能够经过分析apache的日志,获取PV、UV以及页面的响应时间等信息,统计这些信息,若是有异常,报警便可。

可是java系统(一个java进程)中的bean的状况如何作到监控和流控呢?

    双十一,各个系统都有一些监控和流控的策略,了解了一圈以后,打算总结一下,只是粗略的写一下思路,记录一下,不涉及到细节(由于细节坑不少)。

    监控的目前是为了了解系统的运行细节,流控是为了在出现问题(瞬间请求暴涨、持续一段时间请求超过平均值、底层依赖的应用或者DB出现异常)的时候可以作出处理(动态限流拒绝新的请求进来<限流策略能够设置>、利用开关<修改静态类的属性>进行应用降级处理减小非核心的调用)。

    (1)业务日志

    监控业务代码抛出来的日志信息(能够在linux中经过crontab定时程序来抓取,也能够经过控制台来远程处理,也能够把文件拉到集中的分析服务器来处理),若是异常数太多,或者出现本身以前设置过的报警关键字,则进行报警;

    (2)java gc的日志

    java在运行过程当中,若是配置了参数(-verbose:gc -Xloggc:/home/admin/logs/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps),可以看到系统的gc的状况,若是FullGc特别频繁(超过一小时一次),能够提示报警;

    (3)拥堵线程数做为监控指标

    建立一个对象,里面一个key是方法的表明,而后属性中有个计数器(这个key此时对应的拥堵线程数),若是方法体内处理变慢,并发状况下拥堵线程数会增长,此时,能够进行报警,也能够把后续的请求当掉,下降负载。计数器进行变化能够有两种方式:第一种就是,经过AOP的方式,在方法调用前(before)拥堵线程数加一,方法返回后(after)拥堵线程数减一,这种好处就是不用硬编码;另一种就须要写代码,在方法体内,调用开始加以,而后try住,在finally的时候减一。

    (4)根据方法调用的QPS和RT

    QPS即一秒内方法调用的次数,RT即一秒内方法调用的平均返回时间。对于web请求,通常在web服务器(apache/nginx)中作掉这个,在服务器A中部署一个client,用来作计数统计,具体的计数规则可能会比较复杂,超过阀值(有多是攻击),则请求server端此时的拒绝策略,是让用户等待,仍是跳转待验证码页面让其输入验证码以后再进行访问,仍是直接返回错误。对于java中的bean方法,如何获取呢?如三种描述,两种方式一种是AOP拦截,一种是代码中硬编码。

    (5)基于AOP获取一个特定方法在特定时间段(例如一分钟)的拥堵线程数、执行次数(总次数、成功次数、失败次数)、响应时间

    作一个本地内存对象,用于记录这些信息,经过AOP的before获取信息(当前时间,线程数加一,调用次数加一),afterReturen获取信息(返回时间、线程数减一,调用次数加一,能够定制啥叫成功失败,而后相应次数增长),afterThrowing获取信息(返回时间、线程数减1、失败次数加一),最后定时dump内存中的数据,存储到DB或者日志,而后对于这些信息进行监控(能够根据历史来进行同比和环比)进行报警。

    (6)动态来添加QPS和RT的流控

    这种流控方式最为灵活,应用端依赖一个jar包,而后添加一个全局的AOP配置(对于页面的请求,在web.xml中添加filter便可​),拦截全部的方法(性能消耗能够忽略),可是只有符合规则的方法才会进行统计,而后有一个控制台设置规则,设置好规则后推送到应用端,应用端获取这个规则,根据这个规则来进行统计,超出阀值则进行限流。这种方式最为灵活,遇到紧急问题的时候能够经过控制台来限流掉。

    (7)请求是海量的状况下如何进行监控

    在请求很大的状况下,此时应用就不要处理监控的逻辑了,只要获取调用的信息(时间点、响应时间、方法签名等信息),而后把这些信息打印到日志中,异步来进行处理,把这些日志拉到专门的分析集群,而后在分析集群里面来作实时的分析,把分析的结果持久话到BD,对于分析后的结构化数据进行监控。

 

转载公司一位同事的文章:

http://rdc.taobao.com/team/jm/archives/2594

 

 

流量预警和限流方案中,比较经常使用的有两种。第一种滑窗模式,经过统计多个单元时间的访问次数来进行控制,当单位时间的访问次数达到的某个峰值时进行限流。第二种为响应模式,经过控制当前活跃请求数,来进行流量控制。下面来简单分析下两种的优缺点。

一、滑窗模式

模式分析:

在每次有访问进来时,咱们判断前N个单位时间里总访问量是否超过了设置的阈值,若超过则不容许执行。

这种模式的实现的方式更加契合流控的本质意义。理解较为简单。但因为访问量的预先不可预见性,会发生单位时间的前半段有大量的请求涌入,然后半段则拒绝全部请求的状况发生。(通常,须要会将单位时间切的足够的细来解决这个问题)其次,咱们很难肯定这个阈值设置在多少比较合适,只能经过经验或者模拟(如压测)来进行估计,不过即便是压测也很难估计的准确,线上每台机器的硬件参数的不一样,或者同一台机子在不一样的时间点其能够接受的阈值也不尽相同(系统中),每一个时间点致使可以承受的最大阈值也不尽相同,咱们没法考虑的周全。

因此滑窗模式每每用来对某一资源的保护上(或者说是承诺比较合适:我对某一接口的提供者承诺过,最高调用量不超过XX),如对db的保护,对某一服务的调用的控制上。由于对于咱们应用来讲,db或某一接口就是一共单一的总体。

代码实现思路:

每个窗(单位时间)就是一个独立的计数器(原子计数器),用以数组保存。将当前时间以某种方式(好比取模)映射到数组的一项中。每次访问先对当前窗内计数器+1,再计算前N个单元格的访问量综合,超过阈值则限流。

这里有个问题,时间永远是递增的,单纯的取模,会致使数组过长,使用内存过多,咱们能够用环形队列来解决这个问题。

二、响应模式

模式分析:

每次操做执行时,咱们经过判断当前正在执行的访问数是否超过某个阈值在决定是否限流。

该模式看着思路比较的另类,但却有其独到之处。实际上咱们限流的根本是为了保护资源,防止系统接受的请求过多,目不暇接,拖慢系统中其余接口的服务,形成雪崩。也就是说咱们真正须要关心的是那些运行中的请求,而那些已经完成的请求已经是过去时,再也不是须要关心的了。

咱们来看看其阈值的计算方式,对于一个请求来讲,响应时间rt/qps是一个比较容易获取的参数,那么咱们这样计算:qps/1000*rt。

此外,一个应用每每是个复杂的系统,提供的服务或者暴露的请求、资源不止一个。内部GC、定时任务的执行、其余服务访问的骤增,外部依赖方、db的抖动,抑或是代码中不经意间的一个bug。均可能致使相应时间的变化,致使系统同时能够执行请求的变化。而这种模式,则能恰如其分的自动作出调整,当系统不适时,rt增长时,会自动的对qps作出适应。

代码实现思路:

当访问开始时,咱们对当前计数器(原子计数器)+1,当完成时,-1。该计数器即为当前正在执行的请求数。只需判断这个计数器是否超过阈值便可。

相关文章
相关标签/搜索