执行一个异步任务你还只是以下new Thread吗?数据库
new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start();
那你就out太多了,new Thread的弊端以下:缓存
相比new Thread,Java提供的四种线程池的好处在于:安全
Java经过Executors提供四种线程池,分别为:并发
建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码以下:异步
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index * 1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable() { @Override public void run() { System.out.println(index); } }); }
线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。ide
建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码以下:函数
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int index = i; fixedThreadPool.execute(new Runnable() { @Override public void run() { try { System.out.println(index); Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }
由于线程池大小为3,每一个任务输出index后sleep 2秒,因此每两秒打印3个数字。 定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。可参考PreloadDataCache。性能
建立一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码以下:this
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(new Runnable() { @Override public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS);
表示延迟3秒执行。线程
按期执行示例代码以下:
scheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } }, 1, 3, TimeUnit.SECONDS);
表示延迟1秒后每3秒执行一次。 ScheduledExecutorService比Timer更安全,功能更强大,后面会有一篇单独进行对比。
建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码以下:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; singleThreadExecutor.execute(new Runnable() { @Override public void run() { try { System.out.println(index); Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }
结果依次输出,至关于顺序执行各个任务。 现行大多数GUI程序都是单线程的。Android中单线程可用于数据库操做,文件操做,应用批量安装,应用批量删除等不适合并发但可能IO阻塞性及影响UI线程响应的操做。
当使用ExecutorService启动了多个Callable后,每一个Callable会产生一个Future,咱们须要将多个Future存入一个线性表,用于以后处理数据。固然,还有更复杂的状况,有5个生产者线程,每一个生产者线程都会建立任务,全部任务的Future都存放到同一个线性表中。而后遍历线性表,经过调用future.get(0, TimeUnit.SECONDS)不断尝试获取完成结果,直到获取到全部的结果。以下
public class ExecutorServiceTest { static class Task implements Callable<String>{ private int i; public Task(int i){ this.i = i; } @Override public String call() throws Exception { Thread.sleep(10000); return Thread.currentThread().getName() + "执行完任务:" + i; } } public static void main(String[] args){ testUseFuture(); } private static void testUseFuture(){ int numThread = 5; ExecutorService executor = Executors.newFixedThreadPool(numThread); List<Future<String>> futureList = new ArrayList<Future<String>>(); for(int i = 0;i<numThread;i++ ){ Future<String> future = executor.submit(new ExecutorServiceTest.Task(i)); futureList.add(future); } int i=0; while(numThread > 0){ i++; for(Future<String> future : futureList){ String result = null; try { result = future.get(0, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { //超时异常直接忽略 } if(null != result){ futureList.remove(future); numThread--; System.out.println(result); //此处必须break,不然会抛出并发修改异常。(也能够经过将futureList声明为CopyOnWriteArrayList类型解决) break; } } } System.out.println("共"+i+"次遍历任务列表"); } }
执行后输出
pool-1-thread-2执行完任务:1 pool-1-thread-3执行完任务:2 pool-1-thread-4执行完任务:3 pool-1-thread-1执行完任务:0 pool-1-thread-5执行完任务:4 共1265559次遍历任务列表
根据输出咱们能够看到,为了获取到5个任务的执行结果,程序一共遍历了1265559次任务列表,这不是一种很是好的办法。
CompletionService正是为此而存在,它是一个更高级的ExecutorService,它自己自带一个线程安全的线性表,无需用户额外建立。它提供了3种方法从线性表中取出结果:poll()是非阻塞的,若目前无结果,返回一个null,线程继续运行不阻塞;poll(long timeout, TimeUnit unit)是阻塞的,若目前无结果,则会等待一段时间;**take()**是阻塞的,若当前无结果,则线程阻塞,直到产生一个结果,被取出返回,线程才继续运行。 修改后的程序以下:
public class CompletionServiceTest { static class Task implements Callable<String>{ private int i; public Task(int i){ this.i = i; } @Override public String call() throws Exception { Thread.sleep(10000); return Thread.currentThread().getName() + "执行完任务:" + i; } } public static void main(String[] args) throws InterruptedException, ExecutionException{ testExecutorCompletionService(); } private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{ int numThread = 5; ExecutorService executor = Executors.newFixedThreadPool(numThread); CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); for(int i = 0;i<numThread;i++ ){ completionService.submit(new CompletionServiceTest.Task(i)); } for(int i = 0;i<numThread;i++ ){ System.out.println(completionService.take().get()); System.out.println("第"+(i+1)+"次获取结果"); } } }
使用completionService.take()阻塞方法来获取已完成Future<>,不须要一直遍历查询。
pool-1-thread-2执行完任务:1 第1次获取结果 pool-1-thread-3执行完任务:2 第2次获取结果 pool-1-thread-1执行完任务:0 第3次获取结果 pool-1-thread-5执行完任务:4 第4次获取结果 pool-1-thread-4执行完任务:3 第5次获取结果
CompletionService整合了Executor和BlockingQueue的功能。你能够将Callable任务提交给它去执行,而后使用相似于队列中的take和poll方法,在结果完整可用时得到这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。
ExecutorCompletionService的实现至关直观。它在构造函数中建立一个BlockingQueue,用它去保持完成的结果。计算完成时会调用FutureTask中的done方法。当提交一个任务后,首先把这个任务包装为一个QueueingFuture,它是FutureTask的一个子类,而后覆写done方法,将结果置入BlockingQueue中,take和poll方法委托给了BlockingQueue,它会在结果不可用时阻塞。
public class ExecutorCompletionService<V> implements CompletionService<V> { private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; /** * FutureTask extension to enqueue upon completion */ private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; } private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); } /** * Creates an ExecutorCompletionService using the supplied * executor for base task execution and a * {@link LinkedBlockingQueue} as a completion queue. * * @param executor the executor to use * @throws NullPointerException if executor is <tt>null</tt> */ public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } /** * Creates an ExecutorCompletionService using the supplied * executor for base task execution and the supplied queue as its * completion queue. * * @param executor the executor to use * @param completionQueue the queue to use as the completion queue * normally one dedicated for use by this service * @throws NullPointerException if executor or completionQueue are <tt>null</tt> */ public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; } public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } public Future<V> take() throws InterruptedException { return completionQueue.take(); } public Future<V> poll() { return completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); } }