02.第二阶段、实战Java高并发程序设计模式-5.JDK并发包2

6.JDK并发包2 ...................................................................................................................... 1

1.线程池的基本使用 ...................................................................................................... 2

  1. 1.1.  为何需要线程池 ............................................................................................... 2

  2. 1.2.  JDK为我们提供了哪些支持 ................................................................................. 2

  3. 1.3.  线程池的使用 ....................................................................................................... 2

    1. 1.3.1.  线程池的种类 ............................................................................................... 2

    2. 1.3.2.  不同线程池的共同性 ................................................................................... 2

1.4. 线程池使用的小例子 ........................................................................................... 2

  1. 1.4.1.  简单线程池 ................................................................................................... 3

  2. 1.4.2.  ScheduledThreadPool .................................................................................... 3

扩展和加强线程池 .....................................................................................................3

  1. 2.1.  回调接口...............................................................................................................3

  2. 2.2.  拒绝策略...............................................................................................................3

  3. 2.3.  自定义ThreadFactory ...........................................................................................3

线程池及其核心代码分析 .........................................................................................

 ForkJoin ........................................................................................................................3

4.1. 思想.......................................................................................................................3

4.2. 使用接口...............................................................................................................4

  1. 4.2.1.  RecursiveAction .............................................................................................4

  2. 4.2.2.  RecursiveTask ................................................................................................4

4.3. 简单例子...............................................................................................................4

4.4. 实现要素...............................................................................................................4

4.4.1. 工作窃取 .......................................................................................................

  1. 简单例子

  2. newCachedThreadPool 创建一个可缓存的线程池。默认情况下,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程;当任务数增加时,此线程池又可以智能地添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程数。

  3. newFixedThreadPool 创建固定大小的线程池。每次提交一个任务就创建一个线程,线程可复用,直到线程数达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Demonstrates a fixed-size thread pool: the same task is submitted ten
     * times to a pool created with {@code Executors.newFixedThreadPool(5)}.
     */
    public class ThreadPollDemo {

    	/** Number of worker threads requested from the fixed pool. */
    	private static final int POOL_SIZE = 5;

    	/** How many times the task is submitted. */
    	private static final int SUBMISSIONS = 10;

    	/**
    	 * Prints the current time and the id of the executing thread, then
    	 * sleeps for one second to simulate work.
    	 */
    	public static class MyTask implements Runnable {
    		@Override
    		public void run() {
    			System.out.println(System.currentTimeMillis() + ":Thread ID:" +
    					Thread.currentThread().getId());
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				// Thread.sleep clears the interrupt flag when it throws; restore it
    				// so whoever owns this thread can still observe the interruption.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/**
    	 * Submits the task {@link #SUBMISSIONS} times and then requests an
    	 * orderly shutdown of the pool.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		MyTask task = new MyTask();
    		// Create a pool that reuses a fixed number of threads.
    		ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
    		try {
    			for (int i = 0; i < SUBMISSIONS; i++) {
    				pool.submit(task);
    			}
    		} finally {
    			// Always request shutdown, even if a submission throws, so the
    			// worker threads do not keep the JVM alive.
    			pool.shutdown();
    		}
    	}
    }
  4. newSingleThreadExecutor 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

  5. newScheduledThreadPool 创建一个支持定时以及周期性执行任务的线程池(核心线程数由参数指定,最大线程数不设上限)。

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates periodic execution with a {@link ScheduledExecutorService}.
     */
    public class ThreadPollDemo {

    	/** Core size passed to {@code Executors.newScheduledThreadPool}. */
    	private static final int POOL_SIZE = 5;

    	/** Delay, in seconds, between the end of one run and the start of the next. */
    	private static final long DELAY_SECONDS = 3;

    	/**
    	 * Prints the current time and the id of the executing thread, then
    	 * sleeps for one second to simulate work.
    	 */
    	public static class MyTask implements Runnable {
    		@Override
    		public void run() {
    			System.out.println(System.currentTimeMillis() + ":Thread ID:" +
    					Thread.currentThread().getId());
    			try {
    				Thread.sleep(1000);
    			} catch (InterruptedException e) {
    				// Thread.sleep clears the interrupt flag when it throws; restore it
    				// so whoever owns this thread can still observe the interruption.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/**
    	 * Schedules the task to repeat with a fixed delay. The pool is never
    	 * shut down here, so the task keeps repeating until the process is
    	 * terminated.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		MyTask task = new MyTask();
    		ScheduledExecutorService pool = Executors.newScheduledThreadPool(POOL_SIZE);
    		// The next run is not started until the previous one has finished:
    		// scheduleWithFixedDelay measures the delay from the END of one run to
    		// the START of the next, so with the 1-second sleep in the task the
    		// runs begin roughly every 4 seconds (not every 3).
    		pool.scheduleWithFixedDelay(task, 0, DELAY_SECONDS, TimeUnit.SECONDS);
    		// For a single delayed run instead, use
    		// pool.schedule(task, 3, TimeUnit.SECONDS) followed by pool.shutdown().
    	}
    }

    扩展和加强线程池

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates the {@link ThreadPoolExecutor} extension hooks
     * {@code beforeExecute}, {@code afterExecute} and {@code terminated}.
     */
    public class ThreadPollDemo {

    	/**
    	 * A named task that prints the executing thread's id and its own name,
    	 * then sleeps for 100 ms to simulate work.
    	 */
    	public static class MyTask implements Runnable {

    		public String name;

    		/**
    		 * @param name label printed by the task and by the executor hooks
    		 */
    		public MyTask(String name) {
    			this.name = name;
    		}

    		@Override
    		public void run() {
    			System.out.println("正在执行" + ":Thread ID:" +
    					Thread.currentThread().getId() + "task name= " + name);
    			try {
    				Thread.sleep(100);
    			} catch (InterruptedException e) {
    				// Thread.sleep clears the interrupt flag when it throws; restore it
    				// so whoever owns this thread can still observe the interruption.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/**
    	 * Returns the label the hooks print for a runnable: the task name for a
    	 * {@link MyTask}, otherwise the runnable's string form. This keeps the
    	 * hooks from throwing {@code ClassCastException} when the pool is handed
    	 * something other than a {@code MyTask} (for example the wrapper created
    	 * by {@code submit}).
    	 */
    	private static String taskLabel(Runnable r) {
    		if (r instanceof MyTask) {
    			return ((MyTask) r).name;
    		}
    		return String.valueOf(r);
    	}

    	/**
    	 * Runs five named tasks on a pool whose hooks log each task before and
    	 * after execution and log once more when the pool terminates.
    	 *
    	 * @param args unused
    	 * @throws InterruptedException if the main thread is interrupted while
    	 *                              pausing between submissions
    	 */
    	public static void main(String[] args) throws InterruptedException {
    		ExecutorService pool = new ThreadPoolExecutor(5,
    				5,
    				0L,
    				TimeUnit.MILLISECONDS,
    				new LinkedBlockingQueue<Runnable>()
    		) {
    			@Override
    			protected void beforeExecute(Thread t, Runnable r) {
    				System.out.println("准备执行" + taskLabel(r));
    				super.beforeExecute(t, r);
    			}

    			@Override
    			protected void afterExecute(Runnable r, Throwable t) {
    				super.afterExecute(r, t);
    				System.out.println("执行完成" + taskLabel(r));
    			}

    			@Override
    			protected void terminated() {
    				super.terminated();
    				System.out.println("线程池退出");
    			}
    		};
    		try {
    			for (int i = 0; i < 5; i++) {
    				// execute (not submit) hands the MyTask itself to the hooks.
    				pool.execute(new MyTask("TASK-GEYM" + i));
    				Thread.sleep(10);
    			}
    		} finally {
    			// Request shutdown even if the loop is interrupted, otherwise the
    			// core threads would keep the JVM alive.
    			pool.shutdown();
    		}
    	}
    }
    准备执行TASK-GEYM0
    正在执行:Thread ID:10task name= TASK-GEYM0
    准备执行TASK-GEYM1
    正在执行:Thread ID:11task name= TASK-GEYM1
    准备执行TASK-GEYM2
    正在执行:Thread ID:12task name= TASK-GEYM2
    准备执行TASK-GEYM3
    正在执行:Thread ID:13task name= TASK-GEYM3
    准备执行TASK-GEYM4
    正在执行:Thread ID:14task name= TASK-GEYM4
    执行完成TASK-GEYM0
    执行完成TASK-GEYM1
    执行完成TASK-GEYM2
    执行完成TASK-GEYM3
    执行完成TASK-GEYM4
    线程池退出

    拒绝策略

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates a custom {@link RejectedExecutionHandler}: a five-thread
     * pool backed by a {@link SynchronousQueue} is fed a new task every 10 ms
     * while each task takes 100 ms, so many submissions are rejected and
     * reported by the handler.
     */
    public class ThreadPollDemo {

    	/**
    	 * A named task that prints the executing thread's id and its own name,
    	 * then sleeps for 100 ms to simulate work.
    	 */
    	public static class MyTask implements Runnable {

    		public String name;

    		/**
    		 * @param name label printed when the task runs
    		 */
    		public MyTask(String name) {
    			this.name = name;
    		}

    		@Override
    		public void run() {
    			System.out.println("正在执行" + ":Thread ID:" +
    					Thread.currentThread().getId() + "task name= " + name);
    			try {
    				Thread.sleep(100);
    			} catch (InterruptedException e) {
    				// Thread.sleep clears the interrupt flag when it throws; restore it
    				// so whoever owns this thread can still observe the interruption.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/**
    	 * Keeps submitting tasks until the loop counter is exhausted or the main
    	 * thread is interrupted.
    	 *
    	 * @param args unused
    	 * @throws InterruptedException if the main thread is interrupted while
    	 *                              pausing between submissions
    	 */
    	public static void main(String[] args) throws InterruptedException {
    		ExecutorService pool = new ThreadPoolExecutor(5,
    				5,
    				0L,
    				TimeUnit.MILLISECONDS,
    				new SynchronousQueue<Runnable>(),
    				Executors.defaultThreadFactory(),
    				new RejectedExecutionHandler() {
    					@Override
    					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    						// r is the wrapper created by submit(), not the MyTask itself.
    						System.out.println(r.toString() + " is discard");
    					}
    				}
    		);
    		try {
    			for (int i = 0; i < Integer.MAX_VALUE; i++) {
    				pool.submit(new MyTask("TASK-GEYM" + i));
    				Thread.sleep(10);
    			}
    		} finally {
    			// Without this, an interrupted main would leave the five core
    			// threads running and the JVM would never exit.
    			pool.shutdown();
    		}
    	}
    }
    正在执行:Thread ID:10task name= TASK-GEYM0
    正在执行:Thread ID:11task name= TASK-GEYM1
    正在执行:Thread ID:12task name= TASK-GEYM2
    正在执行:Thread ID:13task name= TASK-GEYM3
    正在执行:Thread ID:14task name= TASK-GEYM4
    java.util.concurrent.FutureTask@7b23ec81 is discard
    java.util.concurrent.FutureTask@6acbcfc0 is discard
    java.util.concurrent.FutureTask@5f184fc6 is discard
    java.util.concurrent.FutureTask@3feba861 is discard
    正在执行:Thread ID:10task name= TASK-GEYM9
    正在执行:Thread ID:11task name= TASK-GEYM10
    正在执行:Thread ID:12task name= TASK-GEYM11
    正在执行:Thread ID:13task name= TASK-GEYM12
    正在执行:Thread ID:14task name= TASK-GEYM13
    java.util.concurrent.FutureTask@5b480cf9 is discard
    java.util.concurrent.FutureTask@6f496d9f is discard
    java.util.concurrent.FutureTask@723279cf is discard
    java.util.concurrent.FutureTask@10f87f48 is discard
    正在执行:Thread ID:10task name= TASK-GEYM18
    正在执行:Thread ID:11task name= TASK-GEYM19

    自定义线程池名称

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates naming pool threads through a custom thread factory
     * ({@code MssThreadFactory}); each task prints the name of the thread
     * that runs it.
     */
    public class ThreadPollDemo {

    	/**
    	 * A named task that prints the executing thread's id, its own name and
    	 * the executing thread's name, then sleeps for 100 ms to simulate work.
    	 */
    	public static class MyTask implements Runnable {

    		public String name;

    		/**
    		 * @param name label printed when the task runs
    		 */
    		public MyTask(String name) {
    			this.name = name;
    		}

    		@Override
    		public void run() {
    			System.out.println("正在执行" + ":Thread ID:" +
    					Thread.currentThread().getId() + "task name= " + name);
    			System.out.println(Thread.currentThread().getName());

    			try {
    				Thread.sleep(100);
    			} catch (InterruptedException e) {
    				// Thread.sleep clears the interrupt flag when it throws; restore it
    				// so whoever owns this thread can still observe the interruption.
    				Thread.currentThread().interrupt();
    			}
    		}
    	}

    	/**
    	 * Keeps submitting tasks until the loop counter is exhausted or the main
    	 * thread is interrupted; rejected submissions are reported by the
    	 * handler.
    	 *
    	 * @param args unused
    	 * @throws InterruptedException if the main thread is interrupted while
    	 *                              pausing between submissions
    	 */
    	public static void main(String[] args) throws InterruptedException {

    		ExecutorService pool = new ThreadPoolExecutor(5,
    				5,
    				0L,
    				TimeUnit.MILLISECONDS,
    				new SynchronousQueue<Runnable>(),
    				// Swap in Executors.defaultThreadFactory() to compare against
    				// the default thread names.
    				new MssThreadFactory("个人专属线程池"),
    				new RejectedExecutionHandler() {
    					@Override
    					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    						// r is the wrapper created by submit(), not the MyTask itself.
    						System.out.println(r.toString() + " is discard");
    					}
    				}
    		);
    		try {
    			for (int i = 0; i < Integer.MAX_VALUE; i++) {
    				pool.submit(new MyTask("TASK-GEYM" + i));
    				Thread.sleep(10);
    			}
    		} finally {
    			// Without this, an interrupted main would leave the five core
    			// threads running and the JVM would never exit.
    			pool.shutdown();
    		}
    	}
    }
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * description: a {@link ThreadFactory} that names every thread it creates
     * {@code <prefix>-<n>}, where {@code n} starts at 1 and increases by one per
     * created thread. Created threads are always non-daemon and run at
     * {@link Thread#NORM_PRIORITY}.
     *
     * @author: dawn.he QQ:       905845006
     * @email: dawn.he@cloudwise.com
     * @email: 905845006@qq.com
     * @date: 2019/10/4    5:00 PM
     */
    public class MssThreadFactory implements ThreadFactory {
    	/** Sequence number appended to the next thread name. */
    	private final AtomicInteger threadNumber = new AtomicInteger(1);
    	/** Name prefix including the trailing {@code '-'} separator. */
    	private final String namePrefix;

    	/**
    	 * @param namePrefix text placed before the {@code -<n>} suffix of each
    	 *                   thread name
    	 */
    	MssThreadFactory(String namePrefix) {
    		this.namePrefix = namePrefix + "-";
    	}

    	/**
    	 * Creates a new, not yet started thread for the given runnable.
    	 *
    	 * @param r the runnable the new thread will execute
    	 * @return a non-daemon, normal-priority thread named {@code <prefix>-<n>}
    	 */
    	@Override
    	public Thread newThread(Runnable r) {
    		Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
    		// A new thread inherits daemon status from its creator; force it off so
    		// pool threads are never daemon threads. (The previous code called
    		// setDaemon(true) on an already-daemon thread, which changed nothing.)
    		if (t.isDaemon()) {
    			t.setDaemon(false);
    		}
    		// Likewise normalize a priority inherited from the creating thread.
    		if (t.getPriority() != Thread.NORM_PRIORITY) {
    			t.setPriority(Thread.NORM_PRIORITY);
    		}
    		return t;
    	}
    }

    ForkJoin

    发送消息 RecursiveAction 无返回值

    /**
     * description: 发送消息
     *
     * @author: dawn.he QQ:       905845006
     * @email: dawn.he@cloudwise.com
     * @email: 905845006@qq.com
     * @date: 2019/10/4    5:12 PM
     */
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates {@link RecursiveAction} (a fork/join task without a result):
     * a list of messages is split recursively until each slice is small enough
     * to be printed directly.
     */
    public class ForkJoinPoolDemo {

    	/**
    	 * Prints the messages in {@code list[start, end)}, splitting the range in
    	 * half while it holds more than {@link #THRESHOLD} elements.
    	 */
    	class SendMsgTask extends RecursiveAction {

    		/** Largest slice that is processed directly instead of being split. */
    		private static final int THRESHOLD = 10;

    		private final int start;
    		private final int end;
    		private final List<String> list;

    		/**
    		 * @param start index of the first message to send (inclusive)
    		 * @param end   index after the last message to send (exclusive)
    		 * @param list  the messages; only read, never modified
    		 */
    		public SendMsgTask(int start, int end, List<String> list) {
    			this.start = start;
    			this.end = end;
    			this.list = list;
    		}

    		@Override
    		protected void compute() {
    			if ((end - start) <= THRESHOLD) {
    				for (int i = start; i < end; i++) {
    					System.out.println(Thread.currentThread().getName() + ": " + list.get(i));
    				}
    			} else {
    				// Overflow-safe midpoint of [start, end).
    				int middle = start + (end - start) / 2;
    				// Run both halves and wait for both to finish.
    				invokeAll(new SendMsgTask(start, middle, list), new SendMsgTask(middle, end, list));
    			}
    		}
    	}

    	/**
    	 * Sends 123 numbered messages through a fork/join pool.
    	 *
    	 * @param args unused
    	 * @throws InterruptedException if interrupted while waiting for the pool
    	 *                              to terminate
    	 */
    	public static void main(String[] args) throws InterruptedException {
    		List<String> list = new ArrayList<>();
    		for (int i = 0; i < 123; i++) {
    			list.add(String.valueOf(i + 1));
    		}

    		ForkJoinPool pool = new ForkJoinPool();
    		try {
    			// invoke() blocks until the whole task tree has completed.
    			pool.invoke(new ForkJoinPoolDemo().new SendMsgTask(0, list.size(), list));
    		} finally {
    			pool.shutdown();
    		}
    		// awaitTermination only returns early once shutdown has been requested;
    		// calling it before shutdown() (as before) always waited the full timeout.
    		pool.awaitTermination(10, TimeUnit.SECONDS);
    	}

    }

    计算求和

    /**
     * description: 求和 RecursiveTask 有返回值
     *
     * @author: dawn.he QQ:       905845006
     * @email: dawn.he@cloudwise.com
     * @email: 905845006@qq.com
     * @date: 2019/10/4    5:25 PM
     */
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * Demonstrates {@link RecursiveTask} (a fork/join task with a result) by
     * summing an {@code int} array with divide-and-conquer.
     */
    public class ForkJoinTaskDemo {

    	/**
    	 * Sums {@code arr[start, end)}, splitting the range in half while it holds
    	 * more than {@link #THRESHOLD} elements.
    	 */
    	private static class SumTask extends RecursiveTask<Integer> {

    		/** Largest slice that is summed directly instead of being split. */
    		private static final int THRESHOLD = 1000;

    		private final int[] arr;
    		private final int start;
    		private final int end;

    		/**
    		 * @param arr   the values to sum; only read, never modified
    		 * @param start index of the first element (inclusive)
    		 * @param end   index after the last element (exclusive)
    		 */
    		public SumTask(int[] arr, int start, int end) {
    			this.arr = arr;
    			this.start = start;
    			this.end = end;
    		}

    		/**
    		 * Sums this task's slice sequentially and prints the subtotal.
    		 *
    		 * @return the sum of {@code arr[start, end)}
    		 */
    		private int subtotal() {
    			// Primitive accumulator: avoids boxing on every addition.
    			int sum = 0;
    			for (int i = start; i < end; i++) {
    				sum += arr[i];
    			}
    			System.out.println(Thread.currentThread().getName() + ": ∑(" + start + "~" + end + ")=" + sum);
    			return sum;
    		}

    		@Override
    		protected Integer compute() {
    			if ((end - start) <= THRESHOLD) {
    				return subtotal();
    			}
    			// Overflow-safe midpoint of [start, end).
    			int middle = start + (end - start) / 2;
    			SumTask left = new SumTask(arr, start, middle);
    			SumTask right = new SumTask(arr, middle, end);
    			// Fork only one half and compute the other in the current thread,
    			// so this worker does useful work instead of just waiting on joins.
    			left.fork();
    			int rightSum = right.compute();
    			return left.join() + rightSum;
    		}
    	}

    	/**
    	 * Sums the numbers 1..10000 on a fork/join pool and prints the result.
    	 *
    	 * @param args unused
    	 * @throws ExecutionException   if the summing task fails
    	 * @throws InterruptedException if interrupted while waiting for the result
    	 */
    	public static void main(String[] args) throws ExecutionException, InterruptedException {
    		int[] arr = new int[10000];
    		for (int i = 0; i < arr.length; i++) {
    			arr[i] = i + 1;
    		}

    		ForkJoinPool pool = new ForkJoinPool();
    		try {
    			// Submit the task.
    			ForkJoinTask<Integer> result = pool.submit(new SumTask(arr, 0, arr.length));
    			// Wait with get(): calling invoke() on an already-submitted task
    			// could execute it a second time in the calling thread.
    			System.out.println("最终计算结果: " + result.get());
    		} finally {
    			pool.shutdown();
    		}
    	}

    }

相关文章
相关标签/搜索