定时任务使用多线程注意事项

在定时任务中为了加快处理速度,通常都会使用多线程处理业务。须要注意一下事项:java

1. 定时任务是否容许上一个定时任务未结束,下一个定时任务能够启动,经过Scheduled中的配置在决定。服务器

2. 主线程已经关闭,线程池中的线程还在运行问题。线程池的关闭方法问题多线程

3. 定时任务有大量数据,致使服务没法中止问题。线程

4. 如何获取线程的处理结果code

 

以下代码是示例,stop状态的使用和线程池shutdown的处理逻辑须要依据本身的业务来处理。orm

 

@PreDestroy
    public void destory(){
        stop = true;
    }

    //线程中止状态, 经过注解检测到服务器中止时修改stop状态
    boolean stop = false;

    //服务器启动后延迟1分钟执行,定时任务结束后延迟1分钟执行下一次
    @Scheduled(initialDelay = 60*1000L, fixedDelay = 60*1000L)
    public void scheduling(){
        List<String> dataList = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            dataList.add("data_"+i);
        }



        int threadSize = 10;
        ExecutorService esPool = Executors.newFixedThreadPool(threadSize);

        //接收线程的完成时间 或者其余返回结果
        CompletionService<String> ecs = new ExecutorCompletionService<String>(esPool);

        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(dataList);
        logger.info("===============start {}==================", System.currentTimeMillis());
        //启动线程时修改退出线程状态
        stop = false;
        for (int i = 0; i < threadSize; i++) {
            ecs.submit(()->{
                long count = 0;
                //线程处理加try catch 防止抛出异常中断线程,可能会致使线程池中全部的线程都中断,无可用线程
                try{
                    // !queue.isEmpty()比queue.size()>0效率高不少 .size() 是要遍历一遍集合的
                    while (!stop && !queue.isEmpty()){
                        String data = queue.poll();

                        //500 能够在60秒内完成处理,正常退出
                        //改为 1000 若是不使用下面的收集结果代码,60秒内没法处理完,会强制shutdown 抛出异常
                        Thread.sleep(1000L);
                        logger.info("data {} ok.",data);
                        count++;
                    }
                }catch (Exception e){
                    logger.error("",e);
                }

                //这里范围线程处理的结果
                return System.currentTimeMillis()+"_"+count;
            });
        }

        //获取线程的返回结果 会阻塞主线程,直到线程池中全部的线程返回结果
        /*try {
            for (int i = 0; i < threadSize; i++) {
                String threadTime = ecs.take().get();
                logger.info("thread run ok time:{}"+threadTime);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }catch (ExecutionException e) {
            e.printStackTrace();
        }*/

        //关闭线程池
        try {
            esPool.shutdown();
            logger.info("esPoll shutdown:{}", DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT));
            //线程池阻塞,到指定的时间退出,若是全部线程执行完成返回true 不然返回false
            boolean await = esPool.awaitTermination(60*1000L,TimeUnit.MILLISECONDS);
            logger.info("esPool.awaitTermination 1:{}, {}",await,DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT));
            if(!await) {
                stop = true;
                await = esPool.awaitTermination(10*1000L,TimeUnit.MILLISECONDS);
                logger.info("esPool.awaitTermination 2:{}, {}",await,DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT));
            }

            if(!await){
                logger.info("wait 60s not stop, shutdownNow");
                // 超时的时候向线程池中全部的线程发出中断(interrupted)。
                // 让线程池中的全部线程当即中断。 会抛出异常
                esPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            //awaitTermination方法被中断的时候也停止线程池中所有的线程的执行。
            esPool.shutdownNow();
            logger.error("awaitTermination",e);
        }

        logger.info("===============end {}==================", System.currentTimeMillis());

    }
相关文章
相关标签/搜索