This post was first published at www.litreily.top
At the request of a friend in the finance industry, I wrote a crawler to scrape the practitioner information of every financial institution listed on the China Futures Association website. Fetching the data itself is fairly simple, but in order to learn some new crawling methods and techniques, namely the producer-consumer model this post is about, I also studied how to use Python's queue library queue and its threading library Thread.
The producer-consumer model is very simple, and most programmers know it: one side acts as the producer and keeps supplying resources, while the other side acts as the consumer and keeps consuming them. Think of a restaurant: the chef, as the producer, keeps cooking delicious food, while the customers, as consumers, keep eating what the chef serves. The relationship between producers and consumers can be one-to-one, one-to-many, many-to-one or many-to-many.
So what does this model have to do with crawling? A crawler can be regarded as a producer: it keeps scraping data from a website, and the scraped data is the food. That data then needs a consumer to clean it, absorbing the useful parts and discarding the useless ones.
In practice, scraping and data cleaning each get their own Thread, and the two threads pass data through a FIFO queue, much like a waiter carrying food from the kitchen to the customer's table. The scraping thread fetches data from the website and puts the raw data into the queue; the cleaning thread reads the raw data from the queue in the order it was enqueued and extracts the useful fields.
That is a brief introduction to the producer-consumer model; the rest of this post describes this particular scraping task in detail.
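To make the pattern concrete, here is a minimal sketch using queue.Queue and Thread. It is not part of the actual crawler, and the names (producer, consumer, dish) are purely illustrative:

```python
import queue
from threading import Thread

q = queue.Queue()

def producer():
    for i in range(5):
        q.put('dish %d' % i)   # the chef keeps making food

def consumer():
    while True:
        dish = q.get()         # blocks until an item is available
        print('eating', dish)
        q.task_done()          # one task_done() per get()

Thread(target=producer).start()
Thread(target=consumer, daemon=True).start()

q.join()  # blocks until every queued item has been marked done
```

The daemon consumer loops forever, and q.join() in the main thread returns once every item has been matched by a task_done() call; this is exactly how the crawler's main function waits at the end of this post.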
http://www.cfachina.org/cfainfo/organbaseinfoServlet?all=personinfo
The data we want is the practitioner information of every futures company shown in the table on that page; each company has a mechanism ID (G01001 ~ G01198). As shown in the screenshot above, the home page is paginated into 8 pages. Take G01001, 方正中期期货 (Founder CIFCO Futures), as an example: clicking the company name jumps to the corresponding page.
From the URL and the page content we can extract the following information:
- organid: mechanism ID, +G01001+ ~ +G01198+
- currentPage: current page number of the mechanism's practitioner listing
- pageSize: number of people shown per page, 20 by default
- selectType: fixed as personinfo
- mechanism_name: the current mechanism's name, shown above the table on every page
- page_cnt: total number of pages, shown on every page together with the current page number
The scraped data is ultimately stored by mechanism name in a corresponding txt or excel file.
Once any practitioner page of a mechanism has been fetched, BeautifulSoup can quickly extract the mechanism name:
mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
Some may ask: since the home-page tables already contain every mechanism's ID and name, why fetch the name again? Because I never intended to scrape those home-page tables at all; the URLs can be generated directly from the regularly incrementing mechanism IDs, so fetching the mechanism name is deferred until each mechanism's first information page has been scraped.
The amount of data differs from mechanism to mechanism; fortunately every page contains both the current page number and the total page count. The following code extracts the page count.
url_re = re.compile('#currentPage.*\+.*\+\'(\d+)\'')
page_cnt = url_re.search(html).group(1)
Once the page count has been read from a mechanism's first page, a for loop can modify the currentPage parameter in the URL and fetch the mechanism's information page by page, as sketched below.
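A quick sketch of that loop, using the query format that appears later in the full source; the mechanism ID and page count here are made-up example values:

```python
base = ('http://www.cfachina.org/cfainfo/organbaseinfoOneServlet'
        '?organid=+%s+&currentPage=%d&pageSize=%d&selectType=personinfo&all=undefined')

mechanism_id = 'G01001'   # example mechanism ID
page_cnt = 3              # assume this was parsed from the first page
page_size = 20

for page in range(1, page_cnt + 1):
    url = base % (mechanism_id, page, page_size)
    print(url)            # in the real crawler this URL is fetched with requests
```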
On a specific information page such as the one shown above, the personnel records live in a table. Apart from the fixed header row, every record is contained in a tr tag that carries an id attribute, so BeautifulSoup can easily extract all the personnel information on the page.
soup.find_all('tr', id=True)
The obvious approach would be to crawl the home-page listing page by page, collect the link of every mechanism on each page, and then crawl each mechanism's information. However, because the mechanism-information URLs follow an obvious pattern, we can build the URL of every information page directly from each mechanism's ID. The crawl plan is therefore as follows:
- put all mechanism IDs into the queue url_queue
- a crawler thread SpiderThread performs the scraping:
  - read an ID from url_queue, build the mechanism's first-page URL and fetch it with requests
  - push the fetched raw pages into the queue html_queue
- a data-cleaning thread DatamineThread performs the cleaning:
  - read one set of page data from html_queue
  - extract the practitioner information from the page with BeautifulSoup
  - hand the result to the storage module Storage, which writes it to a local file

SpiderThread
The crawler thread first takes a mechanism ID from the queue, builds the mechanism's first-page URL and fetches it. It then checks whether the mechanism's page count is 0; if it is not, it extracts the mechanism name and, based on the page count, loops over the remaining pages, putting the raw html data into the queue html_queue as a dict of the following form:
{
    'name': mechanismId_mechanismName,
    'num': currentPage,
    'content': html
}
The html_queue filled by the crawler is processed by the data-cleaning thread. Below is the main routine of the crawler thread; see the full source at the end for the complete thread code.
def run(self):
    while True:
        mechanism_id = 'G0' + self.url_queue.get()
        # the first page's url
        url = self.__get_url(mechanism_id, 1)
        html = self.grab(url)
        page_cnt = self.url_re.search(html.text).group(1)
        if page_cnt == '0':
            self.url_queue.task_done()
            continue
        soup = BeautifulSoup(html.text, 'html.parser')
        mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
        print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))
        # put data into html_queue
        self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
        for i in range(2, int(page_cnt) + 1):
            url = self.__get_url(mechanism_id, i)
            html = self.grab(url)
            self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})
        self.url_queue.task_done()
DatamineThread
The data-cleaning thread is quite simple: it pulls the html data from the producer's queue html_queue one item at a time, extracts the practitioner information from the html, stores it as a two-dimensional list, and finally hands it to the storage module Storage to write to disk.
class DatamineThread(Thread):
    """Parse data from html"""
    def __init__(self, html_queue, filetype):
        Thread.__init__(self)
        self.html_queue = html_queue
        self.filetype = filetype

    def __datamine(self, data):
        '''Get data from html content'''
        soup = BeautifulSoup(data['content'].text, 'html.parser')
        infos = []
        for info in soup.find_all('tr', id=True):
            items = []
            for item in info.find_all('td'):
                items.append(item.get_text())
            infos.append(items)
        return infos

    def run(self):
        while True:
            data = self.html_queue.get()
            print('Datamine Thread: get %s_%d' % (data['name'], data['num']))
            store = Storage(data['name'], self.filetype)
            store.save(self.__datamine(data))
            self.html_queue.task_done()
Storage
I wrote storage functions for two file formats, write_txt and write_excel, for txt and excel files respectively. The caller decides which format to use when saving.
def save(self, data):
    {
        '.txt': self.write_txt,
        '.xls': self.write_excel
    }.get(self.filetype)(data)
Writing to a txt file is straightforward: open the file in append ('a') mode, write the data, and close the file. The file name is supplied by the caller. When writing, each person's record takes one line, with fields separated by tabs (\t).
def write_txt(self, data):
    '''Write data to txt file'''
    fid = open(self.path, 'a', encoding='utf-8')
    # insert the header of table
    if not os.path.getsize(self.path):
        fid.write('\t'.join(self.table_header) + '\n')
    for info in data:
        fid.write('\t'.join(info) + '\n')
    fid.close()
Writing to an Excel file is more tedious. Since I did not have much experience, I chose the xlwt, xlrd and xlutils libraries. To be honest, these three libraries are really not pleasant to use; they barely get the job done. Why do I say that? Consider:

- xlwt can only write and xlrd can only read; you need xlutils' copy function to copy the data read by xlrd into memory, and then modify the .xls file with xlwt
- even an .xlsx file ends up in .xls format after this read-write cycle

So I will certainly look into other excel libraries later; for now, though, the current solution still uses these three. The code is as follows:
def write_excel(self, data):
    '''write data to excel file'''
    if not os.path.exists(self.path):
        header_style = xlwt.easyxf('font:name 楷体, color-index black, bold on')
        wb = xlwt.Workbook(encoding='utf-8')
        ws = wb.add_sheet('Data')
        # insert the header of table
        for i in range(len(self.table_header)):
            ws.write(0, i, self.table_header[i], header_style)
    else:
        rb = open_workbook(self.path)
        wb = copy(rb)
        ws = wb.get_sheet(0)
    # write data
    offset = len(ws.rows)
    for i in range(0, len(data)):
        for j in range(0, len(data[0])):
            ws.write(offset + i, j, data[i][j])
    # When using xlutils.copy.copy to copy data from an existing .xls file,
    # the original style is lost, so we need to overwrite the column widths here;
    # maybe there is a better solution, but I have not found one yet.
    for i in range(len(self.table_header)):
        ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]
    # save to file
    while True:
        try:
            wb.save(self.path)
            break
        except PermissionError as e:
            print('{0} error: {1}'.format(self.path, e.strerror))
            time.sleep(5)
        finally:
            pass
Notes:

- when appending, you need the offset, i.e. the number of rows the file already contains
- if the file is open elsewhere, saving raises a PermissionError; you can catch the exception, print an error message, and wait at intervals until the file has been closed

The main function creates and starts the producer and consumer threads, and also feeds the queue of mechanism IDs to the producer thread.
url_queue = queue.Queue()
html_queue = queue.Queue()


def main():
    for i in range(1001, 1199):
        url_queue.put(str(i))

    # create and start a spider thread
    st = SpiderThread(url_queue, html_queue)
    st.setDaemon(True)
    st.start()

    # create and start a datamine thread
    dt = DatamineThread(html_queue, '.xls')
    dt.setDaemon(True)
    dt.start()

    # wait on the queue until everything has been processed
    url_queue.join()
    html_queue.join()
As the main function shows, join() is called on both queues; it blocks until the corresponding queue is empty. Note that in the queue operations, every dequeue via queue.get() must be matched by a queue.task_done() call; otherwise all of the queue's data may have been processed while the main thread is still stuck waiting.
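A consumer loop is therefore safest in a shape like this (a schematic sketch, not the exact thread code above):

```python
import queue
from threading import Thread

q = queue.Queue()
for n in range(3):
    q.put(n)

def worker():
    while True:
        item = q.get()
        try:
            print('processed', item)  # stand-in for the real work
        finally:
            q.task_done()  # exactly one task_done() per get(), or q.join() never returns

Thread(target=worker, daemon=True).start()
q.join()  # returns only after every item has been marked done
```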
That covers the main code of the crawler; the complete source is below.
#!/usr/bin/python3
# -*-coding:utf-8-*-
import queue
from threading import Thread
import requests
import re
from bs4 import BeautifulSoup
import os
import platform
import xlwt
from xlrd import open_workbook
from xlutils.copy import copy
import time

# url format ↓
# http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+G01001+&currentPage=1&pageSize=20&selectType=personinfo&all=undefined
# organid: +G01001+, +G01002+, +G01003+, ...
# currentPage: 1, 2, 3, ...
# pageSize: 20(default)
#
# Algorithm design:
# 2 threads with 2 queues
# Thread-1, get first page url, then get page_num and mechanism_name from first page
# Thread-2, parse html file and get data from it, then output data to local file
# url_queue data -> 'url' # first url of each mechanism
# html_queue data -> {'name':'mechanism_name', 'html':data}

url_queue = queue.Queue()
html_queue = queue.Queue()


class SpiderThread(Thread):
    """Threaded Url Grab"""
    def __init__(self, url_queue, html_queue):
        Thread.__init__(self)
        self.url_queue = url_queue
        self.html_queue = html_queue
        self.page_size = 20
        self.url_re = re.compile('#currentPage.*\+.*\+\'(\d+)\'')
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'}

    def __get_url(self, mechanism_id, current_page):
        return 'http://www.cfachina.org/cfainfo/organbaseinfoOneServlet?organid=+%s+&currentPage=%d&pageSize=%d&selectType=personinfo&all=undefined' \
            % (mechanism_id, current_page, self.page_size)

    def grab(self, url):
        '''Grab html of url from web'''
        while True:
            try:
                html = requests.get(url, headers=self.headers, timeout=20)
                if html.status_code == 200:
                    break
            except requests.exceptions.ConnectionError as e:
                print(url + ' Connection error, try again...')
            except requests.exceptions.ReadTimeout as e:
                print(url + ' Read timeout, try again...')
            except Exception as e:
                print(str(e))
            finally:
                pass
        return html

    def run(self):
        '''Grab all htmls of mechanism one by one

        Steps:
            1. grab first page of each mechanism from url_queue
            2. get number of pages and mechanism name from first page
            3. grab all html file of each mechanism
            4. push all html to html_queue
        '''
        while True:
            mechanism_id = 'G0' + self.url_queue.get()
            # the first page's url
            url = self.__get_url(mechanism_id, 1)
            html = self.grab(url)
            page_cnt = self.url_re.search(html.text).group(1)
            if page_cnt == '0':
                self.url_queue.task_done()
                continue
            soup = BeautifulSoup(html.text, 'html.parser')
            mechanism_name = soup.find('', {'class':'gst_title'}).find_all('a')[2].get_text()
            print('\nGrab Thread: get %s - %s with %s pages\n' % (mechanism_id, mechanism_name, page_cnt))
            # put data into html_queue
            self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':1, 'content':html})
            for i in range(2, int(page_cnt) + 1):
                url = self.__get_url(mechanism_id, i)
                html = self.grab(url)
                self.html_queue.put({'name':'%s_%s' % (mechanism_id, mechanism_name), 'num':i, 'content':html})
            self.url_queue.task_done()


class DatamineThread(Thread):
    """Parse data from html"""
    def __init__(self, html_queue, filetype):
        Thread.__init__(self)
        self.html_queue = html_queue
        self.filetype = filetype

    def __datamine(self, data):
        '''Get data from html content'''
        soup = BeautifulSoup(data['content'].text, 'html.parser')
        infos = []
        for info in soup.find_all('tr', id=True):
            items = []
            for item in info.find_all('td'):
                items.append(item.get_text())
            infos.append(items)
        return infos

    def run(self):
        while True:
            data = self.html_queue.get()
            print('Datamine Thread: get %s_%d' % (data['name'], data['num']))
            store = Storage(data['name'], self.filetype)
            store.save(self.__datamine(data))
            self.html_queue.task_done()


class Storage():
    def __init__(self, filename, filetype):
        self.filetype = filetype
        self.filename = filename + filetype
        self.table_header = ('姓名', '性别', '从业资格号', '投资咨询从业证书号', '任职部门', '职务', '任现职时间')
        self.path = self.__get_path()

    def __get_path(self):
        path = {
            'Windows': 'D:/litreily/Documents/python/cfachina',
            'Linux': '/mnt/d/litreily/Documents/python/cfachina'
        }.get(platform.system())
        if not os.path.isdir(path):
            os.makedirs(path)
        return '%s/%s' % (path, self.filename)

    def write_txt(self, data):
        '''Write data to txt file'''
        fid = open(self.path, 'a', encoding='utf-8')
        # insert the header of table
        if not os.path.getsize(self.path):
            fid.write('\t'.join(self.table_header) + '\n')
        for info in data:
            fid.write('\t'.join(info) + '\n')
        fid.close()

    def write_excel(self, data):
        '''write data to excel file'''
        if not os.path.exists(self.path):
            header_style = xlwt.easyxf('font:name 楷体, color-index black, bold on')
            wb = xlwt.Workbook(encoding='utf-8')
            ws = wb.add_sheet('Data')
            # insert the header of table
            for i in range(len(self.table_header)):
                ws.write(0, i, self.table_header[i], header_style)
        else:
            rb = open_workbook(self.path)
            wb = copy(rb)
            ws = wb.get_sheet(0)
        # write data
        offset = len(ws.rows)
        for i in range(0, len(data)):
            for j in range(0, len(data[0])):
                ws.write(offset + i, j, data[i][j])
        # When using xlutils.copy.copy to copy data from an existing .xls file,
        # the original style is lost, so we need to overwrite the column widths here;
        # maybe there is a better solution, but I have not found one yet.
        for i in range(len(self.table_header)):
            ws.col(i).width = 256 * (10, 10, 15, 20, 50, 20, 15)[i]
        # save to file
        while True:
            try:
                wb.save(self.path)
                break
            except PermissionError as e:
                print('{0} error: {1}'.format(self.path, e.strerror))
                time.sleep(5)
            finally:
                pass

    def save(self, data):
        '''Write data to local file.

        According to filetype, choose the function to save data.
        filetype can be '.txt' or '.xls', but the '.txt' type is
        saved faster than the '.xls' type.

        Args:
            data: a 2d-list array that needs to be saved
        '''
        {
            '.txt': self.write_txt,
            '.xls': self.write_excel
        }.get(self.filetype)(data)


def main():
    for i in range(1001, 1199):
        url_queue.put(str(i))

    # create and start a spider thread
    st = SpiderThread(url_queue, html_queue)
    st.setDaemon(True)
    st.start()

    # create and start a datamine thread
    dt = DatamineThread(html_queue, '.xls')
    dt.setDaemon(True)
    dt.start()

    # wait on the queue until everything has been processed
    url_queue.join()
    html_queue.join()


if __name__ == '__main__':
    main()
- Writing to txt is noticeably faster than writing to excel.
- If pageSize is changed to 1000 or more, all of a mechanism's practitioner information can be fetched in a single request instead of page by page, which greatly improves efficiency.
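For example, assuming the server accepts a large pageSize value (I have not verified whether there is an upper limit), fetching a whole mechanism in one request might look like this:

```python
import requests

# hypothetical single-request fetch for mechanism G01001
url = ('http://www.cfachina.org/cfainfo/organbaseinfoOneServlet'
       '?organid=+G01001+&currentPage=1&pageSize=1000'
       '&selectType=personinfo&all=undefined')
html = requests.get(url, timeout=20)
print(len(html.text))  # all practitioners of the mechanism in one page
```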