spark 布隆过滤器(bloomFilter)

        数据过滤在不少场景都会应用到,特别是在大数据环境下。在数据量很大的场景实现过滤或者全局去重,须要存储的数据量和计算代价是很是庞大的。不少小伙伴第一念头确定会想到布隆过滤器,有必定的精度损失,可是存储性能和计算性能能够达到几何级别的提高。不少第三方框架也实现了相应的功能,好比hbase框架实现的布隆过滤器性能是很是的棒,redis也能够实现相应的功能。这些须要借助于第三方框架,须要维护第三方框架。若是公司没有部署相应架构,单独为使用布隆过滤器部署一套集群,代价仍是很是大的。java

        咱们在作流式计算时须要实现数据小时级别去重和天级别数据去重,初始功能版本使用的是基于redis实现的布隆过滤器。性能也很是的好,三个节点的redis集群(三主三从,主从交叉策略)性能能够达到每秒十几万的处理性能。在后期的使用中主要瓶颈就在redis的吞吐量的性能上。一直想在这块作必定的性能优化。redis

        后来,发现spark官方封装了基于DataFrame的布隆过滤器,使用起来至关方便。性能再也不受制于第三方框架的吞吐量限制,依赖于spark的并行资源。能够减小架构设计的复杂度,提升可维护性。在流式计算应用中能够将布隆过滤器作成driver级别的全局变量,在batch结束更新布隆过滤器。若是考虑容错,能够将布隆过滤器数据按期持久化到磁盘(hdfs/redis)。
数组

       直接上代码,看一下使用方法性能优化

val bf = df.stat.bloomFilter("dd",dataLen,0.01)
    val rightNum = rdd.map(x=>(x.toInt,bf.mightContainString(x)))

        首先,在生成布隆过滤器直接调用bloomFilter(colName:String,expectedNumItems:Long,fpp:Double)就能够了,第一个参数是使用的数据列,第二个参数是数据量指望会有多少,第三个参数是损失精度。损失精度越低生成的布隆数组长度就会越长,占用的空间就会越多,计算过程就会越漫长。架构

        在用有些场景布隆过滤器还须要合并,官方也提供了相应的API
框架

                mergeInPlace(BloomFilter var1):BloomFilter工具

        断定数据是否存在,官方一共提供了四个方法:性能

                mightContain(Object var1),大数据

                mightContainString(String val1),优化

                mightContainLong(long var1),

                mightContainBinary(byte[] var1)

        不一样的方法适用于不一样的类型,bloomFilter(calname:String...)这个方法中使用列的数据类型必定要和以上四个方法对应,不然会出问题。

        官方还很贴心的提供了序列化和反序列化工具:writeTo和readFrom,能够很方便的将布隆过滤器序列化到磁盘和从磁盘加载布隆过滤器。