浅谈异步并发请求和请求合并

概述

在作业务系统需求开发中,常常须要从其余服务获取数据,拼接数据,而后返回数据给前端使用;常见的服务调用就是经过http接口调用,而对于http,一般一个请求会分配一个线程执行,在同步调用接口的状况下,整个线程是一直被占用或者阻塞的;若是有大量的这种请求,整个系统的吞吐量就比较低,而在依赖的服务响应时间比较低的状况下,咱们但愿先让出cpu,让其余请求先执行,等依赖的服务请求返回结果时再继续往下执行,这时咱们会考虑将请求异步化,或者将相同的请求合并,从而达到提升系统执行效率和吞吐量的目的。css

异步并发请求

目前常见的几种调用方式是同步调用,线程池+future,异步回调completableFuture;协程也是异步调用的解决方式,但java目前不支持协程;对于future方式,只能用get或者while(!isDone)轮询这种阻塞的方式直到线程执行完成,这也不是咱们但愿的异步执行方式,jdk8提供的completableFuture其实也不是异步的方式,只是对依赖多服务的Callback调用结果处理作结果编排,来弥补Callback的不足,从而实现异步链式调用的目的,这也是比较推荐的方式。前端

同步调用

RpcService rpcService = new RpcService();
	HttpService httpService = new HttpService();
	// 假设rpc1耗时10ms
	Map<String, String> rpcResult1=rpcService.getRpcResult();
	// 假设rpc2耗时20ms
	Integer rpcResult2 = httpService.getHttpResult();
	
	// 则线程总耗时:30ms

future

ExecutorService executor = Executors.newFixedThreadPool(2);
	RpcService rpcService = new RpcService();
	HttpService httpService = new HttpService();
	future1 = executor.submit(() -> rpcService.getRpcResult());
	future2 = executor.submit(() -> httpService.getHttpResult());
	//rpc1耗时10ms
	Map<String, String> rpcResult1 = future1.get(300, TimeUnit.MILLISECONDS);
	//rpc2耗时20ms
	Integer rpcResult2 = future2.get(300, TimeUnit.MILLISECONDS);

	//则线程总耗时20ms

CompletableFuture

/** 
	* 场景:两个接口并发异步调用,返回CompletableFuture,不阻塞主线程 
	* 两个服务也是异步非阻塞调用
	**/
	CompletableFuture future1 = service.getHttpData("http://www.vip.com/showGoods/50");
	CompletableFuture future2 = service.getHttpData("http://www.vip.com/showGoods/50");
	CompletableFuture future3 = future1.thenCombine(future2, (f1, f2) -> {
	    //处理业务....

	    return f1 + "," + f2;
	}).exceptionally(e -> {

	    return "";
	});

CompletableFuture使用ForkJoinPool执行线程,在ForkJoinPool类注册ForkJoinWorkerThread线程时能够看到,ForkJoinPool里面的线程都是daemon线程(垃圾回收线程就是一个典型的deamon线程),java

/**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
		
	    #注册守护线程
        wt.setDaemon(true);          // configure thread
		
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int i = 0;                                    // assign a pool index
        int mode = config & MODE_MASK;
        int rs = lockRunState();
        try {
            WorkQueue[] ws; int n;                    // skip if no array
            if ((ws = workQueues) != null && (n = ws.length) > 0) {
                int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                int m = n - 1;
                i = ((s << 1) | 1) & m;               // odd-numbered indices
                if (ws[i] != null) {                  // collision
                    int probes = 0;                   // step by approx half n
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[i = (i + step) & m] != null) {
                        if (++probes >= n) {
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.hint = s;                           // use as random seed
                w.config = i | mode;
                w.scanState = i;                      // publication fence
                ws[i] = w;
            }
        } finally {
            unlockRunState(rs, rs & ~RSLOCK);
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
        return w;
    }

当主线程执行完后,jvm就会退出,因此须要考虑主线程执行完成时间和fork出去的线程执行时间,也须要考虑线程池的大小,默认为当前cpu的核数-1,能够参考下其余系统的故障记录:CompletableFuture线程池问题linux

请求合并

当系统遇到瞬间产生大量请求时,能够考虑将相同的请求合并,最大化利用系统IO,提升系统的吞吐量。nginx

设计时,能够将符合条件的url请求,先收集起来,直到知足如下条件之一时进行合并发送:segmentfault

  1. 收集到的请求数超过预设的最大请求数。
  2. 距离上次请求发送时长超过预设的最大时长。

实现方案有自行使用阻塞队列方式:并发环境下的请求合并,也能够考虑Hystrix:Hystrix实现请求合并/请求缓存缓存

目前公司网关组件janus也是经过合并auth请求的方式减小网络开销,提升cpu的利用率和系统吞吐量的。网络

nginx一样有合并请求模块nginx-http-concat用来减小请求io,参考:nginx 合并多个js/css请求为一个请求 并发

赖泽坤@vipshop.comapp

相关文章
相关标签/搜索