<!-- create time: 2016-03-14 09:49:12 -->java
本节主要结合排序的实例来演示多线程执行任务的流程,主要使用了线程池
ExecutorService
, 闭锁Futrue
, 完成服务CompletionService
以及最多见的冒泡排序算法算法
ExecutorService 线程池数组
private static ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("int-sort"), new ThreadPoolExecutor.CallerRunsPolicy());多线程
上面是建立一个线程池的实例,其中几个参数分别为:(来自jdk) ```java /** * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } ``` - Futrue > 就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时能够经过get方法获取执行结果,该方法会阻塞直到任务返回结果 ```java public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
CompletionService并发
若是向Executor提交了一组计算任务,而且但愿在计算完成后得到结果,那么能够保留与每一个任务关联的Future,而后反复使用get方法,同时将参数timeout指定为0,从而经过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务CompletionServiceless
import io.netty.util.concurrent.DefaultThreadFactory; import org.junit.Test; import java.util.concurrent.*; /** * Created by yihui on 16/3/11. */ public class RunnableTest { private static ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), new DefaultThreadFactory("sort-calculate"), new ThreadPoolExecutor.CallerRunsPolicy()); /** * 随机生成一些数 * * @param size * @return */ private int[] genNums(final int size) { int[] num = new int[size]; for (int i = 0; i < size; i++) { num[i] = (int) (Math.random() * 1230); } return num; } // 冒泡排序 private int[] sort(int[] num, int size) { if (size <= 1) { return num; } int tmp; for (int i = 0; i < size; i++) { for (int j = i + 1; j < size; j++) { if (num[i] > num[j]) { tmp = num[i]; num[i] = num[j]; num[j] = tmp; } } } return num; } // 合并两个排序数组 public int[] merge(int[] ans, int[] sub) { if (ans == null) { return sub; } int ansSize = ans.length; int subSize = sub.length; int[] result = new int[subSize + ansSize]; for (int i =0, ansIndex=0, subIndex=0; i < ansSize + subSize; i ++) { if (subIndex >= subSize) { result[i] = ans[ansIndex ++]; continue; } if (ansIndex >= ansSize) { result[i] = sub[subIndex ++]; continue; } if (ans[ansIndex] < sub[subIndex]) { result[i] = ans[ansIndex ++]; } else { result[i] = sub[subIndex ++]; } } return result; } public int[] calculate(int[] numbers, int size) { CompletionService<int[]> completionService = new ExecutorCompletionService<int[]>(executorService); if (size <= 50) { return this.sort(numbers, size); } // 将数组分割,50个做为一组,进行排序 int subNum = (size - 1) / 50 + 1; for (int i = 0; i < subNum; i++) { int len = 50; if (i == subNum - 1) { len = size - 50 * i; } final int[] subNumbers = new int[len]; System.arraycopy(numbers, i * 50 + 0, subNumbers, 0, len); final int finalLen = len; Callable<int[]> runnable = new Callable<int[]>() { @Override public int[] call() throws Exception { return sort(subNumbers, finalLen); } }; completionService.submit(runnable); } int[] ans = null; // 开始对提交的排序任务的结果进行合并 try{ for (int i = 0; i < subNum; i ++) { // get and remove the result Future<int[]> f = completionService.take(); int[] tmp = f.get(); ans = this.merge(ans, tmp); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return ans; } // 输出数组 private void print(int[] num, int size, boolean newLine) { for (int i = 0; i < size; i++) { System.out.print(num[i] + ","); } if (newLine) { System.out.println(); } } @Test public void tt() { int size = 250; int[] numbers = this.genNums(size); int[] numbers2 = new int[size]; System.arraycopy(numbers, 0, numbers2, 0, size); long start = System.nanoTime(); this.sort(numbers, size); long end = System.nanoTime(); this.print(numbers, size, true); System.out.println("Cost is : " + (end - start) / 1000); this.print(numbers2, size, true); start = System.nanoTime(); int[] ans = this.calculate(numbers2, size); end = System.nanoTime(); this.print(ans, size, true); System.out.println("cost is : " + (end - start) / 1000); } // 用于测试排序算法,以及合并算法的正确性 @Test public void test() { int size = 10; int[] numbers = this.genNums(size); int[] ans1 = this.sort(numbers, size); this.print(ans1, size, true); size += 5; int[] numbers2 = this.genNums(size); int[] ans2 = this.sort(numbers2, size); this.print(ans2, size, true); int[] ans = this.merge(ans1, ans2); this.print(ans, 25, true); } }
针对上面的实例,咱们重点须要关注的对象集中在 calculate
方法中dom
执行流程:ide
completionQueue
队列中// 提交task completionService.submit(new Callable<int[]>() { @Override public int[] call() throws Exception { return sort(subNumbers, finalLen); } }); // 获取结果 for (...) { // take 表示从阻塞队列中获取并移除Future === get以后remove掉 Futrue<int[]> futrue = completionService.take(); int[] ans = futrue.get(); }