爬虫是 IO 密集型任务,好比咱们使用 requests 库来爬取某个站点的话,发出一个请求以后,程序必需要等待网站返回响应以后才能接着运行,而在等待响应的过程当中,整个爬虫程序是一直在等待的,实际上没有作任何的事情。html
所以,有必要提升程序的运行效率,异步就是其中有效的一种方法。python
今天咱们一块儿来学习下异步爬虫的相关内容。编程
阻塞状态指程序未获得所需计算资源时被挂起的状态。程序在等待某个操做完成期间,自身没法继续处理其余的事情,则称该程序在该操做上是阻塞的。常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。阻塞是无处不在的,包括 CPU 切换上下文时,全部的进程都没法真正处理事情,它们也会被阻塞。若是是多核 CPU 则正在执行上下文切换操做的核不可被利用。api
程序在等待某操做过程当中,自身不被阻塞,能够继续处理其余的事情,则称该程序在该操做上是非阻塞的。非阻塞并非在任何程序级别、任何状况下均可以存在的。仅当程序封装的级别能够囊括独立的子程序单元时,它才可能存在非阻塞状态。非阻塞的存在是由于阻塞存在,正由于某个操做阻塞致使的耗时与效率低下,咱们才要把它变成非阻塞的。服务器
不一样程序单元为了完成某个任务,在执行过程当中需靠某种通讯方式以协调一致,咱们称这些程序单元是同步执行的。例如购物系统中更新商品库存,须要用“行锁”做为通讯信号,让不一样的更新请求强制排队顺序执行,那更新库存的操做是同步的。简言之,同步意味着有序。markdown
为完成某个任务,不一样程序单元之间过程当中无需通讯协调,也能完成任务的方式,不相关的程序单元之间能够是异步的。例如,爬虫下载网页。调度程序调用下载程序后,便可调度其余任务,而无需与该下载任务保持通讯以协调行为。不一样网页的下载、保存等操做都是无关的,也无需相互通知协调。这些异步操做的完成时刻并不肯定。简言之,异步意味着无序。网络
多进程就是利用 CPU 的多核优点,在同一时间并行地执行多个任务,能够大大提升执行效率。session
协程,英文叫做 Coroutine,又称微线程、纤程,协程是一种用户态的轻量级线程。协程拥有本身的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其余地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。所以协程能保留上一次调用时的状态,即全部局部状态的一个特定组合,每次过程重入时,就至关于进入上一次调用的状态。协程本质上是个单进程,协程相对于多进程来讲,无需线程上下文切换的开销,无需原子操做锁定及同步的开销,编程模型也很是简单。咱们可使用协程来实现异步操做,好比在网络爬虫场景下,咱们发出一个请求以后,须要等待必定的时间才能获得响应,但其实在这个等待过程当中,程序能够干许多其余的事情,等到响应获得以后才切换回来继续处理,这样能够充分利用 CPU 和其余资源,这就是协程的优点。多线程
从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程仍是以生成器对象为基础的,在 Python 3.5 则增长了 async/await,使得协程的实现更加方便。并发
asyncio
Python 中使用协程最经常使用的库莫过于 asyncio
async/await 关键字,是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。
定义协程
定义一个协程,感觉它和普通进程在实现上的不一样之处,代码以下:
import asyncio
async def execute(x):
print('Number:', x)
coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')
复制代码
运行结果以下:
Coroutine: <coroutine object execute at 0x0000027808F5BE48>
After calling execute
Number: 666
After calling loop
Process finished with exit code 0
复制代码
首先导入 asyncio 这个包,这样才可使用 async 和 await,而后使用 async 定义了一个 execute 方法,方法接收一个数字参数,方法执行以后会打印这个数字。
随后咱们直接调用了这个方法,然而这个方法并无执行,而是返回了一个 coroutine 协程对象。随后咱们使用 get_event_loop 方法建立了一个事件循环 loop,并调用了 loop 对象的 run_until_complete 方法将协程注册到事件循环 loop 中,而后启动。最后咱们才看到了 execute 方法打印了输出结果。
可见,async 定义的方法就会变成一个没法直接执行的 coroutine 对象,必须将其注册到事件循环中才能够执行。
前面还提到了 task,它是对 coroutine 对象的进一步封装,它里面相比 coroutine 对象多了运行状态,好比 running、finished 等,咱们能够用这些状态来获取协程对象的执行状况。在上面的例子中,当咱们将 coroutine 对象传递给 run_until_complete 方法的时候,实际上它进行了一个操做就是将 coroutine 封装成了 task 对象。task也能够显式地进行声明,以下所示:
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
复制代码
运行结果以下
Coroutine: <coroutine object execute at 0x000001CB3F90BE48>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/pycharm2020/program/test_003.py:3>>
Number: 666
Task: <Task finished coro=<execute() done, defined at D:/python/pycharm2020/program/test_003.py:3> result=666>
After calling loop
Process finished with exit code 0
复制代码
这里咱们定义了 loop 对象以后,接着调用了它的 create_task 方法将 coroutine 对象转化为了 task 对象,随后咱们打印输出一下,发现它是 pending 状态。接着咱们将 task 对象添加到事件循环中获得执行,随后咱们再打印输出一下 task 对象,发现它的状态就变成了 finished,同时还能够看到其 result 变成了 666,也就是咱们定义的 execute 方法的返回结果。
定义 task 对象还有一种经常使用方式,就是直接经过 asyncio 的 ensure_future 方法,返回结果也是 task 对象,这样的话咱们就能够不借助于 loop 来定义,即便尚未声明 loop 也能够提早定义好 task 对象,写法以下:
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
复制代码
运行效果以下:
Coroutine: <coroutine object execute at 0x0000019794EBBE48>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/pycharm2020/program/test_003.py:3>>
Number: 666
Task: <Task finished coro=<execute() done, defined at D:/python/pycharm2020/program/test_003.py:3> result=666>
After calling loop
Process finished with exit code 0
复制代码
发现其运行效果都是同样的
task对象的绑定回调操做
能够为某个 task 绑定一个回调方法,举以下例子:
import asyncio
import requests
async def call_on():
status = requests.get('https://www.baidu.com')
return status
def call_back(task):
print('Status:', task.result())
corountine = call_on()
task = asyncio.ensure_future(corountine)
task.add_done_callback(call_back)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
复制代码
定义了一个call_on 方法,请求了百度,获取其状态码,可是这个方法里面咱们没有任何 print 语句。随后咱们定义了一个 call_back 方法,这个方法接收一个参数,是 task 对象,而后调用 print打印了 task 对象的结果。这样咱们就定义好了一个 coroutine 对象和一个回调方法,
但愿达到的效果是,当 coroutine 对象执行完毕以后,就去执行声明的 callback 方法。实现这样的效果只须要调用 add_done_callback 方法便可,咱们将 callback 方法传递给了封装好的 task 对象,这样当 task 执行完毕以后就能够调用 callback 方法了,同时 task 对象还会做为参数传递给 callback 方法,调用 task 对象的 result 方法就能够获取返回结果了。
运行结果以下:
Task: <Task pending coro=<call_on() running at D:/python/pycharm2020/program/test_003.py:4> cb=[call_back() at D:/python/pycharm2020/program/test_003.py:8]>
Status: <Response [200]>
Task: <Task finished coro=<call_on() done, defined at D:/python/pycharm2020/program/test_003.py:4> result=<Response [200]>>
复制代码
也能够不用回调方法,直接在 task 运行完毕以后也能直接调用 result 方法获取结果,以下所示:
import asyncio
import requests
async def call_on():
status = requests.get('https://www.baidu.com')
return status
def call_back(task):
print('Status:', task.result())
corountine = call_on()
task = asyncio.ensure_future(corountine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task:', task.result())
复制代码
运行效果同样:
Task: <Task pending coro=<call_on() running at D:/python/pycharm2020/program/test_003.py:4>>
Task: <Task finished coro=<call_on() done, defined at D:/python/pycharm2020/program/test_003.py:4> result=<Response [200]>>
Task: <Response [200]>
复制代码
要实现异步处理,得先要有挂起的操做,当一个任务须要等待 IO 结果的时候,能够挂起当前任务,转而去执行其余任务,这样才能充分利用好资源,要实现异步,须要了解一下 await 的用法,使用 await 能够将耗时等待的操做挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其余的协程挂起或执行完毕。
await 后面的对象必须是以下格式之一:
aiohttp 的使用
aiohttp 是一个支持异步请求的库,利用它和 asyncio 配合咱们能够很是方便地实现异步请求操做。下面以访问博客里面的文章,并返回 reponse.text()
为例,实现异步爬虫。
from lxml import etree
import requests
import logging
import time
import aiohttp
import asyncio
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
start_time = time.time()
# 先获取博客里的文章连接
def get_urls():
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
resp = requests.get(url, headers=headers)
html = etree.HTML(resp.text)
url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
return url_list
async def request_page(url):
logging.info('scraping %s', url)
async with aiohttp.ClientSession() as session:
response = await session.get(url)
return await response.text()
def main():
url_list = get_urls()
tasks = [asyncio.ensure_future(request_page(url)) for url in url_list]
loop = asyncio.get_event_loop()
tasks = asyncio.gather(*tasks)
loop.run_until_complete(tasks)
if __name__ == '__main__':
main()
end_time = time.time()
logging.info('total time %s seconds', end_time - start_time)
复制代码
实例中将请求库由 requests 改为了 aiohttp,经过 aiohttp 的 ClientSession 类的 get 方法进行请求,运行效果以下:
异步操做的便捷之处在于,当遇到阻塞式操做时,任务被挂起,程序接着去执行其余的任务,而不是傻傻地等待,这样能够充分利用 CPU 时间,而没必要把时间浪费在等待 IO 上。
上面的例子与单线程版和多线程版的比较以下:
多线程版
import requests
import logging
import time
from lxml import etree
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
start_time = time.time()
# 先获取博客里的文章连接
def get_urls():
resp = requests.get(url, headers=headers)
html = etree.HTML(resp.text)
url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
return url_list
def request_page(url):
logging.info('scraping %s', url)
resp = requests.get(url, headers=headers)
return resp.text
def main():
url_list = get_urls()
with ThreadPoolExecutor(max_workers=6) as executor:
executor.map(request_page, url_list)
if __name__ == '__main__':
main()
end_time = time.time()
logging.info('total time %s seconds', end_time - start_time)
复制代码
运行结果以下:
单线程版:
import requests
import logging
import time
from lxml import etree
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
start_time = time.time()
# 先获取博客里的文章连接
def get_urls():
resp = requests.get(url, headers=headers)
html = etree.HTML(resp.text)
url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
return url_list
def request_page(url):
logging.info('scraping %s', url)
resp = requests.get(url, headers=headers)
return resp.text
def main():
url_list = get_urls()
for url in url_list:
request_page(url)
if __name__ == '__main__':
main()
end_time = time.time()
复制代码
运行效果以下:
通过测试能够发现,若是能将异步请求灵活运用在爬虫中,在服务器能承受高并发的前提下增长并发数量,爬取效率提高是很是可观的。