注:若是该文章侵犯数据来源网站数据安全,本人会当即删除前端
当前环境,对于大多数人而言,理财渠道愈来愈窄,就连余额宝的年化都降至2%如下,大多数状况是没法跑赢通胀的,并且市面其余理财产品不是风险过高就是要求额度比较大亦或收益不达预期,所以,闲来无事,简单的分析下基金定投。node
分析数据以前固然离不开数据,固然,对于基金数据各大平台也提供了专门的对外接口供数据分析,但大多数都是收费的,并且还要注册帐号,折腾太烦人,仍是直接把数据搞进本身的数据库舒坦。mysql
先简单总结下前端几种爬虫方式sql
方式 | 简介 | 优势 | 缺点 |
---|---|---|---|
浏览器dom操做 | 浏览器的控制台直接dom操做获取内容 | 方便简洁 | 单页面少许数据获取 |
iframe | 经过在网站插入iframe标签,不断更改src监听iframe内容加载完成事件获取数据 | 解决同源问题,速度快 | 数据收集困难,只能采集完一次性打印到控制台 |
headless | 经过无头浏览器在node端模拟人为操做,获取数据 | 能够伪造权限,来源,可以获取页面的几乎全部数据 | 采集速度慢 |
接口请求 | 对于异步数据,抓取接口,寻找参数规律无脑请求 | 抓取速度快,数据易分析 | 部分网站须要伪造来源和身份,IP容易被封(不断更换代理能够解决) |
切记,分析以前必定要查看数据网站的robots.txt文件和申明,是否容许爬取chrome
咱们发现基金概况页面数据服务端渲染,所以须要经过headless获取,并且每一个页面对应的get参数为该基金编码,所以第一步获取的基金列表的编码能够用来拼凑基金概况连接数据库
咱们发现能够经过接口获取,有效参数为基金编码,页码,每页数以及时间段json
经过对网站分析,创建sql表,基金列表和基金净值表浏览器
分析完数据来源网站并建完数据表,接下来开始撸代码,go!!!安全
因为须要将爬取的数据存入数据库,因此对mysql进行了简单的封装bash
新建文件log.js封装简单的日志打印
const log = console.log;
const chalk = require('chalk');
module.exports = {
info(s){
log(chalk.green(s));
},
warn(s){
log(chalk.yellow(s));
},
err(s){
log(chalk.red(s));
}
}
复制代码
新建TransactionConnection.js对sql的事务进行async/await封装
const log = require('./log');
class TransactionConnection{
constructor(conn){
this._connection = conn; //链接
this._isReleased = false; //改链接是否已释放
}
/**
* @description: query封装
* @param {type}
* @return:
*/
query(sql, values){
if( ! this._connection ){
return Promise.reject('当前MySQL链接已经释放, 不能调用 query');
}
return new Promise( (resolve, reject) => {
this._connection.query(sql, values, function(err, results, fields){
if( err ){
return reject(err);
}
resolve({
results : results,
fields : fields
});
});
});
}
/**
* @description: 释放链接
* @param {type}
* @return:
*/
release(){
if( ! this._isReleased && this._connection ){
this._connection.release();
this._isReleased = true;
this._connection = null;
log.info('链接已释放')
}
}
/**
* @description: 销毁链接
* @param {type}
* @return:
*/
destroy(){
if( this._connection ){
this._connection.destroy();
this._connection = null;
}
}
/**
* @description: 开始事务
* @param {type}
* @return:
*/
beginTransaction(){
if( ! this._connection ){
return Promise.reject('当前MySQL链接已经释放, 不能调用 beginTransaction');
}
let that = this;
return new Promise( (resolve, reject) => {
this._connection.beginTransaction( function(err){
if( err ){
return reject(err);
}
resolve(that);
} );
});
}
/**
* @description: 提交事务
* @param {type}
* @return:
*/
commit(){
if( ! this._connection ){
return Promise.reject('当前MySQL链接已经释放, 不能调用 commit');
}
let that = this;
return new Promise( (resolve, reject) => {
this._connection.commit( function(err){
if( err ){
return reject(err);
}
resolve(that);
} );
});
}
/**
* @description: 事务回滚
* @param {type}
* @return:
*/
rollback(){
if( ! this._connection ){
return Promise.reject('当前MySQL链接已经释放, 不能调用 rollback');
}
let that = this;
return new Promise( (resolve, reject) => {
this._connection.rollback( function(err){
if( err ){
return reject(err);
}
resolve(that);
} );
});
}
}
复制代码
新建mysqlFactory.js对sql进行简单封装
'use strict';
const mysql = require('mysql');
const log = require('./log');
const TransactionConnection = require('./TransactionConnection');
module.exports = function mysqlFactory(config){
const pool = mysql.createPool(config);
let singleton = null;
class MysqlClient{
/**
* @description: 获取单例静态方式
* @param {type}
* @return:
*/
static getSingle(){
if(! singleton){
singleton = new MysqlClient();
return singleton;
}
}
/**
* @description: 断开数据库链接
* @param {type}
* @return:
*/
close(){
return new Promise( function(resolve, reject){
pool.end(err => {
if(err){
log.err('断开数据库链接失败');
return reject(err);
}
resolve();
})
})
}
/**
* @description: 获取事务操做链接
* @param {type}
* @return:
*/
getConnection(){
return new Promise( function(resolve, reject){
//当前配置的是 链接池 模式, 直接从池子中获取
pool.getConnection( function(err, connection){
if( err ){
log.err(`从MySQL链接池中获取connection失败: ${err}`);
return reject(err);
}
let conWrap = new TransactionConnection(connection);
resolve( conWrap );
});
});
}
/**
* @description: query
* @param {type}
* @return:
*/
query(sql, values){
return new Promise( (resolve, reject) => {
let finalSQL = sql;
if( arguments.length === 2 ){
finalSQL = mysql.format(sql, values);
}
// log.info(`执行MySQL的SQL语句: ${finalSQL}`);
pool.query(finalSQL, function(err, results, fields){
if( err ){
return reject(err);
}
resolve({
results : results,
fields : fields
});
});
});
}
/**
* @description: 格式化字符串
* @param {type}
* @return:
*/
format(sql, values){
return mysql.format(sql, values);
}
/**
* @description: 批量插入数据
* @param {type}
* @return:
*/
async insert(table, rows){
if (!Array.isArray(rows)) {
rows = [rows];
}
const fields = Object.keys(rows[0]), len = rows.length;
const template = `(${new Array(fields.length).fill('?').join(', ')})`;
const sql = rows.reduce((str, obj, idx) => {
const currentVal = fields.map(key => obj[key]);
if(idx >= len - 1) return `${str} ${this.format(template, currentVal)}`
return `${str} ${this.format(template, currentVal)}, `
}, `INSERT INTO ${table} (${fields.join(', ')}) VALUES`)
let out = await this.query(sql);
return out.results;
}
}
return MysqlClient;
}
复制代码
因为抓取数据牵扯到页面抓取和接口抓取,决定使用urllib
进行接口请求,对页面用puppeteer-core
进行浏览器模拟抓取
const urllib = require('urllib');
const path = require('path');
const puppeteer = require('puppeteer-core');
const {chromePath} = require('../config');
class Request{
constructor(){
this.browser = null;
this.page = null;
}
/**
* @description: 请求封装
* @param {type}
* @return:
*/
curl(url, options){
return new Promise((resolve, reject)=>{
urllib.request(url, options, (err, data, res)=>{
if(err) return reject(err);
resolve({
status: res.statusCode,
data: data.toString()
})
})
})
}
/**
* @description: 打开浏览器并新建窗口
* @param {type}
* @return:
*/
async initBrowser(){
if(this.browser && this.page) return this.page;
this.browser = await puppeteer.launch({
executablePath: chromePath,
headless: true,
})
this.page = await this.browser.newPage();
}
/**
* @description: 打开页面获取数据
* @param {type}
* @return:
*/
async goPage(url, config, callback){
if(!this.page){
await this.initBrowser();
}
config = Object.assign({
timeout: 0,
waitUntil: 'domcontentloaded'
}, config);
await this.page.goto(url, config);
const result = await this.page.evaluate(callback);
return result;
}
/**
* @description: 关闭浏览器
* @param {type}
* @return:
*/
async close(){
if(this.browser || this.page){
await this.browser.close();
}
}
}
module.exports = new Request();
复制代码
新建config/index.js增长配置文件,因为数据比较敏感,因此将配置文件中的有关连接部分隐去
const path = require('path')
module.exports = {
// mysql配置
mysql: {
connectionLimit: 10,
host: '127.0.0.1',
user: 'root',
password: '123456',
database: 'fund',
charset: 'UTF8_GENERAL_CI',
timezone: 'local',
connectTimeout: 10000
},
// headless配置
chromePath: path.resolve(__dirname, '../Chromium.app/Contents/MacOS/Chromium'),
//数据接口配置
fund: {
fundList: {
url: '基金列表请求连接',
config: {
method: 'GET',
headers: {
referer: '请求来源伪造'
},
data: {
page: '1,50000'
}
}
},
fundDetail: (code) => '基金详情页面连接',
dayValue: {
url: '基金每日净值连接',
getConfig: function (fundCode) {
return {
method: 'GET',
headers: {
referer: '请求来源伪造'
},
data: {
fundCode,
pageIndex: 1,
pageSize: 365 * 10,
startDate: '',
endDate: '',
}
}
}
}
}
}
复制代码
将基金列表先写入json文件,后续和详情一块儿写入基金列表数据表
const mysqlFactory = require('./lib/mysqlFactory');
const request = require('./lib/Request');
const fs = require('fs-extra');
const moment = require('moment');
const {
mysql,
fund
} = require('./config');
const log = require('./lib/log');
// 获取基金列表
async function getFundList() {
const {
url,
config
} = fund.fundList;
const {
data,
status
} = await request.curl(url, config);
if (status === 200) {
eval(data); // 该接口是jsonp直接执行获得变量db
const {
datas
} = db;
log.info(`基金列表总共${datas.length}条数据,最后一条的名字为${datas[datas.length-1][1]}`);
const result = datas.map(e => {
return {
code: e[0],
name: e[1]
};
})
await fs.outputJSON('data.json', JSON.stringify(result));
log.info(`基金列表写入完成!!!`);
return result;
} else {
log.err('获取基金列表出错')
process.exit(1);
}
}
复制代码
unction _callback() {
const tr = document.querySelectorAll('.info tr td');
const [establishDate, total] = tr[5].textContent && tr[5].textContent.split('/');
return {
type: tr[3].textContent || '暂无',
publicDate: tr[4].textContent || '暂无',
establishDate: establishDate.trim() || '暂无',
total: total.trim() || '暂无',
company: tr[8].textContent || '暂无',
trusteeship: tr[9].textContent || '暂无',
manager: tr[10].textContent || '暂无',
}
}
// 获取基金详情
async function getFundDetail(list = [], sql) {
try {
for (let [index, {
code,
name
}] of list.entries()) {
log.info(`开始获取第${index}条数据,code为${code}`)
const url = fund.fundDetail(code);
const result = await request.goPage(url, {}, _callback);
await sql.insert('fund_list', Object.assign({
name,
code
}, result))
}
} catch (err) {
log.err(`插入基金列表发生错误:${err}`);
process.exit(1);
}
}
复制代码
因为数据量比较大,作了简单的并发50个请求和一次插入2万条数据的sql处理,固然你们也能够采用worker_threads
和child_process
以及proxy
处理
async function getConcurrencyData(arrs, logs) {
log.info(`开始获取第${logs.index.join(', ')}条数据的历史净值,code为${logs.code.join(', ')}`);
try {
const result = await Promise.all(arrs);
if (result.every(res => res.status === 200)) {
let allData = [];
result.forEach((single, idx) => {
const {
Data
} = JSON.parse(single.data);
const {
LSJZList
} = Data;
log.info(`第${logs.index[idx]}条的数据总量为${LSJZList.length}`);
const dealResult = LSJZList.map(item => {
return {
code: logs.code[idx],
FSRQ: item.FSRQ ? moment(item.FSRQ).valueOf() : '暂无', //净值日期
DWJZ: item.DWJZ || '暂无', // 单位净值
LJJZ: item.LJJZ || '暂无', // 累计净值
JZZZL: item.JZZZL || '暂无', //日增加率
SGZT: item.SGZT || '暂无', //申购状态
SHZT: item.SHZT || '暂无', //赎回状态
FHSP: item.FHSP || '暂无', //分成
}
});
allData = [...allData, ...dealResult];
})
return allData;
} else {
throw new Error('获取基金净值失败')
}
} catch (err) {
log.err(err);
process.exit(1);
}
}
async function getDayValue(list = [], sql) {
try {
const {
url,
getConfig
} = fund.dayValue, arrs = [], logs = {
index: [],
code: []
}, len = list.length;
for (let [index, {
code
}] of list.entries()) {
// if(index < 8450) continue;
if (arrs.length >= 50 || index >= len-1) {
const allData = await getConcurrencyData(arrs, logs);
// 每次插入500条
while (allData.length > 0) {
console.log(allData.length)
await sql.insert('fund_value', allData.splice(0, 20000));
};
// 清空
arrs.splice(0, arrs.length, request.curl(url, getConfig(code)));
logs.index.splice(0, logs.index.length, index);
logs.code.splice(0, logs.code.length, code);
} else {
logs.index.push(index);
logs.code.push(code);
arrs.push(request.curl(url, getConfig(code)));
}
}
} catch (err) {
log.err(`插入基金净值发生错误:${err}`);
process.exit(1);
}
}
复制代码
async function run() {
const sql = new(mysqlFactory(mysql));
const data = await getFundList();
await getFundDetail(data);
await getDayValue(data, sql);
await sql.close();
}
run()
复制代码
若是对代码要求严格的能够采用爬去出错process.exit()
监听,重启程序来从断开的数据继续爬去(这也是当时为啥将基金列表保存一份在json文件中的缘由,以便于出错重启程序继续存断开的条数继续爬去)