Elasticsearch整合scrapy在AI量化引擎中的应用

采集到的数据的存储,咱们使用elasticscarch,下文简称es。es是基于lucene的全文索引服务,lucene是一个全文索引的开发包,而es在此基础上扩展了不少搜索引擎必备的功能,并且提供的restful的API,es愈来愈像一个nosql数据。与之类似的产品是solr。solr的schema对于中文应用的配置不太方便。html

es的python api文档地址以下:python

https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/index.htmlgit

es的基本操做,与数据库的概念能够对应上,index对应的是数据库,doc_type对应数据表, id是unique_key,doc是一个dict格式的数据记录。github

from elasticsearch import Elasticsearch
from datetime import datetime
class ESMgr(object):
    def __init__(self,index_name,doc_type):
        self.es = Elasticsearch(hosts='your ip')
        self.index_name = index_name
        self.doc_type = doc_type

    def add_doc(self,doc):
        id = doc.get('id')
        doc.pop('id')
        self.es.index(index=self.index_name, doc_type=self.doc_type,id=id, body=doc)

doc是一个dict格式,es同mongodb这样的nosql相似,能够不须要预先定义schema(es里叫field mapping),会按照文档字段的格式猜想字段的类型,但须要注意,一旦这个字段生成,它的mapping就是不可修改的,要修改只能删除重建索引,因此在建索引之初这个mapping就要考虑清楚。sql

相同的id会自动把整个doc覆盖掉,在保存网页数据的时候,咱们能够直接使用url,固然若是考虑存储空间,能够存储url的hashcode。mongodb

查询直接get便可,数据库

#按id字段查询
def get(self,id):
    try:
        source = self.es.get(index=self.index_name, doc_type=self.doc_type, id=id)['_source']
    except:
        return None
    return source

可使用es库直接对内容增量排重,若是es库里已存在,也就是已经完成采集的url,就再也不request请求。api

class IngoreRequestMiddleware(object):
    def __init__(self):
        self.es = ESMgr(index_name='index_article',doc_type='article')

    def process_request(self, request, spider):
        # 查不到会返回None,不为None,则已存在,无需再request
        # es只有article的url,库里存在就再也不采集了
        if self.es.is_doc_exist(request.url):
            logging.info('exist:%s' % request.url)
            raise IgnoreRequest("IgnoreRequest : %s" % request.url)
        else:
            return None

这里提供一个“忽略”请求的中间件,须要在scrapy settings.py里进行挂载restful

DOWNLOADER_MIDDLEWARES = {
'eagle.middlewares.IngoreRequestMiddleware': 533,
...

这里值得注意一下,downloader minddlewares和spider middlewares所处的位置。app

  • 引擎打开一个网站(open a domain),找处处理该网站的Spider并向该spider请求第一个要爬取的URL(s)。

  • 引擎从Spider中获取到第一个要爬取的URL并在调度器(Scheduler)以Request调度。

  • 引擎向调度器请求下一个要爬取的URL。

  • 调度器返回下一个要爬取的URL给引擎,引擎将URL经过下载中间件(请求(request)方向)转发给下载器(Downloader)。

  • 一旦页面下载完毕,下载器生成一个该页面的Response,并将其经过下载中间件(返回(response)方向)发送给引擎。

  • 引擎从下载器中接收到Response并经过Spider中间件(输入方向)发送给Spider处理。

  • Spider处理Response并返回爬取到的Item及(跟进的)新的Request给引擎。

  • 引擎将(Spider返回的)爬取到的Item给Item Pipeline,将(Spider返回的)Request给调度器。

  • (从第二步)重复直到调度器中没有更多地request,引擎关闭该网站。

process_request(request, spider),若是每一个middleware都返回None,则这个请求会正常被处理,除非返回一个IgnoreRequest,这时这个请求就会被过滤。

使用elasticsearch-head组件能够像数据库管理软件同样查看索引状态以及浏览文档,这个插件如何安装你们能够自行google/百度。另外,ELK套件里的kibana里有一个Dev Tools,这个比较有用,能够直接在里边使用dsl语法访问数据。

es若是当成普通nosql来使用,能够不手动定义mapping,它内置第一次本身选择字段的mapping,但后续就没法修改了,这也是底层lucene的限制。默认自符串,会被当成“keyword”类型,就是没有进行分词和索引,就是普通的一个串,像数据库那般CURD没有任何问题,但用到es的搜索功能就检索不到了。

es毕竟主要是服务于全文索引,不然咱们直接用mongodb就行了,查询语法更简单,因此es还重在这个search的服务。

以下代码对es的查询做了封装,同时按分页查询,并对关键词作了高亮(highlight)显示。

from elasticsearch import Elasticsearch
from datetime import datetime

class ESMgr(object):
    def __init__(self,index_name='index_article',doc_type='article'):
        self.es = Elasticsearch(hosts='47.94.133.21')
        self.index_name = index_name
        self.doc_type = doc_type

    def search(self,keywords,page = 1):
        response = self.es.search(
            index=self.index_name,
            body={
                "query": {
                    "multi_match": {
                        "query": keywords,
                        "fields": ["title", "content"]
                    }
                },
                "from": (page - 1) * 10,
                "size": 10,
                "highlight": {
                    "pre_tags": ['<span class="keyWord">'],
                    "post_tags": ['</span>'],
                    "fields": {
                        "title": {},
                        "content": {},
                    }
                }
            }
        )

        total_nums = response["hits"]["total"]

        hit_list = []
        for hit in response["hits"]["hits"]:
            hit_dict = {}

            if "title" in hit["highlight"]:
                #这里是一个list,join后变成string
                hit_dict["title"] = "".join(hit["highlight"]["title"])
            else:
                hit_dict["title"] = hit["_source"]["title"]
            if "content" in hit["highlight"]:
                hit_dict["content"] = "".join(hit["highlight"]["content"])[:500]
            else:
                hit_dict["content"] = hit["_source"]["content"][:500]

            hit_dict["datetime"] = hit["_source"]["datetime"]
            hit_dict["url"] = hit["_source"]["url"]
            hit_dict["score"] = hit["_score"]

            hit_list.append(hit_dict)

        return hit_list

github上有人提供了elasticsearch-py的高级封装库:elasticsearch-dsl-py

https://github.com/elastic/elasticsearch-dsl-py

使用起来会直观一点,固然有时候,对于底层api的理解也会带来一些麻烦。

最后要解决的一个问题是mapping,对于中文而言,咱们对title,content是须要分词的。

使用kibana的Dev Tools,能够对一个新索引设定一次mapping,设定以后没法修改新增doc若是有新的field,es会自动按第一次写入的数据添加对应的mapping。以下是对title和content字段配置ik分词的mapping命令。

PUT index_article_ik

{

  "mappings":{

    "article":{

      "properties": {

            "content": {

                "type": "text",

                "analyzer": "ik_max_word",

                "search_analyzer": "ik_max_word"

            },

            "title": {

                "type": "text",

                "analyzer": "ik_max_word",

                "search_analyzer": "ik_max_word"

            }

        }

    }

}

}

关于做者:魏佳斌,互联网产品/技术总监,北京大学光华管理学院(MBA),特许金融分析师(CFA),资深产品经理/码农。偏心python,深度关注互联网趋势,人工智能,AI金融量化。致力于使用最前沿的认知技术去理解这个复杂的世界。

扫描下方二维码,关注:AI量化实验室(ailabx),了解AI量化最前沿技术、资讯。

相关文章
相关标签/搜索