基于多核CPU的发展,使得多线程开发日趋流行。然而线程的建立和销毁,都涉及到系统调用,比较消耗系统资源,因此就引入了线程池技术,避免频繁的线程建立和销毁。html
在Java用有一个Executors工具类,能够为咱们建立一个线程池,其本质就是new了一个ThreadPoolExecutor对象。java
建议使用较为方便的 Executors 工厂方法来建立线程池。缓存
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
线程池的执行流程图 多线程
任务被提交到线程池,会先判断当前线程数量是否小于corePoolSize,若是小于则建立线程来执行提交的任务,不然将任务放入workQueue队列,若是workQueue满了,则判断当前线程数量是否小于maximumPoolSize,若是小于则建立线程执行任务,不然就会调用handler,以表示线程池拒绝接收任务。框架
public class SchedulePoolDemo { public static void main(String[] args){ ScheduledExecutorService service = Executors.newScheduledThreadPool(10); //若是前面的任务没有完成, 调度也不会启动 service.scheduleAtFixedRate(()->{ try { Thread.sleep(2000);// 每两秒打印一次. System.out.println(System.currentTimeMillis()/1000); } catch (InterruptedException e) { e.printStackTrace(); } }, 0, 2, TimeUnit.SECONDS); } }
使用Executors来建立要注意潜在宕机风险.其返回的线程池对象的弊端以下:ide
综上所述, 在可能有大量请求的线程池场景中, 更推荐自定义ThreadPoolExecutor来建立线程池, 具体构造函数配置以下:函数
通常根据任务类型进行区分, 假设CPU为N核工具
主要存放等待执行的线程, ThreadPoolExecutor中支持自定义该队列来实现不一样的排队队列.oop
线程池提供了一些回调方法, 具体使用以下所示.源码分析
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("准备执行任务: " + r.toString()); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("结束任务: " + r.toString()); } @Override protected void terminated() { System.out.println("线程池退出"); } };
能够在回调接口中, 对线程池的状态进行监控, 例如任务执行的最长时间, 平均时间, 最短期等等, 还有一些其余的属性以下:
线程池满负荷运转后, 由于时间空间的问题, 可能须要拒绝掉部分任务的执行.
jdk提供了RejectedExecutionHandler接口, 并内置了几种线程拒绝策略
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("reject task: " + r.toString()); } });
线程工厂用于建立池里的线程. 例如在工厂中都给线程setDaemon(true), 这样程序退出的时候, 线程自动退出.或者统一指定线程优先级, 设置名称等等.
class NamedThreadFactory implements ThreadFactory { private static final AtomicInteger threadIndex = new AtomicInteger(0); private final String baseName; private final boolean daemon; public NamedThreadFactory(String baseName) { this(baseName, true); } public NamedThreadFactory(String baseName, boolean daemon) { this.baseName = baseName; this.daemon = daemon; } public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement()); thread.setDaemon(this.daemon); return thread; } }
跟直接new Thread不同, 局部变量的线程池, 须要手动关闭, 否则会致使线程泄漏问题. 默认提供两种方式关闭线程池. - shutdown: 等全部任务, 包括阻塞队列中的执行完, 才会终止, 可是不会接受新任务. - shutdownNow: 当即终止线程池, 打断正在执行的任务, 清空队列.
ctl是ThreadPoolExecutor的一个重要属性,它记录着ThreadPoolExecutor的线程数量和线程状态。
//Integer有32位,其中前三位用于记录线程状态,后29位用于记录线程的数量. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //表示用于记录线程数量的位数 private static final int COUNT_BITS = Integer.SIZE - 3; //将1左移COUNT_BITS位再减1,表示能表示的最大线程数 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//用ctl前三位分别表示线程池的状态 //(前三位为111)接受新任务而且处理已经进入队列的任务 private static final int RUNNING = -1 << COUNT_BITS; //(前三位为000)不接受新任务,可是处理已经进入队列的任务 private static final int SHUTDOWN = 0 << COUNT_BITS; //(前三位001)不接受新任务,不处理已经进入队列的任务,而且中断正在执行的任务 private static final int STOP = 1 << COUNT_BITS; //(前三位010)全部任务执行完成,workerCount为0。线程转到了状态TIDYING会执行terminated()钩子方法 private static final int TIDYING = 2 << COUNT_BITS; //(前三位011)任务已经执行完成 private static final int TERMINATED = 3 << COUNT_BITS;
//状态值就是只关心前三位的值,因此把后29位清0 private static int runStateOf(int c) { return c & ~CAPACITY; } //线程数量就是只关心后29位的值,因此把前3位清0 private static int workerCountOf(int c) { return c & CAPACITY; } //两个数相或 private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //判断当前活跃线程数是否小于corePoolSize if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true))//调用addWorker建立线程执行任务 return; c = ctl.get(); } //若是不小于corePoolSize,则将任务添加到workQueue队列。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get();//再次获取ctl的状态 //若是不在运行状态了,那么就从队列中移除任务 if (! isRunning(recheck) && remove(command)) reject(command); //若是在运行阶段,可是Worker数量为0,调用addWorker方法 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //尝试建立非核心线程若是建立失败就会调用reject拒绝接受任务。 else if (!addWorker(command, false)) reject(command); }
//调用handler的rejectedExecution(command,this)方法。handler是RejectedExecutionHandler接口,默认实现是AbortPolicy final void reject(Runnable command) { handler.rejectedExecution(command, this); }
addWorker方法用于建立线程,而且经过core参数表示该线程是不是核心线程,若是返回true则表示建立成功,不然失败。addWorker的代码以下所示:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);//获得线程池的运行状态 // rs>=SHUTDOWN为false,即线程池处于RUNNING状态. // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()这个条件为true,也就意味着三个条件同时知足,即线程池状态为SHUTDOWN且firstTask为null且队列不为空,这种状况为处理队列中剩余任务。上面提到过当处于SHUTDOWN状态时,不接受新任务,可是会处理完队列里面的任务。若是firstTask不为null,那么就属于添加新任务;若是firstTask为null,而且队列为空,那么就不须要再处理了。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || //若是建立的是非核心线程(core=false)时,则须要判断当前线程数wc>=maximumPoolSize,若是返回false,建立非核心线程失败。 //若是建立的是核心线程(core=true)时,则须要判断当前线程数wc>=corePoolSize,若是返回false,建立核心线程失败。 wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))//worker+1执行成功,那么跳出外循环 break retry; c = ctl.get(); if (runStateOf(c) != rs)//再次判断当前状态,若是新获取的状态和当前状态不一致,则再次进入外循环 continue retry; // else CAS failed due to workerCount change; retry inner loop } } /* 一旦跳出外循环,表示能够建立建立线程,这里具体是Worker对象,Worker实现了Runnable接口而且继承AbstractQueueSynchronizer,内部维持一个Runnable的队列。try块中主要就是建立Worker对象,而后将其保存到workers中,workers是一个HashSet,表示工做线程的集合。而后若是添加成功,则开启Worker所在的线程。若是开启线程失败,则调用addWorkerFailed方法,addWokerFailed用于回滚worker线程的建立。 */ boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //以firstTask做为Worker的第一个任务建立Worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//对整个线程池加锁 try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//启动启动这个线程 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
private void addWorkerFailed(Worker w) { //对整个线程成绩加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //移除Worker对象 if (w != null) workers.remove(w); //减少worker数量 decrementWorkerCount(); //检查termination状态 tryTerminate(); } finally { mainLock.unlock(); } }
addWorkerFailed首先从workers集合中移除线程,而后将wokerCount减1,最后检查终结。
tryTerminate()方法用于检查是否有必要将线程池状态转移到TERMINATED。
final void tryTerminate() { for (;;) { int c = ctl.get(); /* 状态判断,若是有符合如下条件之一。则跳出循环 (1)线程池处于RUNNING状态 (2)线程池状态处于TIDYING状态 (3)线程池状态处于SHUTDOWN状态而且队列不为空 若是不知足上述的状况,那么目前状态属于SHUTDOWN切队列为空,或者状态属于STOP,那么调用interruptIdleWorkers方法中止一个Worker线程,而后退出。 */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } /* 若是没有退出循环的话,那么就首先将状态设置成TIDYING,而后调用terminated方法,最后设置状态为TERMINATED。terminated方法是个空实现,用于当线程池终结时处理一些事情。 */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
Worker继承自AbstractQueuedSynchronizer并实现Runnbale接口。AbstractQueuedSynchronizer提供了一个实现阻塞锁和其余同步工具,好比信号量、事件等依赖于等待队列的框架。Worker的构造方法中会使用threadFactory构造线程变量并持有run方法调用了runWorker方法,将线程委托给主循环线程。
Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask;//设置该线程的 this.thread = getThreadFactory().newThread(this);//建立一个线程 } //当我咱们启动一个线程时就会触发Worker中的此方法 public void run() { runWorker(this); }
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask;//首次任务是建立Worker时添加的任务 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //线程调用runWoker,会while循环调用getTask方法从workerQueue里读取任务,而后执行任务。只要getTask方法不返回null,此线程就不会退出。 while (task != null || (task = getTask()) != null) { w.lock();//对Worker加锁 //若是线程池中止了,那么中断线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task);//空实现,任务运行以前的操做 Throwable thrown = null; try { task.run();//执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//空实现,任务运行以后的操做 } } finally { task = null;//任务执行完毕后,将task设置为null w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //必要时检查队列是否为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //判断是否容许超时,wc>corePoolSize则是判断当前线程数是否大于corePoolSize。 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //若是当前线程数大于corePoolSize, //则会调用workQueue的poll方法获取任务,超时时间是keepAliveTime。 //若是超过keepAliveTime时长,poll返回了null, //上边提到的while循序就会退出,线程也就执行完了。 //若是当前线程数小于corePoolSize, //则会调用workQueue的take方法阻塞当前线程,不会退出 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
参考地址: