为何要单独讲解TimedSupervisorTask这个类呢?由于这个类在咱们DiscoveryClient类的initScheduledTasks方法进行定时任务初始化时被使用得比较多,因此咱们须要了解下这个类,咱们先看下TimedSupervisorTask这个类在initScheduledTasks的具体使用:ide
private final ScheduledExecutorService scheduler; private void initScheduledTasks() { …省略其余代码 // 初始化定时拉取服务注册信息 scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); …省略其余代码 // 初始化定时服务续约任务 scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); …省略其余代码 }
因而可知,TimedSupervisorTask类被使用在了定时任务的初始化中,咱们具体来看看这个类的结构:ui
public class TimedSupervisorTask extends TimerTask { private static final Logger logger = LoggerFactory.getLogger(TimedSupervisorTask.class); private final Counter timeoutCounter; private final Counter rejectedCounter; private final Counter throwableCounter; private final LongGauge threadPoolLevelGauge; private final ScheduledExecutorService scheduler; private final ThreadPoolExecutor executor; private final long timeoutMillis; private final Runnable task; private final AtomicLong delay; private final long maxDelay; public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) { this.scheduler = scheduler; this.executor = executor; this.timeoutMillis = timeUnit.toMillis(timeout); this.task = task; this.delay = new AtomicLong(timeoutMillis); this.maxDelay = timeoutMillis * expBackOffBound; // Initialize the counters and register. timeoutCounter = Monitors.newCounter("timeouts"); rejectedCounter = Monitors.newCounter("rejectedExecutions"); throwableCounter = Monitors.newCounter("throwables"); threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build()); Monitors.registerObject(name, this); } @Override public void run() { Future<?> future = null; try { future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get(); // 若是出现异常,则将时间*2,而后取 定时时间 和 最长定时时间中最小的为下次任务执行的延时时间 long newDelay = Math.min(maxDelay, currentDelay * 2); delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { if (future != null) { future.cancel(true); } if (!scheduler.isShutdown()) { scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } } }
咱们能够仔细看看run方法的具体实现,由于这里有一个值得借鉴的设计思路!!!this
咱们简单来看看这个方法具体执行流程:
1.执行submit()方法提交任务
2.执行future.get()方法,若是没有在规定的时间获得返回值或者任务出现异常,则进入异常处理catch代码块。
3.若是发生异常
a. 发生TimeoutException异常,则执行Math.min(maxDelay, currentDelay ✖️ 2);获得任务延时时间 ✖️ 2 和 最大延时时间的最小值,而后改变任务的延时时间timeoutMillis(延时任务时间默认值是30s)
b.发生RejectedExecutionException异常,则将rejectedCounter值+1
c.发生Throwable异常,则将throwableCounter值+1
4.若是没有发生异常,则再设置一次延时任务时间timeoutMillis
5.进入finally代码块
a.若是future不为null,则执行future.cancel(true),中断线程中止任务
b.若是线程池没有shutdown,则建立一个新的定时任务spa
\(\color{red}{注意}\):不知道有没有小伙伴发现,无论咱们的定时任务执行是成功仍是结束(若是尚未执行结束,也会被中断),而后会再从新初始化一个新的任务。而且这个任务的延时时间还会由于不一样的状况受到改变,在try代码块中若是不发现异常,则会从新初始化延时时间,若是发生TimeoutException异常,则会更改延时时间,更改成 任务延时时间 ✖️ 2 和 最大延时时间的最小值。因此咱们会发现这样的设计会让整个延时任务很灵活。若是不发生异常,则延时时间不会变;若是发现异常,则增加延时时间;若是程序又恢复正常了,则延时时间又恢复成了默认值。线程
总结:咱们在设计延时/周期性任务时就能够参考TimedSupervisorTask的实现,程序一旦遇到发生超时异常,就将间隔时间调大,若是连续超时,那么每次间隔时间都会增大一倍,一直到达外部参数设定的上限为止,一旦新任务再也不发生超时异常,间隔时间又会自动恢复为初始值。设计