Scrapy-Redis库已经为咱们提供了Scrapy分布式的队列、调度器、去重等功能,其GitHub地址为:https://github.com/rmax/scrapy-redis。
git
本节咱们深刻了解一下,利用Redis如何实现Scrapy分布式。github
能够把源码Clone下来,执行以下命令:redis
git clone https://github.com/rmax/scrapy-redis.git
核心源码在scrapy-redis/src/scrapy_redis目录下。数据库
从爬取队列入手,看看它的具体实现。源码文件为queue.py,它有三个队列的实现,首先它实现了一个父类Base
,提供一些基本方法和属性,以下所示:数据结构
class Base(object): """Per-spider base queue class""" def __init__(self, server, spider, key, serializer=None): if serializer is None: serializer = picklecompat
if not hasattr(serializer, 'loads'):
raise TypeError("serializer does not implement 'loads' function: %r" % serializer)
if not hasattr(serializer, 'dumps'):
raise TypeError("serializer '%s' does not implement 'dumps' function: %r" % serializer) self.server = server self.spider = spider self.key = key % {'spider': spider.name} self.serializer = serializer
def _encode_request(self, request): obj = request_to_dict(request, self.spider)
return self.serializer.dumps(obj)
def _decode_request(self, encoded_request): obj = self.serializer.loads(encoded_request)
return request_from_dict(obj, self.spider)
def __len__(self): """Return the length of the queue""" raise NotImplementedError
def push(self, request): """Push a request""" raise NotImplementedError
def pop(self, timeout=0): """Pop a request""" raise NotImplementedError
def clear(self): """Clear queue/stack""" self.server.delete(self.key)
首先看一下_encode_request()和_decode_request()
方法。咱们要把一个Request对象存储到数据库中,但数据库没法直接存储对象,因此先要将Request序列化转成字符串,而这两个方法分别能够实现序列化和反序列化的操做,这个过程能够利用pickle库来实现。队列Queue在调用push()
方法将Request存入数据库时,会调用_encode_request()
方法进行序列化,在调用pop()
取出Request时,会调用_decode_request()
进行反序列化。app
在父类中,__len__()
、push()
和pop()
这三个方法都是未实现的,三个方法直接抛出NotImplementedError
异常,所以这个类不能直接使用。那么,必需要实现一个子类来重写这三个方法,而不一样的子类就会有不一样的实现和不一样的功能。scrapy
接下来咱们定义一些子类来继承Base
类,并重写这几个方法。在源码中有三个子类的实现,它们分别是FifoQueue
、PriorityQueue
、LifoQueue
,咱们分别来看看它们的实现原理。分布式
首先是FifoQueue
,以下所示:ide
class FifoQueue(Base): """Per-spider FIFO queue""" def __len__(self): """Return the length of the queue""" return self.server.llen(self.key)
def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request))
def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.brpop(self.key, timeout)
if isinstance(data, tuple): data = data[1]
else: data = self.server.rpop(self.key)
if data:
return self._decode_request(data)
这个类继承了Base
类,并重写了__len__()
、push()
、pop()
三个方法,这三个方法都是对server
对象的操做。server
对象就是一个Redis链接对象,咱们能够直接调用其操做Redis的方法对数据库进行操做,这里的操做方法有llen()
、lpush()
、rpop()
等,这就表明此爬取队列使用了Redis的列表。序列化后的Request会存入列表中,__len__()
方法获取列表的长度,push()
方法调用了lpush()
操做,这表明从列表左侧存入数据,pop()
方法中调用了rpop()
操做,这表明从列表右侧取出数据。this
Request在列表中的存取顺序是左侧进、右侧出,这是有序的进出,即先进先出(First Input First Output,FIFO),此类的名称就叫做FifoQueue
。
还有一个与之相反的实现类,叫做LifoQueue
,实现以下:
class LifoQueue(Base): """Per-spider LIFO queue.""" def __len__(self): """Return the length of the stack""" return self.server.llen(self.key)
def push(self, request): """Push a request""" self.server.lpush(self.key, self._encode_request(request))
def pop(self, timeout=0): """Pop a request""" if timeout > 0: data = self.server.blpop(self.key, timeout)
if isinstance(data, tuple): data = data[1]
else: data = self.server.lpop(self.key)
if data:
return self._decode_request(data)
与FifoQueue
不一样的是LifoQueue
的pop()
方法,它使用的是lpop()
操做,也就是从左侧出,push()
方法依然使用lpush()
操做,从左侧入。那么效果就是先进后出、后进先出(Last In First Out,LIFO),此类名称就叫做LifoQueue
。这个存取方式相似栈的操做,因此也能够称做StackQueue
。
在源码中还有一个子类叫做PriorityQueue
,顾名思义,它是优先级队列,实现以下:
class PriorityQueue(Base): """Per-spider priority queue abstraction using redis' sorted set""" def __len__(self): """Return the length of the queue""" return self.server.zcard(self.key)
def push(self, request): """Push a request""" data = self._encode_request(request) score = -request.priority self.server.execute_command('ZADD', self.key, score, data)
def pop(self, timeout=0): """ Pop a request timeout not support in this queue class """ pipe = self.server.pipeline() pipe.multi() pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0) results, count = pipe.execute()
if results:
return self._decode_request(results[0])
在这里__len__()
、push()
、pop()
方法使用了server
对象的zcard()
、zadd()
、zrange()
操做,这里使用的存储结果是有序集合,这个集合中的每一个元素均可以设置一个分数,这个分数就表明优先级。
__len__()
方法调用了zcard()
操做,返回的就是有序集合的大小,也就是爬取队列的长度。push()
方法调用了zadd()
操做,就是向集合中添加元素,这里的分数指定成Request的优先级的相反数,分数低的会排在集合的前面,即高优先级的Request就会在集合的最前面。pop()
方法首先调用了zrange()
操做,取出集合的第一个元素,第一个元素就是最高优先级的Request,而后再调用zremrangebyrank()
操做,将这个元素删除,这样就完成了取出并删除的操做。
此队列是默认使用的队列,即爬取队列默认是使用有序集合来存储的。
前面说过Scrapy的去重是利用集合来实现的,而在Scrapy分布式中的去重就须要利用共享的集合,那么这里使用的就是Redis中的集合数据结构。咱们来看看去重类是怎样实现的,源码文件是dupefilter.py,其内实现了一个RFPDupeFilter
类,以下所示:
class RFPDupeFilter(BaseDupeFilter): """Redis-based request duplicates filter. This class can also be used with default Scrapy's scheduler. """ logger = logger
def __init__(self, server, key, debug=False): """Initialize the duplicates filter. Parameters ---------- server : redis.StrictRedis The redis server instance. key : str Redis key Where to store fingerprints. debug : bool, optional Whether to log filtered requests. """ self.server = server self.key = key self.debug = debug self.logdupes = True @classmethod def from_settings(cls, settings): """Returns an instance from given settings. This uses by default the key ``dupefilter:<timestamp>``. When using the ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as it needs to pass the spider name in the key. Parameters ---------- settings : scrapy.settings.Settings Returns ------- RFPDupeFilter A RFPDupeFilter instance. """ server = get_redis_from_settings(settings) key = defaults.DUPEFILTER_KEY % {'timestamp': int(time.time())} debug = settings.getbool('DUPEFILTER_DEBUG')
return cls(server, key=key, debug=debug)
@classmethod def from_crawler(cls, crawler): """Returns instance from crawler. Parameters ---------- crawler : scrapy.crawler.Crawler Returns ------- RFPDupeFilter Instance of RFPDupeFilter. """ return cls.from_settings(crawler.settings)
def request_seen(self, request): """Returns True if request was already seen. Parameters ---------- request : scrapy.http.Request Returns ------- bool """ fp = self.request_fingerprint(request) added = self.server.sadd(self.key, fp)
return added == 0 def request_fingerprint(self, request): """Returns a fingerprint for a given request. Parameters ---------- request : scrapy.http.Request Returns ------- str """ return request_fingerprint(request)
def close(self, reason=''): """Delete data on close. Called by Scrapy's scheduler. Parameters ---------- reason : str, optional """ self.clear()
def clear(self): """Clears fingerprints data.""" self.server.delete(self.key)
def log(self, request, spider): """Logs given request. Parameters ---------- request : scrapy.http.Request spider : scrapy.spiders.Spider """ if self.debug: msg = "Filtered duplicate request: %(request)s" self.logger.debug(msg, {'request': request}, extra={'spider': spider})
elif self.logdupes: msg = ("Filtered duplicate request %(request)s" " - no more duplicates will be shown" " (see DUPEFILTER_DEBUG to show all duplicates)") self.logger.debug(msg, {'request': request}, extra={'spider': spider}) self.logdupes = False
这里一样实现了一个request_seen()
方法,和Scrapy中的request_seen()
方法实现极其相似。不过这里集合使用的是server
对象的sadd()
操做,也就是集合再也不是一个简单数据结构了,而是直接换成了数据库的存储方式。
鉴别重复的方式仍是使用指纹,指纹一样是依靠request_fingerprint()
方法来获取的。获取指纹以后就直接向集合添加指纹,若是添加成功,说明这个指纹本来不存在于集合中,返回值1。代码中最后的返回结果是断定添加结果是否为0,若是刚才的返回值为1,那这个断定结果就是False
,也就是不重复,不然断定为重复。
这样咱们就成功利用Redis的集合完成了指纹的记录和重复的验证。
Scrapy-Redis还帮咱们实现了配合Queue、DupeFilter使用的调度器Scheduler,源文件名称是scheduler.py。咱们能够指定一些配置,如SCHEDULER_FLUSH_ON_START
便是否在爬取开始的时候清空爬取队列,SCHEDULER_PERSIST
便是否在爬取结束后保持爬取队列不清除。咱们能够在settings.py里自由配置,而此调度器很好地实现了对接。
接下来咱们看看两个核心的存取方法,实现以下所示:
def enqueue_request(self, request): if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider)
return False if self.stats: self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider) self.queue.push(request)
return True
def next_request(self): block_pop_timeout = self.idle_before_close request = self.queue.pop(block_pop_timeout)
if request and self.stats: self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
return request
enqueue_request()
能够向队列中添加Request,核心操做就是调用Queue的push()
操做,还有一些统计和日志操做。next_request()
就是从队列中取Request,核心操做就是调用Queue的pop()
操做,此时若是队列中还有Request,则Request会直接取出来,爬取继续,不然若是队列为空,爬取则会从新开始。
目前为止,咱们就以前所说的三个分布式的问题解决了,总结以下。
爬取队列的实现。这里提供了三种队列,使用了Redis的列表或有序集合来维护。
去重的实现。这里使用了Redis的集合来保存Request的指纹,以提供重复过滤。
中断后从新爬取的实现。中断后Redis的队列没有清空,爬取再次启动时,调度器的next_request()
会从队列中取到下一个Request,爬取继续。