原文连接:www.cnblogs.com/luozhiyun/p…html
这一篇我仍是继续上一篇没有讲完的内容,先上一个例子:java
private static final int threadCount = 100;
public static void main(String[] args) {
initFlowRule();
for (int i = 0; i < threadCount; i++) {
Thread entryThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Entry methodA = null;
try {
TimeUnit.MILLISECONDS.sleep(5);
methodA = SphU.entry("methodA");
} catch (BlockException e1) {
// Block exception
} catch (Exception e2) {
// biz exception
} finally {
if (methodA != null) {
methodA.exit();
}
}
}
}
});
entryThread.setName("working thread");
entryThread.start();
}
}
private static void initFlowRule() {
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource("methodA");
// set limit concurrent thread for 'methodA' to 20
rule1.setCount(20);
rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}
复制代码
我先把例子放上来node
Entry methodA = null;
try {
methodA = SphU.entry("methodA");
// dosomething
} catch (BlockException e1) {
block.incrementAndGet();
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (methodA != null) {
methodA.exit();
}
}
复制代码
咱们先进入到entry方法里面: SphU#entryc#
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
复制代码
这个方法里面会调用Env的sph静态方法,咱们进入到Env里面看看api
public class Env {
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
InitExecutor.doInit();
}
}
复制代码
这个方法初始化的时候会调用InitExecutor.doInit()
InitExecutor#doInit数组
public static void doInit() {
//InitExecutor只会初始化一次,而且初始化失败会退出
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
//经过spi加载InitFunc子类,默认是MetricCallbackInit
ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : loader) {
RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
//因为这里只有一个loader里面只有一个子类,那么直接就返回initList里面包含一个元素的集合
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
//这里调用MetricCallbackInit的init方法
w.func.init();
RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
w.func.getClass().getCanonicalName(), w.order));
}
} catch (Exception ex) {
RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
ex.printStackTrace();
} catch (Error error) {
RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
error.printStackTrace();
}
}
复制代码
这个方法主要是经过spi加载InitFunc 的子类,默认是MetricCallbackInit。 而后会将MetricCallbackInit封装成OrderWrapper实例,而后遍历,调用 MetricCallbackInit的init方法:安全
MetricCallbackInit#initbash
public void init() throws Exception {
//添加回调函数
//key是com.alibaba.csp.sentinel.metric.extension.callback.MetricEntryCallback
StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(),
new MetricEntryCallback());
//key是com.alibaba.csp.sentinel.metric.extension.callback.MetricExitCallback
StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(),
new MetricExitCallback());
}
复制代码
这个init方法就是注册了两个回调实例MetricEntryCallback和MetricExitCallback。并发
而后会经过调用Env.sph.entry
会最后调用到CtSph的entry方法:app
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
//这里name是Resource,type是out
StringResourceWrapper resource = new StringResourceWrapper(name, type);
//count是1 ,args是一个空数组
return entry(resource, count, args);
}
复制代码
这个方法会将resource和type封装成StringResourceWrapper实例,而后调用entry重载方法追踪到CtSph的entryWithPriority方法。
//这里传入得参数count是1,prioritized=false,args是容量为1的空数组
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
//获取当前线程的上下文
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}
//为空的话,建立一个默认的context
if (context == null) { //1
// Using default context.
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}
// Global switch is close, no rule checking will do.
if (!Constants.ON) {//这里会返回false
return new CtEntry(resourceWrapper, null, context);
}
//2
//建立一系列功能插槽
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */
//若是超过了插槽的最大数量,那么会返回null
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//3
//调用责任链
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
复制代码
这个方法是最核心的方法,主要作了三件事:
在讲建立context以前咱们先看一下ContextUtil这个类初始化的时候会作什么
ContextUtil
/** * Holds all {@link EntranceNode}. Each {@link EntranceNode} is associated with a distinct context name. */
private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
static {
// Cache the entrance node for default context.
initDefaultContext();
}
private static void initDefaultContext() {
String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
//初始化一个sentinel_default_context,type为in的队形
EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
//Constants.ROOT会初始化一个name是machine-root,type=IN的对象
Constants.ROOT.addChild(node);
//因此如今map里面有一个key=CONTEXT_DEFAULT_NAME的对象
contextNameNodeMap.put(defaultContextName, node);
}
复制代码
ContextUtil在初始化的时候会先调用initDefaultContext方法。经过Constants.ROOT
建立一个root节点,而后将建立的node做为root的子节点入队,而后将node节点put到contextNameNodeMap中 结构以下:
Constants.ROOT:
machine-root(EntryType#IN)
/
/
sentinel_default_context(EntryType#IN)
复制代码
如今咱们再回到entryWithPriority方法中:
if (context == null) {//1
// Using default context.
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}
复制代码
若是context为空,那么会调用MyContextUtil.myEnter
建立一个新的context,这个方法最后会调用到ContextUtil.trueEnter
方法中进行建立。
protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
//若是为null的话,检查contextNameNodeMap的size是否是超过2000
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
// 重复initDefaultContext方法的内容
try {
LOCK.lock();
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node.
Constants.ROOT.addChild(node);
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}
复制代码
在trueEnter方法中会作一个校验,若是contextNameNodeMap中的数量已经超过了2000,那么会返回一个NULL_CONTEXT。因为咱们在initDefaultContext中已经初始化过了node节点,因此这个时候直接根据name获取node节点放入到contextHolder中。
建立完了context以后咱们再回到entryWithPriority方法中继续往下走:
//建立一系列功能插槽
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
复制代码
经过调用lookProcessChain方法会建立功能插槽
CtSph#lookProcessChain
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//根据resourceWrapper初始化插槽
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.最大插槽数量为6000
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
//初始化新的插槽
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
复制代码
这里会调用SlotChainProvider.newSlotChain
进行插槽的初始化。
SlotChainProvider#newSlotChain
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
//根据spi初始化slotChainBuilder,默认是DefaultSlotChainBuilder
resolveSlotChainBuilder();
if (slotChainBuilder == null) {
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
}
return slotChainBuilder.build();
}
复制代码
默认调用DefaultSlotChainBuilder的build方法进行初始化
DefaultSlotChainBuilder#build
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
//建立Node节点
chain.addLast(new NodeSelectorSlot());
//用于构建资源的 ClusterNode
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
//用于统计实时的调用数据
chain.addLast(new StatisticSlot());
//用于对入口的资源进行调配
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
//用于限流
chain.addLast(new FlowSlot());
//用于降级
chain.addLast(new DegradeSlot());
return chain;
}
复制代码
DefaultProcessorSlotChain里面会建立一个头节点,而后把其余节点经过addLast串成一个链表:
最后咱们再回到CtSph的entryWithPriority方法中,往下走调用chain.entry
方法触发调用链。
在往下看Slot插槽以前,咱们先总结一下Context是怎样的一个结构:
在Sentinel中,全部的统计操做都是基于context来进行的。context会经过ContextUtil的trueEnter方法进行建立,会根据context的不一样的name来组装不一样的Node来实现数据的统计。
在通过NodeSelectorSlot的时候会根据传入的不一样的context的name字段来获取不一样的DefaultNode对象,而后设置到context的curEntry实例的curNode属性中。
NodeSelectorSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
//设置到context的curEntry实例的curNode属性中
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
复制代码
而后再通过ClusterBuilderSlot槽位在初始化的时候会初始化一个静态的全局clusterNodeMap用来记录全部的ClusterNode,维度是ResourceWrapper。每次调用entry方法的时候会先去全局的clusterNodeMap,找不到就会建立一个新的clusterNode,放入到node的ClusterNode属性中,用来统计ResourceWrapper维度下面的全部数据。
//此变量是静态的,因此只会初始化一次,存有全部的ResourceWrapper维度下的数据
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode();
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
复制代码
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
try {
//先直接往下调用,若是没有报错则进行统计
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
//当前线程数加1
// Request passed, add thread count and pass count.
node.increaseThreadNum();
//经过的请求加上count
node.addPassRequest(count);
...
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
...
} catch (BlockException e) {
//设置错误信息
// Blocked, set block exception to current entry.
context.getCurEntry().setError(e);
...
//设置被阻塞的次数
// Add block count.
node.increaseBlockQps(count);
...
throw e;
} catch (Throwable e) {
// Unexpected error, set error to current entry.
context.getCurEntry().setError(e);
//设置异常的次数
// This should not happen.
node.increaseExceptionQps(count);
...
throw e;
}
}
复制代码
这段代码中,我把不相关的代码都省略了,不影响咱们的主流程。 在entry方法里面,首先是往下继续调用,根据其余的节点的状况来进行统计,好比抛出异常,那么就统计ExceptionQps,被阻塞那么就统计BlockQps,直接经过,那么就统计PassRequest。
咱们先看一下线程数是如何统计的:node.increaseThreadNum()
DefaultNode#increaseThreadNum 咱们先看一下DefaultNode的继承关系:
public void increaseThreadNum() {
super.increaseThreadNum();
this.clusterNode.increaseThreadNum();
}
复制代码
因此super.increaseThreadNum
是调用到了父类的increaseThreadNum方法。
this.clusterNode.increaseThreadNum()
这句代码和super.increaseThreadNum
是同样的使用方式,因此看看StatisticNode的increaseThreadNum方法就行了
StatisticNode#increaseThreadNum
private LongAdder curThreadNum = new LongAdder();
public void decreaseThreadNum() {
curThreadNum.increment();
}
复制代码
这个方法很简单,每次都直接使用LongAdder的api加1就行了,最后会在退出的时候减1,使用LongAdder也保证了原子性。
若是请求经过的时候会继续往下调用node.addPassRequest
:
DefaultNode#addPassRequest
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
复制代码
这句代码也是调用了StatisticNode的addPassRequest方法进行统计的。
StatisticNode#addPassRequest
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
复制代码
这段代码里面有两个调用,一个是按分钟统计的,一个是按秒统计的。由于咱们这里是使用的FlowRuleManager因此是会记录按分钟统计的。具体是怎么初始化,以及怎么打印统计日志的能够看看我上一篇分析:1.Sentinel源码分析—FlowRuleManager加载规则作了什么?,我这里再也不赘述。
因此咱们直接看看rollingCounterInMinute.addPass(count)
这句代码就行了,这句代码会直接调用ArrayMetric的addPass方法。
ArrayMetric#addPass
public void addPass(int count) {
//获取当前的时间窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
//窗口内的pass加1
wrap.value().addPass(count);
}
复制代码
这里会首先调用currentWindow获取当前的时间窗口WindowWrap,而后调用调用窗口内的MetricBucket的addPass方法加1,我继续拿我上一篇文章的图过来讲明:
我面来到MetricBucket的addPass方法: MetricBucket#addPass
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
复制代码
addPass方法会使用枚举类而后将counters数组内的pass槽位的值加n;counters数组是LongAdder数组,因此也不会有线程安全问题。
node.increaseBlockQps和node.increaseExceptionQps代码也是同样的,你们能够自行去看看。
FlowSlot能够根据预先设置的规则来判断一个请求是否应该被经过。
FlowSlot
private final FlowRuleChecker checker;
public FlowSlot() {
this(new FlowRuleChecker());
}
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
复制代码
FlowSlot在实例化的时候会设置一个规则检查器,而后在调用entry方法的时候会调用规则检查器的checkFlow方法
咱们进入到FlowRuleChecker的checkFlow 方法中: FlowRuleChecker#checkFlow
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//返回FlowRuleManager里面注册的全部规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
//若是当前的请求不能经过,那么就抛出FlowException异常
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
复制代码
checkFlow这个方法就是过去全部的规则而后根据规则进行过滤。主要的过滤操做是在canPassCheck中进行的。
FlowRuleChecker#canPassCheck
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//若是没有设置limitapp,那么不进行校验,默认会给个defualt
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
//集群模式
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
//本地模式
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
复制代码
这个方法首先会校验limitApp,而后判断是集群模式仍是本地模式,咱们这里暂时分析本地模式。
FlowRuleChecker#passLocalCheck
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//节点选择
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//根据设置的规则来拦截
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
复制代码
本地模式中,首先会调用selectNodeByRequesterAndStrategy进行节点选择,根据不一样的模式选择不一样的节点,而后调用规则控制器的canPass方法进行拦截。
FlowRuleChecker#selectNodeByRequesterAndStrategy
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// The limit app should not be empty.
String limitApp = rule.getLimitApp();
//关系限流策略
int strategy = rule.getStrategy();
String origin = context.getOrigin();
//origin不为`default` or `other`,而且limitApp和origin相等
if (limitApp.equals(origin) && filterOrigin(origin)) {//1
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
//关系限流策略为关联或者链路的处理
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {//2
if (strategy == RuleConstant.STRATEGY_DIRECT) {
//这里返回ClusterNode,表示全部应用对该资源的全部请求状况
// Return the cluster node.
return node.getClusterNode();
}
//关系限流策略为关联或者链路的处理
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {//3
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
//关系限流策略为关联或者链路的处理
return selectReferenceNode(rule, context, node);
}
return null;
}
复制代码
这个方法主要是用来根据控制根据不一样的规则,获取不一样的node进行数据的统计。
我这里引用官方文档的一段话进行解释:
default:表示不区分调用者,来自任何调用者的请求都将进行限流统计。若是这个资源名的调用总和超过了这条规则定义的阈值,则触发限流。
{some_origin_name}:表示针对特定的调用者,只有来自这个调用者的请求才会进行流量控制。例如 NodeA 配置了一条针对调用者caller1的规则,那么当且仅当来自 caller1 对 NodeA 的请求才会触发流量控制。
other:表示针对除 {some_origin_name} 之外的其他调用方的流量进行流量控制。例如,资源NodeA配置了一条针对调用者 caller1 的限流规则,同时又配置了一条调用者为 other 的规则,那么任意来自非 caller1 对 NodeA 的调用,都不能超过 other 这条规则定义的阈值
同一个资源名能够配置多条规则,规则的生效顺序为:{some_origin_name} > other > default
复制代码
而后返回到passLocalCheck方法中,继续往下走,调用rule.getRater()
,咱们这里没有指定特殊的rater,因此返回的是DefaultController。
DefaultController#canPass
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//判断是限流仍是限制并发数量,而后获取流量或并发数量
int curCount = avgUsedTokens(node);
//若是二者相加大于限定的并发数
if (curCount + acquireCount > count) {
...
return false;
}
return true;
}
复制代码
这里首先调用avgUsedTokens,根据grade判断当前的规则是QPS限流仍是线程数限流,若是二者之和大于count,那么返回false。
返回false以后会回到FlowRuleChecker的checkFlow方法,抛出FlowException异常。
到这里Sentinel的主流程就分析完毕了。