【java并发与多线程系列】可暂停线程执行器

场景

在一些活动场景中,咱们常常须要处理一些营销活动,通常都有中途暂停,中途重启的需求。好比一个派券需求,我须要给1千万的用户派券,因为某些缘由,须要暂停这个活动一会,而后再从新接着派发。redis

原理

重写 beforeExecute线程

实现

@Configuration
@EnableScheduling
@EnableAsync
public class MultiThreadExecutorMonitor {

   private static final Logger LOG = LoggerFactory.getLogger(MultiThreadExecutorMonitor.class);

   private static PausableThreadPoolExecutor es = new PausableThreadPoolExecutor(
           50,
           100,
           0L,
           TimeUnit.MILLISECONDS,
           new LinkedBlockingQueue<Runnable>(100), // 生产的业务 能够经过 redis Queue 来处理
           new ThreadPoolExecutor.DiscardPolicy()  // 直接掉弃
   );

   private static ExecutorService peopler = new ThreadPoolExecutor(
           50,
           100,
           0L,
           TimeUnit.MILLISECONDS,
           new LinkedBlockingQueue<Runnable>(100)  // 定量的无界阻塞队列
   );

   private static AtomicInteger counter = new AtomicInteger(0);

   private static AtomicBoolean isPase = new AtomicBoolean(true);

   /**
    * 监控线程执行器
    *
    * @throws InterruptedException
    */
   @Async
   @Scheduled(fixedDelay = 3000)
   public void cronJob() throws InterruptedException {
       threadMonitor();
       LOG.info("监控结束 {}", counter.get());
   }

   /**
    *  尝试任务
    *
    * @throws InterruptedException
    */
   @Async
   @Scheduled(fixedDelay = 9000)
   public void cronTryJob() throws Exception {
       runThread();
       LOG.info(" 尝试任务 {}", counter.get());
   }


   /**
    * 模拟人为的暂停,重启
    *
    * @throws InterruptedException
    */
   @Async
   @Scheduled(fixedDelay = 15000)
   public void cronPaseOrResumeJob() throws InterruptedException {
         controllerThreadExecitor();
         LOG.info("操做控制线程执行器 {}", counter.get());
   }

   public static void main(String[] args) throws Exception {
       AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(
               MultiThreadExecutorMonitor.class);
       try {
           isPase.getAndSet(false);
           runThread();
           Thread.currentThread().join();
       } catch (InterruptedException e) {
           e.printStackTrace();
       } finally {
           context.close();
       }
   }

   public static void runThread() throws Exception {
       while (!isPase.get()) {
               // ToDo: 断点重试工做 ...
               es.execute(() -> {
//                  LOG.info("执行:{}", counter.incrementAndGet());
                   counter.incrementAndGet();
                   try {
                       Thread.sleep(1000);
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               });
       }
   }

   public static void controllerThreadExecitor() {
       if(isPase.get()) {
           LOG.info("=============================== 重启线程池 :{}", isPase.get());
           isPase.getAndSet(false);
           es.resume();
       } else  {
           LOG.info("=============================== 暂停线程池 :{}", isPase.get());
           isPase.getAndSet(true);
           es.pause();
       }

   }

    public  static void  threadMonitor() {
        ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es);
        int queueSize = tpe.getQueue().size();
        LOG.info("当前排队线程数:{}" ,  queueSize);
        int activeCount = tpe.getActiveCount();
        LOG.info("当前活动线程数:{}" ,  activeCount);
        long completedTaskCount = tpe.getCompletedTaskCount();
        LOG.info("执行完成线程数:{}" ,  completedTaskCount);
        boolean isShutdown = tpe.isShutdown();
        LOG.info("是否暂停现场执行:{}" ,  isShutdown);
        boolean  isTerminated = tpe.isTerminated();
        LOG.info("isTerminated:{}" ,  isTerminated);
        boolean isTerminating = tpe.isTerminating();
        LOG.info("isTerminating:{}" ,  isTerminating);
        long taskCount = tpe.getTaskCount();
        LOG.info("总线程数:{}" ,  taskCount);
    }
}
相关文章
相关标签/搜索