缓存做为计算机历史上最重要的发明之一,对计算机历史起到了举足轻重的做用,由于缓存能够协调两个速度不一致的组件之间的并行运做。内存做为CPU和非易失性存储介质之间的缓存,避免CPU每次读取指令,读取数据都去速度缓慢的硬盘读取。快速缓存做为内存和CPU之间的缓存进一步提升了CPU的效率,如今大部分CPU都支持指令预取,CPU会预测后面要执行的指令预取到快速缓存中。而咱们平时也直接或间接地会用到缓存技术,那若是要本身实现一个线程安全的缓存,要注意哪些问题呢?咱们一步步来探讨这个问题。缓存
假设咱们提供一个服务,客户端提供一个字符串,咱们返回一个对应的Long数值(固然这只是为了演示方便举的简单例子),为了提升效率,咱们不但愿每次都重复计算,所以咱们把计算结果保存在一个缓存里,若是下次有相同的请求过来就直接返回缓存中的数据。安全
首先咱们把计算任务抽象成Compute接口:并发
public interface Compute<A,V> { V compute(A args); }
一个不使用缓存计算的类:异步
public class NoCache implements Compute<String, Long> { @Override public Long compute(String args) { // TODO Auto-generated method stub return Long.valueOf(args); } }
这样每次都要重复计算,效率比较低,所以咱们引入了一个Map来保存计算结果:ide
public class BasicCache1<A,V> implements Compute<A, V> { private final Map<A, V> cache=new HashMap<>(); private final Compute<A, V> c; public BasicCache1(Compute<A, V> c) { this.c=c; } @Override public synchronized V compute(A args) throws Exception { V ans=cache.get(args); if(ans==null) { ans=c.compute(args); cache.put(args, ans); } return ans; } }
这里由于HashMap不是线程安全的,所以计算方法被写成了同步方法,这样的话,每次请求最后实际都是串行执行的,大大下降了系统的吞吐量。就像下面这幅图表示的:this
既然这样,咱们就改用ConcurrentHashMap试试:spa
public class BasicCache2<A,V> implements Compute<A, V> { private final Map<A,V> cache=new ConcurrentHashMap<>(); private final Compute<A, V> c; public BasicCache2(Compute<A, V> c) { this.c=c; } @Override public V compute(A args) throws Exception { V ans=cache.get(args); if(ans==null) { ans=c.compute(args); cache.put(args, ans); } return ans; } }
这里没有同步compute操做,所以系统能够并发地执行请求,可是假如多个有相同参数的请求短期内前后到达,这个时候先到达的请求还没来得及把结果写入缓存(由于计算耗时),后来的请求就会重复计算,下降了缓存的效率。一图胜前言:线程
所以咱们就想,能不能先更新缓存再去计算呢,这样不就能够消除了重复计算吗?听起来不错,但是咱们如何在更新了缓存后获取计算结果呢(由于这时计算尚未完成)?这里就要用到JDK提供的Future和Callable接口,Callable接口和Runnable接口同样,是线程任务的抽象接口,不一样的是Callable的call方法能够返回一个Future对象,而Future对象的get方法会阻塞等待任务执行结果。既然有这么好的基础设施,那咱们赶忙开撸:code
public class BasicCache3<A,V> implements Compute<A, V> { private final Map<A, Future<V>> cache=new ConcurrentHashMap(); private final Compute<A, V> c; private ExecutorService executors=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()+1); public BasicCache3(Compute<A, V> c) { this.c=c; } @Override public V compute(final A args) throws Exception { Future<V> ans=cache.get(args); if(ans==null) { Callable<V> computeTask=new Callable<V>() { @Override public V call() throws Exception { return c.compute(args); } }; ans= executors.submit(computeTask); cache.put(args, ans); } return ans.get(); } }
上面这段代码里把计算任务提交到线程池去执行,返回了一个结果句柄供后面获取计算结果。但是仔细观察后,咱们发现彷佛仍是有点不对,这样彷佛减少了重复计算的几率,可是其实只是减少了发生的窗口,由于判断是否在缓存中和put到缓存中两个操做虽然单独都是线程安全的,可是仍是会发生先到达的请求还没来得及put到缓存的状况,而其本质缘由就是先检查再插入这样的复杂操做不是原子操做,就比如++这个操做,CPU须要先取原值,再操做加数,最后写回原值也会出现后一次写入覆盖前一次的状况,本质都是由于复杂操做的非原子性。下图演示了这种状况:对象
所以JDK中的ConcurrentMap接口提供了putIfAbsent的原子操做方法,但是若是咱们像前一个例子中同样先获取计算结果的Future句柄,即使是咱们不会重复更新缓存,计算任务仍是会执行,依然没达到缓存的效果,所以咱们须要一个可以在任务还没启动就能够获取结果句柄,同时可以自由控制任务的启动、中止的东西。固然JDK里也有这样的东西(这里插一句,JDK的concurrent包的代码基本都是Doug Lea写的,老头子代码写的太牛逼了),就是FutureTask,既然如此,赶忙开撸:
public class BasicCache4<A,V> implements Compute<A, V> { private final ConcurrentMap<A, Future<V>> cache=new ConcurrentHashMap<>(); private final Compute<A, V> c; public BasicCache4(Compute<A, V> c) { this.c=c; } @Override public V compute(final A args) throws Exception { Future<V> ans=cache.get(args); if(ans==null) { Callable<V> computeTask=new Callable<V>() { @Override public V call() throws Exception { return c.compute(args); } }; FutureTask<V> ft=new FutureTask<V>(computeTask); ans=cache.putIfAbsent(args, ft); if(ans==null)// { ans=ft; ft.run(); } } return ans.get(); } }
上面这段代码中,咱们建立了一个FutureTask任务,可是并无当即执行这个异步任务,而是先调用ConcurrentHashMap的putIfAbsent方法来尝试把结果句柄更新到缓存中去,这个方法的返回值是Map中的旧值,所以若是返回的是null,也就是说原来缓存中不存在,那咱们就启动异步计算任务,而若是缓存中已经存在的话,咱们就直接调用缓存中的Future对象的get方法获取计算结果,若是其余请求中的计算任务尚未执行完毕的话,get方法会阻塞直到计算完成。实际运行效果见下图:
至此,咱们算是构建了一个有效线程安全的缓存了,固然这个版本其实仍是会有不少问题,好比若是异步计算任务被取消的话,咱们应该循环重试,可是一方面咱们为了简单只考虑了正常状况,另外一方面FutureTask是局部变量,在线程栈层面已经保证了其余线程或代码没法拿到该对象。最后用一张xmind图做为总结:
参考资料:《Java Concurrency in Practice》