Python爬虫进阶 | 异步协程

1、背景

  以前爬虫使用的是requests+多线程/多进程,后来随着前几天的深刻了解,才发现,对于爬虫来讲,真正的瓶颈并非CPU的处理速度,而是对于网页抓取时候的往返时间,由于若是采用requests+多线程/多进程,他自己是阻塞式的编程,因此时间都花费在了等待网页结果的返回和对爬取到的数据的写入上面。而若是采用非阻塞编程,那么就没有这个困扰。这边首先要理解一下阻塞和非阻塞的区别。php

(1)阻塞调用是指调用结果返回以前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,CPU不会给线程分配时间片,即线程暂停运行)。函数只有在获得结果以后才会返回。python

(2)对于非阻塞则不会挂起,直接执行接下去的程序,返回结果后再回来处理返回值。程序员

 

  其实爬虫的本质就是client发请求,批量获取server的响应数据,若是咱们有多个url待爬取,只用一个线程且采用串行的方式执行,那只能等待爬取一个结束后才能继续下一个,效率会很是低。须要强调的是:对于单线程下串行N个任务,并不彻底等同于低效,若是这N个任务都是纯计算的任务,那么该线程对cpu的利用率仍然会很高,之因此单线程下串行多个爬虫任务低效,是由于爬虫任务是明显的IO密集型(阻塞)程序。那么该如何提升爬取性能呢?express

 

2、基本概念

2.1 阻塞编程

阻塞状态指程序未获得所需计算资源时被挂起的状态。程序在等待某个操做完成期间,自身没法继续干别的事情,则称该程序在该操做上是阻塞的。flask

常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。阻塞是无处不在的,包括 CPU 切换上下文时,全部的进程都没法真正干事情,它们也会被阻塞。若是是多核 CPU 则正在执行上下文切换操做的核不可被利用。服务器

 

2.2 非阻塞网络

程序在等待某操做过程当中,自身不被阻塞,能够继续运行干别的事情,则称该程序在该操做上是非阻塞的。session

非阻塞并非在任何程序级别、任何状况下均可以存在的。仅当程序封装的级别能够囊括独立的子程序单元时,它才可能存在非阻塞状态。多线程

非阻塞的存在是由于阻塞存在,正由于某个操做阻塞致使的耗时与效率低下,咱们才要把它变成非阻塞的。

 

2.3 同步

不一样程序单元为了完成某个任务,在执行过程当中需靠某种通讯方式以协调一致,称这些程序单元是同步执行的。例如购物系统中更新商品库存,须要用“行锁”做为通讯信号,让不一样的更新请求强制排队顺序执行,那更新库存的操做是同步的。简言之,同步意味着有序。

 

2.4 异步

为完成某个任务,不一样程序单元之间过程当中无需通讯协调,也能完成任务的方式,不相关的程序单元之间能够是异步的例如,爬虫下载网页。调度程序调用下载程序后,便可调度其余任务,而无需与该下载任务保持通讯以协调行为。不一样网页的下载、保存等操做都是无关的,也无需相互通知协调。这些异步操做的完成时刻并不肯定。简言之,异步意味着无序。

 

2.5 多进程

多进程就是利用 CPU 的多核优点,在同一时间并行地执行多个任务,能够大大提升执行效率。

 

2.6 协程

协程,英文叫作 Coroutine,又称微线程,纤程,协程是一种用户态的轻量级线程。

协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以协程能保留上一次调用时的状态,即全部局部状态的一个特定组合,每次过程重入时,就至关于进入上一次调用的状态。

协程本质上是个单进程,协程相对于多进程来讲,无需线程上下文切换的开销,无需原子操做锁定及同步的开销,编程模型也很是简单。

咱们可使用协程来实现异步操做,好比在网络爬虫场景下,咱们发出一个请求以后,须要等待必定的时间才能获得响应,但其实在这个等待过程当中,程序能够干许多其余的事情,等到响应获得以后才切换回来继续处理,这样能够充分利用 CPU 和其余资源,这就是异步协程的优点。

 

3、分析处理 

  同步调用:即提交一个任务后就在原地等待任务结束,等到拿到任务的结果后再继续下一行代码,效率低

import requests

def get_page(url):
    print('下载 %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        return response.text

def parse_page(res):
    print('解析 %s' %(len(res)))


def main():
    urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
    for url in urls:
        res=get_page(url)                         #调用一个任务,就在原地等待任务结束拿到结果后才继续日后执行
        parse_page(res)

if __name__ == "__main__":
    main()

a. 解决同步调用方案之多线程/多进程

好处:在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每一个链接都拥有独立的线程(或进程),这样任何一个链接的阻塞都不会影响其余的链接。

弊端:开启多进程或都线程的方式,咱们是没法无限制地开启多进程或多线程的:在遇到要同时响应成百上千路的链接请求,则不管多线程仍是多进程都会严重占据系统资源,下降系统对外界响应效率,并且线程与进程自己也更容易进入假死状态。

b. 解决同步调用方案之线程/进程池

好处:不少程序员可能会考虑使用线程池链接池线程池旨在减小建立和销毁线程的频率,其维持必定合理数量的线程,并让空闲的线程从新承担新的执行任务。能够很好的下降系统开销。

弊端:线程池链接池技术也只是在必定程度上缓解了频繁调用IO接口带来的资源占用。并且,所谓始终有其上限,当请求大大超过上限时,构成的系统对外界的响应并不比没有池的时候效果好多少。因此使用必须考虑其面临的响应规模,并根据响应规模调整的大小。

 

案例:基于multiprocessing.dummy线程池爬取梨视频的视频信息

import requests
import re
from lxml import etree
from multiprocessing.dummy import Pool
import random

header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36'
}

def get_page(url):
    response = requests.get(url=url,headers=header)
    if response.status_code == 200:
        return response.text
    return None

def parse_page(res):
    tree = etree.HTML(res)
    li_list = tree.xpath('//div[@id="listvideoList"]/ul/li')

    video_url_list = []
    for li in li_list:
        detail_url = 'https://www.pearvideo.com/' + li.xpath('./div/a/@href')[0]
        detail_page = requests.get(url=detail_url, headers=header).text
        video_url = re.findall('srcUrl="(.*?)",vdoUrl', detail_page, re.S)[0]
        video_url_list.append(video_url)

    return video_url_list

# 获取视频
def getVideoData(url):
    return requests.get(url=url, headers=header).content

# 持久化存储
def saveVideo(data):
    fileName = str(random.randint(0, 5000)) + '.mp4'  # 因回调函数只能传一个参数,因此没办法再传名字了,只能本身取名
    with open(fileName, 'wb') as fp:
        fp.write(data)

def main():
    url = 'https://www.pearvideo.com/category_1'
    res = get_page(url)
    links = parse_page(res)


    pool = Pool(5)  # 实例化一个线程池对象

    #  pool.map(回调函数,可迭代对象)函数依次执行对象
    video_data_list = pool.map(getVideoData, links)
    pool.map(saveVideo, video_data_list)
    
 pool.close() pool.join() if __name__== "__main__":
    main()

总结:对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,线程池链接池或许能够缓解部分压力,可是不能解决全部问题。总之,多线程模型能够方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,能够用非阻塞接口来尝试解决这个问题。

 

终极处理方案

  上述不管哪一种方案都没有解决一个性能相关的问题:IO阻塞,不管是多进程仍是多线程,在遇到IO阻塞时都会被操做系统强行剥夺走CPU的执行权限,程序的执行效率所以就下降了下来。

  解决这一问题的关键在于,咱们本身从应用程序级别检测IO阻塞而后切换到咱们本身程序的其余任务执行,这样把咱们程序的IO降到最低,咱们的程序处于就绪态就会增多,以此来迷惑操做系统,操做系统便觉得咱们的程序是IO比较少的程序,从而会尽量多的分配CPU给咱们,这样也就达到了提高程序执行效率的目的。

  实现方式:单线程+协程实现异步IO操做。

  异步IO:就是你发起一个 网络IO 操做,却不用等它结束,你能够继续作其余事情,当它结束时,你会获得通知。

 

4、 异步协程

python3.4以后新增了asyncio模块,能够帮咱们检测IO(只能是网络IOHTTP链接就是网络IO操做】),实现应用程序级别的切换(异步IO)。注意:asyncio只能发tcp级别的请求,不能发http协议。

asyncio 是干什么的?

  • 异步网络操做
  • 并发
  • 协程 

 

几个概念:

event_loop:事件循环,至关于一个无限循环,咱们能够把一些函数注册到这个事件循环上,当知足条件发生的时候,就会调用对应的处理方法。

coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,咱们能够将协程对象注册到时间循环中,它会被事件循环调用。咱们可使用 async 关键字来定义一个方法,这个方法在调用时不会当即被执行,而是返回一个协程对象。

task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。

future:表明未来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

async关键字async 定义一个协程;

await 关键字用来挂起阻塞方法的执行。

 

注意事项:在特殊函数内部不能够出现不支持异步模块相关的代码。(例:time,request)

 

1.定义一个协程

import asyncio

async def execute(x):
    print('Number:', x)


coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine) print('After calling loop'

运行结果:

Coroutine: <coroutine object execute at 0x1034cf830>

After calling execute

Number: 1

After calling loop

可见,async 定义的方法就会变成一个没法直接执行的 coroutine 对象,必须将其注册到事件循环中才能够执行。

上文咱们还提到了 task,它是对 coroutine 对象的进一步封装,它里面相比 coroutine 对象多了运行状态,好比 runningfinished 等,咱们能够用这些状态来获取协程对象的执行状况。

 

在上面的例子中,当咱们 coroutine 对象传递给 run_until_complete() 方法的时候,实际上它进行了一个操做就是将 coroutine 封装成了 task 对象咱们也能够显式地进行声明,以下所示:

import asyncio

async def execute(x):
    print('Number:',x)
    return x

coroutine = execute(1)
print('Coroutine:',coroutine)print('After calling execute')

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine) print('Task:',task)

loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

运行结果:

Coroutine: <coroutine object execute at 0x10e0f7830>

After calling execute

Task: <Task pending coro=<execute() running at demo.py:4>>

Number: 1

Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>

After calling loop

这里咱们定义了 loop 对象以后,接着调用了它的 create_task() 方法将 coroutine 对象转化为了 task 对象,随后咱们打印输出一下,发现它是 pending 状态。接着咱们将 task 对象添加到事件循环中获得执行,随后咱们再打印输出一下 task 对象,发现它的状态就变成了 finished,同时还能够看到其 result 变成了 1,也就是咱们定义的 execute() 方法的返回结果。

 

另外,定义 task 对象还有一种方式,就是直接经过 asyncio ensure_future() 方法,返回结果也是 task 对象,这样的话咱们就能够不借助于 loop 来定义,即便咱们尚未声明 loop 也能够提早定义好 task 对象,写法以下:

import asyncio

async def execute(x):
    print('Number:',x)
    return x

coroutine = execute(1)
print('Coroutine:',coroutine)
print('After calling execute')

task=asyncio.ensure_future(coroutine) print('Task:',task)

loop=asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

2.绑定回调:也能够为某个 task 绑定一个回调方法.

import asyncio
import requests

async def request():
    url='https://www.baidu.com'
    status = requests.get(url).status_code
    return status

def  callback(task):
    print('Status:',task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback) print('Task:',task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)

 运行结果:

Task: <Task pending coro=<request() running at demo.py:5> cb=[callback() at demo.py:11]>

Status: <Response [200]>

Task: <Task finished coro=<request() done, defined at demo.py:5> result=<Response [200]>>

在这里咱们定义了一个 request() 方法,请求了百度,返回状态码,可是这个方法里面咱们没有任何 print() 语句。随后咱们定义了一个 callback() 方法,这个方法接收一个参数,是 task 对象,而后调用 print() 方法打印了 task 对象的结果。这样咱们就定义好了一个 coroutine 对象和一个回调方法,咱们如今但愿的效果是,当 coroutine 对象执行完毕以后,就去执行声明的 callback() 方法。

那么它们两者怎样关联起来呢?很简单,只须要调用 add_done_callback() 方法便可,咱们将 callback() 方法传递给了封装好的 task 对象,这样当 task 执行完毕以后就能够调用 callback() 方法了,同时 task 对象还会做为参数传递给 callback() 方法,调用 task 对象的 result() 方法就能够获取返回结果了。

实际上不用回调方法,直接在 task 运行完毕以后也能够直接调用 result() 方法获取结果,运行结果是同样的。以下所示:

import asyncio
import requests

async def request():
    url='https://www.baidu.com'
    status=requests.get(url).status_code
    return status

coroutine=request()
task=asyncio.ensure_future(coroutine)
print('Task:',task)

loop=asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)
print('Task Result:',task.result())

 

3.多任务协程

  上面的例子咱们只执行了一次请求,若是咱们想执行屡次请求应该怎么办呢?咱们能够定义一个 task 列表,而后使用 asyncio 的 wait() 方法便可执行。

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url).status_code
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)] print('Tasks:',tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
    print('Task Result:',task.result())

运行结果:

Tasks: [<Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>]

Task Result: <Response [200]>

Task Result: <Response [200]>

Task Result: <Response [200]>

Task Result: <Response [200]>

Task Result: <Response [200]>

这里咱们使用一个 for 循环建立了五个 task,组成了一个列表,而后把这个列表首先传递给了 asyncio wait() 方法,而后再将其注册到时间循环中,就能够发起五个任务了。

 

4.协程实现

  上面的案例只是为后面的使用做铺垫,接下来咱们正式来看下协程在解决 IO 密集型任务上有怎样的优点吧!

  为了表现出协程的优点,咱们须要先建立一个合适的实验环境,最好的方法就是模拟一个须要等待必定时间才能够获取返回结果的网页,上面的代码中使用了百度,但百度的响应太快了,并且响应速度也会受本机网速影响,因此最好的方式是本身在本地模拟一个慢速服务器,这里咱们选用 Flask

服务器代码:
from flask import Flask
import time
 
app = Flask(__name__)
 
@app.route('/')
def index():
    time.sleep(3)
return 'Hello!'

if __name__ == '__main__':
    app.run(threaded=True)                #这代表 Flask 启动了多线程模式,否则默认是只有一个线程的。

接下来咱们再从新使用上面的方法请求一遍:

import asyncio
import requests
import time
 
start = time.time()
 
async def request():
    url 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = requests.get(url)
    print('Get response from', url, 'Result:', response.text)
 
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
 
end = time.time()
print('Cost time:', end - start)
运行结果以下:
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.049368143081665

在这里咱们仍是建立了五个 task,而后将 task 列表传给 wait() 方法并注册到时间循环中执行。

其实,要实现异步处理,咱们得先要有挂起的操做,当一个任务须要等待 IO 结果的时候,能够挂起当前任务,转而去执行其余任务,这样咱们才能充分利用好资源,上面方法都是一本正经的串行走下来,连个挂起都没有,怎么可能实现异步?

要实现异步,接下来咱们再了解一下 await 的用法,使用 await 能够将耗时等待的操做挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其余的协程挂起或执行完毕。

因此,咱们可能会将代码中的 request() 方法改为以下的样子:

async def request():
    url 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await requests.get(url)
print('Get response from', url, 'Result:', response.text)

仅仅是在 requests 前面加了一个 await,然而执行如下代码,会获得以下报错:

Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Cost time: 15.048935890197754 Task exception was never retrieved future: <Task finished coro=<request() done, defined at demo.py:7> exception=TypeError("object Response can't be used in 'await' expression",)> Traceback (most recent call last):   File "demo.py", line 10, in request     status = await requests.get(url) TypeError: object Response can't be used in 'await' expression

此次它遇到 await 方法确实挂起了,也等待了,可是最后却报了这么个错,这个错误的意思是 requests 返回的 Response 对象不能和 await 一块儿使用,为何呢?由于根据官方文档说明,await 后面的对象必须是以下格式之一:

  • A native coroutine object returned from a native coroutine function,一个原生 coroutine 对象。
  • A generator-based coroutine object returned from a function decorated with types.coroutine(),一个由 types.coroutine() 修饰的生成器,这个生成器能够返回 coroutine 对象。
  • An object with an await__ method returning an iterator,一个包含 __await 方法的对象返回的一个迭代器。

reqeusts 返回的 Response 不符合上面任一条件,所以就会报上面的错误了。既然 await 后面能够跟一个 coroutine 对象,那么我将请求页面的方法独立出来,并用 async 修饰,这样就获得了一个 coroutine 对象

import asyncio
import requests
import time
 
start = time.time()
 
async def get(url):
    return requests.get(url)
 
async def request():
    url 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'Result:', response.text)
 
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
 
end = time.time()
print('Cost time:', end - start)

这里咱们,咱们运行一下看看:

Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.134317874908447

仍是不行,它还不是异步执行,也就是说咱们仅仅将涉及 IO 操做的代码封装到 async 修饰的方法里面是不可行的!咱们必需要使用支持异步操做的请求方式才能够实现真正的异步,因此这里就须要 aiohttp 派上用场了。(因为requests 模块不支持异步,因此用aiohttp 模块)

 

5.使用 aiohttp 

-环境安装:pip install aiohttp

咱们将 aiohttp 用上来将请求库由 requests 改为了 aiohttp,经过 aiohttp ClientSession 类的 get() 方法进行请求

import asyncio
import aiohttp
import time

start= time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for',url)
    result = await get(url)
    print('Get response from',url,'Result:',result)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

结果以下:咱们发现此次请求的耗时由 15 秒变成了 3 秒,耗时直接变成了原来的 1/5

Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Cost time: 3.0199508666992188

代码里面咱们使用了 await,后面跟了 get() 方法,在执行这五个协程的时候,若是遇到了 await,那么就会将当前协程挂起,转而去执行其余的协程,直到其余的协程也挂起或执行完毕,再进行下一个协程的执行。充分利用 CPU 时间,而没必要把时间浪费在等待 IO 上

开始运行时,时间循环会运行第一个 task,针对第一个 task 来讲,当执行到第一个 await 跟着的 get() 方法时,它被挂起,但这个 get() 方法第一步的执行是非阻塞的,挂起以后立马被唤醒,因此当即又进入执行,建立了 ClientSession 对象,接着遇到了第二个 await,调用了 session.get() 请求方法,而后就被挂起了,因为请求须要耗时好久,因此一直没有被唤醒,好第一个 task 被挂起了,那接下来该怎么办呢?事件循环会寻找当前未被挂起的协程继续执行,因而就转而执行第二个 task 了,也是同样的流程操做,直到执行了第五个 task 的 session.get() 方法以后,所有的 task 都被挂起了。全部 task 都已经处于挂起状态,那咋办?只好等待了。3 秒以后,几个请求几乎同时都有了响应,而后几个 task 也被唤醒接着执行,输出请求结果,最后耗时,3 秒!

在上面的例子中,在发出网络请求后,既然接下来的 3 秒都是在等待的,在 3 秒以内,CPU 能够处理的 task 数量远不止这些,那么岂不是咱们放 不少 个 task 一块儿执行,最后获得全部结果的耗时不都是 3 秒左右吗?由于这几个任务被挂起后都是一块儿等待的。理论来讲确实是这样的,不过有个前提,那就是服务器在同一时刻接受无限次请求都能保证正常返回结果,也就是服务器无限抗压,另外还要忽略 IO 传输时延,确实能够作到无限 task 一块儿执行且在预想时间内获得结果。

咱们这里将 task 数量设置成 100,再试一下:

tasks = [asyncio.ensure_future(request()) for _ in range(100)]
耗时结果以下:
Cost time: 3.106252670288086

最后运行时间也是在 3 秒左右,固然多出来的时间就是 IO 时延了。可见,使用了异步协程以后,咱们几乎能够在相同的时间内实现成百上千倍次的网络请求,把这个运用在爬虫中,速度提高可谓是很是可观了。

 

6. 与单进程、多进程对比

单进程

import requests
import time

start = time.time()


def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)


for _ in range(100):
    request()

end = time.time()
print('Cost time:', end - start)
最后耗时:
Cost time: 305.16639709472656

多进程
import requests
import time
import multiprocessing

start = time.time()


def request(_):
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)


cpu_count = multiprocessing.cpu_count()
print('Cpu count:', cpu_count)
pool = multiprocessing.Pool(cpu_count)
pool.map(request, range(100))

end = time.time()
print('Cost time:', end - start)

这里我使用了multiprocessing 里面的 Pool 类,即进程池。个人电脑的 CPU 个数是 8 个,这里的进程池的大小就是 8。

耗时:
Cost time: 48.17306900024414

 

7.与多进程结合

在最新的 PyCon 2018 上,来自 Facebook John Reese 介绍了 asyncio multiprocessing 各自的特色,并开发了一个新的库,叫作 aiomultiprocess须要 Python 3.6 及更高版本才可以使用。

安装:pip install aiomultiprocess

使用这个库,咱们能够将上面的例子改写以下:

import asyncio
import aiohttp
import time
from aiomultiprocess import Pool
 
start = time.time()
 
async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result
 
async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:
        result = await pool.map(get, urls) return result
 
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
 
end = time.time()
print('Cost time:', end - start)

这样就会同时使用多进程和异步协程进行请求,固然最后的结果其实和异步是差很少的:

Cost time: 3.1156570434570312

由于个人测试接口的缘由,最快的响应也是 3 秒,因此这部分多余的时间基本都是 IO 传输时延。但在真实状况下,咱们在作爬取的时候遇到的状况变幻无穷,一方面咱们使用异步协程来防止阻塞,另外一方面咱们使用 multiprocessing 来利用多核成倍加速,节省时间其实仍是很是可观的。

 

更多案例

import aiohttp
import asyncio
from lxml import etree

all_titles = []

headers = {
    'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'

}
async def request(url):
    async with aiohttp.ClientSession() as s:
        async with await s.get(url,headers=headers) as response:
            page_text = await response.text()
            return page_text


def parse(task):
    page_text = task.result()
    page_text = page_text.encode('gb2312').decode('gbk')
    tree = etree.HTML(page_text)
    tr_list = tree.xpath('//*[@id="morelist"]/div/table[2]//tr/td/table//tr')
    for tr in tr_list:
        title = tr.xpath('./td[2]/a[2]/text()')[0]
        print(title)
        all_titles.append(title)

urls = []
url = 'http://wz.sun0769.com/index.php/question/questionType?type=4&page=%d'
for page in range(100):
    u_page = page * 30
    new_url = format(url%u_page)
    urls.append(new_url)

tasks = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    task.add_done_callback(parse)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

 

 

参考连接: https://blog.csdn.net/zhusongziye/article/details/81637088

相关文章
相关标签/搜索