Python 原生协程------asyncio

协程

  在python3.5之前,写成的实现都是经过生成器的yield from原理实现的, 这样实现的缺点是代码看起来会很乱,因而3.5版本以后python实现了原生的协程,而且引入了async和await两个关键字用于支持协程。因而在用async定义的协程与python的生成器完全分开。html

async def downloader(url):
    return 'bobby'

async def download_url(url):
    html = await downloader(url)
    return html
if __name__ == '__main__':
    coro = download_url('http://www/imooc.com')
    coro.send(None)

输出结果:
Traceback (most recent call last):
  File "D:/MyCode/Cuiqingcai/Flask/test01.py", line 67, in <module>
    coro.send(None)
StopIteration: bobby

能够看到结果中能够将downloader(url)的结果返回。须要注意的是在原生协程里面不能用next()来预激协程。python

async def downloader(url):
    return 'bobby'

async def download_url(url):
    html = await downloader(url)
    return html
if __name__ == '__main__':
    coro = download_url('http://www/imooc.com')
    coro.next()

结果:
AttributeError: 'coroutine' object has no attribute 'next'
sys:1: RuntimeWarning: coroutine 'download_url' was never awaited

 

原生协程async代码中间是不能在使用yield生成器的,这样就为了更好的将原生协程与生成器严格区分开来。而且await只能和async语句搭配,不能和生成器搭配。由于要调用await须要调用对象实现__await__()这个魔法方法。因此在定义协程时候注意不要混用。可是理解的时候仍是能够将原生的协程中 await能够对比生成器的yield from。web

 asyncio

高并发的核心模块,3.4以后引入,最具野性的模块,web服务器,爬虫均可以胜任。它是一个模块也能够看作一个框架。服务器

协程编码模式的三个要点:并发

  1. 事件循环
  2. 回调(驱动生成器(协程))
  3. epoll(IO多路复用)

  asyncio的简单实用:

  这里须要注意的是:同步阻塞的接口不能使用在协程里面。由于协程是单线程的,只要有一个地方阻塞了,那么全部的协程都须要等待阻塞结束以后才能够向下运行,因而在协程函数中等待必定不能用time.sleep() 若是用time.sleep()就失去了协程的意义了(即程序运行的时间将会是每一个协程数乘以time.sleep()的时间数。)同时用asyncio.sleep() 以前须要加上 awaitapp

import time
async def download_url(url): print('start get %s' % url) await asyncio.sleep(2) print('get %s finished.' % url) if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(download_url('https:www.baidu.com')) # 阻塞等事件完成以后再向下运行。至关于进程线程的join()方法,或者进线程池的wait() print('一共用时:%s' % (time.time() - start_time)) start get https:www.baidu.com get https:www.baidu.com finished. 一共用时:2.0011143684387207

  一次执行多个任务。(用时和一个任务同样!)下面任务若是换成time.sleep(2)则函数须要至少20秒才能执行完毕。

import asyncio
import time async def download_url(url): print('start get %s.' % url) await asyncio.sleep(2) print('get %s end' % url) if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() url_list = [] for i in range(10): url_list.append('https:www.baidu.com.index{}'.format(i)) tasks = [download_url(url) for url in url_list] loop.run_until_complete(asyncio.wait(tasks)) print('用时 %s' %(time.time() - start_time)) ... ... 用时 2.0031144618988037

  获取协程的返回值:

  能够用两种方式先获得一个future对象。而后将该future对象放入loop.run_until_complete()中。(该函数便可以接受future对象也能够接受协程对象)而后future对象跟进程池和线程池中的future对象的方法是同样的。因而能够用.result()方法的到函数的返回结果。框架

async def download_url(url):
    # print('start get %s' % url)
    await asyncio.sleep(2) # print('get %s finished.' % url) return 'frank is a good man.' if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() # get_future = asyncio.ensure_future(download_url('https:www.baidu.com')) get_task = loop.create_task(download_url('https:www.baidu.com')) loop.run_until_complete(download_url(get_task)) print('一共用时:%s' % (time.time() - start_time)) print(get_task.result()) 一共用时:2.0021142959594727 frank is a good man.

  future完成以后的回调函数

  设置协程完成以后的回调函数:future对象的.add_done_callback(),注意在调用add_done_callback的时候会默认将future对象传递给回调函数,所以回调函数必须至少接受一个参数,同时add_done_callback必须在run_until_complete()前,(协程函数建立以后调用)由于若是在run_until_complete()以后的话,协程都应结束了。就不会起做用了。async

async def download_url(url):
    # print('start get %s' % url)
    await asyncio.sleep(2) # print('get %s finished.' % url) return 'frank is a good man.' def send_emai(future): print('网页下载完毕') print(future.result()) if __name__ == '__main__': start_time = time.time() loop = asyncio.get_event_loop() # get_future = asyncio.ensure_future(download_url('https:www.baidu.com')) get_task = loop.create_task(download_url('https:www.baidu.com')) get_task.add_done_callback(send_emai) loop.run_until_complete(download_url(get_task)) print('一共用时:%s' % (time.time() - start_time)) print(get_task.result()) 网页下载完毕 frank is a good man. 一共用时:2.0021145343780518 frank is a good man.

 

问题来了。tasks.add_done_callback方法只能接受函数名,若是回调的方法也须要参数怎么办?这就须要用到偏函数from functools import partial (偏函数能够将函数包装成为另一个函数)函数

import  asyncio
import time
from functools import partial
async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)
    return 'bobby'

def callback_method(url, future):# 此处由于是future对象即(tasks)调用的,因此有一个默认的参数。同时要注意:回调函数的future只能放到最后,其它的函数实参放前面。
    print('download ended %s' % url)

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = asyncio.ensure_future(get_html('http://www.baidu.com'))
    # tasks = loop.create_task(get_html('http://www.baidu.com'))
    tasks.add_done_callback(partial(callback_method, 'http://www.baidu.com')) # 参数为回调方法
    loop.run_until_complete(tasks)
    print(tasks.result())


start get url
download ended http://www.baidu.com
bobby

 

gather 与 wait的区别。都是能够等待程序运行以后往下运行。可是gather比wait更加高级一点高并发

gather能够将任务分组:

import  asyncio
import time

async def get_html(url):
    print('start get %s' % url)
    await asyncio.sleep(2)
    print('end get %s' % url)
if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    group1 = [get_html('http://goup1.com') for i in range(2)]
    group2 = [get_html('http://group2.com') for i in range(2)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    loop.run_until_complete(asyncio.gather(group1, group2))
    # 上面三行代码也能够合一块儿loop.run_until_complete(asyncio.gather(*group1, *group2)) 注意此参数前面要加*
    print(time.time()-start_time)


start get http://goup1.com
start get http://group2.com
start get http://group2.com
start get http://goup1.com
end get http://goup1.com
end get http://group2.com
end get http://group2.com
end get http://goup1.com
2.0021145343780518

loop.run_forever()协程的任务完成以后不会中止,而是会一直运行。老师吐槽:python中间loop和future的关系有点乱,loop会被放到future中间同时future又能够放到loop中间,形成一个循环。

如何取消future(task)

async def get_html(sleep_time):
    print('waiting')
    await asyncio.sleep(sleep_time)
    print('end after %s S' % sleep_time)

if __name__ == '__main__':
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(4)
    tasks = [task1, task2, task3]
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        all_tasks = asyncio.Task.all_tasks()
        for task in all_tasks:
            print('cancel task')
            print(task.cancel())# 打印是否取消 是返回Ture,否False
        loop.stop()
        loop.run_forever() # 注意此处必定要加上loop.run_forever()否则会报异常 finally:
        loop.close()

waiting
waiting
waiting
cancel task
True
cancel task
True
cancel task
True
cancel task
True

 

将此代码进入cmd中运行,而后再中间按ctrl + C键,主动生成一个 KeyboardInterrupt 异常,而后异常被捕捉以后作出处理(即中止协程的运行)

 

协程的嵌套

import asyncio
async def compute(x, y):
    print('Computer %s + %s...') % (x, y))
    await asyncio.sleep(1)
    return x + y

async def print_sum(x, y):
    result = await computer(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

 

代码分析图:

 

  1. loop首先会为print_sum()建立一个task
  2. event_loop()驱动task运行,使task进入(pending状态)
  3. task去执行print_sum
  4. print_sum中首先进入子协程的调度(await至关于yiel from)因此转向执行computer,print_sum自身暂停。
  5. compute中存在await 因而也被迫进入暂停状态,而后能够直接返回给task(await == yield from 而yield from能够在调度方与子生成器之间掠过委托方创建双向通道)
  6. task返回给event_loop()
  7. 等待1秒钟以后,task唤醒compute,compute继续执行下一行代码(即return x + y)完成以后compute就是一个done状态,同时抛出一个stopiterationError异常,此异常将激活print_sum()(委托方),而且将异常将被await(对应以前的yield from)捕捉并提取出return的值。
  8. print_sum()被激活以后执行print而后变成done状态,也会抛出一个stopiterationError异常,而后被task接受并处理了。

 asyncio中的其余函数(如下三个为底层函数,多数条件下用得很少)

call_soon()

  在协程运行时候,能够传递一些函数去执行,注意是函数不是协程。

import asyncio
def callback(sleep_times):
    print('sleep {} success'.format(sleep_times))

if __name__ =='__main__':
    loop = asyncio.get_event_loop()
    loop.call_soon(callback, 2) #第一个参数为调用的函数名,第二个单数为被调用函数的参数
    loop.run_forever()

sleep 2 success

 

  注意此处调动函数的运行须要用到loop.run_forever()而不是loop.run_until_complete()由于oop.call_soon()只是调用函数而不是loop注册的协程。

同时loop.run_forever()会致使程序一直在运行,不会自动结束。因而须要添加如下方法使程序关闭。

import asyncio
def callback(sleep_times):
    print('sleep {} success'.format(sleep_times))

def stop_loop(loop):
    loop.stop()

if __name__ =='__main__':
    loop = asyncio.get_event_loop()
    loop.call_soon(callback, 2)
    loop.call_soon(stop_loop, loop)
    loop.run_forever()

 

call_later()

  功能,讲一个callback函数在一个指定的时候运行。

import asyncio
def callback(sleep_times):
    print('sleep {} success'.format(sleep_times))

def stop_loop(loop):
    loop.stop()

if __name__ =='__main__':
    loop = asyncio.get_event_loop()
    loop.call_later(2, callback, 2) # 参数含义:第一个参数为延迟时间,延迟越少越先运行。
    loop.call_later(1, callback, 1)
    loop.call_later(3, callback, 3)
    loop.call_soon(callback, 4)
    loop.run_forever()


sleep 4 success
sleep 1 success
sleep 2 success
sleep 3 success

 

同时存在call_soon()和call_later()时,call_soon()会在那个call_later()前面调用。

call_at()

  也是调用函数在指定时间内调用,可是它的指定时间,是指的loop内的时间,而不是本身传递的时间。能够用loop.time()来获取loop内时间。

import asyncio
def callback(sleep_times):
    print('sleep {} success'.format(sleep_times))

if __name__ =='__main__':
    loop = asyncio.get_event_loop()
    now = loop.time()
    loop.call_at(now+2, callback, 2) # 注意第一个参数是相对于loop系统时间而来的,不是自定义的几秒钟以后运行。
    loop.call_at(now+1, callback, 1)
    loop.call_at(now+3, callback, 3)
    loop.call_soon(callback, 4)
    loop.run_forever()

sleep 4 success
sleep 1 success
sleep 2 success
sleep 3 success

 

协程中线程的实现

协程不提供阻塞的方式,可是有时候有些库,和有些接口只能提供阻塞方式链接。因而就能够在协程中继承阻塞io

相关文章
相关标签/搜索