from multiprocessing.dummy import Pool
from multiprocessing.dummy import Pool


def callback(item):
    """Example worker: operates on one element of the iterable."""
    return item * 2


items = [1, 2, 3]

# Pool(3): a thread pool whose maximum number of worker threads is 3.
# pool.map(func, iterable):
#   arg 1 is the callback (function name only, no parentheses);
#   arg 2 is a list — each element is handed to the callback in turn.
pool = Pool(3)
results = pool.map(callback, items)
server.py
from flask import Flask, render_template
import time

app = Flask(__name__)


# Three routes that each sleep 2 s before answering, so the
# sync-vs-async client benchmarks have a measurable wait.

@app.route('/xx')
def index_1():
    time.sleep(2)
    return render_template('test.html')


@app.route('/yy')
def index_2():
    time.sleep(2)
    return render_template('test.html')


@app.route('/oo')
def index_3():
    time.sleep(2)
    return render_template('test.html')


if __name__ == '__main__':
    app.run(debug=True)
templates
文件夹,在该文件夹下建立一个HTML文件,我写的是test.html
,随便写点数据

<html lang="en">
<head>
    <meta charset="UTF-8"/>
    <title>测试</title>
</head>
<body>
<div>
    <p>百里守约</p>
</div>
<div class="song">
    <p>李清照</p>
    <p>王安石</p>
    <p>苏轼</p>
    <p>柳宗元</p>
    <a href="http://www.song.com/" title="赵匡胤" target="_self">
        <span>this is span</span>
        宋朝是最强大的王朝,不是军队的强大,而是经济很强大,国民都颇有钱</a>
    <a href="" class="du">总为浮云能蔽日,长安不见令人愁</a>
    <img src="http://www.baidu.com/meinv.jpg" alt=""/>
</div>
<div class="tang">
    <ul>
        <li><a href="http://www.baidu.com" title="qing">清明时节雨纷纷,路上行人欲断魂,借问酒家何处有,牧童遥指杏花村</a></li>
        <li><a href="http://www.163.com" title="qin">秦时明月汉时关,万里长征人未还,但使龙城飞将在,不教胡马度阴山</a></li>
        <li><a href="http://www.126.com" id="qi">岐王宅里寻常见,崔九堂前几度闻,正是江南好风景,落花时节又逢君</a></li>
        <li><a href="http://www.sina.com" class="du">杜甫</a></li>
        <li><a href="http://www.dudu.com" class="du">杜牧</a></li>
        <li><b>杜小月</b></li>
        <li><i>度蜜月</i></li>
        <li><a href="http://www.haha.com" id="feng">凤凰台上凤凰游,凤去台空江自流,吴宫花草埋幽径,晋代衣冠成古丘</a></li>
    </ul>
</div>
</body>
</html>
import requests
from bs4 import BeautifulSoup
import time
# Thread-pool module.
from multiprocessing.dummy import Pool

urls = [
    'http://127.0.0.1:5000/xx',
    'http://127.0.0.1:5000/yy',
    'http://127.0.0.1:5000/oo',
]


def get_request(url):
    """Fetch one URL and return the page source text."""
    return requests.get(url=url).text


def parse(page_text):
    """Parse the page source and return the text of the #feng tag."""
    soup = BeautifulSoup(page_text, 'lxml')
    return soup.select('#feng')[0].text


if __name__ == '__main__':
    # --- synchronous version ---
    start = time.time()
    for url in urls:
        print(parse(get_request(url)))
    print(time.time() - start)
    # Observed output: the same line of verse three times, ~6.06 s total.

    # --- thread-pool (concurrent) version ---
    start = time.time()
    pool = Pool(3)  # thread pool with 3 workers
    # map(func, iterable): each list element is handed to the callback,
    # which can then operate on that element.
    page_text_list = pool.map(get_request, urls)
    for text in pool.map(parse, page_text_list):
        print(text)
    print(time.time() - start)
    # Observed output: same three lines, ~2.05 s total
    # (skipping the for-loop shaves only ~0.01 s).
综上所述:异步代码执行效率显著提升。
import requests
from lxml import etree
from multiprocessing.dummy import Pool
import re
import os

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}

# Landing page of pearvideo's "fortune" category.
main_url = 'https://www.pearvideo.com/category_3'

# Parse the video detail-page links out of this category page.
main_page_text = requests.get(url=main_url, headers=headers).text
tree = etree.HTML(main_page_text)
li_list = tree.xpath('//*[@id="listvideoListUl"]/li')

# Work items for the thread pool: [{'url': ..., 'name': ...}, ...]
video_urls = []
for li in li_list:
    # Detail-page address and video title.
    detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
    name = li.xpath('./div/a/div[2]/text()')[0]
    # Request the detail page.
    page_text = requests.get(url=detail_url, headers=headers).text
    # The <video> element is generated by JS, so use a regex instead of xpath.
    ex = 'srcUrl="(.*?)",vdoUrl='
    video_url = re.findall(ex, page_text, re.S)[0]  # findall returns a list
    video_urls.append({'url': video_url, 'name': name})


def get_video(url):
    """Thread-pool callback: download one video and persist the bytes."""
    video_data = requests.get(url=url['url'], headers=headers).content
    file_name = "./video/" + url['name'] + ".mp4"
    with open(file_name, 'wb') as f:
        f.write(video_data)
    print(url['name'], "下载完毕!")


# Create the folder that stores the videos.
dir_name = 'video'
if not os.path.exists(dir_name):
    os.mkdir(dir_name)

# Thread pool with 4 workers.
pool = Pool(4)
pool.map(get_video, video_urls)
asyncio
(重点)协程就是一个对象。当特殊函数被调用后,该函数就会返回一个协程对象。
协程对象 == 特殊函数
import asyncio
from time import sleep


async def get_request(url):
    """A "special" (async def) function; calling it does not run the body."""
    print('正在请求:', url)
    sleep(2)  # NOTE: deliberately a blocking sleep — demo only
    print('请求成功:', url)
    return '666'


# Calling the special function returns a coroutine object.
g = get_request("https://www,qq.com")
就是对协程对象的进一步封装(就是一个高级的协程对象)
任务对象 == 协程对象 == 特殊函数(表示某个固定形式的任务)
# asyncio.ensure_future(coroutine_object) wraps a coroutine in a Task.
task = asyncio.ensure_future(g)  # g: the coroutine object
绑定回调:
# Define a done-callback for a task.
def callback(task):
    """Receive the finished task object (the callback's single, required arg).

    task.result() is the return value of the special function wrapped
    by this task object.
    """
    task.result()
    print("I'm callback:", task)


# Usage (illustrative — `funcName` is the callback's name, `task` the Task):
#   task.add_done_callback(funcName)
funcName
这个回调函数必须要带一个参数,这个参数表示的就是当前的任务对象
参数.result()
:表示的就是当前任务对象对应的特殊函数的返回值。
建立事件循环对象:须要将任务对象注册到该事件循环对象中
# Create the event-loop object.
loop = asyncio.get_event_loop()
# Register/load the task object into the loop, then start the loop.
loop.run_until_complete(task)  # loads the task AND starts the event loop
await
:当阻塞操作结束后让loop回头执行阻塞以后的代码。
asyncio.wait()
:将当前的任务对象交出cpu的使用权。
loop = asyncio.get_event_loop()
# asyncio.wait(): the "suspend" operation — each task in the list yields
# CPU control back to the loop; tasks is a list of task objects.
loop.run_until_complete(asyncio.wait(tasks))
aiohttp
(重点)requests
:不支持异步,不可以出现在特殊函数内部。
aiohttp
:支持异步的网络请求模块,和asyncio
一起使用
pip install aiohttp
代码的编写
import asyncio
import aiohttp


# Async network request based on aiohttp — FIRST DRAFT: the async/await
# keywords are still missing here (the corrected version follows below).
async def get_requests(url):
    # Instantiate a request (session) object.
    with aiohttp.ClientSession() as aio:
        # with aio.get/post(url=url, headers=headers, data/params, proxy='http://ip:port') as response:
        with aio.get(url=url) as response:
            # text(): the response body as str
            # read(): the response body as bytes
            page_text = await response.text()
            return page_text
在 with 前加上 async 关键字,在阻塞操作前加上 await 关键字。完整代码:
import asyncio
import aiohttp


# Async network request based on aiohttp — corrected version with the
# async/await keywords in place.
async def get_requests(url):
    # Instantiate a request (session) object.
    async with aiohttp.ClientSession() as aio:
        # with aio.get/post(url=url, headers=headers, data/params, proxy='http://ip:port') as response:
        async with await aio.get(url=url) as response:
            # text(): the response body as str
            # read(): the response body as bytes
            page_text = await response.text()
            return page_text
import asyncio
from time import sleep


async def get_request(url):
    """Special function: calling it returns a coroutine object."""
    print('正在请求:', url)
    sleep(2)  # blocking sleep — demo only
    print('请求成功:', url)
    return '666'


# Define a done-callback for the task.
def callback(task):
    print("I'm callback:", task)


# Returns a coroutine object.
g = get_request("https://www,qq.com")
# Create a task object from the coroutine.
task = asyncio.ensure_future(g)

# The driver code below was disabled in the original (wrapped in a
# triple-quoted string whose closing quotes had become unbalanced —
# reconstructed here as comments so the module parses):
#
#   # Bind the callback to the task object.
#   task.add_done_callback(callback)
#   # Create the event-loop object.
#   loop = asyncio.get_event_loop()
#   # Register the task into the loop, then start the loop.
#   loop.run_until_complete(task)
#
# Output captured by the article (as scraped; possibly truncated):
#   正在请求: www,qq.com
#   正在请求: www,qq.com
import asyncio
import time

start = time.time()


async def get_request(url):
    print('正在请求:', url)
    # await: once the blocking operation finishes, let the loop come back
    # and run the code after the suspension point.
    await asyncio.sleep(2)
    print('请求成功:', url)
    return '666'


urls = [
    'http://127.0.0.1:5000/xx',
    'http://127.0.0.1:5000/yy',
    'http://127.0.0.1:5000/oo',
]

# One task per URL.
tasks = []
for url in urls:
    coroutine = get_request(url)
    tasks.append(asyncio.ensure_future(coroutine))

loop = asyncio.get_event_loop()
# When registering a task LIST with the loop it must be suspended first:
# asyncio.wait() makes each task yield CPU control back to the loop.
loop.run_until_complete(asyncio.wait(tasks))
print('总耗时:', time.time() - start)
测试:同步&异步效率
,按照上述步骤启动项目;而后运行下方代码。import asyncio import time import aiohttp from lxml import etree headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36' } urls = [ 'http://127.0.0.1:5000/xx', 'http://127.0.0.1:5000/yy', 'http://127.0.0.1:5000/oo', ] start = time.time() """ # 发起请求,获取响应数据(不能够实现异步) async def get_requests(url): # requests是不支持异步的模块 page_text = requests.get(url).text return page_text """ async def get_requests(url): """ 基于aiohttp实现异步的网络请求 :param url: :return: """ # 实例化了一个请求对象 async with aiohttp.ClientSession() as aio: # with aio.get/post(url=url,headers=headers,data/params,proxy='http://ip:prot') as response: async with await aio.get(url=url) as response: # text() 获取字符串形式的响应数据 # read() 获取bytes类型的响应数据 page_text = await response.text() return page_text def parse(task): """ 定义回调函数 :param task: :return: """ page_text = task.result() # 获取特殊函数的返回值(请求到的页面源码数据) tree = etree.HTML(page_text) content = tree.xpath('//*[@id="feng"]/text()')[0] print(content) tasks = [] for url in urls: c = get_requests(url) task = asyncio.ensure_future(c) task.add_done_callback(parse) tasks.append(task) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) print('总耗时:', time.time() - start)
案例:基于协程(asyncio + aiohttp)爬取梨视频
import asyncio
import time
import aiohttp
from lxml import etree
import re
import os
import requests

# time is used only to measure the total download time.
start = time.time()

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36'
}

# Landing page of pearvideo's "fortune" category.
main_url = 'https://www.pearvideo.com/category_3'
main_page_text = requests.get(url=main_url, headers=headers).text
tree = etree.HTML(main_page_text)
li_list = tree.xpath('//*[@id="listvideoListUl"]/li')

# Work items: [{'url': video_url, 'name': name}, ...]
urls = []
for li in li_list:
    detail_url = "https://www.pearvideo.com/" + li.xpath('./div/a/@href')[0]
    name = li.xpath('./div/a/div[2]/text()')[0]
    page_text = requests.get(url=detail_url, headers=headers).text
    # The <video> element on the detail page is generated by JS,
    # so extract the source URL with a regex.
    ex = 'srcUrl="(.*?)",vdoUrl='
    video_url = re.findall(ex, page_text, re.S)[0]  # findall returns a list
    urls.append({'url': video_url, 'name': name})


async def get_requests(url):
    """Async download of one video based on aiohttp.

    :param url: dict with keys 'url' (video address) and 'name' (title)
    :return: dict with the raw bytes and the title
    """
    # Instantiate a request (session) object.
    async with aiohttp.ClientSession() as aio:
        # with aio.get/post(url=url, headers=headers, data/params, proxy='http://ip:port') as response:
        async with await aio.get(url=url['url'], headers=headers) as response:
            # text(): str body; read(): bytes body (videos are binary)
            page_read = await response.read()
            return {
                "page_read": page_read,
                "name": url['name'],
            }


def parse(task):
    """Done-callback: persist the downloaded video bytes.

    :param task: the finished task object
    """
    dic_info = task.result()  # return value of the special function
    file_name = "./video/" + dic_info["name"] + ".mp4"
    with open(file_name, 'wb') as f:
        f.write(dic_info['page_read'])
    print(dic_info["name"], "下载完毕!")


tasks = []
for url in urls:
    coroutine = get_requests(url)
    task = asyncio.ensure_future(coroutine)
    task.add_done_callback(parse)
    tasks.append(task)

# Create the folder that stores the videos.
dir_name = 'video'
if not os.path.exists(dir_name):
    os.mkdir(dir_name)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('总耗时:', time.time() - start)