开发本身的项目有一段时间了,由于是个长时间跑的服务器端程序,因此异常处理显得尤其重要。
对于异常的抓取和日志(狭义上的日志)的分析一点都不能落下。java
咱们使用了Java自带的Executor模块,我只是稍微看了下Executors当中三个线程池的实现(策略为:Fixed, Cached, Schedule),其实光看名字就能够了解各自的一些策略信息。OK,这一次我须要一种策略合并Fixed和Cached的两种特色的自定义Executor。其实很简单,给Cached设置一个上线就是了。注意他们的同步队列使用的不一样,用LinkedBlockingQueue是个不错的选择,至于BlockingQueue的实现能够自行谷歌(之后再记吧)。sql
先看写的简略的代码apache
package com.zjseek.recharge.core; import com.zjseek.recharge.exception.SKErrorCode; import com.zjseek.recharge.exception.SKOrderState; import com.zjseek.recharge.model.OrderModel; import com.zjseek.recharge.service.OrderService; import org.apache.log4j.Logger; import java.sql.Timestamp; import java.util.concurrent.*; /** * Created by geminiwen on 14-6-28. */ public class OrderExceptionThreadExecutor extends ThreadPoolExecutor { private Logger logger = Logger.getLogger(OrderExceptionThreadExecutor.class); private OrderService orderService; public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); init(); } public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); init(); } public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); init(); } public OrderExceptionThreadExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); init(); } private void init() { this.orderService = new OrderService(); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); Future<?> f = (Future<?>) r; try { f.get(); } catch (InterruptedException e) { logger.error("线程池中发现异常,被中断", e); } catch (ExecutionException e) { logger.error("线程池中发现异常,被中断", e); } } }
我这是一个订单处理流程,主要用到了一个protected方法,就是afterExecute。一看这个函数的样子,想固然的觉得若是线程池中出了问题,异常天然回在第二个参数t中传过来。
也许的确是这样的,可是这里有一个区别。
咱们知道ExecutorServcie中执行一个Runnable有两个方法,两个分别是服务器
public void execute(Runnable command); public <T> Future<T> submit(Runnable task, T result);
别看接受的参数差很少,其实submit最后是调用的execute的,并且在调用execute前,对task进行了一次封装,变成了RunnableFuture(它是接口,继承了Runnable和Future实际是一个实现类FutureTask)。ide
OK,对于实际操做Runnable的不一样,暂时说到这,看下execute方法作了什么事
execute方法对进来的Runnable又包装成了worker而后进入runWorker
runWorker方法中有这么几行函数
try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); }
好了,到了最关键的afterExecute这个步骤,我满心觉得这里全部的异常都会经过thrown传递进来,看来我仍是太年轻了,以前咱们分析过,这个Runnable已经被submit封装成了FutureTask,那么这个task.run()除了咱们本身定义的run任务以外,到底还干了啥呢?this
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
OK,这段源码摘自FutureTask中的run方法,实际咱们本身定义的任务已经变成了Callable:线程
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
从它的构造函数就能够看出来。日志
而后咱们在上面实际运行task的地方实际上是c.call()这一句。code
result = c.call();
咱们写的任务所有在这句代码里面执行完毕了,看看外面都wrap了啥? OK 咱们全部的Throwable所有已经被setException吃掉了,怎么还会抛出到外面那层的execute中呢?
因此我以前实验的时候,在submit中提交任务不管任务怎么抛异常,在afterExecute中的第二个参数是取不到的,缘由就在这。
再回头看看针对submit改造的函数
protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); Future<?> f = (Future<?>) r; try { f.get(); } catch (InterruptedException e) { logger.error("线程池中发现异常,被中断", e); } catch (ExecutionException e) { logger.error("线程池中发现异常,被中断", e); } }
固然,这里已经默认r是实现Future接口了。经过FutureTask的get方法,能把刚刚setException中的异常给抛出来,这样咱们就能真的拿到这些异常了。
若是咱们关心线程池执行的结果,则须要使用submit来提交task,那么在afterExecute中对异常的处理也须要经过Future接口调用get方法去取结果,才能拿到异常,若是咱们不关心这个任务的结果,能够直接使用ExecutorService中的execute方法(实际是继承Executor接口)来直接去执行任务,这样的话,咱们的Runnable没有通过多余的封装,在runWorker中获得的异常也直接能在afterExecute中捕捉。
好了,以上就是对线程池异常捕捉的一个记录。想一想应该不难,今天也是偶然机会看到的。今天在开发中碰到PHP锁的问题,头疼死了。