Scrapy is a handy Python crawling framework: you only need to write a few components to scrape data from web pages. But once the number of pages to crawl gets very large, a single host can no longer keep up (whether in processing speed or in the number of concurrent network requests), and that is where a distributed crawler shows its advantage.
Scrapy-Redis is a Redis-based distributed component for Scrapy. It uses Redis to store and schedule the requests to be crawled, and to store the items produced by the crawl for later processing. scrapy-redis rewrites some key pieces of Scrapy, turning Scrapy into a distributed crawler that can run on multiple hosts at the same time.
The architecture of vanilla Scrapy looks like this:
With Scrapy-Redis added, the architecture becomes:
The official scrapy-redis documentation is fairly terse and does not explain how it works, so if you want to fully understand how the distributed crawler operates you have to read the scrapy-redis source. Fortunately there is not much of it, it is easy to follow, and you can get through it quickly.
The scrapy-redis project is built on top of the redis and scrapy libraries, and the project itself does not implement much; it acts like glue, binding the two together.
scrapy-redis provides two kinds of distribution: distributed crawling and distributed item processing, implemented by the scheduler module and the pipelines module respectively.
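To make the roles of those two modules concrete before diving into the source, here is a minimal sketch of the settings that are typically used to wire scrapy-redis into a project (the Redis address and the pipeline priority are illustrative values, not something taken from the files discussed below):

```python
# settings.py -- minimal sketch for enabling scrapy-redis (values are examples)

# Replace Scrapy's scheduler and dupefilter with the Redis-backed ones.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# Push serialized items into a Redis list for later, separate processing.
ITEM_PIPELINES = {
    "scrapy_redis.pipelines.RedisPipeline": 300,
}

# Where the shared Redis server lives (every crawling host points here).
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
```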
connection.py
Responsible for instantiating the Redis connection according to the settings. It is called by dupefilter and scheduler; in short, anything that reads from or writes to Redis goes through this module.
```python
import redis
import six

from scrapy.utils.misc import load_object


DEFAULT_REDIS_CLS = redis.StrictRedis

# Sane connection defaults.
DEFAULT_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
}

# Shortcut maps 'setting name' -> 'parmater name'.
SETTINGS_PARAMS_MAP = {
    'REDIS_URL': 'url',
    'REDIS_HOST': 'host',
    'REDIS_PORT': 'port',
}


def get_redis_from_settings(settings):
    """Returns a redis client instance from given Scrapy settings object.

    This function uses ``get_client`` to instantiate the client and uses
    ``DEFAULT_PARAMS`` global as defaults values for the parameters. You can
    override them using the ``REDIS_PARAMS`` setting.

    Parameters
    ----------
    settings : Settings
        A scrapy settings object. See the supported settings below.

    Returns
    -------
    server
        Redis client instance.

    Other Parameters
    ----------------
    REDIS_URL : str, optional
        Server connection URL.
    REDIS_HOST : str, optional
        Server host.
    REDIS_PORT : str, optional
        Server port.
    REDIS_PARAMS : dict, optional
        Additional client parameters.

    """
    params = DEFAULT_PARAMS.copy()
    params.update(settings.getdict('REDIS_PARAMS'))
    # XXX: Deprecate REDIS_* settings.
    for source, dest in SETTINGS_PARAMS_MAP.items():
        val = settings.get(source)
        if val:
            params[dest] = val

    # Allow ``redis_cls`` to be a path to a class.
    if isinstance(params.get('redis_cls'), six.string_types):
        params['redis_cls'] = load_object(params['redis_cls'])

    return get_redis(**params)


# Backwards compatible alias.
from_settings = get_redis_from_settings


def get_redis(**kwargs):
    """Returns a redis client instance.

    Parameters
    ----------
    redis_cls : class, optional
        Defaults to ``redis.StrictRedis``.
    url : str, optional
        If given, ``redis_cls.from_url`` is used to instantiate the class.
    **kwargs
        Extra parameters to be passed to the ``redis_cls`` class.

    Returns
    -------
    server
        Redis client instance.

    """
    redis_cls = kwargs.pop('redis_cls', DEFAULT_REDIS_CLS)
    url = kwargs.pop('url', None)
    if url:
        return redis_cls.from_url(url, **kwargs)
    else:
        return redis_cls(**kwargs)
```
connection.py imports the redis module, the Python client library for Redis, which is used to access a Redis database from Python. So this file's job is to establish the connection to Redis (it returns a Redis or StrictRedis object from the redis library, either of which can be used directly for data operations). These connection helpers are used frequently by the other files. As with most databases, connecting to Redis takes an IP address, a port, an optional username/password, and an integer database number; in the Scrapy project's settings file you can additionally configure socket timeouts, connect timeouts, and so on.
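As a rough illustration of how this module is meant to be used (a sketch, not part of the file itself, and it assumes a reachable Redis server at the example address), the REDIS_* settings map onto a client roughly like this:

```python
# Sketch: how the REDIS_* settings end up as a client (illustrative values).
from scrapy.settings import Settings
from scrapy_redis.connection import get_redis_from_settings

settings = Settings({
    'REDIS_HOST': '192.168.1.10',                 # example address of the shared Redis
    'REDIS_PORT': 6379,
    'REDIS_PARAMS': {'db': 0, 'password': None},  # extra client kwargs, merged over defaults
})

server = get_redis_from_settings(settings)  # a redis.StrictRedis instance
server.set('hello', 'world')                # usable directly for data operations
print(server.get('hello'))
```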
dupefilter.py
Responsible for request deduplication, implemented rather cleverly on top of a Redis set. Note, however, that the scheduler does not use the dupefilter key implemented in this module to schedule requests; it uses the queues implemented in queue.py. When a request is not a duplicate it is stored into the queue and popped out when it is time to schedule it.
```python
import logging
import time

from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint

from .connection import get_redis_from_settings


DEFAULT_DUPEFILTER_KEY = "dupefilter:%(timestamp)s"

logger = logging.getLogger(__name__)


# TODO: Rename class to RedisDupeFilter.
class RFPDupeFilter(BaseDupeFilter):
    """Redis-based request duplicates filter.

    This class can also be used with default Scrapy's scheduler.

    """

    logger = logger

    def __init__(self, server, key, debug=False):
        """Initialize the duplicates filter.

        Parameters
        ----------
        server : redis.StrictRedis
            The redis server instance.
        key : str
            Redis key Where to store fingerprints.
        debug : bool, optional
            Whether to log filtered requests.

        """
        self.server = server
        self.key = key
        self.debug = debug
        self.logdupes = True

    @classmethod
    def from_settings(cls, settings):
        """Returns an instance from given settings.

        This uses by default the key ``dupefilter:<timestamp>``. When using the
        ``scrapy_redis.scheduler.Scheduler`` class, this method is not used as
        it needs to pass the spider name in the key.

        Parameters
        ----------
        settings : scrapy.settings.Settings

        Returns
        -------
        RFPDupeFilter
            A RFPDupeFilter instance.

        """
        server = get_redis_from_settings(settings)
        # XXX: This creates one-time key. needed to support to use this
        # class as standalone dupefilter with scrapy's default scheduler
        # if scrapy passes spider on open() method this wouldn't be needed
        # TODO: Use SCRAPY_JOB env as default and fallback to timestamp.
        key = DEFAULT_DUPEFILTER_KEY % {'timestamp': int(time.time())}
        debug = settings.getbool('DUPEFILTER_DEBUG')
        return cls(server, key=key, debug=debug)

    @classmethod
    def from_crawler(cls, crawler):
        """Returns instance from crawler.

        Parameters
        ----------
        crawler : scrapy.crawler.Crawler

        Returns
        -------
        RFPDupeFilter
            Instance of RFPDupeFilter.

        """
        return cls.from_settings(crawler.settings)

    def request_seen(self, request):
        """Returns True if request was already seen.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        bool

        """
        fp = self.request_fingerprint(request)
        # This returns the number of values added, zero if already exists.
        added = self.server.sadd(self.key, fp)
        return added == 0

    def request_fingerprint(self, request):
        """Returns a fingerprint for a given request.

        Parameters
        ----------
        request : scrapy.http.Request

        Returns
        -------
        str

        """
        return request_fingerprint(request)

    def close(self, reason=''):
        """Delete data on close. Called by Scrapy's scheduler.

        Parameters
        ----------
        reason : str, optional

        """
        self.clear()

    def clear(self):
        """Clears fingerprints data."""
        self.server.delete(self.key)

    def log(self, request, spider):
        """Logs given request.

        Parameters
        ----------
        request : scrapy.http.Request
        spider : scrapy.spiders.Spider

        """
        if self.debug:
            msg = "Filtered duplicate request: %(request)s"
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
        elif self.logdupes:
            msg = ("Filtered duplicate request %(request)s"
                   " - no more duplicates will be shown"
                   " (see DUPEFILTER_DEBUG to show all duplicates)")
            self.logger.debug(msg, {'request': request}, extra={'spider': spider})
            self.logdupes = False
```
This file looks more involved: it re-implements the request deduplication that Scrapy already provides. When Scrapy runs on a single machine, it only needs to check the in-memory request queue or a persisted request queue (Scrapy's default persistence appears to be a JSON-format file rather than a database) to decide whether the request URL it is about to send has already been requested or is currently scheduled, so a local lookup is enough. When running distributed, however, the schedulers on every host must consult the same request pool in the same database to decide whether a request is a duplicate.
In this file, Redis-based deduplication is implemented by subclassing BaseDupeFilter and overriding its methods. From the source, scrapy-redis relies on Scrapy's own fingerprint interface, request_fingerprint. This interface is quite interesting: according to the Scrapy documentation, it hashes a request to decide whether two URLs are the same (the same URL produces the same hash), and two URLs with the same address and the same GET parameters in a different order still produce the same fingerprint (which is rather neat). So scrapy-redis keeps using the request fingerprint to decide whether a request has already been issued. The class connects to Redis and inserts fingerprints into a set under a single key (the key is the same for every instance of a given spider; since Redis is a key-value store, the same key always resolves to the same value, and using the spider name plus the dupefilter suffix as the key means that crawler instances on different hosts which belong to the same spider all share one set, and that set is their URL dedup pool). If sadd returns 0, the fingerprint was already in the set (a set holds no duplicates), so request_seen returns True and the request is treated as a duplicate; if sadd returns 1, a new fingerprint was added to the set, meaning the request has not been seen before, so request_seen returns False, and the new fingerprint has conveniently been stored in the database as a side effect. The dupefilter is used by the Scheduler class: every request is checked before it enters scheduling, and a duplicate is simply discarded rather than scheduled, so no resources are wasted on it.
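The core of that check is a single Redis set operation; stripped of the class around it, the logic amounts to something like the following sketch (the key name and the local client are illustrative, and it assumes a running Redis):

```python
# Sketch of the set-based dedup: sadd returns 1 for a new member, 0 otherwise.
import redis
from scrapy.http import Request
from scrapy.utils.request import request_fingerprint

server = redis.StrictRedis()            # example: a local Redis
key = "myspider:dupefilter"             # shared by every instance of the spider

def request_seen(request):
    fp = request_fingerprint(request)   # same URL (even with reordered params) -> same fp
    added = server.sadd(key, fp)        # also records the fingerprint as a side effect
    return added == 0                   # 0 added means it was already in the set

print(request_seen(Request("http://example.com/?a=1&b=2")))                  # False the first time
print(request_seen(Request("http://example.com/?b=2&a=1")))                  # True: same fingerprint
```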
queue.py
Its role was described in the dupefilter.py section above. Three queue implementations are provided here: a FIFO SpiderQueue, a SpiderPriorityQueue, and a LIFO SpiderStack. The second one is the default, which is the reason for the behaviour analysed in the earlier article (link).
```python
from scrapy.utils.reqser import request_to_dict, request_from_dict

from . import picklecompat


class Base(object):
    """Per-spider queue/stack base class"""

    def __init__(self, server, spider, key, serializer=None):
        """Initialize per-spider redis queue.

        Parameters:
            server -- redis connection
            spider -- spider instance
            key -- key for this queue (e.g. "%(spider)s:queue")

        """
        if serializer is None:
            # Backward compatibility.
            # TODO: deprecate pickle.
            serializer = picklecompat
        if not hasattr(serializer, 'loads'):
            raise TypeError("serializer does not implement 'loads' function: %r"
                            % serializer)
        if not hasattr(serializer, 'dumps'):
            raise TypeError("serializer '%s' does not implement 'dumps' function: %r"
                            % serializer)

        self.server = server
        self.spider = spider
        self.key = key % {'spider': spider.name}
        self.serializer = serializer

    def _encode_request(self, request):
        """Encode a request object"""
        obj = request_to_dict(request, self.spider)
        return self.serializer.dumps(obj)

    def _decode_request(self, encoded_request):
        """Decode an request previously encoded"""
        obj = self.serializer.loads(encoded_request)
        return request_from_dict(obj, self.spider)

    def __len__(self):
        """Return the length of the queue"""
        raise NotImplementedError

    def push(self, request):
        """Push a request"""
        raise NotImplementedError

    def pop(self, timeout=0):
        """Pop a request"""
        raise NotImplementedError

    def clear(self):
        """Clear queue/stack"""
        self.server.delete(self.key)


class SpiderQueue(Base):
    """Per-spider FIFO queue"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.brpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.rpop(self.key)
        if data:
            return self._decode_request(data)


class SpiderPriorityQueue(Base):
    """Per-spider priority queue abstraction using redis' sorted set"""

    def __len__(self):
        """Return the length of the queue"""
        return self.server.zcard(self.key)

    def push(self, request):
        """Push a request"""
        data = self._encode_request(request)
        score = -request.priority
        # We don't use zadd method as the order of arguments change depending on
        # whether the class is Redis or StrictRedis, and the option of using
        # kwargs only accepts strings, not bytes.
        self.server.execute_command('ZADD', self.key, score, data)

    def pop(self, timeout=0):
        """
        Pop a request
        timeout not support in this queue class
        """
        # use atomic range/remove using multi/exec
        pipe = self.server.pipeline()
        pipe.multi()
        pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
        results, count = pipe.execute()
        if results:
            return self._decode_request(results[0])


class SpiderStack(Base):
    """Per-spider stack"""

    def __len__(self):
        """Return the length of the stack"""
        return self.server.llen(self.key)

    def push(self, request):
        """Push a request"""
        self.server.lpush(self.key, self._encode_request(request))

    def pop(self, timeout=0):
        """Pop a request"""
        if timeout > 0:
            data = self.server.blpop(self.key, timeout)
            if isinstance(data, tuple):
                data = data[1]
        else:
            data = self.server.lpop(self.key)
        if data:
            return self._decode_request(data)


__all__ = ['SpiderQueue', 'SpiderPriorityQueue', 'SpiderStack']
```
This file implements several container classes. You can see that these containers interact with Redis heavily and that they use the serializer defined in picklecompat (described in the next section). The containers are largely identical; one is a queue, one a stack, and one a priority queue. They are the containers the scheduler object instantiates to drive request scheduling: use SpiderQueue as the scheduling queue type and requests are dispatched first-in-first-out; use SpiderStack and they go first-in-last-out.
Look closely at SpiderQueue: its push method works like the other containers', except that the pushed request is first turned into a dict by Scrapy's request_to_dict helper (a Request object is too complex to serialize directly, with its methods and attributes), then serialized into a string by the picklecompat serializer, and finally stored in Redis under a specific key (the same key within one spider). Calling pop simply reads the value stored under that key (a list) from Redis and takes the element that was inserted earliest, which gives first-in, first-out behaviour.
All of these container classes serve as the scheduler's request containers. One scheduler is instantiated on every host, paired one-to-one with a spider, so in a distributed run there are multiple instances of a spider and multiple instances of a scheduler spread across hosts. But because all the schedulers use the same container classes, all those containers connect to the same Redis server, and all of them read and write data under a key built from the spider name plus the queue suffix, crawler instances on different hosts share a single request scheduling pool, which is what gives the distributed crawlers a unified schedule.
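The FIFO behaviour of SpiderQueue comes purely from which ends of the Redis list it touches: push uses lpush, pop uses rpop. A tiny sketch of those semantics (the key name and values are illustrative, and it assumes a running Redis):

```python
# Sketch: lpush/rpop on one shared key gives first-in, first-out across hosts.
import redis

server = redis.StrictRedis()          # every host would point at the same server
key = "myspider:requests"             # same spider name -> same key -> shared pool

server.lpush(key, b"request-1")       # what SpiderQueue.push does with the
server.lpush(key, b"request-2")       # pickled request dict

print(server.rpop(key))               # b'request-1' -- the oldest element comes out first
print(server.rpop(key))               # b'request-2'
```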
picklecompat.py
```python
"""A pickle wrapper module with protocol=-1 by default."""

try:
    import cPickle as pickle  # PY2
except ImportError:
    import pickle


def loads(s):
    return pickle.loads(s)


def dumps(obj):
    return pickle.dumps(obj, protocol=-1)
```
This file implements the loads and dumps functions, i.e. a serializer. Because Redis cannot store arbitrarily complex objects (values can only be strings, lists of strings, sets of strings, and hashes, and keys can only be strings), anything we store has to be serialized to text first. Here that is done with Python's pickle module, a serialization tool that works on both Python 2 and Python 3. This serializer is mainly used later by the scheduler to store Request objects. Why JSON is not used here I am not entirely sure; the item pipeline's serialization does default to JSON.
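A quick round trip shows what the scheduler relies on: a request-like dict goes in, a byte string comes out, and loads restores it (the dict content here is only an example):

```python
# Sketch: round-tripping a request-like dict through picklecompat.
from scrapy_redis import picklecompat

obj = {'url': 'http://example.com', 'method': 'GET', 'priority': 0}  # example dict
data = picklecompat.dumps(obj)       # bytes, safe to store as a Redis value
assert picklecompat.loads(data) == obj
```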
pipelines.py
This is what makes item processing distributed: items are stored in Redis so they can be processed elsewhere. Note also that, although this is again just a pipeline, the implementation differs from the case analysed in the earlier article; because it needs to read the configuration, it uses the from_crawler() function.
```python
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection


default_serialize = ScrapyJSONEncoder().encode


class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue"""

    def __init__(self, server,
                 key='%(spider)s:items',
                 serialize_func=default_serialize):
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )
        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}
```
The pipelines file implements an item pipeline class, the same kind of object as a regular Scrapy item pipeline. Using the REDIS_ITEMS_KEY we configure in settings as the key, it serializes each item and stores it under the corresponding value in Redis (you can see that this value is a list, and each of our items becomes one element of that list). The pipeline stores the extracted items mainly so that we can conveniently process the data later, elsewhere.
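Since the items end up as JSON strings in a Redis list, a separate post-processing worker only needs to pop and decode them. A minimal sketch (the key assumes the default '%(spider)s:items' pattern for an example spider called myspider, and a running Redis):

```python
# Sketch: a standalone worker draining the items list written by RedisPipeline.
import json
import redis

server = redis.StrictRedis()
key = "myspider:items"                 # default REDIS_ITEMS_KEY pattern, example spider

while True:
    # blpop blocks until an item arrives; returns a (key, value) tuple or None on timeout.
    popped = server.blpop(key, timeout=30)
    if popped is None:
        break
    item = json.loads(popped[1])
    print(item)                        # store to a database, write to a file, etc.
```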
scheduler.py
This extension replaces Scrapy's built-in scheduler (as specified by the SCHEDULER setting) and is what actually implements distributed scheduling for the crawler. The data structures it relies on are the ones implemented in queue.py.
The two kinds of distribution that scrapy-redis implements, distributed crawling and distributed item processing, are provided by the scheduler module and the pipelines module; the other modules described above are supporting pieces for these two.
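The scheduler reads its own group of options from the settings (the exact names appear in from_settings below); a sketch of the ones most commonly overridden, with illustrative values:

```python
# settings.py -- sketch of the scheduler-specific options (illustrative values).

# Keep requests and the dupefilter set in Redis when the spider closes,
# so the crawl can be paused and resumed.
SCHEDULER_PERSIST = True

# Which queue.py container to schedule from: SpiderPriorityQueue is the
# default; SpiderQueue gives FIFO, SpiderStack gives LIFO.
SCHEDULER_QUEUE_CLASS = "scrapy_redis.queue.SpiderQueue"

# Redis keys for the request queue and the dupefilter set
# ('%(spider)s' is filled in with the spider name).
SCHEDULER_QUEUE_KEY = "%(spider)s:requests"
SCHEDULER_DUPEFILTER_KEY = "%(spider)s:dupefilter"
```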
```python
import importlib
import six

from scrapy.utils.misc import load_object

from . import connection


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler"""

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key='%(spider)s:requests',
                 queue_cls='scrapy_redis.queue.SpiderPriorityQueue',
                 dupefilter_key='%(spider)s:dupefilter',
                 dupefilter_cls='scrapy_redis.dupefilter.RFPDupeFilter',
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)
```