在实际使用中,线程是很占用系统资源的,若是对线程管理不完善的话很容易致使系统问题。所以,在大多数并发框架中都会使用线程池来管理线程,使用线程池管理线程主要有以下好处:java
流程图:数组
线程池执行所提交的任务过程:缓存
▪ 一、好比咱们设置核心线程池的数量为30个,无论有没有用户链接,咱们老是保证30个链接,这个就是核心线程数,这里的核心线程数不必定是30你能够根据你的需求、业务和并发访问量来设置,先判断线程池中核心线程池全部的线程是否都在执行任务,若是不是,则新建立一个线程执行刚提交的任务,不然,核心线程池中全部的线程都在执行任务,则进入第2步;并发
▪ 二、若是咱们核心线程数的30个数量已经满了,就须要到阻塞队列中去查看,判断当前阻塞队列是否已满,若是未满,则将提交的任务放置在阻塞队列中等待执行;不然,则进入第3步;框架
▪ 三、判断线程池中全部的线程是否都在执行任务,若是没有,则建立一个新的线程来执行任务,不然,则交给饱和策略进行处理,也叫拒绝策略,等下咱们会有详细的介绍ide
注意: 这里有一个核心线程数和一个线程池数量,这两个是不一样的概念,核心线程数表明我可以维护经常使用的线程开销,而线程池数量则表明我最大可以建立的线程数,例如在咱们农村每家每户都有吃水的井,基本上有半井深的水就能够维持咱们的平常生活的使用,这里的半井深的水就比如咱们的核心线程数,还有一半的容量是咱们井可以容纳的最大水资源了,超过了就不行,水就会漫出来,这个就相似于咱们的线程池的数量,不知道这里说明你们是否可以更好的进行理解函数
1.newCachedThreadPool: 建立一个可根据须要建立新线程的线程池,可是在之前构造的线程可用时讲重用它们,并在须要时使用提供的ThreadFactory 建立新线程高并发
特征:
(1) 线程池中的数量没有固定,能够达到最大值(Integer.MAX_VALUE=2147483647)
(2) 线程池中的线程可进行缓存重复利用和回收(回收默认时间为1分钟)
(3) 当线程池中,没有可用线程,会从新建立一个线程工具
2.newFixedThreadPool: 建立一个可重用固定线程数的线程池,以共享的***队列方式来运行这些线程,在任意点,在大多数nThreads线程会处于处理任务的活动状态。若是在全部线程处于活动状态时提交附件任务,则在有可用线程以前,附件任务将在队列中等待,若是在关闭前的执行期间因为失败而致使任何线程终止,那么一个新线程将代替它执行后续的任务(若是须要)。在某个线程被显式关闭以前,池中的线程将一直存在性能
特征:
(1) 线程池中的线程处于必定的量,能够很好的控制线程的并发量
(2) 线程能够重复被使用,在显示关闭以前,都将一直存在
(3) 超过必定量的线程被提交时需在队列中等待
3.newSingleThreadExecutor: 建立一个使用单个 worker 线程的Executor ,以***队列方式来运行该线程。(注意,若是由于在关闭前的执行期间出现失败而终止了此单个线程,那么若是须要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,而且在任意给定的时间不会有多个线程是活动的,与其余等效的 newFixedThreadPool(1)
不一样,可保证无需从新配置此方法所返回的执行程序便可使用其余的线程
特征:
(1) 线程池中最多执行一个线程,以后提交的线程将会排在队列中以此执行
4.newSingleThreadScheduledExecutor: 建立一个单线程执行程序,它可安排在给定延迟后运行命令或者按期执行
特征:
(1) 线程池中最多执行一个线程,以后提交的线程活动将会排在队列中依次执行
(2) 可定时或者延迟执行线程活动
5.newScheduledThreadPool: 建立一个线程池,它可安排在给定延迟后运行命令或者按期的执行
特征:
(1) 线程池中具备执行数量的线程,即使是空线程也将保留
(2) 可定时或者延迟执行线程活动
6.newWorkStealingPool: 建立一个带并行级别的线程池,并行级别决定了同一时刻最多有多少个线程在执行,如不传并行级别参数,将默认为当前系统的CPU个数
咱们能够在开发工具中搜索一个叫Executors
的类,在里面咱们能够看到咱们上面全部的使用方法
线程工具类——Task :
public class Task implements Runnable{ @Override public void run() { try { //休眠1秒 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //输出线程名 System.out.println(Thread.currentThread().getName()+"-------running"); } }
源码实现:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
案例:
public class CacheThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 20; i++) { //提交任务 executorService.execute(new Task()); } //启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务 executorService.shutdown(); } }
结果输出:
从开始到结束咱们总共输出了20个(pool-1-thread-1到pool-1-thread-20)线程
pool-1-thread-2-------running pool-1-thread-6-------running pool-1-thread-1-------running pool-1-thread-3-------running pool-1-thread-5-------running pool-1-thread-4-------running pool-1-thread-7-------running pool-1-thread-11-------running pool-1-thread-9-------running pool-1-thread-10-------running pool-1-thread-17-------running pool-1-thread-15-------running pool-1-thread-18-------running pool-1-thread-16-------running pool-1-thread-8-------running pool-1-thread-20-------running pool-1-thread-13-------running pool-1-thread-19-------running pool-1-thread-14-------running pool-1-thread-12-------running
源码实现:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
案例:
public class FixedThreadPoolDemo { public static void main(String[] args) { //建立线程池,最多容许五个线程执行 ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 20; i++) { //提交任务 executorService.execute(new Task()); } //启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务 executorService.shutdown(); } }
输出结果:
咱们能够看到其中的线程是每五个(pool-1-thread-1到pool-1-thread-5)一执行,在当前执行的线程运行中,最多容许五个线程进行执行
pool-1-thread-4-------running pool-1-thread-2-------running pool-1-thread-1-------running pool-1-thread-3-------running pool-1-thread-5-------running pool-1-thread-4-------running pool-1-thread-5-------running pool-1-thread-3-------running pool-1-thread-2-------running pool-1-thread-1-------running pool-1-thread-4-------running pool-1-thread-2-------running pool-1-thread-1-------running pool-1-thread-3-------running pool-1-thread-5-------running pool-1-thread-4-------running pool-1-thread-5-------running pool-1-thread-2-------running pool-1-thread-1-------running pool-1-thread-3-------running
源码实现:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
案例:
public class SingleThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 20; i++) { //提交任务 executorService.execute(new Task()); } //启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务 executorService.shutdown(); } }
结果输出:
咱们能够看到每次都是线程1输出结果
pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running pool-1-thread-1-------running
案例:
public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); // for (int i = 0; i < 20; i++) { System.out.println(System.currentTimeMillis()); scheduledExecutorService.schedule(new Runnable() { @Override public void run() { System.out.println("延迟三秒执行"); System.out.println(System.currentTimeMillis()); } },3, TimeUnit.SECONDS); // } scheduledExecutorService.shutdown(); } }
输出结果:
1606744468814 延迟三秒执行 1606744471815
案例:
public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); scheduledExecutorService.scheduleAtFixedRate(new Runnable() { int i = 1; @Override public void run() { System.out.println(i); i++; } },0,1, TimeUnit.SECONDS); // scheduledExecutorService.shutdown(); }
输出结果:
1 2 3 4 5
通常来讲线程池只有两种状态,一种是Running
,一种是TERMINATED
,图中间的都是过渡状态
Running
:能接受新提交的任务,而且也能处理阻塞队列中的任务SHUTDOWN
:关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保存的任务STOP
:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程TIDYING
:若是全部的任务都已终止了,workerCount(有效线程数)为0.线程池进入该状态后会调用terminated()方法进入TERMINATED状态TERMINATED
:在terminated()方法执行完成后进入该状态,默认terminated()方法中什么也没有作
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize
:核心线程池的大小maximumPoolSize
:线程池能建立线程的最大个数keepAliveTime
:空闲线程存活时间unit
:时间单位,为keepAliveTime指定时间单位workQueue
:阻塞队列,用于保存任务的阻塞队列threadFactory
:建立线程的工程类handler
:饱和策略(拒绝策略)
ArrayBlockingQueue:
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是-个经常使用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着二者没法真正并行运行,这点尤为不一样于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue彻底能够采用分离锁,从而实现生产者和消费者操做的彻底并行运行。Doug Lea之因此没这样去作,也许是由于ArrayBlockingQueue的数据写入和获取操做已经足够轻巧,以致于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上彻底占不到任何便宜。
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不一样之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,然后者则会生成一个额外的Node对象。这在长时间内须要高效并发地处理大批量数据的系统中,其对于GC的影响仍是存在必定的区别。而在建立ArrayBlockingQueue时,咱们还能够控制对象的内部锁是否采用公平锁,默认采用非公平锁。
LinkedBlockingQueue:
基于链表的阻塞队列,同ArrayListBlockingQueue相似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者当即返回;只有当队列缓冲区达到最大值缓存容量时( LinkedBlockingQueue能够经过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉─份数据,生产者线程会被唤醒,反之对打于消费者这端的处理也基于一样的原理。而
LinkedBlockingQueue之因此可以高效的处理并发数据,还由于其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的状况下生产者和消费者能够并行地操做队列中的数据,以此来提升整个队列的并发性能。
DelayQueue:
DelayQueue中的元素只有当其指定的延迟时间到了,才可以从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,所以往队列中插入数据的操做(生产者)永远不会被阻塞,而只有获取数据的操做(消费者)才会被阻塞。
使用场景︰
DelayQueue使用场景较少,但都至关巧妙,常见的例子好比使用一个DelayQueue来管理一个超时未响应的链接队列。
PriorityBlockingQueue:
基于优先级的阻塞队列(优先级的判断经过构造函数传入的Compator对象来决定),但须要注意的是
PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。所以使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,不然时间一长,会最终耗尽全部的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
SynchronousQueue:
一种无缓冲的等待队列,相似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,若是一方没有找到合适的目标,那么对不起,你们都在集市等待。相对于有缓冲的BlockingQueue来讲,少了一个中间经销商的环节(缓冲区),若是有经销商,生产者直接把产品批发给经销商,而无需在乎经销商最终会将这些产品卖给那些消费者,因为经销商能够库存一部分商品,所以相对于直接交易模式,整体来讲采用中间经销商的模式会吞吐量高一些(能够批量买卖)﹔但另外一方面,又由于经销商的引入,使得产品从生产者到消费者中间增长了额外的交易环节,单个产品的及时响应性能可能会下降。
声明一个SynchronousQueue有两种不一样的方式,它们之间有着不太同样的行为。公平模式和非公平模式的区别:若是采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系总体的公平策略;
但若是是非公平模式 ( SynchronousQueue默认) : SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,然后一种模式,若是生产者和消费者的处理速度有差距,则很容易出现饥渴的状况,便可能有某些生产者或者是消费者的数据永远都得不处处理。
注意:
arrayblockingqueue和linkedblockqueue的区别:1.队列中锁的实现不一样
一、ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;
LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock2.队列大小初始化方式不一样
二、ArrayBlockingQueue实现的队列中必须指定队列的大小;
LinkedBlockingQueue实现的队列中能够不指定队列的大小,可是默认是Integer.MAX_VALUE
ThreadPoolExecutor.AbortPolicy(系统默认): 丢弃任务并抛出RejectedExecutionException异常,让你感知到任务被拒绝了,咱们能够根据业务逻辑选择重试或者放弃提交等策略
ThreadPoolExecutor.DiscardPolicy: 也是丢弃任务,可是不抛出异常,相对而言存在必定的风险,由于咱们提交的时候根本不知道这个任务会被丢弃,可能形成数据丢失。
ThreadPoolExecutor.DiscardOldestPolicy: 丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程),一般是存活时间最长的任务,它也存在必定的数据丢失风险
ThreadPoolExecutor.CallerRunsPolicy: 由调用线程处理该任务
submit是基方法Executor.execute(Runnable)的延伸,经过建立并返回一个Future类对象可用于取消执行和/或等待完成。