本文源地址: http://www.fullstackyang.com/...,转发请注明该地址或segmentfault地址,谢谢!
在网上已经有不少关于布隆过滤器的介绍了,这里就再也不赘述,下面简单地提炼几个要点:java
目前已经有相应实现的开源类库,如Google的Guava类库,Twitter的Algebird类库,和ScalaNLP breeze等等,其中Guava 11.0版本中增长了BloomFilter类,它使用了Funnel和Sink的设计,加强了泛化的能力,使其能够支持任何数据类型,其利用murmur3 hash来作哈希映射函数,不过它底层并无使用传统的java.util.BitSet来作bit数组,而是用long型数组进行了从新封装,大部分操做均基于位的运算,所以能达到一个很是好的性能;下面咱们就Guava类库中实现布隆过滤器的源码做详细分析,最后出于灵活性和解耦等因素的考虑,咱们想要把布隆过滤器从JVM中拿出来,因而利用了Redis自带的Bitmaps做为底层的bit数组进行重构,另外随着插入的元素愈来愈多,当实际数量远远大于建立时设置的预计数量时,布隆过滤器的误判率会愈来愈高,所以在重构的过程当中增长了自动扩容的特性,最后经过测试验证其正确性。python
Guava中,布隆过滤器的实现主要涉及到2个类,BloomFilter和BloomFilterStrategies,首先来看一下BloomFilter:git
/** The bit set of the BloomFilter (not necessarily power of 2!) */ private final BitArray bits; /** Number of hashes per element */ private final int numHashFunctions; /** The funnel to translate Ts to bytes */ private final Funnel<? super T> funnel; /** * The strategy we employ to map an element T to {@code numHashFunctions} bit indexes. */ private final Strategy strategy;
这是它的4个成员变量:github
interface Strategy extends java.io.Serializable { /** * Sets {@code numHashFunctions} bits of the given bit array, by hashing a user element. * * <p>Returns whether any bits changed as a result of this operation. */ <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits); /** * Queries {@code numHashFunctions} bits of the given bit array, by hashing a user element; * returns {@code true} if and only if all selected bits are set. */ <T> boolean mightContain( T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits); /** * Identifier used to encode this strategy, when marshalled as part of a BloomFilter. Only * values in the [-128, 127] range are valid for the compact serial form. Non-negative values * are reserved for enums defined in BloomFilterStrategies; negative values are reserved for any * custom, stateful strategy we may define (e.g. any kind of strategy that would depend on user * input). */ int ordinal(); }
对于建立布隆过滤器,BloomFilter并无公有的构造函数,只有一个私有构造函数,而对外它提供了5个重载的create方法,在缺省状况下误判率设定为3%,采用BloomFilterStrategies.MURMUR128_MITZ_64的实现。其中4个create方法最终都调用了同一个create方法,由它来负责调用私有构造函数,其源码以下:redis
static <T> BloomFilter<T> create( Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) { checkNotNull(funnel); checkArgument( expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions); checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp); checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp); checkNotNull(strategy); if (expectedInsertions == 0) { expectedInsertions = 1; } /* * TODO(user): Put a warning in the javadoc about tiny fpp values, since the resulting size * is proportional to -log(p), but there is not much of a point after all, e.g. * optimalM(1000, 0.0000000000000001) = 76680 which is less than 10kb. Who cares! */ long numBits = optimalNumOfBits(expectedInsertions, fpp); int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits); try { return new BloomFilter<T>(new BitArray(numBits), numHashFunctions, funnel, strategy); } catch (IllegalArgumentException e) { throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e); } }
在create中接受了4个参数,funnel(输入的数据),expectedInsertions(预计插入的元素总数),fpp(指望误判率),strategy(实现Strategy的实例),而后它计算了bit数组的长度以及哈希函数的个数(公式参考前文),最后用numBits建立了BitArray,并调用了构造函数完成赋值操做。算法
static long optimalNumOfBits(long n, double p) { if (p == 0) { p = Double.MIN_VALUE; } return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); } static int optimalNumOfHashFunctions(long n, long m) { // (m / n) * log(2), but avoid truncation due to division! return Math.max(1, (int) Math.round((double) m / n * Math.log(2))); }
接着再来看一下BloomFilterStrategies类,首先它是实现了BloomFilter.Strategy 接口的一个枚举类,其次它有两个2枚举值,MURMUR128_MITZ_32和MURMUR128_MITZ_64,分别对应了32位哈希映射函数,和64位哈希映射函数,后者使用了murmur3 hash生成的全部128位,具备更大的空间,不过原理是相通的,咱们选择默认的MURMUR128_MITZ_64来分析:数据库
MURMUR128_MITZ_64() { @Override public <T> boolean put( T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits) { long bitSize = bits.bitSize(); byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).getBytesInternal(); long hash1 = lowerEight(bytes); long hash2 = upperEight(bytes); boolean bitsChanged = false; long combinedHash = hash1; for (int i = 0; i < numHashFunctions; i++) { // Make the combined hash positive and indexable bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize); combinedHash += hash2; } return bitsChanged; } @Override public <T> boolean mightContain( T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits) { long bitSize = bits.bitSize(); byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).getBytesInternal(); long hash1 = lowerEight(bytes); long hash2 = upperEight(bytes); long combinedHash = hash1; for (int i = 0; i < numHashFunctions; i++) { // Make the combined hash positive and indexable if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) { return false; } combinedHash += hash2; } return true; }
抽象来看,put是写,mightContain是读,两个方法的代码有一点类似,都是先利用murmur3 hash对输入的funnel计算获得128位的字节数组,而后高低分别取8个字节(64位)建立2个long型整数hash1,hash2做为哈希值。循环体内采用了2个函数模拟其余函数的思想,即上文提到的gi(x) = h1(x) + ih2(x) ,这至关于每次累加hash2,而后经过基于bitSize取模的方式在bit数组中索引。segmentfault
这里之因此要和Long.MAX_VALUE进行按位与的操做,是由于在除数和被除数符号不一致的状况下计算所得的结果是有差异的,在程序语言里,“%”准确来讲是取余运算(C,C++和Java均如此,python是取模),如-5%3=-2,而取模的数学定义是x
mod y=x-y[x/y](向下取整),因此-5 mod 3=
-5-3*(-2)=1,所以当哈希值为负数的时候,其取余的结果为负(bitSize始终为正数),这样就不方便在bit数组中取值,所以经过Long.MAX_VALUE(二进制为0111…1111),直接将开头的符号位去掉,从而转变为正数。固然也能够取绝对值,在另外一个MURMUR128_MITZ_32的实现中就是这么作的。
在put方法中,先是将索引位置上的二进制置为1,而后用bitsChanged记录插入结果,若是返回true代表没有重复插入成功,而mightContain方法则是将索引位置上的数值取出,并判断是否为0,只要其中出现一个0,那么当即判断为不存在。数组
最后再说一下底层bit数组的实现,主要代码以下:数据结构
static final class BitArray { final long[] data; long bitCount; BitArray(long bits) { this(new long[Ints.checkedCast(LongMath.divide(bits, 64, RoundingMode.CEILING))]); } // Used by serialization BitArray(long[] data) { checkArgument(data.length > 0, "data length is zero!"); this.data = data; long bitCount = 0; for (long value : data) { bitCount += Long.bitCount(value); } this.bitCount = bitCount; } /** Returns true if the bit changed value. */ boolean set(long index) { if (!get(index)) { data[(int) (index >>> 6)] |= (1L << index); bitCount++; return true; } return false; } boolean get(long index) { return (data[(int) (index >>> 6)] & (1L << index)) != 0; } /** Number of bits */ long bitSize() { return (long) data.length * Long.SIZE; } ... }
以前也提到了Guava没有使用java.util.BitSet,而是封装了一个long型的数组,另外还有一个long型整数,用来统计数组中已经占用(置为1)的数量,在第一个构造函数中,它把传入的long型整数按长度64分段(例如129分为3段),段数做为数组的长度,你能够想象成由若干个64位数组拼接成一个超长的数组,它的长度就是64乘以段数,即bitSize,在第二个构造函数中利用Long.bitCount方法来统计对应二进制编码中的1个数,这个方法在JDK1.5中就有了,其算法设计得很是精妙,有精力的同窗能够自行研究。
另外两个重要的方法是set和get,在get方法中,参考put和mightContain方法,传入的参数index是通过bitSize取模的,所以必定能落在这个超长数组的范围以内,为了获取index对应索引位置上的值,首先将其无符号右移6位,而且强制转换成int型,这至关于除以64向下取整的操做,也就是换算成段数,获得该段上的数值以后,又将1左移index位,最后进行按位与的操做,若是结果等于0,那么返回false,从而在mightContain中判断为不存在。在set方法中,首先调用了get方法判断是否已经存在,若是不存在,则用一样的逻辑取出data数组中对应索引位置的数值,而后按位或并赋值回去。
到这里,对Guava中布隆过滤器的实现就基本讨论完了,简单总结一下:
经过上面的分析,主要算法和逻辑的部分大致都是同样的,真正须要重构的部分是底层位数组的实现,在Guava中是封装了一个long型的数组,而对于redis来讲,自己自带了Bitmaps的“数据结构”(本质上仍是一个字符串),已经提供了位操做的接口,所以重构自己并不复杂,相对比较复杂的是,以前提到的实现自动扩容特性。
这里实现自动扩容的思想是,在redis中记录一个自增的游标cursor,若是当前key对应的Bitmaps已经达到饱和状态,则cursor自增,同时用其生成一个新的key,并建立规模同等的Bitmaps。而后在get的时候,须要判断该元素是否存在于任意一个Bitmaps中。因而整个的逻辑就变成,一个元素在每一个Bitmaps中都不存在时,才能插入当前cursor对应key的Bitmaps中。
下面是代码的实现部分。
首先,为了简化redis的操做,定义了2个函数式接口,分别执行单条命令和pipeline,另外还实现了一个简单的工具类
@FunctionalInterface public interface JedisExecutor<T> { T execute(Jedis jedis); } @FunctionalInterface public interface PipelineExecutor { void load(Pipeline pipeline); } public class JedisUtils { private static final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); private JedisPool jedisPool; public JedisUtils() { jedisPool = new JedisPool(poolConfig, "localhost", 6379); } public <T> T execute(JedisExecutor<T> jedisExecutor) { try (Jedis jedis = jedisPool.getResource()) { return jedisExecutor.execute(jedis); } } public List<Object> pipeline(List<PipelineExecutor> pipelineExecutors) { try (Jedis jedis = jedisPool.getResource()) { Pipeline pipeline = jedis.pipelined(); for (PipelineExecutor executor : pipelineExecutors) executor.load(pipeline); return pipeline.syncAndReturnAll(); } } }
其次在Strategy中,对put和mightContain做了一点修改,其中被注释的部分是Guava中的实现。为了简化,这里咱们只接受String对象。
这里先把全部的随机函数对应的索引位置收集到一个数组中,而后交由底层的RedisBitmaps处理get或set,具体过程后面会详细说明。
bits.ensureCapacityInternal()方法,即表示自动扩容,这个函数名是从ArrayList中搬过来的。
@Override public boolean put(String string, int numHashFunctions, RedisBitmaps bits) { long bitSize = bits.bitSize(); byte[] bytes = Hashing.murmur3_128().hashString(string, Charsets.UTF_8).asBytes(); long hash1 = lowerEight(bytes); long hash2 = upperEight(bytes); boolean bitsChanged = false; long combinedHash = hash1; // for (int i = 0; i < numHashFunctions; i++) { // bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize); // combinedHash += hash2; // } long[] offsets = new long[numHashFunctions]; for (int i = 0; i < numHashFunctions; i++) { offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize; combinedHash += hash2; } bitsChanged = bits.set(offsets); bits.ensureCapacityInternal();//自动扩容 return bitsChanged; } @Override public boolean mightContain(String object, int numHashFunctions, RedisBitmaps bits) { long bitSize = bits.bitSize(); byte[] bytes = Hashing.murmur3_128().hashString(object, Charsets.UTF_8).asBytes(); long hash1 = lowerEight(bytes); long hash2 = upperEight(bytes); long combinedHash = hash1; // for (int i = 0; i < numHashFunctions; i++) { // if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) { // return false; // } // combinedHash += hash2; // } // return true; long[] offsets = new long[numHashFunctions]; for (int i = 0; i < numHashFunctions; i++) { offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize; combinedHash += hash2; } return bits.get(offsets); }
最后,也是最重要的RedisBitmaps,其中bitSize用了Guava布隆过滤器中计算Long型数组长度的方法,获得bitSize以后使用setbit命令初始化一个所有为0的位数组。get(long offset)和set(long offset),这两个与Guava布隆过滤器中的逻辑相似,这里就再也不赘述了,而get(long[] offsets)方法中,全部的offset要与每个cursor对应的Bitmaps进行判断,若所有命中,那么这个元素就可能存在于该Bitmaps,反之若不能彻底命中,则表示该元素不存在于任何一个Bitmaps,因此当知足这个条件,在set(long[] offsets)方法中,就能够插入到当前key的Bitmaps中了。
在ensureCapacityInternal方法,须要扩容的判断条件是bitCount*2>bitSize,bitCount表示一个Bitmaps中“1”出现的个数,也就是当“1”出现的个数超过总数的一半时,进行扩容操做——首先使用incr命令对cursor自增,而后使用新的key建立一个新的Bitmaps。
RedisBitmapsJava class RedisBitmaps { private static final String BASE_KEY = "bloomfilter"; private static final String CURSOR = "cursor"; private JedisUtils jedisUtils; private long bitSize; RedisBitmaps(long bits) { this.jedisUtils = new JedisUtils(); this.bitSize = LongMath.divide(bits, 64, RoundingMode.CEILING) * Long.SIZE;//位数组的长度,至关于n个long的长度 if (bitCount() == 0) { jedisUtils.execute((jedis -> jedis.setbit(currentKey(), bitSize - 1, false))); } } boolean get(long[] offsets) { for (long i = 0; i < cursor() + 1; i++) { final long cursor = i; //只要有一个cursor对应的bitmap中,offsets所有命中,则表示可能存在 boolean match = Arrays.stream(offsets).boxed() .map(offset -> jedisUtils.execute(jedis -> jedis.getbit(genkey(cursor), offset))) .allMatch(b -> (Boolean) b); if (match) return true; } return false; } boolean get(final long offset) { return jedisUtils.execute(jedis -> jedis.getbit(currentKey(), offset)); } boolean set(long[] offsets) { if (cursor() > 0 && get(offsets)) { return false; } boolean bitsChanged = false; for (long offset : offsets) bitsChanged |= set(offset); return bitsChanged; } boolean set(long offset) { if (!get(offset)) { jedisUtils.execute(jedis -> jedis.setbit(currentKey(), offset, true)); return true; } return false; } long bitCount() { return jedisUtils.execute(jedis -> jedis.bitcount(currentKey())); } long bitSize() { return this.bitSize; } private String currentKey() { return genkey(cursor()); } private String genkey(long cursor) { return BASE_KEY + "-" + cursor; } private Long cursor() { String cursor = jedisUtils.execute(jedis -> jedis.get(CURSOR)); return cursor == null ? 0 : Longs.tryParse(cursor); } void ensureCapacityInternal() { if (bitCount() * 2 > bitSize()) grow(); } void grow() { Long cursor = jedisUtils.execute(jedis -> jedis.incr(CURSOR)); jedisUtils.execute((jedis -> jedis.setbit(genkey(cursor), bitSize - 1, false))); } void reset() { String[] keys = LongStream.range(0, cursor() + 1).boxed().map(this::genkey).toArray(String[]::new); jedisUtils.execute(jedis -> jedis.del(keys)); jedisUtils.execute(jedis -> jedis.set(CURSOR, "0")); jedisUtils.execute(jedis -> jedis.setbit(currentKey(), bitSize - 1, false)); } private PipelineExecutor apply(PipelineExecutor executor) { return executor; } }
下面咱们作一个单元测试来验证其正确性。
若是咱们插入的数量等于原预计总数,RedisBloomFilter扩容了1次,而两个布隆过滤器的结果一致,都为false,true,false。
若是插入的数量为原预计总数的3倍,RedisBloomFilter扩容了3次,而且仍判断正确,而Guava布隆过滤器则在判断str3时出现误判。
public class TestRedisBloomFilter { private static final int TOTAL = 10000; private static final double FPP = 0.0005; @Test public void test() { RedisBloomFilter redisBloomFilter = RedisBloomFilter.create(TOTAL, FPP); redisBloomFilter.resetBitmap(); BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), TOTAL, FPP); IntStream.range(0, /* 3* */TOTAL).boxed() .map(i -> Hashing.md5().hashInt(i).toString()) .collect(toList()).forEach(s -> { redisBloomFilter.put(s); bloomFilter.put(s); }); String str1 = Hashing.md5().hashInt(99999).toString(); String str2 = Hashing.md5().hashInt(9999).toString(); String str3 = "abcdefghijklmnopqrstuvwxyz123456"; System.out.println(redisBloomFilter.mightContain(str1) + ":" + bloomFilter.mightContain(str1)); System.out.println(redisBloomFilter.mightContain(str2) + ":" + bloomFilter.mightContain(str2)); System.out.println(redisBloomFilter.mightContain(str3) + ":" + bloomFilter.mightContain(str3)); } } >> grow bloomfilter-1 false:false true:true false:false >> grow bloomfilter-1 grow bloomfilter-2 grow bloomfilter-3 false:false true:true false:true
综上,本文利用了Guava布隆过滤器的思想,并结合Redis中的Bitmaps等特性实现了支持动态扩容的布隆过滤器,它将布隆过滤器底层的位数据装载到了Redis数据库中,这样的好处在于能够部署在更复杂的多应用或分布式系统中,还能够利用Redis完成持久化,定时过时等功能。