Java并发编程基础(五)之Executor

线程池与Executor框架

线程池的做用

线程池提供了一种限制和管理资源(包括执行一个任务)。每一个线程池还维护一些基本统计信息,例如完成任务的数量。整体上讲,他有如下好处:编程

  • 下降资源消耗:经过反复利用已经建立的线程下降线程建立和销毁形成的额消耗.小程序

  • 提升响应速度:当任务到达时,任五能够不须要的等到线程建立就能当即执行。服务器

  • 提升线程的可管理性:线程时稀缺资源,若是无限的建立,会形成计算机资源的浪费,还会下降系统的稳定性,使用线程池能够实现统一的管理分配。并发

Executor框架

Executor框架是Java5引入的,使用Executor能够更好的使用线程,使用效率更高,除此以外还有助于避免this逃逸(指构造函数在返回该对象引用以前就被其余的线程持有)框架

Executor的组成部分

任务

执行的任务须要实现Runnble接口或者Callable接口(Runnable接口不会返回结果可是Callable接口能够返回结果)。Runnable接口或者Callable接口实现类能够被ThreadPoolExector或者ScheduledThreadPoolExecutor自行异步

任务执行

经过上面的图能够知道任务的执行的核心是Executor和继承Executor接口的ExecutorService接口,ScheduledThreadPoolExecutor和ThreadPoolExecutor这两个关键类实现了ExecutorService接口.函数

注意:经过查看ScheduledThreadPoolExecutor源码能够发现ScheduledThreadPoolExecutor其实是继承了ThreadPoolExeecutor并实现了ScheduledExecutorService,而ScheduledExecutorService又实现了ExecutorService。

结果返回

Future接口以及Future接口的实现类FutureTask类。当咱们把Runnable接口或者Callable接口的实现类提交(调用usubmit方法)给ThreadPoolExector时,会返回一个FutureTask对象.工具

public <T> Future<T> submit(Runnable task, T result) {
		return schedule(Executors.callable(task, result), 0, NANOSECONDS);
	}
 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
										   long delay,
										   TimeUnit unit) {
		if (callable == null || unit == null)
			throw new NullPointerException();
		RunnableScheduledFuture<V> t = decorateTask(callable,
			new ScheduledFutureTask<V>(callable,
									   triggerTime(delay, unit)));
		delayedExecute(t);
		return t;
	}

Executor框架使用示意图

1.主线程首先须要建立实现Runnable或者Callable接口的任务对象.备注:工具类Executors能够实现Runnable对象和Callable对象之间的相互转换。(Executors.callable(Runnable task)或者Executor.callable(Runnable task,Object result))this

2.将建立的任务Runnable或者Callable对象直接交给ExecutorService执行 (ExecutorService.execute(Runnable command));湖泊这也能够把Runnable对象或者Callable对象提交给E小ecotrService执行(ExecutorService.submit(Callable task))..net

3.执行ExecutorService.submit(Callable task),将返回一个实现Future接口的对象。

4.主线程能够执行FutureTask.get()方法来等待任务执行完成。主线程也能够执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消任务的执行。

ThreadPoolExecutor源码解读

根据源码能够知道,ThreadPoolExecutor有四个构造方法,可是其余几个都时创建在下面解析的这个构造方法上。

/**
*用给定的初始值建立一个新的[@code](https://my.oschina.net/codeo) threadpoolExecutor
*参数。
*
*[@param](https://my.oschina.net/u/2303379) corepoolsize保留在池中的线程数,偶数
*若是它们是空闲的,除非设置了[@code](https://my.oschina.net/codeo) allowcorethreadTimeout
*[@param](https://my.oschina.net/u/2303379) maximumpoolsize容许的最大线程数
*池
*[@param](https://my.oschina.net/u/2303379) keepalivetime当线程数大于
*核心,这是多余空闲线程的最长时间
*将在终止前等待新任务。
*@param unit@code keepalivetime参数的时间单位
*@param workqueue用于在任务以前保存任务的队列
*执行。此队列将只包含@代码可运行
*由@code execute方法提交的任务。
*@param threadfactory执行器时要使用的工厂
*建立新线程
*@param handler执行被阻止时要使用的处理程序
*由于达到了线程边界和队列容量
*@throws illegalargumentexception if one of the following holds:<br>
*@code corepoolsize<0<br>
*@code keepaliveTime<0<br>
*@code maximumpoolsize<=0<br>
*@code maximumpoolsize<corepoolsize
*@throws nullpointerException if@code workqueue
*或@code threadFactory或@code handler为空
*/
public ThreadPoolExecutor(int corePoolSize,
							  int maximumPoolSize,
							  long keepAliveTime,
								  TimeUnit unit,
				 BlockingQueue<Runnable> workQueue,
							  ThreadFactory threadFactory,
		  RejectedExecutionHandler handler) {
		if (corePoolSize < 0 ||
			maximumPoolSize <= 0 ||
			maximumPoolSize < corePoolSize ||
			keepAliveTime < 0)
			throw new IllegalArgumentException();
		if (workQueue == null || threadFactory == null || handler == null)
			throw new NullPointerException();
		this.acc = System.getSecurityManager() == null ?
				null :
				AccessController.getContext();
		this.corePoolSize = corePoolSize;
		this.maximumPoolSize = maximumPoolSize;
		this.workQueue = workQueue;
		this.keepAliveTime = unit.toNanos(keepAliveTime);
		this.threadFactory = threadFactory;
		this.handler = handler;
	}

经过上面的图片和代码片断的解析能够知道该方法的参数比较多,并且都比较麻烦,因此不建议使用构造函数去建立ThreadPoolExecutor对象,可是可使用下面的几种方式建立ThreadPoolExecutor对象.

FixedThreadPool源码解读

/**
	  *	建立重用固定数量线程的线程池
	  *	使用提供的在须要时建立新线程。在任什么时候候,
	  *最多N个线程将处于活动处理状态
	  *任务。若是在全部线程
	  *活动,它们将在队列中等待,直到线程
	  *可用。若是任何线程在
	  *在关闭前执行,若是
	  *须要执行后续任务。池中的线程将
	  *存在,直到它显式执行服务关闭
	 * @param nThreads 线程池的线程数
	 * @param threadFactory 建立线程工厂类
	 * @return 返回新建立的线程池
	 * @throws NullPointerException if threadFactory is null
	 * @throws IllegalArgumentException if {@code nThreads <= 0}
	 */
	public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
		return new ThreadPoolExecutor(nThreads, nThreads,
									  0L, TimeUnit.MILLISECONDS,
									  new LinkedBlockingQueue<Runnable>(),
									  threadFactory);
	}

或者

	public static ExecutorService newFixedThreadPool(int nThreads) {
		return new ThreadPoolExecutor(nThreads, nThreads,
									  0L, TimeUnit.MILLISECONDS,
									  new LinkedBlockingQueue<Runnable>());
	}

FixedThreadPool的执行的方式大体如《并发编程艺术的》的示意图所示:

说明:

  1. 若是当前的运行的线程小于corePoolSize,则建立新的线程来执行任务

  2. 当前运行的线程等于corePoolSize后,将任务加入LinkedBlockingQueue<Runnable>

  3. 线程执行完成步骤1的任务以后,会在循环中反复的从LinkedBlockingQueue<Runnable>中获取任务来执行。

FixedThreadPool使用的LinkedBlockingQueue(队列容量为Integr.MAX_VALUE,若是构造函数没有设置大小的时候),有些文章说FixedThreadPool是无界队列是不正确的。除此以外还会有以下的影响:

1.当线程中的线程达到了CorePoolSize后,新任务将在无界对垒中等待,所以线程不会超过corePoolSize;

2.因为1,使用的没设置大小的LinkedBlockingQueue,将使得maxinumPoolSize将是一个无效的参数。

3.因为1,2使用不设置大小的LinkedBlockingQueue,将使得keepAliveTime将是无效的参数

4.运行中FixedThreadPool(未执行shudown()或者shudownNow())就不会拒绝任务.

SigleThreadExecutor

1.当前的运行的线程数少云corePoolSize,则建立一个新的线程执行任务。

2.当前线程池中有一个运行的线程后,将任务加入LinkedBlockQueue

3.线程执行完1中的任务后,会在循环中反复的从LinkedBlockingQueue中获取任务来执行。

CacheThreadPool

CacheThreadPool是一个会根据须要建立新线程的线程池。

/**
	 * 建立一个线程池,根据须要建立新线程,但会在先前构建的线程可用时重用它,
	 *并在须要时使用提供的ThreadFactory建立新线程。
	 * @param threadFactory 建立新线程使用的factory
	 * @return 新建立的线程池
	 * @throws NullPointerException 若是threadFactory为空
	 */
	public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
	//CachedThreadPool的corePoolSize被设置为空(0),maximumPoolSize被设置为Integer.MAX.VALUE,即它是无界的,这也就意味着若是主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断建立新的线程。极端状况下,这样会致使耗尽cpu和内存资源。
		return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
									  60L, TimeUnit.SECONDS,
									  new SynchronousQueue<Runnable>(),
									  threadFactory);
	}

	public static ExecutorService newCachedThreadPool() {
		return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
									  60L, TimeUnit.SECONDS,
									  new SynchronousQueue<Runnable>());
	}

ScheduledThreadPoolExecutor

scheduledThreadPoolExecutor主要用來在給定的延遲運行任務,或者按期執行任務。ScheduledThreadPoolExecutor使用任務隊列DelayQueue封裝了一個PriorityQueue,PrioprityQueue會對隊列中的任務進行排序,執行所需時間短的放到最前面執行,若是執行時間一樣則使先提交的先執行。

ScheduledThreadPoolExecutor和Time的比較

  • Time對系統時鐘的變化比較敏感,ScheduledThreadPoolExecutor則相反。

  • Timer只有一個執行執行現場,所以長時間運行任務能够延遲其余任務。ScheduledThreadPoolExecutor能够配置任意現場,此外,若是你想(通過ThreadFactory),你能够彻底控制創建的綫程。

  • 在TimerTask中抛出的运行异常会杀死一个线程,从而致使司机:(计划任务将再也不运行,ScheduledThreadExecutor不只捕获运行异常,还容许您再须要时处理他们(经过重写afterExecute方法)。抛出的异常将被取消,其余的任务能够继续运行。

ScheduledThreadPoolExecutor运行机制

ScheduledThreadPoolExecutor的执行主要包括两部分:

1.当调用ScheduledThreadPoolExecutor的scheduleAtFixRate()或者scheduleWirhFixedDelay()时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFuture接口的ScheduledFutureTask.

2.线程池中的线程从DelayQueue中获取ScheduledFutureTask,而后执行任务.

ScheduledThreadPoolExecutor未了实现周期性的执行任务,对ThreadPoolExecutor作了以下的修改:

  • 使用DelayQueue做为任务队列
  • 获取任务的方法不一样
  • 执行周期任务后,增长了额外的处理.

各类线程的使用场景

  • FixedThreadPool: 适用于为了知足资源管理需求,限制当前线程数量的应用场景。它适用于负载比较重的服务器;

  • SingleThreadExecutor: 适用于保证顺序地执行各个任务而且在任意时间点,不会有多个线程是活动的应用场景。

  • CachedThreadPool: 适用于执行不少的短时间异步任务的小程序,或者是负载较轻的服务器;

  • ScheduledThreadPoolExecutor: 适用于须要多个后台执行周期任务,同时为了知足资源管理需求而须要限制后台线程的数量的应用场景,

  • SingleThreadScheduledExecutor: 适用于须要单个后台线程执行周期任务,同时保证顺序地执行各个任务的应用场景。

相关文章
相关标签/搜索