1.异步和同步java
同步执行很容易理解,代码的操做顺序就是程序执行的顺序。可是实际使用中,不少场景经常会受限于同步执行,不能充分利用cpu的资源,例如,要查找一大批数据中的最大数,同步执行时,多是花费10单位的时间读取数据,1单位的时间进行计算,总计在11单位时间后获得结果;而,异步执行时,分派10个线程执行任务,将会花费1单位的时间读取数据,1单位时间进行计算,总计在2单位时间后获得结果。数据库
相对于同步而言,异步本质上是申请线程,提升cpu的利用率(单核cpu执行计算密集型任务时会下降)更快地获得结果。在解决问题时合理地选择同步和异步能更好地利用好计算资源。编程
从数据的角度来看,在同步操做中,数据的变化都保持必定的前后顺序关系,不会出现冲突的状况;而在异步操做中,数据随时可能被其中某个线程更改,这时须要注意数据的一致性,尤为是写操做,要保证事务性,这里能够对数据加锁来实现。另外有时线程之间也须要保证必定的顺序,须要使用线程锁(这里有的能够经过编码技巧或者回调方法解决)。多线程
2.线程池异步
在面向对象编程中,建立和销毁对象是很费时间的,由于建立一个对象要获取内存资源或者其它更多资源。在Java中更是如此,虚拟机将试图跟踪每个对象,以便可以在对象销毁后进行垃圾回收。因此提升服务程序效率的一个手段就是尽量减小建立和销毁对象的次数,特别是一些很耗资源的对象建立和销毁。如何利用已有对象来服务就是一个须要解决的关键问题,其实这就是一些"池化资源"技术产生的缘由。好比你们所熟悉的数据库链接池正是遵循这一思想而产生的,接下来将介绍的线程池技术一样符合这一思想。ide
在java中,可使用java.util.concurrent.ThreadPoolExecutor来建立线程池,this
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
建立方法中的几个参数须要注意:编码
先说最重要的3个参数,corePoolSize,maximumPoolSize,workQueue。corePoolSize设定核心线程数,maximumPoolSize设定最大线程数,workQueue设定等待队列。当有新任务时,首先检查当前的线程数是否小于corePoolSize,若是是,则新建线程处理任务;若是不是,再检查当前workQueue是否已满,若是未满,则把新的任务加入workQueue,core线程完成时会从workQueue中取得任务继续执行;若是workQueue已满,再检查当前线程数是否小于maximumPoolSize,若是是,则建立线程处理任务,若是不是,则抛出拒绝服务的异常(默认是抛出异常,具体如何处理是最后一个参数RejectExecutionHandler来决定的)。spa
其余的参数分别是,keepAliveTime、unit控制空闲线程的存活时间;threadFactory,建立新的线程时使用的建立者;handler,拒绝服务时的处理方式;(后两个参数都有默认的选择)线程
从线程池的工做方式能够看到,core,max,queue决定了线程池的表现,下面讲述这三个参数的参考设定。
通常来讲,须要处理的qps平均为n,最大为m,每一个请求的处理时间为t(秒)时,core=nt+,max=mt+,queue视须要而定(当这些请求须要尽快响应,cpu资源常有空闲时,queue=0)
对于cpu密集型任务,如大量数据的计算,匹配,排序等,这时cpu的处理能力成为瓶颈,core和max要注意不要设定得太大,要衡量好cpu的处理能力。
对于io密集型任务,如操做数据库,http请求等,core和max能够考虑设定得更大,由于线程一般处于等待之中,不会耗费多少cpu。
(20160329.add)对于cpu密集型任务:cpu为单核时,没有必要使用多线程:单线程时,cpu资源主要都在计算上;多线程时,cpu还须要额外耗费线程之间切换的资源,下降了计算效率。cpu为多核时,有必要使用多线程(单线程时只有一个核在进行计算)
——总的来讲,当cpu常有空闲的状况时,就应该考虑使用多线程了。
一些使用的例子:
1.简单的一个线程
public class TestClass { /*/*/ private SettableFuture<String> settableFuture = SettableFuture.create(); public void todo(final String param) throws Exception{ Thread thread = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(100); System.out.println("begin " + param); Thread.sleep(1000); System.out.println("finish sleep"); } catch (InterruptedException e) { e.printStackTrace(); }finally { settableFuture.set("complete " + param); } } }); thread.start(); } public SettableFuture<String> getSettableFuture() { return settableFuture; } public static void main(String[] args) throws Exception { TestClass testClass = new TestClass(); testClass.todo("test"); System.out.println("start todo"); System.out.println(testClass.getSettableFuture().get()); } }
2.线程池&异步回调
public class TestClass { private ListenableFuture<String> listenableFuture; private ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); public void todo(final String param){ listenableFuture = listeningExecutorService.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("call " + param); Thread.sleep(100); return "call " + param + " complete"; } }); Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String s) { try { System.out.println(s + " success"); Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void onFailure(Throwable throwable) { System.out.println("failed"); } }); public static void main(String[] args) throws InterruptedException { TestClass testClass = new TestClass(); testClass.todo("test"); System.out.println("ok"); } }
3.同时执行多个异步回调任务,等待全部任务结束后,输出全部任务的执行结果
public class TestClass { private ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); public void todo(final String param, final CountDownLatch countDownLatch, final List<String> result) throws InterruptedException { ListenableFuture listenableFuture = listeningExecutorService.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(100); System.out.println("exec " + param); result.add(String.valueOf(param)); System.out.println("exec "+param+" finished"); return String.valueOf(param); } }); Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String s) { System.out.println("success "+s); countDownLatch.countDown(); } @Override public void onFailure(Throwable throwable) { System.out.println("failed"); countDownLatch.countDown(); } }); } public static void main(String[] args) throws InterruptedException { int taskSize = 4; TestClass testClass = new TestClass(); final List<String> result = Lists.newArrayList(); final CountDownLatch countDownLatch = new CountDownLatch(taskSize); for (int i = 0; i < taskSize; i++) { testClass.todo("test" + i, countDownLatch, result); } System.out.println("add task finished"); countDownLatch.await(10, TimeUnit.SECONDS); System.out.println(result); testClass.listeningExecutorService.shutdown(); } //*/ }