在下图中能够看到items.py与pipeline.py,其中items是用来定义抓取内容的实体;pipeline则是用来处理抓取的item的管道
Item管道的主要责任是负责处理有蜘蛛从网页中抽取的Item,他的主要任务是清晰、验证和存储数据。当页面被蜘蛛解析后,将被发送到Item管道,并通过几个特定的次序处理数据。每一个Item管道的组件都是有一个简单的方法组成的Python类。获取了Item并执行方法,同时还须要肯定是否须要在Item管道中继续执行下一步或是直接丢弃掉不处理。简而言之,就是经过spider爬取的数据都会经过这个pipeline处理,能够在pipeline中不进行操做或者执行相关对数据的操做。html
1.清理HTML数据
2.验证解析到的数据(检查Item是否包含必要的字段)
3.检查是不是重复数据(若是重复就删除)
4.将解析到的数据存储到数据库中python
process_item(item, spider)
每个item管道组件都会调用该方法,而且必须返回一个item对象实例或raise DropItem异常。被丢掉的item将不会在管道组件进行执行。此方法有两个参数,一个是item,即要处理的Item对象,另外一个参数是spider,即爬虫。
此外,咱们也能够在类中实现如下方法
open_spider(spider)
当spider执行的时候将调用该方法
close_spider(spider)
当spider关闭的时候将调用该方法mysql
1.生成json数据sql
class JsonWithEncodingPipeline(object): def __init__(self): self.file=codecs.open('article.json', 'w', encoding="utf-8") def process_item(self, item, spider): lines=json.dumps(dict(item), ensure_ascii=False) + '\n' self.file.write(lines) return item def spider_closed(self, spider): self.file.close()
2.操做mysql关系数据库数据库
class MysqlPipeline(object): def __init__(self): self.conn=MySQLdb.connect('localhost', 'root', '*****', 'article_spider', charset="utf8", use_unicode=True) self.cursor=self.conn.cursor() def process_item(self, item, spider): insert_sql=""" insert into article_items(title, url, url_object_id , create_date) VALUES(%s, %s, %s, %s) """ self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"])) self.conn.commit()
3.异步操做mysql关系数据库json
# 异步处理关系数据库 class MysqlTwistedPipline(object): def __init__(self, dbpool): self.dbpool=dbpool @classmethod def from_settings(cls, settings): dbparms=dict( host=settings["MYSQL_HOST"], #这里要在settings中事先定义好 db=settings["MYSQL_DBNAME"], user=settings["MYSQL_USER"], passwd=settings["MYSQL_PASSWORD"], charset="utf8", cursorclass=MySQLdb.cursors.DictCursor, use_unicode=True, ) dbpool=adbapi.ConnectPool("MySQLdb", **dbparms) return cls(dbpool) def process_item(self, item, spider): # 使用twisted将mysql插入变成异步执行 query = self.dbpool.runInteraction(self.do_insert, item) query.addErrback(self.handle_error) def handle_error(self, failure, item, spider): #处理异步插入的异常 print(failure) def do_insert(self, cursor, item): #执行具体的插入 insert_sql = """ insert into article_items(title, url, url_object_id , create_date) VALUES(%s, %s, %s, %s) """ self.cursor.execute(insert_sql, (item["title"], item["url"], item['url_object_id'], item["create_date"]))
4.数据去重api
from scrapy.exceptions import DropItem class DuplicatesPipeline(object): def __init__(self): self.ids_seen = set() def process_item(self, item, spider): if item['id'] in self.ids_seen: raise DropItem("Duplicate item found: %s" % item) else: self.ids_seen.add(item['id']) return item
# Configure item pipelines # See https://doc.scrapy.org/en/latest/topics/item-pipeline.html ITEM_PIPELINES = { # 'ArticleSpider.pipelines.ArticlespiderPipeline': 300, # 'scrapy.pipelines.images.ImagesPipeline': 1, 'ArticleSpider.pipelines.MysqlPipeline': 1, # 'ArticleSpider.pipelines.JsonExporterPipeline': 2, # 'ArticleSpider.pipelines.ArticleImagePipeline': 1 }
每一个pipeline后面有一个数值,这个数组的范围是0-1000,这个数值是这些在pipeline中定义的类的优先级,越小越优先。
在异步处理数据库的时候会传递一个参数为后面的操做进行初始化,process_item()函数其实是将处理的操做传回给这个_init__。数组