分析这个项目的源码缘由是须要有去重过滤,增量爬取两个功能,而scrapy-redis
项目已经帮咱们实现了,想看看他是怎么实现的。这里只贴出部分主要代码,查看时请打开源码对照,笔记有点长,建议看的童鞋按部分看。这是第一次分析源码,限于我的水平,若有错误恳请指正,谢谢!python
地址:https://github.com/rmax/scrapy-redis/tree/master/src/scrapy_redislinux
tips: 源码涉及scrapy的方法,不知道的在文档里搜一下就知道它的做用了,redis也是。git
安装git:https://git-scm.com/download/wingithub
git clone https://github.com/rmax/scrapy-redis.git
查看源码的话仍是visual studio code 和pycharm方便,只要安装了某个包,在源码里用Ctrl+鼠标左键
点击方法名就能够跳转到包的源码里。在linux下,则麻烦点要安装vim跳转的插件exuberant-ctags
,有兴趣能够本身百度。redis
pip install --upgrade scrapy pip install --upgrade redis
这里仅copy部分关键代码用于理清原理,所有代码请查看GitHub源码。mongodb
__init__.py
。从当前目录下的connection.py
文件中import两个函数get_redis
和get_redis_from_settings
__init__.py
文件from .connection import ( # NOQA get_redis, get_redis_from_settings, )
接着先看看get_redis
函数。此函数返回一个redis客户端实例。此函数定义了一个redis_cls
类,其值为redis.StrictRedis
,是从default.py
里的设置的值,全部默认值都放在了这个文件。(这里就忽略了)docker
还有一个url
,默认为None
。若是scrapy的settings.py
启用了REDIS_URL
这个参数,就会传递到这里,而后调用redis.StrictRedis
的类方法from_url
,这个方法返回一个链接到传入url
的redis客户端对象。若是scrapy的settings.py
没有启用REDIS_URL
这个参数,则返回一个redis的默认客户端对象,即默认链接到redis://[:password]@localhost:6379/0
,而不是给定的redis地址。数据库
connection.py
文件def get_redis(**kwargs): redis_cls = kwargs.pop('redis_cls', defaults.REDIS_CLS) url = kwargs.pop('url', None) if url: return redis_cls.from_url(url, **kwargs) else: return redis_cls(**kwargs)
再看第二个函数get_redis_from_settings
,它有个参数settings
。这个函数首先设置了一个从当前目录下的defaults.py
获取默认参数的副本,而后再用从scrapy项目的settings.py
中获取的有关redis字典型的配置参数来更新替换默认参数;而后引用six
库作了python版本兼容,最后返回个redis客户端实例。注:scrapy的getdict方法用于将settings里的配置转为字典json
connection.py
vim
def get_redis_from_settings(settings): params = defaults.REDIS_PARAMS.copy() params.update(settings.getdict('REDIS_PARAMS')) # XXX: Deprecate REDIS_* settings. for source, dest in SETTINGS_PARAMS_MAP.items(): val = settings.get(source) if val: params[dest] = val # Allow ``redis_cls`` to be a path to a class. if isinstance(params.get('redis_cls'), six.string_types): params['redis_cls'] = load_object(params['redis_cls']) return get_redis(**params)
这样__init__
以后,就可以实例化一个链接到本身设置的redis
地址的redis
客户端实例了。
可以链接redis
后就要将scrapy
请求的url
存到redis
。这里做者实现了个调度器Scheduler
类来替换scrapy
默认的调度器scrapy.core.scheduler.Scheduler
。在本身项目的配置文件settings.py
中设置成SCHEDULER = "scrapy_redis.scheduler.Scheduler"
来替换默认的调度器。
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
scheduler.py
就只有一个Scheduler
类。先看__init__
函数,只需传入一个server
参数,即本身的redis
实例,其余均是选默认参数。def __init__(self, server, ...): self.server = server ...
Scheduler
类的类方法from_settings
,设置了个字典kargs
用于从本身项目的settings.py
中读取参数SCHEDULER_PERSIST
,SCHEDULER_FLUSH_ON_START
,SCHEDULER_IDLE_BEFORE_CLOSE
等,其中SCHEDULER_PERSIST这个参数就是用于实现增量爬取功能的,若是为TRUE则已经存入redis
队列里的url
就会一直保存不会清空,在咱们中止了爬虫,下次再继续运行时就能够直接跳过已经在redis
队列里的url
了;接着这里还有个可选字典optional
用于替换刚才提到的初始化函数init
里的默认参数;接着将optional
设置了的值加入到kwargs
里;接着做者为了支持本地文件能像包同样导入,就用importlib.import_module
函数转化了下;最后实例化一个链接到本身的redis
实例对象,检查对象连通性;return cls()
中的cls
返回的是一个redis
实例做为参数的Scheduler
类对象自己,它被下一个类方法from_crawler
调用,这样调用类方法后返回cls
就会调用这个类的__init__
方法再次初始化。@classmethod def from_settings(cls, settings): kwargs = { 'persist': settings.getbool('SCHEDULER_PERSIST'), ... } optional = { 'queue_key': 'SCHEDULER_QUEUE_KEY', ... } for name, setting_name in optional.items(): val = settings.get(setting_name) if val: kwargs[name] = val server = connection.from_settings(settings) server.ping() return cls(server=server, **kwargs)
scrapy是如何调用自定义scheduler的
from_crawler
也是类方法,它须要传入一个crawler
对象做为参数,便是本身项目中的crawler
;接着调用该类自己的类方法from_settings
,并将crawler.settings
做为参数传入,像上面说的同样,就会获得一个包含本身项目配置的redis
实例;获取本身项目crawler
的stats
状态,返回实例。@classmethod def from_crawler(cls, crawler): instance = cls.from_settings(crawler.settings) instance.stats = crawler.stats return instance
open
,传入一个spider
对象做为参数;用load_object
模块加载scrapy-redis
项目默认配置中的队列类,默认为scrapy_redis.queue.PriorityQueue
,传入参数队列类必须的参数server, spider, key, serializer
等(做者在queue.py
定义了3中队列类,都是操做redis
的,等下咱们再看)。跟队列类同样,加载去重过滤类,调用这个类的类方法from_spider
,即dupefilter.py
里的类方法,而from_spider
,传入一个本身项目的spider
对象,获取spider
对象对应的配置,获取一个链接到本身配置的redis
地址的redis
对象,而后将本身项目的spider name
结合scrapy-redis
的默认配置生成spider_name:dupefilter
做为过滤去重的redis key
,搜索官网可知debug
为本身项目的默认值False
,最后返回调用此类方法的对象自己,这样就获得了一个链接到本身配置的redis
实例和本身项目配置及默认配置的spider
对象这二者结合的对象;接着判断是否清空redis
的去重队列,默认不清空;经过判断队列长度判断是否还有请求在爬取。(这个过程感受挺难理解的,下一篇笔记会用视频记录pycharm debug类方法调用的过程)def open(self, spider): self.spider = spider ... self.df = load_object(self.dupefilter_cls).from_spider(spider) ...
dupefilter.py
的类方法from_spider
class RFPDupeFilter(BaseDupeFilter): @classmethod def from_spider(cls, spider): settings = spider.settings server = get_redis_from_settings(settings) dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY) key = dupefilter_key % {'spider': spider.name} debug = settings.getbool('DUPEFILTER_DEBUG') return cls(server, key=key, debug=debug)
close
和flush
函数,经过persist
来肯定是否清空去重队列和请求队列,默认False
,可是语句为if not False
,即为True
,因此默认会清空;close
的参数reason
为scrapy
默认异常cancelled
操做。def close(self, reason): if not self.persist: self.flush() def flush(self): self.df.clear() self.queue.clear()
queue.py
的实例方法clear
def clear(self): """Clear queue/stack""" self.server.delete(self.key)
dupfilter.py
的实例方法clear
def clear(self): """Clears fingerprints data.""" self.server.delete(self.key)
enqueue_request
,顾名思义即为入列请求的函数;传入scrapy
的request
对象,判断request
的dont_filter
参数,默认为False
和上面获得的去重过滤对象self.df
的request_seen
方法,它又调用request_fingerprint
方法,request_fingerprint
方法调用request
的默认方法request_fingerprint
来获取请求的指纹,而后将指纹做为值存入redis
的去重队列中,若是存入成功,则redis
返回0,即该请求的指纹没有重复,返回added == 0
,if not request.dont_filter and self.df.request_seen(request)
也就是if not False and 0 == 0
时,调用去重过滤对象self.df
的log
函数,当这个函数的参数debug
为True
时启用log
库的debug
模式记录日志,不然记录日志添加no more duplicates will be shown
,记录后将参数self.logdupes
设置为False
,以后返回False
;接着,用scrapy
的stats.inc_value
收集统计spider
的状态(不太理解这个函数,有知道的童鞋能够告知下,谢谢),最后入列请求后返回True
。def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False if self.stats: self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) self.queue.push(request) return True
dupefilter.py
的实例方法request_seen
def request_seen(self, request): fp = self.request_fingerprint(request) added = self.server.sadd(self.key, fp) return added == 0
dupefilter.py
的实例方法log
def log(self, request, spider): if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {'request': request}, extra={'spider': spider}) elif self.logdupes: msg = ("Filtered duplicate request %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = False
next_request
函数;block_pop_timeout
为默认值0,调用redis
的pop
每隔0秒从队列取出一个请求,取出操做使用redis
的pipeline
,要先执行multi()
操做,而后执行取请求操做zrange(0, 0)
取出一个请求并用zremrangebyrank(0, 0)
删除这个索引对应的请求,而后执行execute
获取结果;最后返回解码json
格式后的结果;若是取出了请求而且状态不为None
时,用scrapy
的stats.inc_value
收集统计spider
的状态,以后返回request
请求。def next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout) if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider) return request
queue.py
的实例方法pop
def pop(self, timeout=0): pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0])
在本身项目的settings
替换成scrapy-redis
的Scheduler
后,就会将项目的crawler
对象传入到Scheduler
类中,Scheduler
类会生成一个本身定义的redis
对象,一个去重过滤的df
对象,一个存取请求的队列对象queue
,再进行一些spider
对象状态值的统计,请求的入列、清空等操做,通过这部分就实现了去重过滤功能了。dupefilter.py
的方法基本在这里都用到了,因此就不单独再分析了。
scheduler部分只用到了PriorityQueue
有优先级的队列。做者其实在queue.py
中实现了FifoQueue
、PriorityQueue
、LifoQueue
3中操做redis
的方案,下面逐一看一下。
先看3个类都继承了的基类Base
;初始化须要一个redis
客户端实例server
,一个spider
实例,一个key
,一个默认为None
的serializer
;若是serializer
为None
则pass
,异常处理,若是没有loads
、dumps
则报错;初始化参数。
接下来是私有方法_encode_request
和_decode_request
,这里用到了scrapy
内置的函数request_to_dict
和request_from_dict
来实现;_encode_request
将请求转为字典类型,用serializer.dumps
转换成json
格式类型的数据并返回;_decode_request
也相似,过程相反,将json
类型数据转为字典类型后返回。
def _encode_request(self, request): """Encode a request object""" obj = request_to_dict(request, self.spider) return self.serializer.dumps(obj) def _decode_request(self, encoded_request): """Decode an request previously encoded""" obj = self.serializer.loads(encoded_request) return request_from_dict(obj, self.spider)
__len__
、push
、pop
等,它们都是没有实现的,clear
则是通用的实现,用于删除redis
指定key
FifoQueue
类实现了基类Base
没有实现的__len__
、push
、pop
方法;__len__
返回列表类型的redis
队列长度;
push
将编码成json
格式的数据存入列表类型的redis
队列;
pop
判断timeout
参数是否大于0,是则使用redis
的brpop(key, timeout)
方法,当这个key
里面没有值时会等待n秒后才返回tuple
类型的数据,返回第一个是key
键,第二个是值;若是timeout
不大于0,则用rpop
方法删除并获取列表中的最后一个元素,当队列里面没有值时,2种方法都会返回None
,即data
为None
,最后若是data
不为None
返回解码后的请求数据。
def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.rpop(self.key) if data: return self._decode_request(data)
与FifoQueue
类相似,PriorityQueue
类也实现了基类Base
没有实现的__len__
、push
、pop
方法,不过这里用的是redis
的有序集合sorted set
;__len__
方法用zcard
获取有序集合长度;
push
方法用_encode_request
获取请求,设置了score
值为scrapy
的内置属性-request.priority
默认值为0,最后用redis
的zadd
方法将key
即spider.name
,score
,data
即request
请求等添加到有序集合中。
def push(self, request): data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD', self.key, score, data)
pop
在scheduler
部分调用的时候已经分析过了。就是调用redis
的pop
每隔0秒从队列取出一个请求,取出操做使用redis
的pipeline
,要先执行multi()
操做,而后执行取请求操做zrange(0, 0)
取出一个请求并用zremrangebyrank(0, 0)
删除这个索引对应的请求,而后执行execute
获取结果,若是取出告终果就返回解码后的results[0]
即request
对象。def pop(self, timeout=0): pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute() if results: return self._decode_request(results[0])
LifoQueue
类也实现了__len__
、push
、pop
方法;__len__
、push
方法跟FifoQueue
类的一致,再也不赘述;pop
方法,其实也差很少,不过FifoQueue
类用的是redis
的brpop
和rpop
用于从最早进入队列删除key
(最旧),LifoQueue
类用的是blpop
和lpop
用于从最后进入队列删除key
(最新)。def pop(self, timeout=0): if timeout > 0: data = self.server.blpop(self.key, timeout) if isinstance(data, tuple): data = data[1] else: data = self.server.lpop(self.key) if data: return self._decode_request(data
做者实现了3中队列类,先进先出队列、优先级队列、后进先出队列,项目用到的是优先级队列。
其实到这里已经可以知足个人去重过滤、增量爬取的需求了。但做者还提供了本身的spiders.py
来执行爬取请求,和pipelines.py
来将数据存储到redis
的功能,有兴趣的接着看。
先看spider部分,这里定义了3个类,RedisMixin
类用于实现从redis队列读取urls
,RedisSpider
类继承RedisMixin
和scrapy的Spider
类;RedisCrawlSpider
继承RedisMixin
和scrapy的CrawlerSpider
类;都用于空闲时从redis队列中读取爬取的请求urls
。
先看看RedisSpider
类和RedisCrawlSpider
。它们都实现了scrapy
的一个类方法from_crawler
。这个类方法是干吗的呢,不知道,因此去官网文档搜索下Spider from_crawler
,发现method -- scrapy.spiders.Spider.from_crawler -- in Spiders
这个内容比较符合咱们想找的类方法。进去文档搜索from_crawler
,的确找到如出一辙的类方法from_crawler(crawler, *args, **kwargs)
,它是scrapy下的class scrapy.spiders.Spider
类下的类方法,用来建立spider对象。
RedisSpider
类和RedisCrawlSpider
功能是同样的,就拿RedisSpider
这个类来讲吧,调用from_crawler
类方法,里面的super
会继承scrapy.spiders.CrawlSpider
建立的spider
对象,而后RedisSpider
类也就具备了spider
对象的全部属性和方法,同时又继承了RedisMixin
类,那么类RedisSpider
又具备了RedisMixin
类的全部属性和方法,因此就能够调用RedisMixin
类里的setup_redis
方法了。(这个过程感受挺难理解的,下一篇笔记会用视频记录pycharm debug类方法调用的过程)
from scrapy.spiders import Spider, CrawlSpider class RedisMixin(object): def setup_redis(self, crawler=None): pass class RedisSpider(RedisMixin, Spider): @classmethod def from_crawler(self, crawler, *args, **kwargs): obj = super(RedisSpider, self).from_crawler(crawler, *args, **kwargs) obj.setup_redis(crawler) return obj class RedisCrawlSpider(RedisMixin, CrawlSpider): @classmethod def from_crawler(self, crawler, *args, **kwargs): obj = super(RedisCrawlSpider, self).from_crawler(crawler, *args, **kwargs) obj.setup_redis(crawler) return obj
接着说RedisMixin
类,通过调用类方法from_crawler
,RedisMixin
类已经具备了spider
对象的全部属性和方法,那么就能够在RedisMixin
类里面使用它们了。
首先这个类定义了start_requests
函数直接返回next_requests
函数,next_requests
函数返回一个要调度的request
或返回none
;
next_requests
函数具体实现:先设置了个标志位use_set
,其名为REDIS_START_URLS_AS_SET
,其值为default.py
设置的默认值False
;由于use_set
为False
,因此fetch_one
调用上面说过的init部分生成的redis
实例的spop
方法,不然调用lpop
方法;初始化时found
为0,进入循环,redis_batch_size
的值为scrapy项目的settings.py
设置的CONCURRENT_REQUESTS
的值,默认并发值是16;调用fetch_one
从redis
获取一个redis_key
即通过去重过滤的请求url
,若是没有获取到请求就说明队列为空,跳出循环;有的话接着调用make_request_from_data
方法将字节类型url
编码成str
类型再返回(这个函数返回make_requests_from_url
,但我找不到哪里有定义,不知道是否是做者写错了,在github问也没人回答。。);若是有返回,则用yield
同时处理多个请求url
,而后将请求个数加一,并将日志输出。
def next_requests(self): use_set = self.settings.getbool('REDIS_START_URLS_AS_SET', defaults.START_URLS_AS_SET) fetch_one = self.server.spop if use_set else self.server.lpop found = 0 while found < self.redis_batch_size: data = fetch_one(self.redis_key) if not data: # Queue empty. break req = self.make_request_from_data(data) if req: yield req found += 1 else: self.logger.debug("Request not made from data: %r", data) if found: self.logger.debug("Read %s requests from '%s'", found, self.redis_key) def make_request_from_data(self, data): url = bytes_to_str(data, self.redis_encoding) return self.make_requests_from_url(url)
setup_redis
方法,按照注释,这是用于设置redis
链接和空闲信号的,须要在spider
对象设置了它的crawler
对象以后才能够被调用,也就是要使用上面提到的RedisSpider
和RedisCrawlSpider
两个类以后,继承了spider
和crawler
以后才行;这个方法须要传入参数crawler
,默认为None
,若是传入值为None
则会报错,提示crawler is required
;若是没报错说明已经继承了spider
和crawler
对象,那接着就是从crawler
对象获取配置信息等属性;接着判断redis
队列中是否有请求url
,若是没有则将本项目 的配置赋值给redis_key
,而后格式化成'name': 本身项目爬虫名
这样的格式;接着用字符串的strip()
方法判断redis_key
是否为空字符串,是则报错;而后判断redis_batch_size
,若是为None
则将默认值赋值给它,而后异常处理redis_batch_size
是否为整形;再来就是判断redis_encoding
,为None
则将默认值赋值给它;判断参数都ok
后,将参数信息做为日志输出;而后如init
部分分析的,生成一个redis
客户端对象;最后调用crawler.signals.connect
方法,这个方法调用spider_idle
方法,spider_idle
方法又调用schedule_next_requests
方法,schedule_next_requests
方法调用next_requests
方法从reids
队列来获取新的请求url
,而后用scrapy
的crawler.engine.crawl
方法在爬虫空闲时来获取并执行爬取请求,执行完了返回到spider_idle
执行raise DontCloseSpider
来禁止关闭爬虫spider
,正常来讲执行完了请求就会关闭爬虫,到没法再获取到新的请求时,也就是redis
请求队列没有请求了才会关闭spider
。(其实这里有点蒙,做者用了DontCloseSpider
来禁止关闭爬虫spider
,最后是怎么关闭爬虫的?有知道的童鞋望告知,谢谢!)class RedisMixin(object): """Mixin class to implement reading urls from a redis queue.""" redis_key = None redis_batch_size = None redis_encoding = None server = None def setup_redis(self, crawler=None): #...忽略部份内容 self.server = connection.from_settings(crawler.settings) # The idle signal is called when the spider has no requests left, # that's when we will schedule new requests from redis queue crawler.signals.connect(self.spider_idle, signal=signals.spider_idle) def schedule_next_requests(self): # TODO: While there is capacity, schedule a batch of redis requests. for req in self.next_requests(): self.crawler.engine.crawl(req, spider=self) def spider_idle(self): """Schedules a request if available, otherwise waits.""" # XXX: Handle a sentinel to close the spider. self.schedule_next_requests() raise DontCloseSpider
根据scrapy
的原理可知,通过spider
模块engine
模块scheduler
模块后到达pipelines
模块,请求url
爬取的内容将在这里被处理。
根据文档,自定义pipeline
要实现from_crawler
、process_item
、open_spider
、close_spider
这几个方法(这里没有实现open_spider
、close_spider
,有点不理解)。
先看__init__
,初始化一个server
对象,也就是本身的redis
客户端对象,一个用于存储爬取数据的item
的key
,还有个用于编码成json
格式的序列化函数,默认使用ScrapyJSONEncoder().encode
接着是2个类方法,from_crawler
类方法调用from_settings
类方法,from_settings
类方法首先从项目配置settings.py
读取配置,若是读取到有REDIS_ITEMS_KEY
这个关键字就将其做为参数添加到params
字典中,REDIS_ITEMS_SERIALIZER
也是同样,最后return cls(**params)
,此时类方法就会再次初始化,将params
字典中的参数赋值给__init__
中的参数。(这里的类方法跟上面scheduler
,spiders
提到的是相似的)
而后是实现process_item
方法,它调用deferToThread
方法,这个方法的做用是在线程中运行函数,并将结果做为延迟返回。这个方法传入一个私有方法_process_item
来处理item
,首先调用item_key
格式化,将spidername:items
做为key
,而后序列化items
的内容做为值,最后用rpush
将键值对存入redis
并返回item
,让它继续走完scrapy
的流程。
通过spiders.py
和pipelines.py
后就能够将爬虫爬取的内容存储到本身的redis
了。
若是有童鞋看到了最后,手动给你点个赞!真有耐心,哈哈。本人水平有限,有问题欢迎留言交流,谢谢。
scrapy过滤重复数据和增量爬取
redis基础笔记
scrapy电影天堂实战(二)建立爬虫项目
scrapy电影天堂实战(一)建立数据库
scrapy基础笔记
在docker镜像中加入环境变量
笔记 | mongodb 入门操做
笔记 | python元类
笔记 | python2和python3使用super()
那些你在python3中可能没用到但应该用的东西
superset docker 部署
开机启动容器里面的程序
博客 | 三步部署hitchhiker-api