限流?python+redis就能搞定!

最近在浏览高并发相关内容并试图点亮技能树的时候,老是能看到一句话: 保护高并发系统的三大利器:缓存、降级和限流。那什么是 限流呢?用我没读过太多书的话来说, 限流就是限制流量。咱们都知道服务器的处理能力是有上限的,若是超过了上限继续听任请求进来的话,可能会发生不可控的后果。而经过 限流,在请求数量超出阈值的时候就排队等待甚至拒绝服务,就可使系统在扛不住太高并发的状况下作到 有损服务而不是不服务。

举个栗子🌰,最近新型肺炎肆虐,各地都出现口罩紧缺的状况,广州政府为了缓解市民买不到口罩的情况,上线了预定服务,只有预定到的市民才能到指定的药店购买少许口罩。这就是生活中限流的状况,说这个也是但愿你们这段时间保护好本身,注意防御 :)
html

接下来就跟你们分享下接口限流的常见玩法吧,部分算法用python + redis粗略实现了一下,关键是图解啊!你品,你细品~python

固定窗口法

固定窗口法是限流算法里面最简单的,好比我想限制1分钟之内请求为100个,从如今算起的一分钟内,请求就最多就是100个,这分钟过完的那一刻把计数器归零,从新计算,周而复始。
git

固定窗口法
固定窗口法

伪代码实现

def can_pass_fixed_window(user, action, time_zone=60, times=30):
    """
    :param user: 用户惟一标识
    :param action: 用户访问的接口标识(即用户在客户端进行的动做)
    :param time_zone: 接口限制的时间段
    :param time_zone: 限制的时间段内容许多少请求经过
    """

    key = '{}:{}'.format(user, action)
    # redis_conn 表示redis链接对象
    count = redis_conn.get(key)
    if not count:
        count = 1
        redis_conn.setex(key, time_zone, count)
    if count < times:
        redis_conn.incr(key)
        return True

    return False
复制代码

这个方法虽然简单,但有个大问题是没法应对两个时间边界内的突发流量。如上图所示,若是在计数器清零的前1秒以及清零的后1秒都进来了100个请求,那么在短期内服务器就接收到了两倍的(200个)请求,这样就有可能压垮系统。会致使上面的问题是由于咱们的统计精度还不够,为了将临界问题的影响下降,咱们可使用滑动窗口法。github

滑动窗口法

滑动窗口法,简单来讲就是随着时间的推移,时间窗口也会持续移动,有一个计数器不断维护着窗口内的请求数量,这样就能够保证任意时间段内,都不会超过最大容许的请求数。例如当前时间窗口是0s~60s,请求数是40,10s后时间窗口就变成了10s~70s,请求数是60。web

时间窗口的滑动和计数器可使用redis的有序集合(sorted set)来实现。score的值用毫秒时间戳来表示,能够利用 当前时间戳 - 时间窗口的大小 来计算出窗口的边界,而后根据score的值作一个范围筛选就能够圈出一个窗口;value的值仅做为用户行为的惟一标识,也用毫秒时间戳就好。最后统计一下窗口内的请求数再作判断便可。
redis

滑动窗口法
滑动窗口法

伪代码实现

def can_pass_slide_window(user, action, time_zone=60, times=30):
    """
    :param user: 用户惟一标识
    :param action: 用户访问的接口标识(即用户在客户端进行的动做)
    :param time_zone: 接口限制的时间段
    :param time_zone: 限制的时间段内容许多少请求经过
    """

    key = '{}:{}'.format(user, action)
    now_ts = time.time() * 1000
    # value是什么在这里并不重要,只要保证value的惟一性便可,这里使用毫秒时间戳做为惟一值
    value = now_ts 
    # 时间窗口左边界
    old_ts = now_ts - (time_zone * 1000)
    # 记录行为
    redis_conn.zadd(key, value, now_ts)
    # 删除时间窗口以前的数据
    redis_conn.zremrangebyscore(key, 0, old_ts)
    # 获取窗口内的行为数量
    count = redis_conn.zcard(key)
    # 设置一个过时时间省得占空间
    redis_conn.expire(key, time_zone + 1)
    if not count or count < times:
        return True
    return False
复制代码

虽然滑动窗口法避免了时间界限的问题,可是依然没法很好解决细时间粒度上面请求过于集中的问题,就例如限制了1分钟请求不能超过60次,请求都集中在59s时发送过来,这样滑动窗口的效果就大打折扣。
为了使流量更加平滑,咱们可使用更加高级的令牌桶算法和漏桶算法。算法

令牌桶法

令牌桶算法的思路不复杂,它先以固定的速率生成令牌,把令牌放到固定容量的桶里,超过桶容量的令牌则丢弃,每来一个请求则获取一次令牌,规定只有得到令牌的请求才能放行,没有得到令牌的请求则丢弃。
编程


伪代码实现

# 令牌桶法,具体步骤:
# 请求来了就计算生成的令牌数,生成的速率有限制
# 若是生成的令牌太多,则丢弃令牌
# 有令牌的请求才能经过,不然拒绝
def can_pass_token_bucket(user, action, time_zone=60, times=30):
    """
    :param user: 用户惟一标识
    :param action: 用户访问的接口标识(即用户在客户端进行的动做)
    :param time_zone: 接口限制的时间段
    :param time_zone: 限制的时间段内容许多少请求经过
    """

    # 请求来了就倒水,倒水速率有限制
    key = '{}:{}'.format(user, action)
    rate = times / time_zone # 令牌生成速度
    capacity = times # 桶容量
    tokens = redis_conn.hget(key, 'tokens'# 看桶中有多少令牌
    last_time = redis_conn.hget(key, 'last_time'# 上次令牌生成时间
    now = time.time()
    tokens = int(tokens) if tokens else capacity
    last_time = int(last_time) if last_time else now
    delta_tokens = (now - last_time) * rate # 通过一段时间后生成的令牌
    if delta_tokens > 1:
        tokens = tokens + tokens # 增长令牌
        if tokens > tokens:
            tokens = capacity
        last_time = time.time() # 记录令牌生成时间
        redis_conn.hset(key, 'last_time', last_time)

    if tokens >= 1:
        tokens -= 1 # 请求进来了,令牌就减小1
        redis_conn.hset(key, 'tokens', tokens)
        return True
    return False
复制代码

令牌桶法限制的是请求的平均流入速率,优势是能应对必定程度上的突发请求,也能在必定程度上保持流量的来源特征,实现难度不高,适用于大多数应用场景。后端

漏桶算法

漏桶算法的思路与令牌桶算法有点相反。你们能够将请求想象成是水流,水流能够任意速率流入漏桶中,同时漏桶以固定的速率将水流出。若是流入速度太大会致使水满溢出,溢出的请求被丢弃。
api


经过上图能够看出漏桶法的特色是: 不限制请求流入的速率,可是 限制了请求流出的速率。这样突发流量能够被整造成一个稳定的流量,不会发生超频。

关于漏桶算法的实现方式有一点值得注意,我在浏览相关内容时发现网上大多数对于漏桶算法的伪代码实现,都只是实现了

根据维基百科,漏桶算法的实现理论有两种,分别是基于 meter 的基于 queue 的,他们实现的具体思路不一样,我大概介绍一下。

基于meter的漏桶

基于 meter 的实现相对来讲比较简单,其实它就有一个计数器,而后有消息要发送的时候,就看计数器够不够,若是计数器没有满的话,那么这个消息就能够被处理,若是计数器不足以发送消息的话,那么这个消息将会被丢弃。

那么这个计数器是怎么来的呢,基于 meter 的形式的计数器就是发送的频率,例如你设置得频率是不超过 5条/s ,那么计数器就是 5,在一秒内你每发送一条消息就减小一个,当你发第 6 条的时候计时器就不够了,那么这条消息就被丢弃了。

这种实现有点相似最开始介绍的固定窗口法,只不过期间粒度再小一些,伪代码就不上了。

基于queue的漏桶

基于 queue 的实现起来比较复杂,可是原理却比较简单,它也存在一个计数器,这个计数器却不表示速率限制,而是表示 queue 的大小,这里就是当有消息要发送的时候看 queue 中是否还有位置,若是有,那么就将消息放进 queue 中,这个 queue 以 FIFO 的形式提供服务;若是 queue 没有位置了,消息将被抛弃。

在消息被放进 queue 以后,还须要维护一个定时器,这个定时器的周期就是咱们设置的频率周期,例如咱们设置得频率是 5条/s,那么定时器的周期就是 200ms,定时器每 200ms 去 queue 里获取一次消息,若是有消息,那么就发送出去,若是没有就轮空。

这种实现方式比较复杂,限于篇幅这里就没有实现了,可是贴心的我仍是为你们找来了参考的栗子🌰。

熟悉python的朋友能够参考aiolimiter的实现 👉🏻 python传送门
熟悉go的朋友能够参考uber的ratelimit的实现 👉🏻 go传送门

注意,网上不少关于漏桶法的伪代码实现只实现了水流入桶的部分,没有实现关键的水从桶中漏出的部分。若是只实现了前半部分,其实跟令牌桶没有大的区别噢😯

若是以为上面的都太难,很差实现,那么我墙裂建议你尝试一下redis-cell这个模块!

redis-cell

Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并提供了原子的限流指令。有了这个模块,限流问题就很是简单了。
这个模块须要单独安装,安装教程网上不少,它只有一个指令:CL.THROTTLE

CL.THROTTLE user123 15 30 60 1
                ▲    ▲  ▲  ▲ ▲
                |    |  |  | └───── apply 1 operation (default if omitted) 每次请求消耗的水滴
                |    |  └──┴─────── 30 operations / 60 seconds 漏水的速率
                |    └───────────── 15 max_burst 漏桶的容量
                └─────────────────── key “user123” 用户行为
复制代码

执行以上命令以后,redis会返回以下信息:

> cl.throttle laoqian:reply 15 30 60
1) (integer) 0   # 0 表示容许,1表示拒绝
2) (integer) 16  # 漏桶容量
3) (integer) 15  # 漏桶剩余空间left_quota
4) (integer) -1  # 若是拒绝了,须要多长时间后再试(漏桶有空间了,单位秒)
5) (integer) 2   # 多长时间后,漏桶彻底空出来(单位秒)
复制代码

有了上面的redis模块,就能够轻松对付大多数的限流场景了,简直太方便了有木有!

后记&引用

限流算法就大概介绍到这里了,这些算法图虽然是参照了好几篇文章所画,但也是花了精力在上面的,但愿能对你们产生帮助吧。而后我还将上面的代码整理到了github,须要的朋友请戳 👉🏻 这里


参考文章:

读后三连⭐️

若是你以为这篇内容对你有帮助,我想邀请你帮我三个小忙:

  1. 点赞/在看,让更多的人也能看到这篇内容(这对我真的很重要)
  2. 关注公众号「py一下」,不按期分享原创知识,python编程/后端/数据结构与算法/我的成长等
  3. 也多看看其余文章
相关文章
相关标签/搜索