sentinel 滑动窗口统计机制

sentinel的滑动窗口统计机制就是根据当前时间,获取对应的时间窗口,并更新该时间窗口中的各项统计指标(pass/block/rt等),这些指标被用来进行后续判断,好比限流、降级等;随着时间的推移,当前时间点对应的时间窗口是变化的,这时会涉及到时间窗口的初始化、复用等。能够说,sentinel上的功能所用到的数据几乎都是滑动窗口统计机制来维护和更新的。html

 

sentinel 处理流程是基于slot链(ProcessorSlotChain)来完成的,好比限流、熔断等,其中重要的一个slot就是StatisticSlot,它是作各类数据统计的,而限流/熔断的数据判断来源就是StatisticSlot,StatisticSlot的各类数据统计都是基于滑动窗口来完成的,所以本文就重点分析StatisticSlot的滑动窗口统计机制。node

 

sentinel 的slot链(ProcessorSlotChain)是责任链模式的体现,那SlotChain是在哪建立的呢?是在 CtSph.lookProcessChain()方法中建立的,而且该方法会根据当前请求的资源先去一个静态的HashMap中获取,若是获取不到才会建立,建立后会保存到HashMap中。这就意味着,同一个资源会全局共享一个SlotChain。默认生成ProcessorSlotChain逻辑为:安全

 1 // DefaultSlotChainBuilder
 2 public ProcessorSlotChain build() {  3    ProcessorSlotChain chain = new DefaultProcessorSlotChain();  4    chain.addLast(new NodeSelectorSlot());  5    chain.addLast(new ClusterBuilderSlot());  6    chain.addLast(new LogSlot());  7    chain.addLast(new StatisticSlot());  8    chain.addLast(new SystemSlot());  9    chain.addLast(new AuthoritySlot()); 10    chain.addLast(new FlowSlot()); 11    chain.addLast(new DegradeSlot()); 12 
13    return chain; 14 }

 

整个处理过程从第一个slot日后一直传递到最后一个的,当到达StatisticSlot时,开始统计各项指标,统计的结果又会被后续的Slot所采用,做为各类规则校验的依据。各类指标以下:数据结构

public enum MetricEvent { PASS, // Normal pass.
   BLOCK, // Normal block.
   EXCEPTION, // 异常统计
 SUCCESS, RT, // rt统计
 OCCUPIED_PASS }

 

StatisticSlot.entry流程

处理流程走到StatisticSlot时,首先触发后续slot.entry方法,而后统计各项指标,后续slot中数据判断来源就是这里统计的各项指标。StatisticSlot.entry 逻辑以下:并发

 1 @Override  2 public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNodenode, int count, Object... args) throws Throwable {  3    try {  4        // 触发下一个Slot的entry方法
 5  fireEntry(context, resourceWrapper, node, count, args);  6        // 若是能经过SlotChain中后面的Slot的entry方法,说明没有被限流或降级  7        // 统计信息
 8  node.increaseThreadNum();  9  node.addPassRequest(); 10        // 省略部分代码
11   } catch (BlockException e) { 12  context.getCurEntry().setError(e); 13        // Add block count.
14  node.increaseBlockedQps(); 15        // 省略部分代码
16        throw e; 17   } catch (Throwable e) { 18  context.getCurEntry().setError(e); 19        // Should not happen
20  node.increaseExceptionQps(); 21        // 省略部分代码
22        throw e; 23  } 24 }

由以上代码可知,StatisticSlot主要就作了3件事:app

  1. 触发后续slot的entry方法,进行规则校验分布式

  2. 校验经过则更新node实时指标数据ide

  3. 校验不经过则更新node异常指标数据高并发

注意:因为后续的fireEntry操做和更新本次统计信息是两个操做,不是原子的,会形成限流不许的小问题,好比设置的FlowRule count为20,并发状况下可能稍大于20,不过针对大部分场景来讲,这点误差是能够容忍的,毕竟咱们要的是限流效果,而不是必须精确的限流操做。性能

 

更新node实时指标数据

咱们能够看到 node.addPassRequest() 这段代码是在fireEntry执行以后执行的,这意味着,当前请求经过了sentinel的流控等规则,此时须要将当次请求记录下来,也就是执行 node.addPassRequest() 这行代码,具体的代码以下所示:

1 // DefaultNode
2 public void addPassRequest() { 3    super.addPassRequest(); 4    this.clusterNode.addPassRequest(); 5 }

这里的node是一个 DefaultNode 实例,这里特别补充一个 DefaultNode 和 ClusterNode 的区别:

  • DefaultNode:保存着某个resource在某个context中的实时指标,每一个DefaultNode都指向一个ClusterNode。

  • ClusterNode:保存着某个resource在全部的context中实时指标的总和,一样的resource会共享同一个ClusterNode,无论他在哪一个context中。

 

上面代码无论是 DefaultNode 仍是 ClusterNode ,走的都是StatisticNode 对象的 addPassRequest 方法:

1 private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2, 1000); 2 private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000); 3 
4 public void addPassRequest(int count) { 5    rollingCounterInSecond.addPass(count); // 对每秒指标统计
6    rollingCounterInMinute.addPass(count); // 每分钟指标统计
7 }

 

每个经过的指标(pass)都是调用Metric 的接口进行操做的,而且是经过 ArrayMetric 这种实现类,代码以下:

public ArrayMetric(int windowLength, int interval) { this.data = new WindowLeapArray(windowLength, interval); } public void addPass(int count) { // 获取当前时间窗口
   WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count); }

 

首先经过 currentWindow() 获取当前时间窗口,而后更新当前时间窗口对应的统计指标,如下代码重点关注几个判断逻辑:

 1 // LeapArray
 2 public WindowWrap<T> currentWindow() {  3    return currentWindow(TimeUtil.currentTimeMillis());  4 }  5 // TimeUtil
 6 public static long currentTimeMillis() {  7    // currentTimeMillis是由一个tick线程每一个1ms更新一次,具体逻辑在TimeUtil类中
 8    return currentTimeMillis;  9 } 10 // LeapArray
11 public WindowWrap<T> currentWindow(long timeMillis) { 12    // 计算当前时间点落在滑动窗口的下标
13    int idx = calculateTimeIdx(timeMillis); 14    // Calculate current bucket start time.
15    long windowStart = calculateWindowStart(timeMillis); 16 
17    // 获取当前时间点对应的windowWrap,array为AtomicReferenceArray
18    while (true) { 19        WindowWrap<T> old = array.get(idx); 20        if (old == null) { 21            // 1.为空表示当前时间窗口为初始化过,建立WindowWrap并cas设置到array中
22            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs,windowStart, newEmptyBucket()); 23            if (array.compareAndSet(idx, null, window)) { 24                return window; 25           } else { 26  Thread.yield(); 27  } 28       } else if (windowStart == old.windowStart()) { 29            // 2.获取的时间窗口正好对应当前时间,直接返回
30            return old; 31       } else if (windowStart > old.windowStart()) { 32            // 3.获取的时间窗口为老的,进行reset操做复用
33            if (updateLock.tryLock()) { 34                try { 35                    return resetWindowTo(old, windowStart); 36               } finally { 37  updateLock.unlock(); 38  } 39           } else { 40  Thread.yield(); 41  } 42       } else if (windowStart < old.windowStart()) { 43            // 4.时间回拨了,正常状况下不会走到这里
44            return new WindowWrap<T>(windowLengthInMs, windowStart,newEmptyBucket()); 45  } 46  } 47 }

 

获取当前时间窗口对应的WindowWrap以后,就能够进行更新操做了。

// wrap.value().addPass(count);
public void addPass(int n) { add(MetricEvent.PASS, n); } // MetricBucket
public MetricBucket add(MetricEvent event, long n) { // 对应MetricEvent枚举中值
 counters[event.ordinal()].add(n); return this; }

到这里为止,整个指标统计流程就完成了,下面重点看下滑动窗口机制。

 

滑动窗口机制

时间窗口是用WindowWrap对象表示的,其属性以下:

private final long windowLengthInMs;  // 时间窗口的长度
private long windowStart; // 时间窗口开始时间
private T value; // MetricBucket对象,保存各个指标数据

 

sentinel时间基准由tick线程来作,每1ms更新一次时间基准,逻辑以下:

currentTimeMillis = System.currentTimeMillis(); Thread daemon = new Thread(new Runnable() { @Override public void run() { while (true) { currentTimeMillis = System.currentTimeMillis(); try { TimeUnit.MILLISECONDS.sleep(1); } catch (Throwable e) { } } } }); daemon.setDaemon(true); daemon.setName("sentinel-time-tick-thread"); daemon.start();

 

sentinel默认有每秒和每分钟的滑动窗口,对应的LeapArray类型,它们的初始化逻辑是:

protected int windowLengthInMs; // 单个滑动窗口时间值
protected int sampleCount; // 滑动窗口个数
protected int intervalInMs; // 周期值(至关于全部滑动窗口时间值之和)

public LeapArray(int sampleCount, int intervalInMs) { this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount); }

针对每秒滑动窗口,windowLengthInMs=500,sampleCount=2,intervalInMs=1000,针对每分钟滑动窗口,windowLengthInMs=1000,sampleCount=60,intervalInMs=60000,对应代码:

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2, 1000); private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);

currentTimeMillis时间基准(tick线程)每1ms更新一次,经过currentWindow(timeMillis)方法获取当前时间点对应的WindowWrap对象,而后更新对应的各类指标,用于作限流、降级时使用。注意,当前时间基准对应的事件窗口初始化时lazy模式,而且会复用的。

 

Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,能够很好地支撑写多于读的高并发场景。最后以一张图结束吧:

 

 

 往期精选 

以为文章不错,对你有所启发和帮助,但愿能转发给更多的小伙伴。若是有问题,请关注下面公众号,发送问题给我,多谢。
欢迎小伙伴 关注【TopCoder】阅读更多精彩好文。

 

原文出处:https://www.cnblogs.com/luoxn28/p/11109144.html

相关文章
相关标签/搜索