用CountDownLatch提高请求处理速度

countdownlatch是java多线程包concurrent里的一个常见工具类,经过使用它能够借助线程能力极大提高处理响应速度,且实现方式很是优雅。今天咱们用一个实际案例和你们来说解一下如何使用以及须要特别注意的点。java

因为线程类的东西都比较抽象,咱们换一种讲解思路,先讲解决问题的案例,而后再解释下原理。编程

假设在微服务架构中,A服务会调用B服务处理一些事情,且每处理一次业务,A可能要调用B屡次处理逻辑相同但数据不一样的事情。为了提高整个链路的处理速度,咱们天然会想到是否能够把A调用B的各个请求组成一个批次,这样A服务只须要调用B服务一次,等B服务处理完一块儿返回便可,省了屡次网络传输的时间。代码以下:安全

/** * 批次请求处理服务 * @param batchRequests 批次请求对象列表 * @return */
public List<DealResult> deal(List<DealRequest> batchRequests){
  List<DealResult> resultList = new ArrayList<>();
  if(batchRequests != null){
    for(DealRequest request : batchRequests){
      //遍历顺序处理单个请求
      resultList.add(process(request));
    }
  }
  return resultList;
}
复制代码

可是B服务顺序处理批次里每个请求的时间并无节省,假设批次里有3个请求,一个请求平均耗时100MS,则B服务仍是要花费300MS来处理完。有什么办法能马上简单提高3倍处理速度,令总花费时间只须要100MS?到咱们的大将countdownlatch出场了!代码以下:服务器

/** * 使用countdownlatch的批次请求处理服务 * @param batchRequests 批次请求对象列表 * @return */
public List<DealResult> countDownDeal(List<DealRequest> batchRequests){

  //定义线程安全的处理结果列表
  List<DealResult> countDownResultList = Collections.synchronizedList(new ArrayList<DealResult>());

  if(batchRequests != null){

        //定义countdownlatch线程数,有多少个请求,咱们就定义多少个
        CountDownLatch runningThreadNum = new CountDownLatch(batchRequests.size());

    for(DealRequest request : batchRequests){
      //循环遍历请求,并实例化线程(构造函数传入CountDownLatch类型的runningThreadNum),马上启动
      DealWorker dealWorker = new DealWorker(request, runningThreadNum, countDownResultList);
      new Thread(dealWorker).start();
    }

        try {
          //调用CountDownLatch的await方法则当前主线程会等待,直到CountDownLatch类型的runningThreadNum清0
          //每一个DealWorker处理完成会对runningThreadNum减1
          //若是等待1分钟后当前主线程都等不到runningThreadNum清0,则认为超时,直接中断,抛出中断异常InterruptedException
            runningThreadNum.await(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
          //此处简化处理,非正常中断应该抛出异常或返回错误结果
            return null;
        }
  }
  return countDownResultList;
}

/** * 线程请求处理类 * */
private class DealWorker implements Runnable {

      /** 正在运行的线程数 */
      private CountDownLatch  runningThreadNum;

      /**待处理请求*/
      private DealRequest request;

      /**待返回结果列表*/
      private List<DealResult> countDownResultList;

      /** * 构造函数 * @param request 待处理请求 * @param runningThreadNum 正在运行的线程数 * @param countDownResultList 待返回结果列表 */
      private DealWorker(DealRequest request, CountDownLatch runningThreadNum, List<DealResult> countDownResultList) {
        this.request = request;
        this.runningThreadNum = runningThreadNum;
        this.countDownResultList = countDownResultList;
      }

  @Override
  public void run() {
    try{
      this.countDownResultList.add(process(this.request));
    }finally{
      //当前线程处理完成,runningThreadNum线程数减1,此操做必须在finally中完成,避免处理异常后形成runningThreadNum线程数没法清0
      this.runningThreadNum.countDown();
    }
  }
}
复制代码

是否是很简单?下图和上面的代码又作了一个对应,假设有3个请求,则启动3个子线程DealWorker,并实例化值数等于3的CountDownLatch。每当一个子线程处理完成后,则调用countDown操做减1。主线程处于awaiting状态,直到CountDownLatch的值数减到0,则主线程继续resume执行。网络

在API中是这样描述的: 用给定的计数 初始化 CountDownLatch。因为调用了 countDown() 方法,因此在当前计数到达零以前,await 方法会一直受阻塞。以后,会释放全部等待的线程,await 的全部后续调用都将当即返回。这种现象只出现一次——计数没法被重置。若是须要重置计数,请考虑使用 CyclicBarrier。多线程

经典的java并发编程实战一书中作了更深刻的定义: CountDownLatch属于闭锁的范畴,闭锁是一种同步工具类,能够延迟线程的进度直到其到达终止状态。闭锁的做用至关于一扇门:在闭锁到达结束状态以前(上面代码中的runningThreadNumq清0),这扇门一直是关闭的,而且没有任何线程能经过(上面代码中的主线程一直await),当到达结束状态时,这扇门会打开并容许全部线程经过(上面代码中的主线程能够继续执行)。当闭锁到达结束状态后,将不会再改变状态,所以这扇门将永远保持打开状态。架构

像FutureTask,Semaphore这类在concurrent包里的类也属于闭锁,不过它们和CountDownLatch的应用场景仍是有差异的,这个咱们在后面的文章里再细说。并发

使用CountDownLatch有哪些须要注意的点

  1. 批次请求之间不能有执行顺序要求,不然多个线程并发处理没法保证请求执行顺序
  2. 各线程都要操做的结果列表必须是线程安全的,好比上面代码范例的countDownResultList
  3. 各子线程的countDown操做要在finally中执行,确保必定能够执行
  4. 主线程的await操做须要设置超时时间,避免因子线程处理异常而长时间一直等待,若是中断须要抛出异常或返回错误结果

使用CountDownLatch提升批次处理速度的问题

  1. 若是一个批次请求数不少,会瞬间占用服务器大量线程。此时必须使用线程池,并限定最大可处理线程数量,不然服务器不稳定性会大福提高。
  2. 主线程和子线程间的数据传输变得困难,稍不注意会形成线程不安全的问题,且代码可读性有必定降低

下一篇文章咱们讲讲FutureTask的应用场景,谢谢!ide

相关文章
相关标签/搜索