所谓线程池,就是将多个线程放在一个池子里面(所谓池化技术),而后须要线程的时候不是建立一个线程,而是从线程池里面获取一个可用的线程,而后执行咱们的任务。线程池的关键在于它为咱们管理了多个线程,咱们不须要关心如何建立线程,咱们只须要关系咱们的核心业务,而后须要线程来执行任务的时候从线程池中获取线程。任务执行完以后线程不会被销毁,而是会被从新放到池子里面,等待机会去执行任务。java
咱们为何须要线程池呢?首先一点是线程池为咱们提升了一种简易的多线程编程方案,咱们不须要投入太多的精力去管理多个线程,线程池会自动帮咱们管理好,它知道何时该作什么事情,咱们只要在须要的时候去获取就能够了。其次,咱们使用线程池很大程度上归咎于建立和销毁线程的代价是很是昂贵的,甚至咱们建立和销毁线程的资源要比咱们实际执行的任务所花费的时间还要长,这显然是不科学也是不合理的,并且若是没有一个合理的管理者,可能会出现建立了过多的线程的状况,也就是在JVM中存活的线程过多,而存活着的线程也是须要销毁资源的,另一点,过多的线程可能会形成线程过分切换的尴尬境地。编程
对线程池有了一个初步的认识以后,咱们来看看如何使用线程池。数组
// 建立一个线程池 ExecutorService executorService = Executors.newFixedThreadPool(1); // 提交任务 executorService.submit(() -> System.out.println("run")); Future<String> stringFuture = executorService.submit(() -> "run"); // 建立一个调度线程池 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); // 提交一个周期性执行的任务 scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println("schedule"),0,1, TimeUnit.SECONDS); // shutdown executorService.shutdown(); scheduledExecutorService.shutdown();
能够发现使用线程池很是简单,只须要极少的代码就能够建立出咱们须要的线程池,而后将咱们的任务提交到线程池中去。咱们只须要在结束之时记得关闭线程池就能够了。本文的重点并不是在于如何使用线程池,而是试图剖析线程池的实现,好比一个调度线程池是怎么实现的?是靠什么实现的?为何能这样实现等等问题。数据结构
Java中与线程池相关的类都在java.util.concurrent
包下,以下展现了一些:多线程
经过上面一节中的使用示例,能够发现Executors类是一个建立线程池的有用的类,事实上,Executors类的角色也就是建立线程池,它是一个工厂类,能够产生不一样类型的线程池。而Executor是线程池的鼻祖类,它有两个子类是ExecutorService和ScheduledExecutorService,而ThreadPoolExecutor和ScheduledThreadPoolExecutor则是真正的线程池,咱们的任务将被这两个类交由其所管理者的线程池运行,能够发现,ScheduledThreadPoolExecutor是一个万千宠爱于一身的类,下面咱们能够看看它的类关系图:架构
ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor
,ThreadPoolExecutor
实现了通常的线程池,没有调度功能,而ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor
的实现,而后增长了调度功能。less
最为原始的Executor只有一个方法execute,它接受一个Runnable类型的参数,意思是使用线程池来执行这个Runnable,能够发现Executor不提供有返回值的任务。ExecutorService继承了Executor,而且极大的加强了Executor的功能,不只支持有返回值的任务执行,并且还有不少十分有用的方法来为你提供服务,下面展现了ExecutorService提供的方法:函数
ScheduledExecutorService继承了ExecutorService
,而且增长了特有的调度(schedule)功能。关于Executor、ExecutorService和ScheduledExecutorService的关系,能够见下图:oop
总结一下,通过咱们的调研,能够发现其实对于咱们编写多线程代码来讲,最为核心的是Executors类,根据咱们是须要ExecutorService类型的线程池仍是ScheduledExecutorService类型的线程池调用相应的工厂方法就能够了,而ExecutorService的实现表如今ThreadPoolExecutor上,ScheduledExecutorService的实现则表如今ScheduledThreadPoolExecutor上,下文将分别剖析这二者,尝试弄清楚线程池的原理。学习
上文中描述了Java中线程池相关的架构,了解了这些内容其实咱们就可使用java的线程池为咱们工做了,使用其提供的线程池咱们能够很方便的写出高质量的多线程代码,本节将分析ThreadPoolExecutor的实现,来探索线程池的运行原理。下面的图片展现了ThreadPoolExecutor的类图:
private final BlockingQueue<Runnable> workQueue; // 任务队列,咱们的任务会添加到该队列里面,线程将从该队列获取任务来执行 private final HashSet<Worker> workers = new HashSet<Worker>();//全部工做线程的集合,来消费workQueue里面的任务 private volatile ThreadFactory threadFactory;//线程工厂 private volatile RejectedExecutionHandler handler;//拒绝策略,默认会抛出异常,还要其余几种拒绝策略以下: 一、CallerRunsPolicy:在调用者线程里面运行该任务 二、DiscardPolicy:丢弃任务 三、DiscardOldestPolicy:丢弃workQueue的头部任务 private volatile int corePoolSize;//最下保活work数量 private volatile int maximumPoolSize;//work上限
咱们尝试执行submit方法,下面是执行的关键路径,总结起来就是:若是Worker数量还没达到上限则继续建立,不然提交任务到workQueue,而后让worker来调度运行任务。
step 1: <ExecutorService> Future<?> submit(Runnable task); step 2:<AbstractExecutorService> public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } step 3:<Executor> void execute(Runnable command); step 4:<ThreadPoolExecutor> public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //提交咱们的任务到workQueue int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //使用maximumPoolSize做为边界 reject(command); //还不行?拒绝提交的任务 } step 5:<ThreadPoolExecutor> private boolean addWorker(Runnable firstTask, boolean core) step 6:<ThreadPoolExecutor> w = new Worker(firstTask); //包装任务 final Thread t = w.thread; //获取线程(包含任务) workers.add(w); // 任务被放到works中 t.start(); //执行任务
上面的流程是高度归纳的,实际状况远比这复杂得多,可是咱们关心的是怎么打通整个流程,因此这样分析问题是没有太大的问题的。观察上面的流程,咱们发现其实关键的地方在于Worker,若是弄明白它是如何工做的,那么咱们也就大概明白了线程池是怎么工做的了。下面分析一下Worker类。
上面的图片展现了Worker的类关系图,关键在于他实现了Runnable接口,因此问题的关键就在于run方法上。在这以前,咱们来看一下Worker类里面的关键成员:
/** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; // 咱们提交的任务,可能被马上执行,也可能被放到队列里面
thread是Worker的工做线程,上面的分析咱们也发现了在addWorker中会获取worker里面的thread而后start,也就是这个线程的执行,而Worker实现了Runnable接口,因此在构造thread的时候Worker将本身传递给了构造函数,thread.start执行的其实就是Worker的run方法。下面是run方法的内容:
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
咱们来分析一下runWorker这个方法,这就是整个线程池的核心。首先获取到了咱们刚提交的任务firstTask,而后会循环从workQueue里面获取任务来执行,获取任务的方法以下:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
其实核心也就一句:Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
咱们再回头看一下execute,其实咱们上面只走了一条逻辑,在execute的时候,咱们的worker的数量尚未到达咱们设定的corePoolSize的时候,会走上面咱们分析的逻辑,而若是达到了咱们设定的阈值以后,execute中会尝试去提交任务,若是提交成功了就结束,不然会拒绝任务的提交。咱们上面还提到一个成员:maximumPoolSize,其实线程池的最大的Worker数量应该是maximumPoolSize,可是咱们上面的分析是corePoolSize,这是由于咱们的private boolean addWorker(Runnable firstTask, boolean core)的参数core的值来控制的,core为true则使用corePoolSize来设定边界,不然使用maximumPoolSize来设定边界。直观的解释一下,当线程池里面的Worker数量尚未到corePoolSize,那么新添加的任务会伴随着产生一个新的worker,若是Worker的数量达到了corePoolSize,那么就将任务存放在阻塞队列中等待Worker来获取执行,若是没有办法再向阻塞队列听任务了,那么这个时候maximumPoolSize就变得有用了,新的任务将会伴随着产生一个新的Worker,若是线程池里面的Worker已经达到了maximumPoolSize,那么接下来提交的任务只能被拒绝策略拒绝了。能够参考下面的描述来理解:
* When a new task is submitted in method {@link #execute(Runnable)}, * and fewer than corePoolSize threads are running, a new thread is * created to handle the request, even if other worker threads are * idle. If there are more than corePoolSize but less than * maximumPoolSize threads running, a new thread will be created only * if the queue is full. By setting corePoolSize and maximumPoolSize * the same, you create a fixed-size thread pool. By setting * maximumPoolSize to an essentially unbounded value such as {@code * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary * number of concurrent tasks. Most typically, core and maximum pool * sizes are set only upon construction, but they may also be changed * dynamically using {@link #setCorePoolSize} and {@link * #setMaximumPoolSize}.
在此须要说明一点,有一个重要的成员:keepAliveTime
,当线程池里面的线程数量超过corePoolSize了,那么超出的线程将会在空闲keepAliveTime以后被terminated。能够参考下面的文档:
* If the pool currently has more than corePoolSize threads, * excess threads will be terminated if they have been idle for more * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
ScheduledThreadPoolExecutor适用于延时执行,或者周期性执行的任务调度,ScheduledThreadPoolExecutor在实现上继承了ThreadPoolExecutor,因此你依然能够将ScheduledThreadPoolExecutor当成ThreadPoolExecutor来使用,可是ScheduledThreadPoolExecutor的功能要强大得多,由于ScheduledThreadPoolExecutor能够根据设定的参数来周期性调度运行,下面的图片展现了四个和周期性相关的方法:
下面来看一下这四个方法的一些细节:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
经过上面的代码咱们能够发现,前两个方法是相似的,后两个方法也是相似的。前两个方法属于一次性调度,因此period都为0,区别在于参数不一样,一个是Runnable,而一个是Callable,好笑的是,最后都变为了Callable了,见下面的构造函数:
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
对于后两个方法,区别仅仅在于period的,scheduleWithFixedDelay对参数进行了操做,将原来的时间变为负数了,然后面在计算下次被调度的时间的时候会根据这个参数的正负值来分别处理,正数表明scheduleAtFixedRate,而负数表明了scheduleWithFixedDelay。
一个须要被咱们注意的细节是,以上四个方法最后都会调用一个方法: delayedExecute(t),下面看一下这个方法:
private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
大概的意思就是先判断线程池是否被关闭了,若是被关闭了,则拒绝任务的提交,不然将任务加入到任务队列中去等待被调度执行。最后的ensurePrestart的意思是须要确保线程池已经被启动起来了。下面是这个方法:
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
主要是增长了一个没有任务的worker,有什么用呢?咱们还记得Worker的逻辑吗?addWorker方法的执行,会触发Worker的run方法的执行,而后runWorker方法就会被执行,而runWorker方法是循环从workQueue中取任务执行的,因此确保线程池被启动起来是重要的,而只须要简单的执行addWorker便会触发线程池的启动流程。对于调度线程池来讲,只要执行了addWorker方法,那么线程池就会一直在后台周期性的调度执行任务。
到此,彷佛咱们仍是没有闹明白ScheduledThreadPoolExecutor是如何实现周期性的,上面讲到四个scheduled方法时,咱们没有提一个重要的类:ScheduledFutureTask,对,全部神奇的事情将会发生在这个类中,下面来分析一下这个类。
看上面的类图,貌似这个类很是复杂,还好,咱们发现他实现了Runnable接口,那么必然会有一个run方法,而这个run方法必然是整个类的核心,下面来看一下这个run方法的内容:
public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
首先,判断是不是周期性的任务,若是不是,则直接执行(一次性),不然执行后,而后设置下次执行的时间,而后从新调度,等待下次执行。这里有一个方法须要注意,也就是setNextRunTime,上面咱们提到scheduleAtFixedRate和scheduleWithFixedDelay在传递参数时不同,后者将delay值变为了负数,因此下面的处理正好印证了前文所述。
/** * Sets the next time to run for a periodic task. */ private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
下面来看一下reExecutePeriodic方法是如何作的,他的目标是将任务再次被调度执行,下面的代码展现了这个功能的实现:
void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } }
能够看到,这个方法就是将咱们的任务再次放到了workQueue里面,那这个参数是什么?在上面的run方法中咱们调用了reExecutePeriodic方法,参数为outerTask,而这个变量是什么?看下面的代码:
/** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture<V> outerTask = this;
这个变量指向了本身,而this的类型是什么?是ScheduledFutureTask,也就是能够被调度的task,这样就实现了循环执行任务了。
上面的分析已经到了循环执行,可是ScheduledThreadPoolExecutor的功能是周期性执行,因此咱们接着分析ScheduledThreadPoolExecutor是如何根据咱们的参数走走停停的。这个时候,是应该看一下ScheduledThreadPoolExecutor的构造函数了,咱们来看一个最简单的构造函数:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
咱们知道ScheduledThreadPoolExecutor的父类是ThreadPoolExecutor,因此这里的super实际上是ThreadPoolExecutor的构造函数,咱们发现其中有一个参数DelayedWorkQueue,看名字貌似是一个延迟队列的样子,进一步跟踪代码,发现了下面的一行代码(构造函数中):this.workQueue = workQueue;
因此在ScheduledThreadPoolExecutor中,workQueue是一个DelayedWorkQueue类型的队列,咱们暂且认为DelayedWorkQueue是一种具有延迟功能的队列吧,那么,到此咱们即可以想明白了,上面的分析咱们明白了ScheduledThreadPoolExecutor是如何循环执行任务的,而这里咱们明白了ScheduledThreadPoolExecutor使用DelayedWorkQueue来达到延迟的目标,因此组合起来,就能够实现ScheduledThreadPoolExecutor周期性执行的目标。下面咱们来看一下DelayedWorkQueue是如何作到延迟的吧,上文中提到一个方法:getTask,这个方法的做用是从workQueue中取出任务来执行,而在ScheduledThreadPoolExecutor里面,getTask方法是从DelayedWorkQueue中取任务的,而取任务无非两个方法:poll或者take,下面咱们对DelayedWorkQueue的take方法来分析一下:
public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
在for循环里面,首先从queue中获取第一个任务,而后从任务中取出延迟时间,然后使用available变量来实现延迟效果。这里面须要几个点须要探索一下:
对于第一个问题,看下面的代码:
`
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];`
它是一个RunnableScheduledFuture类型的数组,下面是RunnableScheduledFuture类的类关系图:
数组里面保存了咱们的RunnableScheduledFuture,对queue的操做,主要来看一下增长元素和消费元素的操做。首先,假设使用add方法来增长RunnableScheduledFuture到queue,调用的链路以下:
public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
解释一下,add方法直接转到了offer方法,该方法中,首先判断数组的容量是否足够,若是不够则grow,增加的策略以下: int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
每次增加50%。增加完成后,若是这是第一个元素,则放在坐标为0的位置,不然,使用siftUp操做,下面是该方法的内容:
private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
这个数组实现了堆这种数据结构,使用对象比较将最须要被调度执行的RunnableScheduledFuture放到数组的前面,而这得力于compareTo方法,下面是RunnableScheduledFuture类的compareTo方法的实现,主要是经过延迟时间来作比较。
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
上面是生产元素,下面来看一下消费数据。在上面咱们提到的take方法中,使用了一个方法以下:
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { int s = --size; RunnableScheduledFuture<?> x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; }
这个方法中调用了一个方法siftDown,这个方法以下:
private void siftDown(int k, RunnableScheduledFuture<?> key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; RunnableScheduledFuture<?> c = queue[child]; int right = child + 1; if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break; queue[k] = c; setIndex(c, k); k = child; } queue[k] = key; setIndex(key, k); }
对其的解释就是:Replaces first element with last and sifts it down. Call only when holding lock.
总结一下,当咱们向queue插入任务的时候,会发生siftUp方法的执行,这个时候会把任务尽可能往根部移动,而当咱们完成任务调度以后,会发生siftDown方法的执行,与siftUp相反,siftDown方法会将任务尽可能移动到queue的末尾。总之,大概的意思就是queue经过compareTo实现了相似于优先级队列的功能。
下面咱们来看一下第二个问题:延迟时间的前因后果。在上面的take方法里面,首先获取了delay,而后再使用available来作延迟效果,那这个delay从哪里来的呢?经过上面的类图RunnableScheduledFuture的类图咱们知道,RunnableScheduledFuture类实现了Delayed接口,而Delayed接口里面的惟一方法是getDelay,咱们到RunnableScheduledFuture里面看一下这个方法的具体实现:
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); }
time是咱们设定的下次执行的时间,因此延迟就是(time - now()),没毛病!
第三个问题:available变量的前因后果,至于这个问题,咱们看下面的代码:
/** * Condition signalled when a newer task becomes available at the * head of the queue or a new thread may need to become leader. */ private final Condition available = lock.newCondition();
这是一个条件变量,take方法里面使用这个变量来作延迟效果。Condition能够在多个线程间作同步协调工做,更为具体细致的关于Condition的内容,能够参考更多的资料来学习,本文对此知识点点到为止。
到此为止,咱们梳理了ScheduledThreadPoolExecutor是如何实现周期性调度的,首先分析了它的循环性,而后分析了它的延迟效果。
本文到此也就结束了,对于线程池的学习如今才刚刚起步,须要更多更专业的知识类帮我理解更为底层的内容,固然,为了更进一步理解线程池的实现细节,首先须要对线程间通讯有足够的把握,其次是要对各类数据结构有清晰的认识,好比队列、优先级队列、堆等高级的数据结构,以及java语言对于这些数据结构的实现,更为重要的是要结合实际状况分析问题,在工做和平时的学习中不断总结,不断迭代对于线程、线程池的认知。