高并发场景-请求合并(二)揭秘HystrixCollapser-利用Queue和线程池异步实现

背景

在互联网的高并发场景下,请求会很是多,可是数据库链接池比较少,或者说须要减小CPU压力,减小处理逻辑的,须要把单个查询,用某些手段,改成批量查询多个后返回。
如:支付宝中,查询“我的信息”,用户只会触发一次请求,查询本身的信息,可是多我的同时这样作就会产生屡次数据库链接。为了减小链接,须要在JAVA服务端进行合并请求,把多个“我的信息”查询接口,合并为批量查询多个“我的信息”接口,而后以我的信息在数据库的id做为Key返回给上游系统或者页面URL等调用方。java

目的

  1. 减小访问数据库的次数
  2. 单位时间内的多个请求,合并为一个请求。让业务逻辑层把单个查询的sql,改成批量查询的sql。或者逻辑里面须要调用redis,那批量逻辑里面就能够用redis的pipeline去实现。
  3. 本次须要使用JDK原生手段来实现请求合并,由于你们不必定会有Hystrix,因此用原生办法实现,并解析HystrixCollapser里面是如何实现的。
点赞再看,关注公众号:【地藏思惟】给你们分享互联网场景设计与架构设计方案
掘金:地藏Kelvin https://juejin.im/user/5d67da8d6fb9a06aff5e85f7

主要解决手段

  1. SpringCloud的Hystrix的自定义HystrixCollapse和HystrixCommand
  2. SpringCloud的Hystrix注解方式。
  3. 没有服务治理框架时,利用JDK队列、定时任务线程池处理。

在上一章已经说了第一二种,鉴于有同窗没有SpringCloud,因此使用第3种来作请求合并,并一块儿分析请求合并的原理。git

建议先看第一章,第二章至关于为HystrixCollapser的内部原理描述
高并发场景-请求合并(一)SpringCloud中Hystrix请求合并

交互流程

开发

本章节为利用JDK原生包开发,因此没有SpringCloud那么多东西要配置,编写代码只有一个类。redis

1. 建立请求层

只须要暴露单个查询的接口,业务逻辑层里作请求合并的逻辑。spring

@RestController
public class UserController {
    @Autowired
    private UserBatchWithFutureServiceImpl userBatchWithFutureServiceImpl;
   @RequestMapping(method = RequestMethod.GET,value = "/userbyMergeWithFuture/{id}")
    public User userbyMergeWithFuture(@PathVariable Long id) throws InterruptedException, ExecutionException {
        User user = this.userBatchWithFutureServiceImpl.getUserById(id);
        return user;
    }
}

2. 请求合并逻辑层

  1. 建立请求合并逻辑入口
  2. 建立阻塞队列,用于累计多个请求参数
  3. 建立CompletableFuture类,为了本条线程阻塞,等批量查询处理完后,异步获取当前id对应的User结果信息。
  4. 执行CompletableFuture.get方法等待异步结果通知。
@Component
public class UserBatchWithFutureServiceImpl {
    /** 积攒请求的阻塞队列 */
    private LinkedBlockingDeque<UserQueryDto> requestQueue = new LinkedBlockingDeque<>();

    public User getUserById(Long id) throws InterruptedException, ExecutionException {

        UserQueryDto userQueryDto = new UserQueryDto();
        userQueryDto.setId(id);
        CompletableFuture<User> completedFuture = new CompletableFuture<>();
        userQueryDto.setCompletedFuture(completedFuture);

        requestQueue.add(userQueryDto);

        User user = completedFuture.get();
        return user;
    }

HystrixCollapser也是利用这种办法来作异步通知的手段,让请求接口主线程在得到真正结果前阻塞等待。sql

3. 定时任务

在相同的类下建立定时任务,利用@PostConstruct让当前类的Bean构造完后执行该方法,生成一个5秒定时任务。
你们能够设定定时的时间,我为了比较方便测试,而用了5秒。数据库

/** 线程池数量 */
    private int threadNum = 1;
    /** 定时间隔时长 */
    private long period = 5000;
    @PostConstruct
    public void init() {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum);
        // 每5秒执行一次
        scheduledExecutorService.scheduleAtFixedRate(new UserBatchThread(), 0, createDeviceMergePeriod,
                TimeUnit.MILLISECONDS);
    }

HystrixCollapser的每隔n毫秒就会处理一次执行单个方法转批量方法,也是经过这类来实现的。架构

4. 在UserBatchWithFutureServiceImpl 类下建立内部类

建立内部类为了定时任务执行此逻辑,而且为了代码整洁,不在建立线程池时编写大方法块的代码。并发

在内部类里面主要逻辑:app

  1. 从存放请求接口参数的requestQueue 队列中,获取全部成员,并放入当此触发任务逻辑的局部变量中
  2. 而且取出关键的请求参数id放入局部变量List中。
  3. 只要获取出变量,则进行批量查询
  4. 最后利用CompletedFuture异步通知并唤醒getUserById方法等待的线程。
public class UserBatchThread implements Runnable {

        @Override
        public void run() {
            List<UserQueryDto> requestQueueTmp = new ArrayList<>();
            // 存放批量查询的入参
            List<Long> requestId = new ArrayList<>();

            // 把出请求层放入的消息queue的元素取出来
            int size = requestQueue.size();
            for (int i = 0; i < size; i++) {
                UserQueryDto request = requestQueue.poll();
                if (Objects.nonNull(request)) {
                    requestQueueTmp.add(request);
                    requestId.add(request.getId());
                }
            }

            if (!requestId.isEmpty()) {
                try {
                    List<User> response = getUserBatchById(requestId);
                    Map<Long, User> collect = response.stream().collect(
                            Collectors.toMap(detail -> detail.getId(), Function.identity(), (key1, key2) -> key2));
                    // 通知请求的线程
                    for (UserQueryDto request : requestQueueTmp) {
                        request.getCompletedFuture().complete(collect.get(request.getId()));
                    }

                } catch (Exception e) {
                    // 通知请求的线程-异常
                    requestQueueTmp.forEach(request -> request.getCompletedFuture().obtrudeException(e));
                }
            }
        }

    }

    public List<User> getUserBatchById(List<Long> ids) {
        System.out.println("进入批量处理方法" + ids);
        List<User> ps = new ArrayList<>();
        for (Long id : ids) {
            User p = new User();
            p.setId(id);
            p.setUsername("dizang" + id);
            ps.add(p);
        }
        return ps;
    }

请求接口中入队列的元素,就会从这里取出,HystrixCollasper也是利用这种poll方法原子性的获取队列里面元素,不会被定时任务的屡次触发而重复的获取,只要知足有至少一个都会作批量查询,因此HystrixCollasper合并请求时,即便n毫秒内只有一个请求,也会去处理。框架

测试验证

  1. 同上一章同样触发Swagger-ui页面
  2. 请求两次不一样的参数
  3. 结果以下图中,console日志已经输出了两次请求的入参

总结

到这里相信你们都已经完成了合并请求了。此次没有依赖框架,基于原生作法,利用队列存查询所需的入参,而后利用线程池定时地获取队列的入参,再批量处理,利用线程的Future作异步返回结果。这样咱们就理解了SpringCloud的HystrixCollasper的内部流程了。但愿可以帮助没有框架的项目,或者公司技术栈不合适的状况下的同窗。

本文Demo

都在我springcloud的demo里面了,看provider-hystrix-request-merge这个工程下的内容,在UserBatchWithFutureServiceImpl类中。

https://gitee.com/kelvin-cai/spring-cloud-demo


欢迎关注公众号,文章更快一步

个人公众号 :地藏思惟

掘金:地藏Kelvin

简书:地藏Kelvin

个人Gitee: 地藏Kelvin https://gitee.com/kelvin-cai

相关文章
相关标签/搜索