Java并发之线程池的使用浅

背景

   当系统并发的线程数量不少,而且每一个线程都是执行一个时间很短的任务就结束了,这样频繁建立线程就会大大下降系统的效率,由于频繁建立线程和销毁线程须要消耗大量的系统资源。java

  因此须要一个办法使得线程能够复用,即当线程执行完一个任务,并不被销毁,而是能够继续执行其余的任务。在java中就能够经过线程池来实现这样的效果。本文讲述了java中的线程池类以及如何使用线程池数组

1、java中的线程池ThreadPoolExecutor

   ThreadPoolExecutor是线程池中基础类也是最为核心的类。想要了解和合理使用线程池绕不开ThreadPoolExecutor类。下面介绍一下此类。
该类的构造函数以下
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
  //...
}

线程池有这么几个重要的参数缓存

  1. corePoolSize: 线程池里的核心线程数量
  2. maximumPoolSize: 线程池里容许有的最大线程数量
  3. keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。 默认状况下,若是当前线程数量 > corePoolSize,多出来的线程会在keepAliveTime以后就被释放掉,直到线程池中的线程数不大于corePoolSize。可是若是调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起做用,直到线程池中的线程数为0
  4. unit: keepAliveTime的时间单位,有7种单位
    • TimeUnit.DAYS;                           //
      TimeUnit.HOURS;                        //小时
      TimeUnit.MINUTES;                    //分钟
      TimeUnit.SECONDS;                  //
      TimeUnit.MILLISECONDS;         //毫秒
      TimeUnit.MICROSECONDS;       //微妙
      TimeUnit.NANOSECONDS;        //
  5. workQueue: 队列workQueue的类型为BlockingQueue<Runnable>,一般能够取下面三种类型:
    1. 有界任务队列ArrayBlockingQueue:基于数组的先进先出队列,此队列建立时必须指定大小; 
    2. 无界任务队列LinkedBlockingQueue:基于链表的先进先出队列,若是建立时没有指定此队列大小,则默认为Integer.MAX_VALUE;
    3. 直接提交队列synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。若是进程数量已经达到最大值,则执行拒绝策略。所以使用该队列须要设置很大的maximumPoolSize,不然很容易执行拒绝策略。
  6. threadFactory: 每当须要建立新的线程放入线程池的时候,就是经过这个线程工厂来建立的
  7. handler: 就是说当线程,队列都满了,以后采起的策略,好比抛出异常等策略
    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,不作任何处理
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,而后从新尝试执行任务
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

2、 线程池中重要的方法 

线程池有两个重要的操做,提交任务和关闭线程池。在讲述这两个操做以前先了解一下线程池的状态。注意!!!,线程池的状态而不是线程状态。并发

在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:app

//当前线程池的状态,voliate 保证了线程之间的可见
volatile int runState;

//建立线程池后,初始时,线程池处于此状态
static final int RUNNING    = 0;

/*调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不可以接受新的任务,它会等待全部任务执行完毕*/
static final int SHUTDOWN   = 1;

/*调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,而且会去尝试终止正在执行的任务*/
static final int STOP       = 2;

/*线程池处于SHUTDOWN或STOP状态,而且全部工做线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态*/
static final int TERMINATED = 3;

2.1 线程池提交任务

ThreadPoolExecutor的提交操做可使用submit和execute这两种方法async

2.1.1 excute

 最核心的任务提交方法是execute()方法,虽然经过submit也能够提交任务,可是实际上submit方法里面最终调用的仍是execute()方法 而且ExecutorService中的invokeAll(),invokeAny()都是调用的execute方法。execute提交的任务无返回值,所以没法判断任务是否执行成功。可是若是出现线程错误能够显示部分异常堆栈信息ide

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();
 // 1.当前线程数量小于corePoolSize,则建立并启动线程。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
        // 成功,则返回
                return;
            c = ctl.get();
        }
    // 2.步骤1建立线程失败,则尝试把任务加入阻塞队列,
        if (isRunning(c) && workQueue.offer(command)) {
       // 入队列成功,检查线程池状态,若是状态部署RUNNING并且remove成功,则拒绝任务
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
       // 若是当前worker数量为0,经过addWorker(null, false)建立一个线程,其任务为null
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
    // 3. 步骤1和2失败,则尝试将线程池的数量由corePoolSize扩充至maxPoolSize,若是失败,则拒绝任务
        else if (!addWorker(command, false))
            reject(command);
    }
 }

详细流程解读函数

  1.   workerCountOf方法根据ctl的低29位,获得线程池的当前线程数,若是线程数小于corePoolSize,则执行addWorker方法建立新的线程执行任务;不然执行步骤(2); 
  2.  若是线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中,则执行步骤(3),不然执行步骤(4)
  3.  再次检查线程池的状态,若是线程池没有RUNNING,且成功从阻塞队列中删除任务,则执行reject方法处理任务
  4. 执行addWorker方法尝试将线程池的数量由corePoolSize扩充至maxPoolSize,若是addWoker执行失败,则执行reject方法处理任务;

2.1.2 sumbit

      若是使用submit 方法来提交任务,它会返回一个future,那么咱们能够经过这个future来判断任务是否执行成功,经过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后当即返回,这时有可能任务没有执行完。能够经过如下方式改造submit得到部分异常堆栈信息性能

try {
 Future re = pools.submit(Task);
 re.get();

} catch (InterruptedException e) {

// 处理中断异常

} catch (ExecutionException e) {

// 处理没法执行任务异常

} finally {

// 关闭线程池

executor.shutdown();

}

2.2 线程池的关闭

使用线程池时,咱们能够经过调用线程池的shutdown或shutdownNow方法来关闭线程池,可是它们的实现原理不一样。this

    • shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,而后中断全部没有正在执行任务的线程。
    • shutdownNow的原理是遍历线程池中的工做线程,而后逐个调用线程的interrupt方法来中断线程,因此没法响应中断的任务可能永远没法终止。shutdownNow会首先将线程池的状态设置成STOP,而后尝试中止全部的正在执行或暂停任务的线程,并返回等待执行任务的列表。

只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当全部的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于咱们应该调用哪种方法来关闭线程池,应该由提交到线程池的任务特性决定,一般调用shutdown来关闭线程池,若是任务不必定要执行完,则能够调用shutdownNow。

3、线程池种类

常见的五种线程池

  • newFixedThreadPool:固定数量线程池,这个比较经常使用.能够指定线程池的大小,该线程池corePoolSize和maximumPoolSize相等,阻塞队列使用的是LinkedBlockingQueue,大小为整数最大值
  • newSingleThreadExecutor: 单线程线程池,通常不多使用.
  • newCachedThreadPool:缓存线程池,缓存的线程默认存活60秒,线程的核心池corePoolSize大小为0,核心池最大为Integer.MAX_VALUE,阻塞队列使用的是SynchronousQueue。
  • newScheduledThreadPool:定时线程池,该线程池可用于周期性地去执行任务,一般用于周期性的同步数据。
  • newWorkStealingPool:工做窃取线程池,该线程为jdk1.8版新增。

 1. newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

这里思考一个问题为何newFixedThreadPool的corePoolSizemamximumPoolSize设计为同样的?

       答案能够从execute的源码中找到,首先线程池提交任务时是先判断corePoolSize,再判断workQueue,最后判断mamximumPoolSize,然而LinkedBlockingQueue是无界队列,因此它是达不到判断mamximumPoolSize这一步的,因此mamximumPoolSize成多少,并无多大所谓。

下边简单的介绍一下newFixedThreadPool的使用

public class ThreadPoolDemo {

    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":ThreadID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {

        MyTask task = new MyTask();

        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; ++i) {
            es.submit(task);
        }
    }
}

      这里建立了固定大小为5的线程,而后依次向线程池提交了10个任务。此后线程池就会安排调度这10个任务。每一个任务都会将本身的执行时间和执行任务线程的Id打印出来,而且每个任务执行时间为1秒

执行代码,输出以下

       能够看出,前5个任务和后5个任务执行时间相差1秒,而且前五个和后五个ID是一致的。这说明任务是分两个批次执行。这也符合一个只有5个线程的线程池行为

 2. newCachedThreadPool

       该方法返回一个可根据实际状况调整线程数量大小的线程池,线程池的数量不肯定,但如有空闲线程能够复用,则会优先使用可复用的线程,若是全部线程均在工做,又有新的任务被提交,

则会建立新的线程执行任务。全部线程在当前任务执行完毕后,将返回线程池进行复用。

public static ExecutorService newCachedThreadPool() {
       return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

        这里能够看到CachedThreadExecutormamximumPoolSize被设计成接近无限大。     

        缘由就是和synchronousQueue相关,上边已经简单介绍过该队列了,该队列的每一个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。若是 mamximumPoolSize不设计得很大,那么就很容易抛出异常。
        因此,使用该线程池时,必定要注意控制并发的任务数,不然建立大量的线程可能致使严重的性能问题

3. newScheduledThreadPool

    计划任务,和其余线程池不一样,该线程池并不必定会当即安排任务,主要是起计划任务的做用。他会在指定时间、对任务进行调度。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory); }
  • scheduleAtFixedRate:是以固定的频率去执行任务,周期是指每次执行任务成功执行之间的间隔。
  • schedultWithFixedDelay:是以固定的延时去执行任务,延时是指上一次执行成功以后和下一次开始执行的以前的时间

4. newSingleThreadExecutor

     单个线程线程池,只有一个线程的线程池,阻塞队列使用的是LinkedBlockingQueue,如有多余的任务提交到线程池中,则会被暂存到阻塞队列,待空闲时再去执行。按照先入先出的顺序执行任务。

public static ExecutorService newSingleThreadExecutor() {
       return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
       return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
}

5. newWorkStealingPool

会根据所需的并行层次来动态建立一个拥有足够的线程数目的线程池。经过使用多个队列来下降竞争。并行的层次是和运行的最大线程数目相关。运行过程当中实际的线程数目或许会动态地增加和收缩。

 其本质是一个一个工做窃取的线程池,因此对于提交的任务不能保证是顺序执行的。底层用的ForkJoinPool来实现的。ForkJoinPool的优点在于,能够充分利用多cpu,多核cpu的优点,

把一个任务拆分红多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成以后,再将这些执行结果合并起来便可。分治的思想。下面是newWorkStealingPool的构造函数

public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool (parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);
 }

//使用一个无限队列来保存须要执行的任务,能够传入线程的数量,不传入,则默认使用当前计算机中可用的cpu数量,使用分治法来解决问题,使用fork()和join()来进行调用
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler,boolean asyncMode) {
        this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

  假设有3个线程A、B、C在运行,workStealing能够简单这么认为,每一个线程都维护本身的一个队列,线程A的队列里头积累了3个任务,线程B的队列里2个任务,C的队列里1个任务;那么当线程C执行完任务以后,它会去别的线程池所维护的队列里面把任务偷过来继续执行,主动的找活干。

咱们看一下newWorkStealingPool的使用案例

/**
 * WorkStealingPool(任务窃取,都是守护线程)
 * 每一个线程都有要处理的队列中的任务,若是其中的线程完成本身队列中的任务,
 * 那么它能够去其余线程中获取其余线程的任务去执行
 */
public class TestWorkStealingPool {

    public static void main(String[] args) throws IOException {
        // 根据cpu是几核来开启几个线程
        ExecutorService service = Executors.newWorkStealingPool();
        // 查看当前计算机是几核
        System.out.println(Runtime.getRuntime().availableProcessors());
        service.execute(new R(1000));
        service.execute(new R(3000));
        service.execute(new R(4000));
        service.execute(new R(2000));
        service.execute(new R(3000));
        service.execute(new R(3000));
        service.execute(new R(3000));
        service.execute(new R(3000));

        // WorkStealing是精灵线程(守护线程、后台线程),主线程不阻塞,看不到输出。
        // 虚拟机不中止,守护线程不中止
        System.in.read();
    }

    static class R implements Runnable {
        int time;

        public R(int time) {
            this.time = time;
        }

        @Override
        public void run() {
            System.out.println(time + ":" + Thread.currentThread().getName() + "执行时间为:" + System.currentTimeMillis());
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

结果输出 

   能够看到newWorkStealingPool执行时根据cpu核心分配线程数量,这里打印显示cpu核心数为4,明显的能够看出线程将前4 个任务扔给了1-4号线程,后四个任务在排队等待。当1-4号线程中某个线程最早执行完毕后

会自动窃取未执行的任务,这里能够看到1号线程最早执行完毕用时1000ms,而后就窃取第五个任务。第0号线程执行完毕后,又紧接着执行第六个任务....。依次类推,直到全部任务执行完毕。

4、线程池的使用建议

  建议根据本身的须要手动建立线程池(new ThreadPoolExecutor(......)),这样能够灵活使用线程池,而且可能够加深本身对线程池的理解。阿里巴巴开发手册和Dubbo线程池的开发手册也是这样建议

Dubbo线程池的开发手册以下

阿里巴巴开发手册建议以下

咱们能够看一下Dubbo怎么建立线程池的

@SPI("fixed")
public interface ThreadPool {

    /**
     * 线程池
     * 
     * @param url 线程参数
     * @return 线程池
     */
    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);

}
public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
                queues == 0 ? new SynchronousQueue<Runnable>() : 
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>() 
                            : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

    默认状况下,Dubbo的FixedThreadPool中,maximumPoolSize = 200,队列是容量很小的SynchronousQueue.因此当线程超过200的时候,线程池就会抛出异常.

总结

     建议根据本身的须要手动建立线程池,也能够根据本身的须要实现ThreadFactory接口,从而切合实际使用中的项目

相关文章
相关标签/搜索