在SpringCloudGateway中官方默认提供了基于Redis的分布式限流方案,对于大部分的场景开箱即用。但实际应用场景下,针对不一样的业务场景可能须要进行定制化扩展,此时颇有必要了解其工做原理,从而更加快速有效的实现自定义扩展。html
此部分将经过3个层面逐步展开:java
既然是Gateway模块的源码分析,根据springboot源码分析的套路,从GatewayAutoConfiguration类着手逐步展开,在GatewayAutoConfiguration类中可以找到以下bean实例的注册web
@Bean(name = PrincipalNameKeyResolver.BEAN_NAME) @ConditionalOnBean(RateLimiter.class) public PrincipalNameKeyResolver principalNameKeyResolver() { return new PrincipalNameKeyResolver(); } @Bean @ConditionalOnBean({RateLimiter.class, KeyResolver.class}) public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory(RateLimiter rateLimiter, PrincipalNameKeyResolver resolver) { return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver); }
其中redis
不难发现两个bean实例的注册均依赖于 RateLimiter 实例,该接口定义了判断是否可以放行的isAllowed方法,以下:算法
public interface RateLimiter<C> extends StatefulConfigurable<C> { Mono<Response> isAllowed(String routeId, String id); ..... }
在默认配置中,能够在 GatewayRedisAutoConfiguration类中找到以下其Bean实例的默认装配,目前SpringCloudGateway分布式限流官方提供的正是基于redis的实现,以下spring
@Bean @ConditionalOnMissingBean public RedisRateLimiter redisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript, Validator validator) { return new RedisRateLimiter(redisTemplate, redisScript, validator); }
RedisRateLimiter 实例经过 @ConditionalOnMissingBean实现了条件注入,并不会被强制注入,其提供了自定义扩展的可能性。当前Bean实例依赖注入的 RedisScript实例,其指定了具体执行的lua脚本路径,express
@Bean @SuppressWarnings("unchecked") public RedisScript redisRequestRateLimiterScript() { DefaultRedisScript redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua"))); redisScript.setResultType(List.class); return redisScript; }
该脚本已经在对应的jar包中能够直接查看,其默认采用的是令牌桶算法。须要注意的是该bean实例并非条件注册的,而是默认强制注册。此时若是咱们须要对脚本进行简单的调整,能够添加一个新的 RedisScript 实例,同时从新注册 RedisRateLimiter 实例,并从新指定其依赖注入的RedisScript实例为定义的新实例便可。api
小节:
到这里基本已经清楚SpringCloudGateway基于Redis实现的分布式限流的核心组件以及对应的实现:缓存
Gateway中的限流目前是针对每一个路由单独定义的,在了解如何针对每一个路由定制化限流参数以前,须要先了解Gateway中是如何配置路由定位器的,从一个简单的application.yaml
配置角度入手,其定义以下:springboot
spring: cloud: gateway: routes: - id: consumer-service uri: http://127.0.0.1:8081 predicates: - Path=/consumer-service/** filters: - name: RequestRateLimiter args: key-resolver: "#{@userKeyResolver}" redis-rate-limiter.replenishRate: 5 redis-rate-limiter.burstCapacity: 10 - RewritePath=/consumer-service/(?<segment>.*), /$\{segment}
其中明确指定将采用限流过滤器 RequestRateLimiter并配置了3个主要参数。
此时再次把焦点放在 GatewayAutoConfiguration类,根据spring.cloud.gateway
前缀设定,上述 application.yaml中的配置项将绑定到 GatewayProperties实例中,
@Bean public GatewayProperties gatewayProperties() { return new GatewayProperties(); }
根据 GatewayProperties中的路由配置信息,将生成基于properties的路由定义定位器 PropertiesRouteDefinitionLocator
@Bean @ConditionalOnMissingBean public PropertiesRouteDefinitionLocator propertiesRouteDefinitionLocator(GatewayProperties properties) { return new PropertiesRouteDefinitionLocator(properties); }
默认状况下,系统还会注入一个基于内存的路由定义实例,以下 InMemoryRouteDefinitionRepository
@Bean @ConditionalOnMissingBean(RouteDefinitionRepository.class) public InMemoryRouteDefinitionRepository inMemoryRouteDefinitionRepository() { return new InMemoryRouteDefinitionRepository(); }
在实际开发中能够定义多个路由定义定位器(此部分也是一个常规的扩展点,好比经过DB获取路由定义等),并经过 CompositeRouteDefinitionLocator将全部的路由定义定位器信息进行组合合并,
@Bean @Primary public RouteDefinitionLocator routeDefinitionLocator(List<RouteDefinitionLocator> routeDefinitionLocators) { return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators)); }
在Debug模式下能够看到 routeDefinitionLocators包含了上述两个路由定义实例,以下
基于路由配置定义便可实例化路由定位器,以下实例化RouteLocator的实现RouteDefinitionRouteLocatorr:
@Bean public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties, List<GatewayFilterFactory> GatewayFilters, List<RoutePredicateFactory> predicates, RouteDefinitionLocator routeDefinitionLocator) { return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates, GatewayFilters, properties); }
其中将注入RouteDefinitionLocatorr实例以及GatewayPropertiesr实例,RouteDefinitionRouteLocatorr的构造函数以下:
public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator, List<RoutePredicateFactory> predicates, List<GatewayFilterFactory> gatewayFilterFactories, GatewayProperties gatewayProperties) { this.routeDefinitionLocator = routeDefinitionLocator; initFactories(predicates); gatewayFilterFactories.forEach(factory -> this.gatewayFilterFactories.put(factory.name(), factory)); this.gatewayProperties = gatewayProperties; }
目前来看构造函数中并无对routeDefinitionLocator 和gatewayProperties 进行过多的处理,其做用将会在下一小节分析中体现,
下一步会实例化CachingRouteLocator做为默认的RouteLocator实例,其会合并全部以前定义的RouteLocator实例,默认状况下仅有RouteDefinitionRouteLocator一个实现:
@Bean @Primary //TODO: property to disable composite? public RouteLocator cachedCompositeRouteLocator(List<RouteLocator> routeLocators) { return new CachingRouteLocator(new CompositeRouteLocator(Flux.fromIterable(routeLocators))); }
小节
如上在实例化路由定义相关bean实例时,仅有CachingRouteLocator(cachedCompositeRouteLocator)和CompositeRouteDefinitionLocator(routeDefinitionLocator)被@Primary
注解,故在后续的实际使用中注入的路由定义定位器和路由定位器即为CachingRouteLocator和CompositeRouteDefinitionLocator实例。
默认状况下,当Gateway接收到转发请求时,会被RoutePredicateHandlerMapping类接收处理,其中注入了RouteLocator对应的CachingRouteLocator实例,根据以前的分析,目前CachingRouteLocator实例中仅仅包含了一个RouteDefinitionRouteLocator实例,故其会执行RouteDefinitionRouteLocator下的getRoutes方法:
@Override public Flux<Route> getRoutes() { return this.routeDefinitionLocator.getRouteDefinitions() .map(this::convertToRoute) //TODO: error handling .map(route -> { if (logger.isDebugEnabled()) { logger.debug("RouteDefinition matched: " + route.getId()); } return route; }); }
此处的routeDefinitionLocator即为上述的CompositeRouteDefinitionLocator实例获取全部的路由定义,经过convertToRoute方法转换为实际路由对象,
private Route convertToRoute(RouteDefinition routeDefinition) { AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition); List<GatewayFilter> gatewayFilters = getFilters(routeDefinition); return Route.async(routeDefinition) .asyncPredicate(predicate) .replaceFilters(gatewayFilters) .build(); }
此处有两个核心方法combinePredicates和getFilters方法,此处咱们重点关注getFilters方法的定义,
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) { List<GatewayFilter> filters = new ArrayList<>(); //TODO: support option to apply defaults after route specific filters? if (!this.gatewayProperties.getDefaultFilters().isEmpty()) { filters.addAll(loadGatewayFilters("defaultFilters", this.gatewayProperties.getDefaultFilters())); } if (!routeDefinition.getFilters().isEmpty()) { filters.addAll(loadGatewayFilters(routeDefinition.getId(), routeDefinition.getFilters())); } AnnotationAwareOrderComparator.sort(filters); return filters; }
如上代码所示,getFilters方法调用loadGatewayFilters方法从gatewayProperties和routeDefinition中采集全部的filter配置(如上application.yaml
示例,定义了2个filter),来观察loadGatewayFilters的定义
private List<GatewayFilter> loadGatewayFilters(String id, List<FilterDefinition> filterDefinitions) { List<GatewayFilter> filters = filterDefinitions.stream() .map(definition -> { // 对应了yaml中的name定义,经过name便可获取对应的GatewayFilterFactory,gatewayFilterFactories中存储了全部实例化的GatewayFilterFactory实例 GatewayFilterFactory factory = this.gatewayFilterFactories.get(definition.getName()); if (factory == null) { throw new IllegalArgumentException("Unable to find GatewayFilterFactory with name " + definition.getName()); } Map<String, String> args = definition.getArgs(); if (logger.isDebugEnabled()) { logger.debug("RouteDefinition " + id + " applying filter " + args + " to " + definition.getName()); } //根据定义的args参数转换为键值对,若是是#{***}格式的value则会转换为对应的Bean实例 Map<String, Object> properties = factory.shortcutType().normalize(args, factory, this.parser, this.beanFactory); // 对应GatewayFilterFactory中定义的Config类的默认值 Object configuration = factory.newConfig(); // 绑定属性到GatewayFilterFactory中定义的Config类 ConfigurationUtils.bind(configuration, properties, factory.shortcutFieldPrefix(), definition.getName(), validator); //配置GatewayFilterFactory GatewayFilter gatewayFilter = factory.apply(configuration); // 发布FilterArgsEvent事件,通知监听者绑定properties参数,id为当前route的id属性 if (this.publisher != null) { this.publisher.publishEvent(new FilterArgsEvent(this, id, properties)); } return gatewayFilter; }) .collect(Collectors.toList()); ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size()); for (int i = 0; i < filters.size(); i++) { GatewayFilter gatewayFilter = filters.get(i); if (gatewayFilter instanceof Ordered) { ordered.add(gatewayFilter); } else { ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1)); } } return ordered; }
Map<String, String> args = definition.getArgs();
便可获取对应的参数,以下图能够看到在application.yaml
中定义的3个参数,
args又是如何被绑定到配置实例的呢?全部的GatewayFilterFactory均实现了ShortcutConfigurable接口,ShortcutConfigurable中定义了解析上述参数的方法,
String key = normalizeKey(entry.getKey(), entryIdx, shortcutConf, args); Object value = getValue(parser, beanFactory, entry.getValue());
此部分为核心实现,在getValue方法中能够看到对以#{
开头和}
结果的value值将经过beanFactory获取对应的bean实例
if (rawValue != null && rawValue.startsWith("#{") && entryValue.endsWith("}")) { // assume it's spel StandardEvaluationContext context = new StandardEvaluationContext(); context.setBeanResolver(new BeanFactoryResolver(beanFactory)); Expression expression = parser.parseExpression(entryValue, new TemplateParserContext()); value = expression.getValue(context); }
此处很是关键,此方式提供了在application.yaml
经过变量定义便可决定具体采用哪一个Bean实例的能力,如上在实际开发应用中将经过userKeyResolver替换默认注册的principalNameKeyResolver做为KeyResolver实例。
借助ConfigurationUtils类中提供的bind方法将对应的属性绑定到RequestRateLimiterGatewayFilterFactory.Config类,
new Binder(new MapConfigurationPropertySource(properties)) .bind(configurationPropertyName, Bindable.ofInstance(toBind));
根据application.yaml
中的定义,此处会调用setKeyResolver绑定自定义的KeyResolver键定义bean实例(此处除了keyResolver
,rateLimiter
一样提供了相似的自定义配置能力)
public static class Config { private KeyResolver keyResolver; private RateLimiter rateLimiter; private HttpStatus statusCode = HttpStatus.TOO_MANY_REQUESTS; ..... public Config setKeyResolver(KeyResolver keyResolver) { this.keyResolver = keyResolver; return this; } ..... }
经过GatewayFilter gatewayFilter = factory.apply(configuration);
将调用RequestRateLimiterGatewayFilterFactory中的apply方法:
public GatewayFilter apply(Config config) { KeyResolver resolver = (config.keyResolver == null) ? defaultKeyResolver : config.keyResolver; RateLimiter<Object> limiter = (config.rateLimiter == null) ? defaultRateLimiter : config.rateLimiter; return (exchange, chain) -> {.... }; }
其中能够看到将来实际应用的KeyResolver 和RateLimiter取值逻辑,其会优先从Config中提取,若是没有任何自定义则直接采用默认值,默认值的设定已经在本章开头介绍过。
不难发现,咱们自定义的3个参数仅仅有keyResolver被成功赋值,那么剩下的两个参数呢,又是如何配置绑定?继续往下看
this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
此处发布了FilterArgsEvent事件,其中包含了全部的转换后的全部args
配置,以下观察AbstractRateLimiter类,其实现了ApplicationListener接口,并监听FilterArgsEvent事件,
public abstract class AbstractRateLimiter<C> extends AbstractStatefulConfigurable<C> implements RateLimiter<C>, ApplicationListener<FilterArgsEvent> { ..... @Override public void onApplicationEvent(FilterArgsEvent event) { Map<String, Object> args = event.getArgs(); if (args.isEmpty() || !hasRelevantKey(args)) { return; } String routeId = event.getRouteId(); C routeConfig = newConfig(); ConfigurationUtils.bind(routeConfig, args, configurationPropertyName, configurationPropertyName, validator); getConfig().put(routeId, routeConfig); } .. }
AbstractRateLimiter类是抽象类,此处真正使用的是RedisRateLimiter类,其除了最核心的isAllowed方法,还有以下参数配置定义
@ConfigurationProperties("spring.cloud.gateway.redis-rate-limiter") public class RedisRateLimiter extends AbstractRateLimiter<RedisRateLimiter.Config> implements ApplicationContextAware { @Validated public static class Config { @Min(1) private int replenishRate; @Min(1) private int burstCapacity = 1; ...... } }
根据spring.cloud.gateway.redis-rate-limiter
为前缀,replenishRate
和burstCapacity
值绑定过程定义在AbstractRateLimiter抽象类中
public void onApplicationEvent(FilterArgsEvent event) { Map<String, Object> args = event.getArgs(); if (args.isEmpty() || !hasRelevantKey(args)) { return; } String routeId = event.getRouteId(); C routeConfig = newConfig(); ConfigurationUtils.bind(routeConfig, args, configurationPropertyName, configurationPropertyName, validator); getConfig().put(routeId, routeConfig); }
绑定方式仍然是采用的ConfigurationUtils工具类,最后一行将routeId
做为了键,routeConfig
做为value值存储在Map中,故后续在isAllowed方法中将直接根据routeId
取出当前routeConfig
配置,同时也避免了每次请求均须要加载路由参数的配置(同理,CachingRouteLocator中也定义了对应的Map来缓存路由信息),仅有首次请求须要加载。最后来看看isAllowed方法定义:
public Mono<Response> isAllowed(String routeId, String id) { if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig); if (routeConfig == null) { throw new IllegalArgumentException("No Configuration found for route " + routeId); } // How many requests per second do you want a user to be allowed to do? int replenishRate = routeConfig.getReplenishRate(); // How much bursting do you want to allow? int burstCapacity = routeConfig.getBurstCapacity(); try { List<String> keys = getKeys(id); // The arguments to the LUA script. time() returns unixtime in seconds. List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1"); // allowed, tokens_left = redis.eval(SCRIPT, keys, args) Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); // .log("redisratelimiter", Level.FINER); return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) .reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }) .map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft)); if (log.isDebugEnabled()) { log.debug("response: " + response); } return response; }); } catch (Exception e) log.error("Error determining if user allowed from redis", e); } return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); }
其中自定义参数经过routeId
便可从上一个步骤的getConfig()
中提取,最终经过执行lua脚原本判断是否可以放行。
小节
经过对请求的处理过程解析,能够看到其实际是分析了自定义参数如何被绑定到对应的配置实例。此处虽然仅仅是分析了RequestRateLimiterGatewayFilterFactory的相关参数绑定原理,但在SpringCloudGateway中全部的过滤器均遵循同样的执行流程以及数据绑定模式。
在CachingRouteLocator中能够看到以下代码段
@EventListener(RefreshRoutesEvent.class) /* for testing */ void handleRefresh() { refresh(); }
其监听RefreshRoutesEvent事件,而后执行路由器配置缓存的刷新操做。该事件的发布能够经过GatewayControllerEndpoint提供的refresh
来完成
@PostMapping("/refresh") public Mono<Void> refresh() { this.publisher.publishEvent(new RefreshRoutesEvent(this)); return Mono.empty(); }
同理在CachingRouteDefinitionLocator中也会同步监听该事件。此处须要特别注意,该端点依赖于spring-boot-starter-actuator
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
同时须要在配置文件中暴露gateway
端点信息
management: endpoint: gateway: enabled: true endpoints: web: exposure: include: ["health","info","gateway"]
更多能够参考官方文档。
经过本章的4部分介绍,不管是对rateLimiter过滤器进行定制化,亦或是对其余的过滤器定制化,甚至是添加彻底自定义的过滤器均会有指导性的做用。其主体的执行流程与配置模式基本是固定的。