场景
在一些活动场景中,我们经常需要处理营销活动,这类活动通常都有中途暂停、中途重启的需求。比如一个派券需求:需要给 1 千万用户派券,由于某些原因需要先暂停这个活动一会儿,然后再重新接着派发。
原理
重写 ThreadPoolExecutor 的 beforeExecute 方法。
实现
@Configuration @EnableScheduling @EnableAsync public class MultiThreadExecutorMonitor { private static final Logger LOG = LoggerFactory.getLogger(MultiThreadExecutorMonitor.class); private static PausableThreadPoolExecutor es = new PausableThreadPoolExecutor( 50, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100), // 生产的业务 能够经过 redis Queue 来处理 new ThreadPoolExecutor.DiscardPolicy() // 直接掉弃 ); private static ExecutorService peopler = new ThreadPoolExecutor( 50, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100) // 定量的无界阻塞队列 ); private static AtomicInteger counter = new AtomicInteger(0); private static AtomicBoolean isPase = new AtomicBoolean(true); /** * 监控线程执行器 * * @throws InterruptedException */ @Async @Scheduled(fixedDelay = 3000) public void cronJob() throws InterruptedException { threadMonitor(); LOG.info("监控结束 {}", counter.get()); } /** * 尝试任务 * * @throws InterruptedException */ @Async @Scheduled(fixedDelay = 9000) public void cronTryJob() throws Exception { runThread(); LOG.info(" 尝试任务 {}", counter.get()); } /** * 模拟人为的暂停,重启 * * @throws InterruptedException */ @Async @Scheduled(fixedDelay = 15000) public void cronPaseOrResumeJob() throws InterruptedException { controllerThreadExecitor(); LOG.info("操做控制线程执行器 {}", counter.get()); } public static void main(String[] args) throws Exception { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext( MultiThreadExecutorMonitor.class); try { isPase.getAndSet(false); runThread(); Thread.currentThread().join(); } catch (InterruptedException e) { e.printStackTrace(); } finally { context.close(); } } public static void runThread() throws Exception { while (!isPase.get()) { // ToDo: 断点重试工做 ... 
es.execute(() -> { // LOG.info("执行:{}", counter.incrementAndGet()); counter.incrementAndGet(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } public static void controllerThreadExecitor() { if(isPase.get()) { LOG.info("=============================== 重启线程池 :{}", isPase.get()); isPase.getAndSet(false); es.resume(); } else { LOG.info("=============================== 暂停线程池 :{}", isPase.get()); isPase.getAndSet(true); es.pause(); } } public static void threadMonitor() { ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es); int queueSize = tpe.getQueue().size(); LOG.info("当前排队线程数:{}" , queueSize); int activeCount = tpe.getActiveCount(); LOG.info("当前活动线程数:{}" , activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); LOG.info("执行完成线程数:{}" , completedTaskCount); boolean isShutdown = tpe.isShutdown(); LOG.info("是否暂停现场执行:{}" , isShutdown); boolean isTerminated = tpe.isTerminated(); LOG.info("isTerminated:{}" , isTerminated); boolean isTerminating = tpe.isTerminating(); LOG.info("isTerminating:{}" , isTerminating); long taskCount = tpe.getTaskCount(); LOG.info("总线程数:{}" , taskCount); } }