代理池——代理采集,测试,保存和接口使用

代理池的设置主要有四部

  1. 获取代理
  2. 代理测试
  3. 数据存储
  4. API接口

 

1.1先设置须要获取的代理的网站和解析规则

config.pyhtml

# 全部网站得解析方式
 parse_list = [ { 'urls': ['http://www.66ip.cn/{}.html'.format(n) for n in range(1, 10)], # 66代理 'pattern': '//div[@id="main"]//div/div/table//tr[position()>1]', 'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[3]'}, }, { 'urls':['https://www.kuaidaili.com/free/inha/{}/'.format(n) for n in range(1, 10)], # 快代理 'pattern': '//div[@id="list"]/table/tbody/tr', 'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[5]'}, }, { 'urls':['http://www.xicidaili.com/nn/{}'.format(n) for n in range(1, 5)], # 西刺代理 'pattern': '//table[@id="ip_list"]/tr[position()>1]', 'position': {'ip': './td[2]', 'port': './td[3]', 'address': './td[4]/a/'} } ] DEFAULT_HEADERS = { 'User-Agent': 'ozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0', 'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', }

 

1.2 设置数据解析的方法sql

Parser.py数据库

# 解析方法
import re from lxml import etree class Parser_response(object): # 对传递过来的html代码进行数据解析
 @staticmethod def parser_xpath(response, parser): '''根据parser规则对response进行解析''' etre = etree.HTML(response) items = etre.xpath(parser['pattern']) proxy_list = [] for item in items: try: # 解析数据
                ip = item.xpath(parser['position']['ip'])[0].text port = item.xpath(parser['position']['port'])[0].text address = item.xpath(parser['position']['address'])[0].text except Exception: continue
            else: proxy = {'ip': ip, 'port': port, 'address': address} # 设置成字典格式
                proxy_list.append(proxy) # 加入到列表中返回

        return proxy_list @staticmethod def parser_re(): pass

1.3 主程序中调用他们proxy_pool.pyflask

 
 
from Parser import Parser_response # 解析过程


'''
代理url的请求测试''' class Download_HTML(object): @staticmethod def download(url): try: print('正在请求{}.....'.format(url)) r = requests.get(url, headers=DEFAULT_HEADERS) # 对代理的网站url进行请求 r.encoding = 'utf-8' # 设置编码格式 if not r.ok: # 请求不成功 raise ConnectionError else: return r.text # 成功返回网络代码 except Exception as e: print('请求失败,异常: ', e) ....... def crawl(self, parser): '''对配置文件中的网站进行解析''' Download = Download_HTML() # 实例化代理网站 for url in parser['urls']: # 取出url列表,多页 time.sleep(2) # 延时两秒 response = Download.download(url) # 调用方法获取网页的html代码 if not response: return None # 将html代码和代码解析规则传给相应类的方法,获取返回的代理信息列表(ip,port, address) proxy_list = Parser_response.parser_xpath(response, parser) for proxy in proxy_list: # 遍历 while True: if self.queue1.full(): # 若是队列满了的话 time.sleep(1) else: self.queue1.put(proxy) # 没满就加入到队列queue1中 break

 

2代理的检测

def detect(queue1, queue2): #queue1是全部的代理数据,queue2是可用的代理的数据 '''对queue1中的代理进行检测''' proxy_list = [] while True: if not queue1.empty(): # 若是队列不为空
            proxy = queue1.get() # 从队列取出数据
            proxy_list.append(proxy) # 再将数据添加到列表中

            if len(proxy_list) > 9: # 若是列表中得数量到10时
                spanws = [] for proxy in proxy_list: # 遍历列表
                    spanws.append(gevent.spawn(detect_baidu, proxy, queue2)) # 设置协程,传入方法和变量(代理和一个表明可用队列的队列)
 gevent.joinall(spanws) # 开启协程
                proxy_list = [] # 将列表清空

'''代理测试放入可用队列'''
def detect_baidu(proxy, queue2=None): '''请求测试''' proxies = { 'http': 'http://{}:{}'.format(proxy['ip'], proxy['port']), 'https': 'https://{}:{}'.format(proxy['ip'], proxy['port']), } # 完善代理的数据
    result = None # 设置初始标志None
    try: print(proxies) # 用代理进行网络请求以测试代理是否可用
        result = requests.get(url='https://www.baidu.com', headers=DEFAULT_HEADERS, proxies=proxies, timeout=5) except Exception: print('代理不可用,已丢弃....') '''detect_baidu方法被调用了两次,主要看调用时是否传入了queue2这个参数, 若是加了queue2这个参数,表示调用判断代理可用性决定加入到可用队列queue2中; 没有带queue2这个参数的话,表示可用代理回测,返回的标志决定了queue2中的代理是否删除 '''
    # 判断加入到可用队列与否(主要看是否传入了queue2)
    if result and queue2: print('代理{}可用,已添加到可用队列....'.format(proxy)) queue2.put(proxy) # 判断加入到数据库中与否的前提(主要看是否传入了queue2)
    elif result: return True

 

3 数据的存储

3.1 数据库的方法设置网络

Mongo_DB.py并发

import pymongo class MongoHelper(object): def __init__(self): self.client = pymongo.MongoClient(host='127.0.0.1', port=27017) self.db = self.client['tanzhou_homework'] self.proxy = self.db['proxy'] def insert(self, data=None): #
        if data: self.proxy.save(data) def delete(self, data=None): #
        if data: self.proxy.remove(data) def update(self, data, conditions): #
        if data and conditions: self.proxy.update(data, {'$set': conditions}) def select(self, data=None): #
        if data: items = self.proxy.find(data) else: data={} items = self.proxy.find(data) results = [] for item in items: result = (item['ip'], item['port'], item['address']) results.append(result) return results if __name__ == '__main__': mongo = MongoHelper() 

 

 

3.2 主程序中调用proxy_pool.pyapp

from Mongo_DB import MongoHelper # 数据库操做
 mongo = MongoHelper() def insert_sql(queue2): '''读取queue2里面得数据再添加到数据库'''
    while True: proxy = queue2.get() if proxy: mongo.insert(proxy) # print('代理{}已经成功添加到数据库'.format(proxy))

 

4 API接口

Service.pydom

from random import choice from flask import Flask from Mongo_DB import MongoHelper mongo = MongoHelper() app = Flask(__name__) @app.route('/') # 路由根目录r
def index(): result = mongo.select() # 获取IP列表
    text = choice(result) # 随机选择一个出来
    return '{}:{}'.format(text[0], text[1]) # 返回

def start_service(): app.run(debug=True, host='0.0.0.0', port=9999) if __name__ == '__main__': start_service() 

主程序中ide

from Service import start_service # 接口配置


if __name__ == '__main__': p4 = Process(target=start_service) # 接口

 

 


 

 

完整代码以下:

config.py高并发

# 全部网站得解析方式
 parse_list = [ { 'urls': ['http://www.66ip.cn/{}.html'.format(n) for n in range(1, 10)], 'pattern': '//div[@id="main"]//div/div/table//tr[position()>1]', 'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[3]'}, }, { 'urls':['https://www.kuaidaili.com/free/inha/{}/'.format(n) for n in range(1, 10)], 'pattern': '//div[@id="list"]/table/tbody/tr', 'position': {'ip': './td[1]', 'port': './td[2]', 'address': './td[5]'}, }, { 'urls':['http://www.xicidaili.com/nn/{}'.format(n) for n in range(1, 5)], 'pattern': '//table[@id="ip_list"]/tr[position()>1]', 'position': {'ip': './td[2]', 'port': './td[3]', 'address': './td[4]/a/'} } ] DEFAULT_HEADERS = { 'User-Agent': 'ozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0', 'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2', }

Mongo_DB.py

import pymongo class MongoHelper(object): def __init__(self): self.client = pymongo.MongoClient(host='127.0.0.1', port=27017) self.db = self.client['tanzhou_homework'] self.proxy = self.db['proxy'] def insert(self, data=None): #
        if data: self.proxy.save(data) def delete(self, data=None): #
        if data: self.proxy.remove(data) def update(self, data, conditions): #
        if data and conditions: self.proxy.update(data, {'$set': conditions}) def select(self, data=None): #
        if data: items = self.proxy.find(data) else: data={} items = self.proxy.find(data) results = [] for item in items: result = (item['ip'], item['port'], item['address']) results.append(result) return results if __name__ == '__main__': mongo = MongoHelper() 

Parser.py

# 解析方法
import re from lxml import etree class Parser_response(object): # 对传递过来的html代码进行数据解析
 @staticmethod def parser_xpath(response, parser): '''根据parser规则对response进行解析''' etre = etree.HTML(response) items = etre.xpath(parser['pattern']) proxy_list = [] for item in items: try: # 解析数据
                ip = item.xpath(parser['position']['ip'])[0].text port = item.xpath(parser['position']['port'])[0].text address = item.xpath(parser['position']['address'])[0].text except Exception: continue
            else: proxy = {'ip': ip, 'port': port, 'address': address} # 设置成字典格式
                proxy_list.append(proxy) # 加入到列表中返回

        return proxy_list @staticmethod def parser_re(): pass

Service.py

from random import choice from flask import Flask from Mongo_DB import MongoHelper mongo = MongoHelper() app = Flask(__name__) @app.route('/') # 路由根目录r
def index(): result = mongo.select() # 获取IP列表
    text = choice(result) # 随机选择一个出来
    return '{}:{}'.format(text[0], text[1]) # 返回

def start_service(): app.run(debug=True, host='0.0.0.0', port=9999) if __name__ == '__main__': start_service() 

proxy_pool.py

import time import requests from multiprocessing import Process, Queue import gevent from gevent import monkey;monkey.patch_all() from config import parse_list, DEFAULT_HEADERS # 代理网站及相应解析规则,请求头
from Parser import Parser_response  # 解析过程
from Mongo_DB import MongoHelper # 数据库操做
from Service import start_service # 接口配置
 mongo = MongoHelper() '''代理url的请求测试'''
class Download_HTML(object): @staticmethod def download(url): try: print('正在请求{}.....'.format(url)) r = requests.get(url, headers=DEFAULT_HEADERS) # 对代理的网站url进行请求
            r.encoding = 'utf-8' # 设置编码格式
            if not r.ok: # 请求不成功
                raise ConnectionError else: return r.text # 成功返回网络代码
        except Exception as e: print('请求失败,异常: ', e) '''爬取代理数据并添加到队列中'''
class Proxy_crawl(object): porxies_set = set() # 空集合,查重用

    def __init__(self, queue1): self.queue1 = queue1 def open_spider(self): while True: proxy_list = mongo.select() # 获取数据库中得全部代理
            spawns = [] for proxy in proxy_list: # 协程,再进行可用队列中回测代理可用性
 spawns.append(gevent.spawn(detece_from_db,proxy,self.porxies_set)) gevent.joinall(spawns) if len(self.porxies_set) < 5: # 当集合数量小于5的时候
                print('当前可用代理数量为{},开始获取代理...'.format(len(self.porxies_set))) for parser in parse_list: self.crawl(parser) # 继续爬取
            else: print('当前可用代理数量大于阈值....开始休眠') time.sleep(300) def crawl(self, parser): '''对配置文件中的网站进行解析''' Download = Download_HTML() # 实例化代理网站
        for url in parser['urls']: # 取出url列表,多页
            time.sleep(2) # 延时两秒
            response = Download.download(url) # 调用方法获取网页的html代码
            if not response: return None # 将html代码和代码解析规则传给相应类的方法,获取返回的代理信息列表(ip,port, address)
            proxy_list = Parser_response.parser_xpath(response, parser) for proxy in proxy_list: # 遍历
                while True: if self.queue1.full(): # 若是队列满了的话
                        time.sleep(1) else: self.queue1.put(proxy) # 没满就加入到队列queue1中
                        break

def detece_from_db(proxy, porxies_set): proxy = {'ip':proxy[0], 'port':proxy[1]} result = detect_baidu(proxy) # 对数据库中的代理进行回测
    if not result: mongo.delete(proxy) # 不可用就删除
        return None porxies_set.add('{}:{}'.format(proxy['ip'], proxy['port'])) # 可用就添加到set集合中去

'''设置协程实现高并发'''
def detect(queue1, queue2): '''对queue1中的代理进行检测''' proxy_list = [] while True: if not queue1.empty(): # 若是队列不为空
            proxy = queue1.get() # 从队列取出数据
            proxy_list.append(proxy) # 再将数据添加到列表中

            if len(proxy_list) > 9: # 若是列表中得数量到10时
                spanws = [] for proxy in proxy_list: # 遍历列表
                    spanws.append(gevent.spawn(detect_baidu, proxy, queue2)) # 设置协程,传入方法和变量(代理和一个表明可用队列的队列)
 gevent.joinall(spanws) # 开启协程
                proxy_list = [] # 将列表清空

'''代理测试放入可用队列'''
def detect_baidu(proxy, queue2=None): '''请求测试''' proxies = { 'http': 'http://{}:{}'.format(proxy['ip'], proxy['port']), 'https': 'https://{}:{}'.format(proxy['ip'], proxy['port']), } # 完善代理的数据
    result = None # 设置初始标志None
    try: print(proxies) # 用代理进行网络请求以测试代理是否可用
        result = requests.get(url='https://www.baidu.com', headers=DEFAULT_HEADERS, proxies=proxies, timeout=5) except Exception: print('代理不可用,已丢弃....') '''detect_baidu方法被调用了两次,主要看调用时是否传入了queue2这个参数, 若是加了queue2这个参数,表示调用判断代理可用性决定加入到可用队列queue2中; 没有带queue2这个参数的话,表示可用代理回测,返回的标志决定了queue2中的代理是否删除 '''
    # 判断加入到可用队列与否(主要看是否传入了queue2)
    if result and queue2: print('代理{}可用,已添加到可用队列....'.format(proxy)) queue2.put(proxy) # 判断加入到数据库中与否的前提(主要看是否传入了queue2)
    elif result: return True '''爬取代理数据的前提调用'''
def start_crawl(queue1): p = Proxy_crawl(queue1) # 类的实例化
 p.open_spider() def insert_sql(queue2): '''读取queue2里面得数据再添加到数据库'''
    while True: proxy = queue2.get() if proxy: mongo.insert(proxy) # print('代理{}已经成功添加到数据库'.format(proxy))


if __name__ == '__main__': q1 = Queue() # 全部代理的队列
    q2 = Queue() # 可用代理的队列
    p1 = Process(target=detect, args=(q1, q2)) # 代理检测
    p2 = Process(target=start_crawl, args=(q1, )) # 下载代理
    p3 = Process(target=insert_sql, args=(q2, )) # 存入数据
    p4 = Process(target=start_service) # 接口
 p2.start() # 开启进程
 p1.start() p3.start() p4.start() 
相关文章
相关标签/搜索