在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引发等待,从而使得请求总体变慢。
1. 单线程串行
# -*- coding: UTF-8 -*-
"""Baseline: fetch every URL one after another on a single thread."""
import time

import requests


def be_async(url):
    """Print *url*, issue a blocking GET for it, then print the response."""
    print(url)
    print(requests.get(url))


url_list = ["https://www.baidu.com", "http://github.com/"]

started = time.time()  # wall-clock start
for target in url_list:
    be_async(target)
finished = time.time()  # wall-clock end
print("cost time: %s s" % (finished - started))
2. 多线程
# -*- coding: UTF-8 -*-
"""Fetch the URLs concurrently on a pool of four worker threads."""
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def be_async(url):
    """Print *url*, issue a blocking GET for it, then print the response."""
    print(url)
    print(requests.get(url))


url_list = ["https://www.baidu.com", "http://github.com/"]

# Leaving the ``with`` block calls ``shutdown(wait=True)``, i.e. it blocks
# until every submitted job has finished.
with ThreadPoolExecutor(4) as pool:
    started = time.time()  # wall-clock start
    for target in url_list:
        pool.submit(be_async, target)
finished = time.time()  # wall-clock end
print("cost time: %s s" % (finished - started))
注:python2 标准库里没有 concurrent.futures 线程池(需另外安装 futures 包),python3.2 起才内置
+回调函数
# -*- coding: UTF-8 -*-
"""Thread-pool fetch where each finished job is reported through a callback."""
import time
from concurrent.futures import ThreadPoolExecutor

import requests


def be_async(url):
    """Fetch *url* with a blocking GET and return the response.

    The response is returned (not only printed) so that the future handed to
    ``callback`` carries a useful result.
    """
    print(url)
    res = requests.get(url)
    print(res)
    return res


url_list = ["https://www.baidu.com", "http://github.com/"]


def callback(future):
    """Print the outcome of a finished job.

    :param future: the completed ``concurrent.futures.Future``.
    """
    # ``result`` is a method: without the call the bound method object itself
    # would be printed. Check for a failure first, because ``result()``
    # re-raises the worker's exception.
    error = future.exception()
    if error is not None:
        print("request failed:", error)
        return
    print(future.result())


# Leaving the ``with`` block waits for all submitted jobs to finish.
with ThreadPoolExecutor(4) as pool:
    a = time.time()  # start time
    for url in url_list:
        r = pool.submit(be_async, url)
        r.add_done_callback(callback)
b = time.time()  # end time
print("cost time: %s s" % (b - a))
3. 多进程
# -*- coding: UTF-8 -*-
"""Fetch the URLs concurrently on a pool of four worker processes."""
import time
from concurrent.futures import ProcessPoolExecutor

import requests


def be_async(url):
    """Print *url*, issue a blocking GET for it, then print the response."""
    print(url)
    print(requests.get(url))


url_list = ["https://www.baidu.com", "http://github.com/"]

if __name__ == '__main__':
    # Leaving the ``with`` block calls ``shutdown(wait=True)``.
    with ProcessPoolExecutor(4) as pool:
        started = time.time()  # wall-clock start
        for target in url_list:
            pool.submit(be_async, target)
    finished = time.time()  # wall-clock end
    print("cost time: %s s" % (finished - started))
+回调函数
# -*- coding: UTF-8 -*-
"""Process-pool fetch where each finished job is reported through a callback."""
import time
from concurrent.futures import ProcessPoolExecutor

import requests


def be_async(url):
    """Fetch *url* with a blocking GET and return the response.

    The response is returned (not only printed) so that the future handed to
    ``callback`` in the parent process carries a useful result.
    """
    print(url)
    res = requests.get(url)
    print(res)
    return res


url_list = ["https://www.baidu.com", "http://github.com/"]


def callback(future):
    """Print the outcome of a finished job.

    :param future: the completed ``concurrent.futures.Future``.
    """
    # ``result`` is a method: without the call the bound method object itself
    # would be printed. Check for a failure first, because ``result()``
    # re-raises the worker's exception.
    error = future.exception()
    if error is not None:
        print("request failed:", error)
        return
    print(future.result())


if __name__ == '__main__':
    # Leaving the ``with`` block waits for all submitted jobs to finish.
    with ProcessPoolExecutor(4) as pool:
        a = time.time()  # start time
        for url in url_list:
            r = pool.submit(be_async, url)
            r.add_done_callback(callback)
    b = time.time()  # end time
    print("cost time: %s s" % (b - a))
通过上述代码均可提升请求性能,但多线程和多进程的缺点是在IO阻塞时会造成线程和进程的浪费,因此异步IO会是首选:
1. asyncio 示例,该模块没有内置http客户端,只能基于tcp连接自行构造请求;python3.4起内置于标准库
原理:
代码:
# -*- coding: UTF-8 -*-
"""Hand-written HTTP/1.0 GET requests over raw asyncio TCP streams."""
import asyncio


async def be_async(host, url="/", port=80):
    """Send ``GET url`` to *host* over plain TCP and return the raw response.

    Written as a native coroutine: the generator-based ``@asyncio.coroutine``
    decorator was removed in Python 3.11.

    :param host: host name to connect to; also sent as the ``Host`` header.
    :param url: request path.
    :param port: TCP port to connect to (80 by default).
    :return: everything the server sent until it closed the connection.
    """
    print(host, url)
    reader, writer = await asyncio.open_connection(host, port)
    try:
        req_header = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host)
        writer.write(req_header.encode(encoding="utf-8"))
        await writer.drain()
        # read() without a size reads until EOF.
        text = await reader.read()
    finally:
        # Release the connection even if the request fails half-way.
        writer.close()
    print(host, url, text)
    return text


async def main():
    """Run both demo requests concurrently and return their responses."""
    return await asyncio.gather(
        be_async("www.cnblogs.com", url="/gaosy-math"),
        be_async("github.com", url="/gaoshao52"),
    )


if __name__ == "__main__":
    # An explicitly created loop avoids relying on get_event_loop() creating
    # one implicitly.
    loop = asyncio.new_event_loop()
    try:
        results = loop.run_until_complete(main())
    finally:
        loop.close()
知识点:伪造http请求
GET: GET <url地址> HTTP/1.0\r\nHost: <ip或主机名>\r\n\r\n
POST: POST <url地址> HTTP/1.0\r\nHost: <ip或主机名>\r\n\r\n<请求体> 请求头的数据是用\r\n隔开的,请求头与请求体是用\r\n\r\n隔开的
格式: 请求方式 请求地址 协议 请求头数据(用\r\n隔开) 请求体(k1=v1&k2=v2 格式)
例:基于socket实现http请求
# -*- coding: UTF-8 -*-
"""Minimal HTTP/1.0 GET implemented directly on a blocking socket."""
import socket

# The ``with`` block closes the socket even if connecting or reading fails.
with socket.socket() as client:
    # connect
    client.connect(("www.51cto.com", 80))

    # send the request
    client.sendall(b'GET / HTTP/1.0\r\nHost: www.51cto.com\r\n\r\n')

    # Receive: a single recv() returns only what has arrived so far, so keep
    # reading until recv() returns b"" (the peer closed the connection).
    chunks = []
    while True:
        chunk = client.recv(8192)
        if not chunk:
            break
        chunks.append(chunk)

res = b"".join(chunks)
# errors="replace" keeps the demo from crashing on bytes that are not valid GBK.
print(res.decode("gbk", errors="replace"))
2. asyncio+aiohttp 示例
原理: 与上面的一致
代码:
# -*- coding: UTF-8 -*-
"""Concurrent GET requests with asyncio + aiohttp."""
import asyncio

import aiohttp


async def be_async(url, session=None):
    """Fetch *url* and return the response body as bytes.

    Uses ``async with`` on a ``ClientSession``; the old
    ``yield from aiohttp.request(...)`` form does not work on aiohttp 3.x, and
    ``@asyncio.coroutine`` was removed in Python 3.11.

    :param url: absolute URL to request.
    :param session: optional shared ``aiohttp.ClientSession``; when omitted a
        temporary session is created for this one request.
    :return: the raw response body.
    """
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await be_async(url, own_session)

    print(url)
    # ``async with`` releases the connection when the block exits.
    async with session.get(url) as res:
        data = await res.read()
    print(url, data)
    return data


async def main():
    """Run both demo requests concurrently on one shared session."""
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            be_async("https://github.com/gaoshao52", session),
            be_async("https://www.cnblogs.com/gaosy-math", session),
        )


# An explicitly created loop avoids relying on get_event_loop() creating one
# implicitly.
loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
finally:
    loop.close()
注:python3运行有问题
3. asyncio+requests 示例
原理:一致
代码:
# -*- coding: UTF-8 -*-
"""Run blocking ``requests`` calls concurrently via the loop's executor."""
import asyncio

import requests


async def be_async(func, *args):
    """Run the blocking call ``func(*args)`` in the default executor.

    Written as a native coroutine: the generator-based ``@asyncio.coroutine``
    decorator was removed in Python 3.11.

    :param func: blocking callable returning an object with ``url`` and
        ``text`` attributes (``requests.get`` in this demo).
    :param args: positional arguments forwarded to *func*.
    """
    loop = asyncio.get_event_loop()
    # Passing None selects the loop's default executor.
    res = await loop.run_in_executor(None, func, *args)
    print(res.url, res.text)


async def main():
    """Run both demo requests concurrently."""
    return await asyncio.gather(
        be_async(requests.get, "https://github.com/gaoshao52"),
        be_async(requests.get, "https://www.cnblogs.com/gaosy-math"),
    )


# An explicitly created loop avoids relying on get_event_loop() creating one
# implicitly.
loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
finally:
    loop.close()
4. gevent+requests 示例
原理:一致
代码:
# -*- coding: UTF-8 -*-
"""Concurrent requests with gevent greenlets + requests."""
import gevent
from gevent import monkey

# Apply the monkey patch before ``requests`` is imported.
monkey.patch_all()

import requests  # noqa: E402  (must come after the patch)


def be_async(method, url, req_kwargs):
    """Send one request and print the final URL and status code.

    :param method: HTTP method name, e.g. ``'GET'``.
    :param url: absolute URL to request.
    :param req_kwargs: extra keyword arguments for ``requests.request``.
    """
    print(method, url, req_kwargs)
    res = requests.request(method=method, url=url, **req_kwargs)
    print(res.url, res.status_code)


targets = (
    "https://github.com/gaoshao52",
    "https://www.cnblogs.com/gaosy-math",
)
jobs = [
    gevent.spawn(be_async, method='GET', url=target, req_kwargs={})
    for target in targets
]
gevent.joinall(jobs)

# Alternative: spawn through ``gevent.pool.Pool`` to cap the number of
# greenlets running at once.
# from gevent.pool import Pool
# pool = Pool(None)
# gevent.joinall([
#     pool.spawn(be_async, method='GET', url="https://github.com/gaoshao52", req_kwargs={}),
#     pool.spawn(be_async, method='GET', url="https://www.cnblogs.com/gaosy-math", req_kwargs={})
# ])
monkey.patch_all() 会把原来的socket 设置成非阻塞
5. grequests 示例
原理:gevent+requests的封装
代码:
# -*- coding: UTF-8 -*-
"""Concurrent requests with grequests."""
import grequests


def exception_handler(request, exception):
    """Report a request that raised while being sent by ``grequests.map``."""
    print(request, exception)
    print("Request failed")


urls = (
    "http://www.runoob.com/python/python-json.html",
    "https://www.git-scm.com/download/win",
)
request_list = [grequests.get(target) for target in urls]

# Variant without error handling:
# response_list = grequests.map(request_list)
# print(response_list)

# Send everything and collect the responses, routing failures to the handler.
response_list = grequests.map(request_list, exception_handler=exception_handler)
print(response_list)
6. twisted 示例
原理:
代码:
# -*- coding: UTF-8 -*-
"""Concurrent GET requests with Twisted's ``getPage``."""
# ``defer`` lives in twisted.internet; importing it from twisted.web.client
# only works because that module happens to import it too.
from twisted.internet import defer, reactor
from twisted.web.client import getPage
# NOTE(review): getPage is deprecated in newer Twisted releases in favour of
# twisted.web.client.Agent / treq - confirm against the installed version.


def all_done(arg):
    """Stop the reactor once every request has finished (success or failure)."""
    reactor.stop()


def callback(contents):
    """Print one downloaded page; *contents* is the body as bytes."""
    print(contents.decode("utf-8"))


def errback(failure):
    """Report a failed request instead of leaving an unhandled Deferred error."""
    print("Request failed:", failure.getErrorMessage())


deferred_list = []

url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
for url in url_list:
    deferred = getPage(url.encode(encoding="utf-8"))
    deferred.addCallback(callback)
    # Added after the callback so that errors raised while decoding are
    # handled as well.
    deferred.addErrback(errback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

reactor.run()
# -*- coding: UTF-8 -*-
"""Single POST request with Twisted's ``getPage``."""
import urllib.parse

from twisted.internet import reactor
from twisted.web.client import getPage


def one_done(arg):
    """Print the outcome of the request and always stop the reactor.

    Registered with ``addBoth``, so *arg* is the response body (bytes) on
    success and a failure object otherwise. Calling ``.decode`` on the failure
    would raise before ``reactor.stop()`` and leave the script hanging, hence
    the type check and the ``finally``.
    """
    try:
        if isinstance(arg, bytes):
            print(arg.decode("utf-8", errors="replace"))
        else:
            print("Request failed:", arg)
    finally:
        reactor.stop()


post_data = urllib.parse.urlencode({'check_data': 'adf'})
post_data = post_data.encode(encoding="utf-8")
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage('http://dig.chouti.com/login'.encode(encoding="utf-8"),
                   method='POST'.encode(encoding="utf-8"),
                   postdata=post_data,
                   cookies={},
                   headers=headers)
response.addBoth(one_done)

reactor.run()
Twisted模块安装:
1. https://www.lfd.uci.edu/~gohlke/pythonlibs/ 下载
2. pip install Twisted-18.7.0-cp36-cp36m-win_amd64.whl
7. tornado 示例
原理:与twisted一致
代码:
# -*- coding: UTF-8 -*-
"""Concurrent GET requests with Tornado's AsyncHTTPClient."""
from tornado import gen
from tornado import ioloop
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPError
from tornado.httpclient import HTTPRequest


def handle_response(response):
    """Print the body of a finished request, or its error.

    :param response: the ``HTTPResponse`` produced by ``fetch``.
    :return: None
    """
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)


async def fetch_one(http_client, url):
    """Fetch *url* and hand the response to ``handle_response``."""
    print(url)
    try:
        # The ``fetch(request, callback)`` form was removed in Tornado 6;
        # await the returned future instead. raise_error=False keeps HTTP
        # error statuses on ``response.error`` rather than raising.
        response = await http_client.fetch(HTTPRequest(url), raise_error=False)
    except (HTTPError, OSError) as exc:
        # Connection-level failures may still be raised.
        print("Error:", exc)
        return
    handle_response(response)


async def func():
    """Fetch every URL concurrently and return once all have finished."""
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    http_client = AsyncHTTPClient()
    await gen.multi([fetch_one(http_client, url) for url in url_list])


# run_sync starts the loop, runs func() to completion and stops the loop, so
# no manual counter is needed and the script no longer runs forever.
ioloop.IOLoop.current().run_sync(func)
8. 自定义一个异步非阻塞模块,与twisted tornado原理一致
# -*- coding: UTF-8 -*-
"""A tiny select()-based asynchronous, non-blocking HTTP/1.0 client."""
import select
import socket


class Request(object):
    """Pair a non-blocking socket with the request description it serves."""

    def __init__(self, sock, info):
        self.sock = sock
        self.info = info
        # Response fragments received so far; joined when the peer closes.
        self.chunks = []

    def fileno(self):
        # select() accepts any object that exposes fileno().
        return self.sock.fileno()


class MyName(object):
    """Event loop driving several HTTP/1.0 GET requests concurrently."""

    def __init__(self):
        self.sock_list = []  # requests still waiting for their response
        self.conns = []      # requests whose connection is not established yet

    def add_request(self, req_info):
        """Open a non-blocking connection for one request.

        :param req_info: dict with the keys ``host``, ``port``, ``path`` and
            ``callback``.
        :return: None
        """
        client = socket.socket()
        client.setblocking(False)
        try:
            client.connect((req_info.get("host"), req_info.get("port")))
        except BlockingIOError:
            # Expected for a non-blocking connect; completion is detected by
            # select() in run() when the socket becomes writable.
            pass
        obj = Request(client, req_info)
        self.sock_list.append(obj)
        self.conns.append(obj)

    def run(self):
        """Run the event loop until every request has been answered.

        A writable socket means the connection is established, so the request
        is sent. A readable socket has response data; it is accumulated until
        the server closes the connection, and only then is the callback
        invoked with the complete response. Every socket is closed, including
        when an error aborts the loop.

        :return: None
        """
        try:
            while self.sock_list:
                # Poll every 0.05 seconds for changes.
                r, w, _ = select.select(self.sock_list, self.conns, [], 0.05)

                # w: connection established -> send the request.
                for obj in w:
                    data = "GET {} HTTP/1.0\r\nHost: {}\r\n\r\n".format(
                        obj.info['path'], obj.info['host'])
                    obj.sock.send(data.encode("utf-8"))
                    self.conns.remove(obj)

                # r: response data available.
                for obj in r:
                    chunk = obj.sock.recv(8096)
                    if chunk:
                        # More may follow; one recv() is not the whole reply.
                        obj.chunks.append(chunk)
                        continue
                    # b"" means the peer closed: the response is complete.
                    self.sock_list.remove(obj)
                    if obj in self.conns:
                        self.conns.remove(obj)
                    obj.sock.close()
                    print(obj.info['host'])
                    obj.info['callback'](b"".join(obj.chunks))
        finally:
            # Do not leak sockets if the loop is aborted by an exception.
            for obj in self.sock_list:
                obj.sock.close()
            del self.sock_list[:]
            del self.conns[:]


def done01(res):
    print(res)


def done02(res):
    print(res)


url_list = [
    {"host": "www.baidu.com", "port": 80, "path": "/", "callback": done01},
    {"host": "www.51cto.com", "port": 80, "path": "/", "callback": done02},
    {"host": "www.cnblogs.com", "port": 80, "path": "/alex3714/articles/5885096.html", "callback": done01},
]

if __name__ == "__main__":
    my_name = MyName()
    for item in url_list:
        my_name.add_request(item)

    my_name.run()
本文参考这里