Java并发编程入门(十一)限流场景和Spring限流器实现

Java极客  |  做者  /  铿然一叶
这是Java极客的第 39 篇原创文章

1、限流场景

限流场景通常基于硬件资源的使用负载,包括CPU,内存,IO。例如某个报表服务须要消耗大量内存,若是并发数增长就会拖慢整个应用,甚至内存溢出致使应用挂掉。java

限流适用于会动态增长的资源,已经池化的资源不必定须要限流,例如数据库链接池,它是已经肯定的资源,池的大小固定(即便能够动态伸缩池大小),这种场景下并不须要经过限流来实现,只要能作到若是池内连接已经使用完,则没法再获取新的链接则可。正则表达式

所以,使用限流的前提是:
1.防止资源使用过载产生不良影响。
2.使用的资源会动态增长,例如一个站点的请求。spring

2、Spring中实现限流

I、限流需求

1.只针对Controller限流
2.根据url请求路径限流
3.可根据正则表达式匹配url来限流 4.可定义多个限流规则,每一个规则的最大流量不一样数据库

II、相关类结构


1.CurrentLimiteAspect是一个拦截器,在controller执行先后执行后拦截
2.CurrentLimiter是限流器,能够添加限流规则,根据限流规则获取流量通行证,释放流量通行证;若是获取通行证失败则抛出异常。
3.LimiteRule是限流规则,限流规则可设置匹配url的正则表达式和最大流量值,同时获取该规则的流量通讯证和释放流量通讯证。
4.AcquireResult是获取流量通讯证的结果,结果有3种:获取成功,获取失败,不须要获取。
5.Application是Spring的启动类,简单起见,在启动类种添加限流规则。

III、Show me code

1.AcquireResult.java

public class AcquireResult {

    /** 获取通行证成功 */
    public static final int ACQUIRE_SUCCESS = 0;

    /** 获取通行证失败 */
    public static final int ACQUIRE_FAILED = 1;

    /** 不须要获取通行证 */
    public static final int ACQUIRE_NONEED = 2;

    /** 获取通行证结果 */
    private int result;

    /** 可用通行证数量 */
    private int availablePermits;

    public int getResult() {
        return result;
    }

    public void setResult(int result) {
        this.result = result;
    }

    public int getAvailablePermits() {
        return availablePermits;
    }

    public void setAvailablePermits(int availablePermits) {
        this.availablePermits = availablePermits;
    }
}
复制代码

2.LimiteRule.java

/** * @ClassName LimiteRule * @Description TODO * @Author 铿然一叶 * @Date 2019/10/4 20:18 * @Version 1.0 * javashizhan.com **/
public class LimiteRule {

    /** 信号量 */
    private final Semaphore sema;

    /** 请求URL匹配规则 */
    private final String pattern;

    /** 最大并发数 */
    private final int maxConcurrent;

    public LimiteRule(String pattern, int maxConcurrent) {
        this.sema = new Semaphore(maxConcurrent);
        this.pattern = pattern;
        this.maxConcurrent = maxConcurrent;
    }

    /** * 获取通行证。这里加同步是为了打印可用通行证数量时看起来逐个减小或者逐个增长,无此打印需求可不加synchronized关键字 * @param urlPath 请求Url * @return 0-获取成功,1-没有获取到通行证,2-不须要获取通行证 */
    public synchronized AcquireResult tryAcquire(String urlPath) {

        AcquireResult acquireResult = new AcquireResult();
        acquireResult.setAvailablePermits(this.sema.availablePermits());

        try {
            //Url请求匹配规则则获取通行证
            if (Pattern.matches(pattern, urlPath)) {

                boolean acquire = this.sema.tryAcquire(50, TimeUnit.MILLISECONDS);

                if (acquire) {
                    acquireResult.setResult(AcquireResult.ACQUIRE_SUCCESS);
                    print(urlPath);
                } else {
                    acquireResult.setResult(AcquireResult.ACQUIRE_FAILED);
                }
            } else {
                acquireResult.setResult(AcquireResult.ACQUIRE_NONEED);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return acquireResult;
    }

    /** * 释放通行证,这里加同步是为了打印可用通行证数量时看起来逐个减小或者逐个增长,无此打印需求可不加synchronized关键字 */
    public synchronized void release() {
        this.sema.release();
        print(null);
    }

    /** * 获得最大并发数 * @return */
    public int getMaxConcurrent() {
        return this.maxConcurrent;
    }

    /** * 获得匹配表达式 * @return */
    public String getPattern() {
        return this.pattern;
    }

    /** * 打印日志 * @param urlPath */
    private void print(String urlPath) {
        StringBuffer buffer = new StringBuffer();
        buffer.append("Pattern: ").append(pattern).append(", ");
        if (null != urlPath) {
            buffer.append("urlPath: ").append(urlPath).append(", ");
        }
        buffer.append("Available Permits:").append(this.sema.availablePermits());
        System.out.println(buffer.toString());
    }

}
复制代码

3.CurrentLimiter.java

/** * @ClassName CurrentLimiter * @Description TODO * @Author 铿然一叶 * @Date 2019/10/4 20:18 * @Version 1.0 * javashizhan.com **/
public class CurrentLimiter {

    /** 本地线程变量,存储一次请求获取到的通行证,和其余并发请求隔离开,在controller执行完后释放本次请求得到的通行证 */
    private static ThreadLocal<Vector<LimiteRule>> localAcquiredLimiteRules = new ThreadLocal<Vector<LimiteRule>>();

    /** 全部限流规则 */
    private static Vector<LimiteRule> allLimiteRules = new Vector<LimiteRule>();

    /** 私有构造器,避免实例化 */
    private CurrentLimiter() {}

    /** * 添加限流规则,在spring启动时添加,不须要加锁,若是在运行中动态添加,须要加锁 * @param rule */
    public static void addRule(LimiteRule rule) {
        printRule(rule);
        allLimiteRules.add(rule);
    }

    /** * 获取流量通讯证,全部流量规则都要获取后才能经过,若是一个不能获取则抛出异常 * 多线程并发,须要加锁 * @param urlPath */
    public static void tryAcquire(String urlPath) throws Exception {
        //有限流规则则处理
        if (allLimiteRules.size() > 0) {

            //能获取到通行证的流量规则要保存下来,在Controller执行完后要释放
            Vector<LimiteRule> acquiredLimitRules = new Vector<LimiteRule>();

            for(LimiteRule rule:allLimiteRules) {
                //获取通行证
                AcquireResult acquireResult = rule.tryAcquire(urlPath);

                if (acquireResult.getResult() == AcquireResult.ACQUIRE_SUCCESS) {
                    acquiredLimitRules.add(rule);
                    //获取到通行证的流量规则添加到本地线程变量
                    localAcquiredLimiteRules.set(acquiredLimitRules);

                } else if (acquireResult.getResult() == AcquireResult.ACQUIRE_FAILED) {
                    //若是获取不到通行证则抛出异常
                    StringBuffer buffer = new StringBuffer();
                    buffer.append("The request [").append(urlPath).append("] exceeds maximum traffic limit, the limit is ").append(rule.getMaxConcurrent())
                            .append(", available permit is").append(acquireResult.getAvailablePermits()).append(".");

                    System.out.println(buffer);
                    throw new Exception(buffer.toString());

                } else {
                    StringBuffer buffer = new StringBuffer();
                    buffer.append("This path does not match the limit rule, path is [").append(urlPath)
                            .append("], pattern is [").append(rule.getPattern()).append("].");
                    System.out.println(buffer.toString());
                }
            }
        }
    }

    /** * 释放获取到的通行证。在controller执行完后掉调用(抛出异常也须要调用) */
    public static void release() {
        Vector<LimiteRule> acquiredLimitRules = localAcquiredLimiteRules.get();
        if (null != acquiredLimitRules && acquiredLimitRules.size() > 0) {
            acquiredLimitRules.forEach(rule->{
                rule.release();
            });
        }

        //destory本地线程变量,避免内存泄漏
        localAcquiredLimiteRules.remove();
    }

    /** * 打印限流规则信息 * @param rule */
    private static void printRule(LimiteRule rule) {
        StringBuffer buffer = new StringBuffer();
        buffer.append("Add Limit Rule, Max Concurrent: ").append(rule.getMaxConcurrent())
                .append(", Pattern: ").append(rule.getPattern());
        System.out.println(buffer.toString());
    }
}
复制代码

4.CurrentLimiteAspect.java

/** * @ClassName CurrentLimiteAspect * @Description TODO * @Author 铿然一叶 * @Date 2019/10/4 20:15 * @Version 1.0 * javashizhan.com **/
@Aspect
@Component
public class CurrentLimiteAspect {

    /** * 拦截controller,自行修改路径 */
    @Pointcut("execution(* com.javashizhan.controller..*(..))")
    public void controller() { }

    @Before("controller()")
    public void controller(JoinPoint point) throws Exception {
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        //获取通行证,urlPath的格式如:/limit
        CurrentLimiter.tryAcquire(request.getRequestURI());
    }

    /** * controller执行完后调用,即便controller抛出异常这个拦截方法也会被调用 * @param joinPoint */
    @After("controller()")
    public void after(JoinPoint joinPoint) {
        //释放获取到的通行证
        CurrentLimiter.release();
    }
}
复制代码

5.Application.java

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).run(args);

        //添加限流规则
        LimiteRule rule = new LimiteRule("/limit", 4);
        CurrentLimiter.addRule(rule);
    }
}
复制代码

IV、验证

测试验证碰到的两个坑:
1.人工经过浏览器刷新请求发现controller是串行的
2.经过postman设置了并发测试也仍是串行的,即使设置了并发数,以下图:编程

百度无果,只能自行写代码验证了,代码以下:浏览器

/** * @ClassName CurrentLimiteTest * @Description 验证限流器 * @Author 铿然一叶 * @Date 2019/10/5 0:51 * @Version 1.0 * javashizhan.com **/
public class CurrentLimiteTest {

    public static void main(String[] args) {
        final String limitUrlPath = "http://localhost:8080/limit";
        final String noLimitUrlPath = "http://localhost:8080/nolimit";

        //限流测试
        test(limitUrlPath);

        //休眠一会,等上一批线程执行完,方便查看日志
        sleep(5000);

        //不限流测试
        test(noLimitUrlPath);

    }

    private static void test(String urlPath) {
        Thread[] requesters = new Thread[10];

        for (int i = 0; i < requesters.length; i++) {
            requesters[i] = new Thread(new Requester(urlPath));
            requesters[i].start();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Requester implements Runnable {

    private final String urlPath;
    private final RestTemplate restTemplate = new RestTemplate();

    public Requester(String urlPath) {
        this.urlPath = urlPath;
    }

    @Override
    public void run() {
        String response = restTemplate.getForEntity(urlPath, String.class).getBody();
        System.out.println("response: " + response);
    }
}
复制代码

输出日志以下:缓存

Pattern: /limit, urlPath: /limit, Available Permits:3
Pattern: /limit, urlPath: /limit, Available Permits:2
Pattern: /limit, urlPath: /limit, Available Permits:1
Pattern: /limit, urlPath: /limit, Available Permits:0
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
Pattern: /limit, Available Permits:1
Pattern: /limit, Available Permits:2
Pattern: /limit, Available Permits:3
Pattern: /limit, Available Permits:4
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
复制代码

能够看到日志输出信息为:
1.第1个测试url最大并发为4,一次10个并发请求,有4个获取通行证后,剩余6个获取通行证失败。
2.获取到通行证的4个请求在controller执行完后释放了通行证。
3.第2个测试url没有限制并发,10个请求均执行成功。安全

至此,限流器验证成功。bash

注意:去掉同步锁后(synchronized关键字),打印的日志相似以下,能够看到可用通行证数量不是递增或者递减的,但这并不代表逻辑不正确,这是由于信号量支持多个线程进入临界区,在打印以前,可能已经减小了多个通行证,另外先执行的线程不必定先结束,因此看到的可用通讯证数量不是递增也不是递减的。信号量只能保证的是用掉一个通行证,就少一个。多线程

Pattern: /limit, urlPath: /limit, Available Permits:2
Pattern: /limit, urlPath: /limit, Available Permits:1
Pattern: /limit, urlPath: /limit, Available Permits:0
Pattern: /limit, urlPath: /limit, Available Permits:2
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
Pattern: /limit, Available Permits:2
Pattern: /limit, Available Permits:4
Pattern: /limit, Available Permits:2
Pattern: /limit, Available Permits:3
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
复制代码

end.


相关阅读:
Java并发编程(一)知识地图
Java并发编程(二)原子性
Java并发编程(三)可见性
Java并发编程(四)有序性
Java并发编程(五)建立线程方式概览
Java并发编程入门(六)synchronized用法
Java并发编程入门(七)轻松理解wait和notify以及使用场景
Java并发编程入门(八)线程生命周期
Java并发编程入门(九)死锁和死锁定位
Java并发编程入门(十)锁优化
Java并发编程入门(十二)生产者和消费者模式-代码模板
Java并发编程入门(十三)读写锁和缓存模板
Java并发编程入门(十四)CountDownLatch应用场景
Java并发编程入门(十五)CyclicBarrier应用场景
Java并发编程入门(十六)秒懂线程池差异
Java并发编程入门(十七)一图掌握线程经常使用类和接口
Java并发编程入门(十八)再论线程安全


Java极客站点: javageektour.com/

<---此贴不易,左边点赞!

相关文章
相关标签/搜索