java并发编程实战:第八章----线程池的使用

1、在任务和执行策略之间隐性耦合算法

Executor框架将任务的提交和它的执行策略解耦开来。虽然Executor框架为制定和修改执行策略提供了至关大的灵活性,但并不是全部的任务都能适用全部的执行策略。数据库

  • 依赖性任务:依赖其余同步任务的结果,使其不得不顺序执行,影响活跃性
  • 使用线程封闭的任务:在单线程的Executor中执行,任务能够不是线程安全的,可是一旦提交到线程池时,就会失去线程安全
  • 对响应时间敏感的任务:在单个线程或含有少许线程的线程池中执行是不可接受的
  • 使用ThreadLocal的任务:ThreadLocal使每一个线程均可以拥有某个变量的一个私有"版本",而线程池中的线程是重复使用的,即一次使用完后,会被从新放回线程池,可被从新分配使用。所以,ThreadLocal线程变量,若是保存的信息只是针对一次请求的,放回线程池以前须要清空这些Threadlocal变量的值(或者取得线程以后,首先清空这些Threadlocal变量的值)

只有任务都是同类型而且相互独立时,线程池的效率达到最佳安全

一、线程饥饿死锁——在线程池中全部正在执行任务的线程都因为等待其余仍处于工做队列中的任务而阻塞服务器

  例1:在单线程池中,正在执行的任务阻塞等待队列中的某个任务执行完毕框架

  例2:线程池不够大时,经过栅栏机制协调多个任务时ide

  例3:因为其余资源的隐性限制,每一个任务都须要使用有限的数据库链接资源,那么无论线程池多大,都会表现出和和链接资源相同的大小 函数

每当提交了一个有依赖性的Executor任务时,要清楚地知道可能会出现线程"饥饿"死锁,所以须要在代码或配置Executor地配置文件中记录线程池地大小限制或配置限制性能

二、运行时间较长的任务ui

  线程池的大小应该超过有较长执行时间的任务数量,不然可能形成线程池中线程均服务于长时间任务致使其它短期任务也阻塞致使性能降低this

缓解策略:限定任务等待资源的时间,若是等待超时,那么能够把任务标示为失败,而后停止任务或者将任务从新返回队列中以便随后执行。这样,不管任务的最终结果是否成功,这种方法都能确保任务总能继续执行下去,并将线程释放出来以执行一些能更快完成的任务。例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等

 

2、设置线程池的大小

线程池的理想大小取决于被提交任务的类型及所部署系统的特性

  • 线程池过大,那么大量的线程将在相对不多的CPU和内存资源上发生竞争,这不只会致使更高的内存使用量,并且还可能耗尽资源
  • 若是线程池太小,那么将致使许多空闲的处理器没法执行工做,从而下降吞吐量

对于计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为Ncpu+1时,一般能实现最优的利用率;对于包含I/O操做或者其余阻塞操做的任务,因为线程并不会一直执行,所以线程池的规模应该更大

N(threads)=N(cpu)*U(cpu)*(1+W/C)   N(cpu)=CPU的数量=Runtime.getRuntime().availableProcessors(); U(cpu)= 指望CPU的使用率,0<=U(cpu)<=1 ;W/C=等待时间与运行时间的比率

3、配置ThreadPoolExecutor

 

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

一、线程的建立与销毁

  • CorePoolSize: 线程池基本大小,在建立ThreadPoolExecutor初期,线程并不会当即启动,而是等到有任务提交时才会启动,除非调用prestartAllCoreThreads,而且只有在工做队列满了的状况下才会建立超出这个数量的线程。
  • MaxmumPooSize: 线程池最大大小表示可同时活动的线程数量的上限。若某个线程的空闲时间超过了keepAliveTime, 则被标记为可回收的

newFixedThreadPool: CorePoolSize = MaxmumPoolSize

newCachedThreadPool: CorePoolSize=0,MaxmumPoolSize=Integer.MAX_VALUE,线程池可被无限扩展,需求下降时自动回收

二、管理队列任务

  • workQueue:用于保存超过线程池线程处理速率的Runnable任务的队列 (三种:无界队列、有界队列和同步移交)

newFixedThreadPool和newSingleThreadPool在默认状况下将使用一个无界的LinkedBlockingQueue,有更好的性能

使用有界队列有助于避免资源耗尽的状况发生,为了不当队列填满后,在使用有界的工做队列时,队列的大小与线程池的大小必须一块儿调节,能防止过载

对于很是大的或者无界的线程池,能够经过使用SynchronousQueue来避免任务排队,要将一个元素放入SynchronousQueue中,必须有另外一个线程正在等待接受这个元素,任务会直接移交给执行它的线程,不然将拒绝任务。newCachedThreadPool工厂方法中就使用了SynchronousQueue

使用优先队列PriorityBlockingQueue能够控制任务被执行的顺序

三、饱和策略

  • AbortPolicy(停止策略),默认的饱和策略。会抛出RejectedExecutionException异常(抛弃当前任务vs抛弃最旧任务)
  • 调用者运行:下一个任务在调用了execute方法的主线程中进行运行,主线程至少在一段时间内不能提交任何任务。到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中,致使服务器在高负载下实现一种平缓的性能下降

其余:对执行策略进行修改,使用信号量,控制处于执行中的任务

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;
    
    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }
    
    public void submitTask(final Runnable command){
        try {
            semaphore.acquire(); //提交任务前请求信号量
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        command.run();
                    } finally{
                        semaphore.release(); //执行完释放信号
                    }
                }
            });
        } catch (InterruptedException e) {
            // handle exception
        }
    }
}

四、线程工厂

经过自定义线程工厂能够对其进行扩展加入新的功能实现

当应用须要利用安全策略来控制某些特殊代码库的访问权,能够利用PrivilegedThreadFactory来定制本身的线程工厂,以避免出现安全性异常。将与建立privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext和contextClassLoader

自定义线程工厂
 1 public class MyThreadFactory implements ThreadFactory {
 2     private final String poolName;
 3     
 4     public MyThreadFactory(String poolName) {
 5         super();
 6         this.poolName = poolName;
 7     }
 8 
 9     @Override
10     public Thread newThread(Runnable r) {
11         return new MyAppThread(r);
12     }
13 }
14 
15 public class MyAppThread extends Thread {
16     public static final String DEFAULT_NAME="MyAppThread";
17     private static volatile boolean debugLifecycle = false;
18     private static final AtomicInteger created = new AtomicInteger();
19     private static final AtomicInteger alive = new AtomicInteger();
20     private static final Logger log = Logger.getAnonymousLogger();
21     
22     public MyAppThread(Runnable r) {
23         this(r, DEFAULT_NAME);
24     }
25 
26     public MyAppThread(Runnable r, String name) {
27         super(r, name+ "-" + created.incrementAndGet());
28         setUncaughtExceptionHandler( //设置未捕获的异常发生时的处理器
29                 new Thread.UncaughtExceptionHandler() {
30                     @Override
31                     public void uncaughtException(Thread t, Throwable e) {
32                         log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
33                     }
34                 });
35     }
36 
37     @Override
38     public void run() {
39         boolean debug = debugLifecycle;
40         if (debug) 
41             log.log(Level.FINE, "running thread " + getName());
42         try {
43             alive.incrementAndGet();
44             super.run();
45         } finally {
46             alive.decrementAndGet();
47             if (debug) 
48                 log.log(Level.FINE, "existing thread " + getName());
49         }
50     }
51 } 

 

五、在调用构造函数后在定制ThreadPoolExecutor

  • 能够在建立线程池后,再经过Setter方法设置其基本属性(将ExecutorService扩展为ThreadPoolExecutor)
  • 在Executors中包含一个unconfigurableExecutorService工厂方法,该方法对一个现有的ExecutorService进行包装,使其只暴露出ExecutorService的方法,所以不能对它进行配置

 

4、扩展ThreadPoolExecutor

ThreadPoolExecutor使用了模板方法模式,提供了beforeExecute、afterExecute和terminated扩展方法

  • 线程执行前调用beforeExecute(若是beforeExecute抛出了一个RuntimeException,那么任务将不会被执行)
  • 线程执行后调用afterExecute(抛出异常也会调用,若是任务在完成后带有一个Error,那么就不会调用afterExecute)
  • 在线程池完成关闭操做时调用terminated,也就是全部任务都已经完成而且全部工做者线程也已经关闭后

 

增长日志和记时等功能的线程池
 1 public class TimingThreadPoolExecutor extends ThreadPoolExecutor {
 2     private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();//任务执行开始时间
 3     private final Logger log = Logger.getAnonymousLogger();
 4     private final AtomicLong numTasks = new AtomicLong(); //统计任务数
 5     private final AtomicLong totalTime = new AtomicLong(); //线程池运行总时间
 6 
 7     public TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
 8             long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
 9         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
10     }
11 
12     @Override
13     protected void beforeExecute(Thread t, Runnable r) {
14         super.beforeExecute(t, r);
15         log.fine(String.format("Thread %s: start %s", t, r));
16         startTime.set(System.nanoTime());
17     }
18 
19     @Override
20     protected void afterExecute(Runnable r, Throwable t) {
21         try{
22             long endTime = System.nanoTime();
23             long taskTime = endTime - startTime.get();
24             numTasks.incrementAndGet();
25             totalTime.addAndGet(taskTime);
26             log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
27         } finally{
28             super.afterExecute(r, t);
29         }
30     }
31 
32     @Override
33     protected void terminated() {
34         try{
35             //任务执行平均时间
36             log.info(String.format("Terminated: average time=%dns", totalTime.get() / numTasks.get()));
37         }finally{
38             super.terminated();
39         }
40     }
41 }
42 
43 

 

5、递归算法的并行化

  • 若是循环中的迭代操做都是独立的,而且不须要等待全部的迭代操做都完成再继续执行,那么就可使用Executor将串行循环转化为并行循环
  • 若是须要提交一个任务集并等待它们完成,那么可使用ExecutorService.invokeAll
  • 若是递归执行的任务中,在每一个迭代操做中都不须要来自于后续递归迭代的结果,能够建立一个特定于遍历过程的Executor,并使用shutdown和awaitTermination等方法,等待上面并行运行的结果
相关文章
相关标签/搜索