由于这个DashBoard本身会一直开发维护下去,本来会演变成一个《试用 vue-admin-template 写一个本身的dashboard》+ 一、二、三、、N。可是今天仍是把标题改掉了,由于其实重点并非dashboard自己,真正的重点是我平常想去作的业余研究,顺便研究些华丽的先后端技术。javascript
Respage01的原由是老婆大人对周边各类早教和教育培训机构春笋般出现的现象感到费解,而后向我提了一个问题,“若是咱们入股相似的一个小教育培训机构,会不会有前途?”。这是一个很难回答的问题,由于咱们都没有在培训教育行业里待过,可是既然问题已经出现了,本身也以为这个问题有些意思,便想要作一个平常研究。Respage01会从宏观角度去分析并监测教育培训机构(在金华地区)相关的数据,数据都来自互联网。html
上一次已经留下了一个壳子,相似这样:vue
可是从代码上也看到了,我是把某一天的坐标数据硬编码到了这个页面中,因此今天就是要来作成动态,恰好结合《部署Django REST Framework服务(Nginx + uWSGI + Django)》,将接口部署到线上。java
既然作了接口化,那咱们就能够定时作好天天的数据爬取 -> 数据清洗 -> 数据入库 -> 接口吐出。那么页面即可天天获取到最新的数据。node
问题又来了,既然天天有了最新的数据,那最好是有方式能够看到天天的变化。可能在必定的时间跨度内,能够看到一些明显的变化,因此我打算作一个数据回看功能,作法可能很简单: 触发了某个按钮后,定时刷新全部的数据,或者作成一个短视频形式python
在一个较短的时间跨度内,有可能很难从热力图上看到变化,因此打算加一个折线图来标识天天的数据。mysql
目前的爬虫目录很简单:redis
.
├── config.json
├── log
│ └── train-2018-09-03.log
├── result
│ └── train-2018-09-03.txt
└── spiker.py
复制代码
那完成这个事情定时运行相似这样的脚本就能够:sql
python spiker.py | python format.py | python writeToRedis.py && python redisToMysql.py
复制代码
虽然有人会建议能够直接在爬取时完成清洗、去重和入库的动做,可是我仍是喜欢用这种流式的方法来处理,这样更加清晰,功能也更加解耦。并且了解管道的同窗能够看出来,这其实就是同时完成了爬取、清洗和入库动做,只不过是每条数据串行完成了这系列动做。这里的writeToRedis.py是为了利用redis自然的去重功能,redis的读写性能也会让效率更高些。数据库
修改就只有两个:
""" 查询关键字:移到config.json """
FileKey = 'train'
KeyWord = u"早教$培训"
复制代码
## 设置标准输出的编码格式
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')
for r in res['results']:
file.writelines(str(r).strip() + '\n')
# 增长标准输出
print(str(r).strip(), flush=True)
复制代码
首先读取上一级管道的标准输出做为输入,使用fileinput即可实现:
def main():
for line in fileinput.input():
format(line)
复制代码
分析一条的数据的结构,只保留感兴趣的数据(不用担忧丢失,由于在第一个节点上已经保存了原始数据),并尽可能把json结构拉平成只有一级:
{'name': '英伦艺术培训', 'lat': 29.109614, 'lng': 119.662018, 'address': '解放东路238号福莲汇8层', 'province': '浙江省', 'city': '金华市', 'area': '婺城区', 'street_id': '15ca1ce6773a95f7a2a9343c', 'detail': 1, 'uid': '15ca1ce6773a95f7a2a9343c', 'detail_info': {'tag': '教育培训;培训机构', 'type': 'education', 'detail_url': 'http://api.map.baidu.com/place/detail?uid=15ca1ce6773a95f7a2a9343c&output=html&source=placeapi_v2', 'overall_rating': '0.0', 'children': []}}
复制代码
因为该数据一级比较简单,因此format也只是作了很小的处理,另外,这样的好处时,不一样的数据结构能够写不一样的format就能够。
# coding: utf-8
import fileinput
import io
import sys
import chardet
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')
def format(line):
""" :param line: :return: """
result = {}
tmp = eval(line.decode('utf-8'))
try:
result = {
"name": str(tmp["name"]),
"lat": tmp["location"]["lat"],
"lng": tmp["location"]["lng"],
"address": str(tmp["address"]),
"tag": str(tmp["detail_info"]["tag"]),
}
# 部分数据可能缺失字段
if "detail_url" in tmp["detail_info"]:
result["detail_url"] = tmp["detail_info"]["detail_url"]
else:
result["detail_url"] = ""
if "overall_rating" in tmp["detail_info"]:
result["rate"] = tmp["detail_info"]["overall_rating"]
else:
result["rate"] = "0.0"
print(str(result).strip(), flush=True)
except Exception as e:
print(e)
pass
def main():
try:
for line in fileinput.input(mode='rb'):
format(line)
sys.stderr.close()
except Exception as e:
print(e)
pass
if __name__ == '__main__':
main()
复制代码
若是数据量大,能够用相似的方法来调试:
cat node1.txt | head -n 1 | python format.py
复制代码
其实使用python的set也能够完成去重的事情,代码中也能够尝试这样的操做。关于去重的方式,在不一样场景下有各式的方案,咱们这属于简单场景,由于数据量不大。
系统安装redis服务,并配置密码:
sudo apt-get install redis-server
复制代码
在虚拟环境下安装redis库:
pip install redis
vi /etc/redis/redis.conf
# 打开 requirepass 配置项,并后面跟上密码
requirepass xxxx
复制代码
登陆测试:
redis-cli -a xxxx
复制代码
redis有String、List、Set、Hash、Sort Hash几种类型,因为咱们只是要作去重,那就用Set结构就能够:
train_2018_09_07(key) -> (数据1,数据2 ... 数据n)
复制代码
writeToRedis的简单实现:
# coding: utf-8
import fileinput
import redis
import time
from tool.tool import tool
import io
import sys
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf8')
def connectRedis():
re = redis.Redis(
host=tool.getRedisHost(),
port=tool.getRedisPort(),
password=tool.getRedisPass(),
decode_responses=True)
return re
def main():
today = time.strftime("%Y_%m_%d")
setName = tool.getFileKey() + "_" + today
try:
re = connectRedis()
for line in fileinput.input(mode='rb'):
re.sadd(setName, line.decode('utf-8').strip())
exit(0)
except Exception as e:
print(e)
exit(-1)
if __name__ == '__main__':
main()
复制代码
使用redis的set还有一个好处就是,能够理由set的差集等功能,快速获取天天发生变化的数据。这个需求打算后面加上
执行:
python spiker.py | python format.py | python writeToRedis.py
复制代码
运行后,原始数据文件条目:
cat train-2018-09-07.txt | wc -l 663
复制代码
Redis Set 内的条目数:
127.0.0.1:6379> SCARD train_2018_09_07
(integer) 640
复制代码
说明确实仍是有重复的数据,缘由多是咱们使用了10*10小矩形扫描的方式,不可避免地会有交界处重叠的问题。
关于使用了redis后仍是否须要Mysql的讨论也有不少,你们能够去参与讨论。我我的的考虑是Django上能够更好地支持Mysql来作序列化和反序列化,毕竟Mysql的查询功能也更加温馨一些。
首先从redis中读出数据的形式,可使用迭代器的方式,好处是稍微省内存些,可是问题是若是单条数据单独写入mysql的话,IO上估计也不太合算,因此我使用pandas的DataFrame写入mysql的方式来分批写入,使用pandas的好处是字典数据写入数据操做比通常的mysql库要简洁不少。
虚拟环境下安装pandas:
pip install pandas sqlalchemy
复制代码
# coding: utf-8
import redis
from tool.tool import tool
import time
import pandas as pd
from sqlalchemy import create_engine
import pymysql
def connectRedis():
""" 链接Redis :return: redis connect """
re = redis.Redis(
host=tool.getRedisHost(),
port=tool.getRedisPort(),
password=tool.getRedisPass(),
decode_responses=True)
return re
def connectMysql():
""" 链接mysql数据库 :return: engine connect """
config = tool.getMysqlConfig()
engine = create_engine(str(r"mysql+pymysql://%s:%s@%s/%s?charset=utf8") %
(config['User'],
config['Pass'],
config['Host'],
config['Name']))
return engine
def redisToMysql(re, en):
""" :param re: redis connect :param en: mysql engine connect :return: """
today = time.strftime("%Y_%m_%d")
tableName = tool.getFileKey() + '_' + today
res = []
index = 0
for item in re.sscan_iter(tool.getFileKey() + '_' + today):
tmp = eval(item.encode('utf-8').decode('utf-8'))
tmp['time'] = today
res.append(tmp)
index += 1
if index >= 100:
df = pd.DataFrame(res)
df.to_sql('respage01', con=en, if_exists='append', index=False,)
index = 0
res = []
if index != 0:
df = pd.DataFrame(res)
df.to_sql(name='respage01', con=en, if_exists='append', index=False)
# 添加主键
# print("xxxxxxxx")
# with en.connect() as con:
# con.execute("alter table respage01 add COLUMN id INT NOT NULL AUTO_INCREMENT primary key first")
def main():
re = connectRedis()
en = connectMysql()
redisToMysql(re, en)
if __name__ == '__main__':
main()
复制代码
为了后面django处理方便,我后面临时加入了一个自增id做为主键。方法能够是:
alter table respage01 add COLUMN id INT NOT NULL AUTO_INCREMENT primary key first;
复制代码
咱们设计两个api: * 获取某个时间段内的全部坐标数据 * 获取某个时间段内天天的数量值
class Respage01Info(models.Model):
""" respage 01 相关的数据 """
time = models.CharField(max_length=100)
name = models.CharField(max_length=200)
address = models.CharField(max_length=500)
detail_url = models.URLField(max_length=500)
rate = models.FloatField()
lat = models.FloatField()
lng = models.FloatField()
class Meta:
# 指定数据表
db_table = "respage01"
复制代码
须要注意的是,咱们已经拥有了数据库,而且表里已经有了数据,因此在执行migrate的时候,须要指明fake掉该项目的数据迁移:
python manage.py migrate --fake rouboapi
复制代码
因为咱们的计数接口是须要使用聚合类查询功能,简单说,就是须要返回数据库字段之外的字段给客户端,因此须要使用serializers的Field方法。
class Respage01Serializer(serializers.HyperlinkedModelSerializer):
""" 序列化Respage01相关的数据 """
class Meta:
model = Respage01Info
fields = ('time', 'lat', 'lng', 'name', 'address', 'detail_url', 'rate')
class Respage01CountSerializer(serializers.HyperlinkedModelSerializer):
""" 序列化计数数据,用于序列化聚合类查询的结果 """
time = serializers.StringRelatedField()
count = serializers.IntegerField()
class Meta:
model = Respage01Info
fields = ('time', 'count')
复制代码
这里须要用到django的数据库查询相关的知识,咱们这里用到了fiter、values、annotate几个函数,具体的能够参考官方文档,基本用法仍是比较简单。
class Respage01(APIView):
""" 获取respage01相关的数据 """
authentication_classes = []
permission_classes = []
def rangeTime(self, start_time, end_time):
""" 获取时间区间 :param start_time: :param end_time: :return: """
print("------------")
dateList = [datetime.strftime(x, "%Y_%m_%d")
for x in list(pd.date_range(start=start_time.replace('_',''), end=end_time.replace('_','')))]
return dateList
def get(self, request, format=None):
req = request.query_params
if 'type' not in req or 'start_time' not in req or 'end_time' not in req:
return Response({}, status=status.HTTP_400_BAD_REQUEST)
if req['type'] == 'location':
dateList = self.rangeTime(start_time=req['start_time'], end_time=req['end_time'])
queryset = Respage01Info.objects.filter(time__in=dateList)
serializer = Respage01Serializer(queryset, many=True)
elif req['type'] == 'count':
dateList = self.rangeTime(start_time=req['start_time'], end_time=req['end_time'])
queryset = Respage01Info.objects.filter(time__in=dateList).values('time').annotate(count=Count('id'))
serializer = Respage01CountSerializer(queryset, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
复制代码
在接口上线后测试过程当中,发现接口极其不稳定,查了一下发现mysql会异常地退出,查看了日志发现是内存不足致使。
个人vps是1G内存的基础配置,虽然小,可是不至于这么紧张。经过top【M】排序后惊奇地发现uwsgi开了10个进程,每一个进程占用了7%左右的内存。修改uwsgi ini文件重启后故障排除(咱们这种小服务,两个进程足够了)。
# mysite_uwsgi.ini file
[uwsgi]
# Django-related settings
# the base directory (full path)
chdir = /data/django/rouboApi
# Django's wsgi file
module = rouboinfo.wsgi
# the virtualenv (full path)
home = /data/django/env3
# process-related settings
# master
master = true
# maximum number of worker processes
processes = 2
# the socket (use the full path to be safe
socket = /data/django/rouboApi/rouboapi.scok
# ... with appropriate permissions - may be needed
chmod-socket = 666
# clear environment on exit
vacuum = true
复制代码
roubo’s dashboard 主要是增长了两个接口请求,并将v-charts的数据动态化。这里也简单加了一个“复盘”按钮,定时刷新数据,能够大概看到一些变化。
<template>
<div>
<div style="height: 100%">
<button @click="onPlay">复盘</button>
<ve-heatmap :data="chartDataMap" :settings="chartSettingsMap" height="600px"/>
</div>
<div>
<ve-line :data="chartDataChart" :settings="chartSettingsChart"/>
</div>
</div>
</template>
复制代码
/** * 获取某个时间区间的位置信息 * @param start_time * @param end_time */
getLocations: function(start_time, end_time) {
this.rouboapis.getRespage01Info('location', start_time, end_time, {
success: (res) => {
this.chartDataMap.rows = res
},
fail: (err) => {
console.log(err)
}
})
},
/** * 获取某个时间段的统计数据 * @param start_time * @param end_time */
getCount: function(start_time, end_time) {
this.rouboapis.getRespage01Info('count', start_time, end_time, {
success: (res) => {
this.chartDataChart.rows = res
}
})
},
/** * 点击复盘按钮事件 */
onPlay: function() {
const dateList = this.getDateList('2018_09_13', this.today('_'))
let index = 0
const timer = setInterval(() => {
this.getLocations(dateList[index], dateList[index])
this.getCount('2018_09_13', dateList[index])
index = index + 1
if (index >= dateList.length) {
clearInterval(timer)
return
}
}, 5000)
}
复制代码
页面仍是很丑,下一步抽时间美化下。