Java线程池之ThreadPoolExecutor

前言java

  线程池能够提升程序的并发性能(固然是合适的状况下),由于对于没有线程的状况下,咱们每一次提交任务都新建一个线程,这种方法存在很多缺陷:安全

1.  线程的建立和销毁的开销很是高,线程的建立须要时间,会延迟任务的执行,会消耗大量的系统资源。并发

2.  活跃的线程会消耗系统资源,而大量的空闲线程会占用许多内存,给垃圾回收器带来很大的压力,而大量线程在竞争CPU资源的时间还会产生气体的性能开销。框架

3.  系统在可建立的线程上存在一个限制,若是超过了这个限制,极可能抛出OOM。ide

  咱们不难发现,在必定范围下,增长线程可以提升系统的吞吐量,而当线程数超过合理值后,增长的线程反而会下降程序的执行速度。函数

Executor的引入性能

  在JDK1.5中,咱们引入了一个线程池框架,Executor框架,它可以分解任务的建立和执行过程。它包括Executor、ExecutorService、Callable等接口和Executors、ThreadPoolExecutor实现类等。this

注意点atom

  固然了看待事物须要辩证,是否使用了Executor框架就能很好地将复杂的任务执行解耦开来的。这边咱们其实须要限制一下它的使用范围。spa

  •          对于依赖型的任务而言,不是很适合使用线程池去操做,容易引起死锁,由于这种状况下咱们须要当心维持这些任务的执行顺序,以保证不会触发死锁。
  •          对于经过线程封闭实现线程安全的任务而言,使用单线程的Executor可以保证更安全的并发。
  •          使用ThreadLocal的任务,由于线程池会复用线程,这将致使任务的ThreadLocal值失去意义(除非线程本地值受限于任务的生命周期)。
  •          对响应时间敏感的任务,假设咱们将一个执行时间很长的任务,或者多个执行时间很长的任务放到一个单线程的Executor中或者一个包含少许线程的线程池中,都会下降程序的响应速度。

合适的线程数

  线程池在对处理同一类型的任务且相互独立的时候,能达到性能上的最佳,不然任务时长不一致很容易引发拥塞或是饥饿。

那线程池的线程数以多少为合适呢,对于计算密集型的任务而言,咱们最好设置的线程数 = CPU数+1;(+1是为了保证当某个线程由于缺页故障或其余缘由而暂停时,这个+1的线程可以确保CPU的时钟周期不会被浪费);而对于包含IO操做或者其余阻塞操做的任务时,因为线程不会一直执行,因此线程池的规模应该更大点。

线程池的使用 

 在实际使用过程当中,咱们通常借助于ThreadPoolExecutor来完成线程池的建立。ThreadPoolExecutor具备极好的扩展性,除了系统提供的四种经常使用的线程池,如CachedThreadPoolExecutor,FixedThreadPoolExecutor,SingleThreadPoolExecutor,ScheduledThreadExecutor。咱们能够自定义线程池的构造函数,如线程池的基本线程数、最大线程数、线程池的超时时间(时间+时间单位),线程池的任务队列,线程池的线程工厂,线程池的饱和策略。

固然,咱们在使用系统提供的四种线程池的时候,一样能够在后来修改线程池的配置。

线程的任务队列

LinkedBlockQueue:无界   CachedThreadPoolExecutor  FixedThreadPoolExecutor的默认任务队列

ArrayBlockQueue:有界

PriorityQueue:优先级队列,有界   ScheduledThreadPoolExecutor的默认任务队列

SynchronousQueue:同步移交,队列容量为0,仅当有线程准备好时才会将任务放到队列中。

饱和策略

饱和策略的设置

  对于有界队列而言,当有界队列被填满后,这时候咱们须要用到线程的饱和策略,前面提到咱们能够在后面配置线程池的设置,而饱和策略的修改就是经过ThreadPoolExecutor的setRejectedExecutionHandler方法来进行修改的(当某个任务呗提交到一个已经被关闭的Executor中,也会用到饱和策略)

         饱和策略主要有如下几种:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy。

  1.  AbortPolicy:停止策略,该策略将抛出未受检的RejectedExecution,调用者能够捕获这个异常根据实际须要编写直接的处理代码。
  2. CallerRunsPolicy:调用者运行,该策略不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者中,从而下降新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是调用了execute的线程中执行任务,当线程池的任务队列被填满后,下一个任务会在调用execute时在主线程中执行,因为执行任务须要必定时间,这段时间内主线程显然不能提交任务,从而保证线程池在这段时间内处理现有任务。
  3. DiscardPolicy:抛弃策略,舍弃任务。
  4. DiscardOldestPolicy:抛弃下一个将被执行的任务,而后尝试从新提交新的任务,(不适用于和优先队列合用,由于这样将抛弃的将是优先级最高的等待任务)

         固然,咱们可使用Semaphore(信号量)来控制任务的提交速率。

线程工厂 

  咱们能够经过自定义线程工厂,来实现咱们本身的线程。具体示例以下所示:

  自定义的线程工厂,记录了线程池的名字。

import java.util.concurrent.ThreadFactory;

/**
 * Created by DB on 2017/9/1.
 */
public class MyThreadFactory implements ThreadFactory {
    private final String poolName;

    public MyThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new MyAPPThread(r,poolName);
    }
}

自定义的线程类:实现了指定线程的名字,设置自定义UncaughtExecptionHandler。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Created by DB on 2017/9/1.
 */
public class MyAPPThread extends Thread {
    public static  final  String DEFALUT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAPPThread(Runnable r){
        this(r,DEFALUT_NAME);
    }
    public MyAPPThread(Runnable r,String name){
        super(r,name +"-"+created.incrementAndGet());
        setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                log.log(Level.SEVERE,"Uncaught in thread"+t.getName(),e);
            }
        });
    }

    public void run(){
        boolean debug = debugLifecycle;
        if(debug){
            log.log(Level.FINE,"Created" +getName());
        }
        try {
            alive.incrementAndGet();
            super.run();
        }finally {
            alive.decrementAndGet();
            if(debug){
                log.log(Level.FINE,"Exiting"+getName());
            }
        }
    }
    public static int getThreadsCreated(){
        return  created.get();
    }
    public static int getThreadsAlive(){
        return alive.get();
    }
    public static boolean getDebug(){
        return  debugLifecycle;
    }
    public static  void setDebug(Boolean b){
        debugLifecycle=b;
    }
}

扩展ThreadPoolExecutor

  ThreadPoolExecutor是可扩展的,它提供了几个在子类中能够改写的方法:beforeExecute、afterExecute和terminated,经过实现这些方法咱们能够实现扩展。

         在执行任务的线程中将调用beforeExecute和afterExecute方法,在这些方法中咱们能够添加日志、计时。监视或者统计信息收集的功能。不管任务是从run中正常返回仍是抛出一个异常而返回,afterExecute都会被调用。而beforeExecute抛出RuntimeException时,任务将不被执行,afterExecute固然也不会被调用。

         在线程池完成关闭操做时,会调用terminated,也就在全部任务都已经完成而且全部工做者线程也已经关闭后。在这个方法中咱们能够实现Executor在其生命周期中分配的各类资源,还有执行发送通知、记录日志或者收集finalize统计信息等操做。

  具体的框架以下:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by DB on 2017/9/1.
 */
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
    }

    @Override
    protected void terminated() {
        super.terminated();
    }
}
相关文章
相关标签/搜索