ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
- corePoolSize:线程池核心线程数(平时保留的线程数)
- maximumPoolSize:线程池最大线程数(当核心线程都在忙且workQueue已满时,才会创建新线程,线程总数不超过该值)
- keepAliveTime:超出corePoolSize数量的空闲线程在被回收前的最长存活时间。
- unit:keepAliveTime单位
- workQueue:阻塞队列,存放来不及执行的任务
- ArrayBlockingQueue:构造函数必须传入容量
- LinkedBlockingQueue:构造函数不传大小时,容量默认为 Integer.MAX_VALUE(2147483647),当大量请求任务时,容易造成内存耗尽。
- SynchronousQueue:同步队列,一个没有存储空间的阻塞队列,将任务直接交付给工作线程。
- PriorityBlockingQueue : 优先队列
- threadFactory:线程工厂
- handler:饱和策略(队列已满且线程数达到maximumPoolSize时的拒绝策略)
- AbortPolicy(默认):拒绝新任务,并抛出 RejectedExecutionException
- CallerRunsPolicy:用调用者的线程执行任务
- DiscardOldestPolicy:丢弃队列中等待最久(最早入队)的任务,然后重新尝试提交当前任务
- DiscardPolicy:静默丢弃当前任务,不抛出异常
ThreadPoolExecutor 测试代码
//test.java volatile int finishState = 0; @Test public void test4() throws InterruptedException, ExecutionException { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 7, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10)); ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService(threadPoolExecutor); Runnable runnable = new Runnable() { @Override public void run() { for (int i = 0; i < 50; i++) { String name = "name_" + i; TestCallable testCallable = new TestCallable(name); try { executorCompletionService.submit(testCallable); synchronized (lock) { System.out.print("+++添加任务 name: " + name); System.out.print(" ActiveCount: " + threadPoolExecutor.getActiveCount()); System.out.print(" poolSize: " + threadPoolExecutor.getPoolSize()); System.out.print(" queueSize: " + threadPoolExecutor.getQueue().size()); System.out.println(" taskCount: " + threadPoolExecutor.getTaskCount()); } } catch (RejectedExecutionException e) { synchronized (lock) { System.out.println("拒绝:" + name); } } try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } finishState = 1; } }; Thread addThread = new Thread(runnable); addThread.start(); //System.out.println(" taskCount: " + threadPoolExecutor.getTaskCount()); //添加的任务有被抛弃的。taskCount不必定等于添加的任务。 int completeCount = 0; while (!(completeCount == threadPoolExecutor.getTaskCount() && finishState == 1)) { Future<String> take = executorCompletionService.take(); String taskName = take.get(); synchronized (lock) { System.out.print("---完成任务 name: " + taskName); System.out.print(" ActiveCount: " + threadPoolExecutor.getActiveCount()); System.out.print(" poolSize: " + threadPoolExecutor.getPoolSize()); System.out.print(" queueSize: " + threadPoolExecutor.getQueue().size()); System.out.print(" taskCount: " + threadPoolExecutor.getTaskCount()); System.out.println(" finishTask:" + (++completeCount)); } } addThread.join(); while (threadPoolExecutor.getPoolSize() > 0) { 
Thread.sleep(1000); synchronized (lock) { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss"); System.out.print(simpleDateFormat.format(new Date())); //System.out.print("name: " + taskName); System.out.print(" ActiveCount: " + threadPoolExecutor.getActiveCount()); System.out.print(" poolSize: " + threadPoolExecutor.getPoolSize()); System.out.print(" queueSize: " + threadPoolExecutor.getQueue().size()); System.out.println(" taskCount: " + threadPoolExecutor.getTaskCount()); } } // Tell threads to finish off. threadPoolExecutor.shutdown(); // Wait for everything to finish. while (!threadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS)) { System.out.println("complete"); } }
// TestCallable.java

/**
 * A task that simulates two seconds of work by sleeping and then returns the name it was
 * constructed with.
 */
public class TestCallable implements Callable<String> {
    /** Name handed back to the caller as the task result. */
    private final String name;

    /**
     * @param name value returned by {@link #call()}
     */
    public TestCallable(String name) {
        this.name = name;
    }

    /**
     * Sleeps for 2000 ms and returns the task name.
     *
     * <p>If the thread is interrupted during the sleep, the name is still returned, but the
     * thread's interrupt status is restored so the code running this task can react to it.
     *
     * @return the name passed to the constructor
     */
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt status when it throws; set it again
            // rather than swallowing the interruption.
            Thread.currentThread().interrupt();
        }
        return this.name;
    }
}
执行结果
+++添加任务 name: name_0 ActiveCount: 1 poolSize: 1 queueSize: 0 taskCount: 1 +++添加任务 name: name_1 ActiveCount: 2 poolSize: 2 queueSize: 0 taskCount: 2 +++添加任务 name: name_2 ActiveCount: 3 poolSize: 3 queueSize: 0 taskCount: 3 //corePoolSize=3,因此poolSize逐渐加到3. +++添加任务 name: name_3 ActiveCount: 3 poolSize: 3 queueSize: 1 taskCount: 4 +++添加任务 name: name_4 ActiveCount: 3 poolSize: 3 queueSize: 2 taskCount: 5 +++添加任务 name: name_5 ActiveCount: 3 poolSize: 3 queueSize: 3 taskCount: 6 +++添加任务 name: name_6 ActiveCount: 3 poolSize: 3 queueSize: 4 taskCount: 7 +++添加任务 name: name_7 ActiveCount: 3 poolSize: 3 queueSize: 5 taskCount: 8 +++添加任务 name: name_8 ActiveCount: 3 poolSize: 3 queueSize: 6 taskCount: 9 +++添加任务 name: name_9 ActiveCount: 3 poolSize: 3 queueSize: 7 taskCount: 10 ---完成任务 name: name_0 ActiveCount: 3 poolSize: 3 queueSize: 6 taskCount: 10 finishTask:1 +++添加任务 name: name_10 ActiveCount: 3 poolSize: 3 queueSize: 7 taskCount: 11 ---完成任务 name: name_1 ActiveCount: 3 poolSize: 3 queueSize: 6 taskCount: 11 finishTask:2 +++添加任务 name: name_11 ActiveCount: 3 poolSize: 3 queueSize: 7 taskCount: 12 ---完成任务 name: name_2 ActiveCount: 3 poolSize: 3 queueSize: 6 taskCount: 12 finishTask:3 +++添加任务 name: name_12 ActiveCount: 3 poolSize: 3 queueSize: 7 taskCount: 13 +++添加任务 name: name_13 ActiveCount: 3 poolSize: 3 queueSize: 8 taskCount: 14 +++添加任务 name: name_14 ActiveCount: 3 poolSize: 3 queueSize: 9 taskCount: 15 +++添加任务 name: name_15 ActiveCount: 3 poolSize: 3 queueSize: 10 taskCount: 16 //由于任务队列为:new LinkedBlockingDeque<>(10),core线程不够用时,将任务添加到队列中。 //queueSize到10,队列已满。开始增长poolSize,加到maximumPoolSize=7 +++添加任务 name: name_16 ActiveCount: 4 poolSize: 4 queueSize: 10 taskCount: 17 +++添加任务 name: name_17 ActiveCount: 5 poolSize: 5 queueSize: 10 taskCount: 18 +++添加任务 name: name_18 ActiveCount: 6 poolSize: 6 queueSize: 10 taskCount: 19 +++添加任务 name: name_19 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 20 ---完成任务 name: name_3 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 20 
finishTask:4 +++添加任务 name: name_20 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 21 ---完成任务 name: name_4 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 21 finishTask:5 +++添加任务 name: name_21 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 22 ---完成任务 name: name_5 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 22 finishTask:6 +++添加任务 name: name_22 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 23 //poolSize=maximumPoolSize,queueSize=10,都满了。执行饱和策略。默认AbortPolicy抛弃新任务。 拒绝:name_23 拒绝:name_24 拒绝:name_25 ---完成任务 name: name_16 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 23 finishTask:7 +++添加任务 name: name_26 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 24 ---完成任务 name: name_17 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 24 finishTask:8 +++添加任务 name: name_27 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 25 ---完成任务 name: name_18 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 25 finishTask:9 +++添加任务 name: name_28 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 26 ---完成任务 name: name_19 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 26 finishTask:10 +++添加任务 name: name_29 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 27 ---完成任务 name: name_6 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 27 finishTask:11 +++添加任务 name: name_30 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 28 ---完成任务 name: name_7 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 28 finishTask:12 +++添加任务 name: name_31 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 29 ---完成任务 name: name_8 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 29 finishTask:13 +++添加任务 name: name_32 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 30 拒绝:name_33 拒绝:name_34 拒绝:name_35 ---完成任务 name: name_9 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 30 finishTask:14 +++添加任务 name: name_36 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 31 ---完成任务 name: name_10 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 31 finishTask:15 +++添加任务 name: name_37 
ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 32 ---完成任务 name: name_11 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 32 finishTask:16 +++添加任务 name: name_38 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 33 ---完成任务 name: name_12 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 33 finishTask:17 +++添加任务 name: name_39 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 34 ---完成任务 name: name_13 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 34 finishTask:18 +++添加任务 name: name_40 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 35 ---完成任务 name: name_14 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 35 finishTask:19 +++添加任务 name: name_41 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 36 ---完成任务 name: name_15 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 36 finishTask:20 +++添加任务 name: name_42 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 37 拒绝:name_43 拒绝:name_44 拒绝:name_45 ---完成任务 name: name_20 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 37 finishTask:21 +++添加任务 name: name_46 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 38 ---完成任务 name: name_21 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 38 finishTask:22 +++添加任务 name: name_47 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 39 ---完成任务 name: name_22 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 39 finishTask:23 //消费速度大于添加速度,poolSize并不减小,先把queue中的任务逐一执行完成。 +++添加任务 name: name_48 ActiveCount: 7 poolSize: 7 queueSize: 10 taskCount: 40 ---完成任务 name: name_26 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 40 finishTask:24 ---完成任务 name: name_27 ActiveCount: 7 poolSize: 7 queueSize: 8 taskCount: 40 finishTask:25 +++添加任务 name: name_49 ActiveCount: 7 poolSize: 7 queueSize: 9 taskCount: 41 ---完成任务 name: name_28 ActiveCount: 7 poolSize: 7 queueSize: 8 taskCount: 41 finishTask:26 ---完成任务 name: name_29 ActiveCount: 7 poolSize: 7 queueSize: 7 taskCount: 41 finishTask:27 ---完成任务 name: name_30 ActiveCount: 7 poolSize: 7 queueSize: 6 taskCount: 41 finishTask:28 
---完成任务 name: name_31 ActiveCount: 7 poolSize: 7 queueSize: 5 taskCount: 41 finishTask:29 ---完成任务 name: name_32 ActiveCount: 7 poolSize: 7 queueSize: 4 taskCount: 41 finishTask:30 ---完成任务 name: name_36 ActiveCount: 7 poolSize: 7 queueSize: 3 taskCount: 41 finishTask:31 ---完成任务 name: name_37 ActiveCount: 7 poolSize: 7 queueSize: 2 taskCount: 41 finishTask:32 ---完成任务 name: name_38 ActiveCount: 7 poolSize: 7 queueSize: 1 taskCount: 41 finishTask:33 ---完成任务 name: name_39 ActiveCount: 7 poolSize: 7 queueSize: 0 taskCount: 41 finishTask:34 //ActiveCount开始减小。 ---完成任务 name: name_40 ActiveCount: 6 poolSize: 7 queueSize: 0 taskCount: 41 finishTask:35 ---完成任务 name: name_41 ActiveCount: 5 poolSize: 7 queueSize: 0 taskCount: 41 finishTask:36 ---完成任务 name: name_42 ActiveCount: 4 poolSize: 7 queueSize: 0 taskCount: 41 finishTask:37 ---完成任务 name: name_46 ActiveCount: 3 poolSize: 7 queueSize: 0 taskCount: 41 finishTask:38 ---完成任务 name: name_47 ActiveCount: 2 poolSize: 7 queueSize: 0 taskCount: 41 finishTask:39 ---完成任务 name: name_48 ActiveCount: 1 poolSize: 7 queueSize: 0 taskCount: 41 finishTask:40 ---完成任务 name: name_49 ActiveCount: 0 poolSize: 7 queueSize: 0 taskCount: 41 finishTask:41 22:47:49 ActiveCount: 0 poolSize: 7 queueSize: 0 taskCount: 41 22:47:50 ActiveCount: 0 poolSize: 7 queueSize: 0 taskCount: 41 22:47:51 ActiveCount: 0 poolSize: 7 queueSize: 0 taskCount: 41 22:47:52 ActiveCount: 0 poolSize: 7 queueSize: 0 taskCount: 41 22:47:53 ActiveCount: 0 poolSize: 7 queueSize: 0 taskCount: 41 22:47:54 ActiveCount: 0 poolSize: 7 queueSize: 0 taskCount: 41 22:47:55 ActiveCount: 0 poolSize: 7 queueSize: 0 taskCount: 41 22:47:56 ActiveCount: 0 poolSize: 7 queueSize: 0 taskCount: 41 22:47:57 ActiveCount: 0 poolSize: 5 queueSize: 0 taskCount: 41 //因为keepAliveTime=10,大概10s后poolSize恢复为corePoolSize。 22:47:58 ActiveCount: 0 poolSize: 3 queueSize: 0 taskCount: 41 22:47:59 ActiveCount: 0 poolSize: 3 queueSize: 0 taskCount: 41 22:48:00 ActiveCount: 0 poolSize: 3 queueSize: 0 taskCount: 41
注意
《阿里巴巴 Java 开发手册》
线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
- FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。
- CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。