用Flask+Aiohttp+Redis维护动态代理池

在网上有大量公开的免费代理,或者咱们也能够购买付费的代理IP,可是代理不管是免费的仍是付费的,都不能保证都是可用的,由于可能此IP被其余人使用来爬取一样的目标站点而被封禁,或者代理服务器忽然发生故障或网络繁忙。一旦咱们选用了一个不可用的代理,这势必会影响爬虫的工做效率。
html

因此,咱们须要提早作筛选,将不可用的代理剔除掉,保留可用代理。接下来咱们就搭建一个高效易用的代理池。git

1、准备工做

首先须要成功安装Redis数据库并启动服务,另外还须要安装aiohttp、requests、RedisPy、pyquery、Flask库。
github

2、代理池的目标

咱们须要作到下面的几个目标,来实现易用高效的代理池。
redis

基本模块分为4块:存储模块、获取模块、检测模块、接口模块。数据库

  • 存储模块负责存储抓取下来的代理。首先要保证代理不重复,要标识代理的可用状况,还要动态实时处理每一个代理,因此一种比较高效和方便的存储方式就是使用Redis的Sorted Set,即有序集合。json

  • 获取模块须要定时在各大代理网站抓取代理。代理能够是免费公开代理也能够是付费代理,代理的形式都是IP加端口,此模块尽可能从不一样来源获取,尽可能抓取高匿代理,抓取成功以后将可用代理保存到数据库中。flask

  • 检测模块须要定时检测数据库中的代理。这里须要设置一个检测连接,最好是爬取哪一个网站就检测哪一个网站,这样更加有针对性,若是要作一个通用型的代理,那能够设置百度等连接来检测。另外,咱们须要标识每个代理的状态,如设置分数标识,100分表明可用,分数越少表明越不可用。检测一次,若是代理可用,咱们能够将分数标识当即设置为100满分,也能够在原基础上加1分;若是代理不可用,能够将分数标识减1分,当分数减到必定阈值后,代理就直接从数据库移除。经过这样的标识分数,咱们就能够辨别代理的可用状况,选用的时候会更有针对性。api

  • 接口模块须要用API来提供对外服务的接口。其实咱们能够直接链接数据库来取对应的数据,可是这样就须要知道数据库的链接信息,而且要配置链接,而比较安全和方便的方式就是提供一个Web API接口,咱们经过访问接口便可拿到可用代理。另外,因为可用代理可能有多个,那么咱们能够设置一个随机返回某个可用代理的接口,这样就能保证每一个可用代理均可以取到,实现负载均衡。浏览器

以上内容是设计代理的一些基本思路。接下来咱们设计总体的架构,而后用代码实现代理池。安全

3、代理池的架构

根据上文的描述,代理池的架构能够以下图所示。

图片

代理池分为4个模块:存储模块、获取模块、检测模块、接口模块。

  • 存储模块使用Redis的有序集合,用来作代理的去重和状态标识,同时它也是中心模块和基础模块,将其余模块串联起来。

  • 获取模块定时从代理网站获取代理,将获取的代理传递给存储模块,并保存到数据库。

  • 检测模块定时经过存储模块获取全部代理,并对代理进行检测,根据不一样的检测结果对代理设置不一样的标识。

  • 接口模块经过Web API提供服务接口,接口经过链接数据库并经过Web形式返回可用的代理。

4、代理池的实现

接下来,咱们用代码分别实现这4个模块。

1. 存储模块

这里咱们使用Redis的有序集合,集合的每个元素都是不重复的,对于代理池来讲,集合的元素就变成了一个个代理,也就是IP加端口的形式,如60.207.237.111:8888,这样的一个代理就是集合的一个元素。另外,有序集合的每个元素都有一个分数字段,分数是能够重复的,能够是浮点数类型,也能够是整数类型。该集合会根据每个元素的分数对集合进行排序,数值小的排在前面,数值大的排在后面,这样就能够实现集合元素的排序了。

对于代理池来讲,这个分数能够做为判断一个代理是否可用的标志,100为最高分,表明最可用,0为最低分,表明最不可用。若是要获取可用代理,能够从代理池中随机获取分数最高的代理,注意是随机,这样能够保证每一个可用代理都会被调用到。

分数是咱们判断代理稳定性的重要标准,设置分数规则以下所示。

  • 分数100为可用,检测器会定时循环检测每一个代理可用状况,一旦检测到有可用的代理就当即置为100,检测到不可用就将分数减1,分数减至0后代理移除。

  • 新获取的代理的分数为10,若是测试可行,分数当即置为100,不可行则分数减1,分数减至0后代理移除。

这只是一种解决方案,固然可能还有更合理的方案。之因此设置此方案有以下几个缘由。

  • 在检测到代理可用时,分数当即置为100,这样能够保证全部可用代理有更大的机会被获取到。你可能会问,为何不将分数加1而是直接设为最高100呢?设想一下,有的代理是从各大免费公开代理网站获取的,经常一个代理并无那么稳定,平均五次请求可能有两次成功,三次失败,若是按照这种方式来设置分数,那么这个代理几乎不可能达到一个高的分数,也就是说即使它有时是可用的,可是筛选的分数最高,那这样的代理几乎不可能被取到。若是想追求代理稳定性,能够用上述方法,这种方法可确保分数最高的代理必定是最稳定可用的。因此,这里咱们采起“可用即设置100”的方法,确保只要可用的代理均可以被获取到。

  • 在检测到代理不可用时,分数减1,分数减至0后,代理移除。这样一个有效代理若是要被移除须要失败100次,也就是说当一个可用代理若是尝试了100次都失败了,就一直减分直到移除,一旦成功就从新置回100。尝试机会越多,则这个代理拯救回来的机会越多,这样就不容易将曾经的一个可用代理丢弃,由于代理不可用的缘由极可能是网络繁忙或者其余人用此代理请求太过频繁,因此在这里将分数为100。

  • 新获取的代理的分数设置为10,代理若是不可用,分数就减1,分数减到0,代理就移除,若是代理可用,分数就置为100。因为不少代理是从免费网站获取的,因此新获取的代理无效的比例很是高,可能不足10%。因此在这里咱们将分数设置为10,检测的机会没有可用代理的100次那么多,这也能够适当减小开销。

上述代理分数的设置思路不必定是最优思路,但据我的实测,它的实用性仍是比较强的。

如今咱们须要定义一个类来操做数据库的有序集合,定义一些方法来实现分数的设置、代理的获取等。代码实现以下所示:

MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'

import redis
from random import choice

class RedisClient(object):
   def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
       """
       初始化
       :param host: Redis 地址
       :param port: Redis 端口
       :param password: Redis密码
       """

       self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)

   def add(self, proxy, score=INITIAL_SCORE):
       """
       添加代理,设置分数为最高
       :param proxy: 代理
       :param score: 分数
       :return: 添加结果
       """

       if not self.db.zscore(REDIS_KEY, proxy):
           return self.db.zadd(REDIS_KEY, score, proxy)

   def random(self):
       """
       随机获取有效代理,首先尝试获取最高分数代理,若是最高分数不存在,则按照排名获取,不然异常
       :return: 随机代理
       """

       result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
       if len(result):
           return choice(result)
       else:
           result = self.db.zrevrange(REDIS_KEY, 0, 100)
           if len(result):
               return choice(result)
           else:
               raise PoolEmptyError

   def decrease(self, proxy):
       """
       代理值减一分,分数小于最小值,则代理删除
       :param proxy: 代理
       :return: 修改后的代理分数
       """

       score = self.db.zscore(REDIS_KEY, proxy)
       if score and score > MIN_SCORE:
           print('代理', proxy, '当前分数', score, '减1')
           return self.db.zincrby(REDIS_KEY, proxy, -1)
       else:
           print('代理', proxy, '当前分数', score, '移除')
           return self.db.zrem(REDIS_KEY, proxy)

   def exists(self, proxy):
       """
       判断是否存在
       :param proxy: 代理
       :return: 是否存在
       """

       return not self.db.zscore(REDIS_KEY, proxy) == None

   def max(self, proxy):
       """
       将代理设置为MAX_SCORE
       :param proxy: 代理
       :return: 设置结果
       """

       print('代理', proxy, '可用,设置为', MAX_SCORE)
       return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)

   def count(self):
       """
       获取数量
       :return: 数量
       """

       return self.db.zcard(REDIS_KEY)

   def all(self):
       """
       获取所有代理
       :return: 所有代理列表
       """

       return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

首先咱们定义了一些常量,如MAX_SCOREMIN_SCOREINITIAL_SCORE分别表明最大分数、最小分数、初始分数。REDIS_HOSTREDIS_PORTREDIS_PASSWORD分别表明了Redis的链接信息,即地址、端口、密码。REDIS_KEY是有序集合的键名,咱们能够经过它来获取代理存储所使用的有序集合。

接下来定义了一个RedisClient类,这个类能够用来操做Redis的有序集合,其中定义了一些方法来对集合中的元素进行处理,它的主要功能以下所示。

  • __init__()方法是初始化的方法,其参数是Redis的链接信息,默认的链接信息已经定义为常量,在__init__()方法中初始化了一个StrictRedis的类,创建Redis链接。

  • add()方法向数据库添加代理并设置分数,默认的分数是INITIAL_SCORE,也就是10,返回结果是添加的结果。

  • random()方法是随机获取代理的方法,首先获取100分的代理,而后随机选择一个返回。若是不存在100分的代理,则此方法按照排名来获取,选取前100名,而后随机选择一个返回,不然抛出异常。

  • decrease()方法是在代理检测无效的时候设置分数减1的方法,代理传入后,此方法将代理的分数减1,若是分数达到最低值,那么代理就删除。

  • exists()方法可判断代理是否存在集合中。

  • max()方法将代理的分数设置为MAX_SCORE,即100,也就是当代理有效时的设置。

  • count()方法返回当前集合的元素个数。

  • all()方法返回全部的代理列表,以供检测使用。

定义好了这些方法,咱们能够在后续的模块中调用此类来链接和操做数据库。如想要获取随机可用的代理,只须要调用random()方法便可,获得的就是随机的可用代理。

2. 获取模块

获取模块的逻辑相对简单,首先要定义一个Crawler来从各大网站抓取代理,示例以下所示:

import json
from .utils import get_page
from pyquery import PyQuery as pq

class ProxyMetaclass(type):
   def __new__(cls, name, bases, attrs):
       count = 0
       attrs['__CrawlFunc__'] = []
       for k, v in attrs.items():
           if 'crawl_' in k:
               attrs['__CrawlFunc__'].append(k)
               count += 1
       attrs['__CrawlFuncCount__'] = count
       return type.__new__(cls, name, bases, attrs)

class Crawler(object, metaclass=ProxyMetaclass):
   def get_proxies(self, callback):
       proxies = []
       for proxy in eval("self.{}()".format(callback)):
           print('成功获取到代理', proxy)
           proxies.append(proxy)
       return proxies

   def crawl_daili66(self, page_count=4):
       """
       获取代理66
       :param page_count: 页码
       :return: 代理
       """

       start_url = 'http://www.66ip.cn/{}.html'
       urls = [start_url.format(page) for page in range(1, page_count + 1)]
       for url in urls:
           print('Crawling', url)
           html = get_page(url)
           if html:
               doc = pq(html)
               trs = doc('.containerbox table tr:gt(0)').items()
               for tr in trs:
                   ip = tr.find('td:nth-child(1)').text()
                   port = tr.find('td:nth-child(2)').text()
                   yield ':'.join([ip, port])

   def crawl_proxy360(self):
       """
       获取Proxy360
       :return: 代理
       """

       start_url = 'http://www.proxy360.cn/Region/China'
       print('Crawling', start_url)
       html = get_page(start_url)
       if html:
           doc = pq(html)
           lines = doc('div[name="list_proxy_ip"]').items()
           for line in lines:
               ip = line.find('.tbBottomLine:nth-child(1)').text()
               port = line.find('.tbBottomLine:nth-child(2)').text()
               yield ':'.join([ip, port])

   def crawl_goubanjia(self):
       """
       获取Goubanjia
       :return: 代理
       """

       start_url = 'http://www.goubanjia.com/free/gngn/index.shtml'
       html = get_page(start_url)
       if html:
           doc = pq(html)
           tds = doc('td.ip').items()
           for td in tds:
               td.find('p').remove()
               yield td.text().replace(' ', '')

方便起见,咱们将获取代理的每一个方法统必定义为以crawl开头,这样扩展的时候只须要添加crawl开头的方法便可。

在这里实现了几个示例,如抓取代理6六、Proxy360、Goubanjia三个免费代理网站,这些方法都定义成了生成器,经过yield返回一个个代理。程序首先获取网页,而后用pyquery解析,解析出IP加端口的形式的代理而后返回。

而后定义了一个get_proxies()方法,将全部以crawl开头的方法调用一遍,获取每一个方法返回的代理并组合成列表形式返回。

你可能会想知道,如何获取全部以crawl开头的方法名称呢?其实这里借助了元类来实现。咱们定义了一个ProxyMetaclassCrawl类将它设置为元类,元类中实现了__new__()方法,这个方法有固定的几个参数,第四个参数attrs中包含了类的一些属性。咱们能够遍历attrs这个参数便可获取类的全部方法信息,就像遍历字典同样,键名对应方法的名称。而后判断方法的开头是否crawl,若是是,则将其加入到__CrawlFunc__属性中。这样咱们就成功将全部以crawl开头的方法定义成了一个属性,动态获取到全部以crawl开头的方法列表。

因此,若是要作扩展,咱们只须要添加一个以crawl开头的方法。例如抓取快代理,咱们只须要在Crawler类中增长crawl_kuaidaili()方法,仿照其余几个方法将其定义成生成器,抓取其网站的代理,而后经过yield返回代理便可。这样,咱们能够很是方便地扩展,而不用关心类其余部分的实现逻辑。

代理网站的添加很是灵活,不只能够添加免费代理,也能够添加付费代理。一些付费代理的提取方式也相似,也是经过Web的形式获取,而后进行解析。解析方式可能更加简单,如解析纯文本或JSON,解析以后以一样的形式返回便可,在此再也不代码实现,能够自行扩展。

既然定义了Crawler类,接下来再定义一个Getter类,用来动态地调用全部以crawl开头的方法,而后获取抓取到的代理,将其加入到数据库存储起来。

from db import RedisClient
from crawler import Crawler

POOL_UPPER_THRESHOLD = 10000

class Getter():
   def __init__(self):
       self.redis = RedisClient()
       self.crawler = Crawler()

   def is_over_threshold(self):
       """
       判断是否达到了代理池限制
       """

       if self.redis.count() >= POOL_UPPER_THRESHOLD:
           return True
       else:
           return False

   def run(self):
       print('获取器开始执行')
       if not self.is_over_threshold():
           for callback_label in range(self.crawler.__CrawlFuncCount__):
               callback = self.crawler.__CrawlFunc__[callback_label]
               proxies = self.crawler.get_proxies(callback)
               for proxy in proxies:
                   self.redis.add(proxy)

Getter类就是获取器类,它定义了一个变量POOL_UPPER_THRESHOLD来表示代理池的最大数量,这个数量能够灵活配置,而后定义了is_over_threshold()方法来判断代理池是否已经达到了容量阈值。is_over_threshold()方法调用了RedisClient的count()方法来获取代理的数量,而后进行判断,若是数量达到阈值,则返回True,不然返回False。若是不想加这个限制,能够将此方法永久返回True

接下来定义run()方法。该方法首先判断了代理池是否达到阈值,而后在这里就调用了Crawler类的__CrawlFunc__属性,获取到全部以crawl开头的方法列表,依次经过get_proxies()方法调用,获得各个方法抓取到的代理,而后再利用RedisClientadd()方法加入数据库,这样获取模块的工做就完成了。

3. 检测模块

咱们已经成功将各个网站的代理获取下来了,如今就须要一个检测模块来对全部代理进行多轮检测。代理检测可用,分数就设置为100,代理不可用,分数减1,这样就能够实时改变每一个代理的可用状况。如要获取有效代理只须要获取分数高的代理便可。

因为代理的数量很是多,为了提升代理的检测效率,咱们在这里使用异步请求库aiohttp来进行检测。

requests做为一个同步请求库,咱们在发出一个请求以后,程序须要等待网页加载完成以后才能继续执行。也就是这个过程会阻塞等待响应,若是服务器响应很是慢,好比一个请求等待十几秒,那么咱们使用requests完成一个请求就会须要十几秒的时间,程序也不会继续往下执行,而在这十几秒的时间里程序其实彻底能够去作其余的事情,好比调度其余的请求或者进行网页解析等。

异步请求库就解决了这个问题,它相似JavaScript中的回调,即在请求发出以后,程序能够继续执行去作其余的事情,当响应到达时,程序再去处理这个响应。因而,程序就没有被阻塞,能够充分利用时间和资源,大大提升效率。

对于响应速度比较快的网站来讲,requests同步请求和aiohttp异步请求的效果差距没那么大。可对于检测代理来讲,检测一个代理通常须要十多秒甚至几十秒的时间,这时候使用aiohttp异步请求库的优点就大大致现出来了,效率可能会提升几十倍不止。

因此,咱们的代理检测使用异步请求库aiohttp,实现示例以下所示:

VALID_STATUS_CODES = [200]
TEST_URL = 'http://www.baidu.com'
BATCH_TEST_SIZE = 100

class Tester(object):
   def __init__(self):
       self.redis = RedisClient()

   async def test_single_proxy(self, proxy):
       """
       测试单个代理
       :param proxy: 单个代理
       :return: None
       """

       conn = aiohttp.TCPConnector(verify_ssl=False)
       async with aiohttp.ClientSession(connector=conn) as session:
           try:
               if isinstance(proxy, bytes):
                   proxy = proxy.decode('utf-8')
               real_proxy = 'http://' + proxy
               print('正在测试', proxy)
               async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response:
                   if response.status in VALID_STATUS_CODES:
                       self.redis.max(proxy)
                       print('代理可用', proxy)
                   else:
                       self.redis.decrease(proxy)
                       print('请求响应码不合法', proxy)
           except (ClientError, ClientConnectorError, TimeoutError, AttributeError):
               self.redis.decrease(proxy)
               print('代理请求失败', proxy)

   def run(self):
       """
       测试主函数
       :return: None
       """

       print('测试器开始运行')
       try:
           proxies = self.redis.all()
           loop = asyncio.get_event_loop()
           # 批量测试
           for i in range(0, len(proxies), BATCH_TEST_SIZE):
               test_proxies = proxies[i:i + BATCH_TEST_SIZE]
               tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
               loop.run_until_complete(asyncio.wait(tasks))
               time.sleep(5)
       except Exception as e:
           print('测试器发生错误', e.args)

这里定义了一个类Tester__init__()方法中创建了一个RedisClient对象,供该对象中其余方法使用。接下来定义了一个test_single_proxy()方法,这个方法用来检测单个代理的可用状况,其参数就是被检测的代理。注意,test_single_proxy()方法前面加了async关键词,这表明这个方法是异步的。方法内部首先建立了aiohttp的ClientSession对象,此对象相似于requests的Session对象,能够直接调用该对象的get()方法来访问页面。在这里,代理的设置是经过proxy参数传递给get()方法,请求方法前面也须要加上async关键词来标明其是异步请求,这也是aiohttp使用时的常见写法。

测试的连接在这里定义为常量TEST_URL。若是针对某个网站有抓取需求,建议将TEST_URL设置为目标网站的地址,由于在抓取的过程当中,代理自己多是可用的,可是该代理的IP已经被目标网站封掉了。例如,某些代理能够正常访问百度等页面,可是对知乎来讲可能就被封了,因此咱们能够将TEST_URL设置为知乎的某个页面的连接,当请求失败、代理被封时,分数天然会减下来,失效的代理就不会被取到了。

若是想作一个通用的代理池,则不须要专门设置TEST_URL,能够将其设置为一个不会封IP的网站,也能够设置为百度这类响应稳定的网站。

咱们还定义了VALID_STATUS_CODES变量,这个变量是一个列表形式,包含了正常的状态码,如能够定义成[200]。固然某些目标网站可能会出现其余的状态码,能够自行配置。

程序在获取Response后须要判断响应的状态,若是状态码在VALID_STATUS_CODES列表里,则表明代理可用,能够调用RedisClientmax()方法将代理分数设为100,不然调用decrease()方法将代理分数减1,若是出现异常也一样将代理分数减1。

另外,咱们设置了批量测试的最大值BATCH_TEST_SIZE为100,也就是一批测试最多100个,这能够避免代理池过大时一次性测试所有代理致使内存开销过大的问题。

随后,在run()方法里面获取了全部的代理列表,使用aiohttp分配任务,启动运行,这样就能够进行异步检测了。可参考aiohttp的官方示例:http://aiohttp.readthedocs.io/。

这样,测试模块的逻辑就完成了。

4. 接口模块

经过上述三个模块,咱们已经能够作到代理的获取、检测和更新,数据库就会以有序集合的形式存储各个代理及其对应的分数,分数100表明可用,分数越小表明越不可用。

可是咱们怎样方便地获取可用代理呢?能够用RedisClient类直接链接Redis,而后调用random()方法。这样作没问题,效率很高,可是会有几个弊端。

  • 若是其余人使用这个代理池,他须要知道Redis链接的用户名和密码信息,这样很不安全。

  • 若是代理池须要部署在远程服务器上运行,而远程服务器的Redis只容许本地链接,那么咱们就不能远程直连Redis来获取代理。

  • 若是爬虫所在的主机没有链接Redis模块,或者爬虫不是由Python语言编写的,那么咱们就没法使用RedisClient来获取代理。

  • 若是RedisClient类或者数据库结构有更新,那么爬虫端必须同步这些更新,这样很是麻烦。

综上考虑,为了使代理池能够做为一个独立服务运行,咱们最好增长一个接口模块,并以Web API的形式暴露可用代理。

这样一来,获取代理只须要请求接口便可,以上的几个缺点弊端也能够避免。

咱们使用一个比较轻量级的库Flask来实现这个接口模块,实现示例以下所示:

from flask import Flask, g
from db import RedisClient

__all__ = ['app']
app = Flask(__name__)

def get_conn():
   if not hasattr(g, 'redis'):
       g.redis = RedisClient()
   return g.redis

@app.route('/')
def index():
   return '<h2>Welcome to Proxy Pool System</h2>'

@app.route('/random')
def get_proxy():
   """
   获取随机可用代理
   :return: 随机代理
   """

   conn = get_conn()
   return conn.random()

@app.route('/count')
def get_counts():
   """
   获取代理池总量
   :return: 代理池总量
   """

   conn = get_conn()
   return str(conn.count())

if __name__ == '__main__':
   app.run()

在这里,咱们声明了一个Flask对象,定义了三个接口,分别是首页、随机代理页、获取数量页。

运行以后,Flask会启动一个Web服务,咱们只须要访问对应的接口便可获取到可用代理。

5. 调度模块

调度模块就是调用以上所定义的三个模块,将这三个模块经过多进程的形式运行起来,示例以下所示:

TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True

from multiprocessing import Process
from api import app
from getter import Getter
from tester import Tester

class Scheduler():
   def schedule_tester(self, cycle=TESTER_CYCLE):
       """
       定时测试代理
       """

       tester = Tester()
       while True:
           print('测试器开始运行')
           tester.run()
           time.sleep(cycle)

   def schedule_getter(self, cycle=GETTER_CYCLE):
       """
       定时获取代理
       """

       getter = Getter()
       while True:
           print('开始抓取代理')
           getter.run()
           time.sleep(cycle)

   def schedule_api(self):
       """
       开启API
       """

       app.run(API_HOST, API_PORT)

   def run(self):
       print('代理池开始运行')
       if TESTER_ENABLED:
           tester_process = Process(target=self.schedule_tester)
           tester_process.start()

       if GETTER_ENABLED:
           getter_process = Process(target=self.schedule_getter)
           getter_process.start()

       if API_ENABLED:
           api_process = Process(target=self.schedule_api)
           api_process.start()

三个常量TESTER_ENABLEDGETTER_ENABLEDAPI_ENABLED都是布尔类型,表示测试模块、获取模块、接口模块的开关,若是都为True,则表明模块开启。

启动入口是run()方法,这个方法分别判断三个模块的开关。若是开关开启,启动时程序就新建一个Process进程,设置好启动目标,而后调用start()方法运行,这样三个进程就能够并行执行,互不干扰。

三个调度方法结构也很是清晰。好比,schedule_tester()方法用来调度测试模块,首先声明一个Tester对象,而后进入死循环不断循环调用其run()方法,执行完一轮以后就休眠一段时间,休眠结束以后从新再执行。在这里,休眠时间也定义为一个常量,如20秒,即每隔20秒进行一次代理检测。

最后,只须要调用Scheduler的run()方法便可启动整个代理池。

以上内容即是整个代理池的架构和相应实现逻辑。

5、运行

接下来,咱们将代码整合一下,将代理运行起来,运行以后的输出结果以下图所示。

图片

以上是代理池的控制台输出,能够看到,可用代理设置为100,不可用代理分数减1。

咱们再打开浏览器,当前配置了运行在5555端口,因此打开http://127.0.0.1:5555,便可看到其首页,以下图所示。

图片

再访问:http://127.0.0.1:5555/random,便可获取随机可用代理,以下图所示。

图片

咱们只须要访问此接口便可获取一个随机可用代理,这很是方便。

获取代理的代码以下所示:

import requests

PROXY_POOL_URL = 'http://localhost:5555/random'

def get_proxy():
   try:
       response = requests.get(PROXY_POOL_URL)
       if response.status_code == 200:
           return response.text
   except ConnectionError:
       return None

以后即是一个字符串类型的代理,此代理能够按照上一节所示的方法设置,如requests的使用方法以下所示:

import requests

proxy = get_proxy()
proxies = {
   'http': 'http://' + proxy,
   'https': 'https://' + proxy,
}
try:
   response = requests.get('http://httpbin.org/get', proxies=proxies)
   print(response.text)
except requests.exceptions.ConnectionError as e:
   print('Error', e.args)

有了代理池以后,咱们再取出代理便可有效防止IP被封禁的状况。

6、本节代码

本节代码地址为:https://github.com/Python3WebSpider/ProxyPool。

7、结语

本节实现了一个比较高效的代理池,来获取随机可用的代理。接下来,咱们会利用代理池来实现数据的抓取。

相关文章
相关标签/搜索