Scrapy爬虫去重效率优化之Bloom Filter的算法的对接

首先回顾一下Scrapy-Redis的去重机制。Scrapy-Redis将Request的指纹存储到了Redis集合中,每一个指纹的长度为40,例如27adcc2e8979cdee0c9cecbbe8bf8ff51edefb61就是一个指纹,它的每一位都是16进制数。
python

咱们计算一下用这种方式耗费的存储空间。每一个十六进制数占用4 b,1个指纹用40个十六进制数表示,占用空间为20 B,1万个指纹即占用空间200 KB,1亿个指纹占用2 GB。当爬取数量达到上亿级别时,Redis的占用的内存就会变得很大,并且这仅仅是指纹的存储。Redis还存储了爬取队列,内存占用会进一步提升,更别说有多个Scrapy项目同时爬取的状况了。当爬取达到亿级别规模时,Scrapy-Redis提供的集合去重已经不能知足咱们的要求。因此咱们须要使用一个更加节省内存的去重算法Bloom Filter。git

1. 了解Bloom Filter

Bloom Filter,中文名称叫做布隆过滤器,是1970年由Bloom提出的,它能够被用来检测一个元素是否在一个集合中。Bloom Filter的空间利用效率很高,使用它能够大大节省存储空间。Bloom Filter使用位数组表示一个待检测集合,并能够快速地经过几率算法判断一个元素是否存在于这个集合中。利用这个算法咱们能够实现去重效果。github

本节咱们来了解Bloom Filter的基本算法,以及Scrapy-Redis中对接Bloom Filter的方法。redis

2. Bloom Filter的算法

在Bloom Filter中使用位数组来辅助实现检测判断。在初始状态下,咱们声明一个包含m位的位数组,它的全部位都是0,以下图所示。算法

如今咱们有了一个待检测集合,其表示为S={x1, x2, …, xn}。接下来须要作的就是检测一个x是否已经存在于集合S中。在Bloom Filter算法中,首先使用k个相互独立、随机的散列函数来将集合S中的每一个元素x1, x2, …, xn映射到长度为m的位数组上,散列函数获得的结果记做位置索引,而后将位数组该位置索引的位置1。例如,咱们取k为3,表示有三个散列函数,x1通过三个散列函数映射获得的结果分别为一、四、8,x2通过三个散列函数映射获得的结果分别为四、六、10,那么位数组的一、四、六、八、10这五位就会置为1,以下图所示。数组

若是有一个新的元素x,咱们要判断x是否属于S集合,咱们仍然用k个散列函数对x求映射结果。若是全部结果对应的位数组位置均为1,那么x属于S这个集合;若是有一个不为1,则x不属于S集合。架构

例如,新元素x通过三个散列函数映射的结果为四、六、8,对应的位置均为1,则x属于S集合。若是结果为四、六、7,而7对应的位置为0,则x不属于S集合。并发

注意,这里m、n、k知足的关系是m>nk,也就是说位数组的长度m要比集合元素n和散列函数k的乘积还要大。app

这样的断定方法很高效,可是也是有代价的,它可能把不属于这个集合的元素误认为属于这个集合。咱们来估计一下这种方法的错误率。当集合S={x1, x2,…, xn} 的全部元素都被k个散列函数映射到m位的位数组中时,这个位数组中某一位仍是0的几率是:scrapy

散列函数是随机的,则任意一个散列函数选中这一位的几率为1/m,那么1-1/m就表明散列函数从未没有选中这一位的几率,要把S彻底映射到m位数组中,须要作kn次散列运算,最后的几率就是1-1/m的kn次方。

一个不属于S的元素x若是误断定为在S中,那么这个几率就是k次散列运算获得的结果对应的位数组位置都为1,则误判几率为:

根据:

能够将误判几率转化为:

在给定m、n时,能够求出使得f最小化的k值为:

这里将误判几率概括以下:

表中第一列为m/n的值,第二列为最优k值,其后列为不一样k值的误判几率。当k值肯定时,随着m/n的增大,误判几率逐渐变小。当m/n的值肯定时,当k越靠近最优K值,误判几率越小。误判几率整体来看都是极小的,在容忍此误判几率的状况下,大幅减少存储空间和断定速度是彻底值得的。

接下来,咱们将Bloom Filter算法应用到Scrapy-Redis分布式爬虫的去重过程当中,以解决Redis内存不足的问题。

3. 对接Scrapy-Redis

实现Bloom Filter时,首先要保证不能破坏Scrapy-Redis分布式爬取的运行架构。咱们须要修改Scrapy-Redis的源码,将它的去重类替换掉。同时,Bloom Filter的实现须要借助于一个位数组,既然当前架构仍是依赖于Redis,那么位数组的维护直接使用Redis就行了。

首先实现一个基本的散列算法,将一个值通过散列运算后映射到一个m位数组的某一位上,代码以下:

class HashMap(object):
    def __init__(self, m, seed):
        self.m = m
        self.seed = seed
   
   def hash(self, value):        """        Hash Algorithm        :param value: Value        :return: Hash Value        """        ret = 0        for i in range(len(value)):            ret += self.seed * ret + ord(value[i])        
       return (self.m - 1) & ret

这里新建了一个HashMap类。构造函数传入两个值,一个是m位数组的位数,另外一个是种子值seed。不一样的散列函数须要有不一样的seed,这样能够保证不一样的散列函数的结果不会碰撞。

hash()方法的实现中,value是要被处理的内容。这里遍历了value的每一位,并利用ord()方法取到每一位的ASCII码值,而后混淆seed进行迭代求和运算,最终获得一个数值。这个数值的结果就由valueseed惟一肯定。咱们再将这个数值和m进行按位与运算,便可获取到m位数组的映射结果,这样就实现了一个由字符串和seed来肯定的散列函数。当m固定时,只要seed值相同,散列函数就是相同的,相同的value必然会映射到相同的位置。因此若是想要构造几个不一样的散列函数,只须要改变其seed就行了。以上内容即是一个简易的散列函数的实现。

接下来咱们再实现Bloom Filter。Bloom Filter里面须要用到k个散列函数,这里要对这几个散列函数指定相同的m值和不一样的seed值,构造以下:

BLOOMFILTER_HASH_NUMBER = 6
BLOOMFILTER_BIT = 30

class BloomFilter(object):    def __init__(self, server, key, bit=BLOOMFILTER_BIT, hash_number=BLOOMFILTER_HASH_NUMBER):        """        Initialize BloomFilter        :param server: Redis Server        :param key: BloomFilter Key        :param bit: m = 2 ^ bit        :param hash_number: the number of hash function        """        # default to 1 << 30 = 10,7374,1824 = 2^30 = 128MB, max filter 2^30/hash_number = 1,7895,6970 fingerprints        self.m = 1 << bit        self.seeds = range(hash_number)        self.maps = [HashMap(self.m, seed) for seed in self.seeds]        self.server = server        self.key = key

因为咱们须要亿级别的数据的去重,即前文介绍的算法中的n为1亿以上,散列函数的个数k大约取10左右的量级。而m>kn,这里m值大约保底在10亿,因为这个数值比较大,因此这里用移位操做来实现,传入位数bit,将其定义为30,而后作一个移位操做1<<30,至关于2的30次方,等于1073741824,量级也是刚好在10亿左右,因为是位数组,因此这个位数组占用的大小就是2^30 b=128 MB。开头咱们计算过Scrapy-Redis集合去重的占用空间大约在2 GB左右,可见Bloom Filter的空间利用效率极高。

随后咱们再传入散列函数的个数,用它来生成几个不一样的seed。用不一样的seed来定义不一样的散列函数,这样咱们就能够构造一个散列函数列表。遍历seed,构造带有不一样seed值的HashMap对象,而后将HashMap对象保存成变量maps供后续使用。

另外,server就是Redis链接对象,key就是这个m位数组的名称。

接下来,咱们要实现比较关键的两个方法:一个是断定元素是否重复的方法exists(),另外一个是添加元素到集合中的方法insert(),实现以下:

def exists(self, value):
    """    if value exists    :param value:    :return:    """
    if not value:        
       return False    exist = 1    for map in self.maps:        offset = map.hash(value)        exist = exist & self.server.getbit(self.key, offset)    
   return exist
   
def insert(self, value):    """    add value to bloom    :param value:    :return:    """    for f in self.maps:        offset = f.hash(value)        self.server.setbit(self.key, offset, 1)

首先看下insert()方法。Bloom Filter算法会逐个调用散列函数对放入集合中的元素进行运算,获得在m位位数组中的映射位置,而后将位数组对应的位置置1。这里代码中咱们遍历了初始化好的散列函数,而后调用其hash()方法算出映射位置offset,再利用Redis的setbit()方法将该位置1。

exists()方法中,咱们要实现断定是否重复的逻辑,方法参数value为待判断的元素。咱们首先定义一个变量exist,遍历全部散列函数对value进行散列运算,获得映射位置,用getbit()方法取得该映射位置的结果,循环进行与运算。这样只有每次getbit()获得的结果都为1时,最后的exist才为True,即表明value属于这个集合。若是其中只要有一次getbit()获得的结果为0,即m位数组中有对应的0位,那么最终的结果exist就为False,即表明value不属于这个集合。

Bloom Filter的实现就已经完成了,咱们能够用一个实例来测试一下,代码以下:

conn = StrictRedis(host='localhost', port=6379, password='foobared')
bf = BloomFilter(conn, 'testbf', 5, 6)
bf.insert('Hello')
bf.insert('World')
result = bf.exists('Hello')
print(bool(result))
result = bf.exists('Python')
print(bool(result))

这里首先定义了一个Redis链接对象,而后传递给Bloom Filter。为了不内存占用过大,这里传的位数bit比较小,设置为5,散列函数的个数设置为6。

调用insert()方法插入HelloWorld两个字符串,随后判断HelloPython这两个字符串是否存在,最后输出它的结果,运行结果以下:

True
False

很明显,结果彻底没有问题。这样咱们就借助Redis成功实现了Bloom Filter的算法。

接下来继续修改Scrapy-Redis的源码,将它的dupefilter逻辑替换为Bloom Filter的逻辑。这里主要是修改RFPDupeFilter类的request_seen()方法,实现以下:

def request_seen(self, request):
    fp = self.request_fingerprint(request)    
   if self.bf.exists(fp):        
       return True    self.bf.insert(fp)    
       return False

利用request_fingerprint()方法获取Request的指纹,调用Bloom Filter的exists()方法断定该指纹是否存在。若是存在,则说明该Request是重复的,返回True,不然调用Bloom Filter的insert()方法将该指纹添加并返回False。这样就成功利用Bloom Filter替换了Scrapy-Redis的集合去重。

对于Bloom Filter的初始化定义,咱们能够将__init__()方法修改成以下内容:

def __init__(self, server, key, debug, bit, hash_number):
    self.server = server
    self.key = key
    self.debug = debug
    self.bit = bit
    self.hash_number = hash_number
    self.logdupes = True
    self.bf = BloomFilter(server, self.key, bit, hash_number)

其中bithash_number须要使用from_settings()方法传递,修改以下:

@classmethod
def from_settings(cls, settings):
   server = get_redis_from_settings(settings)    key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())}    debug = settings.getbool('DUPEFILTER_DEBUG', DUPEFILTER_DEBUG)    bit = settings.getint('BLOOMFILTER_BIT', BLOOMFILTER_BIT)    hash_number = settings.getint('BLOOMFILTER_HASH_NUMBER', BLOOMFILTER_HASH_NUMBER)    
   return cls(server, key=key, debug=debug, bit=bit, hash_number=hash_number)

其中,常量DUPEFILTER_DEBUGBLOOMFILTER_BIT统必定义在defaults.py中,默认以下:

BLOOMFILTER_HASH_NUMBER = 6
BLOOMFILTER_BIT = 30

如今,咱们成功实现了Bloom Filter和Scrapy-Redis的对接。

4. 本节代码

本节代码地址为:https://github.com/Python3WebSpider/ScrapyRedisBloomFilter。

5. 使用

为了方便使用,本节的代码已经打包成一个Python包并发布到PyPi,连接为https://pypi.python.org/pypi/scrapy-redis-bloomfilter,能够直接使用ScrapyRedisBloomFilter,不须要本身实现一遍。

咱们能够直接使用pip来安装,命令以下:

pip3 install scrapy-redis-bloomfilter

使用的方法和Scrapy-Redis基本类似,在这里说明几个关键配置。

# 去重类,要使用Bloom Filter请替换DUPEFILTER_CLASS
DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter"
# 散列函数的个数,默认为6,能够自行修改
BLOOMFILTER_HASH_NUMBER = 6
# Bloom Filter的bit参数,默认30,占用128MB空间,去重量级1亿
BLOOMFILTER_BIT = 30
  • DUPEFILTER_CLASS是去重类,若是要使用Bloom Filter,则DUPEFILTER_CLASS须要修改成该包的去重类。

  • BLOOMFILTER_HASH_NUMBER是Bloom Filter使用的散列函数的个数,默认为6,能够根据去重量级自行修改。

  • BLOOMFILTER_BIT即前文所介绍的BloomFilter类的bit参数,它决定了位数组的位数。若是BLOOMFILTER_BIT为30,那么位数组位数为2的30次方,这将占用Redis 128 MB的存储空间,去重量级在1亿左右,即对应爬取量级1亿左右。若是爬取量级在10亿、20亿甚至100亿,请务必将此参数对应调高。

6. 测试

源代码附有一个测试项目,放在tests文件夹,该项目使用了ScrapyRedisBloomFilter来去重,Spider的实现以下:

from scrapy import Request, Spider

class TestSpider(Spider):    name = 'test'    base_url = 'https://www.baidu.com/s?wd='    def start_requests(self):        for i in range(10):            url = self.base_url + str(i)            
           yield Request(url, callback=self.parse)
       
       # Here contains 10 duplicated Requests            for i in range(100):            url = self.base_url + str(i)            
           yield Request(url, callback=self.parse)    

   def parse(self, response):        self.logger.debug('Response of ' + response.url)

start_requests()方法首先循环10次,构造参数为0~9的URL,而后从新循环了100次,构造了参数为0~99的URL。那么这里就会包含10个重复的Request,咱们运行项目测试一下:

scrapy crawl test

最后的输出结果以下:

{'bloomfilter/filtered': 10, 
'downloader/request_bytes': 34021,
'downloader/request_count': 100,
'downloader/request_method_count/GET': 100,
'downloader/response_bytes': 72943,
'downloader/response_count': 100,
'downloader/response_status_count/200': 100,
'finish_reason': 'finished',
'finish_time': datetime.datetime(2017, 8, 11, 9, 34, 30, 419597),
'log_count/DEBUG': 202,
'log_count/INFO': 7,
'memusage/max': 54153216,
'memusage/startup': 54153216,
'response_received_count': 100,
'scheduler/dequeued/redis': 100,
'scheduler/enqueued/redis': 100,
'start_time': datetime.datetime(2017, 8, 11, 9, 34, 26, 495018)}

最后统计的第一行的结果:

'bloomfilter/filtered': 10,

这就是Bloom Filter过滤后的统计结果,它的过滤个数为10个,也就是它成功将重复的10个Reqeust识别出来了,测试经过。

7. 结语

以上内容即是Bloom Filter的原理及对接实现,Bloom Filter的使用能够大大节省Redis内存。在数据量大的状况下推荐此方案。

相关文章
相关标签/搜索