SphU.entry
,这明显是很关键的方法,下图的内容就是这里构建的-Sentinel工做主流程就包含在上面一个方法里,经过链式调用的方式,通过了创建树状结构,保存统计簇点,异常日志记录,实时数据统计,负载保护,权限认证,流量控制,熔断降级等Slothtml
Entry e = new CtEntry(resourceWrapper, chain, context); try { chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); }
HttpEventTask
类,它开启了一个线程,转么用来作为socket链接,控制台经过socket请求通知客户端,从而更新客户端规则,更改规则核心代码以下// Find the matching command handler. CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName); if (commandHandler != null) { CommandResponse<?> response = commandHandler.handle(request); handleResponse(response, printWriter, outputStream); } else { // No matching command handler. badRequest(printWriter, "Unknown command `" + commandName + '`'); }
经过命令模式,commandName为setRules时,更新规则node
sentinel-transport-netty-http
这个包和sentinel-transport-simple-http
处于同级,官方的例子用的simple-http,但明显它也准备了netty-http,因而我替换成了netty-http,运行后效果和原先同样,至于效率上有没有提高,我就不清楚了^_^FlowRuleChecker
,在core核心包中,核心检查方法以下private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } return rule.getRater().canPass(selectedNode, acquireCount, prioritized); }
DegradeRuleManager
,在core核心包,核心内容以下,再深刻就是它判断的算法了,感兴趣的本身去看以下的passCheck
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException { Set<DegradeRule> rules = degradeRules.get(resource.getName()); if (rules == null) { return; } for (DegradeRule rule : rules) { if (!rule.passCheck(context, node, count)) { throw new DegradeException(rule.getLimitApp(), rule); } } }
DefaultSlotChainBuilder
,构建了以下的slotpublic class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; } }
SlotChainProvider
中的构建方法以下private static void resolveSlotChainBuilder() { List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>(); boolean hasOther = false; for (SlotChainBuilder builder : LOADER) { if (builder.getClass() != DefaultSlotChainBuilder.class) { hasOther = true; list.add(builder); } } if (hasOther) { builder = list.get(0); } else { // No custom builder, using default. builder = new DefaultSlotChainBuilder(); } RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: " + builder.getClass().getCanonicalName()); }
LOADER
中加入了其余的非默认实现就能够替代原来的DefaultSlotChainBuilder
,那LOADER
怎么来的?看代码,以下的全局变量,也就是须要自定义实现SlotChainBuilder
接口的实现类private static final ServiceLoader<SlotChainBuilder> LOADER = ServiceLoader.load(SlotChainBuilder.class);
ServiceLoader
,也就是SPI
,全称Service Provider Interface
,加载它须要特定的配合,好比我自定义实现一个Slot
/** * @author laoliangliang * @date 2019/7/25 14:13 */ public class MySlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); //自定义的 chain.addLast(new CarerSlot()); return chain; } }
/** * @author laoliangliang * @date 2019/7/25 14:15 */ @Slf4j public class CarerSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { log.info(JSON.toJSONString(resourceWrapper)); fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } }
CarerSlot
,那是否能被加载到呢?事实上还不够,须要在META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
建这样一个文件,内容以下public class Env { public static final Sph sph = new CtSph(); static { // If init fails, the process will exit. InitExecutor.doInit(); } }
SphU.entry
的源码,就会发现,以下,该方法一进入不就是先获取Env的sph,再调用的entry吗,因此初始化的地方也就找到了,第一次调用SphU.entry
的地方,或者你不用这个,使用的注解,里面一样有这个方法public static Entry entry(String name) throws BlockException { return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); }
SphU.entry
包裹能够实现熔断降级,经过注解的形式包裹代码方法应该是比较容易的,那么在哪里实现和配置的呢@Bean public SentinelResourceAspect sentinelResourceAspect() { pushlish(); return new SentinelResourceAspect(); }
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { }
对@SentinelResource
注解进行了处理git
Warm Up
模式不看算法细节,看它的中文说明应该就能理解是怎么回事了吧;所谓慢启动模式,要求系统的QPS请求增速不能超过必定的速率,不然会被压制超过部分请求失败,应该是为了不一启动就有大流量的请求进入致使系统一会儿就宕机卡主或直接进入了熔断@Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { long passQps = (long) node.passQps(); long previousQps = (long) node.previousPassQps(); syncToken(previousQps); // 开始计算它的斜率 // 若是进入了警惕线,开始调整他的qps long restToken = storedTokens.get(); if (restToken >= warningToken) { long aboveToken = restToken - warningToken; // 消耗的速度要比warning快,可是要比慢 // current interval = restToken*slope+1/count double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); if (passQps + acquireCount <= warningQps) { return true; } } else { if (passQps + acquireCount <= count) { return true; } } return false; }
今日视频:微服务架构实战160讲
欢迎关注公众号,一块儿学习进步