6.JDK并发包2 ......................................................................................................................1
1.线程池的基本使用 ......................................................................................................2
1.1. 为什么需要线程池...............................................................................................2
1.2. JDK为我们提供了哪些支持 .................................................................................2
1.3. 线程池的使用.......................................................................................................2
1.3.1. 线程池的种类 ...............................................................................................2
1.3.2. 不同线程池的共同性 ...................................................................................2
1.4. 线程池使用的小例子...........................................................................................2
1.4.1. 简单线程池 ...................................................................................................3
1.4.2. ScheduledThreadPool ....................................................................................3
2.扩展和增强线程池 .....................................................................................................3
2.1. 回调接口...............................................................................................................3
2.2. 拒绝策略...............................................................................................................3
2.3. 自定义ThreadFactory ...........................................................................................3
3.线程池及其核心代码分析 .........................................................................................
4.ForkJoin ........................................................................................................................3
4.1. 思想.......................................................................................................................3
4.2. 使用接口...............................................................................................................4
4.2.1. RecursiveAction .............................................................................................4
4.2.2. RecursiveTask ................................................................................................4
4.3. 简单例子...............................................................................................................4
4.4. 实现要素...............................................................................................................4
4.4.1. 工作窃取 .......................................................................................................
简单例子
newCachedThreadPool 创建一个可缓存的线程池。默认情况下,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程;当任务数增加时,此线程池又可以智能地添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程数。
newFixedThreadPool 创建固定大小的线程池。每次提交一个任务就创建一个线程,线程可复用,直到线程数达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Demonstrates a fixed-size thread pool: the same task is submitted ten times
 * to a pool of five worker threads.
 */
public class ThreadPollDemo {

    /**
     * Prints the current timestamp and the id of the executing thread, then
     * sleeps for one second to simulate work.
     */
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Thread.sleep clears the interrupt flag when it throws; restore it so
                // the code running this task can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Submits the task ten times to a reusable pool of five threads and then
     * requests an orderly shutdown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyTask task = new MyTask();
        // Create a reusable thread pool with a fixed number of threads.
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            for (int i = 0; i < 10; i++) {
                pool.submit(task);
            }
        } finally {
            // Already-submitted tasks still run; the workers exit afterwards so the JVM can stop.
            pool.shutdown();
        }
    }
}
newSingleThreadExecutor 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
newScheduledThreadPool 创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates periodic execution of a task on a scheduled thread pool.
 */
public class ThreadPollDemo {

    /**
     * Prints the current timestamp and the id of the executing thread, then
     * sleeps for one second to simulate work.
     */
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Thread.sleep clears the interrupt flag when it throws; restore it so
                // the code running this task can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Schedules the task with a fixed delay of three seconds between runs.
     * The pool is intentionally left running so the task keeps repeating.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyTask task = new MyTask();
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
        // The next run is not started until the previous one has finished:
        // the 3-second delay is measured from the END of one run to the start of
        // the next, so with a 1-second task the runs start about 4 seconds apart.
        pool.scheduleWithFixedDelay(task, 0, 3, TimeUnit.SECONDS);
        // Alternative: run the task only once, 3 seconds from now:
        //   pool.schedule(task, 3, TimeUnit.SECONDS);
        // Shutdown is deliberately not requested here, so the periodic task keeps running.
    }
}
扩展和增强线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates extending {@link ThreadPoolExecutor} through its hook methods
 * ({@code beforeExecute}, {@code afterExecute}, {@code terminated}).
 */
public class ThreadPollDemo {

    /**
     * A named task that prints which thread runs it and then sleeps for
     * 100 ms to simulate work.
     */
    public static class MyTask implements Runnable {
        public String name;

        /**
         * @param name label printed by the task and by the pool hooks
         */
        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在执行" + ":Thread ID:" + Thread.currentThread().getId() + "task name= " + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Thread.sleep clears the interrupt flag when it throws; restore it so
                // the pool worker can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Runs five named tasks on a pool whose hooks log before and after each
     * task and when the pool terminates.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while pausing between submissions
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行" + describe(r));
                super.beforeExecute(t, r);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                System.out.println("执行完成" + describe(r));
            }

            @Override
            protected void terminated() {
                super.terminated();
                System.out.println("线程池退出");
            }
        };
        try {
            for (int i = 0; i < 5; i++) {
                MyTask task = new MyTask("TASK-GEYM" + i);
                // execute() hands the hooks the MyTask itself; submit() would wrap it.
                pool.execute(task);
                Thread.sleep(10);
            }
        } finally {
            // Request shutdown even if the loop is interrupted, so the workers can exit.
            pool.shutdown();
        }
    }

    /**
     * Returns the label to log for a runnable seen by the pool hooks: the task
     * name for a {@link MyTask}, otherwise the runnable's string form (so a
     * wrapped task does not cause a {@code ClassCastException}).
     */
    private static String describe(Runnable r) {
        if (r instanceof MyTask) {
            return ((MyTask) r).name;
        }
        return String.valueOf(r);
    }
}
准备执行TASK-GEYM0 正在执行:Thread ID:10task name= TASK-GEYM0 准备执行TASK-GEYM1 正在执行:Thread ID:11task name= TASK-GEYM1 准备执行TASK-GEYM2 正在执行:Thread ID:12task name= TASK-GEYM2 准备执行TASK-GEYM3 正在执行:Thread ID:13task name= TASK-GEYM3 准备执行TASK-GEYM4 正在执行:Thread ID:14task name= TASK-GEYM4 执行完成TASK-GEYM0 执行完成TASK-GEYM1 执行完成TASK-GEYM2 执行完成TASK-GEYM3 执行完成TASK-GEYM4 线程池退出
拒绝策略
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates a custom rejection policy: a pool of five threads with a
 * {@link SynchronousQueue} (no buffering) logs every task it cannot accept.
 */
public class ThreadPollDemo {

    /**
     * A named task that prints which thread runs it and then sleeps for
     * 100 ms to simulate work.
     */
    public static class MyTask implements Runnable {
        public String name;

        /**
         * @param name label printed by the task
         */
        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在执行" + ":Thread ID:" + Thread.currentThread().getId() + "task name= " + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Thread.sleep clears the interrupt flag when it throws; restore it so
                // the pool worker can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Submits a new task every 10 ms. Each task takes 100 ms and only five
     * threads exist, so submissions arriving while all workers are busy are
     * handed to the rejection handler, which logs and drops them.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while pausing between submissions
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // submit() wraps the task, so r prints as a FutureTask here.
                        System.out.println(r.toString() + " is discard");
                    }
                });
        try {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                MyTask task = new MyTask("TASK-GEYM" + i);
                pool.submit(task);
                Thread.sleep(10);
            }
        } finally {
            // Without this, an interrupted main would leave the non-daemon workers alive.
            pool.shutdown();
        }
    }
}
正在执行:Thread ID:10task name= TASK-GEYM0 正在执行:Thread ID:11task name= TASK-GEYM1 正在执行:Thread ID:12task name= TASK-GEYM2 正在执行:Thread ID:13task name= TASK-GEYM3 正在执行:Thread ID:14task name= TASK-GEYM4 java.util.concurrent.FutureTask@7b23ec81 is discard java.util.concurrent.FutureTask@6acbcfc0 is discard java.util.concurrent.FutureTask@5f184fc6 is discard java.util.concurrent.FutureTask@3feba861 is discard 正在执行:Thread ID:10task name= TASK-GEYM9 正在执行:Thread ID:11task name= TASK-GEYM10 正在执行:Thread ID:12task name= TASK-GEYM11 正在执行:Thread ID:13task name= TASK-GEYM12 正在执行:Thread ID:14task name= TASK-GEYM13 java.util.concurrent.FutureTask@5b480cf9 is discard java.util.concurrent.FutureTask@6f496d9f is discard java.util.concurrent.FutureTask@723279cf is discard java.util.concurrent.FutureTask@10f87f48 is discard 正在执行:Thread ID:10task name= TASK-GEYM18 正在执行:Thread ID:11task name= TASK-GEYM19
自定义线程池名称
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPollDemo { public static class MyTask implements Runnable { public String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println("正在执行" + ":Thread ID:" + Thread.currentThread().getId() + "task name= " + name); System.out.println(Thread.currentThread().getName()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new MssThreadFactory("个人专属线程池"), // Executors.defaultThreadFactory(), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is discard"); } } ); for (int i = 0; i < Integer.MAX_VALUE; i++) { MyTask task = new MyTask("TASK-GEYM" + i); pool.submit(task); Thread.sleep(10); } // pool.shutdown(); } }
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * description: a {@link ThreadFactory} that names its threads
 * {@code <prefix>-<n>}, where {@code n} starts at 1 and increases by one for
 * each thread created by this factory instance. Every thread it returns is a
 * non-daemon thread with normal priority.
 *
 * @author: dawn.he QQ: 905845006
 * @email: dawn.he@cloudwise.com
 * @email: 905845006@qq.com
 * @date: 2019/10/4 5:00 PM
 */
public class MssThreadFactory implements ThreadFactory {
    /** Sequence number appended to the name of the next thread. */
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    /** Name prefix, already including the trailing {@code "-"} separator. */
    private final String namePrefix;

    /**
     * @param namePrefix text placed before the {@code -<n>} suffix of every thread name
     */
    MssThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix + "-";
    }

    /**
     * Creates a new, not yet started thread for the given runnable.
     *
     * @param r the runnable the new thread will execute
     * @return a non-daemon, normal-priority thread named {@code <prefix>-<n>}
     */
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
        // A new thread inherits daemon status and priority from the thread that
        // creates it; normalise both so the result does not depend on the caller.
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}
ForkJoin
发送消息 RecursiveAction 无返回值
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

/**
 * description: sends (prints) a list of messages in parallel using a
 * {@link RecursiveAction}, i.e. a fork/join task without a result.
 *
 * @author: dawn.he QQ: 905845006
 * @email: dawn.he@cloudwise.com
 * @email: 905845006@qq.com
 * @date: 2019/10/4 5:12 PM
 */
public class ForkJoinPoolDemo {

    /**
     * Prints the elements of {@code list} in the half-open index range
     * {@code [start, end)}. Ranges longer than {@link #THRESHOLD} are split in
     * half and the halves are processed as separate subtasks.
     */
    class SendMsgTask extends RecursiveAction {
        /** Largest range size that is processed directly instead of being split. */
        private static final int THRESHOLD = 10;
        private final int start;
        private final int end;
        private final List<String> list;

        /**
         * @param start index of the first element to print (inclusive)
         * @param end   index after the last element to print (exclusive)
         * @param list  the messages; only read, never modified
         */
        public SendMsgTask(int start, int end, List<String> list) {
            this.start = start;
            this.end = end;
            this.list = list;
        }

        @Override
        protected void compute() {
            if ((end - start) <= THRESHOLD) {
                for (int i = start; i < end; i++) {
                    System.out.println(Thread.currentThread().getName() + ": " + list.get(i));
                }
                return;
            }
            // Written this way so the midpoint cannot overflow for large indices.
            int middle = start + (end - start) / 2;
            // Run both halves as subtasks and wait for both to finish.
            invokeAll(new SendMsgTask(start, middle, list), new SendMsgTask(middle, end, list));
        }
    }

    /**
     * Builds the messages "1".."123", prints them through a fork/join pool and
     * shuts the pool down.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 123; i++) {
            list.add(String.valueOf(i + 1));
        }

        ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke() returns only after the whole task tree has completed.
            pool.invoke(new ForkJoinPoolDemo().new SendMsgTask(0, list.size(), list));
        } finally {
            // Shutdown must come BEFORE awaitTermination: a pool that was never
            // shut down cannot terminate, so the wait would always last the full timeout.
            pool.shutdown();
        }
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
计算求和
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * description: sums an int array in parallel using a {@link RecursiveTask},
 * i.e. a fork/join task that returns a result.
 *
 * @author: dawn.he QQ: 905845006
 * @email: dawn.he@cloudwise.com
 * @email: 905845006@qq.com
 * @date: 2019/10/4 5:25 PM
 */
public class ForkJoinTaskDemo {

    /**
     * Sums {@code arr} over the half-open index range {@code [start, end)}.
     * Ranges longer than {@link #THRESHOLD} are split in half; shorter ranges
     * are summed directly.
     */
    private class SumTask extends RecursiveTask<Integer> {
        /** Largest range size that is summed directly instead of being split. */
        private static final int THRESHOLD = 1000;
        private final int[] arr;
        private final int start;
        private final int end;

        /**
         * @param arr   the values to sum; only read, never modified
         * @param start index of the first element to include (inclusive)
         * @param end   index after the last element to include (exclusive)
         */
        public SumTask(int[] arr, int start, int end) {
            this.arr = arr;
            this.start = start;
            this.end = end;
        }

        /**
         * Sums this task's range sequentially and prints the subtotal together
         * with the name of the thread that computed it.
         *
         * @return the sum of {@code arr[start..end)}
         */
        private Integer subtotal() {
            // Primitive accumulator: avoids boxing on every iteration.
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += arr[i];
            }
            System.out.println(Thread.currentThread().getName() + ": ∑(" + start + "~" + end + ")=" + sum);
            return sum;
        }

        @Override
        protected Integer compute() {
            if ((end - start) <= THRESHOLD) {
                return subtotal();
            }
            // Written this way so the midpoint cannot overflow for large indices.
            int middle = start + (end - start) / 2;
            SumTask left = new SumTask(arr, start, middle);
            SumTask right = new SumTask(arr, middle, end);
            // Fork one half and compute the other in the current thread, so this
            // worker does useful work instead of only waiting on two joins.
            left.fork();
            int rightSum = right.compute();
            return left.join() + rightSum;
        }
    }

    /**
     * Sums the numbers 1..10000 on a fork/join pool and prints the result.
     *
     * @param args unused
     * @throws ExecutionException   if the summing task failed with an exception
     * @throws InterruptedException if interrupted while waiting for the result
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int[] arr = new int[10000];
        for (int i = 0; i < 10000; i++) {
            arr[i] = i + 1;
        }

        ForkJoinPool pool = new ForkJoinPool();
        try {
            // Submit the task to the pool.
            ForkJoinTask<Integer> result = pool.submit(new ForkJoinTaskDemo().new SumTask(arr, 0, arr.length));
            // get() only waits for the submitted task. Calling invoke() here would
            // start executing the same task again in the main thread while a pool
            // worker may already be running it.
            System.out.println("最终计算结果: " + result.get());
        } finally {
            pool.shutdown();
        }
    }
}