asyncio
包含各类特定系统实现的模块化事件循环
传输和协议抽象
对TCP、UDP、SSL、子进程、延时调用以及其余的具体支持
模仿futures模块但适用于事件循环使用的Future类
基于 yield from 的协议和任务,能够让你用顺序的方式编写并发代码
必须使用一个将产生阻塞IO的调用时,有接口能够把这个事件转移到线程池
模仿threading模块中的同步原语,能够用在单线程内的协程之间
事件循环+回调(驱动生成器)+epoll(IO多路复用)
asyncio是python用于解决异步io编程的一整套解决方案
tornado、gevent、twisted(scrapy, django channels)
tornado(实现web服务器), django+flask(uwsgi, gunicorn+nginx)
tornado能够直接部署, nginx+tornado
import asyncio
import time


async def get_html(url, delay=2):
    """Simulate downloading ``url`` without blocking the event loop.

    Args:
        url: Address being "fetched"; it is accepted but never contacted.
        delay: Seconds to wait in place of a real network round trip.
    """
    print("start get url")
    # Use asyncio.sleep(), never the synchronous time.sleep(): a blocking call
    # would stall the event loop and serialize every coroutine.
    await asyncio.sleep(delay)
    print("end get url")


async def main(count=10, delay=2):
    """Run ``count`` simulated downloads concurrently and wait for all of them.

    Args:
        count: Number of concurrent ``get_html`` calls to start.
        delay: Passed through to ``get_html``.
    """
    # asyncio.wait() rejects bare coroutine objects on Python 3.11+, so every
    # coroutine is wrapped in a Task before being handed over.
    tasks = [
        asyncio.ensure_future(get_html("http://www.imooc.com", delay))
        for _ in range(count)
    ]
    if tasks:  # asyncio.wait() raises ValueError on an empty collection
        await asyncio.wait(tasks)


if __name__ == "__main__":
    start_time = time.time()
    # Create the loop explicitly: asyncio.get_event_loop() no longer creates
    # one implicitly on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
    print(time.time() - start_time)

"""
start get url
start get url
start get url
start get url
start get url
start get url
start get url
start get url
start get url
start get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
end get url
2.001918077468872
"""
import asyncio
import time
from functools import partial  # binds leading arguments of a callable


async def get_html(url, delay=2):
    """Simulate downloading ``url`` and return a fixed result string.

    Args:
        url: Address being "fetched"; it is accepted but never contacted.
        delay: Seconds to wait in place of a real network round trip.

    Returns:
        The string ``"lewen"``.
    """
    print("start get url")
    await asyncio.sleep(delay)
    return "lewen"


def callback(url, future):
    """Done-callback: print ``url`` and a notification message.

    Args:
        url: Value bound in advance with ``functools.partial``.
        future: The finished task; asyncio always passes it as the last
            positional argument. It is not inspected here.
    """
    print(url)
    print("send callback email to lewen")


if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() no longer creates
    # one implicitly on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        # loop.create_task(coro) and asyncio.ensure_future(coro, loop=loop)
        # are two spellings of the same thing: wrap a coroutine in a Task.
        task = loop.create_task(get_html("http://www.imooc.com"))
        # add_done_callback() only supplies the future itself, so partial()
        # is used to pre-bind the extra ``url`` argument.
        task.add_done_callback(partial(callback, "http://www.imooc.com"))
        loop.run_until_complete(task)
        print(task.result())
    finally:
        loop.close()

"""
start get url
http://www.imooc.com
send callback email to lewen
lewen
"""
import asyncio
import time


async def get_html(url, delay=2):
    """Simulate downloading ``url`` without blocking the event loop.

    Args:
        url: Address being "fetched"; it is accepted but never contacted.
        delay: Seconds to wait in place of a real network round trip.
    """
    print("start get url")
    await asyncio.sleep(delay)
    print("end get url")


async def main(delay=2):
    """Demonstrate ``asyncio.wait`` and grouped ``asyncio.gather`` calls.

    Args:
        delay: Passed through to every ``get_html`` call.
    """
    # asyncio.wait() rejects bare coroutine objects on Python 3.11+, so the
    # coroutines are wrapped in Tasks first. asyncio.gather(*coros) would
    # accept the coroutines directly.
    tasks = [
        asyncio.ensure_future(get_html("http://www.imooc.com", delay))
        for _ in range(10)
    ]
    await asyncio.wait(tasks)

    # gather vs. wait: gather is the higher-level API and lets awaitables be
    # organised into groups that are themselves awaitable.
    # The groups are built inside a coroutine so that gather() schedules its
    # children on the loop that is actually running.
    group1 = asyncio.gather(
        *(get_html("http://projectsedu.com", delay) for _ in range(2))
    )
    group2 = asyncio.gather(
        *(get_html("http://www.imooc.com", delay) for _ in range(2))
    )
    # group2.cancel() here would cancel every member of that group at once.
    await asyncio.gather(group1, group2)


if __name__ == "__main__":
    start_time = time.time()
    # Create the loop explicitly: asyncio.get_event_loop() no longer creates
    # one implicitly on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
    print(time.time() - start_time)
import asyncio


def callback(sleep_times, loop):
    """Print the event loop's internal clock at the moment of the call.

    Args:
        sleep_times: Label identifying which timer fired; not used in the
            printed message.
        loop: Event loop whose monotonic ``time()`` is printed.
    """
    print("success time {}".format(loop.time()))


def stoploop(loop):
    """Stop ``loop`` so that ``run_forever()`` returns."""
    loop.stop()


# call_later, call_at
if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() no longer creates
    # one implicitly on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        # Other scheduling primitives, for comparison:
        #   loop.call_soon(cb, *args)         - run cb on the next loop iteration
        #   loop.call_later(delay, cb, *args) - run cb ``delay`` seconds from now
        #   loop.call_soon_threadsafe(...)    - call_soon for use from other threads
        # call_at() schedules at an absolute time on the loop's own clock, so
        # the callbacks below fire in deadline order (1, 2, 3), not in the
        # order they were registered.
        now = loop.time()
        loop.call_at(now + 2, callback, 2, loop)
        loop.call_at(now + 1, callback, 1, loop)
        loop.call_at(now + 3, callback, 3, loop)
        # Without a scheduled stop, run_forever() would never return.
        loop.call_at(now + 4, stoploop, loop)
        loop.run_forever()
    finally:
        loop.close()
# Using threads: integrating blocking I/O into coroutine-based code.
# Typical candidates are database drivers and other blocking-only libraries.
import asyncio
import socket
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse


def get_url(url):
    """Fetch ``url`` over a blocking socket and print the response body.

    The request is a plain HTTP/1.1 GET with ``Connection:close``; the
    response is read until the peer closes the connection.

    Args:
        url: HTTP URL to request. Port 80 is used unless the URL names one.
    """
    parts = urlparse(url)
    host = parts.netloc
    path = parts.path or "/"
    if parts.query:
        # The query string is part of the request target; without it the
        # server would receive e.g. "/s" instead of "/s?wd=1/".
        path = "{}?{}".format(path, parts.query)
    request = "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host)

    chunks = []
    # The context manager closes the socket even if connect/recv raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        # connect() blocks, but a blocked thread consumes no CPU. With a
        # non-blocking socket (client.setblocking(False)) the caller would
        # have to poll in a loop until the connection is ready, and could do
        # computation or open further connections in the meantime.
        client.connect((parts.hostname, parts.port or 80))
        # sendall() keeps writing until the whole request is sent; send() may
        # transmit only part of the buffer.
        client.sendall(request.encode("utf8"))
        while True:
            chunk = client.recv(1024)
            if not chunk:
                break
            chunks.append(chunk)

    data = b"".join(chunks).decode("utf8")
    # Everything after the first blank line is the body. partition() keeps a
    # body that itself contains "\r\n\r\n" intact and yields "" (rather than
    # raising) when the separator is missing.
    html_data = data.partition("\r\n\r\n")[2]
    print(html_data)


if __name__ == "__main__":
    start_time = time.time()
    # Create the loop explicitly: asyncio.get_event_loop() no longer creates
    # one implicitly on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        # Thread pool; leaving the ``with`` block shuts the workers down.
        with ThreadPoolExecutor(3) as executor:
            # run_in_executor() hands the blocking function to the pool and
            # returns a future the event loop can wait on.
            tasks = [
                loop.run_in_executor(
                    executor, get_url, "http://www.baidu.com/s?wd={}/".format(index)
                )
                for index in range(20)
            ]
            loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()
    print("last time:{}".format(time.time() - start_time))
    # The thread pool is plugged straight into the coroutine world this way.
# coding=utf-8
# asyncio itself ships no HTTP-level API; aiohttp provides one. Here the
# request is written by hand on top of asyncio streams.
import asyncio
import time
from urllib.parse import urlparse


async def get_url(url):
    """Fetch ``url`` with asyncio streams and return the raw response text.

    Args:
        url: HTTP URL to request. Port 80 is used unless the URL names one.

    Returns:
        The complete response (status line, headers and body) decoded as
        UTF-8.
    """
    parts = urlparse(url)
    host = parts.netloc
    path = parts.path or "/"
    if parts.query:
        # The query string is part of the request target; without it the
        # server would receive e.g. "/s" instead of "/s?wd=1/".
        path = "{}?{}".format(path, parts.query)

    # Open the connection without blocking the event loop.
    reader, writer = await asyncio.open_connection(parts.hostname, parts.port or 80)
    try:
        writer.write(
            "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8")
        )
        # write() only buffers; drain() waits until the buffer is flushed.
        await writer.drain()
        all_lines = []
        async for raw_line in reader:
            all_lines.append(raw_line.decode("utf8"))
    finally:
        # Always release the transport, including when reading fails.
        writer.close()
    # Each line from the reader keeps its own line ending, so the pieces are
    # concatenated as-is; joining with "\n" would double every line break.
    return "".join(all_lines)


async def main():
    """Start 20 requests concurrently and print each response as it finishes."""
    tasks = [
        asyncio.ensure_future(get_url("http://www.baidu.com/s?wd={}/".format(index)))
        for index in range(20)
    ]
    # as_completed() yields awaitables in completion order, not start order.
    for finished in asyncio.as_completed(tasks):
        result = await finished
        print(result)


if __name__ == "__main__":
    start_time = time.time()
    # Create the loop explicitly: asyncio.get_event_loop() no longer creates
    # one implicitly on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
    print('last time:{}'.format(time.time() - start_time))
future 结果容器
task 是 future 的子类,协程和future之间的桥梁,启动协程
import asyncio

total = 0


async def add():
    """Increment the module-level ``total`` 100000 times.

    A typical coroutine does some work, performs I/O, then does more work.
    This one contains no ``await``, so once it starts it runs to completion
    without being interleaved with any other coroutine.
    """
    global total
    for _ in range(100000):
        total += 1


async def desc():
    """Decrement the module-level ``total`` 100000 times.

    Like ``add``, it contains no ``await`` and therefore runs uninterrupted.
    """
    global total
    for _ in range(100000):
        total -= 1


async def main():
    """Run ``add`` and ``desc`` on the same event loop and wait for both."""
    # asyncio.wait() rejects bare coroutine objects on Python 3.11+, so the
    # coroutines are wrapped in Tasks first.
    tasks = [asyncio.ensure_future(add()), asyncio.ensure_future(desc())]
    await asyncio.wait(tasks)


if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() no longer creates
    # one implicitly on recent Python versions.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
    # Prints 0: neither coroutine yields inside its loop, so the updates never
    # interleave and no lock is needed.
    print(total)
13.8 aiohttp实现高并发爬虫