为了更好地控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效的进行线程控制。它们都在java.util.concurrent包中,是JDK并发包的核心。其中有一个比较重要的类:Executors,它扮演着线程工厂的角色,咱们经过Executors能够建立特定功能的线程池。
Executors建立线程池方法:java
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
使用例子:缓存
class Temp extends Thread { @Override public void run() { System.out.println("run..."); } } public class ScheduledJob { public static void main(String[] args) { Temp command = new Temp(); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); scheduledExecutorService.scheduleWithFixedDelay(command, 1, 3, TimeUnit.SECONDS); //1秒后执行线程,以后每隔3秒轮询 } }
运行结果:
run...
run...
run...
run...
run...
run...
run...
...多线程
若Executors工厂类没法知足咱们的需求,能够本身去建立自定义的线程池。其实Executors工厂类里面的建立线程方法其内部实现均是用了ThreadPoolExecutor这个类,这个类能够自定义线程,构造方法以下:并发
public ThreadPoolExecutor(int corePoolSize, //表示当前建立的核心线程数 int maximumPoolSize, //表示最大线程数 long keepAliveTime, //线程池空闲时存活时间 TimeUnit unit, //指定时间单位 BlockingQueue<Runnable> workQueue, //缓存队列 ThreadFactory threadFactory, // RejectedExecutionHandler handler) { //拒绝执行的方法 ... ... }
这个构造方法对于队列是什么类型的比较关键:框架
public class UserThreadPoolExecutor { public static void main(String[] args) { /** * 使用有界任务队列时:如有新的任务须要执行,若是线程池实际线程数小于corePoolSize,则优先建立线程; * 若大于corePoolSize,则会将任务加入队列, * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,建立新的线程, * 若线程数大于maximumPoolSize,则执行拒绝策略。 */ ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, //coreSize 2, //maxSize 60, //无效时间 TimeUnit.SECONDS, //单位 new ArrayBlockingQueue<Runnable >(3) //有界队列 ); pool.execute(new MyTask(1, "任务1")); pool.execute(new MyTask(2, "任务2")); pool.execute(new MyTask(3, "任务3")); pool.execute(new MyTask(4, "任务4")); pool.execute(new MyTask(5, "任务5")); pool.execute(new MyTask(6, "任务6")); pool.shutdown(); } }
运行结果:
run taskId = 1
run taskId = 5
Exception in thread "main"
java.util.concurrent.RejectedExecutionException: Task Thread[Thread-5,5,main] rejected from java.util.concurrent.ThreadPoolExecutor@70dea4e[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at Executor.UserThreadPoolExecutor.main(UserThreadPoolExecutor.java:28)
run taskId = 2
run taskId = 3
run taskId = 4
分析:
任务数大于coreSize(目前为1),则有任务加入队列,又队列(队列容量为3)已满,则建立一个线程(目前coreSize为2),因为maxSize为2,全部最多只能再建立一个线程到线程池(目前coreSize+queue=5小于任务数6),没法再建立线程,执行拒绝策略ide
public class UserThreadPoolExecutor2 implements Runnable{ /** * 当有新任务到来,系统线程数小于corePoolSize时,则新建线程执行任务, * 当达到corePoolSize后,就不会继续增长, * 若后续仍有新的任务加入,而又没有空闲的线程资源,则任务直接进入队列等待。 * 若任务建立和处理的速度差别很大,无界队列会保持快速增加,知道耗尽系统内存 */ private static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); ThreadPoolExecutor pool = new ThreadPoolExecutor( 5, 10, 120L, //2分钟 TimeUnit.SECONDS, queue ); for (int i = 0; i < 20; i++) { pool.execute(new UserThreadPoolExecutor2()); } Thread.sleep(1000); System.out.println("queue size : " + queue.size()); Thread.sleep(2000); //pool.shutdown(); } @Override public void run() { try { int num = count.incrementAndGet(); System.out.println("任务" + num); Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
运行结果:
任务2
任务4
任务1
任务3
任务5
queue size : 15
任务6
任务8
任务7
任务9
任务10
任务11
任务12
任务13
任务14
任务15
任务16
任务18
任务19
任务17
任务20线程
JDK拒绝策略:日志
若是须要自定义拒绝策略,能够实现RejectedExecutionHandle接口:code
public class MyReject implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("自定义处理..."); System.out.println("当前被拒绝任务为:" + r.toString()); //记录日志,等待其余时间处理 } } public class UserThreadPoolExecutor1 { public static void main(String[] args) throws InterruptedException { /** * 使用有界任务队列时:如有新的任务须要执行,若是线程池实际线程数小于corePoolSize,则优先建立线程; * 若大于corePoolSize,则会将任务加入队列, * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,建立新的线程, * 若线程数大于maximumPoolSize,则执行拒绝策略。 */ ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, //coreSize 2, //maxSize 60, //无效时间 TimeUnit.SECONDS, //单位 new ArrayBlockingQueue<Runnable>(3), //有界队列 new MyReject() ); pool.execute(new MyTask(1, "任务1")); pool.execute(new MyTask(2, "任务2")); pool.execute(new MyTask(3, "任务3")); pool.execute(new MyTask(4, "任务4")); pool.execute(new MyTask(5, "任务5")); pool.execute(new MyTask(6, "任务6")); pool.shutdown(); } }
运行结果:
自定义处理...
当前被拒绝任务为:Thread[Thread-5,5,main]
run taskId = 5
run taskId = 1
run taskId = 2
run taskId = 3
run taskId = 4对象