scrapy流程图
旧版
php
新版
css
Scrapy Engine(引擎)
: 负责Spider、ItemPipeline、Downloader、Scheduler中间的通信,信号、数据传递等。html
Scheduler(调度器)
: 它负责接受引擎发送过来的Request请求,并按照必定的方式进行整理排列,入队,当引擎须要时,交还给引擎。python
Downloader(下载器)
:负责下载Scrapy Engine(引擎)发送的全部Requests请求,并将其获取到的Responses交还给Scrapy Engine(引擎),由引擎交给Spider来处理,git
Spider(爬虫)
:它负责处理全部Responses,从中分析提取数据,获取Item字段须要的数据,并将须要跟进的URL提交给引擎,再次进入Scheduler(调度器),github
Item Pipeline(管道)
:它负责处理Spider中获取到的Item,并进行进行后期处理(详细分析、过滤、存储等)的地方.web
Downloader Middlewares(下载中间件)
:你能够看成是一个能够自定义扩展下载功能的组件。正则表达式
Spider Middlewares(Spider中间件)
:你能够理解为是一个能够自定扩展和操做引擎和Spider中间通讯的功能组件(好比进入Spider的Responses;和从Spider出去的Requests)redis
数据流(Data flow)mongodb
引擎获取起始url并发起请求,将获取的响应内容返回给spider,
在spider中进行数据的提取和下一个url的连接,
数据交给item和pipeline进行处理,
url继续发起请求,
制做 Scrapy 爬虫 一共须要4步:
命令行输入
scrapy startproject tutorial
目录结构
scrapy.cfg: 项目的配置文件;(用于发布到服务器)
tutorial/: 该项目文件夹。以后将在此编写Python代码。
tutorial/items.py: 项目中的item文件;(定义结构化数据字段field).
tutorial/pipelines.py: 项目中的pipelines文件;(用于存放执行后期数据处理的功能,定义如何存储结构化数据)
tutorial/settings.py: 项目的设置文件;(如何修改User-Agent,设置爬取时间间隔,设置代理,配置中间件等等)
tutorial/spiders/: 放置spider代码的目录;(编写爬取网站规则)
定义item,在items.py
文件中编写item
相似与django
import scrapy class DmozItem(scrapy.Item): title = scrapy.Field() link = scrapy.Field() desc = scrapy.Field()
Spider是用户编写用于从单个网站(或者一些网站)爬取数据的类。
其包含了一个用于下载的初始URL,如何跟进网页中的连接以及如何分析页面中的内容, 提取生成 item 的方法。
为了建立一个Spider,您必须继承scrapy.Spider
类, 且定义一些属性:
name
: 用于区别Spider。 该名字必须是惟一的。start_urls
: 包含了Spider在启动时进行爬取的url列表。 所以,第一个被获取到的页面将是其中之一。 后续的URL则从初始的URL获取到的数据中提取。parse()
是spider
的一个方法。 被调用时,每一个初始URL完成下载后生成的 Response 对象将会做为惟一的参数传递给该函数。 该方法负责解析返回的数据(response data),提取数据(生成item)以及生成须要进一步处理的URL的 Request 对象。scrapy genspider name "example.com"
import scrapy class DmozSpider(scrapy.Spider): name = "dmoz" allowed_domains = ["dmoz.org"] start_urls = [ "http://www.dmoz.org/Computers/Programming/Languages/Python/Books/", "http://www.dmoz.org/Computers/Programming/Languages/Python/Resources/" ] def parse(self, response): filename = response.url.split("/")[-2] + '.html' with open(filename, 'wb') as f: f.write(response.body)
启动爬虫
scrapy crawl dmoz
Selectors选择器简介
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/selectors.html
Scrapy Selectors
内置XPath
和 CSS Selector
表达式机制
Selector有四个基本的方法:
- xpath(): 传入xpath表达式,返回该表达式所对应的全部节点的selector list列表
- extract(): 序列化该节点为Unicode字符串并返回list
- css(): 传入CSS表达式,返回该表达式所对应的全部节点的selector list列表,语法同 BeautifulSoup4
- re(): 根据传入的正则表达式对数据进行提取,返回Unicode字符串list列表
经过shell能够很方便的提取出须要的数据
当Item在Spider中被收集以后,它将会被传递到Item Pipeline
每一个Item Pipeline组件接收到Item,定义一些操做行为,好比决定此Item是丢弃而存储。
如下是item pipeline的一些典型应用:
编写item pipeline
很简单,item pipiline
组件是一个独立的Python类,其中process_item()
方法必须实现:
import something class SomethingPipeline(object): def __init__(self): # 可选实现,作参数初始化等 # doing something def process_item(self, item, spider): # item (Item 对象) – 被爬取的item # spider (Spider 对象) – 爬取该item的spider # 这个方法必须实现,每一个item pipeline组件都须要调用该方法, # 这个方法必须返回一个 Item 对象,被丢弃的item将不会被以后的pipeline组件所处理。 return item def open_spider(self, spider): # spider (Spider 对象) – 被开启的spider # 可选实现,当spider被开启时,这个方法被调用。 def close_spider(self, spider): # spider (Spider 对象) – 被关闭的spider # 可选实现,当spider被关闭时,这个方法被调用
import json class JsonWriterPipeline(object): def __init__(self): self.file = open('items.json', 'wb') def process_item(self, item, spider): line = json.dumps(dict(item),ensure_ascii=False) + "\n" self.file.write(line) return item
为了启用Item Pipeline
组件,必须将它的类添加到 settings.py
文件ITEM_PIPELINES
配置,就像下面这个例子:
ITEM_PIPELINES = { #'tutorial.pipelines.PricePipeline': 300, 'tutorial.pipelines.JsonWriterPipeline': 800, }
分配给每一个类的整型值,肯定了他们运行的顺序,item按数字从低到高的顺序,经过pipeline,一般将这些数字定义在0-1000范围内。数值越低,越先运行
pipeline中还有一个from_crawler(cls, crawler)
类方法
若是使用,这个类方法被调用建立爬虫管道实例。必须返回管道的一个新实例。crawler提供存取全部Scrapy核心组件配置和信号管理器; 对于pipelines这是一种访问配置和信号管理器 的方式。
在这个例子中,咱们将使用pymongo
将Item写到MongoDB。MongoDB的地址和数据库名称在Scrapy setttings.py
配置文件中;
这个例子主要是说明如何使用from_crawler()
方法
import pymongo class MongoPipeline(object): collection_name = 'scrapy_items' def __init__(self, mongo_uri, mongo_db): self.mongo_uri = mongo_uri self.mongo_db = mongo_db @classmethod def from_crawler(cls, crawler): return cls( mongo_uri=crawler.settings.get('MONGO_URI'), mongo_db=crawler.settings.get('MONGO_DATABASE', 'items') ) def open_spider(self, spider): self.client = pymongo.MongoClient(self.mongo_uri) self.db = self.client[self.mongo_db] def close_spider(self, spider): self.client.close() def process_item(self, item, spider): self.db[self.collection_name].insert(dict(item)) return item
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/spiders.html
Spider类定义了如何爬取某个(或某些)网站。包括了爬取的动做(例如:是否跟进连接)以及如何从网页的内容中提取结构化数据(爬取item)。 换句话说,Spider就是定义爬取的动做及分析某个网页(或者是有些网页)的地方。
class scrapy.spider.Spider
Spider是最简单的spider。每一个spider必须继承自该类。Spider并无提供什么特殊的功能。其仅仅请求给定的 start_urls/start_requests
,并根据返回的结果调用spider的parse方法。
源码参考
#全部爬虫的基类,用户定义的爬虫必须从这个类继承 class Spider(object_ref): #定义spider名字的字符串(string)。spider的名字定义了Scrapy如何定位(并初始化)spider,因此其必须是惟一的。 #name是spider最重要的属性,并且是必须的。 #通常作法是以该网站(domain)(加或不加 后缀 )来命名spider。 例如,若是spider爬取 mywebsite.com ,该spider一般会被命名为 mywebsite name = None #初始化,提取爬虫名字,start_ruls def __init__(self, name=None, **kwargs): if name is not None: self.name = name # 若是爬虫没有名字,中断后续操做则报错 elif not getattr(self, 'name', None): raise ValueError("%s must have a name" % type(self).__name__) # python 对象或类型经过内置成员__dict__来存储成员信息 self.__dict__.update(kwargs) #URL列表。当没有指定的URL时,spider将从该列表中开始进行爬取。 所以,第一个被获取到的页面的URL将是该列表之一。 后续的URL将会从获取到的数据中提取。 if not hasattr(self, 'start_urls'): self.start_urls = [] # 打印Scrapy执行后的log信息 def log(self, message, level=log.DEBUG, **kw): log.msg(message, spider=self, level=level, **kw) # 判断对象object的属性是否存在,不存在作断言处理 def set_crawler(self, crawler): assert not hasattr(self, '_crawler'), "Spider already bounded to %s" % crawler self._crawler = crawler @property def crawler(self): assert hasattr(self, '_crawler'), "Spider not bounded to any crawler" return self._crawler @property def settings(self): return self.crawler.settings #该方法将读取start_urls内的地址,并为每个地址生成一个Request对象,交给Scrapy下载并返回Response #该方法仅调用一次 def start_requests(self): for url in self.start_urls: yield self.make_requests_from_url(url) #start_requests()中调用,实际生成Request的函数。 #Request对象默认的回调函数为parse(),提交的方式为get def make_requests_from_url(self, url): return Request(url, dont_filter=True) #默认的Request对象回调函数,处理返回的response。 #生成Item或者Request对象。用户必须实现这个类 def parse(self, response): raise NotImplementedError @classmethod def handles_request(cls, request): return url_is_from_spider(request.url, cls) def __str__(self): return "<%s %r at 0x%0x>" % (type(self).__name__, self.name, id(self)) __repr__ = __str__
主要属性和方法
name
定义spider名字的字符串。
例如,若是spider爬取 mywebsite.com ,该spider一般会被命名为 mywebsite
allowed_domains
包含了spider容许爬取的域名(domain)的列表,可选。
start_urls
初始URL元祖/列表。当没有制定特定的URL时,spider将从该列表中开始进行爬取。
start_requests(self)
该方法必须返回一个可迭代对象(iterable)。该对象包含了spider用于爬取(默认实现是使用start_urls 的url)的第一个Request。
当spider启动爬取而且未指定start_urls时,该方法被调用。
parse(self, response)
当请求url返回网页没有指定回调函数时,默认的Request对象回调函数。用来处理网页返回的response,以及生成Item或者Request对象。
log(self, message[, level, component])
使用 scrapy.log.msg() 方法记录(log)message。 更多数据请参见 logging
腾讯招聘网自动翻页
(代码采集自互联网)
from mySpider.items import TencentItem import scrapy import re class TencentSpider(scrapy.Spider): name = "tencent" allowed_domains = ["hr.tencent.com"] start_urls = [ "http://hr.tencent.com/position.php?&start=0#a" ] def parse(self, response): for each in response.xpath('//*[@class="even"]'): item = TencentItem() name = each.xpath('./td[1]/a/text()').extract()[0] detailLink = each.xpath('./td[1]/a/@href').extract()[0] positionInfo = each.xpath('./td[2]/text()').extract()[0] peopleNumber = each.xpath('./td[3]/text()').extract()[0] workLocation = each.xpath('./td[4]/text()').extract()[0] publishTime = each.xpath('./td[5]/text()').extract()[0] #print name, detailLink, catalog, peopleNumber, workLocation,publishTime item['name'] = name.encode('utf-8') item['detailLink'] = detailLink.encode('utf-8') item['positionInfo'] = positionInfo.encode('utf-8') item['peopleNumber'] = peopleNumber.encode('utf-8') item['workLocation'] = workLocation.encode('utf-8') item['publishTime'] = publishTime.encode('utf-8') curpage = re.search('(\d+)',response.url).group(1) page = int(curpage) + 10 url = re.sub('\d+', str(page), response.url) # 发送新的url请求加入待爬队列,并调用回调函数 self.parse yield scrapy.Request(url, callback = self.parse) # 将获取的数据交给pipeline yield item
经过下面的命令能够快速建立 CrawlSpider模板 的代码:
scrapy genspider -t crawl tencent tencent.com
class scrapy.spiders.CrawlSpider
它是Spider的派生类,Spider类的设计原则是只爬取start_url列表中的网页,而CrawlSpider类定义了一些规则(rule)来提供跟进link的方便的机制,从爬取的网页中获取link并继续爬取的工做更适合。
源码解析参考
class CrawlSpider(Spider): rules = () def __init__(self, *a, **kw): super(CrawlSpider, self).__init__(*a, **kw) self._compile_rules() #首先调用parse()来处理start_urls中返回的response对象 #parse()则将这些response对象传递给了_parse_response()函数处理,并设置回调函数为parse_start_url() #设置了跟进标志位True #parse将返回item和跟进了的Request对象 def parse(self, response): return self._parse_response(response, self.parse_start_url, cb_kwargs={}, follow=True) #处理start_url中返回的response,须要重写 def parse_start_url(self, response): return [] def process_results(self, response, results): return results #从response中抽取符合任一用户定义'规则'的连接,并构形成Resquest对象返回 def _requests_to_follow(self, response): if not isinstance(response, HtmlResponse): return seen = set() #抽取以内的全部连接,只要经过任意一个'规则',即表示合法 for n, rule in enumerate(self._rules): links = [l for l in rule.link_extractor.extract_links(response) if l not in seen] #使用用户指定的process_links处理每一个链接 if links and rule.process_links: links = rule.process_links(links) #将连接加入seen集合,为每一个连接生成Request对象,并设置回调函数为_repsonse_downloaded() for link in links: seen.add(link) #构造Request对象,并将Rule规则中定义的回调函数做为这个Request对象的回调函数 r = Request(url=link.url, callback=self._response_downloaded) r.meta.update(rule=n, link_text=link.text) #对每一个Request调用process_request()函数。该函数默认为indentify,即不作任何处理,直接返回该Request. yield rule.process_request(r) #处理经过rule提取出的链接,并返回item以及request def _response_downloaded(self, response): rule = self._rules[response.meta['rule']] return self._parse_response(response, rule.callback, rule.cb_kwargs, rule.follow) #解析response对象,会用callback解析处理他,并返回request或Item对象 def _parse_response(self, response, callback, cb_kwargs, follow=True): #首先判断是否设置了回调函数。(该回调函数多是rule中的解析函数,也多是 parse_start_url函数) #若是设置了回调函数(parse_start_url()),那么首先用parse_start_url()处理response对象, #而后再交给process_results处理。返回cb_res的一个列表 if callback: #若是是parse调用的,则会解析成Request对象 #若是是rule callback,则会解析成Item cb_res = callback(response, **cb_kwargs) or () cb_res = self.process_results(response, cb_res) for requests_or_item in iterate_spider_output(cb_res): yield requests_or_item #若是须要跟进,那么使用定义的Rule规则提取并返回这些Request对象 if follow and self._follow_links: #返回每一个Request对象 for request_or_item in self._requests_to_follow(response): yield request_or_item def _compile_rules(self): def get_method(method): if callable(method): return method elif isinstance(method, basestring): return getattr(self, method, None) self._rules = [copy.copy(r) for r in self.rules] for rule in self._rules: rule.callback = get_method(rule.callback) rule.process_links = get_method(rule.process_links) rule.process_request = get_method(rule.process_request) def set_crawler(self, crawler): super(CrawlSpider, self).set_crawler(crawler) self._follow_links = crawler.settings.getbool('CRAWLSPIDER_FOLLOW_LINKS', True)
CrawlSpider继承于Spider类,除了继承过来的属性外(name、allow_domains),还提供了新的属性和方法:
class scrapy.linkextractors.LinkExtractor
Link Extractors
的目的很简单: 提取连接。
每一个LinkExtractor
有惟一的公共方法是 extract_links()
,它接收一个 Response
对象,并返回一个 scrapy.link.Link
对象。
Link Extractors
要实例化一次,而且 extract_links
方法会根据不一样的response
调用屡次提取连接。
class scrapy.linkextractors.LinkExtractor( allow = (), deny = (), allow_domains = (), deny_domains = (), deny_extensions = None, restrict_xpaths = (), tags = ('a','area'), attrs = ('href'), canonicalize = True, unique = True, process_value = None )
主要参数:
allow
:知足括号中“正则表达式”的值会被提取,若是为空,则所有匹配。deny
:与这个正则表达式(或正则表达式列表)不匹配的URL必定不提取。allow_domains
:会被提取的连接的domains。deny_domains
:必定不会被提取连接的domains。restrict_xpaths
:使用xpath表达式,和allow共同做用过滤连接。在rules中包含一个或多个Rule对象,每一个Rule对爬取网站的动做定义了特定操做。若是多个rule匹配了相同的连接,则根据规则在本集合中被定义的顺序,第一个会被使用。
class scrapy.spiders.Rule( link_extractor, callback = None, cb_kwargs = None, follow = None, process_links = None, process_request = None )
link_extractor
:是一个Link Extractor对象,用于定义须要提取的连接。callback
: 从link_extractor中每获取到连接时,参数所指定的值做为回调函数,该回调函数接受一个response做为其第一个参数。
注意:当编写爬虫规则时,避免使用parse做为回调函数。因为CrawlSpider使用parse方法来实现其逻辑,若是覆盖了 parse方法,crawl spider将会运行失败。
follow
:是一个布尔(boolean)值,指定了根据该规则从response提取的连接是否须要跟进。 若是callback为None,follow 默认设置为True ,不然默认为False。process_links
:指定该spider中哪一个的函数将会被调用,从link_extractor中获取到连接列表时将会调用该函数。该方法主要用来过滤。process_request
:指定该spider中哪一个的函数将会被调用, 该规则提取到每一个request时都会调用该函数。 (用来过滤request)
翻页
import scrapy from scrapy.spiders import CrawlSpider, Rule from scrapy.linkextractors import LinkExtractor from mySpider.items import TencentItem class TencentSpider(CrawlSpider): name = "tencent" allowed_domains = ["hr.tencent.com"] start_urls = [ "http://hr.tencent.com/position.php?&start=0#a" ] page_lx = LinkExtractor(allow=("start=\d+")) rules = [ Rule(page_lx, callback = "parseContent", follow = True) ] def parseContent(self, response): for each in response.xpath('//*[@class="even"]'): name = each.xpath('./td[1]/a/text()').extract()[0] detailLink = each.xpath('./td[1]/a/@href').extract()[0] positionInfo = each.xpath('./td[2]/text()').extract()[0] peopleNumber = each.xpath('./td[3]/text()').extract()[0] workLocation = each.xpath('./td[4]/text()').extract()[0] publishTime = each.xpath('./td[5]/text()').extract()[0] #print name, detailLink, catalog,recruitNumber,workLocation,publishTime item = TencentItem() item['name']=name.encode('utf-8') item['detailLink']=detailLink.encode('utf-8') item['positionInfo']=positionInfo.encode('utf-8') item['peopleNumber']=peopleNumber.encode('utf-8') item['workLocation']=workLocation.encode('utf-8') item['publishTime']=publishTime.encode('utf-8') yield item
某些网站会为每个url增长一个sessionid属性,多是为了标记用户访问历史,并且这个seesionid随着每次访问都会动态变化,这就为爬虫的去重处理(即标记已经爬取过的网站)和提取规则增长了难度。
https://bitsharestalk.org/index.php?board=5.0
会变成https://bitsharestalk.org/index.phpPHPSESSID=9771d42640ab3c89eb77e8bd9e220b53&board=5.0
,下面介绍集中处理方法
仅适用你的爬虫使用的是 scrapy.contrib.spiders.CrawlSpider, 在这个内置爬虫中,你提取url要经过Rule类来进行提取,其自带了对提取后的url进行加工的函数。
rules = ( Rule(LinkExtractor(allow = ( "https://bitsharestalk\.org/index\.php\?PHPSESSID\S*board=\d+\.\d+$", "https://bitsharestalk\.org/index\.php\?board=\d+\.\d+$" )), process_links = 'link_filtering' ), #默认函数process_links Rule(LinkExtractor(allow = ( " https://bitsharestalk\.org/index\.php\?PHPSESSID\S*topic=\d+\.\d+$" , "https://bitsharestalk\.org/index\.php\?topic=\d+\.\d+$", ),), callback = "extractPost" , follow = True, process_links = 'link_filtering' ), Rule(LinkExtractor(allow = ( "https://bitsharestalk\.org/index\.php\?PHPSESSID\S*action=profile;u=\d+$" , "https://bitsharestalk\.org/index\.php\?action=profile;u=\d+$" , ),), callback = "extractUser", process_links = 'link_filtering' ) ) def link_filtering(self, links): ret = [] for link in links: url = link.url # print "This is the yuanlai ", link.url urlfirst, urllast = url.split( " ? " ) if urllast: link.url = urlfirst + " ? " + urllast.split( " & " , 1)[1] # print link.url return links
class WeiboSpider(CrawlSpider): name = 'weibo' allowed_domains = ['weibo.com'] start_urls = ['http://www.weibo.com/u/1876296184'] # 不加www,则匹配不到cookie, get_login_cookie()方法正则代完善 rules = ( Rule(LinkExtractor(allow=r'^http:\/\/(www\.)?weibo.com/[a-z]/.*'), # 微博我的页面的规则,或/u/或/n/后面跟一串数字 process_request='process_request', callback='parse_item', follow=True), ) cookies = None def process_request(self, request): link=request.url page = re.search('page=\d*', link).group() type = re.search('type=\d+', link).group() newrequest = request.replace(cookies =self.cookies, url='.../questionType?' + page + "&" + type) return newrequest
Scrapy提供了log功能,能够经过 logging 模块使用。
Scrapy提供5层logging级别:
默认状况下python的logging模块将日志打印到了标准输出中,且只显示了大于等于WARNING级别的日志,这说明默认的日志级别设置为WARNING(日志级别等级CRITICAL > ERROR > WARNING > INFO > DEBUG,默认的日志格式为DEBUG级别
经过在setting.py中
进行如下设置能够被用来配置logging
:
print("hello")
,其将会在Scrapy log中显示。#coding:utf-8 ###################### ##Logging的使用 ###################### import logging ''' 1. logging.CRITICAL - for critical errors (highest severity) 致命错误 2. logging.ERROR - for regular errors 通常错误 3. logging.WARNING - for warning messages 警告+错误 4. logging.INFO - for informational messages 消息+警告+错误 5. logging.DEBUG - for debugging messages (lowest severity) 低级别 ''' logging.warning("This is a warning") logging.log(logging.WARNING,"This is a warning") #获取实例对象 logger=logging.getLogger() logger.warning("这是警告消息") #指定消息发出者 logger = logging.getLogger('SimilarFace') logger.warning("This is a warning") #在爬虫中使用log import scrapy class MySpider(scrapy.Spider): name = 'myspider' start_urls = ['http://scrapinghub.com'] def parse(self, response): #方法1 自带的logger self.logger.info('Parse function called on %s', response.url) #方法2 本身定义个logger logger.info('Parse function called on %s', response.url) ''' Logging 设置 • LOG_FILE • LOG_ENABLED • LOG_ENCODING • LOG_LEVEL • LOG_FORMAT • LOG_DATEFORMAT • LOG_STDOUT 命令行中使用 --logfile FILE Overrides LOG_FILE --loglevel/-L LEVEL Overrides LOG_LEVEL --nolog Sets LOG_ENABLED to False ''' import logging from scrapy.utils.log import configure_logging configure_logging(install_root_handler=False) #定义了logging的些属性 logging.basicConfig( filename='log.txt', format='%(levelname)s: %(levelname)s: %(message)s', level=logging.INFO ) #运行时追加模式 logging.info('进入Log文件') logger = logging.getLogger('SimilarFace') logger.warning("也要进入Log文件")
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/settings.html
Scrapy设置(settings)提供了定制Scrapy组件的方法。能够控制包括核心(core),插件(extension),pipeline及spider组件。好比 设置Json Pipeliine、LOG_LEVEL
BOT_NAME
默认: scrapybot
当您使用 startproject
命令建立项目时其也被自动赋值。
CONCURRENT_ITEMS
默认: 100
Item Processor
(即 Item Pipeline
) 同时处理(每一个response的)item的最大值。
CONCURRENT_REQUESTS
默认: 16
Scrapy downloader
并发请求(concurrent requests
)的最大值。
{ 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'en', }
Scrapy HTTP Request
使用的默认header。
DEPTH_LIMIT
默认: 0
爬取网站最大容许的深度
(depth)值。若是为0,则没有限制。
DOWNLOAD_DELAY
默认: 0
下载器在下载同一个网站下一个页面前须要等待的时间。该选项能够用来限制爬取速度, 减轻服务器压力。同时也支持小数:DOWNLOAD_DELAY = 0.25 # 250 ms of delay
该设置影响(默认启用的) RANDOMIZE_DOWNLOAD_DELAY
设置。 默认状况下,Scrapy在两个请求间不等待一个固定的值, 而是使用0.5到1.5之间的一个随机值 DOWNLOAD_DELAY
的结果做为等待间隔。
DOWNLOAD_TIMEOUT
默认: 180
下载器超时时间(单位: 秒)。
ITEM_PIPELINES
ITEM_PIPELINES = { 'mybot.pipelines.validate.ValidateMyItem': 300, 'mybot.pipelines.validate.StoreMyItem': 800, }
LOG_ENABLED
默认: True
是否启用logging。
LOG_ENCODING
默认: 'utf-8'
logging使用的编码。
LOG_LEVEL
默认: 'DEBUG'
log的最低级别。可选的级别有: CRITICAL、 ERROR、WARNING、INFO、DEBUG
。
USER_AGENT
默认: Scrapy/VERSION (+http://scrapy.org)
爬取的默认User-Agent
,除非被覆盖。
https://docs.scrapy.org/en/latest/topics/request-response.html
Request 部分源码:
# 部分代码 class Request(object_ref): def __init__(self, url, callback=None, method='GET', headers=None, body=None, cookies=None, meta=None, encoding='utf-8', priority=0, dont_filter=False, errback=None): self._encoding = encoding # this one has to be set first self.method = str(method).upper() self._set_url(url) self._set_body(body) assert isinstance(priority, int), "Request priority not an integer: %r" % priority self.priority = priority assert callback or not errback, "Cannot use errback without a callback" self.callback = callback self.errback = errback self.cookies = cookies or {} self.headers = Headers(headers or {}, encoding=encoding) self.dont_filter = dont_filter self._meta = dict(meta) if meta else None @property def meta(self): if self._meta is None: self._meta = {} return self._meta
经常使用参数
url
: 就是须要请求,并进行下一步处理的url
callback
: 指定该请求返回的Response
,由那个函数来处理。
method
: 请求通常不须要指定,默认GET方法,可设置为"GET", "POST", "PUT"等,且保证字符串大写
headers
: 请求时,包含的头文件。通常不须要。内容通常以下:
# 本身写过爬虫的确定知道 Host: media.readthedocs.org User-Agent: Mozilla/5.0 (Windows NT 6.2; WOW64; rv:33.0) Gecko/20100101 Firefox/33.0 Accept: text/css,*/*;q=0.1 Accept-Language: zh-cn,zh;q=0.8,en-us;q=0.5,en;q=0.3 Accept-Encoding: gzip, deflate Referer: http://scrapy-chs.readthedocs.org/zh_CN/0.24/ Cookie: _ga=GA1.2.1612165614.1415584110; Connection: keep-alive If-Modified-Since: Mon, 25 Aug 2014 21:59:35 GMT Cache-Control: max-age=0
meta
: 比较经常使用,在不一样的请求之间传递数据使用的。字典dict型
request_with_cookies = Request( url="http://www.example.com", cookies={'currency': 'USD', 'country': 'UY'}, meta={'dont_merge_cookies': True} )
encoding
: 使用默认的 'utf-8' 就行。
dont_filter
: 代表该请求不禁调度器过滤。这是当你想使用屡次执行相同的请求,忽略重复的过滤器。默认为False。
errback
: 指定错误处理函数
Response
# 部分代码 class Response(object_ref): def __init__(self, url, status=200, headers=None, body='', flags=None, request=None): self.headers = Headers(headers or {}) self.status = int(status) self._set_body(body) self._set_url(url) self.request = request self.flags = [] if flags is None else list(flags) @property def meta(self): try: return self.request.meta except AttributeError: raise AttributeError("Response.meta not available, this response " \ "is not tied to any request")
大部分参数和上面的差很少:
status: 响应码 _set_body(body): 响应体 _set_url(url):响应url self.request = request
https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
下载中间件是处于引擎(crawler.engine)和下载器(crawler.engine.download())之间的一层组件,能够有多个下载中间件被加载运行。
当引擎传递请求给下载器的过程当中,下载中间件能够对请求进行处理 (例如增长http header信息,增长proxy信息等);
在下载器完成http请求,传递响应给引擎的过程当中, 下载中间件能够对响应进行处理(例如进行gzip的解压等)
要激活下载器中间件组件,将其加入到 DOWNLOADER_MIDDLEWARES 设置中。 该设置是一个字典(dict),键为中间件类的路径,值为其中间件的顺序(order)。
这里是一个例子:
DOWNLOADER_MIDDLEWARES = { 'mySpider.middlewares.MyDownloaderMiddleware': 543, }
编写下载器中间件十分简单。每一个中间件组件是一个定义了如下一个或多个方法的Python类:
class scrapy.contrib.downloadermiddleware.DownloaderMiddleware
process_request()
必须返回如下其中之一:一个 None 、一个 Response 对象、一个 Request 对象或 raise IgnoreRequest:若是其raise一个 IgnoreRequest 异常,则安装的下载中间件的 process_exception() 方法会被调用。若是没有任何一个方法处理该异常, 则request的errback(Request.errback)方法会被调用。若是没有代码处理抛出的异常, 则该异常被忽略且不记录(不一样于其余异常那样)。
参数:
request (Request 对象) – 处理的request
spider (Spider 对象) – 该request对应的spider
process_request()
必须返回如下其中之一: 返回一个 Response 对象、 返回一个 Request 对象或raise一个 IgnoreRequest 异常。若是其抛出一个 IgnoreRequest 异常,则调用request的errback(Request.errback)。 若是没有代码处理抛出的异常,则该异常被忽略且不记录(不一样于其余异常那样)。
参数:
request (Request 对象) – response所对应的request
response (Response 对象) – 被处理的response
spider (Spider 对象) – response所对应的spider
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/jobs.html
要启用持久化支持,你只须要经过 JOBDIR
设置 job directory
选项。这个路径将会存储 全部的请求数据来保持一个单独任务的状态(例如:一次spider爬取(a spider run))。必需要注意的是,这个目录不容许被不一样的spider 共享,甚至是同一个spider的不一样jobs/runs也不行。也就是说,这个目录就是存储一个 单独 job的状态信息。
scrapy crawl somespider -s JOBDIR=crawls/somespider-1
而后,你就能在任什么时候候安全地中止爬虫(按Ctrl-C或者发送一个信号)。恢复这个爬虫也是一样的命令:
scrapy crawl somespider -s JOBDIR=crawls/somespider-1
在scrapy源码中找到scrapy/dupefilters.py
文件,部分源码
class RFPDupeFilter(BaseDupeFilter): """Request Fingerprint duplicates filter""" def __init__(self, path=None, debug=False): self.file = None self.fingerprints = set() self.logdupes = True self.debug = debug self.logger = logging.getLogger(__name__) if path: self.file = open(os.path.join(path, 'requests.seen'), 'a+') self.file.seek(0) self.fingerprints.update(x.rstrip() for x in self.file) @classmethod def from_settings(cls, settings): debug = settings.getbool('DUPEFILTER_DEBUG') return cls(job_dir(settings), debug) def request_seen(self, request): fp = self.request_fingerprint(request) if fp in self.fingerprints: return True self.fingerprints.add(fp) if self.file: self.file.write(fp + os.linesep) def request_fingerprint(self, request): return request_fingerprint(request) def close(self, reason): if self.file: self.file.close() def log(self, request, spider): if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {'request': request}, extra={'spider': spider}) elif self.logdupes: msg = ("Filtered duplicate request: %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = False spider.crawler.stats.inc_value('dupefilter/filtered', spider=spider)
里面有一个request_seen
方法,这个方法在scrapy/core/scheduler.py
中被调用
class Scheduler(object): ... def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False dqok = self._dqpush(request) if dqok: self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider) else: self._mqpush(request) self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider) self.stats.inc_value('scheduler/enqueued', spider=self.spider) return True ...
回到request_seen
方法继续查看
def request_seen(self, request): fp = self.request_fingerprint(request) if fp in self.fingerprints: return True self.fingerprints.add(fp) if self.file: self.file.write(fp + os.linesep) # 返回的`request_fingerprint`是`from scrapy.utils.request import request_fingerprint` def request_fingerprint(self, request): return request_fingerprint(request)
scrapy\utils\request.py
这个函数将request
进行hash
,最后生成摘要(fp.hexdigest()
)
def request_fingerprint(request, include_headers=None): """ Return the request fingerprint. The request fingerprint is a hash that uniquely identifies the resource the request points to. For example, take the following two urls: http://www.example.com/query?id=111&cat=222 http://www.example.com/query?cat=222&id=111 Even though those are two different URLs both point to the same resource and are equivalent (ie. they should return the same response). Another example are cookies used to store session ids. Suppose the following page is only accesible to authenticated users: http://www.example.com/members/offers.html Lot of sites use a cookie to store the session id, which adds a random component to the HTTP Request and thus should be ignored when calculating the fingerprint. For this reason, request headers are ignored by default when calculating the fingeprint. If you want to include specific headers use the include_headers argument, which is a list of Request headers to include. """ if include_headers: include_headers = tuple(to_bytes(h.lower()) for h in sorted(include_headers)) cache = _fingerprint_cache.setdefault(request, {}) if include_headers not in cache: fp = hashlib.sha1() fp.update(to_bytes(request.method)) fp.update(to_bytes(canonicalize_url(request.url))) fp.update(request.body or b'') if include_headers: for hdr in include_headers: if hdr in request.headers: fp.update(hdr) for v in request.headers.getlist(hdr): fp.update(v) cache[include_headers] = fp.hexdigest() return cache[include_headers]
咱们能够看到,去重指纹是sha1(method + url + body + header)
因此,实际可以去掉重复的比例并不大。
若是咱们须要本身提取去重的finger,须要本身实现Filter,并配置上它。
下面这个Filter只根据url去重:
from scrapy.dupefilter import RFPDupeFilter class SeenURLFilter(RFPDupeFilter): """A dupe filter that considers the URL""" def __init__(self, path=None): self.urls_seen = set() RFPDupeFilter.__init__(self, path) def request_seen(self, request): if request.url in self.urls_seen: return True else: self.urls_seen.add(request.url)
不要忘记配置上:
DUPEFILTER_CLASS ='scraper.custom_filters.SeenURLFilter'
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/telnetconsole.html
Scrapy运行的有telnet服务,咱们能够经过这个功能来获得一些性能指标。经过telnet命令链接到6023端口,而后就会获得一个在爬虫内部环境的Python命令行。要当心的是,若是你在这里运行了一些阻塞的操做,好比time.sleep(),正在运行的爬虫就会被停止。经过内建的est()函数能够打印出一些性能指标。
打开第一个命令行,运行如下代码:
```shell
$ telnet localhost 6023
est()
...
len(engine.downloader.active) : 16
...
len(engine.slot.scheduler.mqs) : 4475
...
len(engine.scraper.slot.active) : 115
engine.scraper.slot.active_size : 117760
engine.scraper.slot.itemproc_size : 105
```
在这里咱们忽略了dqs指标,若是你启用了持久化支持的功能,亦即设置了JOBDIR设置项,你也
会获得非零的dqs(len(engine.slot.scheduler.dqs)
)值,这时候就应当把dqs加到mqs上去,以便后续的分析。
mqs
len(engine.downloader.active)
CONCURRENT_REQUESTS
值是同样的,因此也没问题。len(engine.scraper.slot.active)
scraper
中处理,这些响应的总的大小能够从engine.scraper.slot.active_size
指标获得,共是115kb。除了这些响应,pipeline
中正有105个Item
engine.scraper.slot.itemproc_size
中得知,也就是说,还有10个正在爬虫中进行处理。总的来讲,能够肯定下载器就是系统的瓶颈,由于在下载器以前有不少请求(mqs)在队列中等待处理,下载器已经被充分地利用了;在下载器以后,咱们有一个或多或少比较很稳定的工做量(能够经过屡次调用est()
函数来证明这一点)。另外一个信息来源是stats
对象,它通常状况下会在爬虫运行结束后打印出来。而在telnet
中,咱们能够随时经过stats.get_stats()
获得一个dict
对象,并用p()函数打印出来:
$ p(stats.get_stats()) {'downloader/request_bytes': 558330, ... 'item_scraped_count': 2485, ...}
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/stats.html
scrapy/statscollectors.py
""" Scrapy extension for collecting scraping stats """ import pprint import logging logger = logging.getLogger(__name__) class StatsCollector(object): def __init__(self, crawler): self._dump = crawler.settings.getbool('STATS_DUMP') self._stats = {} def get_value(self, key, default=None, spider=None): return self._stats.get(key, default) def get_stats(self, spider=None): return self._stats def set_value(self, key, value, spider=None): self._stats[key] = value def set_stats(self, stats, spider=None): self._stats = stats def inc_value(self, key, count=1, start=0, spider=None): d = self._stats d[key] = d.setdefault(key, start) + count def max_value(self, key, value, spider=None): self._stats[key] = max(self._stats.setdefault(key, value), value) def min_value(self, key, value, spider=None): self._stats[key] = min(self._stats.setdefault(key, value), value) def clear_stats(self, spider=None): self._stats.clear() def open_spider(self, spider): pass def close_spider(self, spider, reason): if self._dump: logger.info("Dumping Scrapy stats:\n" + pprint.pformat(self._stats), extra={'spider': spider}) self._persist_stats(self._stats, spider) def _persist_stats(self, stats, spider): pass class MemoryStatsCollector(StatsCollector): def __init__(self, crawler): super(MemoryStatsCollector, self).__init__(crawler) self.spider_stats = {} def _persist_stats(self, stats, spider): self.spider_stats[spider.name] = stats class DummyStatsCollector(StatsCollector): def get_value(self, key, default=None, spider=None): return default def set_value(self, key, value, spider=None): pass def set_stats(self, stats, spider=None): pass def inc_value(self, key, count=1, start=0, spider=None): pass def max_value(self, key, value, spider=None): pass def min_value(self, key, value, spider=None): pass
404页面收集
class JobboleSpider(scrapy.Spider): name = 'jobbole' allowed_domains = ['blog.jobbole.com'] start_urls = ['http://blog.jobbole.com/all-posts/'] # 收集404的url和数量 handle_httpstatus_list = [404,] def __init__(self): self.fail_urls = [] super(JobboleSpider, self).__init__() def parse(self, response): if response.status == 404: self.fail_urls.append(response.url) self.crawler.stats.inc_value('failed_url') ...
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/signals.html
在spider关闭时对fail_urls
进行处理
def __init__(self): self.fail_urls = [] super(JobboleSpider, self).__init__() dispatcher.connect(self.handle_spider_closed, signal=signals.spider_closed) def handle_spider_closed(self, spider, response): self.crawler.stats.set_value('failed_urls', ','.join(self.fail_urls)) ...
https://scrapy-chs.readthedocs.io/zh_CN/1.0/topics/extensions.html
scrapy/extensions
包里有一些扩展实例
http://doc.scrapy.org/en/master/topics/practices.html#distributed-crawls
Scrapy并无提供内置的机制支持分布式(多服务器)爬取。不过仍是有办法进行分布式爬取, 取决于您要怎么分布了。
若是您有不少spider,那分布负载最简单的办法就是启动多个Scrapyd,并分配到不一样机器上。
若是想要在多个机器上运行一个单独的spider,那您能够将要爬取的url进行分块,并发送给spider。 例如:
首先,准备要爬取的url列表,并分配到不一样的文件url里:
http://somedomain.com/urls-to-crawl/spider1/part1.list http://somedomain.com/urls-to-crawl/spider1/part2.list http://somedomain.com/urls-to-crawl/spider1/part3.list
接着在3个不一样的Scrapd服务器中启动spider。spider会接收一个(spider)参数 part , 该参数表示要爬取的分块:
curl http://scrapy1.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=1 curl http://scrapy2.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=2 curl http://scrapy3.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=3
https://github.com/rmax/scrapy-redis
Redis 命令参考
http://redisdoc.com/
pip install scrapy-redis
Scrapy 是一个通用的爬虫框架,可是不支持分布式,Scrapy-redis是为了更方便地实现Scrapy分布式爬取,而提供了一些以redis为基础的组件(仅有组件)。
Scrapy-redis提供了下面四种组件(components):(四种组件意味着这四个模块都要作相应的修改)
- Scheduler
- Duplication Filter
- Item Pipeline
- Base Spider
如上图所⽰示,scrapy-redis在scrapy的架构上增长了redis,基于redis的特性拓展了以下组件:
Scheduler
collection.deque
(双向队列)造成了本身的Scrapy queue(https://github.com/scrapy/queuelib/blob/master/queuelib/queue.py)),可是Scrapy多个spider不能共享
待爬取队列Scrapy queue, 即Scrapy自己不支持爬虫分布式,scrapy-redis
的解决是把这个Scrapy queue
换成redis
数据库(也是指redis队列),从同一个redis-server
存放要爬取的request
,便能让多个spider
去同一个数据库里读取。Scrapy中跟“待爬队列”直接相关的就是调度器Scheduler
,它负责对新的request进行入列操做(加入Scrapy queue
),取出下一个要爬取的request
(从Scrapy queue
中取出)等操做。它把待爬队列按照优先级创建了一个字典结构,好比:
{ 优先级0 : 队列0 优先级1 : 队列1 优先级2 : 队列2 }
而后根据request中的优先级,来决定该入哪一个队列,出列时则按优先级较小的优先出列。为了管理这个比较高级的队列字典,Scheduler
须要提供一系列的方法。可是原来的Scheduler
已经没法使用,因此使用Scrapy-redis
的scheduler
组件。
Duplication Filter
request
去重功能,Scrapy中把已经发送的request指纹放入到一个集合中,把下一个request的指纹拿到集合中比对,若是该指纹存在于集合中,说明这个request发送过了,若是没有则继续操做。这个核心的判重功能是这样实现的:def request_seen(self, request): # self.request_figerprints就是一个指纹集合 fp = self.request_fingerprint(request) # 这就是判重的核心操做 if fp in self.fingerprints: return True self.fingerprints.add(fp) if self.file: self.file.write(fp + os.linesep)
在scrapy-redis
中去重是由Duplication Filter
组件来实现的,它经过redis
的set
不重复的特性,巧妙的实现了Duplication Filter
去重。scrapy-redis
调度器从引擎接受request
,将request
的指纹存⼊redis
的set
检查是否重复,并将不重复的request push
写⼊redis
的 request queue
。
引擎请求request(Spider发出的)
时,调度器从redis
的request queue
队列⾥里根据优先级pop
出⼀个request
返回给引擎,引擎将此request
发给spider
处理。
Item Pipeline
引擎将(Spider返回的)爬取到的Item给Item Pipeline
,scrapy-redis
的Item Pipeline
将爬取到的 Item
存⼊redis
的 items queue
。
修改后的Item Pipeline
能够很方便的根据 key
从 items queue
提取item
,从⽽实现 items processes
集群。
Base Spider
不在使用scrapy原有的Spider类,重写的RedisSpider
继承了Spider
和RedisMixin
这两个类,RedisMixin
是用来从redis
读取url
的类。
当咱们生成一个Spider
继承RedisSpider
时,调用setup_redis
函数,这个函数会去链接redis
数据库,而后会设置signals
(信号):
一个是当spider空闲时候的signal,会调用
spider_idle
函数,这个函数调用schedule_next_request
函数,保证spider
是一直活着的状态,而且抛出DontCloseSpider
异常。一个是当抓到一个item时的signal,会调用
item_scraped
函数,这个函数会调用schedule_next_request
函数,获取下一个request
。
scrapy-redis的源码并很少,工程的主体仍是是redis和scrapy两个库,工程自己实现的东西不是不少,这个工程就像胶水同样,把这两个插件粘结了起来。下面咱们来看看,scrapy-redis的每个源代码文件都实现了什么功能,最后如何实现分布式的爬虫系统
负责根据setting
中配置实例化redis
链接。被dupefilter
和scheduler
调用,总之涉及到redis
存取的都要使用到这个模块。
import six from scrapy.utils.misc import load_object from . import defaults # 链接redis数据库 # Shortcut maps 'setting name' -> 'parmater name'. SETTINGS_PARAMS_MAP = { 'REDIS_URL': 'url', 'REDIS_HOST': 'host', 'REDIS_PORT': 'port', 'REDIS_ENCODING': 'encoding', } def get_redis_from_settings(settings): """Returns a redis client instance from given Scrapy settings object. This function uses ``get_client`` to instantiate the client and uses ``defaults.REDIS_PARAMS`` global as defaults values for the parameters. You can override them using the ``REDIS_PARAMS`` setting. Parameters ---------- settings : Settings A scrapy settings object. See the supported settings below. Returns ------- server Redis client instance. Other Parameters ---------------- REDIS_URL : str, optional Server connection URL. REDIS_HOST : str, optional Server host. REDIS_PORT : str, optional Server port. REDIS_ENCODING : str, optional Data encoding. REDIS_PARAMS : dict, optional Additional client parameters. """ params = defaults.REDIS_PARAMS.copy() params.update(settings.getdict('REDIS_PARAMS')) # XXX: Deprecate REDIS_* settings. for source, dest in SETTINGS_PARAMS_MAP.items(): val = settings.get(source) if val: params[dest] = val # Allow ``redis_cls`` to be a path to a class. if isinstance(params.get('redis_cls'), six.string_types): params['redis_cls'] = load_object(params['redis_cls']) return get_redis(**params) # Backwards compatible alias. from_settings = get_redis_from_settings def get_redis(**kwargs): """Returns a redis client instance. Parameters ---------- redis_cls : class, optional Defaults to ``redis.StrictRedis``. url : str, optional If given, ``redis_cls.from_url`` is used to instantiate the class. **kwargs Extra parameters to be passed to the ``redis_cls`` class. Returns ------- server Redis client instance. """ redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS) url = kwargs.pop('url', None) if url: return redis_cls.from_url(url, **kwargs) else: return redis_cls(**kwargs)
scrapy-redis默认配置
import redis # For standalone use. DUPEFILTER_KEY = 'dupefilter:%(timestamp)s' PIPELINE_KEY = '%(spider)s:items' REDIS_CLS = redis.StrictRedis REDIS_ENCODING = 'utf-8' # Sane connection defaults. # 套接字的超时时间、等待时间等 REDIS_PARAMS = { 'socket_timeout': 30, 'socket_connect_timeout': 30, 'retry_on_timeout': True, 'encoding': REDIS_ENCODING, } SCHEDULER_QUEUE_KEY = '%(spider)s:requests' SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue' SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter' SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter' START_URLS_KEY = '%(name)s:start_urls' START_URLS_AS_SET = False
负责执行requst
的去重,实现的颇有技巧性,使用redis
的set
数据结构。可是注意scheduler
并不使用其中用于在这个模块中实现的dupefilter
键作request
的调度,而是使用queue.py
模块中实现的queue
。
当request
不重复时,将其存入到queue
中,调度时将其弹出。
import logging import time from scrapy.dupefilters import BaseDupeFilter from scrapy.utils.request import request_fingerprint from . import defaults from .connection import get_redis_from_settings logger = logging.getLogger(__name__) # TODO: Rename class to RedisDupeFilter. class RFPDupeFilter(BaseDupeFilter): """Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. """ logger = logger def __init__(self, server, key, debug=False): """Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.logdupes = True @classmethod def from_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. """ server = get_redis_from_settings(settings) # XXX: This creates one-time key. needed to support to use this # class as standalone dupefilter with scrapy's default scheduler # if scrapy passes spider on open() method this wouldn't be needed # TODO: Use SCRAPY_JOB env as default and fallback to timestamp. key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug) @classmethod def from_crawler(cls, crawler): """Returns instance from crawler. Parameters ---------- crawler : scrapy.crawler.Crawler Returns ------- RFPDupeFilter Instance of RFPDupeFilter. """ return cls.from_settings(crawler.settings) def request_seen(self, request): """Returns True if request was already seen. Parameters ---------- request : scrapy.http.Request Returns ------- bool """ fp = self.request_fingerprint(request) # This returns the number of values added, zero if already exists. added = self.server.sadd(self.key, fp) return added == 0 def request_fingerprint(self, request): """Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str """ return request_fingerprint(request) @classmethod def from_spider(cls, spider): settings = spider.settings server = get_redis_from_settings(settings) dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY) key = dupefilter_key % {'spider': spider.name} debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug) def close(self, reason=''): """Delete data on close. Called by Scrapy's scheduler. Parameters ---------- reason : str, optional """ self.clear() def clear(self): """Clears fingerprints data.""" self.server.delete(self.key) def log(self, request, spider): """Logs given request. Parameters ---------- request : scrapy.http.Request spider : scrapy.spiders.Spider """ if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {'request': request}, extra={'spider': spider}) elif self.logdupes: msg = ("Filtered duplicate request %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = False
这个文件看起来比较复杂,重写了scrapy
自己已经实现的request
判重功能。由于自己scrapy
单机跑的话,只须要读取内存中的request
队列或者持久化的request
队列,就能判断此次要发出的request url
是否已经请求过或者正在调度(本地读就好了)。而分布式跑的话,就须要各个主机上的scheduler
都链接同一个数据库的同一个request池
来判断此次的请求是不是重复的了。
在这个文件中,经过继承BaseDupeFilter
重写他的方法,实现了基于redis
的判重。根据源代码来看,scrapy-redis
使用了scrapy
自己的一个fingerprint
即request_fingerprint
,这个函数在前面去重原理中已经说过了.
这个类经过链接redis
,使用一个key
来向redis
的一个set
中插入fingerprint
(这个key
对于同一种spider
是相同的,redis
是一个key-value
的数据库,若是key
是相同的,访问到的值就是相同的,这里使用spider名字
+DupeFilter的key
就是为了在不一样主机上的不一样爬虫实例,只要属于同一种spider
,就会访问到同一个set
,而这个set
就是他们的url
判重池),若是返回值为0
,说明该set
中该fingerprint
已经存在(由于集合是没有重复值的),则返回False
,若是返回值为1
,说明添加了一个fingerprint
到set
中,则说明这个request
没有重复,因而返回True
,还顺便把新fingerprint
加入到数据库中了。 DupeFilter
判重会在scheduler
类中用到,每个request
在进入调度以前都要进行判重,若是重复就不须要参加调度,直接舍弃就行了,否则就是白白浪费资源。
"""A pickle wrapper module with protocol=-1 by default.""" try: import cPickle as pickle # PY2 except ImportError: import pickle def loads(s): return pickle.loads(s) def dumps(obj): return pickle.dumps(obj, protocol=-1)
这里实现了loads
和dumps
两个函数,其实就是实现了一个序列化器。
由于redis
数据库不能存储复杂对象(key
部分只能是字符串,value
部分只能是字符串,字符串列表,字符串集合和hash),因此咱们存啥都要先串行化成文本才行。
这里使用的就是python
的pickle
模块,一个兼容py2和py3的串行化工具。这个serializer
主要用于一会的scheduler
存reuqest
对象。
这是是用来实现分布式处理的做用。它将Item
存储在redis
中以实现分布式处理。因为在这里须要读取配置,因此就用到了from_crawler()
函数。
from scrapy.utils.misc import load_object from scrapy.utils.serialize import ScrapyJSONEncoder from twisted.internet.threads import deferToThread from . import connection, defaults default_serialize = ScrapyJSONEncoder().encode class RedisPipeline(object): """Pushes serialized item into a redis list/queue Settings -------- REDIS_ITEMS_KEY : str Redis key where to store items. REDIS_ITEMS_SERIALIZER : str Object path to serializer function. """ def __init__(self, server, key=defaults.PIPELINE_KEY, serialize_func=default_serialize): """Initialize pipeline. Parameters ---------- server : StrictRedis Redis client instance. key : str Redis key where to store items. serialize_func : callable Items serializer function. """ self.server = server self.key = key self.serialize = serialize_func @classmethod def from_settings(cls, settings): params = { 'server': connection.from_settings(settings), } if settings.get('REDIS_ITEMS_KEY'): params['key'] = settings['REDIS_ITEMS_KEY'] if settings.get('REDIS_ITEMS_SERIALIZER'): params['serialize_func'] = load_object( settings['REDIS_ITEMS_SERIALIZER'] ) return cls(**params) @classmethod def from_crawler(cls, crawler): return cls.from_settings(crawler.settings) def process_item(self, item, spider): return deferToThread(self._process_item, item, spider) def _process_item(self, item, spider): key = self.item_key(item, spider) data = self.serialize(item) self.server.rpush(key, data) return item def item_key(self, item, spider): """Returns redis key based on given spider. Override this function to use a different key depending on the item and/or spider. """ return self.key % {'spider': spider.name}
pipelines
文件实现了一个item pipieline
类,和scrapy
的item pipeline
是同一个对象,经过从settings
中拿到咱们配置的REDIS_ITEMS_KEY
做为key
,把item
串行化以后存入redis
数据库对应的value
中(这个value
能够看出出是个list
,咱们的每一个item
是这个list
中的一个结点),这个pipeline
把提取出的item
存起来,主要是为了方便咱们后续处理数据。(集中处理放在同一台服务器,仍是各自保存各自的)
该文件实现了几个容器类,这些容器与redis进行交互,在交互时,会对request请求
进行编码和解码操做(序列化和反序列化)
from scrapy.utils.reqser import request_to_dict, request_from_dict from . import picklecompat class Base(object): """Per-spider base queue class""" def __init__(self, server, spider, key, serializer=None): """Initialize per-spider redis queue. Parameters ---------- server : StrictRedis Redis client instance. spider : Spider Scrapy spider instance. key: str Redis key where to put and get messages. serializer : object Serializer object with ``loads`` and ``dumps`` methods. """ if serializer is None: # Backward compatibility. # TODO: deprecate pickle. serializer = picklecompat if not hasattr(serializer, 'loads'): raise TypeError("serializer does not implement 'loads' function: %r" % serializer) if not hasattr(serializer, 'dumps'): raise TypeError("serializer '%s' does not implement 'dumps' function: %r" % serializer) self.server = server self.spider = spider self.key = key % {'spider': spider.name} self.serializer = serializer def _encode_request(self, request): """Encode a request object""" obj = request_to_dict(request, self.spider) return self.serializer.dumps(obj) def _decode_request(self, encoded_request): """Decode an request previously encoded""" obj = self.serializer.loads(encoded_request) return request_from_dict(obj, self.spider) def __len__(self): """Return the length of the queue""" raise NotImplementedError def push(self, request): """Push a request""" raise NotImplementedError def pop(self, timeout=0): """Pop a request""" raise NotImplementedError def clear(self): """Clear queue/stack""" self.server.delete(self.key) # 先进先出, 队列 class FifoQueue(Base): """Per-spider FIFO queue""" def __len__(self): """Return the length of the queue""" return self.server.llen(self.key) # 压头, 出尾 def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data) # 有序队列 class PriorityQueue(Base): """Per-spider priority queue abstraction using redis' sorted set""" def __len__(self): """Return the length of the queue""" return self.server.zcard(self.key) def push(self, request): """Push a request""" data = self._encode_request(request) score = -request.priority # We don't use zadd method as the order of arguments change depending on # whether the class is Redis or StrictRedis, and the option of using # kwargs only accepts strings, not bytes. self.server.execute_command('ZADD', self.key, score, data) def pop(self, timeout=0): """ Pop a request timeout not support in this queue class """ # use atomic range/remove using multi/exec pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0]) # 后进先出 栈 class LifoQueue(Base): """Per-spider LIFO queue.""" def __len__(self): """Return the length of the stack""" return self.server.llen(self.key) # 压头, 出头 def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request)) def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key) if data: return self._decode_request(data) # TODO: Deprecate the use of these names. SpiderQueue = FifoQueue SpiderStack = LifoQueue SpiderPriorityQueue = PriorityQueue
此扩展是对scrapy
中自带的scheduler
的替代(在settings
的SCHEDULER
变量中指出),正是利用此扩展实现crawler
的分布式调度。其利用的数据结构来自于queue
中实现的数据结构。
scrapy-redi
s所实现的两种分布式:爬虫分布式
以及item处理分布式
就是由模块scheduler
和模块pipelines
实现。上述其它模块做为为两者辅助的功能模块
import importlib import six from scrapy.utils.misc import load_object from . import connection, defaults # TODO: add SCRAPY_JOB support. class Scheduler(object): """Redis-based scheduler Settings -------- SCHEDULER_PERSIST : bool (default: False) Whether to persist or clear redis queue. SCHEDULER_FLUSH_ON_START : bool (default: False) Whether to flush redis queue on start. SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0) How many seconds to wait before closing if no message is received. SCHEDULER_QUEUE_KEY : str Scheduler redis key. SCHEDULER_QUEUE_CLASS : str Scheduler queue class. SCHEDULER_DUPEFILTER_KEY : str Scheduler dupefilter redis key. SCHEDULER_DUPEFILTER_CLASS : str Scheduler dupefilter class. SCHEDULER_SERIALIZER : str Scheduler serializer. """ def __init__(self, server, persist=False, flush_on_start=False, queue_key=defaults.SCHEDULER_QUEUE_KEY, queue_cls=defaults.SCHEDULER_QUEUE_CLASS, dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY, dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS, idle_before_close=0, serializer=None): """Initialize scheduler. Parameters ---------- server : Redis The redis server instance. persist : bool Whether to flush requests when closing. Default is False. flush_on_start : bool Whether to flush requests on start. Default is False. queue_key : str Requests queue key. queue_cls : str Importable path to the queue class. dupefilter_key : str Duplicates filter key. dupefilter_cls : str Importable path to the dupefilter class. idle_before_close : int Timeout before giving up. """ if idle_before_close < 0: raise TypeError("idle_before_close cannot be negative") self.server = server self.persist = persist self.flush_on_start = flush_on_start self.queue_key = queue_key self.queue_cls = queue_cls self.dupefilter_cls = dupefilter_cls self.dupefilter_key = dupefilter_key self.idle_before_close = idle_before_close self.serializer = serializer self.stats = None def __len__(self): return len(self.queue) @classmethod def from_settings(cls, settings): kwargs = { 'persist': settings.getbool('SCHEDULER_PERSIST'), 'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'), 'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'), } # If these values are missing, it means we want to use the defaults. optional = { # TODO: Use custom prefixes for this settings to note that are # specific to scrapy-redis. 'queue_key': 'SCHEDULER_QUEUE_KEY', 'queue_cls': 'SCHEDULER_QUEUE_CLASS', 'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY', # We use the default setting name to keep compatibility. 'dupefilter_cls': 'DUPEFILTER_CLASS', 'serializer': 'SCHEDULER_SERIALIZER', } for name, setting_name in optional.items(): val = settings.get(setting_name) if val: kwargs[name] = val # Support serializer as a path to a module. if isinstance(kwargs.get('serializer'), six.string_types): kwargs['serializer'] = importlib.import_module(kwargs['serializer']) server = connection.from_settings(settings) # Ensure the connection is working. server.ping() return cls(server=server, **kwargs) @classmethod def from_crawler(cls, crawler): instance = cls.from_settings(crawler.settings) # FIXME: for now, stats are only supported from this constructor instance.stats = crawler.stats return instance def open(self, spider): self.spider = spider try: self.queue = load_object(self.queue_cls)( server=self.server, spider=spider, key=self.queue_key % {'spider': spider.name}, serializer=self.serializer, ) except TypeError as e: raise ValueError("Failed to instantiate queue class '%s': %s", self.queue_cls, e) self.df = load_object(self.dupefilter_cls).from_spider(spider) if self.flush_on_start: self.flush() # notice if there are requests already in the queue to resume the crawl if len(self.queue): spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue)) def close(self, reason): if not self.persist: self.flush() def flush(self): self.df.clear() self.queue.clear() def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False if self.stats: self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) self.queue.push(request) return True def next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider) return request def has_pending_requests(self): return len(self) > 0
这个文件重写了scheduler
类,用来代替scrapy.core.scheduler
的原有调度器。其实对原有调度器的逻辑没有很大的改变,主要是使用了redis
做为数据存储的媒介,以达到各个爬虫之间的统一调度。scheduler
负责调度各个spider
的request请求
,scheduler
初始化时,经过settings
文件读取queue
和dupefilters
的类型(通常就用上边默认的),配置queue
和dupefilters
使用的key
(通常就是spider name
加上queue
或者dupefilters
,这样对于同一种spider
的不一样实例,就会使用相同的数据块了)。每当一个request
要被调度时,enqueue_request
被调用,scheduler
使用dupefilters
来判断这个url
是否重复,若是不重复,就添加到queue
的容器中(先进先出,先进后出和优先级均可以,能够在settings
中配置)。当调度完成时,next_request
被调用,scheduler
就经过queue
容器的接口,取出一个request
,把他发送给相应的spider
,让spider
进行爬取工做。
设计的这个spider从redis中读取要爬的url,而后执行爬取,若爬取过程当中返回更多的url,那么继续进行直至全部的request完成。以后继续从redis中读取url,循环这个过程。
分析:在这个spider中经过signals.spider_idle
(空闲)信号实现对crawler
状态的监视。当idle时,返回新的make_requests_from_url(url)
给引擎,进而交给调度器调度。
from scrapy import signals from scrapy.exceptions import DontCloseSpider from scrapy.spiders import Spider, CrawlSpider from . import connection, defaults from .utils import bytes_to_str class RedisMixin(object): """Mixin class to implement reading urls from a redis queue.""" redis_key = None redis_batch_size = None redis_encoding = None # Redis client placeholder. server = None def start_requests(self): """Returns a batch of start requests from redis.""" return self.next_requests() def setup_redis(self, crawler=None): """Setup redis connection and idle signal. This should be called after the spider has set its crawler object. """ if self.server is not None: return if crawler is None: # We allow optional crawler argument to keep backwards # compatibility. # XXX: Raise a deprecation warning. crawler = getattr(self, 'crawler', None) if crawler is None: raise ValueError("crawler is required") settings = crawler.settings if self.redis_key is None: self.redis_key = settings.get( 'REDIS_START_URLS_KEY', defaults.START_URLS_KEY, ) self.redis_key = self.redis_key % {'name': self.name} if not self.redis_key.strip(): raise ValueError("redis_key must not be empty") if self.redis_batch_size is None: # TODO: Deprecate this setting (REDIS_START_URLS_BATCH_SIZE). self.redis_batch_size = settings.getint( 'REDIS_START_URLS_BATCH_SIZE', settings.getint('CONCURRENT_REQUESTS'), ) try: self.redis_batch_size = int(self.redis_batch_size) except (TypeError, ValueError): raise ValueError("redis_batch_size must be an integer") if self.redis_encoding is None: self.redis_encoding = settings.get('REDIS_ENCODING', defaults.REDIS_ENCODING) self.logger.info("Reading start URLs from redis key '%(redis_key)s' " "(batch size: %(redis_batch_size)s, encoding: %(redis_encoding)s", self.__dict__) self.server = connection.from_settings(crawler.settings) # The idle signal is called when the spider has no requests left, # that's when we will schedule new requests from redis queue crawler.signals.connect(self.spider_idle, signal=signals.spider_idle) def next_requests(self): """Returns a request to be scheduled or none.""" use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET) fetch_one = self.server.spop if use_set else self.server.lpop # XXX: Do we need to use a timeout here? found = 0 # TODO: Use redis pipeline execution. while found < self.redis_batch_size: data = fetch_one(self.redis_key) if not data: # Queue empty. break req = self.make_request_from_data(data) if req: yield req found += 1 else: self.logger.debug("Request not made from data: %r", data) if found: self.logger.debug("Read %s requests from '%s'", found, self.redis_key) def make_request_from_data(self, data): """Returns a Request instance from data coming from Redis. By default, ``data`` is an encoded URL. You can override this method to provide your own message decoding. Parameters ---------- data : bytes Message from redis. """ url = bytes_to_str(data, self.redis_encoding) return self.make_requests_from_url(url) def schedule_next_requests(self): """Schedules a request if available""" # TODO: While there is capacity, schedule a batch of redis requests. for req in self.next_requests(): self.crawler.engine.crawl(req, spider=self) def spider_idle(self): """Schedules a request if available, otherwise waits.""" # XXX: Handle a sentinel to close the spider. self.schedule_next_requests() raise DontCloseSpider class RedisSpider(RedisMixin, Spider): """Spider that reads urls from redis queue when idle. Attributes ---------- redis_key : str (default: REDIS_START_URLS_KEY) Redis key where to fetch start URLs from.. redis_batch_size : int (default: CONCURRENT_REQUESTS) Number of messages to fetch from redis on each attempt. redis_encoding : str (default: REDIS_ENCODING) Encoding to use when decoding messages from redis queue. Settings -------- REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls") Default Redis key where to fetch start URLs from.. REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS) Default number of messages to fetch from redis on each attempt. REDIS_START_URLS_AS_SET : bool (default: False) Use SET operations to retrieve messages from the redis queue. If False, the messages are retrieve using the LPOP command. REDIS_ENCODING : str (default: "utf-8") Default encoding to use when decoding messages from redis queue. """ @classmethod def from_crawler(self, crawler, *args, **kwargs): obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs) obj.setup_redis(crawler) return obj class RedisCrawlSpider(RedisMixin, CrawlSpider): """Spider that reads urls from redis queue when idle. Attributes ---------- redis_key : str (default: REDIS_START_URLS_KEY) Redis key where to fetch start URLs from.. redis_batch_size : int (default: CONCURRENT_REQUESTS) Number of messages to fetch from redis on each attempt. redis_encoding : str (default: REDIS_ENCODING) Encoding to use when decoding messages from redis queue. Settings -------- REDIS_START_URLS_KEY : str (default: "<spider.name>:start_urls") Default Redis key where to fetch start URLs from.. REDIS_START_URLS_BATCH_SIZE : int (deprecated by CONCURRENT_REQUESTS) Default number of messages to fetch from redis on each attempt. REDIS_START_URLS_AS_SET : bool (default: True) Use SET operations to retrieve messages from the redis queue. REDIS_ENCODING : str (default: "utf-8") Default encoding to use when decoding messages from redis queue. """ @classmethod def from_crawler(self, crawler, *args, **kwargs): obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs) obj.setup_redis(crawler) return obj
spider
的改动也不是很大,主要是经过connect
接口,给spider
绑定了spider_idle
信号,spider
初始化时,经过setup_redis
函数初始化和redis
的链接,以后经过next_requests
函数从redis
中取出strat url
,使用的key
是settings
中REDIS_START_URLS_AS_SET
定义的(注意了这里的初始化url
池和咱们上边的queue
的url
池不是一个东西,queue
的池是用于调度的,初始化url
池是存放入口url
的,他们都存在redis
中,可是使用不一样的key
来区分,就当成是不一样的表吧),spider
使用少许的start url
,能够发展出不少新的url
,这些url
会进入scheduler
进行判重和调度。直到spider
跑到调度池内没有url
的时候,会触发spider_idle
信号,从而触发spider
的next_requests
函数,再次从redis
的start url
池中读取一些url
。
py2和py3字符串兼容
import six def bytes_to_str(s, encoding='utf-8'): """Returns a str if a bytes object is given.""" if six.PY3 and isinstance(s, bytes): return s.decode(encoding) return s
这个工程经过重写scheduler
和spider
类,实现了调度
、spider启动
和redis的交互
。实现新的dupefilter
和queue
类,达到了判重
和调度容器
和redis的交互
,由于每一个主机上的爬虫进程都访问同一个redis
数据库,因此调度和判重都统一进行统一管理,达到了分布式爬虫的目的。 当spider
被初始化时,同时会初始化一个对应的scheduler
对象,这个调度器对象经过读取settings
,配置好本身的调度容器queue
和判重工具dupefilter
。每当一个spider
产出一个request
的时候,scrapy
内核会把这个reuqest
递交给这个spider
对应的scheduler
对象进行调度,scheduler
对象经过访问redis
对request
进行判重,若是不重复就把他添加进redis
中的调度池。当调度条件知足时,scheduler
对象就从redis
的调度池中取出一个request
发送给spider
,让他爬取。当spider
爬取的全部暂时可用url
以后,scheduler
发现这个spider
对应的redis
的调度池空了,因而触发信号spider_idle
,spider
收到这个信号以后,直接链接redis
读取strart url
池,拿去新的一批url
入口,而后再次重复上边的工做。
Scrapy-Redis调度的任务是Request对象,里面信息量比较大(不只包含url,还有callback函数、headers等信息),可能致使的结果就是会下降爬虫速度、并且会占用Redis大量的存储空间
,因此若是要保证效率,那么就须要必定硬件水平,尤为是主机。
https://piaosanlang.gitbooks.io/spiders/09day/section9.1.html
https://pypi.org/project/pybloomfiltermmap3/#description
https://pypi.org/project/pybloom_live
scrapy_redis
是利用set
数据结构来去重的,去重的对象是request
的fingerprint
。
去重原理说过了.
def request_seen(self, request): fp = self.request_fingerprint(request) # This returns the number of values added, zero if already exists. added = self.server.sadd(self.key, fp) return added == 0
若是要使用Bloomfilter优化,能够修改去重函数request_seen
def request_seen(self, request): fp = self.request_fingerprint(request) if self.bf.isContains(fp): # 若是已经存在 return True else: self.bf.insert(fp) return False
self.bf
是类Bloomfilter()
的实例化
# encoding=utf-8 import redis from hashlib import md5 class SimpleHash(object): def __init__(self, cap, seed): self.cap = cap self.seed = seed def hash(self, value): ret = 0 for i in range(len(value)): ret += self.seed * ret + ord(value[i]) return (self.cap - 1) & ret class BloomFilter(object): def __init__(self, host='localhost', port=6379, db=0, blockNum=1, key='bloomfilter'): """ :param host: the host of Redis :param port: the port of Redis :param db: witch db in Redis :param blockNum: one blockNum for about 90,000,000; if you have more strings for filtering, increase it. :param key: the key's name in Redis """ self.server = redis.Redis(host=host, port=port, db=db) self.bit_size = 1 << 31 # Redis的String类型最大容量为512M,现使用256M= 2^8 *2^20 字节 = 2^28 * 2^3bit self.seeds = [5, 7, 11, 13, 31, 37, 61] self.key = key self.blockNum = blockNum self.hashfunc = [] for seed in self.seeds: self.hashfunc.append(SimpleHash(self.bit_size, seed)) def isContains(self, str_input): if not str_input: return False m5 = md5() m5.update(str_input) str_input = m5.hexdigest() ret = True name = self.key + str(int(str_input[0:2], 16) % self.blockNum) for f in self.hashfunc: loc = f.hash(str_input) ret = ret & self.server.getbit(name, loc) return ret def insert(self, str_input): m5 = md5() m5.update(str_input) str_input = m5.hexdigest() name = self.key + str(int(str_input[0:2], 16) % self.blockNum) for f in self.hashfunc: loc = f.hash(str_input) self.server.setbit(name, loc, 1) if __name__ == '__main__': """ 第一次运行时会显示 not exists!,以后再运行会显示 exists! """ bf = BloomFilter() if bf.isContains('http://www.baidu.com'): # 判断字符串是否存在 print 'exists!' else: print 'not exists!' bf.insert('http://www.baidu.com')
基于Redis
的Bloomfilter
去重,既用上了Bloomfilter
的海量去重能力,又用上了Redis
的可持久化能力,基于Redis
也方便分布式机器的去重
https://github.com/scrapy/scrapyd
在Scrapy的默认配置中,是根据url进行去重的。这个对付通常网站是够的。可是有一些网站的SEO作的很变态:为了让爬虫多抓,会根据request,动态的生成一些连接,致使爬虫 在网站上抓取大量的随机页面,甚至是死循环。。
为了解决这个问题,有2个方案:
(1) 在setting.py中,设定爬虫的嵌套次数上限(全局设定,实际是经过DepthMiddleware实现的):
DEPTH_LIMIT = 20
(2) 在parse中经过读取response来自行判断(spider级别设定) :
def parse(self, response): if response.meta['depth'] > 100: print ('Loop?')