在定时任务中为了加快处理速度,通常都会使用多线程处理业务。须要注意一下事项:java
1. 定时任务是否容许上一个定时任务未结束,下一个定时任务能够启动,经过Scheduled中的配置在决定。服务器
2. 主线程已经关闭,线程池中的线程还在运行问题。线程池的关闭方法问题多线程
3. 定时任务有大量数据,致使服务没法中止问题。线程
4. 如何获取线程的处理结果code
以下代码是示例,stop状态的使用和线程池shutdown的处理逻辑须要依据本身的业务来处理。orm
@PreDestroy public void destory(){ stop = true; } //线程中止状态, 经过注解检测到服务器中止时修改stop状态 boolean stop = false; //服务器启动后延迟1分钟执行,定时任务结束后延迟1分钟执行下一次 @Scheduled(initialDelay = 60*1000L, fixedDelay = 60*1000L) public void scheduling(){ List<String> dataList = new ArrayList<>(); for (int i = 0; i < 1000; i++) { dataList.add("data_"+i); } int threadSize = 10; ExecutorService esPool = Executors.newFixedThreadPool(threadSize); //接收线程的完成时间 或者其余返回结果 CompletionService<String> ecs = new ExecutorCompletionService<String>(esPool); ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(dataList); logger.info("===============start {}==================", System.currentTimeMillis()); //启动线程时修改退出线程状态 stop = false; for (int i = 0; i < threadSize; i++) { ecs.submit(()->{ long count = 0; //线程处理加try catch 防止抛出异常中断线程,可能会致使线程池中全部的线程都中断,无可用线程 try{ // !queue.isEmpty()比queue.size()>0效率高不少 .size() 是要遍历一遍集合的 while (!stop && !queue.isEmpty()){ String data = queue.poll(); //500 能够在60秒内完成处理,正常退出 //改为 1000 若是不使用下面的收集结果代码,60秒内没法处理完,会强制shutdown 抛出异常 Thread.sleep(1000L); logger.info("data {} ok.",data); count++; } }catch (Exception e){ logger.error("",e); } //这里范围线程处理的结果 return System.currentTimeMillis()+"_"+count; }); } //获取线程的返回结果 会阻塞主线程,直到线程池中全部的线程返回结果 /*try { for (int i = 0; i < threadSize; i++) { String threadTime = ecs.take().get(); logger.info("thread run ok time:{}"+threadTime); } } catch (InterruptedException e) { e.printStackTrace(); }catch (ExecutionException e) { e.printStackTrace(); }*/ //关闭线程池 try { esPool.shutdown(); logger.info("esPoll shutdown:{}", DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT)); //线程池阻塞,到指定的时间退出,若是全部线程执行完成返回true 不然返回false boolean await = esPool.awaitTermination(60*1000L,TimeUnit.MILLISECONDS); logger.info("esPool.awaitTermination 1:{}, {}",await,DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT)); if(!await) { stop = true; await = esPool.awaitTermination(10*1000L,TimeUnit.MILLISECONDS); logger.info("esPool.awaitTermination 2:{}, {}",await,DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT)); } if(!await){ logger.info("wait 60s not stop, shutdownNow"); // 超时的时候向线程池中全部的线程发出中断(interrupted)。 // 让线程池中的全部线程当即中断。 会抛出异常 esPool.shutdownNow(); } } catch (InterruptedException e) { //awaitTermination方法被中断的时候也停止线程池中所有的线程的执行。 esPool.shutdownNow(); logger.error("awaitTermination",e); } logger.info("===============end {}==================", System.currentTimeMillis()); }