1、在任务和执行策略之间隐性耦合算法
Executor框架将任务的提交和它的执行策略解耦开来。虽然Executor框架为制定和修改执行策略提供了至关大的灵活性,但并不是全部的任务都能适用全部的执行策略。数据库
只有任务都是同类型而且相互独立时,线程池的效率达到最佳安全
一、线程饥饿死锁——在线程池中全部正在执行任务的线程都因为等待其余仍处于工做队列中的任务而阻塞服务器
例1:在单线程池中,正在执行的任务阻塞等待队列中的某个任务执行完毕框架
例2:线程池不够大时,经过栅栏机制协调多个任务时ide
例3:因为其余资源的隐性限制,每一个任务都须要使用有限的数据库链接资源,那么无论线程池多大,都会表现出和和链接资源相同的大小 函数
每当提交了一个有依赖性的Executor任务时,要清楚地知道可能会出现线程"饥饿"死锁,所以须要在代码或配置Executor地配置文件中记录线程池地大小限制或配置限制性能
二、运行时间较长的任务ui
线程池的大小应该超过有较长执行时间的任务数量,不然可能形成线程池中线程均服务于长时间任务致使其它短期任务也阻塞致使性能降低this
缓解策略:限定任务等待资源的时间,若是等待超时,那么能够把任务标示为失败,而后停止任务或者将任务从新返回队列中以便随后执行。这样,不管任务的最终结果是否成功,这种方法都能确保任务总能继续执行下去,并将线程释放出来以执行一些能更快完成的任务。例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等
2、设置线程池的大小
线程池的理想大小取决于被提交任务的类型及所部署系统的特性
对于计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为Ncpu+1时,一般能实现最优的利用率;对于包含I/O操做或者其余阻塞操做的任务,因为线程并不会一直执行,所以线程池的规模应该更大
N(threads)=N(cpu)*U(cpu)*(1+W/C) N(cpu)=CPU的数量=Runtime.getRuntime().availableProcessors(); U(cpu)= 指望CPU的使用率,0<=U(cpu)<=1 ;W/C=等待时间与运行时间的比率
3、配置ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
一、线程的建立与销毁
newFixedThreadPool: CorePoolSize = MaxmumPoolSize
newCachedThreadPool: CorePoolSize=0,MaxmumPoolSize=Integer.MAX_VALUE,线程池可被无限扩展,需求下降时自动回收
二、管理队列任务
newFixedThreadPool和newSingleThreadPool在默认状况下将使用一个无界的LinkedBlockingQueue,有更好的性能
使用有界队列有助于避免资源耗尽的状况发生,为了不当队列填满后,在使用有界的工做队列时,队列的大小与线程池的大小必须一块儿调节,能防止过载
对于很是大的或者无界的线程池,能够经过使用SynchronousQueue来避免任务排队,要将一个元素放入SynchronousQueue中,必须有另外一个线程正在等待接受这个元素,任务会直接移交给执行它的线程,不然将拒绝任务。newCachedThreadPool工厂方法中就使用了SynchronousQueue
使用优先队列PriorityBlockingQueue能够控制任务被执行的顺序
三、饱和策略
其余:对执行策略进行修改,使用信号量,控制处于执行中的任务
public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command){ try { semaphore.acquire(); //提交任务前请求信号量 exec.execute(new Runnable() { @Override public void run() { try{ command.run(); } finally{ semaphore.release(); //执行完释放信号 } } }); } catch (InterruptedException e) { // handle exception } } }
四、线程工厂
经过自定义线程工厂能够对其进行扩展加入新的功能实现
当应用须要利用安全策略来控制某些特殊代码库的访问权,能够利用PrivilegedThreadFactory来定制本身的线程工厂,以避免出现安全性异常。将与建立privilegedThreadFactory的线程拥有相同的访问权限、AccessControlContext和contextClassLoader
自定义线程工厂
1 public class MyThreadFactory implements ThreadFactory { 2 private final String poolName; 3 4 public MyThreadFactory(String poolName) { 5 super(); 6 this.poolName = poolName; 7 } 8 9 @Override 10 public Thread newThread(Runnable r) { 11 return new MyAppThread(r); 12 } 13 } 14 15 public class MyAppThread extends Thread { 16 public static final String DEFAULT_NAME="MyAppThread"; 17 private static volatile boolean debugLifecycle = false; 18 private static final AtomicInteger created = new AtomicInteger(); 19 private static final AtomicInteger alive = new AtomicInteger(); 20 private static final Logger log = Logger.getAnonymousLogger(); 21 22 public MyAppThread(Runnable r) { 23 this(r, DEFAULT_NAME); 24 } 25 26 public MyAppThread(Runnable r, String name) { 27 super(r, name+ "-" + created.incrementAndGet()); 28 setUncaughtExceptionHandler( //设置未捕获的异常发生时的处理器 29 new Thread.UncaughtExceptionHandler() { 30 @Override 31 public void uncaughtException(Thread t, Throwable e) { 32 log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e); 33 } 34 }); 35 } 36 37 @Override 38 public void run() { 39 boolean debug = debugLifecycle; 40 if (debug) 41 log.log(Level.FINE, "running thread " + getName()); 42 try { 43 alive.incrementAndGet(); 44 super.run(); 45 } finally { 46 alive.decrementAndGet(); 47 if (debug) 48 log.log(Level.FINE, "existing thread " + getName()); 49 } 50 } 51 }
五、在调用构造函数后在定制ThreadPoolExecutor
4、扩展ThreadPoolExecutor
ThreadPoolExecutor使用了模板方法模式,提供了beforeExecute、afterExecute和terminated扩展方法
增长日志和记时等功能的线程池
1 public class TimingThreadPoolExecutor extends ThreadPoolExecutor { 2 private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();//任务执行开始时间 3 private final Logger log = Logger.getAnonymousLogger(); 4 private final AtomicLong numTasks = new AtomicLong(); //统计任务数 5 private final AtomicLong totalTime = new AtomicLong(); //线程池运行总时间 6 7 public TimingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 8 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 9 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 10 } 11 12 @Override 13 protected void beforeExecute(Thread t, Runnable r) { 14 super.beforeExecute(t, r); 15 log.fine(String.format("Thread %s: start %s", t, r)); 16 startTime.set(System.nanoTime()); 17 } 18 19 @Override 20 protected void afterExecute(Runnable r, Throwable t) { 21 try{ 22 long endTime = System.nanoTime(); 23 long taskTime = endTime - startTime.get(); 24 numTasks.incrementAndGet(); 25 totalTime.addAndGet(taskTime); 26 log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime)); 27 } finally{ 28 super.afterExecute(r, t); 29 } 30 } 31 32 @Override 33 protected void terminated() { 34 try{ 35 //任务执行平均时间 36 log.info(String.format("Terminated: average time=%dns", totalTime.get() / numTasks.get())); 37 }finally{ 38 super.terminated(); 39 } 40 } 41 } 42 43
5、递归算法的并行化