使用DelayQueue、ConcurrentHashMap、FutureTask实现的缓存工具类。
DelayQueue是一个支持延时获取元素的无界阻塞队列。DelayQueue内部使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久之后才能从队列中获取该元素。只有在延迟期满时才能从队列中提取元素。
DelayQueue非常有用,可以将DelayQueue运用在以下应用场景,例如下面的缓存实现。
import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; /** * @author WChao * @date 2018/06/21 */ public class CacheBean<V> { // 缓存计算的结果 private final static ConcurrentMap<String, Future<Object>> cache = new ConcurrentHashMap<>(); // 延迟队列来判断那些缓存过时 private final static DelayQueue<DelayedItem<String>> delayQueue = new DelayQueue<>(); // 缓存时间 private final int ms; static { // 定时清理过时缓存 Thread t = new Thread() { @Override public void run() { dameonCheckOverdueKey(); } }; t.setDaemon(true); t.start(); } private final Computable<V> c; /** * @param c Computable */ public CacheBean(Computable<V> c) { this(c, 60 * 1000); } /** * @param c Computable * @param ms 缓存多少毫秒 */ public CacheBean(Computable<V> c, int ms) { this.c = c; this.ms = ms; } public V compute(final String key) throws InterruptedException { while (true) { //根据key从缓存中获取值 Future<V> f = (Future<V>) cache.get(key); if (f == null) { Callable<V> eval = new Callable<V>() { public V call() { return (V) c.compute(key); } }; FutureTask<V> ft = new FutureTask<>(eval); //若是缓存中存在此能够,则返回已存在的value f = (Future<V>) cache.putIfAbsent(key, (Future<Object>) ft); if (f == null) { //向delayQueue中添加key,并设置该key的存活时间 delayQueue.put(new DelayedItem<>(key, ms)); f = ft; ft.run(); } } try { return f.get(); } catch (CancellationException e) { cache.remove(key, f); } catch (ExecutionException e) { e.printStackTrace(); } } } /** * 检查过时的key,从cache中删除 */ private static void dameonCheckOverdueKey() { DelayedItem<String> delayedItem; while (true) { try { delayedItem = delayQueue.take(); if (delayedItem != null) { cache.remove(delayedItem.getT()); System.out.println(System.nanoTime() + " 
remove " + delayedItem.getT() + " from cache"); } } catch (InterruptedException e) { e.printStackTrace(); } } } } class DelayedItem<T> implements Delayed { private T t; private long liveTime; private long removeTime; public DelayedItem(T t, long liveTime) { this.setT(t); this.liveTime = liveTime; this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { if (o == null) return 1; if (o == this) return 0; if (o instanceof DelayedItem) { DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o; if (liveTime > tmpDelayedItem.liveTime) { return 1; } else if (liveTime == tmpDelayedItem.liveTime) { return 0; } else { return -1; } } long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return diff > 0 ? 1 : diff == 0 ? 0 : -1; } @Override public long getDelay(TimeUnit unit) { return unit.convert(removeTime - System.currentTimeMillis(), unit); } public T getT() { return t; } public void setT(T t) { this.t = t; } @Override public int hashCode() { return t.hashCode(); } @Override public boolean equals(Object object) { if (object instanceof DelayedItem) { return object.hashCode() == hashCode() ? true : false; } return false; } }
/**
 * A computation that produces a value of type {@code V} for a string key.
 *
 * <p>Declares a single abstract method, so it can be implemented with a lambda or a method
 * reference.
 *
 * @param <V> type of the computed value
 * @author WChao
 * @date 2018/06/21
 */
@FunctionalInterface
public interface Computable<V> {

    /**
     * Computes the value associated with the given key.
     *
     * @param k key to compute a value for
     * @return the computed value
     */
    V compute(String k);
}
/** * @author WChao * @date 2018/06/21 */ public class FutureTaskDemo { public static void main(String[] args) throws InterruptedException { // 子线程 Thread t = new Thread(() -> { CacheBean<String> cb = new CacheBean<>(k -> { try { System.out.println("模拟计算数据,计算时长2秒。key=" + k); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "你好:" + k; }, 5000); try { while (true) { System.out.println("thead2:" + cb.compute("b")); TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException e) { e.printStackTrace(); } }); t.start(); // 主线程 while (true) { CacheBean<String> cb = new CacheBean<>(k -> { try { System.out.println("模拟计算数据,计算时长2秒。key=" + k); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "你好:" + k; }, 5000); System.out.println("thead1:" + cb.compute("b")); TimeUnit.SECONDS.sleep(1); } } }
执行结果:
两个线程同时访问同一个key的缓存。从执行结果可以发现,每次缓存失效后,同一个key只执行一次计算,而不是多个线程并发执行同一个计算然后再缓存。