原文:https://juejin.im/entry/5bd491c85188255ac2629bef?utm_source=coffeephp.comphp
在分布式领域,咱们不免会遇到并发量突增,对后端服务形成高压力,严重甚至会致使系统宕机。为避免这种问题,咱们一般会为接口添加限流、降级、熔断等能力,从而使接口更为健壮。Java领域常见的开源组件有Netflix的hystrix,阿里系开源的sentinel等,都是蛮不错的限流熔断框架。前端
今天咱们就基于Redis组件的特性,实现一个分布式限流组件,名字就定为shield-ratelimiter。
java
首先解释下为什么采用Redis做为限流组件的核心。node
通俗地讲,假设一个用户(用IP判断)每秒访问某服务接口的次数不能超过10次,那么咱们能够在Redis中建立一个键,并设置键的过时时间为60秒。git
当一个用户对此服务接口发起一次访问就把键值加1,在单位时间(此处为1s)内当键值增长到10的时候,就禁止访问服务接口。PS:在某种场景中添加访问时间间隔仍是颇有必要的。咱们本次不考虑间隔时间,只关注单位时间内的访问次数。github
原理已经讲过了,说下需求。redis
基于上述需求,咱们决定基于注解方式进行核心功能开发,基于Spring-boot-starter做为基础环境,从而可以很好的适配Spring环境。spring
另外,在本次开发中,咱们不经过简单的调用Redis的java类库API实现对Redis的incr操做。后端
缘由在于,咱们要保证整个限流的操做是原子性的,若是用Java代码去作操做及判断,会有并发问题。这里我决定采用Lua脚本进行核心逻辑的定义。springboot
在正式开发前,我简单介绍下对Redis的操做中,为什么推荐使用Lua脚本。
Redis添加了对Lua的支持,可以很好的知足原子性、事务性的支持,让咱们免去了不少的异常逻辑处理。对于Lua的语法不是本文的主要内容,感兴趣的能够自行查找资料。
到这里,咱们正式开始手写限流组件的进程。
项目基于maven构建,主要依赖Spring-boot-starter,咱们主要在springboot上进行开发,所以自定义的开发包能够直接依赖下面这个坐标,方便进行包管理。版本号自行选择稳定版。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>1.4.2.RELEASE</version> </dependency>
因为咱们是基于Redis进行的限流操做,所以须要整合Redis的类库,上面已经讲到,咱们是基于Springboot进行的开发,所以这里能够直接整合RedisTemplate。
这里咱们引入spring-boot-starter-redis的依赖。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.4.2.RELEASE</version> </dependency>
新建一个Redis的配置类,命名为RedisCacheConfig,使用javaconfig形式注入CacheManager及RedisTemplate。为了操做方便,咱们采用了Jackson进行序列化。代码以下
@Configuration @EnableCaching public class RedisCacheConfig { private static final Logger LOGGER = LoggerFactory.getLogger(RedisCacheConfig.class); @Bean public CacheManager cacheManager(RedisTemplate<?, ?> redisTemplate) { CacheManager cacheManager = new RedisCacheManager(redisTemplate); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Springboot Redis cacheManager 加载完成"); } return cacheManager; } @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式) Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); //使用StringRedisSerializer来序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); LOGGER.info("Springboot RedisTemplate 加载完成"); return template; } }
注意 要使用 @Configuration 标注此类为一个配置类,固然你可使用 @Component, 可是不推荐,缘由在于 @Component 注解虽然也能够看成配置类,可是并不会为其生成CGLIB代理Class,而使用@Configuration,CGLIB会为其生成代理类,进行性能的提高。
咱们的包开发完毕以后,调用方的application.properties须要进行相关配置以下:
#单机模式redis spring.redis.host=127.0.0.1 spring.redis.port=6379 spring.redis.pool.maxActive=8 spring.redis.pool.maxWait=-1 spring.redis.pool.maxIdle=8 spring.redis.pool.minIdle=0 spring.redis.timeout=10000 spring.redis.password=
若是有密码的话,配置password便可。
这里为单机配置,若是须要支持哨兵集群,则配置以下,Java代码不须要改动,只须要变更配置便可。注意 两种配置不能共存!
#哨兵集群模式 # database name spring.redis.database=0 # server password 密码,若是没有设置可不配 spring.redis.password= # pool settings ...池配置 spring.redis.pool.max-idle=8 spring.redis.pool.min-idle=0 spring.redis.pool.max-active=8 spring.redis.pool.max-wait=-1 # name of Redis server 哨兵监听的Redis server的名称 spring.redis.sentinel.master=mymaster # comma-separated list of host:port pairs 哨兵的配置列表 spring.redis.sentinel.nodes=127.0.0.1:26379,127.0.0.1:26479,127.0.0.1:26579
为了调用方便,咱们定义一个名为RateLimiter 的注解,内容以下
/** * @author snowalker * @version 1.0 * @date 2018/10/27 1:25 * @className RateLimiter * @desc 限流注解 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RateLimiter { /** * 限流key * @return */ String key() default "rate:limiter"; /** * 单位时间限制经过请求数 * @return */ long limit() default 10; /** * 过时时间,单位秒 * @return */ long expire() default 1; }
该注解明确只用于方法,主要有三个属性。
expire–incr的值的过时时间,业务中表示限流的单位时间。
定义好注解后,须要开发注解使用的切面,这里咱们直接使用aspectj进行切面的开发。先看代码
@Aspect @Component public class RateLimterHandler { private static final Logger LOGGER = LoggerFactory.getLogger(RateLimterHandler.class); @Autowired RedisTemplate redisTemplate; private DefaultRedisScript<Long> getRedisScript; @PostConstruct public void init() { getRedisScript = new DefaultRedisScript<>(); getRedisScript.setResultType(Long.class); getRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("rateLimter.lua"))); LOGGER.info("RateLimterHandler[分布式限流处理器]脚本加载完成"); }
这里是注入了RedisTemplate,使用其API进行Lua脚本的调用。
init() 方法在应用启动时会初始化DefaultRedisScript,并加载Lua脚本,方便进行调用。
PS: Lua脚本放置在classpath下,经过ClassPathResource进行加载。
@Pointcut("@annotation(com.snowalker.shield.ratelimiter.core.annotation.RateLimiter)") public void rateLimiter() {}
这里咱们定义了一个切点,表示只要注解了 @RateLimiter 的方法,都可以触发限流操做。
@Around("@annotation(rateLimiter)") public Object around(ProceedingJoinPoint proceedingJoinPoint, RateLimiter rateLimiter) throws Throwable { if (LOGGER.isDebugEnabled()) { LOGGER.debug("RateLimterHandler[分布式限流处理器]开始执行限流操做"); } Signature signature = proceedingJoinPoint.getSignature(); if (!(signature instanceof MethodSignature)) { throw new IllegalArgumentException("the Annotation @RateLimter must used on method!"); } /** * 获取注解参数 */ // 限流模块key String limitKey = rateLimiter.key(); Preconditions.checkNotNull(limitKey); // 限流阈值 long limitTimes = rateLimiter.limit(); // 限流超时时间 long expireTime = rateLimiter.expire(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("RateLimterHandler[分布式限流处理器]参数值为-limitTimes={},limitTimeout={}", limitTimes, expireTime); } /** * 执行Lua脚本 */ List<String> keyList = new ArrayList(); // 设置key值为注解中的值 keyList.add(limitKey); /** * 调用脚本并执行 */ Long result = (Long) redisTemplate.execute(getRedisScript, keyList, expireTime, limitTimes); if (result == 0) { String msg = "因为超过单位时间=" + expireTime + "-容许的请求次数=" + limitTimes + "[触发限流]"; LOGGER.debug(msg); return "false"; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("RateLimterHandler[分布式限流处理器]限流执行结果-result={},请求[正常]响应", result); } return proceedingJoinPoint.proceed(); } }
这段代码的逻辑为,获取 @RateLimiter 注解配置的属性:key、limit、expire,并经过 redisTemplate.execute(RedisScript script, List keys, Object… args) 方法传递给Lua脚本进行限流相关操做,逻辑很清晰。
这里咱们定义若是脚本返回状态为0则为触发限流,1表示正常请求。
这里是咱们整个限流操做的核心,经过执行一个Lua脚本进行限流的操做。脚本内容以下
--获取KEY local key1 = KEYS[1] local val = redis.call('incr', key1) local ttl = redis.call('ttl', key1) --获取ARGV内的参数并打印 local expire = ARGV[1] local times = ARGV[2] redis.log(redis.LOG_DEBUG,tostring(times)) redis.log(redis.LOG_DEBUG,tostring(expire)) redis.log(redis.LOG_NOTICE, "incr "..key1.." "..val); if val == 1 then redis.call('expire', key1, tonumber(expire)) else if ttl == -1 then redis.call('expire', key1, tonumber(expire)) end end if val > tonumber(times) then return 0 end return 1
逻辑很通俗,我简单介绍下。
当过时后,又是新的一轮循环,整个过程是一个原子性的操做,可以保证单位时间不会超过咱们预设的请求阈值。
到这里咱们即可以在项目中进行测试。
这里我贴一下核心代码,咱们定义一个接口,并注解 @RateLimiter(key = “ratedemo:1.0.0”, limit = 5, expire = 100) 表示模块ratedemo:sendPayment:1.0.0
在100s内容许经过5个请求,这里的参数设置是为了方便看结果。实际中,咱们一般会设置1s内容许经过的次数。
@Controller public class TestController { private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class); @ResponseBody @RequestMapping("ratelimiter") @RateLimiter(key = "ratedemo:1.0.0", limit = 5, expire = 100) public String sendPayment(HttpServletRequest request) throws Exception { return "正常请求"; } }
咱们经过RestClient请求接口,日志返回以下:
2018-10-28 00:00:00.602 DEBUG 17364 --- [nio-8888-exec-1] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]开始执行限流操做 2018-10-28 00:00:00.688 DEBUG 17364 --- [nio-8888-exec-1] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应 2018-10-28 00:00:00.860 DEBUG 17364 --- [nio-8888-exec-3] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]开始执行限流操做 2018-10-28 00:00:01.183 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]开始执行限流操做 2018-10-28 00:00:01.520 DEBUG 17364 --- [nio-8888-exec-3] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应 2018-10-28 00:00:01.521 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应 2018-10-28 00:00:01.557 DEBUG 17364 --- [nio-8888-exec-5] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]开始执行限流操做 2018-10-28 00:00:01.558 DEBUG 17364 --- [nio-8888-exec-5] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应 2018-10-28 00:00:01.774 DEBUG 17364 --- [nio-8888-exec-7] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]开始执行限流操做 2018-10-28 00:00:02.111 DEBUG 17364 --- [nio-8888-exec-8] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]开始 2018-10-28 00:00:02.169 DEBUG 17364 --- [nio-8888-exec-7] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]限流执行结果-result=1,请求[正常]响应 2018-10-28 00:00:02.169 DEBUG 17364 --- [nio-8888-exec-8] c.s.s.r.core.handler.RateLimterHandler : 因为超过单位时间=100-容许的请求次数=5[触发限流] 2018-10-28 00:00:02.276 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]开始执行限流操做 2018-10-28 00:00:02.276 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]参数值为-limitTimes=5,limitTimeout=100 2018-10-28 00:00:02.278 DEBUG 17364 --- [io-8888-exec-10] c.s.s.r.core.handler.RateLimterHandler : 因为超过单位时间=100-容许的请求次数=5[触发限流] 2018-10-28 00:00:02.445 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]开始执行限流操做 2018-10-28 00:00:02.445 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]参数值为-limitTimes=5,limitTimeout=100 2018-10-28 00:00:02.446 DEBUG 17364 --- [nio-8888-exec-2] c.s.s.r.core.handler.RateLimterHandler : 因为超过单位时间=100-容许的请求次数=5[触发限流] 2018-10-28 00:00:02.628 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]开始执行限流操做 2018-10-28 00:00:02.628 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler : RateLimterHandler[分布式限流处理器]参数值为-limitTimes=5,limitTimeout=100 2018-10-28 00:00:02.629 DEBUG 17364 --- [nio-8888-exec-4] c.s.s.r.core.handler.RateLimterHandler : 因为超过单位时间=100-容许的请求次数=5[触发限流]
根据日志可以看到,正常请求5次后,返回限流触发,说明咱们的逻辑生效,对前端而言也是能够看到false标记,代表咱们的Lua脚本限流逻辑是正确的,这里具体返回什么标记须要调用方进行明确的定义。
咱们经过Redis的incr及expire功能特性,开发定义了一套基于注解的分布式限流操做,核心逻辑基于Lua保证了原子性。达到了很好的限流的目的,生产上,能够基于该特色进行定制本身的限流组件,固然你能够参考本文的代码,相信你写的必定比个人demo更好!