版权声明:本文为博主原创文章,未经博主容许不得转载
源码:github.com/AnliaLee
你们要是看到有错误的地方或者有啥好的建议,欢迎留言评论html
本篇博客咱们将开始探索由上一章引出的线程池(ThreadPoolExecutor)的知识。因为内含大量示例,致使文章篇幅有点长,望你们耐心食用...java
往期回顾
大话Android多线程(一) Thread和Runnable的联系和区别
大话Android多线程(二) synchronized使用解析
大话Android多线程(三) 线程间的通讯机制之Handler
大话Android多线程(四) Callable、Future和FutureTaskandroid
简介这东西也写不出啥花样来,遂直接偷懒引用别人的吧哈哈git
new Thread()的缺点
• 每次new Thread()耗费性能
• 调用new Thread()建立的线程缺少管理,被称为野线程,并且能够无限制建立,之间相互竞争,会致使过多占用系统资源致使系统瘫痪
• 不利于扩展,好比如定时执行、按期执行、线程中断
采用线程池的优势
• 重用存在的线程,减小对象建立、消亡的开销,性能佳
• 可有效控制最大并发线程数,提升系统资源的使用率,同时避免过多资源竞争,避免堵塞
• 提供定时执行、按期执行、单线程、并发数控制等功能程序员
以上内容摘自Android线程管理之ExecutorService线程池github
线程池ThreadPoolExecutor的继承关系以下图所示算法
下一节咱们将介绍ThreadPoolExecutor的构造参数数组
构造ThreadPoolExecutor时需传入许多参数,咱们以参数最多的那个构造方法为例(由于参数threadFactory和handler是有默认值的,因此和其余几个构造方法的区别只是有无设置这两个参数的入口而已,就不赘述了)缓存
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 复制代码
咱们将参数分析代入到程序员开发的故事中,让你们更容易理解,如下是参数介绍多线程
int corePoolSize:
计划招聘核心程序员的数量。核心程序员是公司的顶梁柱,公司接到甲方需求(即任务)后会优先分配给核心程序员去开发
线程池中核心线程的数量
int maximumPoolSize:
计划招聘程序员的总数。程序员由核心程序员和实习生组成
线程池中线程数的最大值
long keepAliveTime:
容许员工打酱油的时间。公司招了实习生以后,若是发现一段时间(keepAliveTime)内实习生没活干,在那偷懒刷什么掘金沸点的时候,就会把他辞掉。固然核心程序员抱着的也不必定是铁饭碗,若公司采起了节省成本的经营策略(ThreadPoolExecutor.allowCoreThreadTimeOut设为true),核心程序员一段时间没活干也同样会被裁人
线程的闲置时长,默认状况下此参数只做用于非核心线程,即非核心线程闲置时间超过keepAliveTime后就会被回收。但若是ThreadPoolExecutor.allowCoreThreadTimeOut设为true,则参数一样能够做用于核心线程
TimeUnit unit:
上面时间参数的单位,有纳秒、微秒、毫秒、秒、分、时、天
可供选择的单位类型有:
TimeUnit.NANOSECONDS:纳秒
TimeUnit.MICROSECONDS:微秒
TimeUnit.MILLISECONDS:毫秒
TimeUnit.SECONDS:秒
TimeUnit.MINUTES:分
TimeUnit.HOURS:小时
TimeUnit.DAYS:天
BlockingQueue<Runnable> workQueue:
储备任务的队列
线程池中的任务队列,该队列主要用来存储已经提交但还没有分配给线程执行的任务。BlockingQueue,即阻塞队列,可供传入的队列类型有:
• ArrayBlockingQueue:基于数组的阻塞队列
• LinkedBlockingQueue:基于链表的阻塞队列
• PriorityBlockingQueue:基于优先级的阻塞队列
• DelayQueue:基于延迟时间优先级的阻塞队列
• SynchronousQueue:基于同步的阻塞队列
咱们在下面的章节中将会详细对比以上这几种队列的区别。此外,还需注意传入任务的都需实现Runnable接口
ThreadFactory threadFactory:
线程工厂接口,只有一个new Thread(Runnable r)方法,能够为线程池建立新线程。系统为咱们提供了默认的threadFactory:Executors.defaultThreadFactory(),咱们通常使用默认的就能够了
RejectedExecutionHandler handler:
拒绝策略,默认使用ThreadPoolExecutor.AbortPolicy,当新任务被拒绝时会将抛出RejectExecutorException异常。此外还有3种策略可供选择:CallerRunsPolicy,DiscardPolicy和DiscardOldestPolicy
当咱们使用submit或者execute方法将任务提交到线程池时,线程池遵循如下策略将任务分配给相应线程去执行(任务队列使用最基本的ArrayBlockingQueue)
提交任务后,若是线程池中的线程数未达到核心线程的数量(corePoolSize),则会建立一个核心线程去执行
举个栗子,咱们设置任务数(taskSize)为3,核心线程数(corePoolSize)为5,则 taskSize < corePoolSize,线程池会建立3条核心线程去执行任务(假设每一个任务都须要必定时间才能完成)
public class ExecutorTest {
//省略部分代码...
private static int taskSize = 3;//任务数
private static int corePoolSize = 5;//核心线程的数量
private static int maximumPoolSize = 20;//线程数的最大值
private static int queueSize = 128;//可储存的任务数
public static class TestTask implements Runnable {
public void run() {
if (taskSize > 0) {
try{
Thread.sleep(500);//模拟开发时间
System.out.println(getTime() + getName(Thread.currentThread().getName())
+ " 完成一个开发任务,编号为t" + (taskSize--)
);
}catch (Exception e){
e.printStackTrace();
}
}
}
}
public static void main(String args[]){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize)
);
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
executor.execute(task);
}
executor.shutdown();
}
}
复制代码
运行结果见下图(请忽略任务编号,如今还没用到)
提交任务后,若是线程池中的线程数已经达到核心线程的数量(corePoolSize),但任务队列(workQueue)中存储的任务数未达到最大值,则将任务存入任务队列中等待执行
咱们将任务数设为10,核心线程数设为3,任务队列的最大值设为7,此时将任务分配给核心线程后恰好能够填满任务队列
private static int taskSize = 10;//任务数
private static int corePoolSize = 3;//核心线程的数量
private static int maximumPoolSize = 10;//线程数的最大值
private static int queueSize = 7;//可储存的任务数
复制代码
运行结果见下图
提交任务后,若是线程池中的线程数达到核心线程数但未超过线程数的最大值,同时任务队列中的任务数已达到最大值,则建立一个非核心线程来执行任务
咱们将以前的任务数改成12,其余数值不变,那么将会有两位实习生参与到开发中(taskSize - (corePoolSize + queueSize) = 2)
private static int taskSize = 12;//任务数
private static int corePoolSize = 3;//核心线程的数量
private static int maximumPoolSize = 10;//线程数的最大值
private static int queueSize = 7;//可储存的任务数
复制代码
运行结果见下图(多了实习生4和5)
加...加班???
正确答案应该是推掉!拒绝!
提交任务后,若线程池中的线程数已达到最大值,且全部线程均在执行任务,任务队列也饱和了,则拒绝执行该任务,并根据拒绝策略执行相应操做
上个例子中一共建立了5条线程(3核心线程、2非核心线程),那么此次咱们只将线程数的最大值改成4,采用默认的拒绝策略
private static int taskSize = 12;//任务数
private static int corePoolSize = 3;//核心线程的数量
private static int maximumPoolSize = 4;//线程数的最大值
private static int queueSize = 7;//可储存的任务数
public static void main(String args[]){
//省略部分代码...
for (int i = 0; i < size; i++) {
executor.execute(task);
System.out.println("接到任务 " + i);
}
executor.shutdown();
}
复制代码
运行结果见下图,能够看见线程池只接收了11个任务(maximumPoolSize + queueSize = 11 ),在提交第12个任务后会抛出RejectedExecutionException的异常
另外须要注意的是,若是咱们在提交任务时抛出了异常,那么以后调用的shutdown()将变为无效代码,线程池将一直运行在主线程中没法关闭
在 corePoolSize = 0 的条件下,提交任务后,若任务队列中的任务数仍未达到最大值,线程池只会建立一条非核心线程来执行任务
private static int taskSize = 9;//任务数
private static int corePoolSize = 0;//核心线程的数量
private static int maximumPoolSize = 5;//线程数的最大值
private static int queueSize = 10;//可储存的任务数
复制代码
运行结果见下图
BlockingQueue是一个接口,它提供了3个添加元素方法:
3个删除元素的方法:
咱们以前讲到的5种类型的队列实际上都是BlockingQueue的实现类,本篇博客不会具体分析源码的实现,咱们只对比它们使用上的区别:
基于数组的阻塞队列,ArrayBlockingQueue内部维护了一个由大小固定的数组构成的数据缓冲队列,数组大小在队列初始化时就须要指定。而存储在ArrayBlockingQueue中的元素需按照FIFO(先进先出)的方式来进行存取。ArrayBlockingQueue的构造方法以下
ArrayBlockingQueue(int capacity)
ArrayBlockingQueue(int capacity, boolean fair)
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
复制代码
咱们这里仅以一个参数的构造方法为例,使用ArrayBlockingQueue时的任务执行策略和具体使用示例见上一节,就不重复再讲一次了
基于链表(单向链表)的阻塞队列,和ArrayBlockingQueue相似,其内部维护着一个由单向链表构成的数据缓冲队列。区别于ArrayBlockingQueue,LinkedBlockingQueue在初始化的时候能够不用设置容量大小,其默认大小为Integer.MAX_VALUE(即2的31次方-1,表示 int 类型可以表示的最大值)。若设置了大小,则使用起来和ArrayBlockingQueue同样。LinkedBlockingQueue的构造方法以下
LinkedBlockingQueue()
LinkedBlockingQueue(int capacity)
LinkedBlockingQueue(Collection<? extends E> c)
复制代码
这里参数和以前讲的同样,并且使用方法和ArrayBlockingQueue大同小异,就不赘述了
基于优先级的阻塞队列,用法相似于LinkedBlockingQueue,区别在于其存储的元素不是按照FIFO排序的,这些元素的排序规则得由咱们本身来定义:全部插入PriorityBlockingQueue的对象元素必须实现Comparable(天然排序)接口,咱们对Comparable接口的实现定义了队列优先级的排序规则
PriorityBlockingQueue的队列容量是“无界”的,由于新任务进来时若是发现已经超过了队列的初始容量,则会执行扩容的操做。这意味着若是corePoolSize > 0,线程池中的线程数达到核心线程数的最大值,且任务队列中的任务数也达到最大值,这时新的任务提交进来,线程池并不会建立非核心线程来执行新任务,而是对任务队列进行扩容
更加具体的内容你们能够去研究一下这篇篇博客:并发队列 – 无界阻塞优先级队列 PriorityBlockingQueue 原理探究
PriorityBlockingQueue的构造方法以下
PriorityBlockingQueue()
PriorityBlockingQueue(int initialCapacity)
PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
PriorityBlockingQueue(Collection<? extends E> c)
复制代码
重复的参数就不解释了
下面咱们来看看具体示例(以最简单的场景体验一下使用过程): 咱们将核心程序员的数量(corePoolSize)设为0,任务队列的容量使用默认的(11),这时公司接到了12个开发任务,项目经理会根据任务的优先级对任务执行的顺序进行排序,而后分配给实习生(公司只能招到一个实习生,缘由咱们在解析任务执行策略的时候已经讲过了)
public class ExecutorTest {
//省略部分代码...
private static int taskSize = 9;//任务数
private static int corePoolSize = 0;//核心线程的数量
private static int maximumPoolSize = 5;//线程数的最大值
private static int queueSize = 10;//可储存的任务数
public static class PriorityTask implements Runnable,Comparable<PriorityTask>{
private int priority;
public PriorityTask(int priority) {
this.priority = priority;
}
@Override
public void run() {
if (taskSize > 0) {
try{
Thread.sleep(1000);//模拟开发时间
System.out.println(getTime() + getName(Thread.currentThread().getName())
+ " 完成一个开发任务,编号为t" + (taskSize--) + ", 优先级为:" + priority
);
}catch (Exception e){
e.printStackTrace();
}
}
}
@Override
public int compareTo(PriorityTask task) {
if(this.priority == task.priority){
return 0;
}
return this.priority<task.priority?1:-1;//优先级大的先执行
}
}
public static void main(String args[]){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
1,
TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>(queueSize)
);
Random random = new Random();
PriorityTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
int p = random.nextInt(100);
task = new PriorityTask(p);
executor.execute(task);
System.out.println("接到任务 " + i + ",优先级为:" + p);
}
executor.shutdown();
}
}
复制代码
运行结果见下图
基于延迟时间优先级的阻塞队列,DelayQueue和PriorityBlockingQueue很是类似,一样是“无界”队列(DelayQueue不须要设置初始容量大小),一样基于优先级进行排序。但有一点不一样,DelayQueue中的元素必须实现 Delayed接口(Delayed继承自Comparable接口),咱们需重写Delayed.getDelay方法为元素的释放(执行任务)设置延迟(getDelay方法的返回值是队列元素被释放前的保持时间,若是返回0或一个负值,就意味着该元素已经到期须要被释放,所以咱们通常用完成时间和当前系统时间做比较)。DelayQueue的构造方法以下
DelayQueue()
DelayQueue(Collection<? extends E> c)
复制代码
此次咱们将延迟时间当成是任务开发时间,设置开发时间越短的任务优先级越高
public class ExecutorTest {
//省略部分代码...
private static int taskSize = 5;//任务数
private static int corePoolSize = 0;//核心线程的数量
private static int maximumPoolSize = 5;//线程数的最大值
public static class DelayTask implements Runnable,Delayed{
private long finishTime;
private long delay;
public DelayTask(long delay){
this. delay= delay;
finishTime = (delay + System.currentTimeMillis());//计算出完成时间
}
@Override
public void run() {
if (taskSize > 0) {
try{
System.out.println(getTime() + getName(Thread.currentThread().getName())
+ " 完成一个开发任务,编号为t" + (taskSize--) + ", 用时:" + delay/1000
);
}catch (Exception e){
e.printStackTrace();
}
}
}
@Override
public long getDelay(@NonNull TimeUnit unit) {
//将完成时间和当前时间做比较,<=0 时说明元素到期需被释放
return (finishTime - System.currentTimeMillis());
}
@Override
public int compareTo(@NonNull Delayed o) {
DelayTask temp = (DelayTask) o;
return temp.delay < this.delay?1:-1;//延迟时间越短优先级越高
}
}
public static void main(String args[]){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
1,
TimeUnit.SECONDS,
new DelayQueue()
);
Random random = new Random();
DelayTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
long d = 1000 + random.nextInt(10000);
task = new DelayTask(d);
executor.execute(task);
System.out.println("接到任务 " + i + ",预计完成时间为:" + d/1000);
}
executor.shutdown();
}
}
复制代码
运行结果以下
基于同步的阻塞队列,这是一个很是特殊的队列,由于它内部并没有数据缓存空间,元素只有在试图取走的时候才有可能存在。也就是说,若是在插入元素时后续没有执行取出的操做,那么插入的行为就会被阻塞,若是SynchronousQueue是在线程池中使用的,那么这种场景下就会抛出RejectedExecutionException异常。可能这么解释有点绕,下面咱们会经过讲解示例辅助你们理解,先来看构造方法
SynchronousQueue()
SynchronousQueue(boolean fair)
复制代码
一样的,参数和以前同样,就不解释了,咱们来看示例:
采用了SynchronousQueue的策略后,任务队列不能储存任务了。这意味着若是接到新任务时发现没人有空来开发(程序员手上都有任务,公司招人名额也满了),那这个新任务就泡汤了(抛出异常)
例如咱们将核心程序员的数量(corePoolSize)设为3,程序员总数(maximumPoolSize)设为9,而任务数(taskSize)设为10
public class ExecutorTest {
//省略部分代码...
private static int taskSize = 10;//任务数
private static int corePoolSize = 3;//核心线程的数量
private static int maximumPoolSize = 9;//线程数的最大值
public static void main(String args[]){
ThreadPoolExecutor executor =
new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
1,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
);
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
executor.execute(task);
System.out.println("接到任务 " + i);
}
executor.shutdown();
}
}
复制代码
在招满人的状况下,公司最多就9个程序员,当接到第10个任务时,发现没人可用了,就会抛出异常。固然,以前成功接收的任务不会受到影响
所以根据SynchronousQueue的特性,在使用SynchronousQueue时一般会将maximumPoolSize设为“无边界”,即Integer.MAX_VALUE(在系统为咱们预设的线程池中,CachedThreadPool就是这么设置的,具体的咱们后面再细说)
前面讲了这么多,其实都是教你们如何自定义一个线程池。系统为了方便咱们进行开发,早已封装好了各类线程池供咱们使用。咱们能够用Executors.newXXX的方式去实例化咱们须要的线程池。可供选择的线程池种类不少:
咱们挑其中经常使用的4种讲讲就行(其实各类线程池的区别只是构建线程池时传入的参数不一样而已,通过以前咱们对任务执行策略和各类任务队列的讲解后,理解不一样种类的线程池就变得很是简单了。这也正是博主要花费那么长的篇幅给你们举例子的缘由,但愿你们都能看得懂吧~)
咱们直接看系统是如何封装的
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()
);
}
复制代码
核心线程数为0,线程总数设为Integer.MAX_VALUE,线程的闲置时长为60s,任务队列为SynchronousQueue(同步队列),结合咱们以前说的,能够总结出CachedThreadPool的特色以下:
使用示例以下,咱们设置任务数为10
ExecutorService service = Executors.newCachedThreadPool();
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
service.execute(task);
}
service.shutdown();
复制代码
源码以下
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
);
}
复制代码
核心线程数由咱们自定义,最大线程数和核心线程数相等,线程闲置时间为0,任务队列为LinkedBlockingQueue,因此FixedThreadPool的特色以下:
使用示例以下,咱们设置任务数为10,核心线程数为5
ExecutorService service = Executors.newFixedThreadPool(corePoolSize);
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
service.execute(task);
}
service.shutdown();
复制代码
源码以下
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
核心线程数为1,最大线程数也是1,线程闲置时间为0,任务队列为LinkedBlockingQueue,且SingleThreadExecutor是FinalizableDelegatedExecutorService类的实例,因此SingleThreadExecutor的特色以下:
使用示例以下,咱们设置任务数为10
ExecutorService service = Executors.newSingleThreadExecutor();
TestTask task;
int size = taskSize;
for (int i = 0; i < size; i++) {
task = new TestTask();
service.execute(task);
}
service.shutdown();
复制代码
源码以下
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(
corePoolSize,
Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, //DEFAULT_KEEPALIVE_MILLIS = 10L
MILLISECONDS,
new DelayedWorkQueue()
);
}
复制代码
核心线程数由咱们自定义,最大线程数为Integer.MAX_VALUE,线程闲置时长为10毫秒,任务队列采用了DelayedWorkQueue(和DelayQueue很是像)。ScheduledThreadPool的特色以下:
使用示例以下,咱们调用ScheduledExecutorService.schedule方法提交延迟启动的任务,延迟时间为3秒:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
Runnable runnable = new Runnable(){
@Override
public void run() {
System.out.println("开始执行任务,时间:" + getTime());
}
};
scheduledExecutorService.schedule(runnable,3,TimeUnit.SECONDS);
System.out.println("提交任务,时间:" + getTime());
复制代码
此外还有scheduleAtFixedRate,scheduleWithFixedDelay等提交任务的方法,就不一一举例了
一、咱们除了用execute方法提交任务之外,还可使用submit方法。submit方法提交的任务需实现Callable接口(有关Callable的知识能够看下我上一篇博客:大话Android多线程(四) Callable、Future和FutureTask),所以其具备返回值
二、线程池有两种手动关闭的方法:
emmmm...基本就这些内容了,博主已经尽量地覆盖线程池的全部知识了(除了源码解析,之后有机会会出一个单章分析下源码),如有什么遗漏或者建议的欢迎留言评论。若是以为博主写得还不错麻烦点个赞,大家的支持是我最大的动力~