书上写的:
Future 模式是多线程开发中很是常见的一种设计模式,它的核心思想是异步调用。当咱们须要调用一一个函数方法时,若是这个函数执行很慢,那么咱们就要进行等待。但有时候,咱们可能并不急着要结果。所以,咱们可让被调者当即返回,让它在后台慢慢处理这个请求。对于调用者来讲,则能够先处理一些其余任务,在真正须要数据的场合再去尝试得到须要的数据。 Future 模式有点相似在网上买东西。若是咱们在网上下单买了一一个手机,当咱们支付完成后,手机并无办法当即送到家里,可是在电脑上会当即产生一个订单。这个订单就是未来发货或者领取手机的重要凭证,这个凭证也就是Future模式中会给出的一个契约。在支付活动结束后,你们不会傻傻地等着手机到来,而是能够各忙各的。而这张订单就成为了商家配货、发货的驱动力。固然,这一切你并不用关心。你要作的,只是在快递上门时,开一下门,拿一下货而已。 对于Future模式来讲,虽然它没法当即给出你须要的数据。可是,它会返回给你一一个契约,未来,你能够凭借着这个契约去从新获取你须要的信息。
java
如图,显示了经过传统的同步方法,调用一段比较耗时的程序。客户端发出call请求,这个请求须要至关长一段时间才能返回。客户端一直等待,直到数据返回,随后, 再进行其余任务的处理。 git
下图是future模式程序员
package com.test.version3future; 1. @author: zhangzeli 2. @date 20:35 2018/4/20 3. [博客版权](http://https://my.oschina.net/u/3703858/blog/1798696) 4. [项目地址 ](https://gitee.com/zhangzeli/java-concurrent) public class CommonCook { // 网购厨具线程 static class OnlineShopping extends Thread { private Chuju chuju; @Override public void run() { System.out.println("第一步:下单"); System.out.println("第一步:等待送货"); try { Thread.sleep(5000); // 模拟送货时间 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一步:快递送到"); chuju = new Chuju(); } } // 用厨具烹饪食材 static void cook(Chuju chuju, Shicai shicai) {} // 厨具类 static class Chuju {} // 食材类 static class Shicai {} public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); // 第一步 网购厨具 OnlineShopping thread = new OnlineShopping(); thread.start(); thread.join(); // 保证厨具送到 // 第二步 去超市购买食材 Thread.sleep(2000); // 模拟购买食材时间 Shicai shicai = new Shicai(); System.out.println("第二步:食材到位"); // 第三步 用厨具烹饪食材 System.out.println("第三步:开始展示厨艺"); cook(thread.chuju, shicai); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } }
运行结果:数据库
能够看到,多线程已经失去了意义。在厨具送到期间,咱们不能干任何事。对应代码,就是调用join方法阻塞主线程。设计模式
有人问了,不阻塞主线程行不行???数据结构
不行!!!多线程
从代码来看的话,run方法不执行完,属性chuju就没有被赋值,仍是null。换句话说,没有厨具,怎么作饭。Java如今的多线程机制,核心方法run是没有返回值的;若是要保存run方法里面的计算结果,必须等待run方法计算完,不管计算过程多么耗时。 面对这种尴尬的处境,程序员就会想:在子线程run方法计算的期间,能不能在主线程里面继续异步执行??? Where there is a will,there is a way!!! 这种想法的核心就是Future模式,下面先应用一下Java本身实现的Future模式。
并发
package com.test.version3future; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; 1. @author: zhangzeli 2. @date 20:35 2018/4/20 3. [博客版权](http://https://my.oschina.net/u/3703858/blog/1798696) 4. [项目地址 ](https://gitee.com/zhangzeli/java-concurrent) public class FutureCook { // 用厨具烹饪食材 static void cook(Chuju chuju, Shicai shicai) {} // 厨具类 static class Chuju {} // 食材类 static class Shicai {} public static void main(String[] args) throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); // 第一步 网购厨具 Callable<Chuju> onlineShopping = new Callable<Chuju>() { @Override public Chuju call() throws Exception { System.out.println("第一步:下单"); System.out.println("第一步:等待送货"); Thread.sleep(5000); // 模拟送货时间 System.out.println("第一步:快递送到"); return new Chuju(); } }; FutureTask<Chuju> task = new FutureTask<Chuju>(onlineShopping); new Thread(task).start(); // 第二步 去超市购买食材 Thread.sleep(2000); // 模拟购买食材时间 Shicai shicai = new Shicai(); System.out.println("第二步:食材到位"); // 第三步 用厨具烹饪食材 if (!task.isDone()) { // 联系快递员,询问是否到货 System.out.println("第三步:厨具还没到,心情好就等着(心情很差就调用cancel方法取消订单)"); } Chuju chuju = task.get(); System.out.println("第三步:厨具到位,开始展示厨艺"); cook(chuju, shicai); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms"); } }
运行结果:异步
1)把耗时的网购厨具逻辑,封装到了一个Callable的call方法里面。ide
public interface Future<V> { /**若是任务还没开始,执行cancel(...)方法将返回false; 若是任务已经启动,执行cancel(true)方法将以中断执行此任务线程的方式来试图中止任务,若是中止成功,返回true; 当任务已经启动,执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回false; 当任务已经完成,执行cancel(...)方法将返回false。mayInterruptRunning参数表示是否中断执行中的线程。 **/ boolean cancel(boolean mayInterruptIfRunning); //若是任务完成前被取消,则返回true。 boolean isCancelled(); //若是任务执行结束,不管是正常结束或是中途取消仍是发生异常,都返回true。 boolean isDone(); //获取异步执行的结果,若是没有结果可用,此方法会阻塞直到异步计算完成。 V get() throws InterruptedException, ExecutionException; //获取异步执行结果,若是没有结果可用,此方法会阻塞,可是会有时间限制,若是阻塞时间超 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } // Callable接口能够看做是Runnable接口的补充,call方法带有返回值,而且能够抛出异常。
2)把Callable实例看成参数,生成一个FutureTask的对象,而后把这个对象看成一个Runnable,做为参数另起线程。
public class FutureTask<V> implements RunnableFuture<V>{} public interface RunnableFuture<V> extends Runnable, Future<V>{} public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } //这个继承体系中的核心接口是Future。Future的核心思想是:一个方法f, //计算过程可能很是耗时,等待f返回,显然不明智。能够在调用f的时候,立马返回一个Future, //能够经过Future这个数据结构去控制方法f的计算过程。
3)在第三步里面,调用了isDone方法查看状态,而后直接调用task.get方法获取厨具,不过这时还没送到,因此仍是会等待3秒。对比第一段代码的执行结果,这里咱们节省了2秒。这是由于在快递员送货期间,咱们去超市购买食材,这两件事在同一时间段内异步执行。
经过以上3步,咱们就完成了对Java原生Future模式最基本的应用。下面具体分析下FutureTask的实现,先看JDK8的,再比较一下JDK6的实现。 既然FutureTask也是一个Runnable,那就看看它的run方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 这里的callable是从构造方法里面传人的 if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // 保存call方法抛出的异常 } if (ran) set(result); // 保存call方法的执行结果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
先看try语句块里面的逻辑,发现run方法的主要逻辑就是运行Callable的call方法,而后将保存结果或者异常(用的一个属性result)。这里比较难想到的是,将call方法抛出的异常也保存起来了。
这里表示状态的属性state是个什么鬼
* Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
把FutureTask看做一个Future,那么它的做用就是控制Callable的call方法的执行过程,在执行的过程当中天然会有状态的转换:
1)一个FutureTask新建出来,state就是NEW状态;COMPETING和INTERRUPTING用的进行时,
表示瞬时状态,存在时间极短(为何要设立这种状态???不解);
NORMAL表明顺利完成;EXCEPTIONAL表明执行过程出现异常;CANCELED表明执行过程被取消;
INTERRUPTED被中断
2)执行过程顺利完成:NEW -> COMPLETING -> NORMAL
3)执行过程出现异常:NEW -> COMPLETING -> EXCEPTIONAL
4)执行过程被取消:NEW -> CANCELLED
5)执行过程当中,线程中断:NEW -> INTERRUPTING -> INTERRUPTED
再看看get方法的实现:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
get方法的逻辑很简单,若是call方法的执行过程已完成,就把结果给出去;若是未完成,就将当前线程挂起等待。awaitDone方法里面死循环的逻辑,推演几遍就能弄懂;它里面挂起线程的主要创新是定义了WaitNode类,来将多个等待线程组织成队列,这是与JDK6的实现最大的不一样。
最后来看看,Future模式衍生出来的更高级的应用。
再上一个场景:咱们本身写一个简单的数据库链接池,可以复用数据库链接,而且能在高并发状况下正常工做。
package test; import java.util.concurrent.ConcurrentHashMap; public class ConnectionPool { private ConcurrentHashMap<String, Connection> pool = new ConcurrentHashMap<String, Connection>(); public Connection getConnection(String key) { Connection conn = null; if (pool.containsKey(key)) { conn = pool.get(key); } else { conn = createConnection(); pool.putIfAbsent(key, conn); } return conn; } public Connection createConnection() { return new Connection(); } class Connection {} }
咱们用了ConcurrentHashMap,这样就没必要把getConnection方法置为synchronized(固然也能够用Lock),当多个线程同时调用getConnection方法时,性能大幅提高。
貌似很完美了,可是有可能致使多余链接的建立,推演一遍:
某一时刻,同时有3个线程进入getConnection方法,调用pool.containsKey(key)都返回false,而后3个线程各自都建立了链接。虽然ConcurrentHashMap的put方法只会加入其中一个,但仍是生成了2个多余的链接。若是是真正的数据库链接,那会形成极大的资源浪费。
因此,咱们如今的难点是:如何在多线程访问getConnection方法时,只执行一次createConnection。
结合以前Future模式的实现分析:当3个线程都要建立链接的时候,若是只有一个线程执行createConnection方法建立一个链接,其它2个线程只须要用这个链接就好了。再延伸,把createConnection方法放到一个Callable的call方法里面,而后生成FutureTask。咱们只须要让一个线程执行FutureTask的run方法,其它的线程只执行get方法就行了。
上代码:
package test; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class ConnectionPool { private ConcurrentHashMap<String, FutureTask<Connection>> pool = new ConcurrentHashMap<String, FutureTask<Connection>>(); public Connection getConnection(String key) throws InterruptedException, ExecutionException { FutureTask<Connection> connectionTask = pool.get(key); if (connectionTask != null) { return connectionTask.get(); } else { Callable<Connection> callable = new Callable<Connection>() { @Override public Connection call() throws Exception { return createConnection(); } }; FutureTask<Connection> newTask = new FutureTask<Connection>(callable); connectionTask = pool.putIfAbsent(key, newTask); if (connectionTask == null) { connectionTask = newTask; connectionTask.run(); } return connectionTask.get(); } } public Connection createConnection() { return new Connection(); } class Connection { } }
推演一遍:当3个线程同时进入else语句块时,各自都建立了一个FutureTask, 可是ConcurrentHashMap只会加入其中一个。第一个线程执行pool.putIfAbsent方法后返回null, 而后connectionTask被赋值,接着就执行run方法去建立链接,最后get。后面的线程执行pool.putIfAbsent方法不会返回null, 就只会执行get方法。在并发的环境下,经过FutureTask做为中间转换,成功实现了让某个方法只被一个线程执行。
下一篇 会讲到加强的future模式CompletableFuture