Sentinel
的核心功能之一是流量统计,例如咱们经常使用的指标QPS,当前线程数等。上一篇文章中咱们已经大体提到了提供数据统计功能的Slot(StatisticSlot)
,StatisticSlot
在Sentinel
的整个体系中扮演了一个很是重要的角色,后续的一系列操做(限流,熔断)等都依赖于StatisticSlot
所统计出的数据。html
本文所要讨论的重点就是StatisticSlot
是如何作的流量统计?java
其实在以前介绍经常使用限流算法[经常使用限流算法](https://www.jianshu.com/p/9edebaa446d3)的时候已经有提到过一个算法滑动窗口限流
,该算法的滑动窗口原理其实跟Sentinel
所提供的流量统计原理是同样的,都是基于时间窗口的滑动统计node
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { ... // 当前请求线程数加一 node.increaseThreadNum(); // 新增请求数 node.addPassRequest(count); ... }
能够看到StatisticSlot
主要统计了两种类型的数据git
线程数github
请求数(QPS)算法
对于线程数的统计比较简单,经过内部维护一个LongAdder
来进行当前线程数的统计,每进入一个请求加1,每释放一个请求减1,从而获得当前的线程数数组
对于请求数QPS的统计则相对比较复杂,其中有用到滑动窗口原理(也是本文的重点),下面根据源码来深刻的分析app
public void addPassRequest(int count) { // 调用父类(StatisticNode)来进行统计 super.addPassRequest(count); // 根据clusterNode 汇总统计(背后也是调用父类StatisticNode) this.clusterNode.addPassRequest(count); }
最终都是调用了父类StatisticNode
的addPassRequest
方法性能
/** * 按秒统计,分红两个窗口,每一个窗口500ms,用来统计QPS */ private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); /** * 按分钟统计,分红60个窗口,每一个窗口 1000ms */ private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); }
代码比较简单,能够知道内部是调用了ArrayMetric
的addPass
方法来统计的,而且统计了两种不一样时间维度的数据(秒级和分钟级)this
private final LeapArray<MetricBucket> data; public ArrayMetric(int sampleCount, int intervalInMs) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) { if (enableOccupy) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } else { this.data = new BucketLeapArray(sampleCount, intervalInMs); } } public void addPass(int count) { // 1\. 获取当前窗口 WindowWrap<MetricBucket> wrap = data.currentWindow(); // 2\. 当前窗口加1 wrap.value().addPass(count); }
ArrayMetric
其实也是一个包装类,内部经过实例化LeapArray
的对应实现类,来实现具体的统计逻辑,LeapArray
是一个抽象类,OccupiableBucketLeapArray
和BucketLeapArray
都是其具体的实现类
OccupiableBucketLeapArray
在1.5版本以后才被引入,主要是为了解决一些高优先级的请求在限流触发的时候也能经过(经过占用将来时间窗口的名额来实现) 也是默认使用的LeapArray实现类
而统计的逻辑也比较清楚,分红了两步:
定位到当前窗口
获取到当前窗口WindowWrap
的MetricBucket
并执行addPass
逻辑
这里咱们先看下第二步中的MetricBucket
类,看看它作了哪些事情
/** * 存放当前窗口各类类型的统计值(类型包括 PASS BLOCK EXCEPTION 等) */ private final LongAdder[] counters; public MetricBucket() { MetricEvent[] events = MetricEvent.values(); this.counters = new LongAdder[events.length]; for (MetricEvent event : events) { counters[event.ordinal()] = new LongAdder(); } initMinRt(); } // 统计pass数 public void addPass(int n) { add(MetricEvent.PASS, n); } // 统计可占用的pass数 public void addOccupiedPass(int n) { add(MetricEvent.OCCUPIED_PASS, n); } // 统计异常数 public void addException(int n) { add(MetricEvent.EXCEPTION, n); } // 统计block数 public void addBlock(int n) { add(MetricEvent.BLOCK, n); } ....
MetricBucket经过定义了一个LongAdder
数组来存储不一样类型的流量统计值,具体的类型则都定义在MetricEvent
枚举中。
执行addPass
方法对应LongAdder
数组索引下表为0的值递增
下面再来看下data.currentWindow()
的内部逻辑
OccupiableBucketLeapArray
继承了抽象类LeapArray
,核心逻辑也是在LeapArray
中
/** * 时间窗口大小 单位ms */ protected int windowLengthInMs; /** * 切分的窗口数 */ protected int sampleCount; /** * 统计的时间间隔 intervalInMs = windowLengthInMs * sampleCount */ protected int intervalInMs; /** * 窗口数组 数组大小 = sampleCount */ protected final AtomicReferenceArray<WindowWrap<T>> array; /** * update lock 更新窗口时须要上锁 */ private final ReentrantLock updateLock = new ReentrantLock(); /** * @param sampleCount 须要划分的窗口数 * @param intervalInMs 间隔的统计时间 */ public LeapArray(int sampleCount, int intervalInMs) { this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); } /** * 获取当前窗口 */ public WindowWrap<T> currentWindow() { return currentWindow(TimeUtil.currentTimeMillis()); }
以上须要着重理解的是几个参数的含义:
sampleCount 定义的窗口的数
intervalInMs 统计的时间间隔
windowLengthInMs 每一个窗口的时间大小 = intervalInMs / sampleCount
sampleCount
比较好理解,就是须要定义几个窗口(默认秒级统计维度的话是两个窗口),intervalInMs
指的就是咱们须要统计的时间间隔,例如咱们统计QPS的话那就是1000ms,windowLengthInMs
指的每一个窗口的大小,是由intervalInMs
除以sampleCount
得来
相似下图
理解了上诉几个参数的含义后,咱们直接进入到LeapArray
的currentWindow(long time)
方法中去看看具体的实现
public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } // 根据当前时间戳计算当前所属的窗口数组索引下标 int idx = calculateTimeIdx(timeMillis); // 计算当前窗口的开始时间戳 long windowStart = calculateWindowStart(timeMillis); /* * 从窗口数组中获取当前窗口项,分为三种状况 * * (1) 当前窗口为空还未建立,则初始化一个 * (2) 当前窗口的开始时间和上面计算出的窗口开始时间一致,代表当前窗口还未过时,直接返回当前窗口 * (3) 当前窗口的开始时间 小于 上面计算出的窗口开始时间,代表当前窗口已过时,须要替换当前窗口 */ while (true) { WindowWrap<T> old = array.get(idx); if (old == null) { /* * 第一种状况,新建一个窗口项 */ WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.windowStart()) { /* * 第二种状况 直接返回 */ return old; } else if (windowStart > old.windowStart()) { /* * 第三种状况 替换窗口 */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart < old.windowStart()) { // 第四种状况,讲道理不会走到这里 return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } } /** * 根据当前时间戳计算当前所属的窗口数组索引下标 */ private int calculateTimeIdx(/*@Valid*/ long timeMillis) { long timeId = timeMillis / windowLengthInMs; return (int)(timeId % array.length()); } /** * 计算当前窗口的开始时间戳 */ protected long calculateWindowStart(/*@Valid*/ long timeMillis) { return timeMillis - timeMillis % windowLengthInMs; }
上面的方法就是整个滑动窗口逻辑的核心代码,注释其实也写的比较清晰了,简单归纳下能够分为如下几步:
根据当前时间戳 和 窗口数组大小 获取到当前的窗口数组索引下标idx
,若是窗口数是2,那其实idx
只有两种值(0 或 1)
根据当前时间戳(windowStart
) 计算获得当前窗口的开始时间戳值。经过calculateWindowStart
计算来获得,这个方法还蛮有意思的,经过当前时间戳和窗口时间大小取余来获得 与当前窗口开始时间的 偏移量。比我用定时任务实现高级多了 ... 😆 能够去对比一下我以前文章中的蠢实现 [滑动窗口算法定时任务实现](https://github.com/WangJunnan/learn/blob/master/algorithm/src/main/java/com/walm/learn/algorithm/ratelimit/SlidingWindowRateLimit.java)
而后就是根据上面获得的两个值 来获取当前时间窗口,这里其实又分为三种状况
当前窗口为空还未建立,则初始化一个
当前窗口的开始时间和上面计算出的窗口开始时间(windowStart
)一致,代表当前窗口还未过时,直接返回当前窗口
当前窗口的开始时间 小于 上面计算出的窗口(windowStart
)开始时间,代表当前窗口已过时,须要替换当前窗口
总的来讲,currentWindow
方法的实现仍是很是巧妙的,由于我在看Sentinel
的源码前也写过一篇限流算法的文章,刚好其中也实现过一个滑动窗口限流算法,不过相比于Sentinel
的实现,我用了定时任务去作窗口的切换更新,显然性能上更差,实现的也不优雅,你们也能够去对比一下。[经常使用限流算法](https://www.jianshu.com/p/9edebaa446d3)
Sentinel系列