java并发编程——线程池的工做原理与源码解读

线程池的简单介绍

基于多核CPU的发展,使得多线程开发日趋流行。然而线程的建立和销毁,都涉及到系统调用,比较消耗系统资源,因此就引入了线程池技术,避免频繁的线程建立和销毁。html

在Java用有一个Executors工具类,能够为咱们建立一个线程池,其本质就是new了一个ThreadPoolExecutor对象。java

建议使用较为方便的 Executors 工厂方法来建立线程池。缓存

  • Executors.newCachedThreadPool()(无界线程池,能够进行自动线程回收)
  • Executors.newFixedThreadPool(int)(固定大小线程池)
  • Executors.newSingleThreadExecutor()(单个后台线程)。
  • Executors.newScheduledThreadPool() (支持计划任务的线程池)

ThreadPoolExecutor工做原理介绍

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  1. corePoolSize:线程池的核心线程数,说白了就是,即使是线程池里没有任何任务,也会有corePoolSize个线程在候着等任务。
  2. maximumPoolSize:最大线程数,无论你提交多少任务,线程池里最多工做线程数就是maximumPoolSize。
  3. keepAliveTime:线程的存活时间。当线程池里的线程数大于corePoolSize时,若是等了keepAliveTime时长尚未任务可执行,则线程退出。
  4. unit:这个用来指定keepAliveTime的单位,好比秒:TimeUnit.SECONDS。
  5. workQueue:一个阻塞队列,提交的任务将会被放到这个队列里。
  6. threadFactory:线程工厂,用来建立线程,主要是为了给线程起名字,默认工厂的线程名字:pool-1-thread-3。
  7. handler:拒绝策略,当线程池里线程被耗尽,且队列也满了的时候会调用。

线程池的执行流程图 多线程

任务被提交到线程池,会先判断当前线程数量是否小于corePoolSize,若是小于则建立线程来执行提交的任务,不然将任务放入workQueue队列,若是workQueue满了,则判断当前线程数量是否小于maximumPoolSize,若是小于则建立线程执行任务,不然就会调用handler,以表示线程池拒绝接收任务。框架

线程池使用介绍

newScheduledThreadPool的使用示例

public class SchedulePoolDemo {

    public static void main(String[] args){
        ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
        //若是前面的任务没有完成, 调度也不会启动
        service.scheduleAtFixedRate(()->{
         try {
             Thread.sleep(2000);// 每两秒打印一次.
             System.out.println(System.currentTimeMillis()/1000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
        }, 0, 2, TimeUnit.SECONDS);
    }
}

潜在宕机风险

使用Executors来建立要注意潜在宕机风险.其返回的线程池对象的弊端以下:ide

  • FixedThreadPool和SingleThreadPoolPool : 容许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而致使 OOM.
  • CachedThreadPool和ScheduledThreadPool : 容许的建立线程数量为 Integer.MAX_VALUE,可能会建立大量的线程,从而致使 OOM.

综上所述, 在可能有大量请求的线程池场景中, 更推荐自定义ThreadPoolExecutor来建立线程池, 具体构造函数配置以下:函数

线程池大小配置

通常根据任务类型进行区分, 假设CPU为N核工具

  • CPU密集型任务须要减小线程数量, 下降线程之间切换形成的开销, 可配置线程池大小为N + 1.
  • IO密集型任务则能够加大线程数量, 可配置线程池大小为 N * 2.
  • 混合型任务则能够拆分为CPU密集型与IO密集型, 独立配置.

自定义阻塞队列BlockingQueue

主要存放等待执行的线程, ThreadPoolExecutor中支持自定义该队列来实现不一样的排队队列.oop

  • ArrayBlockingQueue:先进先出队列,建立时指定大小, 有界;
  • LinkedBlockingQueue:使用链表实现的先进先出队列,默认大小为Integer.MAX_VALUE;
  • SynchronousQueue:不保存提交的任务, 数据也不会缓存到队列中, 用于生产者和消费者互等对方, 一块儿离开.
  • PriorityBlockingQueue: 支持优先级的队列

回调接口

线程池提供了一些回调方法, 具体使用以下所示.源码分析

ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行任务: " + r.toString());
            }
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("结束任务: " + r.toString());
            }
            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

能够在回调接口中, 对线程池的状态进行监控, 例如任务执行的最长时间, 平均时间, 最短期等等, 还有一些其余的属性以下:

  • taskCount:线程池须要执行的任务数量.
  • completedTaskCount:线程池在运行过程当中已完成的任务数量.小于或等于taskCount.
  • largestPoolSize:线程池曾经建立过的最大线程数量.经过这个数据能够知道线程池是否满过.如等于线程池的最大大小,则表示线程池曾经满了.
  • getPoolSize:线程池的线程数量.若是线程池不销毁的话,池里的线程不会自动销毁,因此这个大小只增不减.
  • getActiveCount:获取活动的线程数.

自定义拒绝策略

线程池满负荷运转后, 由于时间空间的问题, 可能须要拒绝掉部分任务的执行.

jdk提供了RejectedExecutionHandler接口, 并内置了几种线程拒绝策略

  • AbortPolicy: 直接拒绝策略, 抛出异常.
  • CallerRunsPolicy: 调用者本身执行任务策略.
  • DiscardOldestPolicy: 舍弃最老的未执行任务策略. 使用方式也很简单, 直接传参给ThreadPool
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("reject task: " + r.toString());
                    }
                });

自定义ThreadFactory

线程工厂用于建立池里的线程. 例如在工厂中都给线程setDaemon(true), 这样程序退出的时候, 线程自动退出.或者统一指定线程优先级, 设置名称等等.

class NamedThreadFactory implements ThreadFactory {
    private static final AtomicInteger threadIndex = new AtomicInteger(0);
    private final String baseName;
    private final boolean daemon;

    public NamedThreadFactory(String baseName) {
        this(baseName, true);
    }

    public NamedThreadFactory(String baseName, boolean daemon) {
        this.baseName = baseName;
        this.daemon = daemon;
    }

    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement());
        thread.setDaemon(this.daemon);
        return thread;
    }
}

关闭线程池

跟直接new Thread不同, 局部变量的线程池, 须要手动关闭, 否则会致使线程泄漏问题.

默认提供两种方式关闭线程池.

- shutdown: 等全部任务, 包括阻塞队列中的执行完, 才会终止, 可是不会接受新任务.
- shutdownNow: 当即终止线程池, 打断正在执行的任务, 清空队列.

ThreadPoolExecutor源码分析

ThreadPoolExecutor中ctl属性介绍

ctl是ThreadPoolExecutor的一个重要属性,它记录着ThreadPoolExecutor的线程数量和线程状态。

//Integer有32位,其中前三位用于记录线程状态,后29位用于记录线程的数量.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//表示用于记录线程数量的位数
private static final int COUNT_BITS = Integer.SIZE - 3;
//将1左移COUNT_BITS位再减1,表示能表示的最大线程数
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//用ctl前三位分别表示线程池的状态
//(前三位为111)接受新任务而且处理已经进入队列的任务
private static final int RUNNING    = -1 << COUNT_BITS;
//(前三位为000)不接受新任务,可是处理已经进入队列的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//(前三位001)不接受新任务,不处理已经进入队列的任务,而且中断正在执行的任务
private static final int STOP       =  1 << COUNT_BITS;
//(前三位010)全部任务执行完成,workerCount为0。线程转到了状态TIDYING会执行terminated()钩子方法
private static final int TIDYING    =  2 << COUNT_BITS;
//(前三位011)任务已经执行完成
private static final int TERMINATED =  3 << COUNT_BITS;
//状态值就是只关心前三位的值,因此把后29位清0
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//线程数量就是只关心后29位的值,因此把前3位清0
private static int workerCountOf(int c)  { return c & CAPACITY; }

//两个数相或
private static int ctlOf(int rs, int wc) { return rs | wc; }

execute()方法解析

public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();
        int c = ctl.get();
       //判断当前活跃线程数是否小于corePoolSize
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))//调用addWorker建立线程执行任务
                return;
            c = ctl.get();
        }
        //若是不小于corePoolSize,则将任务添加到workQueue队列。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();//再次获取ctl的状态
            //若是不在运行状态了,那么就从队列中移除任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //若是在运行阶段,可是Worker数量为0,调用addWorker方法
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //尝试建立非核心线程若是建立失败就会调用reject拒绝接受任务。
        else if (!addWorker(command, false))
            reject(command);
    }
//调用handler的rejectedExecution(command,this)方法。handler是RejectedExecutionHandler接口,默认实现是AbortPolicy
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

addWorker()方法解析

addWorker方法用于建立线程,而且经过core参数表示该线程是不是核心线程,若是返回true则表示建立成功,不然失败。addWorker的代码以下所示:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获得线程池的运行状态

            // rs>=SHUTDOWN为false,即线程池处于RUNNING状态.
            // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()这个条件为true,也就意味着三个条件同时知足,即线程池状态为SHUTDOWN且firstTask为null且队列不为空,这种状况为处理队列中剩余任务。上面提到过当处于SHUTDOWN状态时,不接受新任务,可是会处理完队列里面的任务。若是firstTask不为null,那么就属于添加新任务;若是firstTask为null,而且队列为空,那么就不须要再处理了。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                //若是建立的是非核心线程(core=false)时,则须要判断当前线程数wc>=maximumPoolSize,若是返回false,建立非核心线程失败。
                //若是建立的是核心线程(core=true)时,则须要判断当前线程数wc>=corePoolSize,若是返回false,建立核心线程失败。
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//worker+1执行成功,那么跳出外循环
                    break retry;
                c = ctl.get();
                if (runStateOf(c) != rs)//再次判断当前状态,若是新获取的状态和当前状态不一致,则再次进入外循环
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }


/*
一旦跳出外循环,表示能够建立建立线程,这里具体是Worker对象,Worker实现了Runnable接口而且继承AbstractQueueSynchronizer,内部维持一个Runnable的队列。try块中主要就是建立Worker对象,而后将其保存到workers中,workers是一个HashSet,表示工做线程的集合。而后若是添加成功,则开启Worker所在的线程。若是开启线程失败,则调用addWorkerFailed方法,addWokerFailed用于回滚worker线程的建立。
*/
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //以firstTask做为Worker的第一个任务建立Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//对整个线程池加锁
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//启动启动这个线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorkerFailed()方法解析

private void addWorkerFailed(Worker w) {
        //对整个线程成绩加锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //移除Worker对象
            if (w != null)
                workers.remove(w);
            //减少worker数量
            decrementWorkerCount();
            //检查termination状态
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

addWorkerFailed首先从workers集合中移除线程,而后将wokerCount减1,最后检查终结。

tryTerminate()方法解析

tryTerminate()方法用于检查是否有必要将线程池状态转移到TERMINATED。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /*
                状态判断,若是有符合如下条件之一。则跳出循环
               (1)线程池处于RUNNING状态
               (2)线程池状态处于TIDYING状态
               (3)线程池状态处于SHUTDOWN状态而且队列不为空
若是不知足上述的状况,那么目前状态属于SHUTDOWN切队列为空,或者状态属于STOP,那么调用interruptIdleWorkers方法中止一个Worker线程,而后退出。
            */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
/*
若是没有退出循环的话,那么就首先将状态设置成TIDYING,而后调用terminated方法,最后设置状态为TERMINATED。terminated方法是个空实现,用于当线程池终结时处理一些事情。
*/
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

构造函数Worker(firstTask)解析

Worker继承自AbstractQueuedSynchronizer并实现Runnbale接口。AbstractQueuedSynchronizer提供了一个实现阻塞锁和其余同步工具,好比信号量、事件等依赖于等待队列的框架。Worker的构造方法中会使用threadFactory构造线程变量并持有run方法调用了runWorker方法,将线程委托给主循环线程。

Worker(Runnable firstTask) {
    setState(-1);
    this.firstTask = firstTask;//设置该线程的
    this.thread = getThreadFactory().newThread(this);//建立一个线程
}

//当我咱们启动一个线程时就会触发Worker中的此方法
public void run() {
    runWorker(this);
}

runWorker()方法解析

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;//首次任务是建立Worker时添加的任务
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           //线程调用runWoker,会while循环调用getTask方法从workerQueue里读取任务,而后执行任务。只要getTask方法不返回null,此线程就不会退出。
            while (task != null || (task = getTask()) != null) {
                w.lock();//对Worker加锁
                //若是线程池中止了,那么中断线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//空实现,任务运行以前的操做
                    Throwable thrown = null;
                    try {
                        task.run();//执行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//空实现,任务运行以后的操做
                    }
                } finally {
                    task = null;//任务执行完毕后,将task设置为null
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask()方法解析

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //必要时检查队列是否为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            //判断是否容许超时,wc>corePoolSize则是判断当前线程数是否大于corePoolSize。
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //若是当前线程数大于corePoolSize,
                //则会调用workQueue的poll方法获取任务,超时时间是keepAliveTime。
                //若是超过keepAliveTime时长,poll返回了null,
                //上边提到的while循序就会退出,线程也就执行完了。
                //若是当前线程数小于corePoolSize,
                //则会调用workQueue的take方法阻塞当前线程,不会退出
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

参考地址:

相关文章
相关标签/搜索