Breaking the task down
- Connect Python to Elasticsearch
- Run a query against Elasticsearch and print the results
- Export the full result set
- Write all results to a CSV file
1. Connecting Python to Elasticsearch
The approach is much the same as connecting Python to Oracle or MySQL. Here you need the elasticsearch package; if you don't have it, install it with pip install elasticsearch. Once it's installed, from elasticsearch import Elasticsearch will no longer raise an error.
```python
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts="http://192.168.21.33:9200/",
                   http_auth=('abc', 'dataanalysis'))
print(es.info())
```
Elasticsearch() configures the connection: give it the IP address of the server Elasticsearch runs on, and if a username and password are required, pass them via the http_auth parameter. If printing the connection info raises no error, the connection succeeded.
2. Querying ES with a JSON request body
The request body uses exactly the same format as in Kibana. If you're not sure your request body is correct, debug it in Kibana first and paste it in once it works.
As shown below, "_source": "title" restricts the returned documents to the title field only.
```python
query_json = {
    "_source": "title",
    "query": {
        "bool": {
            "must": [
                {"match_phrase": {"content": "汽车"}},
                {"match_phrase": {"content": "房子"}}
            ]
        }
    }
}
query = es.search(index='mydata', body=query_json)
print(query)
```
Normally, if printing query raises no error, you'll see results. But you'll notice only a handful of hits come back: by default, Elasticsearch returns only the first 10 results. If you want all of them, the real work starts now.
3. Exporting all results with a scroll cursor
Key points:
- First, use a scroll cursor to accumulate all the result data in memory
- Then write the in-memory results out to disk, i.e. to a file
```python
query = es.search(index='1485073708892', body=query_json, scroll='5m', size=100)
results = query['hits']['hits']    # first page of hits
total = query['hits']['total']     # total hit count (an int up to ES 6; a dict in ES 7+, use total['value'])
scroll_id = query['_scroll_id']    # cursor for fetching the remaining results

for i in range(0, int(total / 100) + 1):
    # the scroll parameter must be given again here, or the call raises an error
    page = es.scroll(scroll_id=scroll_id, scroll='5m')
    scroll_id = page['_scroll_id']  # the scroll id can change between pages
    results += page['hits']['hits']
```
When sending the search request, tell ES to open a scroll cursor and set the size of each returned batch.
Define a list variable results to hold the result data. In the code you can either initialize it as an empty list, results = [], or seed it with the first page of returned hits, results = query['hits']['hits'].
Then loop over the remaining pages, appending each one to the in-memory list until all results are loaded.
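The counting loop above needs the total up front, and fetches one extra (empty) page when the total is an exact multiple of the page size. An equivalent pattern is to keep scrolling until a page comes back empty. The sketch below demonstrates that loop against a stubbed client (FakeES is invented for illustration; a real es client drops in unchanged):

```python
class FakeES:
    """Stand-in for the elasticsearch client, for illustration only."""
    def __init__(self, pages):
        self._pages = list(pages)

    def scroll(self, scroll_id=None, scroll=None):
        hits = self._pages.pop(0) if self._pages else []
        return {'_scroll_id': scroll_id, 'hits': {'hits': hits}}

def collect_all(es, first_page, scroll_id):
    results = list(first_page)
    while True:
        page = es.scroll(scroll_id=scroll_id, scroll='5m')
        scroll_id = page['_scroll_id']   # the scroll id can change between pages
        hits = page['hits']['hits']
        if not hits:                     # empty page -> cursor exhausted
            break
        results += hits
    return results

es_stub = FakeES(pages=[[{'_id': '2'}], [{'_id': '3'}]])
all_hits = collect_all(es_stub, [{'_id': '1'}], 'cursor-0')
print(len(all_hits))  # → 3
```

This way the loop terminates correctly regardless of page size, and you never need to compute the page count from total.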
4. Writing the results to a CSV file
```python
import csv

with open('./data/event_title.csv', 'w', newline='', encoding='utf-8') as flow:
    csv_writer = csv.writer(flow)
    for res in results:
        # one row per document: id and title as separate columns
        csv_writer.writerow([res['_id'], res['_source']['title']])
```
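Without a live cluster, the row format can be sanity-checked with hand-made hits written to an in-memory buffer (the two sample documents below are invented for illustration):

```python
import csv
import io

# fake hits shaped like Elasticsearch results
sample_hits = [
    {'_id': '1', '_source': {'title': '汽车'}},
    {'_id': '2', '_source': {'title': '房子'}},
]

buf = io.StringIO()   # in-memory file, no disk needed
writer = csv.writer(buf)
for res in sample_hits:
    # two columns per row: document id, title
    writer.writerow([res['_id'], res['_source']['title']])

print(buf.getvalue())
```

Note that passing each row as a list of fields lets csv.writer handle quoting and separators; joining the fields with ',' yourself would put everything into a single quoted cell.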
Done! The complete code is below:
```python
import csv
from elasticsearch import Elasticsearch

# parameter docs: https://pypi.org/project/elasticsearch/
es = Elasticsearch(hosts="http://192.168.21.33:9200/",
                   http_auth=('abc', 'dataanalysis'))

query_json = {
    "_source": "title",
    "query": {
        "bool": {
            "must": [
                {"match_phrase": {"content": "汽车"}},
                {"match_phrase": {"content": "房子"}}
            ]
        }
    }
}

query = es.search(index='1485073708892', body=query_json, scroll='5m', size=100)
results = query['hits']['hits']    # first page of hits
total = query['hits']['total']     # total hit count (an int up to ES 6; a dict in ES 7+, use total['value'])
scroll_id = query['_scroll_id']    # cursor for fetching the remaining results

for i in range(0, int(total / 100) + 1):
    # the scroll parameter must be given again here, or the call raises an error
    page = es.scroll(scroll_id=scroll_id, scroll='5m')
    scroll_id = page['_scroll_id']  # the scroll id can change between pages
    results += page['hits']['hits']

with open('./data/event_title.csv', 'w', newline='', encoding='utf-8') as flow:
    csv_writer = csv.writer(flow)
    for res in results:
        # one row per document: id and title as separate columns
        csv_writer.writerow([res['_id'], res['_source']['title']])

print('done!')
```