from .base_events import * from .coroutines import * #协程模块,能够将函数装饰为协程 from .events import * #事件模块,事件循环和任务调度都将使用到他 from .futures import * #异步并发模块,该模块对task封装了许多方法,表明未来执行或没有执行的任务的结果。它和task上没有本质上的区别 from .locks import * #异步保证资源同步 from .protocols import * from .queues import * from .streams import * from .subprocess import * from .tasks import * #建立任务,是对协程的封装,能够查看协程的状态。能够将任务集合 from .transports import *
1.当咱们给一个函数添加了async关键字,或者使用asyncio.coroutine装饰器装饰,就会把它变成一个异步函数。
2.每一个线程有一个事件循环,主线程调用asyncio.get_event_loop时会建立事件循环,
3.将任务封装为集合asyncio.gather(*args),以后一块儿传入事件循环中
4.要把异步的任务丢给这个循环的run_until_complete方法,事件循环会安排协同程序的执行。和方法名字同样,该方法会等待异步的任务彻底执行才会结束。
import asyncio,time
@asyncio.coroutine #设为异步函数
def func1(num):
print(num,'before---func1----')
yield from asyncio.sleep(5)
print(num,'after---func1----')
task = [func1(1),func1(2)]
if __name__ == "__main__":
begin = time.time()
loop = asyncio.get_event_loop() #进入事件循环
loop.run_until_complete(asyncio.gather(*task)) #将协同程序注册到事件循环中
loop.close()
end = time.time()
print(end-begin)
1 before---func1---- 2 before---func1---- 1 after---func1---- 2 after---func1---- 5.00528621673584
import asyncio,time async def func1(num): #使用async关键字定义一个协程,协程也是一种对象,不能直接运行,须要加入事件循环中,才能被调用。 print(num,'before---func1----') if __name__ == "__main__": begin = time.time() coroutine = func1(2) loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) loop.close() end = time.time() print(end-begin)
func1(2) #因为使用async异步关键字,因此不能直接运行
D:/MyPython/day25/mq/multhread.py:15: RuntimeWarning: coroutine 'func1' was never awaited
func1(2)html
print(type(func1),type(coroutine)) #<class 'function'> <class 'coroutine'>
同:python---await/async关键字python
coroutine = func1(2) try: coroutine.send(None) except StopIteration: pass
协程对象不能直接运行,在注册事件循环的时候,实际上是run_until_complete方法将协程包装成为了一个任务(task)对象.编程
task对象是Future类的子类,保存了协程运行后的状态,用于将来获取协程的结果session
class BaseEventLoop(events.AbstractEventLoop): def run_until_complete(self, future): """Run until the Future is done. If the argument is a coroutine, it is wrapped in a Task. WARNING: It would be disastrous to call run_until_complete() with the same coroutine twice -- it would wrap it in two different Tasks and that can't be good. Return the Future's result, or raise its exception. """ self._check_closed() new_task = not futures.isfuture(future) future = tasks.ensure_future(future, loop=self) if new_task: # An exception is raised if the future didn't complete, so there # is no need to log the "destroy pending task" message future._log_destroy_pending = False future.add_done_callback(_run_until_complete_cb) try: self.run_forever() except: if new_task and future.done() and not future.cancelled(): # The coroutine raised a BaseException. Consume the exception # to not log a warning, the caller doesn't have access to the # local task. future.exception() raise finally: future.remove_done_callback(_run_until_complete_cb) if not future.done(): raise RuntimeError('Event loop stopped before Future completed.') return future.result()
import asyncio,time async def func1(num): print(num,'before---func1----') if __name__ == "__main__": begin = time.time() coroutine = func1(2) loop = asyncio.get_event_loop() task = loop.create_task(coroutine) #建立了任务 print(task) #pending loop.run_until_complete(task) loop.close() print(task) #finished end = time.time() print(end-begin)
对于协程的4种状态:python---协程理解并发
print(task) #pending
print(getcoroutinestate(coroutine))
loop.run_until_complete(task)
loop.close()
print(task) #finished
print(getcoroutinestate(coroutine))
CORO_CREATED 2 before---func1---- <Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/multhread.py:4> result=None> CORO_CLOSED
深刻了解:关于Task,create_task(),ensure_future均可以用来建立任务,那么应该使用哪一个?app
条件使用ensure_future,他是最外层函数,其中调用了create_task()方法,功能全面,而Task官方不推荐直接使用异步
isinstance(task, asyncio.Future)
将会输出True。
async def func1(num):
print(num,'before---func1----')
return "recv num %s"%num
def callback(future):
print(future.result())
if __name__ == "__main__":
begin = time.time()
coroutine1 = func1(1)
loop = asyncio.get_event_loop()
task1=asyncio.ensure_future(coroutine1)
task1.add_done_callback(callback)
loop.run_until_complete(task1)
loop.close()
end = time.time()
print(end-begin)
1 before---func1----
recv num 1
0.004000186920166016async
能够看到,coroutine执行结束时候会调用回调函数。并经过参数future获取协程执行的结果。咱们建立的task和回调里的future对象,其实是同一个对象。ide
async def func1(num): print(num,'before---func1----') return "recv num %s"%num if __name__ == "__main__": begin = time.time() coroutine1 = func1(1) loop = asyncio.get_event_loop() task1=asyncio.ensure_future(coroutine1) loop.run_until_complete(task1) print(task1) print(task1.result()) loop.close() end = time.time() print(end-begin)
1 before---func1---- <Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/multhread.py:6> result='recv num 1'> recv num 1 0.0030002593994140625
使用async关键字定义的协程对象,使用await能够针对耗时的操做进行挂起(是生成器中的yield的替代,可是本地协程函数不容许使用),让出当前控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其余协程也挂起,或者执行完毕,在进行下一个协程的执行函数
使用asyncio.sleep模拟阻塞操做。
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num if __name__ == "__main__": begin = time.time() coroutine1 = func1(5) coroutine2 = func1(3) loop = asyncio.get_event_loop() task1=asyncio.ensure_future(coroutine1) task2=asyncio.ensure_future(coroutine2) tasks = asyncio.gather(*[task1,task2]) #gather能够实现同时注册多个任务,实现并发操做。wait方法使用一致 loop.run_until_complete(tasks) loop.close() end = time.time() print(end-begin)
task1=asyncio.ensure_future(coroutine1) task2=asyncio.ensure_future(coroutine2) tasks = asyncio.gather(*[task1,task2]) loop.run_until_complete(tasks)
task1=asyncio.ensure_future(coroutine1) task2=asyncio.ensure_future(coroutine2) tasks = asyncio.wait([task1,task2]) loop.run_until_complete(tasks)
Usage:
done, pending = yield from asyncio.wait(fs)
二者的返回值也是不一样的
import asyncio,aiohttp async def fetch_async(url): print(url) async with aiohttp.ClientSession() as session: async with session.get(url) as resp: print(resp.status) print(await resp.text()) tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.cnblogs.com/ssyfj/')] event_loop = asyncio.get_event_loop() results = event_loop.run_until_complete(asyncio.gather(*tasks)) event_loop.close()
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num async def main(): coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] dones, pendings = await asyncio.wait(tasks) for task in dones: #对已完成的任务集合进行操做 print("Task ret: ",task.result()) if __name__ == "__main__": begin = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close() end = time.time() print(end-begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- Task ret: recv num 4 Task ret: recv num 5 Task ret: recv num 3 5.000285863876343
results = await asyncio.gather(*tasks) for result in results: print("Task ret: ",result)
async def main(): coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] return await asyncio.gather(*tasks) if __name__ == "__main__": begin = time.time() loop = asyncio.get_event_loop() results = loop.run_until_complete(main()) for result in results: print("Task ret: ",result) loop.close() end = time.time() print(end-begin)
return await asyncio.wait(tasks) ---------------------------------------------------- dones,pendings = loop.run_until_complete(main()) for task in dones: print("Task ret: ",task.result())
Return an iterator whose values are coroutines. #返回一个可迭代的协程函数值 When waiting for the yielded coroutines you'll get the results (or exceptions!) of the original Futures (or coroutines), in the order in which and as soon as they complete.
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num async def main(): coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] for task in asyncio.as_completed(tasks): result = await task print("Task ret: ",result) if __name__ == "__main__": begin = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close() end = time.time() print(end-begin)
future对象有几个状态:
建立future的时候,task为pending,
事件循环调用执行的时候固然就是running,
调用完毕天然就是done,
若是须要中止事件循环,就须要先把task取消。
可使用asyncio.Task获取事件循环的task
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num if __name__ == "__main__": begin = time.time() coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e: print(asyncio.Task.all_tasks()) for task in asyncio.Task.all_tasks(): #获取全部任务 print(task.cancel()) #单个任务取消 loop.stop() #须要先stop循环 loop.run_forever() #须要在开启事件循环 finally: loop.close() #统一关闭 end = time.time() print(end-begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- {<Task pending coro=<func1() running at multhread.py:5> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<loc als>._on_completion() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]>, < Task pending coro=<wait() running at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks .py:361> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<func1() running at multhread.py:5> wait _for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at C:\Users\Administrator\AppData\Loca l\Programs\Python\Python35\lib\asyncio\tasks.py:428]>, <Task pending coro=<func1() running at multhread.py:5> wait_f or=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at C:\Users\Administrator\AppData\Local\ Programs\Python\Python35\lib\asyncio\tasks.py:428]>} #未处理,刚刚挂起为pending状态 True #返回True,表示cancel取消成功 True True True 3.014172315597534
True表示cannel成功,loop stop以后还须要再次开启事件循环,最后在close,否则还会抛出异常:
Task was destroyed but it is pending!
由于cancel后task的状态依旧是pending
for task in asyncio.Task.all_tasks(): print(task) print(task.cancel()) print(task)
<Task pending coro=<func1() running at multhread.py:5> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<loca ls>._on_completion() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]> True <Task pending coro=<func1() running at multhread.py:5> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion () at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]>
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num async def main(): coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] dones,pendings=await asyncio.wait(tasks) for task in dones: print("Task ret: ",task.result()) if __name__ == "__main__": begin = time.time() loop = asyncio.get_event_loop() task = asyncio.ensure_future(main()) try: loop.run_until_complete(task) except KeyboardInterrupt as e: print(asyncio.gather(*asyncio.Task.all_tasks()).cancel()) #咱们只是把上面的单个写成了全部任务集合取消,和协程嵌套关系不大。上面也能够这样写。不过协程嵌套能够简化代码 loop.stop() loop.run_forever() finally: loop.close() end = time.time() print(end-begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- <class 'asyncio.tasks._GatheringFuture'> True 3.008172035217285
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num if __name__ == "__main__": begin = time.time() coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e: print(asyncio.gather(*tasks).cancel()) loop.stop() loop.run_forever() finally: loop.close() end = time.time() print(end-begin)
5 before---func1---- 3 before---func1---- 4 before---func1---- True 3.008171796798706
在当前线程中建立一个事件循环(不启用,单纯获取标识),开启一个新的线程,在新的线程中启动事件循环。在当前线程依据事件循环标识,能够向事件中添加协程对象。当前线程不会因为事件循环而阻塞了。
上面在一个线程中执行的事件循环,只有咱们主动关闭事件close,事件循环才会结束,会阻塞。
import asyncio,time async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num if __name__ == "__main__": begin = time.time() coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3), ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) loop.run_forever() end = time.time() print(end-begin)
import asyncio,time,threading def func1(num): print(num,'before---func1----') time.sleep(num) return "recv num %s"%num def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() if __name__ == "__main__": begin = time.time() new_loop = asyncio.new_event_loop() #在当前线程下建立时间循环,(未启用) t = threading.Thread(target=start_loop,args=(new_loop,)) #开启新的线程去启动事件循环 t.start() new_loop.call_soon_threadsafe(func1,3) new_loop.call_soon_threadsafe(func1,2) new_loop.call_soon_threadsafe(func1,6) end = time.time() print(end-begin) #当前线程未阻塞,耗时0.02800154685974121
3 before---func1---- 0.02800154685974121 2 before---func1---- 6 before---func1----
import asyncio,time,threading async def func1(num): print(num,'before---func1----') await asyncio.sleep(num) return "recv num %s"%num def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() if __name__ == "__main__": begin = time.time() coroutine1 = func1(5) coroutine2 = func1(3) coroutine3 = func1(4) new_loop = asyncio.new_event_loop() #在当前线程下建立时间循环,(未启用) t = threading.Thread(target=start_loop,args=(new_loop,)) #开启新的线程去启动事件循环 t.start() asyncio.run_coroutine_threadsafe(coroutine1,new_loop) #传参必须是协程对象 asyncio.run_coroutine_threadsafe(coroutine2,new_loop) asyncio.run_coroutine_threadsafe(coroutine3,new_loop) end = time.time() print(end-begin) #当前线程未阻塞,耗时0.010000467300415039
5 before---func1---- 3 before---func1---- 4 before---func1---- 0.010000467300415039
主线程经过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操做,同时主线程又不会被block。