Coroutines in Python (Part 3)

asyncio

What is asyncio for?

  • Asynchronous network operations
  • Concurrency
  • Coroutines

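  • Python 3.0 era: the standard library's asynchronous networking module was select (very low-level).
  • Python 3.0 era: the third-party asynchronous networking library was Tornado.
  • Python 3.4 era: asyncio arrived, with support for TCP and subprocesses.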

Today many libraries already build on asyncio: aiohttp, aiodns, aioredis, and so on. https://github.com/aio-libs lists the supported projects and is continually updated.

Of course, asyncio is not the only way to implement coroutines so far: tornado and gevent provide similar functionality.

Some key asyncio concepts:

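  • event_loop (event loop): the program starts an infinite loop and registers functions on it; when the corresponding events occur, the loop calls the matching coroutines.

  • coroutine: a coroutine object. Calling a function defined with the async keyword (a coroutine function) does not execute its body immediately; it returns a coroutine object instead. The coroutine object must be registered with the event loop, which then runs it.

  • task: a coroutine object is a function that can natively suspend itself; a task wraps a coroutine further and records the task's various states.

  • future: represents the result of a task that will run in the future, or has not run yet. It is not fundamentally different from a task (asyncio.Task is in fact a subclass of asyncio.Future).

  • async/await keywords: introduced in Python 3.5 for defining coroutines. async defines a coroutine, and await suspends the coroutine on a blocking asynchronous call.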
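The distinctions in the list above (coroutine function vs. coroutine object, Task vs. Future) can be checked directly. This is a minimal sketch, assuming Python 3.7+ for asyncio.run(); the doubling inside do_some_work is just a placeholder body chosen for illustration.

```python
import asyncio
import inspect

async def do_some_work(x):
    return x * 2

coro = do_some_work(3)  # calling a coroutine function does NOT run its body
is_coro_fn = inspect.iscoroutinefunction(do_some_work)     # defined with async def
is_coro_obj = inspect.iscoroutine(coro)                    # the call produced a coroutine object
task_is_future = issubclass(asyncio.Task, asyncio.Future)  # Task is a Future subclass
coro.close()  # never scheduled, so close it to avoid a "never awaited" RuntimeWarning

result = asyncio.run(do_some_work(3))  # an event loop actually runs the coroutine
print(is_coro_fn, is_coro_obj, task_is_future, result)  # True True True 6
```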

Event Loop

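  • The event loop: think of it as asyncio's central executor.

Now let's look at how all of this fits together. As mentioned earlier, asynchronous code runs in a single thread.

As the figure above shows: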

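1. The event loop runs inside a thread.

2. It takes tasks from a queue.

3. Each task executes its next step inside its coroutine.

4. If one coroutine calls another (await <coroutine_name>), control passes into the called coroutine: the caller is suspended with its local state (variables, execution position) preserved, and it resumes once the callee returns.

5. When a coroutine reaches a point where it has to wait (I/O, sleep), it is suspended and control returns to the thread's event loop, which then runs the next task from the queue, and so on.

6. Once every task in the queue has had its turn, the event loop goes back to the first task.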
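The round-robin described in steps 1 to 6 can be observed with two coroutines that each hand control back to the loop on every step. This is a sketch, assuming Python 3.7+; worker and its (name, step, thread) log entries are hypothetical names for illustration, and asyncio.sleep(0) stands in for a real wait.

```python
import asyncio
import threading

log = []

async def worker(name, steps):
    for i in range(steps):
        log.append((name, i, threading.current_thread().name))
        await asyncio.sleep(0)  # suspend and hand control back to the event loop

async def main():
    # gather wraps both coroutines in tasks that share one loop (and one thread)
    await asyncio.gather(worker("A", 3), worker("B", 3))

asyncio.run(main())
for entry in log:
    print(entry)
```

The log alternates between A and B, and every entry records the same thread name: the interleaving comes from the loop switching tasks, not from extra threads.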

Defining a coroutine

import time
import asyncio

now = lambda: time.time()
start = now()


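# 0. Define a coroutine function with the async keyword; calling it does not run the body, it returns a coroutine object
async def do_some_work(x):
    print("waiting:", x)

# 1. This is a coroutine object; at this point do_some_work has not run yet
coro = do_some_work(2)  # returns a coroutine object and binds it to coro
print(coro)  # <coroutine object do_some_work at 0x000001FFF0E10D00>
# 2. Get an event loop
loop = asyncio.get_event_loop()
# 3. Register the coroutine with the event loop and run it until it completes
loop.run_until_complete(coro)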

print("Time:", now()-start)

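  In the code above we define a coroutine with the async keyword. A coroutine cannot run on its own; it has to be added to an event loop. asyncio.get_event_loop() returns the current event loop (creating one if necessary), and run_until_complete registers the coroutine with the loop and starts the loop.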
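On Python 3.7+ the same program is usually written with asyncio.run(), which creates a fresh event loop, runs the coroutine to completion, and closes the loop. Recent Python releases deprecate calling asyncio.get_event_loop() when no event loop has been set (the newest ones raise an error instead), so the loop-based examples in this article reflect the older API. A minimal sketch, where the return value of do_some_work is added for illustration:

```python
import asyncio
import time

async def do_some_work(x):
    print("waiting:", x)
    return "Done after {}s".format(x)

start = time.time()
# asyncio.run() creates a new event loop, runs the coroutine, then closes the loop
result = asyncio.run(do_some_work(2))
print(result)  # Done after 2s
print("Time:", time.time() - start)
```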

Creating a task

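  A coroutine object cannot run by itself. When it is registered with the event loop, it is actually run_until_complete that wraps the coroutine in a task (Task) object. Task is a subclass of Future; it stores the coroutine's state after it runs, so the coroutine's result can be retrieved later.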

import asyncio
import time

now = lambda: time.time()
start = now()

async def do_some_work(x):  # 0
    print("waiting:", x)

coroutine = do_some_work(2)  # 1
loop = asyncio.get_event_loop()  # 2
task = loop.create_task(coroutine)  # 3. Create a Task object
print(task)  # <Task pending coro=<do_some_work() running at H:/python/project_workspace/web/协程/task_test.py:7>>
loop.run_until_complete(task)  # 4
print(task)  # <Task finished coro=<do_some_work() done, defined at H:/python/project_workspace/web/协程/task_test.py:7> result=None> 
print("Time:", now()-start)

  After the task is created, it is in the pending state until the event loop runs it; once it completes, its state is finished.

Besides loop.create_task(coroutine) as above, you can also create a task with asyncio.ensure_future(coroutine).

The official documentation for these two functions: https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future

asyncio.ensure_future(coro_or_future, *, loop=None)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

If the argument is a Future, it is returned directly.

  https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task

AbstractEventLoop.create_task(coro)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.

This method was added in Python 3.4.2. Use the async() function to support also older Python versions.
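The quoted documentation can be checked in a few lines: both calls return a Task, and ensure_future hands back a Future it is given unchanged. A sketch, assuming Python 3.7+ (for asyncio.run and asyncio.get_running_loop); the trivial do_some_work body is a placeholder.

```python
import asyncio

async def do_some_work(x):
    return x

async def main():
    loop = asyncio.get_running_loop()
    t1 = loop.create_task(do_some_work(1))       # method on the loop object
    t2 = asyncio.ensure_future(do_some_work(2))  # module-level helper, wraps the coroutine in a Task
    same = asyncio.ensure_future(t1)             # a Future/Task passed in is returned directly
    return (isinstance(t1, asyncio.Task), isinstance(t2, asyncio.Task),
            same is t1, await t1, await t2)

result = asyncio.run(main())
print(result)  # (True, True, True, 1, 2)
```

In practice create_task is the more explicit choice when you already hold a loop (or a coroutine); ensure_future is convenient when the argument may already be a Future.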

Binding a callback

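By binding a callback, you can obtain the result when the task finishes. The callback's last parameter is the future object, through which the coroutine's return value can be retrieved.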

import time
import asyncio

now = lambda: time.time()
start = now()

async def do_some_work(x):
    print("waiting:", x)
    return "Done after {}s".format(x)

def callback(future):
    print("callback:", future.result())

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)  # bind the callback; when the task finishes, the task itself is passed to callback
print(task)
loop.run_until_complete(task)
print(task)

print("Time:", now()-start)

Output:
<Task pending coro=<do_some_work() running at H:/python/project_workspace/web/协程/绑定回调.py:7>>
<Task pending coro=<do_some_work() running at H:/python/project_workspace/web/协程/绑定回调.py:7> cb=[callback() at H:/python/project_workspace/web/协程/绑定回调.py:11]>
waiting: 2
callback: Done after 2s
<Task finished coro=<do_some_work() done, defined at H:/python/project_workspace/web/协程/绑定回调.py:7> result='Done after 2s'>
Time: 0.0010023117065429688

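  add_done_callback attaches a callback to the task. When the task (equivalently, the coroutine) finishes, the callback is invoked and receives the coroutine's result through its future parameter. The task we created and the future object passed to the callback are in fact the same object.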
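Because the event loop always passes the future as the callback's last argument, extra arguments are typically pre-bound with functools.partial. The sketch below, assuming Python 3.7+, also checks that the future received by the callback is the task itself; the "job-1" tag and the records list are hypothetical, for illustration only.

```python
import asyncio
import functools

records = []

async def do_some_work(x):
    return "Done after {}s".format(x)

def callback(tag, future):
    # tag was pre-bound by functools.partial; the loop passes the finished future last
    records.append((tag, future.result(), future))

async def main():
    task = asyncio.ensure_future(do_some_work(2))
    task.add_done_callback(functools.partial(callback, "job-1"))
    await task
    await asyncio.sleep(0)  # done callbacks are scheduled via call_soon; give them a turn
    return task

task = asyncio.run(main())
tag, result, fut = records[0]
print(tag, result, fut is task)  # job-1 Done after 2s True
```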

Blocking and await

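async defines coroutines, and await suspends a coroutine on a time-consuming operation: like yield in a generator, the function gives up control. When a coroutine hits await, the event loop suspends it and runs other coroutines until they too suspend or finish, then moves on to the next coroutine.

Time-consuming operations are usually I/O, such as network requests or file reads. Here we use asyncio.sleep to simulate I/O. The whole point of coroutines is to make such I/O asynchronous.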

import asyncio
import time

now = lambda :time.time()

async def do_some_work(x):
    print("waiting:",x)
    # await is followed by the time-consuming call
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print("Task ret:", task.result())
print("Time:", now() - start)

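  At await asyncio.sleep(x) the coroutine sleeps, simulating a blocking or time-consuming operation, and gives up control. In other words, when a coroutine reaches a blocking call, await hands control back so that the loop can run other coroutines.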
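Giving up control only works if the awaited call is itself asynchronous. A sketch, assuming Python 3.7+, that runs two 0.2s sleeps concurrently with each kind of sleep: the synchronous time.sleep blocks the whole thread, so the loop cannot switch and the times add up. The helper names blocking, non_blocking and timed are hypothetical, and the measured times are approximate.

```python
import asyncio
import time

async def blocking(x):
    time.sleep(x)  # synchronous sleep: blocks the whole thread, the loop cannot switch

async def non_blocking(x):
    await asyncio.sleep(x)  # asynchronous sleep: suspends only this coroutine

async def timed(worker):
    start = time.perf_counter()
    await asyncio.gather(worker(0.2), worker(0.2))
    return time.perf_counter() - start

blocking_time = asyncio.run(timed(blocking))
async_time = asyncio.run(timed(non_blocking))
print("time.sleep:    {:.2f}s".format(blocking_time))  # roughly 0.4s
print("asyncio.sleep: {:.2f}s".format(async_time))     # roughly 0.2s
```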

Concurrency with await

# Tools:Pycharm 2017.3.2
# author ="wlx"
__date__ = '2018/9/18 19:43'
import asyncio
import time

now = lambda :time.time()

async def do_some_work(x):
    print("Waiting:",x)
    await asyncio.sleep(x)  # while this waits, the loop automatically switches to the other tasks
    return "Done after {}s".format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))  # with several tasks, pass a single awaitable: wrap the list with asyncio.wait()

for task in tasks:
    print("Task ret:",task.result())

print("Time:",now()-start)

  Output:

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004154920578003

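  The total time is about 4s: those 4s of blocking are enough for the first two coroutines to finish. Run synchronously one after another, the tasks would need at least 7s (1 + 2 + 4). So we have achieved concurrency with asyncio. Instead of asyncio.wait(tasks) you can also use asyncio.gather(*tasks): the former takes a list of tasks, the latter takes the tasks as separate positional arguments.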

The official documentation on asyncio.gather and asyncio.wait:

https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

Return a future aggregating results from the given coroutine objects or futures.

All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).

The sequence futures must not be empty.

timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

return_when indicates when this function should return.
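The documented differences show up in the return values: gather returns results in argument order, while wait returns a (done, pending) pair that return_when can cut short. A sketch, assuming Python 3.7+, with short sleep durations chosen only to make the completion order predictable.

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    # gather: results come back in argument order, not completion order
    ordered = await asyncio.gather(do_some_work(0.03), do_some_work(0.01))

    # wait: returns (done, pending); FIRST_COMPLETED returns as soon as one task finishes
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in (0.01, 0.5)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]
    for t in pending:
        t.cancel()  # clean up the task that is still running
    await asyncio.gather(*pending, return_exceptions=True)
    return ordered, first, len(pending)

ordered, first, n_pending = asyncio.run(main())
print(ordered)    # ['Done after 0.03s', 'Done after 0.01s']
print(first)      # ['Done after 0.01s']
print(n_pending)  # 1
```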

Nested coroutines

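async defines coroutines, which are used for time-consuming I/O. We can also wrap several I/O steps into a larger coroutine, which gives nested coroutines: one coroutine awaits another, and they are chained together this way.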

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret:", task.result())

    # results = await asyncio.gather(*tasks)
    # for result in results:
    #     print("Task ret:",result)


start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

If we replace these lines in the code above:

    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret:", task.result())

with:

    results = await asyncio.gather(*tasks)
    for result in results:
        print("Task ret:",result)

then what we get is a list of results, in the same order as tasks.

If main does not process the results itself but directly returns what it awaits, the outermost run_until_complete returns main's result. Change the code above to:

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
    print("Task ret:",result)

print("Time:", now()-start)

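Alternatively, main can suspend with asyncio.wait and return that.

Change return await asyncio.gather(*tasks)

to: return await asyncio.wait(tasks)

In that case run_until_complete returns the (done, pending) pair instead of a list of results, so the results must be read from the done set: done, pending = loop.run_until_complete(main()), then for task in done: print("Task ret:", task.result()).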

You can also use asyncio's as_completed function:

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print("Task ret: {}".format(result))

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

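  As the examples above show, coroutines can be called and combined very flexibly, mainly in how their results are handled: how the results are returned and how the coroutines are suspended.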
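What sets as_completed apart from gather is ordering: it yields awaitables in the order the underlying tasks finish. A sketch, assuming Python 3.7+; the durations are arbitrary values chosen so the completion order differs from the submission order.

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return x

async def main():
    order = []
    coros = [do_some_work(0.05), do_some_work(0.01), do_some_work(0.03)]
    # as_completed yields awaitables in completion order, not submission order
    for next_done in asyncio.as_completed(coros):
        order.append(await next_done)
    return order

order = asyncio.run(main())
print(order)  # [0.01, 0.03, 0.05]
```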

Stopping coroutines

A future object can be in one of these states:

  • Pending
  • Running
  • Done
  • Cancelled

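When a future is created, the task is pending; while the event loop is executing it, it is running; once it has finished, it is done. To stop the event loop, you first need to cancel the tasks. asyncio.all_tasks() (Python 3.7+) returns the not-yet-finished tasks of an event loop; on Python 3.5/3.6 the equivalent was asyncio.Task.all_tasks(), which was removed in Python 3.9.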

import asyncio
import time


now = lambda :time.time()


async def do_some_work(x):
    print("Waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

coroutine1 =do_some_work(1)
coroutine2 =do_some_work(2)
coroutine3 =do_some_work(2)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
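    print(asyncio.all_tasks(loop))  # on Python 3.5/3.6 use asyncio.Task.all_tasks()
    for task in asyncio.all_tasks(loop):
        print(task.cancel())  # True if the task was cancelled, False if it had already finished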
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print("Time:",now()-start)

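  Press Ctrl+C right after the event loop starts: run_until_complete raises KeyboardInterrupt. We then loop over the tasks and cancel each one. The output below was produced on Python 3.5 with asyncio.Task.all_tasks(), which also listed the already finished task (hence the single False); asyncio.all_tasks() returns only unfinished tasks, so on Python 3.7+ only the three True lines appear: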

Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}
False
True
True
True
Time: 1.0707225799560547

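True means the cancellation succeeded; False means the task had already finished, so there was nothing to cancel. After loop.stop(), the event loop has to be started again so the cancellations are processed, and only then closed; otherwise an exception is raised.

Looping over the tasks and cancelling them one by one is one approach. But, as earlier, we can wrap the list of tasks in a main coroutine and drive the event loop from outside main. In that case main is the outermost task, so it is enough to cancel the wrapping main task.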
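Cancelling only the outermost task is enough when it is awaiting asyncio.gather, because cancelling the gather future cancels all of its children. A sketch, assuming Python 3.7+; runner is a hypothetical driver that calls cancel() itself instead of relying on Ctrl+C.

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main(tasks):
    return await asyncio.gather(*tasks)

async def runner():
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in (1, 2, 2)]
    main_task = asyncio.ensure_future(main(tasks))
    await asyncio.sleep(0.05)  # let everything start
    main_task.cancel()         # cancel only the outermost task
    try:
        await main_task
    except asyncio.CancelledError:
        pass
    return main_task.cancelled(), [t.cancelled() for t in tasks]

outer, inner = asyncio.run(runner())
print(outer, inner)  # True [True, True, True]
```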

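Event loops in multiple threads

Often our event loop is used to register coroutines, and some coroutines need to be added to the loop dynamically. A simple way to do this is to use multiple threads: the current thread creates an event loop, then starts a new thread and runs the event loop in it. The current thread is not blocked.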

import asyncio
from threading import Thread
import time

now = lambda :time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)  # schedule a plain function on the loop thread; coroutines use run_coroutine_threadsafe
new_loop.call_soon_threadsafe(more_work, 3)

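  After the code above starts, the current thread is not blocked. The new thread runs, in order, the more_work calls registered with call_soon_threadsafe. Because time.sleep is synchronous and blocking, finishing both more_work calls takes roughly 6 + 3 = 9 seconds.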
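Note that in the example above the loop thread runs forever, so the program never exits on its own. One way to shut it down, sketched here under the assumption that the work is queued first: schedule loop.stop with call_soon_threadsafe after the work, join the thread, then close the loop. Durations are shortened for illustration, and the finished list is a hypothetical way to record completion.

```python
import asyncio
import threading
import time

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

finished = []

def more_work(x):
    time.sleep(x)  # synchronous: blocks the loop thread for x seconds
    finished.append(x)

new_loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop, args=(new_loop,))
t.start()

start = time.perf_counter()
new_loop.call_soon_threadsafe(more_work, 0.2)
new_loop.call_soon_threadsafe(more_work, 0.1)
new_loop.call_soon_threadsafe(new_loop.stop)  # queued after the work, so it runs last
t.join()        # run_forever() returns once stop() runs, ending the thread
new_loop.close()
elapsed = time.perf_counter() - start
print(finished, "{:.1f}s".format(elapsed))
```

Callbacks run in FIFO order on the loop thread, so the two sleeps execute one after the other (about 0.3s in total) before stop takes effect.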

Asynchronous coroutines on an event loop in another thread

import asyncio
import time
from threading import Thread

now = lambda :time.time()


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)  # turns the formerly synchronous work in the child thread into asynchronous work
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

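  In the example above, the main thread creates new_loop and then runs an infinite event loop in a separate child thread. The main thread registers new coroutine objects with run_coroutine_threadsafe. This lets the event loop in the child thread run the coroutines concurrently while the main thread is not blocked. The total run time is about 6s.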
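run_coroutine_threadsafe returns a concurrent.futures.Future, so the main thread can block on a specific result when it needs one. A sketch, assuming the same start_loop pattern as above with a daemon thread and shortened durations; the explicit stop/join at the end is an addition for clean shutdown.

```python
import asyncio
import concurrent.futures
import threading

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

new_loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop, args=(new_loop,), daemon=True)
t.start()

# each call returns a concurrent.futures.Future that the main thread can wait on
f1 = asyncio.run_coroutine_threadsafe(do_some_work(0.2), new_loop)
f2 = asyncio.run_coroutine_threadsafe(do_some_work(0.1), new_loop)
results = [f1.result(timeout=5), f2.result(timeout=5)]
print(results)  # ['Done after 0.2s', 'Done after 0.1s']

new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
new_loop.close()
```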
