首先咱们须要安装 RabbitMQ,而后经过服务启动它,默认为注册到本机的5672端口。咱们的爬虫和数据库写入脚本都须要链接到 RabbitMQ,一边往队列中写入数据,另外一边从队列中取出数据,而后插入到数据。html
Python 中使用 RabbitMQ 能够经过调用 pika 这个库,安装过程见官方文档,对于 RabbitMQ 自己也有中文教程。python
本项目用到的模型是一对一的,用 pika 写很容易,代码以下:数据库
import pika # 导入库 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) # 设置一个新链接,链接到本地的 RabbitMQ 服务端。 channel = connection.channel() # 注册到 books 队列 channel.queue_declare(queue='books') channel.basic_publish(exchange='', routing_key='books', body='Whats up') # 发送消息 body connection.close() #
在 basic_publish 这个函数中,咱们设置 exchange 为空,而 routing-key 为 books,此时 basic_publish 会默认把咱们的 body 信息根据 routing-key 的内容发送到 books 的队列中。django
这里 exchange 实际上是一个信息中转站,以下图,P 为咱们要发送的信息,X 就是信息中转站,咱们经过 exchange 字段来设置咱们的目标中转站,而后由 exchange 来决定咱们的信息要往哪里走。 app
而 routing-key 的设置也颇有讲究,能够参考教程中 Routing 一节。scrapy
到此,咱们已经写好生产者了,接下来咱们看消费者。ide
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='books') def callback(ch, method, properties, body): print body channel.basic_consume(callback, queue='books', no_ack=True) # 注册回调函数,当有消息取出时,程序调用 callback 函数,其中 body 就是取出的消息。 channel.start_consuming()
最后贴一下代码,爬虫端:函数
#!/usr/bin/env python # -*- encoding: utf-8 -*- from pyspider.libs.base_handler import * import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='books') class Handler(BaseHandler): crawl_config = {} def on_start(self): self.crawl('http://scrapy.org/', callback=self.index_page) def on_result(self, result): # 重写 on_result 函数 if not result: return assert self.task, "on_result can't outside a callback." result['callback'] = self.task['process']['callback'] if self.is_debugger(): pprint(result) channel.basic_publish(exchange='', routing_key='books', body=result) if self.__env__.get('result_queue'): channel.basic_publish(exchange='', routing_key='books', body=result) self.__env__['result_queue'].put((self.task, result)) @config(priority=5) def index_page(self, response): url_list = [] for each in response.doc('a[href^="http"]').items(): url_list.append(each.attr.href) return url_list
消费者端:ui
import os os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'spider.settings') import django django.setup() from django.core.exceptions import ObjectDoesNotExist from importer.models import Books import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='books') def callback(ch, method, properties, body): new_book = Books() new_book.url = body new_book.save() print body + ' saved!' channel.basic_consume(callback, queue='books', no_ack=True) channel.start_consuming()
本例中调用了 django 的 BOOKS 的数据库模型,在队列中取出消息后,存入 BOOKS 表中,字段为url。url