假设咱们编写了一个Servlet应用,当用户经过浏览器发起一个请求到达咱们服务器时,传统的Servlet应用通常针对一个用户请求建立一个线程去执行请求,等到请求执行完毕后,再销毁线程。这种设计在用户量几百或者几千的状况下通常不会有什么大问题,可是若是咱们的用户量上达几万甚至几十万几百万,频繁的建立、销毁线程,将会给服务器带来巨大的开销,甚至会出现OOM(Out Of Memory)异常。所以,为了节省资源的消耗,提升资源的利用率,引出了线程池化技术。java
线程池会维护若干线程,等待任务的到来,避免重复建立、销毁线程形成的消耗,提升任务的响应速度,不须要建立线程就能够当即执行任务,使用线程池能够进行统一的分配、调优和监控,避免无节制的建立线程,提升系统的稳定性。浏览器
固然,线程池也并不是十全十美,它也有力不从心的场景,例如:线程池适合生命周期较短的任务,不适合耗时较长的任务;线程池没法设置任务的优先级,也没法单独启动或者终止某个线程。缓存
如今,咱们对比一下线程池执行任务和建立线程执行任务的优点。在ThreadTest中,咱们声明了一个线程安全的list,并建立10000个线程并发往list添加随机值,最后咱们等待全部线程执行完毕,打印程序的执行时间和list的大小。安全
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; public class ThreadTest { public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); final Random random = new Random(); final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); final List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 10000; i++) { Thread thread = new Thread(() -> list.add(random.nextInt())); thread.start(); threads.add(thread); } for (Thread thread : threads) { thread.join(); } System.out.println("时间:" + (System.currentTimeMillis() - start)); System.out.println("大小:" + list.size()); } }
执行结果:bash
时间:882 大小:10000
能够看到list的长度为10000,程序执行了882毫秒。服务器
下面,咱们用线程池的方式来执行相同的逻辑,咱们声明一个线程池executorService,并往线程池中提交10000个任务,每一个任务都向list添加一个随机值:网络
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class ThreadPoolTest { public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); final Random random = new Random(); final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10000; i++) { executorService.execute(() -> list.add(random.nextInt())); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); System.out.println("时间:" + (System.currentTimeMillis() - start)); System.out.println("大小:" + list.size()); } }
执行结果:并发
时间:52 大小:10000
能够看到线程池的执行时间相比建立线程,大大缩短。dom
下面,咱们就从源码的角度,来剖析线程池的工做原理。ThreadPoolExecutor是java.util.concurrent包下提供的线程池实现类,下图是ThreadPoolExecutor类的继承关系,咱们将从上至下逐个分析ThreadPoolExecutor的父类:Executor、ExecutorService、AbstractExecutorService。异步
咱们先来看下Executor接口定义:
public interface Executor { void execute(Runnable command); }
Executor容许咱们提交若干待执行的任务,咱们再也不像之前同样用new Thread(new RunnableTask()).start()的方式启动一个线程去执行RunnableTask的run()方法,取而代之的是用Executor的实现类去执行,好比:
Executor executor = anExecutor; executor.execute(new RunnableTask1()); executor.execute(new RunnableTask2()); ...
Executor提供了一种新的方式,咱们只需提交任务,Executor自身负责如何调度线程来执行任务。Executor并不要求任务的执行必须是异步的,也能够在提交完任务后,同步执行任务:
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } }
一般状况下,Executor会将提交过来的任务放在另外一个线程执行,而不是经过调用线程来执行:
class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
一些Executor接口的实如今调度线程执行任务时会添加一些限制,好比咱们能够以代理模式的思想来封装Executor:
class SerialExecutor implements Executor { final Queue<Runnable> tasks = new ArrayDeque<Runnable>(); final Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
在建立SerialExecutor对象时,会要求传入一个executor对象,执行任务并非SerialExecutor对象自己,SerialExecutor并不执行任务,只是将任务缓存到队列tasks,而executor才是真正负责执行任务。
ExecutorService扩展了Executor,ExecutorService不但具有Executor执行任务的能力,咱们还能够关闭ExecutorService,这将使ExecutorService拒绝接受新提交的任务。ExecutorService.submit(...)方法是基于Executor.execute(Runnable command)封装的,execute方法没有任何返回,而submit会返回一个Future对象,经过Future对象咱们能够取消任务或者等待任务将来的执行结果。
public interface ExecutorService extends Executor { /** * 关闭线程池,调用此方法后再也不接受新任务,但会处理线程池内 * 还没有完成的任务,若是线程池已经关闭,再次调用此方法将无事发生。 * 这个方法不会等待已提交但还没有完成的任务执行完毕,须要调用awaitTermination(long timeout, TimeUnit unit) * 来等待。 */ void shutdown(); /** * 调用此方法会尝试中止全部正在运行的线程,好比:调用Thread.interrupt() * 标记线程已中断,若是任务没有响应中断则线程没法中止。这个方法会返回还没有 * 执行的任务列表,它不会等待正在执行的任务执行完毕,须要调用awaitTermination(long timeout, TimeUnit unit) * 来等待。 */ List<Runnable> shutdownNow(); /** * 判断线程池是否已关闭,true为关闭。 */ boolean isShutdown(); /** * 若是调用shutdown()或者shutdownNow()后,全部任务都已完成,则返回true */ boolean isTerminated(); /** * 使当前调用此方法线程陷入阻塞,直到: * 1.调用中止线程池方法后,完成全部任务。 * 2.阻塞超时。 * 3.调用此方法线程被中断。 * * @param timeout 最大等待时长 * @param unit 时长单位 * @return 若是任务都执行完毕返回true,若是超时或中断则返回false */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /** * 提交一个有返回值的任务后,将返回一个Future对象表明任务的运行结果, * 能够调用Future.get()得到任务的执行结果,若是提交任务后想当即得到结果, * 能够用:result = exec.submit(aCallable).get();这样的方式得到, * 调用线程将陷入阻塞,直到线程池执行完任务,执行结果被放入到Future对象。 * * @param task 待执行的任务。 * @param <T> 任务执行结果的类型。 * @return 返回Future对象,线程池执行完毕任务后,执行结果会被放入到Future对象。 */ <T> Future<T> submit(Callable<T> task); /** * 提交一个可执行的任务和给定的执行结果,并返回一个Future对象表明该任务未来的运行结果, * 若是任务成功执行,Future对象将返回咱们给定的执行结果。 * * @param task 待执行的任务 * @param result 任务执行完毕后的返回结果 * @param <T> 返回结果的类型 * @return 返回Future对象,线程池执行完毕任务后,执行结果会被放入到Future对象。 */ <T> Future<T> submit(Runnable task, T result); /** * 提交待执行的任务并返回Future对象表明该任务将来的运行结果,若是任务执行成功, * 调用Future.get()将返回null。 * * @param task 待执行的任务 * @return 返回Future对象,表明该任务将来的运行结果。 */ Future<?> submit(Runnable task); }
咱们能够用ExecutorService来模拟一个网络服务,用Executors.newFixedThreadPool(int)工厂方法生成的线程池中的线程来处理传入的网络请求:
class NetworkService implements Runnable { private final ServerSocket serverSocket; private final ExecutorService pool; public NetworkService(int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedThreadPool(poolSize); } public void run() { // run the service try { for (;;) { pool.execute(new Handler(serverSocket.accept())); } } catch (IOException ex) { pool.shutdown(); } } } class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this.socket = socket; } public void run() { // read and service request on socket } }
关闭ExecutorService分两个阶段,首先调用shutdown()拒绝再有新的任务提交,而后在必要的时候调用shutdownNow(),尝试中断正在执行的任务。
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } }
AbstractExecutorService是juc(java.util.concurrent)包下提供的ExecutorService接口的默认实现,在newTaskFor方法中返回FutureTask做为RunnableFuture接口的实现。能够看到不管是Runnable仍是Callable类型的任务,都会被封装成RunnableFuture类型的任务来执行。
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } }