阿里JAVA编码规约,建议采用ThreadPoolExecutor建立线程池。json
private static ExecutorService simpleExecutorService = new ThreadPoolExecutor( 200, 300, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<Runnable>(10000), new ThreadPoolExecutor.DiscardPolicy());
public void doSomething(final String message) { simpleExecutorService.execute(new Runnable() { @Override public void run() { try { Thread.sleep(3000); System.out.println("step 2"); System.out.println("message=>" + message); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("step 1"); }
ThreadUtil threadUtil = new ThreadUtil(); threadUtil.doSomething("a thread pool demo");
step 1 step 2 message=>a thread pool demo
在Spring3.x以后框架已经支持采用@Async注解进行异步执行了。多线程
被@Async修饰的方法叫作异步方法,这些异步方法会在新的线程中进行处理,不影响主线程的顺序执行。并发
@Component @Slf4j public class AsyncTask { @Async public void dealNoReturnTask(){ log.info("Thread {} deal No Return Task start", Thread.currentThread().getName()); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis()); } }
进行调用:框架
@SpringBootTest(classes = SpringbootApplication.class) @RunWith(SpringJUnit4ClassRunner.class) @Slf4j public class AsyncTest { @Autowired private AsyncTask asyncTask; @Test public void testDealNoReturnTask(){ asyncTask.dealNoReturnTask(); try { log.info("begin to deal other Task!"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }
@Async public Future<String> dealHaveReturnTask() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } JSONObject jsonObject = new JSONObject(); jsonObject.put("thread", Thread.currentThread().getName()); jsonObject.put("time", System.currentTimeMillis()); return new AsyncResult<String>(jsonObject.toJSONString()); }
判断任务是否取消:异步
@Test public void testDealHaveReturnTask() throws Exception { Future<String> future = asyncTask.dealHaveReturnTask(); log.info("begin to deal other Task!"); while (true) { if(future.isCancelled()){ log.info("deal async task is Cancelled"); break; } if (future.isDone() ) { log.info("deal async task is Done"); log.info("return result is " + future.get()); break; } log.info("wait async task to end ..."); Thread.sleep(1000); } }
咱们能够实现AsyncConfigurer接口,也能够继承AsyncConfigurerSupport类来实现 在方法getAsyncExecutor()中建立线程池的时候,必须使用 executor.initialize(), 否则在调用时会报线程池未初始化的异常。 若是使用threadPoolTaskExecutor()来定义bean,则不须要初始化async
@Configuration @EnableAsync @Slf4j public class AsyncConfig implements AsyncConfigurer { // @Bean // public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // executor.setCorePoolSize(10); // executor.setMaxPoolSize(100); // executor.setQueueCapacity(100); // return executor; // } @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(100); executor.setQueueCapacity(100); executor.setThreadNamePrefix("AsyncExecutorThread-"); executor.initialize(); //若是不初始化,致使找到不到执行器 return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncExceptionHandler(); } }
异步异常处理类:ide
@Slf4j public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params)); if (ex instanceof AsyncException) { AsyncException asyncException = (AsyncException) ex; log.info("asyncException:{}",asyncException.getErrorMessage()); } log.info("Exception :"); ex.printStackTrace(); } } @Data @AllArgsConstructor public class AsyncException extends Exception { private int code; private String errorMessage; }
须要结合Callable测试
public class CallableDemo implements Callable<Integer> { private int sum; @Override public Integer call() throws Exception { System.out.println("Callable子线程开始计算啦!"); Thread.sleep(2000); for(int i=0 ;i<5000;i++){ sum=sum+i; } System.out.println("Callable子线程计算结束!"); return sum; } }
//建立线程池 ExecutorService es = Executors.newSingleThreadExecutor(); //建立Callable对象任务 CallableDemo calTask = new CallableDemo(); //提交任务并获取执行结果 Future<Integer> future = es.submit(calTask); //关闭线程池 es.shutdown(); try { System.out.println("主线程在执行其余任务"); if (future.get() != null) { //输出获取到的结果 System.out.println("future.get()-->" + future.get()); } else { //输出获取到的结果 System.out.println("future.get()未获取到结果"); } } catch (Exception e) { e.printStackTrace(); } System.out.println("主线程在执行完成");
//建立线程池 ExecutorService es = Executors.newSingleThreadExecutor(); //建立Callable对象任务 CallableDemo calTask = new CallableDemo(); //建立FutureTask FutureTask<Integer> future = new FutureTask<>(calTask); // future.run(); // 因为FutureTask继承于Runable,因此也能够直接调用run方法执行 //执行任务 es.submit(future); // 效果同上面直接调用run方法 //关闭线程池 es.shutdown(); try { System.out.println("主线程在执行其余任务"); if (future.get() != null) { //输出获取到的结果 System.out.println("future.get()-->" + future.get()); } else { //输出获取到的结果 System.out.println("future.get()未获取到结果"); } } catch (Exception e) { e.printStackTrace(); } System.out.println("主线程在执行完成");
public class FutureDemo{ public static void main(String[] args) { Long start = System.currentTimeMillis(); //开启多线程 ExecutorService exs = Executors.newFixedThreadPool(10); try { //结果集 List<Integer> list = new ArrayList<Integer>(); List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); //1.高速提交10个任务,每一个任务返回一个Future入list for(int i=0;i<10;i++){ futureList.add(exs.submit(new CallableTask(i+1))); } Long getResultStart = System.currentTimeMillis(); System.out.println("结果归集开始时间="+new Date()); //2.结果归集,遍历futureList,高速轮询(模拟实现了并发)获取future状态成功完成后获取结果,退出当前循环 for (Future<Integer> future : futureList) { //CPU高速轮询:每一个future都并发轮循,判断完成状态而后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询 while (true) { //获取future成功完成状态,若是想要限制每一个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用便可。 if (future.isDone()&& !future.isCancelled()) { Integer i = future.get();//获取结果 System.out.println("任务i="+i+"获取完成!"+new Date()); list.add(i); break;//当前future获取结果完毕,跳出while } else { //每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个 Thread.sleep(1); } } } System.out.println("list="+list); System.out.println("总耗时="+(System.currentTimeMillis()-start)+",取结果归集耗时="+(System.currentTimeMillis()-getResultStart)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } }
static class CallableTask implements Callable<Integer>{ Integer i; public CallableTask(Integer i) { super(); this.i=i; } @Override public Integer call() throws Exception { if(i==1){ Thread.sleep(3000);//任务1耗时3秒 }else if(i==5){ Thread.sleep(5000);//任务5耗时5秒 }else{ Thread.sleep(1000);//其它任务耗时1秒 } System.out.println("task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!"); return i; } } }