假设有3个url须要发请求。html
import requests urls_list = [ 'https://www.cnblogs.com/longyunfeigu/p/9491496.html', 'https://dig.chouti.com/', 'https://dig.chouti.com/r/pic/hot/1' ] for url in urls_list: requests.get(url)
串行确定是最慢的,怎么改进?第一反应:开多个线程。好吧,lowB 第一反应都是这个方法python
import time from concurrent.futures import ThreadPoolExecutor import requests urls_list = [ 'https://www.cnblogs.com/longyunfeigu/p/9491496.html', 'https://dig.chouti.com/', 'https://dig.chouti.com/r/pic/hot/1' ] start_time = time.time() pool = ThreadPoolExecutor(3) def task(url): requests.get(url) print('ending') for i in urls_list: pool.submit(task, i) pool.shutdown() end_time = time.time() print(end_time - start_time)
开多个线程虽然能够实现并发,可是开线程总归是要耗费资源的,那能不能利用一个线程帮忙提升效率呢,这就须要异步非阻塞登场了react
什么是异步非阻塞?异步非阻塞就是 异步+非阻塞。异步就是回调,非阻塞指的是单个任务不等待。
在网络IO中,有两个阶段会出现"浪费时间"的状况,一个阶段是connect的时候,请求发出去等待信息回来通知这个客户端socket就绪能够发http报文信息。
另外一个阶段是recv接受数据的时候(发送数据不须要等待,直接发就行)须要等待,由于服务端需经过网络把数据发送过来。非阻塞就是再也不等待,connect原先须要等待是吧,ok如今我不等了,
recv须要等待是吧,ok我也不等了。不等报错咋办?报错就报错呗,大不了异常捕获就行。若是是非阻塞socket,那么3个url的请求在connect的时候都发出去了,注意,即便报错,请求也是如弦上之箭射出去了。
那么等connect成功后就应该发送数据了呀,这时候就体现出回调了,成功就回来调用一段代码。全部的抽象的话语均可以结合大体的代码来理解。之后若是遇到一些抽象话语很差理解,那么就应该用代码去理解这个抽象话语。
有了异步非阻塞的概念是,那么咱们就能够利用一个线程把全部的链接都发送出去,等链接都发送出去并且任意一个connect的信息都没返回的时候,线程就只好等待了,
等有connect的信息返回表示这个客户端socket准备就绪能够发http报文了,这时候再去执行发送数据的代码。这个过程提及来简单,但有一个问题还不清晰:程序怎么知道哪一个socket是就绪的?这个大体有2种解决方式:要么是通知(叫醒服务,这个更偏向底层,咱们写的应用程序代码能够利用底层已经实现好的技术);要么是用while死循环不断去检测 。python中有一些模块已经帮咱们实现了异步非阻塞的功能。git
from twisted.web.client import getPage, defer from twisted.internet import reactor def all_done(arg): reactor.stop() def callback(contents): print(contents) deferred_list = [] url_list = ['http://www.bing.com', 'http://www.baidu.com', ] for url in url_list: deferred = getPage(bytes(url, encoding='utf8')) # 次数使用回调函数,猜想用到了异步功能 deferred.addCallback(callback) deferred_list.append(deferred) dlist = defer.DeferredList(deferred_list) dlist.addBoth(all_done) # 死循环,不断地处理deferred_list里的socket对象,按理说处理完成就应该自动中止了,但多是这个框架架构设计问题,不能作到自动中止,须要手动停 reactor.run()
gevent 是基于greenlet作的,greenlet实现了协程。协程和线程不同,协程不是真实存在的,是程序员伪造出来的具备相似于线程的切换效果。单纯的协程不能完成提升效率的方式,如greenlet须要手动切换,因此这出现了gevent。
gevent 能够在遇到IO阻塞的时候自动切换协程的运行,这样就能够提高效率。而协程切来切去的功能正好能够利用过来实现异步非阻塞程序员
import gevent from gevent import monkey monkey.patch_all() import requests def fetch_async(method, url, req_kwargs): print(method, url, req_kwargs) response = requests.request(method=method, url=url, **req_kwargs) print(response.url) # ##### 发送请求 ##### gevent.joinall([ gevent.spawn(fetch_async, method='get', url='https://www.cnblogs.com/longyunfeigu/', req_kwargs={}), gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}), gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}), ])
requests模块发请求,内部也是用socket,先去connect,而后发送请求,最后收到响应github
asyncio默认只支持发tcp层的报文,想要发http请求,就须要本身封装http报文web
import asyncio @asyncio.coroutine def fetch_async(host, url='/'): print(host, url) reader, writer = yield from asyncio.open_connection(host, 80) request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,) request_header_content = bytes(request_header_content, encoding='utf-8') writer.write(request_header_content) yield from writer.drain() text = yield from reader.read() print(host, url, text) writer.close() tasks = [ fetch_async('www.cnblogs.com', '/longyunfeigu/'), fetch_async('baidu.com', '/') ] loop = asyncio.get_event_loop() results = loop.run_until_complete(asyncio.gather(*tasks)) loop.close()
运用知识:非阻塞socket + IO多路复用网络
from socket import * import select import time class HttpContext(object): def __init__(self, sock, url_dict): import time self._start_time = time.time() self.sock = sock self.port = '' self.host = '' self.method = '' self.data = '' self.path = '' self.content = b'' self.text = '' self.timeout = 0 self.callback = None self.initial(url_dict) def initial(self, url_dict): self.port = url_dict.get('port') self.host = url_dict.get('host') self.method = url_dict.get('method') self.data = url_dict.get('data') self.path = url_dict.get('path') self.timeout = url_dict.get('timeout', 5) self.callback = url_dict.get('callback') def connect(self): self.sock.connect((self.host, self.port)) def fileno(self): return self.sock.fileno() def send_get(self): content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n"""%( self.method.upper(), self.path, self.host) self.sock.sendall(bytes(content, encoding='utf8')) def sendall(self): if self.method.upper() == 'GET': self.send_get() elif self.method.upper() == 'POST': self.send_post() else: pass def send_post(self): content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s"""%( self.method.upper(), self.path, self.host, self.data) self.sock.sendall(bytes(content, encoding='utf8')) def recv(self): import time while 1: data = self.sock.recv(8096) if not data: break self.content += data self.text += str(data, encoding='utf8') time.sleep(0.1) self.finish() def finish(self, msg=''): if msg: self.text = msg self.content = bytes(msg, encoding='utf8') if self.callback: self.callback(self.text) class AsynchRequest(object): def __init__(self): self.conn_socket_list = [] self.recv_socket_list = [] def add_request(self, **url_dict): # 当即发起connect链接 soc = socket(AF_INET,SOCK_STREAM) soc.setblocking(0) ctx = HttpContext(soc, url_dict) self.conn_socket_list.append(ctx) self.recv_socket_list.append(ctx) try: ctx.connect() except BlockingIOError as e: pass # print('request is sended') def check_timeout(self): # 检验是否超时 ctime = time.time() for ctx in self.recv_socket_list: if ctx._start_time + ctx.timeout <= ctime: self.recv_socket_list.remove(ctx) self.conn_socket_list.remove(ctx) ctx.finish('connect超时') def run(self): while 1: # r_list 表明socket对象是否有数据能够读, w_list表明socket是否能够写,也就是是否能够发送数据,写服务端程序通常只须要用到 r_list # IO多路复用监听的对象不必定是socket对象,只要对象有fileno方法都能监听,内部也是拿对象的fileno的返回值来监听 r_list, w_list, e_list = select.select(self.conn_socket_list, self.recv_socket_list, [], 0.05) for w in w_list: w.sendall() self.recv_socket_list.remove(w) for r in r_list: r.recv() self.conn_socket_list.remove(r) if not self.conn_socket_list: break self.check_timeout() def callback(response): print(response) url_list = [ {'host': 'www.baidu.com','port': 80, 'path':'/', 'name':'baidu', 'method':'GET', 'callback': callback}, {'host': 'cn.bing.com','port': 80,'path':'/', 'name':'chouti', 'method':'GET'}, ] if __name__ == '__main__': obj = AsynchRequest() for i in url_list: obj.add_request(**i) obj.run()
这里的自定义IO模块是站在客户端的角度来定义的,借助底层的IO多路复用来帮咱们检测socket是否已经准备好(不用咱们手动写死循环去检测socket对象是否已经准备好)多线程