在咱们编程的过程当中会遇到一些在程序中须要重试使用的数据,在这种状况下咱们就能够考虑利用缓存(内存)的优点来提供程序访问这些数据的一个性能了。利用了缓存能够在必定程度上缓解很大的性能消耗:java
网络传输开销算法
数据序列化反序列话spring
数据库、文件系统数据访问慢数据库
缓存器是利用内存进行数据存储的,在存储容量上有必定的限制,因此咱们在咱们使用缓存的时候也分两种场景:编程
全量数据缓存数组
缓存热数据,这也是基于缓存容量的一个考虑缓存
好了本篇咱们就来聊聊写程序过程当中常能用到的本地缓存的方式。网络
缓存数据的存储格式通常都是以Key-Value的方式,那这里咱们主要来讨论下Map的实现ConcurrentHashMap实现的缓存。数据结构
String key = StringUtils.EMPTY; ConcurrentMap<String, String> localCache = new ConcurrentHashMap(); if(StringUtils.isEmpty(localCache.get(key))) { String value = queryFromDB(key); localCache.put(key,value); return value; } return localCache.get(key);
这样就能构造一个很是简单的缓存。app
注意:这个缓存仍是有很是多的问题
没有一个清除缓存的策略,最终全部被访问过得数据都会全量给缓存起来,直到显式清除。
同时缓存没命中的状况下须要应用显式去加载(queryFromDB
)。
好了主角要登场了,先简单介绍下这个cache的一些用法,这个cache比较好的解决了我上面提到经过Map用做缓存的两个缺陷。
LoadingCache<Key, Graph> graphs = CacheBuilder.newBuilder() .maximumSize(10000) .expireAfterWrite(10, TimeUnit.MINUTES) .removalListener(MY_LISTENER) .build( new CacheLoader<Key, Graph>() { public Graph load(Key key) throws AnyException { return createExpensiveGraph(key); } });
经过这种方式一个缓存就已经建立好了,上面定义的load函数在缓存中不存在key对应的value的时候会去执行将数据load放到缓存中。
其底层存储采用基于数组的java.util.concurrent.atomic.AtomicReferenceArray
进行缓存元素的存取。
先分析下load函数是怎么被执行的:graphs.getUnchecked(new Key());
从缓存中获取数据,若是没有进行put操做,首次get的时候缓存中没有其缓存值,这个时候必然要触发load函数进行value load了,那咱们就从get函数进行深刻分析(分析源码基于16.0.1)。
com.google.common.cache.LocalCache.Segment#get(K, int, com.google.common.cache.CacheLoader<? super K,V>) V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException { checkNotNull(key); checkNotNull(loader); try { if (count != 0) { // read-volatile // don't call getLiveEntry, which would ignore loading values ReferenceEntry<K, V> e = getEntry(key, hash); if (e != null) { long now = map.ticker.read(); V value = getLiveValue(e, now); if (value != null) { recordRead(e, now); statsCounter.recordHits(1); return scheduleRefresh(e, key, hash, value, now, loader); } ValueReference<K, V> valueReference = e.getValueReference(); if (valueReference.isLoading()) { return waitForLoadingValue(e, key, valueReference); } } } // at this point e is either null or expired; return lockedGetOrLoad(key, hash, loader); } catch (ExecutionException ee) { Throwable cause = ee.getCause(); if (cause instanceof Error) { throw new ExecutionError((Error) cause); } else if (cause instanceof RuntimeException) { throw new UncheckedExecutionException(cause); } throw ee; } finally { postReadCleanup(); } }
首次调用会执行lockedGetOrLoad函数
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException { ReferenceEntry<K, V> e; ValueReference<K, V> valueReference = null; LoadingValueReference<K, V> loadingValueReference = null; boolean createNewEntry = true; lock(); try { // re-read ticker once inside the lock long now = map.ticker.read(); preWriteCleanup(now); int newCount = this.count - 1; AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table; int index = hash & (table.length() - 1); ReferenceEntry<K, V> first = table.get(index); for (e = first; e != null; e = e.getNext()) { K entryKey = e.getKey(); if (e.getHash() == hash && entryKey != null && map.keyEquivalence.equivalent(key, entryKey)) { valueReference = e.getValueReference(); if (valueReference.isLoading()) { createNewEntry = false; } else { V value = valueReference.get(); if (value == null) { enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED); } else if (map.isExpired(e, now)) { // This is a duplicate check, as preWriteCleanup already purged expired // entries, but let's accomodate an incorrect expiration queue. enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED); } else { recordLockedRead(e, now); statsCounter.recordHits(1); // we were concurrent with loading; don't consider refresh return value; } // immediately reuse invalid entries writeQueue.remove(e); accessQueue.remove(e); this.count = newCount; // write-volatile } break; } } if (createNewEntry) { loadingValueReference = new LoadingValueReference<K, V>(); if (e == null) { e = newEntry(key, hash, first); e.setValueReference(loadingValueReference); table.set(index, e); } else { e.setValueReference(loadingValueReference); } } } finally { unlock(); postWriteCleanup(); } if (createNewEntry) { try { // Synchronizes on the entry to allow failing fast when a recursive load is // detected. This may be circumvented when an entry is copied, but will fail fast most // of the time. synchronized (e) { return loadSync(key, hash, loadingValueReference, loader); } } finally { statsCounter.recordMisses(1); } } else { // The entry already exists. Wait for loading. return waitForLoadingValue(e, key, valueReference); } }
最后调用loadSync(key, hash, loadingValueReference, loader);
进行进行数据load。
public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) { stopwatch.start(); V previousValue = oldValue.get(); try { if (previousValue == null) { V newValue = loader.load(key); return set(newValue) ? futureValue : Futures.immediateFuture(newValue); } ListenableFuture<V> newValue = loader.reload(key, previousValue); if (newValue == null) { return Futures.immediateFuture(null); } // To avoid a race, make sure the refreshed value is set into loadingValueReference // *before* returning newValue from the cache query. return Futures.transform(newValue, new Function<V, V>() { @Override public V apply(V newValue) { LoadingValueReference.this.set(newValue); return newValue; } }); } catch (Throwable t) { if (t instanceof InterruptedException) { Thread.currentThread().interrupt(); } return setException(t) ? futureValue : fullyFailedFuture(t); } }
执行loader.load
将数据load进缓存,可能你会想若是这个时候从DB或其余非内存存储中也没找到数据,这个时候LocalLoadingCache
是怎么处理的呢?其实在这种状况下只须要throw异常信息就好,这样LocalLoadingCache
会放弃缓存。
可是读源代码细心的你可能会发如今lockedGetOrLoad
中会先newEntry后面才load
if (createNewEntry) { loadingValueReference = new LoadingValueReference<K, V>(); if (e == null) { e = newEntry(key, hash, first); e.setValueReference(loadingValueReference); table.set(index, e); } else { e.setValueReference(loadingValueReference); } } finally { unlock(); postWriteCleanup(); } if (createNewEntry) { try { // Synchronizes on the entry to allow failing fast when a recursive load is // detected. This may be circumvented when an entry is copied, but will fail fast most // of the time. synchronized (e) { return loadSync(key, hash, loadingValueReference, loader); } } finally { statsCounter.recordMisses(1); } } else { // The entry already exists. Wait for loading. return waitForLoadingValue(e, key, valueReference); }
其实实现很简单他在cache到异常信息后又会对缓存中的entry进行remove操做,当时找这段异常被cache的代码也是找了好久时间了。
com.google.common.cache.LocalCache.Segment#getAndRecordStats V getAndRecordStats(K key, int hash, LoadingValueReference<K, V> loadingValueReference, ListenableFuture<V> newValue) throws ExecutionException { V value = null; try { value = getUninterruptibly(newValue); if (value == null) { throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + "."); } statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos()); storeLoadedValue(key, hash, loadingValueReference, value); return value; } finally { if (value == null) { statsCounter.recordLoadException(loadingValueReference.elapsedNanos()); removeLoadingValue(key, hash, loadingValueReference); } } }
执行removeLoadingValue将load异常后的key删除。
从用法那小结能够看到咱们在建立缓存的时候除了load还有一些其余特性以下:
maximumSize(10000) expireAfterWrite(10, TimeUnit.MINUTES)
这又是什么意思呢?这其实就是LocalLoadingCache
提供的缓存策略。
maximumSize(10000)
设置缓存能保存的最多元素数量。expireAfterWrite(10, TimeUnit.MINUTES)
设置元素在写后多久进行销毁。
其实还有maximumWeight、expireAfterAccess
两种元素过时策略。
maximumSize
是maximumWeight
的一种特殊形式,将全部的元素设置weight为1,也即就转化为能存储元素个数的上限值了。
expireAfterAccess
和expireAfterWrite
基本就一个意思,只是内部用了两种不一样的计数方式(经过不一样的queue进行管理,被访问/修改进行入队操做)进行访问、写操做的记录。
很少说让源码说话。
根据过时时间进行缓存的淘汰策略思路:在进行get/put操做完成后对队列(每次对缓存的操做头会被其记录下来)进行一次遍历,而后按照过时时间淘汰过时的元素。
根据元素个数上限进行清理的策略思路:在load新缓存值的时候比对下是否缓存容量(元素个数)已经达到上限,若是达到上限按照LRU算法进行淘汰元素。
过时时间淘汰策略
从分析load那小结咱们已经展现过get的代码,其中最后finally中有段postReadCleanup();
方法,深刻下去方法体就否则看出:
@GuardedBy("Segment.this") void expireEntries(long now) { drainRecencyQueue(); ReferenceEntry<K, V> e; while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) { if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) { throw new AssertionError(); } } }
进行过时key清除策略,从这段代码也能看出我为何说expireAfterAccess
和expireAfterWrite
基本就一个意思了吧。
其实还有一种清除缓存的策略:基于引用的回收可是还没研究清除不便多说,这个策略清除的时机和过时时间策略同样。
@GuardedBy("Segment.this") void drainReferenceQueues() { if (map.usesKeyReferences()) { drainKeyReferenceQueue(); } if (map.usesValueReferences()) { drainValueReferenceQueue(); } }
容量回收策略
在新key对应的value load完后须要将value存放到缓存中去,插入完成后会进行容量的check若是超过容量限制会执行淘汰策略。对应源码:
com.google.common.cache.LocalCache.Segment#storeLoadedValue boolean storeLoadedValue(K key, int hash, LoadingValueReference<K, V> oldValueReference, V newValue) { lock(); try { long now = map.ticker.read(); preWriteCleanup(now); int newCount = this.count + 1; if (newCount > this.threshold) { // ensure capacity expand(); newCount = this.count + 1; } AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table; int index = hash & (table.length() - 1); ReferenceEntry<K, V> first = table.get(index); for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) { K entryKey = e.getKey(); if (e.getHash() == hash && entryKey != null && map.keyEquivalence.equivalent(key, entryKey)) { ValueReference<K, V> valueReference = e.getValueReference(); V entryValue = valueReference.get(); // replace the old LoadingValueReference if it's live, otherwise // perform a putIfAbsent if (oldValueReference == valueReference || (entryValue == null && valueReference != UNSET)) { ++modCount; if (oldValueReference.isActive()) { RemovalCause cause = (entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED; enqueueNotification(key, hash, oldValueReference, cause); newCount--; } setValue(e, key, newValue, now); this.count = newCount; // write-volatile evictEntries(); return true; } // the loaded value was already clobbered valueReference = new WeightedStrongValueReference<K, V>(newValue, 0); enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED); return false; } } ++modCount; ReferenceEntry<K, V> newEntry = newEntry(key, hash, first); setValue(newEntry, key, newValue, now); table.set(index, newEntry); this.count = newCount; // write-volatile evictEntries(); return true; } finally { unlock(); postWriteCleanup(); } }
上面的存储操做最终在进行setValue后会执行:
com.google.common.cache.LocalCache.Segment#evictEntries @GuardedBy("Segment.this") void evictEntries() { if (!map.evictsBySize()) { return; } drainRecencyQueue(); while (totalWeight > maxSegmentWeight) { ReferenceEntry<K, V> e = getNextEvictable(); if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) { throw new AssertionError(); } } } // TODO(fry): instead implement this with an eviction head ReferenceEntry<K, V> getNextEvictable() { for (ReferenceEntry<K, V> e : accessQueue) { int weight = e.getValueReference().getWeight(); if (weight > 0) { return e; } } throw new AssertionError(); }
这里最终会根据LRU从缓存中将最近没有使用过的元素进行剔除操做。
在LocalLoadingCache中提供了在元素被移除的时候供应用进行回调的函数,这个函数经过removalListener进行注册,当有元素从缓存中淘汰后就会触发其进行调用。
接着上面移除元素进行分析函数removeEntry
@GuardedBy("Segment.this") boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) { int newCount = this.count - 1; AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table; int index = hash & (table.length() - 1); ReferenceEntry<K, V> first = table.get(index); for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) { if (e == entry) { ++modCount; ReferenceEntry<K, V> newFirst = removeValueFromChain( first, e, e.getKey(), hash, e.getValueReference(), cause); newCount = this.count - 1; table.set(index, newFirst); this.count = newCount; // write-volatile return true; } } return false; }
最终会调用
@GuardedBy("Segment.this") void enqueueNotification(@Nullable K key, int hash, ValueReference<K, V> valueReference, RemovalCause cause) { totalWeight -= valueReference.getWeight(); if (cause.wasEvicted()) { statsCounter.recordEviction(); } if (map.removalNotificationQueue != DISCARDING_QUEUE) { V value = valueReference.get(); RemovalNotification<K, V> notification = new RemovalNotification<K, V>(key, value, cause); map.removalNotificationQueue.offer(notification); } }
将创建一个RemovalNotification队列进行保存删除元素。
在读/写完成后会进行通知
com.google.common.cache.LocalCache.Segment#postWriteCleanup /** * Performs routine cleanup following a write. */ void postWriteCleanup() { runUnlockedCleanup(); } void cleanUp() { long now = map.ticker.read(); runLockedCleanup(now); runUnlockedCleanup(); }
runUnlockedCleanup
源码会回调com.google.common.cache.RemovalListener#onRemoval
进行缓存元素删除后置处理。
void processPendingNotifications() { RemovalNotification<K, V> notification; while ((notification = removalNotificationQueue.poll()) != null) { try { removalListener.onRemoval(notification); } catch (Throwable e) { logger.log(Level.WARNING, "Exception thrown by removal listener", e); } } }
以为图不够清晰能够点击查看大图。
本篇也主要是对LocalLoadingCache从运用这个层次更向前走了一步,对咱们使用过程其逻辑背后的实现进行了必定深刻分析。我在初次看到这个方式也是很疑惑其底层究竟是如何实现的,因而有了这篇文章,经过源码进行跟踪分析其背后的实现逻辑。
后面还会分析org.springframework.cache.guava.GuavaCacheManager
如何将GuavaCache进行管理的,经过和spring更好的结合而消除显式调用cache get/put的方式。