目前在作一个消息中台,提供给业务方各类消息通道能力。咱们在系统设计过程当中,除了有对业务方在使用时作 Quota 限制;也有对请求作流量控制(几w+ QPS),防止并发流量上来时打垮服务。下面是我的在调研流量控制方案的一些梳理和总结。html
并发一般是指并发访问,也就是在某个时间点,有多少个访问请求同时到来。机器的性能是有限的,若是这个量级达到必定程度,就会形成系统压力,影响系统性能。前端
应对高并发流量的几种解决方案:nginx
高并发最有效和经常使用的解决方案是流量控制 ,也就是限流。为应对服务的高可用,经过对大流量的请求进行限流,拦截掉大部分请求,只容许一部分请求真正进入后端服务器,这样就能够防止大量请求形成系统压力过大致使系统崩溃的状况,从而保护服务正常可用。git
经常使用的限流算法github
计数器是一种比较简单的限流算法,用途比较普遍,在接口层面,不少地方使用这种方式限流。在一段时间内,进行计数,与阀值进行比较,到了时间临界点,再将计数器清0。算法
package counter import ( "fmt" "time" ) func CounterDemo() { // init rate limiter limitCount := int64(100) interval := 60 * time.Second rateLimiter := NewRateLimiter(limitCount, interval) for i := 0; i < 800; i++ { if rateLimiter.Grant() { fmt.Println("Continue to process") continue } fmt.Println("Exceed rate limit") } } type RateLimiter struct { limitCount int64 interval time.Duration requestCount int64 startAt time.Time } func NewRateLimiter(limitCount int64, interval time.Duration) *RateLimiter { return &RateLimiter{ startAt: time.Now(), interval: interval, limitCount: limitCount, } } func (rl *RateLimiter) Grant() bool { now := time.Now() if now.Before(rl.startAt.Add(rl.interval)) { if rl.requestCount < rl.limitCount { rl.requestCount++ return true } return false } rl.startAt = time.Now() rl.requestCount = 0 return false }
这种实现方式存在一个时间临界点问题:若是在单位时间 1min 内的前 1s ,已经经过了 100 个请求,那后面的 59s ,只能把请求拒绝,这种现象称为 突刺现象。数据库
因为计数器存在突刺现象,可使用漏桶算法来解决。漏桶提供了一种简单、直观的方法,经过队列来限制速率,能够把队列看做是一个存放请求的桶。当一个请求被注册时,会被加到队列的末端。每隔一段时间,队列中的第一个事件就会被处理。这也被称为先进先出(FIFO)队列。若是队列已满,那么额外的请求就会被丢弃(或泄露)。windows
package leakyBucket import ( "fmt" "time" ) func LeakyBucketDemo() { // init rate limiter rate := int64(5) size := int64(10) rateLimiter := NewRateLimiter(rate, size) for i := 0; i < 800; i++ { if rateLimiter.Grant() { fmt.Println("Continue to process") continue } fmt.Println("Exceed rate limit") } } type RateLimiter struct { startAt time.Time // bucket size size int64 // now the water in bucket water int64 // rater discharge rate rate int64 } func NewRateLimiter(rate, size int64) *RateLimiter { return &RateLimiter{ startAt: time.Now(), rate: rate, // rate of processing requests, request/s size: size, } } func (rl *RateLimiter) Grant() bool { // calculating water output now := time.Now() out := int64(now.Sub(rl.startAt).Milliseconds()) * rl.rate // remain water after the leak rl.water = max(0, rl.water-out) rl.startAt = now if rl.water+1 < rl.size { rl.water++ return true } return false } func max(a, b int64) int64 { if a > b { return a } return b }
漏桶算法的优势是,它能将突发的请求平滑化,并以近似平均的速度处理。可是,瞬间高并发的流量可能会使请求占满队列,使最新的请求没法获得处理,也不能保证请求在固定时间内获得处理。后端
令牌桶算法是对漏桶算法的一种改进,桶算法可以限制请求调用的速率,而令牌桶算法可以在限制调用的平均速率的同时还容许必定程度的突发调用。缓存
该算法的基本原理也很容易理解。就是有一个桶,里面有一个最大数量的 Token(容量)。每当一个消费者想要调用一个服务或消费一个资源时,他就会取出一个或多个 Token。只有当消费者可以取出所需数量的 Token 时,他才能消费一项服务。若是桶中没有所需数量的令牌,他须要等待,直到桶中有足够的令牌。
package tokenBucket import ( "fmt" "time" ) func tokenBucketDemo() { tokenRate := int64(5) size := int64(10) rateLimiter := NewRateLimiter(tokenRate, size) for i := 0; i < 800; i++ { if rateLimiter.Grant() { fmt.Println("Continue to process") continue } fmt.Println("Exceed rate limit") } } type RateLimiter struct { startAt time.Time size int64 tokens int64 tokenRate int64 } func NewRateLimiter(tokenRate, size int64) *RateLimiter { return &RateLimiter{ startAt: time.Now(), tokenRate: tokenRate, size: size, } } func (rl *RateLimiter) Grant() bool { now := time.Now() in := now.Sub(rl.startAt).Milliseconds() * rl.tokenRate rl.tokens = min(rl.size, rl.tokens+in) rl.startAt = now if rl.tokens > 0 { rl.tokens-- return true } return false } func min(a, b int64) int64 { if a > b { return b } return a }
漏桶和令牌桶算法存在两个缺点:
这里推荐一种更优秀的限流算法:滑动窗口。它能够灵活地扩展速率限制,而且性能良好。 能较好解决上面这两个缺陷,同时避免了漏桶的瞬时大流量问题,以及计数器实现的突刺现象。
滑动窗口是把固定时间片,进行划分,而且随着时间进行移动,这样就巧妙的避开了计数器的突刺现象。也就是说这些固定数量的能够移动的格子,将会进行计数判断阀值,所以格子的数量影响着滑动窗口算法的精度。
package slidingWindow import ( "fmt" "sync" "time" ) func SlidingWindowDemo() { // allow 10 requests per second rateLimiter := NewRateLimiter(time.Second, 10, func() Window { return NewLocalWindow() }) if rateLimiter.Grant() { fmt.Println("Continue to process") } else { fmt.Println("Exceed rate limit") } } // Window represents a fixed-window type Window interface { // Start returns the start boundary Start() time.Time // Count returns the accumulated count Count() int64 // AddCount increments the accumulated count by n AddCount(n int64) // Reset sets the state of the window with the given settings Reset(s time.Time, c int64) } type NewWindow func() Window type LocalWindow struct { start int64 count int64 } func NewLocalWindow() *LocalWindow { return &LocalWindow{} } func (w *LocalWindow) Start() time.Time { return time.Unix(0, w.start) } func (w *LocalWindow) Count() int64 { return w.count } func (w *LocalWindow) AddCount(n int64) { w.count += n } func (w *LocalWindow) Reset(s time.Time, c int64) { w.start = s.UnixNano() w.count = c } type RateLimiter struct { size time.Duration limit int64 mu sync.Mutex curr Window prev Window } func NewRateLimiter(size time.Duration, limit int64, newWindow NewWindow) *RateLimiter { currWin := newWindow() // The previous window is static (i.e. no add changes will happen within it), // so we always create it as an instance of LocalWindow prevWin := NewLocalWindow() return &RateLimiter{ size: size, limit: limit, curr: currWin, prev: prevWin, } } // Size returns the time duration of one window size func (rl *RateLimiter) Size() time.Duration { return rl.size } // Limit returns the maximum events permitted to happen during one window size func (rl *RateLimiter) Limit() int64 { rl.mu.Lock() defer rl.mu.Unlock() return rl.limit } func (rl *RateLimiter) SetLimit(limit int64) { rl.mu.Lock() defer rl.mu.Unlock() rl.limit = limit } // shorthand for GrantN(time.Now(), 1) func (rl *RateLimiter) Grant() bool { return rl.GrantN(time.Now(), 1) } // reports whether n events may happen at time now func (rl *RateLimiter) GrantN(now time.Time, n int64) bool { rl.mu.Lock() defer rl.mu.Unlock() rl.advance(now) elapsed := now.Sub(rl.curr.Start()) weight := float64(rl.size-elapsed) / float64(rl.size) count := int64(weight*float64(rl.prev.Count())) + rl.curr.Count() if count+n > rl.limit { return false } rl.curr.AddCount(n) return true } // advance updates the current/previous windows resulting from the passage of time func (rl *RateLimiter) advance(now time.Time) { // Calculate the start boundary of the expected current-window. newCurrStart := now.Truncate(rl.size) diffSize := newCurrStart.Sub(rl.curr.Start()) / rl.size if diffSize >= 1 { // The current-window is at least one-window-size behind the expected one. newPrevCount := int64(0) if diffSize == 1 { // The new previous-window will overlap with the old current-window, // so it inherits the count. newPrevCount = rl.curr.Count() } rl.prev.Reset(newCurrStart.Add(-rl.size), newPrevCount) // The new current-window always has zero count. rl.curr.Reset(newCurrStart, 0) } }
上面的4种限流方式,更可能是针对单实例下的并发场景, 下面介绍几种服务集群的限流方案:
Nginx 官方提供的限速模块使用的是 漏桶算法,保证请求的实时处理速度不会超过预设的阈值,主要有两个设置:
经过 Redis 提供的 incr 命令,在规定的时间窗口,容许经过的最大请求数
Kong 官方提供了一种分布式滑动窗口算法的设计, 目前支持在 Kong 上作集群限流配置。它经过集中存储每一个滑动窗口和 consumer 的计数,从而支持集群场景。这里推荐一个 Go 版本的实现: slidingwindow
另外业界在分布式场景下,也有 经过 Nginx+Lua 和 Redis+Lua 等方式来实现限流
本文主要是本身在学习和调研高并发场景下的限流方案的总结。目前业界流行的限流算法包括计数器、漏桶、令牌桶和滑动窗口, 每种算法都有本身的优点,实际应用中能够根据本身业务场景作选择,而分布式场景下的限流方案,也基本经过以上限流算法来实现。在高并发下流量控制的一个原则是:先让请求先到队列,并作流量控制,不让流量直接打到系统上。