Java线程池核心原理剖析

在系统开发时,咱们常常会遇到“池”的概念。使用池一种以空间换时间的作法,一般在内存中事先保存一系列整装待命的对象,以供后期供其余对象随时调用。常见的池有:数据库链接池,socket链接池,线程池等。今天咱们就来看一下线程池的概念。java


Executor
git

JDK为咱们提供了一套Executor框架来方便咱们来管理和使用线程池。
打开java.util.concurrent.Executors类,咱们能够发现JDK为咱们提供了那么多的方法来帮助咱们高效快捷的建立线程池:github

123456复制代码
public static ExecutorService newFixedThreadPool(int nThreads);//建立一个固定数目的、可重用的线程池public static ExecutorService newSingleThreadExecutor();//建立一个单线程化的线程public static ExecutorService newCachedThreadPool();//建立一个可缓存线程池public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);//建立一个支持定时及周期性任务执行的线程池public static ScheduledExecutorService newSingleThreadScheduledExecutor() ;//建立一个支持定时及周期性任务执行的线程池public static ExecutorService newWorkStealingPool() ;//建立一个拥有多个任务队列的线程池复制代码

上方简单列举了几个Executor框架为咱们提供的建立线程池的方法,这些线程池拥有各类各样的功能,我想当你刚刚开始使用线程的时候google如何使用线程池的时候大部分文章都是教你如何使用上方的一些方法建立一个线程池。可是若是你去查看他们的源码就会发现他们最后构造的时候都调用了同一个构造方法。(除了newWorkStealingPool以外,这个咱们在下篇文章再讨论)数据库

1234567复制代码
ThreadPoolExecutor(int corePoolSize,//线程池线程数量                           int maximumPoolSize,//线程中最大的线程数量                           long keepAliveTime,//线程池线程数量超过corePoolSize的空闲线程的存活时间                           TimeUnit unit,//keepAliveTime时间单位                           BlockingQueue<Runnable> workQueue,//被提交还没执行的任务存放在这里                           ThreadFactory threadFactory,//线程工厂                           RejectedExecutionHandler handler)//任务过多时的拒绝策略复制代码

上方的4个参数我想你看到了就会明白了,如今咱们着重来说一下下面的三个参数。缓存


WorkQueue
bash

参数workQueue是用来存放已提交但还未执行的任务,JDK为咱们提供了一下实现:框架

直接提交队列SynchronousQueuesocket

12345678910复制代码
当新任务过来的时候它是这样处理的:if(有空闲线程){    处理}else{    if(当前线程数<maximumPoolSize){        建立新线程处理    }else{        执行拒绝策略    }}复制代码

所以使用这个队列时必定要设置很大的maximumPoolSizeide

有界的任务队列ArrayBlockingQueueui

12345678910111213复制代码
if(当前线程数<corePoolSize){    建立新线程执行}else{    if(任务队列是否已满){       if(当前线程<maximumPoolSize){          建立新线程处理       }else{          执行拒绝策略        }    }else{       放到任务队列    }}复制代码

无界的任务队列LinkedBlockingDeque

12345678910111213复制代码
if(当前线程数<corePoolSize){    建立新线程执行}else{    放入任务队列,等待执行,直到系统资源耗尽}优先任务队列PriorityBlockingQueue根据任务的优先级将任务存放在任务队列特定位置if(当前线程数<corePoolSize){    建立新线程执行}else{    等待执行,直到系统资源耗尽}复制代码


线程工厂

第六个参数threadFactory是为线程池中建立线程的,咱们使用Executor框架建立的线程就是有threadFactory提供的。咱们看一下JDK提供的默认的threadFactory:

1234567891011121314151617181920212223242526复制代码
static class DefaultThreadFactory implements ThreadFactory {        private static final AtomicInteger poolNumber = new AtomicInteger(1);        private final ThreadGroup group;        private final AtomicInteger threadNumber = new AtomicInteger(1);        private final String namePrefix;        DefaultThreadFactory() {            SecurityManager s = System.getSecurityManager();            group = (s != null) ? s.getThreadGroup() :                                  Thread.currentThread().getThreadGroup();            namePrefix = "pool-" +                          poolNumber.getAndIncrement() +                         "-thread-";        }        public Thread newThread(Runnable r) {            Thread t = new Thread(group, r,                                  namePrefix + threadNumber.getAndIncrement(),                                  0);            if (t.isDaemon())                t.setDaemon(false);            if (t.getPriority() != Thread.NORM_PRIORITY)                t.setPriority(Thread.NORM_PRIORITY);            return t;        }    }复制代码

重点关注一下其中的newThread方法,看到这个我想你就明白了为何你使用线程池建立出来的线程打印的时候名字的来源,还有是不是守护线程和优先级等属性的来源了。


拒绝策略

看到刚刚的几种任务队列咱们发现当任务过多时是须要指定拒绝策略来进行拒绝呢,那么JDK又为咱们提供了哪些拒绝策略呢。

1234复制代码
AbortPolicy直接抛出异常。CallerRunsPolicy:若是线程池未关闭,则在调用者线程中运行当前任务DiscardOldestPolicy:丢弃即将执行的任务,而后再尝试提交当前任务DiscardPolicy:丢弃此任务复制代码


线程池的扩展

ThreadPoolExecutor不单单可以建立各类各样的线程来帮助咱们实行功能,它还预留了三个接口来供咱们进行扩展。

在runWorker方法中调用线程进行执行以前调用了beforeExecute方法,执行以后调用了afterExecute()方法

123456789101112131415161718192021222324252627282930313233343536373839复制代码
final void runWorker(Worker w) {       Thread wt = Thread.currentThread();       Runnable task = w.firstTask;       w.firstTask = null;       w.unlock();        boolean completedAbruptly = true;       try {           while (task != null || (task = getTask()) != null) {               w.lock();               if ((runStateAtLeast(ctl.get(), STOP) ||                    (Thread.interrupted() &&                     runStateAtLeast(ctl.get(), STOP))) &&                   !wt.isInterrupted())                   wt.interrupt();               try {                   beforeExecute(wt, task);//线程执行前                   Throwable thrown = null;                   try {                       task.run();                   } catch (RuntimeException x) {                       thrown = x; throw x;                   } catch (Error x) {                       thrown = x; throw x;                   } catch (Throwable x) {                       thrown = x; throw new Error(x);                   } finally {                       afterExecute(task, thrown);//线程执行后                   }               } finally {                   task = null;                   w.completedTasks++;                   w.unlock();               }           }           completedAbruptly = false;       } finally {           processWorkerExit(w, completedAbruptly);       }   }复制代码

这两个方法在ThreadPoolExecutor类中是没有实现的,咱们想要监控线程运行先后的数据就能够经过继承ThreadPoolExecutor类来实现这个扩展。
另外还有一个terminated()方法是在整个线程池退出的时候调用的,咱们这里一并扩展。

public class ThreadPoolExecutorDemo extends ThreadPoolExecutor {
    //注意这里由于ThreadPoolExecutor没有无参的构造,因此还须要重写一下构造方法。
    //这里限于篇幅就不贴了
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println(Thread.currentThread().getId()+"执行完成");

    }
    @Override
    protected void terminated() {
        System.out.println("线程池退出");
    }
}

//使用这个demo就能够验证咱们扩展的结果了。

public class ThreadPoolDemo {
    static class ThreadDemo extends Thread {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID is:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutorDemo threadPoolExecutorDemo=  new ThreadPoolExecutorDemo(5,5,0,TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>());
        ThreadDemo threadDemo = new ThreadDemo();
        for (int i = 0; i < 20; i++) {
            threadPoolExecutorDemo.submit(threadDemo);
        }
        threadPoolExecutorDemo.shutdown();
    }
}
复制代码

本文全部源码:github.com/shiyujun/sy…



相关文章
相关标签/搜索