在互联网项目中,通常以堆内缓存的使用居多,不管是Guava,Memcache,仍是JDK自带的HashMap,ConcurrentHashMap等,都是在堆内内存中作数据计算操做。这样作的好处显而易见,用户彻底没必要在乎数据的分配,溢出,回收等操做,所有交由JVM来进行处理。因为JVM提供了诸多的垃圾回收算法,能够保证在不影响甚至微影响系统的前提下,作到堆内内存接近完美的管控。君不见,小如图书管理这样的系统,大如整个电商交易平台,都在JVM的加持下,服务于几个,十几个,乃至于上亿用户,而在这些系统中,堆内缓存组件所带来的收益但是居功至伟。在自下而上的互联网架构中,堆内缓存就像把卫这宫廷入口的剑士,神圣而庄严,真可谓谁敢横刀立马,惟我堆内缓存将军。html
堆内缓存的劣势java
可是,事物都是有两面性的,堆内缓存在JVM的管理下,纵然无可挑剔,可是在GC过程当中产生的程序小停顿和程序大停顿,则像一把利剑同样,斩断了对构造出完美高并发系统的念想。简单的以HashMap这个JDK自带的缓存组件为例,benchmark结果以下:git
Benchmark Mode Cnt Score Error Units localCacheBenchmark.testlocalCacheSet thrpt 20 85056.759 ± 126702.544 ops/s
其插入速度最快为85056.759+126702.544=211759.303ops,最慢为0,也就是每秒插入速度最快为20w,最慢为0。之因此为0,是由于HashMap中的数据在快速的增加过程当中,引发了频繁的GC操做,为了给当前HashMap腾出足够的空间进行插入操做,不得不释放一些对象。频繁的GC,势必对插入速度有不小的影响,形成应用的偶尔性暂停。因此这也能解释为啥最慢的时候,ops为0了。 同时从benchmark数据,咱们能够看到偏差率为126702.544ops,比正常操做的85056.756要大不少,说明GC的影响,对HashMap的插入操做影响特别的大。github
因为GC的存在,堆内缓存操做的ops会受到不小的影响,会形成本来小流量下10ms可以完成的内存计算,大流量下500ms还未完成。若是内存计算过于庞杂,则形成总体流程的ops吞吐量下降,也是极有可能的事儿。因此从这里能够看出,堆内缓存组件,在高并发的压力下,若是计算量巨大,尤为是写操做巨大,使其不会成为护城的利剑,反而成了性能的帮凶,何其可惧。redis
堆外缓存的优点算法
为了缓解在高并发,高写入操做下,堆内缓存组件形成的频繁GC问题,堆外缓存应运而生。从前面的描述咱们知道,堆内缓存是受JVM管控的,因此咱们没必要担忧垃圾回收的问题。可是堆外缓存是不受JVM管控的,因此也不受GC的影响致使的应用暂停问题。可是因为堆外缓存的使用,是以byte数组来进行的,因此须要本身进行序列化反序列化操做。目前已知的知名开源项目中,netty4的buffer pool采用了堆外缓存实现,具体的比对信息能够参考此处,具体的比对信息截图以下:数据库
带有Direct字眼的即为offheap堆外Buffer,x轴为分配的内存大小,Y轴为耗时。从上面能够看出,小块内存分配,JVM要稍微优秀一点;可是大块内存分配,明显的堆外缓存要优秀一些。因为堆外Buffer操做不受GC影响,实际上性能更好一些。可是须要的垃圾回收管控也须要本身去作,要麻烦不少。数组
堆外缓存实现原理缓存
说到堆外缓存实现原理,不可不提到sun.misc.Unsafe这个package包。此包提供了底层的Unsafe操做方法,让咱们能够直接在堆外内存作数据分配操做。因为是底层包,因此用户层面不多用到,只是一些jdk里面的核心类库会用到。其实例的初始化方式以下:安全
public static Unsafe getUnsafe() { Class cc = sun.reflect.Reflection.getCallerClass(2); if (cc.getClassLoader() != null) throw new SecurityException("Unsafe"); return theUnsafe; }
能够看出是一个单例模式。让咱们来尝试使用一下(下面代码是先分配了一个100bytes的空间,获得分配好的地址,而后在此地址里面放入1,最后将此地址里面的数据取出,打印出来):
long address = unsafe.allocateMemory(100); unsafe.putLong(address,1); System.out.println(unsafe.getLong(address));
可是在运行的过程当中,咱们却遇到了以下的错误:
java.lang.SecurityException: Unsafe at sun.misc.Unsafe.getUnsafe(Unsafe.java:90) at UnsafeTest.testUnsafe(UnsafeTest.java:18) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) ....... Process finished with exit code -1
能够看出,因为安全性的缘由,咱们是没法直接使用Unsafe的实例来进行数据操做的,主要缘由是由于cc.getClassLoader()对theUnsafe实例作了过滤限制。可是咱们能够直接用theUnsafe来实现,因为是private修饰,咱们能够用反射来将private修饰改为public修饰,让其暴露出来供咱们使用:
Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); Unsafe unsafe = (Unsafe) f.get(null); long address = unsafe.allocateMemory(100); unsafe.putLong(address,1); System.out.println(unsafe.getLong(address));
这样就能够了,可以正确的获取运行结果。从这里咱们能够看出,堆外内存必须本身分配地址空间,那么对应的,本身须要控制好地址边界,若是控制很差,经典的OOM Exception将会出现。这也是比堆内内存使用麻烦的地方。
上面的代码展现,其实已经说明了Unsafe方法的基本使用方式。若是想查看更多的Unsafe实现方式,我的推荐能够看看Cassandra源码中的中的Object mapper - Caffinitas里面关于Unsafe的实现。此类的名称为Uns.java,因为类精简,我的认为很值得一看,我贴出部分代码来:
static { try { Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); unsafe = (Unsafe) field.get(null); if (unsafe.addressSize() > 8) throw new RuntimeException("Address size " + unsafe.addressSize() + " not supported yet (max 8 bytes)"); if (__DEBUG_OFF_HEAP_MEMORY_ACCESS) LOGGER.warn("Degraded performance due to off-heap memory allocations and access guarded by debug code enabled via system property " + OHCacheBuilder.SYSTEM_PROPERTY_PREFIX + "debugOffHeapAccess=true"); IAllocator alloc; String allocType = __ALLOCATOR != null ? __ALLOCATOR : "jna"; switch (allocType) { case "unsafe": alloc = new UnsafeAllocator(); LOGGER.info("OHC using sun.misc.Unsafe memory allocation"); break; case "jna": default: alloc = new JNANativeAllocator(); LOGGER.info("OHC using JNA OS native malloc/free"); } allocator = alloc; } catch (Exception e) { throw new AssertionError(e); } } 。。。。。。 static long getLongFromByteArray(byte[] array, int offset) { if (offset < 0 || offset + 8 > array.length) throw new ArrayIndexOutOfBoundsException(); return unsafe.getLong(array, (long) Unsafe.ARRAY_BYTE_BASE_OFFSET + offset); } static int getIntFromByteArray(byte[] array, int offset) { if (offset < 0 || offset + 4 > array.length) throw new ArrayIndexOutOfBoundsException(); return unsafe.getInt(array, (long) Unsafe.ARRAY_BYTE_BASE_OFFSET + offset); } static short getShortFromByteArray(byte[] array, int offset) { if (offset < 0 || offset + 2 > array.length) throw new ArrayIndexOutOfBoundsException(); return unsafe.getShort(array, (long) Unsafe.ARRAY_BYTE_BASE_OFFSET + offset); }
堆外缓存实现进阶
写到这里,原理什么的大概都懂了,咱们准备进阶一下,写个基于Off-heap堆外缓存的Int数组,因为On-heap Array的空间请求分配到了堆上,因此这里天然而然的就把空间分配到了堆外。代码以下:
public class OffheapIntArray { /** * 此list分配的地址 */ private long address; /** * 默认分配空间大小 */ private static final int defaultSize = 1024; /** * 带参构造 * 因为Integer类型在java中占用4个字节,因此在分配地址的时候,一个integer,须要分配 4*8 = 32 bytes的空间 * @param size * @throws NoSuchFieldException * @throws IllegalAccessException */ public OffheapIntArray(Integer size) throws NoSuchFieldException, IllegalAccessException { if (size == null) { address = alloc(defaultSize * 4 * 8); } else { address = alloc(size * 4 * 8); } } public int get(int index) throws NoSuchFieldException, IllegalAccessException { return getUnsafe().getInt(address + index * 4 * 8); } public void set(int index, int value) throws NoSuchFieldException, IllegalAccessException { getUnsafe().putInt(address + index * 4 * 8, value); } private Unsafe getUnsafe() throws IllegalAccessException, NoSuchFieldException { Field f = Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); return (Unsafe) f.get(null); } private long alloc(int size) throws NoSuchFieldException, IllegalAccessException { long address = getUnsafe().allocateMemory(size); return address; } public void free() throws NoSuchFieldException, IllegalAccessException { if (address == 0) { return; } getUnsafe().freeMemory(address); } }
咱们来简单的测试一下:
@Test public void testOffheap() throws NoSuchFieldException, IllegalAccessException { OffheapIntArray offheapArray = new OffheapIntArray(10); offheapArray.set(0,11111); offheapArray.set(1,1112); offheapArray.set(2,1113); offheapArray.set(3,1114); System.out.println(offheapArray.get(0)); System.out.println(offheapArray.get(1)); System.out.println(offheapArray.get(2)); System.out.println(offheapArray.get(3)); offheapArray.free(); }
输出结果以下:
11111 1112 1113 1114
能够看到获得了正确的输出结果。固然我这里只是简单的模拟使用。具体的使用方式,推荐以下两篇文章,能够对堆外内存的使用有更近一步的认识:
Java Magic. Part 4: sun.misc.Unsafe
堆外缓存组件实战
知道了堆外缓存的简单使用后,这里咱们要更近一步,使用现有的堆外缓存组件到项目中。
目前在市面上,有诸多的缓存组件,好比mapdb,ohc,ehcache3等,可是因为ehcache3收费,因此这里不作讨论,主要讨论mapdb和ohc这两个。咱们先经过benchmark来筛选一下两者的性能差别,因为这两个缓存组件提供的都是基于key-value模型的数据存储,因此benchmark的指标有9个,分别是get,set方法,hget,hset方法(value存储的是hashmap),sadd,smember方法(value存储的是set),zadd,zrange方法(value存储的是treeset)。
benchmark结果以下:
Benchmark Mode Cnt Score Error Units OffheapCacheBenchmark.testMapdbGet thrpt 20 69699.610 ± 4578.888 ops/s OffheapCacheBenchmark.testMapdbHGet thrpt 20 63663.523 ± 3595.413 ops/s OffheapCacheBenchmark.testMapdbHGetAll thrpt 20 64235.582 ± 4009.039 ops/s OffheapCacheBenchmark.testMapdbHSet thrpt 20 25777.077 ± 480.461 ops/s OffheapCacheBenchmark.testMapdbSAdd thrpt 20 335.973 ± 39.353 ops/s OffheapCacheBenchmark.testMapdbSet thrpt 20 39417.070 ± 830.689 ops/s OffheapCacheBenchmark.testMapdbSmember thrpt 20 67432.314 ± 2799.983 ops/s OffheapCacheBenchmark.testMapdbZAdd thrpt 20 21220.595 ± 1128.103 ops/s OffheapCacheBenchmark.testMapdbZRange thrpt 20 45425.162 ± 4533.071 ops/s Benchmark Mode Cnt Score Error Units OhcheapOHCBenchmark.testOhcGet thrpt 20 1196976.452 ± 27291.669 ops/s OhcheapOHCBenchmark.testOhcHGet thrpt 20 348383.355 ± 23304.696 ops/s OhcheapOHCBenchmark.testOhcHGetAll thrpt 20 350798.417 ± 11870.685 ops/s OhcheapOHCBenchmark.testOhcHSet thrpt 20 349370.322 ± 8619.813 ops/s OhcheapOHCBenchmark.testOhcSAdd thrpt 20 11700.160 ± 611.794 ops/s OhcheapOHCBenchmark.testOhcSet thrpt 20 538314.544 ± 132111.037 ops/s OhcheapOHCBenchmark.testOhcSmember thrpt 20 458817.772 ± 15817.159 ops/s OhcheapOHCBenchmark.testOhcZAdd thrpt 20 323979.906 ± 9842.344 ops/s OhcheapOHCBenchmark.testOhcZRange thrpt 20 192776.479 ± 12988.484 ops/s
从上面的结果能够看出,ohc属于性能怪兽类型,性能十倍于mapdb。并且因为ohc自己支持entry过时,可是mapdb不支持。因此这里综合一下,选择ohc做为咱们的堆外缓存组件。须要说明一下的是,在我进行benchmark测试过程当中,堆外缓存中会进行大量的数据读写操做,可是这些读写ops总体很是平稳,从error和score的对比就能够看出。不会出现应用暂停的状况。说明GC对堆外缓存的影响是很是小的。
总体类结构图以下(考虑到扩展性,暂时将mapdb加入到告终构图中):
从总体的类组织结构图看来,使用了策略模式+模板模式组合的方式来进行。 屏蔽不一样cache底层接口的不一致性,用的是策略模式;为不一样的堆外缓存组件提供一致的操做方法用的是模板模式。组合起来使用就使得开发和扩展显得很是容易。
部分类的封装方式以下:
public class OhcCacheStrategy implements CacheStrategy { /** * 日志 */ private static Logger logger = LoggerFactory.getLogger(OhcCacheStrategy.class); /** * 缓存组件 */ public OHCache<byte[], byte[]> dataCache; /** * 过时时间组件 */ public OHCache<byte[], byte[]> expireCache; /** * 缓存table最大容量 */ private long level2cacheMax = 1024000L; /** * 锁 */ private final Object lock = new Object(); /** * 键过时回调 */ public ExpirekeyAction expirekeyAction; /** * db引擎初始化 */ @PostConstruct public void initOhcEngine() { try { dataCache = OHCacheBuilder.<byte[], byte[]>newBuilder() .keySerializer(new OhcSerializer()) .valueSerializer(new OhcSerializer()) .segmentCount(2 * 4) .hashTableSize((int) level2cacheMax / 102400) .capacity(2 * 1024 * 1024 * 1024L) .defaultTTLmillis(OffheapCacheConst.EXPIRE_DEFAULT_SECONDS * 1000) .timeouts(true) .timeoutsSlots(64) .timeoutsPrecision(512) .eviction(Eviction.LRU) .build(); logger.error("ohc data cache init ok..."); expireCache = OHCacheBuilder.<byte[], byte[]>newBuilder() .keySerializer(new OhcSerializer()) .valueSerializer(new OhcSerializer()) .segmentCount(1) .hashTableSize((int) level2cacheMax / 102400) .capacity(2 * 1024 * 1024 * 1024L) .defaultTTLmillis(OffheapCacheConst.EXPIRE_DEFAULT_SECONDS * 1000) .timeouts(true) .timeoutsSlots(64) .timeoutsPrecision(512) .eviction(Eviction.NONE) .build(); logger.error("ohc expire cache init ok..."); } catch (Exception ex) { logger.error(OffheapCacheConst.PACKAGE_CONTAINER_OHC + OffheapCacheConst.ENGINE_INIT_FAIL, ex); AlarmUtil.alarm(OffheapCacheConst.PACKAGE_CONTAINER_OHC + OffheapCacheConst.ENGINE_INIT_FAIL, ex.getMessage()); throw ex; } } @Override public <T> boolean putEntry(String key, T entry, long expireAt) { synchronized (lock) { byte[] entryKey = SerializationUtils.serialize(key); byte[] entryVal = SerializationUtils.serialize((Serializable) entry); //缓存数据入库 if (dataCache.put(entryKey, entryVal, expireAt)) { //过时时间入库 putExpire(key, expireAt); //返回执行结果 return true; } return false; } } @Override public <T> T queryEntry(String key) { byte[] result = dataCache.get(SerializationUtils.serialize(key)); if (result == null) { return null; } return SerializationUtils.deserialize(result); } @Override public long queryExpireTime(String key) { byte[] entryKey = SerializationUtils.serialize(key); return expireCache.get(entryKey) == null ? 0 : SerializationUtils.deserialize(expireCache.get(entryKey)); } @Override public boolean removeEntry(String key) { byte[] entryKey = SerializationUtils.serialize(key); if (dataCache.remove(entryKey)) { removeExpire(key); return true; } return false; } @Override public boolean removeAll() { Iterable<byte[]> dataKey = () -> dataCache.keyIterator(); dataCache.removeAll(dataKey); Iterable<byte[]> expireKey = () -> expireCache.keyIterator(); expireCache.removeAll(expireKey); return true; } @Override public List<String> queryKeys() { List<String> list = new ArrayList<>(); Iterator<byte[]> iterator = expireCache.keyIterator(); while (iterator.hasNext()) { list.add(SerializationUtils.deserialize(iterator.next())); } return list; } /** * key过时时间同步入库 * * @param key * @param expireAt */ private void putExpire(String key, long expireAt) { try { expireCache.put(SerializationUtils.serialize(key), SerializationUtils.serialize(expireAt)); } catch (Exception ex) { logger.error("key[" + key + "]过时时间入库失败..."); } } /** * 同步清理过时键 * * @param key */ private void removeExpire(String key) { try { if (expireCache.remove(SerializationUtils.serialize(key))) { if (expirekeyAction != null) { expirekeyAction.keyExpiredNotification(key); } } } catch (Exception ex) { logger.error("key[" + key + "]过时时间清除失败..."); } } }
上面这个类是堆外缓存的核心策略类。全部其余的数据模型读写操做均可以依据此类来扩展,好比相似redis的sortedset,value能够存储一个Treeset便可。须要说明一下,上面代码中,dataCache主要用于存储数据部分,expireCache主要用于存储键过时时间。以便于能够实现键主动过时和被动过时功能。用户添加删除键的时候,会同步删除expireCache中的键,以便于两者可以统一。因为ohc自己并未实现keyExpireCallback,因此这里我实现了这个功能,只要有键被移除(主动删除仍是被动删除,都会触发通知),就会通知用户,用户能够按照以下方式使用:
@PostConstruct public void Init() { ohcCacheTemplate.registerExpireKeyAction(key -> { logger.error("key " + key + " expired..."); }); }
键被动过时功能,模仿了redis的键被动驱逐方式,实现以下:
public class OffheapCacheWorker { /** * 带参注入 * * @param cacheStrategy */ public OffheapCacheWorker(CacheStrategy cacheStrategy) { this.cacheStrategy = cacheStrategy; this.offheapCacheHelper = new OffheapCacheHelper(); } /** * 日志 */ private static Logger logger = LoggerFactory.getLogger(OffheapCacheWorker.class); /** * 缓存帮助类 */ private OffheapCacheHelper offheapCacheHelper; /** * 缓存构建器 */ private CacheStrategy cacheStrategy; /** * 过时key检测线程 */ private Thread expireCheckThread; /** * 线程状态 */ private volatile boolean started; /** * 线程开启 * * @throws IOException */ public synchronized void start() { if (started) { return; } expireCheckThread = new Thread("expire key check thread") { @Override public void run() { logger.error("expire key check thread start..."); while (!Thread.currentThread().isInterrupted()) { try { processLoop(); } catch (RuntimeException suppress) { logger.error("Thread `" + getName() + "` occured a error, suppressed.", suppress); throw suppress; } catch (Exception exception) { logger.error("Thread `" + getName() + "` occured a error, exception.", exception); } } logger.info("Thread `{}` was stopped normally.", getName()); } }; expireCheckThread.start(); started = true; } /** * 线程中止 * * @throws IOException */ public synchronized void stop() throws IOException { started = false; if (expireCheckThread != null) { expireCheckThread.interrupt(); } } /** * 过时键驱逐 * 模仿的redis键过时机制
*/ private void processLoop() throws InterruptedException { //每次采集样本数 int sampleCheckNumber = 20; //过时key计数 int sampleExpiredCount = 0; //抽样次数迭代 int sampleCheckIteration = 0; //缓存的key List<String> keys = cacheStrategy.queryKeys(); //抽样开始时间 long start = System.currentTimeMillis(); //循环开始 do { //键数量 long expireContainerSize = keys.size(); //默认为键数量 long loopCheckNumber = expireContainerSize; //每次检查的键数量,若是超过样本数,则以样本数为准 if (loopCheckNumber > sampleCheckNumber) { loopCheckNumber = sampleCheckNumber; } //开始检测 while (loopCheckNumber-- > 0) { //取随机下标 int rndNum = offheapCacheHelper.getRandomNumber(toIntExact(expireContainerSize) + 1); //取随机键 String rndKey = keys.get(rndNum); //获取过时时间 long expireTime = cacheStrategy.queryExpireTime(rndKey); //过时时间比对 if (expireTime <= System.currentTimeMillis()) { //键驱逐 boolean result = cacheStrategy.removeEntry(rndKey); if (result) { expireContainerSize--; sampleExpiredCount++; } } } //抽样次数递增 sampleCheckIteration++; //抽样达到16次(16的倍数,&0xf都为0)且本批次耗时超过0.5秒,将退出,避免阻塞正常业务操做 if ((sampleCheckIteration % 16) == 0 && (System.currentTimeMillis() - start) > 300) { logger.error("清理数据库过时键操做耗时过长,退出,预备从新开始..."); return; } } while (sampleExpiredCount > sampleCheckNumber / 4); Thread.sleep(1500); } }
键被动驱逐,会随机抽取20个key检测,若是过时键小于5个,则直接进行下一次抽样。不然将进行键驱逐操做。一旦抽样次数达到限定次数且键驱逐耗时过长,为了避免影响业务,将会退出本次循环,继续下一次循环操做。此worker在后台运行,实测6W个过时key一块儿过时,cpu占用控制在10%,60w个过时key基本上一块儿过时,cpu占用控制在60%左右。达到预期效果。在大量的读写操做过程当中,能够看到堆内内存几乎没有变化。
写到最后,上面就是此次我要介绍的堆外缓存的总体内容了,从Unsafe讲到原理,从实现讲到ohc,但愿你们可以提出更好的东西来,多谢。