你们好,我是练习java两年半时间的南橘,小伙伴能够一块儿互相交流经验哦。java
ThreadPoolExecutor是能够扩展的,它内部提供了几个能够在子类中改写的方法(红框内)。JDK内的注解上说,这些方法能够用以添加日志,计时、监视或进行统计信息的收集。是否是感受很熟悉?有没有一种spring aop中 @Around @Before @After三个注解的既视感?spring
咱们来对比一下dom
ThreadPoolExecutor | spring aop |
---|---|
beforeExecute()(线程执行以前调用) | @Before(在所拦截的方法执行以前执行 ) |
afterExecute() (线程执行以后调用) | @After (在所拦截的方法执行以后执行) |
terminated() (线程池退出时候调用) | |
@Around(能够同时在所拦截的方法先后执行) |
其实他们的效果是同样的,只是一个在线程池里,一个在拦截器中。异步
对于ThreadPoolExecutor中的这些方法,有这样的一些特色:性能
一、不管任务时从run中正常返回,仍是抛出一个异常而返回,afterExecute都会被调用(可是若是任务在完成后带有一个Error,那么就不会调用afterExecute)测试
二、同时,若是beforeExecute抛出一个RuntimeExecption,那么任务将不会被执行,连带afterExecute也不会被调用了。优化
三、在线程池完成关闭操做时会调用terminated,相似于try-catch中的finally操做同样。terminated能够用来释放Executor在其生命周期里分配的各类资源,此外也能够用来执行发送通知、记录日志亦或是收集finalize统计信息等操做。线程
咱们先构建一个自定义的线程池,它经过扩展方法来添加日志记录和统计信息的收集。为了测量任务的运行时间,beforeExecute必须记录开始时间并把它保存到一个afterExecute能够访问的地方,因而用ThreadLocal来存储变量,用afterExecute来读取,并经过terminated来输出平均任务和日志消息。日志
public class WeedThreadPool extends ThreadPoolExecutor { private final ThreadLocal<Long> startTime =new ThreadLocal<>(); private final Logger log =Logger.getLogger("WeedThreadPool"); //统计执行次数 private final AtomicLong numTasks =new AtomicLong(); //统计总执行时间 private final AtomicLong totalTime =new AtomicLong(); /** * 这里是实现线程池的构造方法,我随便选了一个,你们能够根据本身的需求找到合适的构造方法 */ public WeedThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } //线程执行以前调用 protected void beforeExecute(Thread t,Runnable r){ super.beforeExecute(t,r); System.out.println(String.format("Thread %s:start %s",t,r)); //由于currentTimeMillis返回的是ms,而众所周知ms是很难产生差别的,因此换成了nanoTime用ns来展现 startTime.set(System.nanoTime()); } //线程执行以后调用 protected void afterExecute(Runnable r,Throwable t){ try { Long endTime =System.nanoTime(); Long taskTime =endTime-startTime.get(); numTasks.incrementAndGet(); totalTime.addAndGet(taskTime); System.out.println(String.format("Thread %s:end %s, time=%dns",Thread.currentThread(),r,taskTime)); }finally { super.afterExecute(r,t); } } //线程池退出时候调用 protected void terminated(){ try{ System.out.println(String.format("Terminated: avg time =%dns, ",totalTime.get()/numTasks.get())); }finally { super.terminated(); } } }
测试案例:code
public class WeedThreadTest { BlockingQueue<Runnable> taskQueue; final static WeedThreadPool weedThreadPool =new WeedThreadPool(3,10,1, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(100)); public static void main(String[] args) throws InterruptedException { for(int i=0;i<3;i++) { weedThreadPool.execute(WeedThreadTest::run); } Thread.sleep(2000L); weedThreadPool.shutdown(); } private static void run() { System.out.println("thread id is: " + Thread.currentThread().getId()); try { Thread.sleep(1024L); } catch (InterruptedException e) { e.printStackTrace(); } } }
用到这些方法的地方其实和用到Spring AOP中一些场景比较类似,主要在记录跟踪、优化等方面可使用,如日志记录和统计信息的收集、测量任务的运行时间,以及一些任务完成以后发送通知、邮件、信息之类的。
若是咱们意外收获了一大批待执行的任务(举个例子,好比去调用各大旅游软件的出行机票信息),为了提升任务的执行效率,咱们可使用线程池submit异步计算任务,经过调用Future接口实现类的get方法获取结果。
虽然使用了线程池会提升执行效率,可是调用Future接口实现类的get方法是阻塞的,也就是和当前这个Future关联的任务所有执行完成的时候,get方法才返回结果,若是当前任务没有执行完成,而有其它Future关联的任务已经完成了,就会白白浪费不少等待的时间。
因此,有没有这样一个方法,遍历的时候谁先执行完成就先获取哪一个结果?
没错,咱们的ExecutorCompletionService就能够实现这样的效果,它的内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,经过调用它的take方法或poll方法能够获取到一个已经执行完成的Future,进而经过调用Future接口实现类的get方法获取最终的结果。
逻辑图以下:
ExecutorCompletionService实现了CompletionService接口,在CompletionService接口中定义了以下这些方法:
Future
Future
Future
Future
Future
public class WeedExecutorServiceDemo { /** * 继续用以前建好的线程池,只是调整一下池大小 */ BlockingQueue<Runnable> taskQueue; final static WeedThreadPool weedThreadPool = new WeedThreadPool(1, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100)); public static Random r = new Random(); public static void main(String[] args) throws InterruptedException, ExecutionException { CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(weedThreadPool); for (int i = 0; i < 3; i++) { cs.submit(() -> { //获取计算任务 int init = 0; for (int j = 0; j < 100; j++) { init += r.nextInt(); } Thread.sleep(1000L); return Integer.valueOf(init); }); } weedThreadPool.shutdown(); /** * 经过take方法获取,阻塞,直到有任务完成 */ for (int i = 0; i < 3; i++) { Future<Integer> future = cs.take(); if (future != null) { System.out.println(future.get()); } } } }
调用结果以下
咱们也能够经过poll方法来获取。
/** * 经过poll方法获取 */ for (int i = 0; i < 3; i++) { System.out.println(cs.poll(1200L,TimeUnit.MILLISECONDS).get()); }
结果天然是同样的
若是把阻塞时间改小一些,目前的代码就会出问题
/** * 经过poll方法获取 */ for (int i = 0; i < 3; i++) { System.out.println(cs.poll(800L,TimeUnit.MILLISECONDS).get()); }
一样的,poll方法也能够用来打断超时执行的业务,好比在poll超时的状况下,直接调用线程池的shutdownNow(),残暴地关闭整个线程池。
for (int i = 0; i < 3; i++) { Future<Integer> poll = cs.poll(800L, TimeUnit.MILLISECONDS); if (poll==null){ System.out.println("执行结束"); weedThreadPool.shutdownNow(); } }
选择怎么样的方法来异步执行任务,什么样的方式来接收任务,也是须要根据实际状况来考虑的。
1.、须要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一块儿,可以让批量异步任务的管理更简单。
二、让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你能够轻松实现后续处理的有序性,避免无谓的等待。
三、线程池隔离。CompletionService支持建立知己的线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
有须要的同窗能够加个人公众号,之后的最新的文章第一时间都在里面,须要以前文章的思惟导图也能够给我留言