浅谈Java并发编程系列(六) —— 线程池的使用

线程池的做用

  1. 下降资源消耗。经过重复利用已建立的线程下降线程建立和销毁形成的资源浪费。java

  2. 提升响应速度。当任务到达时,不须要等到线程建立就能当即执行。数据库

  3. 方便管理线程。线程是稀缺资源,若是无限制地建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够对线程进行统一的分配,优化及监控。安全

设置线程池大小

设置线程池的大小能够从如下几个方面分析入手:多线程

  • 系统中有多少个cpu?less

  • 多大的内存?ide

  • 任务是计算密集型、I/O密集集仍是两者皆可?oop

  • 是否须要像JDBC链接这样的稀缺资源?优化

对于计算密集型的任务,在拥有N个cpu的机器上,一般将线程池大小设置为N+1时,可以实现最优的利用率。 对于包含I/O操做或者其余阻塞操做的任务,因为线程并不会一直执行,所以线程池的规模应该更大。
可经过以下公式进行估计:
$$N_{threads} = N_{cpu}*U_{cpu}*(1+\frac{W}{C})$$
其中:
$$ U_{cpu} = target\ CPU\ utilization, 0 \le U_{cpu} \le 1$$
$$ \frac{W}{C} = ration\ of\ wait\ time\ to\ compute\ time$$
能够经过Rumtime来得到CUP的数目:ui

int N_CPUS = Runtime.getRuntime().availableProcessor();

固然,CPU周期并非惟一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库链接等。计算方法:计算每一个任务对该资源的需求量,而后用该资源的可用总量除以每一个任务的须要量,所得结果就是线程池的大小上限。this

线程池的实现原理

ThreadPoolExecutor

Java的线程池针对不一样应用的场景,主要有固定长度类型、可变长度类型以及定时执行等几种。针对这几种类型的建立,java中有一个专门的Executors类提供了一系列的方法封装了具体的实现。这些功能和用途不同的线程池主要依赖ThreadPoolExecutor, ScheduledThreadPoolExecutor等几个类。如前面文章讨论所说,这些类和相关类的主要结构以下:

Java线程池相关类

ThreadPoolExecutor是实现线程池最核心的类之一。在分析ThreadPoolExecutor的实现原理以前,让来看看实现线程池须要考虑的点:
从线程池自己的定义来看,它是将一组事先建立好的线程放在一个资源池里,当须要的时候就将该线程分配给具体的任务来执行。那么,这个池子的大小如何肯定?线程池确定要面临多个线程资源访问的状况,是否是自己的结构要保证线程安全呢?若是线程池建立好以后后续有若干任务使用了线程资源,当池里面的资源使用完以后要如何安排?是给线程扩容,建立更多的线程资源,仍是增长一个队列,让一些任务先在里面排队呢?在一些极端的状况下,好比任务数量实在太多线程池处理不过来,对于这些任务怎么处理呢?线程执行的时候会碰到异常或都错误的状况,这些异常要如何处理?如何保证这些异常的处理不会致使线程池其余任务的正常运行不出错呢?

总结一下,这些问题能够概括为以下几点:

  1. 线程池的结构;

  2. 线程池的任务分配策略;

  3. 线程池的异常和错误处理机制;

下面结合ThreadPoolExecutor的实现源码来详细分析一下。

线程数量和线程池状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl是线程池的主要控制状态,它是一个AtomicInteger的数值,它表示了两部份内容:

  1. workerCount, 表示有效线程数;

  2. runState, 表示线程池状态,是否运行,中止等。

ctl是用一个integer(32)来包含线程池状态和数量的表示,高三位为线程池的状态,后(2^29)-1为线程数限制。 这就是为何前面用一个Integer.SIZE-3来做为位数。这样这个整数的0-28位表示的就是线程的数目。而高位的部分,29-31位表示线程池的状态。这里定义的主要有5种状态,分别对应值是从-1到3. 他们对应着线程的running, shutdown, stop, tidying, terminated这几个状态。

线程池的结构

除了以上部分外,线程池里还有以下成员:

private final BlockingQueue<Runnable> workQueue;
    
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

workerQueue: 一个BlockingQueue<Runnable>队列,自己的结构能够保证访问的线程安全。至关于一个排队等待队列。当线程池的线程数达到corePoolSize的时候,一些须要等待执行的线程就放到这个队列里等待。
worker: 一个HashSet<Worker> 集合。线程池里全部能够当即执行的线程都放在这个集合里。
mainLock: 一个访问workers所须要使用的锁。从前面的workQueue,workers这两个结构能够看出,若是要往线程池里面增长执行任务或者执行完毕一个任务,都要访问这两个结构。因此大多数状况下为了保证线程安全,就须要使用mainLock这个锁。
corePoolSize:处于活跃状态的最少worker数目。每一个worker会建立一个新的线程去执行任务。在建立了线程池后,默认状况下,线程池中并无任何线程,而是等待有任务来才建立线程去执行,除非调用了prestartAllCoreThreads()或prestartCoreThread()方法。当线程池中的线程数达到corePoolSize后,就会把到达的任务放到workerQueue中去;
maximumPoolSize: 线程池的最大长度。当线程池里面的线程数达到这个数字时就不能再往里面加了,此时会根据设置的handler参数,即拒绝处理任务策略来处理新到来的任务。
keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止。默认状况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才起做用。当线程池中的线程数大于corePoolSize时,若是一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。可是若是调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起做用,直到线程数为0。
threadFactory: 线程工厂,主要用来建立线程;
largestPoolSize: 用来记录线程池中曾经有过的最大线程数,跟线程池的容易没有任何关系。
handler: 表示当拒绝处理任务时的策略,有以四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,可是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

任务的执行者——Worker

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
相关文章
相关标签/搜索