在web开发中,服务器须要接受并处理请求,因此会为一个请求来分配一个线程来进行处理。若是每次请求都新建立一个线程的话实现起来很是简便,可是存在一个问题:若是并发的请求数量很是多,但每一个线程执行的时间很短,这样就会频繁的建立和销毁线程,如此一来会大大下降系统的效率。可能出现服务器在为每一个请求建立新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。git
那么有没有一种办法使执行完一个任务,并不被销毁,而是能够继续执行其余的任务呢?这就是线程池的目的了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。经过对多个任务重用线程,线程建立的开销被分摊到了多个任务上。github
实现的线程池须要知足如下基本条件:web
一、线程必须在池子已经建立好了,而且能够保持住,要有容器保存多个线程;
二、线程还要可以接受外部的任务,运行这个任务。容器保持这个来不及运行的任务.面试
如下是线程池的具体实现:算法
线程池中实现了任务队列,用来保存全部的任务;工做线程,来执行具体的任务。数据库
public class MyThreadPool2 { // 线程池中默认线程的个数为5 private static int WORK_NUM = 5; // 队列默认任务个数为100 private static int TASK_COUNT = 100; // 用户在构造这个池,但愿的启动的线程数 private final int worker_num; // 工做线程组 private WorkThread[] workThreads; // 任务队列,做为一个缓冲 private final BlockingQueue<Runnable> taskQueue; // 建立具备默认线程个数的线程池 public MyThreadPool2() { this(WORK_NUM,TASK_COUNT); } // 建立线程池,worker_num为线程池中工做线程的个数 public MyThreadPool2(int worker_num,int taskCount) { if (worker_num<=0) worker_num = WORK_NUM; if(taskCount<=0) taskCount = TASK_COUNT; this.worker_num = worker_num; taskQueue = new ArrayBlockingQueue<>(taskCount); workThreads = new WorkThread[worker_num]; for(int i=0;i<worker_num;i++) { workThreads[i] = new WorkThread(); workThreads[i].start(); } } // 执行任务,其实只是把任务加入任务队列,何时执行有线程池管理器决定 public void execute(Runnable task) { try { taskQueue.put(task); } catch (InterruptedException e) { e.printStackTrace(); } } // 销毁线程池,该方法保证在全部任务都完成的状况下才销毁全部线程,不然等待任务完成才销毁 public void destroy() { // 工做线程中止工做,且置为null System.out.println("ready close pool....."); for(int i=0;i<worker_num;i++) { workThreads[i].stopWorker(); workThreads[i] = null;//help gc } taskQueue.clear();// 清空任务队列 } // 覆盖toString方法,返回线程池信息:工做线程个数和已完成任务个数 @Override public String toString() { return "WorkThread number:" + worker_num + " wait task number:" + taskQueue.size(); } /** * 内部类,工做线程 */ private class WorkThread extends Thread{ @Override public void run(){ Runnable r = null; try { while (!isInterrupted()) { r = taskQueue.take(); if(r!=null) { System.out.println(getId()+" ready exec :"+r); r.run(); } r = null;//help gc; } } catch (Exception e) { // TODO: handle exception } } public void stopWorker() { interrupt(); } } }
如下是测试程序:后端
分别建立多个任务,并放入线程池进行执行。缓存
public class TestMyThreadPool { public static void main(String[] args) throws InterruptedException { // 建立3个线程的线程池 MyThreadPool2 t = new MyThreadPool2(3,0); t.execute(new MyTask("testA")); t.execute(new MyTask("testB")); t.execute(new MyTask("testC")); t.execute(new MyTask("testD")); t.execute(new MyTask("testE")); t.execute(new MyTask("testF")); t.execute(new MyTask("testG")); t.execute(new MyTask("testH")); System.out.println(t); Thread.sleep(10000); t.destroy();// 全部线程都执行完成才destory System.out.println(t); } // 任务类 static class MyTask implements Runnable { private String name; private Random r = new Random(); public MyTask(String name) { this.name = name; } public String getName() { return name; } @Override public void run() {// 执行任务 try { Thread.sleep(r.nextInt(1000)+2000); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:" +Thread.currentThread().isInterrupted()); } System.out.println("任务 " + name + " 完成"); } } }
Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。服务器
Executor框架的类继承关系以下图:网络
J.U.C中有三个Executor接口:
下面分别进行介绍:
Executor接口只有一个execute方法,用来替代一般建立或启动线程的方法。
public interface Executor { void execute(Runnable command); }
Executor接口只有一个execute方法,用来替代一般建立或启动线程的方法。
executor.execute(new Thread())
对于不一样的Executor实现,execute()方法多是建立一个新线程并当即启动,也有多是使用已有的工做线程来运行传入的任务,也多是根据设置线程池的容量或者阻塞队列的容量来决定是否要将传入的线程放入阻塞队列中或者拒绝接收传入的线程。
ExecutorService接口继承自Executor接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行情况而生成 Future 的方法。增长了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。若是须要支持即时关闭,也就是shutDownNow()方法,则任务须要正确处理中断。
ScheduledExecutorService扩展ExecutorService接口并增长了schedule方法。调用schedule方法能够在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔按期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。
基本使用流程以下:
ThreadPoolExecutor继承自AbstractExecutorService,也实现了ExecutorService接口。JDK中的提供的内置线程池基本都基于ThreadPoolExecutor实现,后面会仔细介绍。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
构造方法中的字段含义以下:
workQueue:保存等待执行的任务的阻塞队列,当提交一个新的任务到线程池之后, 线程池会根据当前线程池中正在运行着的线程的数量来决定对该任务的处理方式,主要有如下几种处理方式:
- 使用直接切换队列:这种方式经常使用的队列是SynchronousQueue.
- 使用无界队列:通常使用基于链表的阻塞队列LinkedBlockingQueue。若是使用这种方式,那么线程池中可以建立的最大线程数就是corePoolSize,而maximumPoolSize就不会起做用了(后面也会说到)。当线程池中全部的核心线程都是RUNNING状态时,这时一个新的任务提交就会放入等待队列中。
- 使用有界队列:通常使用ArrayBlockingQueue。使用该方式能够将线程池的最大线程数量限制为maximumPoolSize,这样可以下降资源的消耗,但同时这种方式也使得线程池对线程的调度变得更困难,由于线程池和队列的容量都是有限的值,因此要想使线程池处理任务的吞吐率达到一个相对合理的范围,又想使线程调度相对简单,而且还要尽量的下降线程池对资源的消耗,就须要合理的设置这两个数量。
threadFactory:它是ThreadFactory类型的变量,用来建立新线程。默认使用Executors.defaultThreadFactory() 来建立线程。使用默认的ThreadFactory来建立线程时,会使新建立的线程具备相同的NORM_PRIORITY优先级而且是非守护线程,同时也设置了线程的名称。
handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。若是阻塞队列满了而且没有空闲的线程,这时若是继续提交任务,就须要采起一种策略处理该任务。
线程池提供了4种策略:
- AbortPolicy:直接抛出异常,这是默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
提交任务执行,主要有execute和submit两种方式,主要区别是后者须要有返回值。
下面主要介绍execute的流程:
简单来讲,在执行execute()方法时且状态一直是RUNNING时,的执行过程以下:
workerCount < corePoolSize
,则建立并启动一个线程来执行新提交的任务;workerCount >= corePoolSize
,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;workerCount >= corePoolSize && workerCount < maximumPoolSize
,且线程池内的阻塞队列已满,则建立并启动一个线程来执行新提交的任务;workerCount >= maximumPoolSize
,而且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。整个流程能够用下图来总结:
接下来结合代码进行分析:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * clt记录着runState和workerCount */ int c = ctl.get(); /* * workerCountOf方法取出低29位的值,表示当前活动的线程数; * 若是当前活动线程数小于corePoolSize,则新建一个线程放入线程池中; * 并把任务添加到该线程中。 */ if (workerCountOf(c) < corePoolSize) { /* * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断仍是maximumPoolSize来判断; * 若是为true,根据corePoolSize来判断; * 若是为false,则根据maximumPoolSize来判断 */ if (addWorker(command, true)) return; /* * 若是添加失败,则从新获取ctl值 */ c = ctl.get(); } /* * 若是当前线程池是运行状态而且任务添加到队列成功 */ if (isRunning(c) && workQueue.offer(command)) { // 从新获取ctl值 int recheck = ctl.get(); // 再次判断线程池的运行状态,若是不是运行状态,因为以前已经把command添加到workQueue中了, // 这时须要移除该command // 执行事后经过handler使用拒绝策略对该任务进行处理,整个方法返回 if (! isRunning(recheck) && remove(command)) reject(command); /* * 获取线程池中的有效线程数,若是数量是0,则执行addWorker方法 * 这里传入的参数表示: * 1. 第一个参数为null,表示在线程池中建立一个线程,但不去启动; * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断; * 若是判断workerCount大于0,则直接返回,在workQueue中新增的command会在未来的某个时刻被执行。 */ else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* * 若是执行到这里,有两种状况: * 1. 线程池已经不是RUNNING状态; * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize而且workQueue已满。 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize; * 若是失败则拒绝该任务 */ else if (!addWorker(command, false)) reject(command); }
addWorker方法的主要工做是在线程池中建立一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前须要判断当前活动线程数是否少于maximumPoolSize,代码以下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 获取运行状态 int rs = runStateOf(c); /* * 这个if判断 * 若是rs >= SHUTDOWN,则表示此时再也不接收新任务; * 接着判断如下3个条件,只要有1个不知足,则返回false: * 1. rs == SHUTDOWN,这时表示关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保存的任务 * 2. firsTask为空 * 3. 阻塞队列不为空 * * 首先考虑rs == SHUTDOWN的状况 * 这种状况下不会接受新提交的任务,因此在firstTask不为空的时候会返回false; * 而后,若是firstTask为空,而且workQueue也为空,则返回false, * 由于队列中已经没有任务了,不须要再添加线程了 */ // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取线程数 int wc = workerCountOf(c); // 若是wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false; // 这里的core是addWorker方法的第二个参数,若是为true表示根据corePoolSize来比较, // 若是为false则根据maximumPoolSize来比较。 // if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试增长workerCount,若是成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; // 若是增长workerCount失败,则从新获取ctl的值 c = ctl.get(); // Re-read ctl // 若是当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根据firstTask来建立Worker对象 w = new Worker(firstTask); // 每个Worker对象都会建立一个线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING状态; // 若是rs是RUNNING状态或者rs是SHUTDOWN状态而且firstTask为null,向线程池中添加线程。 // 由于在SHUTDOWN时不会在添加新的任务,但仍是会执行workQueue中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers是一个HashSet workers.add(w); int s = workers.size(); // largestPoolSize记录着线程池中出现过的最大线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
关闭线程池一般有以下两种方式:
一般来说,根据任务的性质来分,能够划分为:计算密集型(CPU),IO密集型,混合型。
线程池的运行状态. 线程池一共有五种状态, 分别是:
下图是线程池的状态转换过程,
一般开发者都是利用 Executors 提供的通用线程池建立方法,去建立不一样配置的线程池,主要区别在于不一样的 ExecutorService 类型或者不一样的初始参数。
Executors 目前提供了 5 种不一样的线程池建立配置:
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false, 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS); }
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
如下是ScheduledThreadPoolExecutor的构造函数,该类继承于ThreadPoolExecutor,能够看到任务存放在DelayedWorkQueue。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }
类中提供了多种执行定时任务的方法,
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
总结下来,主要分三种:
注意scheduleAtFixedRate和scheduleWithFixedDelay的区别,下图给出了二者执行任务时间上的示意图。scheduleAtFixedRate老是间隔固定的时间来执行task,可是若是下图中Task1执行超时,也就是超过了Fixed Time,当Task1执行完以后,Task2将马上执行。scheduleWithFixedDelay不一样的是,每一个任务老是在上一个任务结束以后,等待固定的Fixed Delay Time后开始执行。
public class ScheduleWorkerTime implements Runnable{ public final static int Long_8 = 8;//任务耗时8秒 public final static int Short_2 = 2;//任务耗时2秒 public final static int Normal_5 = 5;//任务耗时5秒 public static SimpleDateFormat formater = new SimpleDateFormat( "HH:mm:ss"); public static AtomicInteger count = new AtomicInteger(0); @Override public void run() { if(count.get()==0) { System.out.println("Long_8....begin:"+formater.format(new Date())); SleepTools.second(Long_8); System.out.println("Long_8....end:"+formater.format(new Date())); count.incrementAndGet(); }else if(count.get()==1) { System.out.println("Short_2 ...begin:"+formater.format(new Date())); SleepTools.second(Short_2); System.out.println("Short_2 ...end:"+formater.format(new Date())); count.incrementAndGet(); }else { System.out.println("Normal_5...begin:"+formater.format(new Date())); SleepTools.second(Normal_5); System.out.println("Normal_5...end:"+formater.format(new Date())); count.incrementAndGet(); } } public static void main(String[] args) { ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1); //任务间隔6秒 schedule.scheduleAtFixedRate(new ScheduleWorkerTime(), 0, 6000, TimeUnit.MILLISECONDS); } }
代码中定义了3个任务,分别执行8s,2s,5s,设置的固定间隔为6s。从输出结果能够看到,第一个场任务结束后,第二个任务马上开始执行,第二个任务执行完时,到了10s,此时等待2s后,第三个任务开始执行。由此能够看到,当前序任务没超时,后续任务会按照指定的时间进行执行;若是有超时,则会立刻执行。
执行结果以下: Long_8....begin:14:56:27 Long_8....end:14:56:35 Short_2 ...begin:14:56:35 Short_2 ...end:14:56:37 Normal_5...begin:14:56:39 Normal_5...end:14:56:44
注意最好在提交给ScheduledThreadPoolExecutor的任务要catch异常,不然发生异常以后,程序会终止运行。
当向Executor提交多个任务而且但愿得到它们在完成以后的结果,若是用FutureTask,能够循环获取task,并调用get方法去获取task执行结果,可是若是task还未完成,获取结果的线程将阻塞直到task完成,因为不知道哪一个task优先执行完毕,使用这种方式效率不会很高。
在jdk5时候提出接口CompletionService,它整合了Executor和BlockingQueue的功能,能够更加方便在多个任务执行时,按任务完成顺序获取结果。
CompletionService的使用流程以下:
声明task执行载体,线程池executor;
声明CompletionService,来包装执行task的线程池,存放已完成状态task的阻塞队列,队列默认为基于链表结构的阻塞队列LinkedBlockingQueue;
调用submit方法提交task;
调用take方法获取已完成状态task。
public class CompletionServiceTest { // 声明线程池 private static ExecutorService executorService = Executors.newFixedThreadPool(100); public void test() { // 声明CompletionService包装Executor CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executorService); final int groupNum = 10000000 / 100; for ( int i = 1; i <= 100; i++) { int start = (i-1) * groupNum + 1; int end = i * groupNum; completionService.submit(new Callable<Long>() { @Override public Long call() throws Exception { Long sum = 0L; for (int j = start; j <= end; j++) { sum += j; } return sum; } }); } long result = 0L; try { for (int i = 0; i < 100; i++) { long taskResult = completionService.take().get(); System.out.println(taskResult); result += taskResult; } } catch (Exception e) { e.printStackTrace(); } System.out.println("the result is " + result); } public static void main(String[] args) { new CompletionServiceTest().test(); } }
CompletionService接口提供五个方法:
Future
提交Callable类型的task;
Future
提交Runnable类型的task;
Future
获取并移除已完成状态的task,若是目前不存在这样的task,则等待;
Future
获取并移除已完成状态的task,若是目前不存在这样的task,返回null;
Future
获取并移除已完成状态的task,若是在指定等待时间内不存在这样的task,返回null。
CompletionService与普通用FutureTask获取结果的最大不一样是,能够按照任务完成的顺序返回结果。具体是如何实现的呢?
内部封装了一个QueueingFuture对象,而且实现了done方法,在task执行完成以后将当前task添加到completionQueue。
private static class QueueingFuture<V> extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue) { super(task, null); this.task = task; this.completionQueue = completionQueue; } private final Future<V> task; private final BlockingQueue<Future<V>> completionQueue; protected void done() { completionQueue.add(task); } }
done方法将在FutureTask的finishCompletion方法中被调用。只是默认done方法是空的,completionQueue实现了该方法。
/** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
参考:
本文由『后端精进之路』原创,首发于博客 http://teckee.github.io/ , 转载请注明出处
搜索『后端精进之路』关注公众号,马上获取最新文章和价值2000元的BATJ精品面试课程。