文中部分代码使用 lambda 表达式以简化代码。html
Java中的线程池是运用场景最多的并发框架,几乎全部须要异步或并发执行任务的程序均可以使用线程池。在开发过程当中,合理地使用线程池可以带来3个好处。java
线程生命周期数组
当用new操做符建立一个线程时, 例如new Thread(r),线程尚未开始运行,此时线程处在新建状态。缓存
new Thread(() -> System.out.println("run 任务执行"))
一个新建立的线程并不自动开始运行,要执行线程,必须调用线程的start()方法。处于就绪状态的线程并不必定当即运行run()方法,线程还必须同其余线程竞争CPU时间,只有得到CPU时间才能够运行线程。安全
new Thread(() -> System.out.println("run 任务执行")).start();
当线程得到CPU时间后,它才进入运行状态,真正开始执行run()方法。也就是 System.out.println("run 任务执行") 。多线程
run 任务执行
线程运行过程当中,可能因为各类缘由进入阻塞状态 :并发
线程执行了Thread.sleep(int n)方法,线程放弃CPU,睡眠n毫秒,而后恢复运行。框架
线程要执行一段同步代码,因为没法得到相关的同步锁,只好进入阻塞状态,等到得到了同步锁,才能恢复运行。异步
线程执行了一个对象的wait()方法,进入阻塞状态,只有等到其余线程执行了该对象的notify()或notifyAll()方法,才可能将其唤醒。ide
有两个缘由会致使线程死亡:
使用 isAlive() 方法可判断线程在当前是否存活着
在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为表明的高性能队列非阻塞队列,一个是以BlockingQueue接口为表明的阻塞队列,不管哪一种都继承自Queue。
重要方法:
适用于高并发场景下的队列,经过无锁的方式,实现了高并发状态下的高性能,一般ConcurrentLinkedQueue性能好于BlockingQueue.它是一个基于连接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最早
加入的,尾是最近加入的,该队列不容许null元素
public static void main(String[] args) { // 非阻塞式用法 无界队列 - 先进先出的原则 ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue<String>(); concurrentLinkedQueue.offer("第一个进队列"); concurrentLinkedQueue.offer("第二个进队列"); // 获取单个队列信息 peek 获取队列 System.out.println(concurrentLinkedQueue.peek()); System.out.println(concurrentLinkedQueue.size()); // 获取单个队列信息 poll 获取队列以后 会删除队列信息 移除 System.out.println(concurrentLinkedQueue.poll()); System.out.println(concurrentLinkedQueue.size()); }
第一个进队列 2 第一个进队列 1
被阻塞的状况主要有以下两种:
如下简单介绍 BlockingQueue 成员
ArrayBlockingQueue
有边界的阻塞队列,它的内部实现是一个数组,先进先出原则。使用阻塞必须加加超时时间
public static void main(String[] args) throws InterruptedException { // 阻塞式队列 ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3); // 沒有加超时时间都是为非阻塞式 arrayBlockingQueue.offer("张三"); // 获取单个队列信息,而且会删除该队列 System.out.println(arrayBlockingQueue.poll(3, TimeUnit.SECONDS)); System.out.println(arrayBlockingQueue.poll(3, TimeUnit.SECONDS)); System.out.println(arrayBlockingQueue.size()); }
new ArrayBlockingQueue<>(3) 设定(队列)有界大小3。
offer("张三") - “张三“ 进入队列
第一个 poll("张三") 取出数据,“张三”从队列中移除
第二个 poll("张三") ,此时队列已经没有数据了会阻塞等待3秒,若是队列在这3秒内有数据入队列会当即取出数据并结束阻塞,3秒阻塞超时就返回null
张三 null 0
以上示例是 当队列空了的时候进行出队列阻塞
public static void main(String[] args) throws InterruptedException { // 阻塞式队列 ArrayBlockingQueue<Object> arrayBlockingQueue = new ArrayBlockingQueue<>(3); // 沒有加超时时间都是为非阻塞式 arrayBlockingQueue.offer("张一", 3, TimeUnit.SECONDS); arrayBlockingQueue.offer("张二", 3, TimeUnit.SECONDS); arrayBlockingQueue.offer("张三", 3, TimeUnit.SECONDS); arrayBlockingQueue.offer("张四", 3, TimeUnit.SECONDS); // 获取单个队列信息,而且会删除该队列 System.out.println(arrayBlockingQueue.poll(3, TimeUnit.SECONDS)); System.out.println(arrayBlockingQueue.size()); }
new ArrayBlockingQueue<>(3) 设定(队列)有界大小3。
offer("张一") - “张一“ 进入队列
offer("张二") - “张二“ 进入队列
offer("张三") - “张三“ 进入队列 , 此时队列已满
offer("张四",3, TimeUnit.SECOND) 此时队列已满 , 这里给了3秒阻塞时间等待数据出列才有位置,期间若是有数据出列当即添加“张四”入队列并结束阻塞,若是没数据出列阻塞超时会废弃当前数据“张四”
张一 2
以上示例是 当队列满了的时候进行入队列阻塞
常见的阻塞式队列以下:
有兴趣能够去实践下
Executor框架的最顶层实现是ThreadPoolExecutor类,Executors工厂类中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool 静态方法其实也只是ThreadPoolExecutor的构造函数参数不一样而已。经过传入不一样的参数,就能够构造出适用于不一样应用场景下的线程池,来看下构造的参数。
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { ... }
这里用 可缓存的 newCachedThreadPool 线程池来分析
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newCachedThreadPool的corePoolSize被设置为0, maximumPoolSize被设置为Integer.MAX_VALUE, 即maximum是无界的。这里keepAliveTime设置为60秒,意味着空闲的线程最多能够等待任务60秒,不然将被回收。
newCachedThreadPool使用没有容量的SynchronousQueue做为主线程池的工做队列,它是一个不存储元素阻塞队列。
线程池中的线程是被线程池缓存了的,也就是说,线程没有任务要执行时,便处于空闲状态,处于空闲状态的线程并不会被当即销毁(会被缓存住),只有当空闲时间超出一段时间(默认为60s)后,线程池才会销毁该线程(至关于清除过期的缓存)。新任务到达后,线程池首先会让被缓存住的线程(空闲状态)去执行任务,若是没有可用线程(无空闲线程),便会建立新的线程。
根据特性可知 :
- 长时间不提交任务的CachedThreadPool不会占用系统资源
- 极端状况下,CachedThreadPool会由于建立过多线程而耗尽CPU资源及内存
代码示例
public static void main(String[] args) { // 建立线程(四种方式) 1.可缓存、定长、定时、单例 ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 100; i++) { final int temp = i; // execute方法做用: 执行任务 newCachedThreadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + ",i:" + temp); }); } }
newCachedThreadPool 线程池执行 100 次任务
pool-1-thread-3,i:2 pool-1-thread-1,i:0 pool-1-thread-2,i:1 pool-1-thread-4,i:3 pool-1-thread-5,i:4 pool-1-thread-7,i:6 pool-1-thread-6,i:5 pool-1-thread-9,i:8 pool-1-thread-10,i:9 pool-1-thread-8,i:7 pool-1-thread-11,i:10 pool-1-thread-1,i:21 pool-1-thread-3,i:20
能够看到线程 pool-1-thread-三、pool-1-thread-1 被重复使用了
定长的 newFixedThreadPool 线程池
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
能够看到 newFixedThreadPool 的核心线程和最大的线程都是同样的,最多3个线程将处于活动状态。若是提交了3个以上的线程,那么它们将保持在队列中,直到线程可用。
代码示例
public static void main(String[] args) { // 可固定长度的线程池 核心线程数 为3 最多建立3个線程 ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int temp = i; // execute方法做用: 执行任务 newFixedThreadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + ",i:" + temp); }); } }
pool-1-thread-3,i:2 pool-1-thread-2,i:1 pool-1-thread-1,i:0 pool-1-thread-3,i:3 pool-1-thread-2,i:5 pool-1-thread-3,i:6 pool-1-thread-1,i:4 pool-1-thread-3,i:8 pool-1-thread-2,i:7 pool-1-thread-1,i:9
能够看到始终只有pool-1-thread-一、pool-1-thread-二、pool-1-thread-3 三个线程
可定时的 newScheduledThreadPool 线程池
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
newScheduledThreadPool 的核心线程为 传入的corePoolSize,最大线程为Integer.MAX_VALUE , DelayedWorkQueue队列实现BlockingQueue接口,因此使用阻塞式的BlockingQueue队列。
代码示例
public static void main(String[] args) { // 可定时线程池 3核心线程数 ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(3); newScheduledThreadPool.scheduleAtFixedRate( // Runnable 任务 () -> System.out.println(Thread.currentThread().getName() + "run "), 1000, // 初次执行需等待时间 100, TimeUnit.MILLISECONDS); // 周期执行时间(3个线程抢cpu时间) }
pool-1-thread-1run pool-1-thread-2run pool-1-thread-3run pool-1-thread-3run pool-1-thread-2run
每 100 毫秒执行一次
单例的 newSingleThreadExecutor 线程池
代码示例
public static void main(String[] args) { // 单线线程 ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int temp = i; // execute方法做用: 执行任务 newSingleThreadExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ",i:" + temp)); } }
pool-1-thread-1,i:0 pool-1-thread-1,i:1 pool-1-thread-1,i:2 pool-1-thread-1,i:3 pool-1-thread-1,i:4 pool-1-thread-1,i:5 pool-1-thread-1,i:6 pool-1-thread-1,i:7 pool-1-thread-1,i:8 pool-1-thread-1,i:9
始终只有一个线程执行任务
提交一个任务到线程池中,线程池的处理流程以下:
1 .判断线程池里的核心线程是否都在执行任务,若是不是(核心线程空闲或者还有核心线程没有被建立)则建立一个新的工做线程来执行任务。若是核心线程都在执行任务,则进入下个流程。
2 .线程池判断工做队列是否已满,若是工做队列没有满,则将新提交的任务存储在这个工做队列里。若是工做队列满了,则进入下个流程。
3 . 判断线程池里的线程是否都处于工做状态,若是没有,则建立一个新的工做线程来执行任务。若是已经满了,则交给饱和策略来处理这个任务。
给你们推荐一个交流学习qq群:698581634 进群便可免费获取资料。
上面列举到 newCachedThreadPool、newFixedThreadPool ... 底层都是对ThreadPoolExecutor的构造器进行包装使用。在来看下
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
因此我们若是要实现一个自定义线程池就 直接 newThreadPoolExecutor(...) 就就行,如今来自定义一个。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4));
这里自定义线程池 :核心线程1个,最大建立2个线程,60s等待超时销毁,使用阻塞式有界ArrayBlockingQueue 队列,最多缓存4个任务。
public class CustomThread { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(4)); for (int i = 1; i <= 6; i++) { TaskThred t1 = new TaskThred("任务" + i); executor.execute(t1); } executor.shutdown(); } } class TaskThred implements Runnable { private String taskName; public TaskThred(String taskName) { this.taskName = taskName; } @Override public void run() { System.out.println(Thread.currentThread().getName() + taskName); } }
execute 执行6个任务
pool-1-thread-1任务1 pool-1-thread-2任务6 pool-1-thread-2任务2 pool-1-thread-1任务4 pool-1-thread-1任务5 pool-1-thread-2任务3
RejectedExecutionException
缘由:
java.util.concurrent.RejectedExecutionException: Task com.snail.demo.TaskThred@74a14482 rejected from java.util.concurrent.ThreadPoolExecutor@1540e19d[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
要出现这个异常只需将上边 new ArrayBlockingQueue<>(4) 大小 4 改成 2,以下
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
原文连接:https://www.jianshu.com/p/3a3b00bb6ab9?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io