Executor框架

线程池

无限制的建立线程

若采用"为每一个任务分配一个线程"的方式会存在一些缺陷,尤为是当须要建立大量线程时:java

  • 线程生命周期的开销很是高
  • 资源消耗
  • 稳定性

引入线程池

任务是一组逻辑工做单元,线程则是使任务异步执行的机制。当存在大量并发任务时,建立、销毁线程须要很大的开销,运用线程池能够大大减少开销。缓存

 

 

Executor框架

说明:并发

  • Executor 执行器接口,该接口定义执行Runnable任务的方式。
  • ExecutorService 该接口定义提供对Executor的服务。
  • ScheduledExecutorService 定时调度接口。
  • AbstractExecutorService 执行框架抽象类。
  • ThreadPoolExecutor JDK中线程池的具体实现。
  • Executors 线程池工厂类。

 

 

ThreadPoolExecutor 线程池类

线程池是一个复杂的任务调度工具,它涉及到任务、线程池等的生命周期问题。要配置一个线程池是比较复杂的,尤为是对于线程池的原理不是很清楚的状况下,颇有可能配置的线程池不是较优的。框架

JDK中的线程池均由ThreadPoolExecutor类实现。其构造方法以下:less

 

 

参数说明:异步

corePoolSize:核心线程数。ide

maximumPoolSize:最大线程数。高并发

keepAliveTime:线程存活时间。当线程数大于core数,那么超过该时间的线程将会被终结。工具

unit:keepAliveTime的单位。java.util.concurrent.TimeUnit类存在静态静态属性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDSthis

workQueue:Runnable的阻塞队列。若线程池已经被占满,则该队列用于存放没法再放入线程池中的Runnable。

 

 

另外一个构造方法:

 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)

 

该方法在下面的扩展部分有更深刻的讲解。其中handler表示线程池对拒绝任务的处理策略。

 

ThreadPoolExecutor的使用须要注意如下概念:

  • 若线程池中的线程数量小于corePoolSize,即便线程池中的线程都处于空闲状态,也要建立新的线程来处理被添加的任务。

  • 若线程池中的线程数量等于 corePoolSize且缓冲队列 workQueue未满,则任务被放入缓冲队列。

  • 若线程池中线程的数量大于corePoolSize且缓冲队列workQueue满,且线程池中的数量小于maximumPoolSize,则建新的线程来处理被添加的任务。

  • 若线程池中线程的数量大于corePoolSize且缓冲队列workQueue满,且线程池中的数量等于maximumPoolSize,那么经过 handler所指定的策略来处理此任务。

  • 当线程池中的线程数量大于corePoolSize时,若是某线程空闲时间超过keepAliveTime,线程将被终止。

 

 

Executors 工厂方法

JDK内部提供了五种最多见的线程池。由Executors类的五个静态工厂方法建立。

  • newFixedThreadPool(...)
  • newSingleThreadExecutor(...)
  • newCachedThreadPool(...)
  • newScheduledThreadPool(...)
  • newSingleThreadScheduledExecutor()

 

单线程的线程池 newSingleThreadExecutor

这个线程池只有一个线程在工做,也就是至关于单线程串行执行全部任务。

返回单线程的Executor,将多个任务交给此Exector时,这个线程处理完一个任务后接着处理下一个任务,若该线程出现异常,将会有一个新的线程来替代。此线程池保证全部任务的执行顺序按照任务的提交顺序执行。

说明:LinkedBlockingQueue会无限的添加须要执行的Runnable。

 

建立固定大小的线程池 newFixedThreadPool

每次提交一个任务就建立一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,若是某个线程由于执行异常而结束,那么线程池会补充一个新线程。

public static ExecutorSevice newFixedThreadPool()

返回一个包含指定数目线程的线程池,若是任务数量多于线程数目,那么没有没有执行的任务必须等待,直到有任务完成为止。

 

可缓存的线程池 newCachedThreadPool

若是线程池的大小超过了处理任务所须要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增长时,此线程池又能够智能的添加新线程来处理任务。此线程池不会对线程池大小作限制,线程池大小彻底依赖于操做系统(或者说JVM)可以建立的最大线程大小。

newCachedThreadPool方法建立的线程池能够自动的扩展线程池的容量。核心线程数量为0。

SynchronousQueue是个特殊的队列。 SynchronousQueue队列的容量为0。当试图为SynchronousQueue添加Runnable,则执行会失败。只有当一边从SynchronousQueue取数据,一边向SynchronousQueue添加数据才能够成功。SynchronousQueue仅仅起到数据交换的做用,并不保存线程。但newCachedThreadPool()方法没有线程上限。Runable添加到SynchronousQueue会被马上取出。

根据用户的任务数建立相应的线程来处理,该线程池不会对线程数目加以限制,彻底依赖于JVM能建立线程的数量,可能引发内存不足。

 

定时任务调度的线程池 newScheduledThreadPool

建立一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

例:

复制代码

public class ScheduledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleWithFixedDelay(new Runnable() {
            
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(new Date());
            }
        }, 1000, 2000, TimeUnit.MILLISECONDS);
    }
    
}

复制代码

 

 

单线程的定时任务调度线程池 newSingleThreadScheduledExecutor

此线程池支持定时以及周期性执行任务的需求。

 

 

Executor接口

Executor是一个线程执行接口。任务执行的主要抽象不是Thead,而是Executor。

public interface Executor{
    void executor(Runnable command);
}

 

Executor将任务的提交过程与执行过程解耦,并用Runnable来表示任务。执行的任务放入run方法中便可,将Runnable接口的实现类交给线程池的execute方法,做为它的一个参数。若是须要给任务传递参数,能够经过建立一个Runnable接口的实现类来完成。

Executor能够支持多种不一样类型的任务执行策略。

Executor基于生产者消费者模式,提交任务的操做至关于生产者,执行任务的线程则至关于消费者。

 

ExecutorService接口

线程池接口。ExecutorService在Executor的基础上增长了一些方法,其中有两个核心的方法:

Future<?> submit(Runnable task)

<T> Future<T> submit(Callable<T> task)

 

这两个方法都是向线程池中提交任务,它们的区别在于Runnable在执行完毕后没有结果,Callable执行完毕后有一个结果。这在多个线程中传递状态和结果是很是有用的。另外他们的相同点在于都返回一个Future对象。Future对象能够阻塞线程直到运行完毕(获取结果,若是有的话),也能够取消任务执行,固然也可以检测任务是否被取消或者是否执行完毕。

在没有Future以前咱们检测一个线程是否执行完毕一般使用Thread.join()或者用一个死循环加状态位来描述线程执行完毕。如今有了更好的方法可以阻塞线程,检测任务执行完毕甚至取消执行中或者未开始执行的任务。

 

ScheduledExecutorService接口

ScheduledExecutorService描述的功能和Timer/TimerTask相似,解决那些须要任务重复执行的问题。这包括延迟时间一次性执行、延迟时间周期性执行以及固定延迟时间周期性执行等。固然了继承ExecutorService的ScheduledExecutorService拥有ExecutorService的所有特性。

 

 

 

线程池生命周期

线程是有多种执行状态的,一样管理线程的线程池也有多种状态。JVM会在全部线程(非后台daemon线程)所有终止后才退出,为了节省资源和有效释放资源关闭一个线程池就显得很重要。有时候没法正确的关闭线程池,将会阻止JVM的结束。

线程池Executor是异步的执行任务,所以任什么时候刻不可以直接获取提交的任务的状态。这些任务有可能已经完成,也有可能正在执行或者还在排队等待执行。所以关闭线程池可能出现一下几种状况:

  • 平缓关闭:已经启动的任务所有执行完毕,同时再也不接受新的任务。
  • 当即关闭:取消全部正在执行和未执行的任务。

另外关闭线程池后对于任务的状态应该有相应的反馈信息。

 

启动线程池

线程池在构造前(new操做)是初始状态,一旦构造完成线程池就进入了执行状态RUNNING。严格意义上讲线程池构造完成后并无线程被当即启动,只有进行"预启动"或者接收到任务的时候才会启动线程。

线程池是处于运行状态,随时准备接受任务来执行。

 

关闭线程池

线程池运行中能够经过shutdown()和shutdownNow()来改变运行状态。

  • shutdown():平缓的关闭线程池。线程池中止接受新的任务,同时等待已经提交的任务执行完毕,包括那些进入队列尚未开始的任务。shutdown()方法执行过程当中,线程池处于SHUTDOWN状态。
  • shutdownNow():当即关闭线程池。线程池中止接受新的任务,同时线程池取消全部执行的任务和已经进入队列可是尚未执行的任务。shutdownNow()方法执行过程当中,线程池处于STOP状态。shutdownNow方法本质是调用Thread.interrupt()方法。但咱们知道该方法仅仅是让线程处于interrupted状态,并不会让线程真正的中止!因此若只调用或只调用一次shutdownNow()方法,不必定会让线程池中的线程都关闭掉,线程中必需要有处理interrupt事件的机制。

 

线程池结束

一旦shutdown()或者shutdownNow()执行完毕,线程池就进入TERMINATED状态,即线程池就结束了。

  • isTerminating() 若是关闭后全部任务都已完成,则返回 true。
  • isShutdown() 若是此执行程序已关闭,则返回 true。

 

例:使用固定大小的线程池。并将任务添加到线程池。

复制代码

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class JavaThreadPool {
    public static void main(String[] args) {
        // 建立一个可重用固定线程数的线程池
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // 建立实现了Runnable接口对象,Thread对象固然也实现了Runnable接口
        Thread t1 = new MyThread();
        Thread t2 = new MyThread();
        Thread t3 = new MyThread();
        Thread t4 = new MyThread();
        Thread t5 = new MyThread();

        // 将线程放入池中进行执行
        pool.execute(t1);
        pool.execute(t2);
        pool.execute(t3);
        pool.execute(t4);
        pool.execute(t5);

        // 关闭线程池
        pool.shutdown();
    }

}


class MyThread extends Thread {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "正在执行。。。");
    }

}

复制代码

 

 

Java线程池扩展

ThreadPoolExecutor线程池的执行监控

ThreadPoolExecutor中定义了三个空方法,用于监控线程的执行状况。

ThreadPoolExecutor源码:

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

protected void terminated() { }

 

例:使用覆盖方法,定义新的线程池。

复制代码

public class ExtThreadPoolTest {
    
    static class MyTask implements Runnable {
        public String name;
        
        public MyTask(String name) {
            super();
            this.name = name;
        }
        
        @Override
        public void run() {
            try {
                Thread.sleep(500);
                System.out.println("执行中:"+this.name);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        
        ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()){

            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:" + ((MyTask)r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成:" + ((MyTask)r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("执行退出");
            }
            
        };
        
        
        for(int i=0;i<5;i++){
            MyTask task = new MyTask("Task-"+i);
            es.execute(task);
        }
        
        Thread.sleep(10);    // 等待terminated()执行
        
        es.shutdown();    // 若无该方法,主线程不会结束。
    }
    
}

复制代码

 

 

 

ThreadPoolExecutor的拒绝策略

线程池不可能处理无限多的线程。因此一旦线程池中中须要执行的任务过多,线程池对于某些任务就没法处理了。拒绝策略即对这些没法处理的任务进行处理。可能丢弃掉这些不能处理的任务,也可能用其余方式。

ThreadPoolExecutor类还有另外一个构造方法。该构造方法中的RejectedExecutionHandler 用于定义拒绝策略。 

复制代码

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    
    .....
    
    }

复制代码

 

 

JDK内部已经提供一些拒绝策略。

 

 

AbortPolicy 一旦线程不能处理,则抛出异常。

AbortPolicy源码:

复制代码

public static class AbortPolicy implements RejectedExecutionHandler {

        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

复制代码

 

 

DiscardPolicy 一旦线程不能处理,则丢弃任务。

DiscardPolicy源码:

复制代码

public static class DiscardPolicy implements RejectedExecutionHandler {

        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }


        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }

    }

复制代码

 

 

CallerRunsPolicy 一旦线程不能处理,则将任务返回给提交任务的线程处理。

CallerRunsPolicy源码:

复制代码

public static class CallerRunsPolicy implements RejectedExecutionHandler {

        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

 
        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

复制代码

 

 

DiscardOldestPolicy 一旦线程不能处理,丢弃掉队列中最老的任务。

DiscardOldestPolicy源码:

复制代码

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

复制代码

 

 

例:自定义拒绝策略。打印并丢弃没法处理的任务。

复制代码

public class RejectedPolicyHandleTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new RejectedExecutionHandler() {
            
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // 打印并丢弃。
                System.out.println(r.toString()+" is discard");
            }
        });
        
        for(int i=0;i<Integer.MAX_VALUE;i++){
            MyTask task = new MyTask("Task-"+i);
            es.execute(task);
            Thread.sleep(10);
        }
        
        es.shutdown();    // 若无该方法,主线程不会结束。
        
    }
}

复制代码

 

 

ThreadFactory 线程工厂

ThreadPoolExecutor类构造器的参数其中之一即为ThreadFactory线程工厂。

ThreadFactory用于建立线程池中的线程。

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

 

ThreadFactory的实现类中通常定义线程了线程组,线程数与线程名称。

DefaultThreadFactory源码:

复制代码

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

复制代码

 

 

CompletionService接口

这里须要稍微提一下的是CompletionService接口,它是用于描述顺序获取执行结果的一个线程池包装器。它依赖一个具体的线程池调度,可是可以根据任务的执行前后顺序获得执行结果,这在某些状况下可能提升并发效率。

相关文章
相关标签/搜索