Python3.5协程学习研究

今夕何夕故人不来迟暮连山黛

   以前有研究过python协程相关的知识,但一直没有进行深刻探究。日常工做中使用的也仍是以python2为主,然而最近的项目须要使用python3协程相关的内容,所以凑出时间学习了一番python3的协程语法。    本篇主要以介绍python3.5的async/await协程语法为主,由于这种语法看上去很别扭,不容易理解。若是对python协程基础不是很了解,建议能够先看此篇:Python协程python

协程函数(异步函数)

   咱们日常使用最多的函数都是同步函数,即不一样函数执行是按顺序执行的。那么什么是异步函数呢?怎么建立异步函数?怎么在异步函数之间来回切换执行?不急,请往下看。web

建立协程函数

先来看下普通函数:bash

def test1():
    print("1")
    print("2")

def test2():
    print("3")
    print("4")

a = test1()
b = test2()
print(a,type(a))
print(b,type(b))
复制代码

运行以上代码获得结果:微信

1
2
3
4
None <class 'NoneType'>
None <class 'NoneType'>
复制代码

说明:程序顺序执行了test一、test2函数,在调用函数的时候就自动进入了函数体,并执行了函数的内容。网络

而后使用async关键词将普通函数变成协程函数,即异步函数:session

async def test1():
    print("1")
    print("2")

async def test2():
    print("3")
    print("4")

print(test1())
print(test2())

复制代码

运行以上代码获得结果:多线程

<coroutine object test1 at 0x109f4c620>
asyncio_python3_test.py:16: RuntimeWarning: coroutine 'test1' was never awaited
  print(test1())
<coroutine object test2 at 0x109f4c620>
asyncio_python3_test.py:17: RuntimeWarning: coroutine 'test2' was never awaited
  print(test2())
复制代码

说明:忽略结果中的告警,能够看到调用函数test一、test2的时候,并无进入函数体且执行函数内容,而是返回了一个coroutine(协程对象)。并发

除了函数外,类的方法也可使用async关键词将其变成协程方法:异步

class test:
    async def run(self):
        print("1")
复制代码

执行协程函数

   前面咱们成功建立了协程函数,而且在调用函数的时候返回了一个协程对象,那么怎么进入函数体并执行函数内容呢?相似于生成器,可使用send方法执行函数,修改下前面的代码:async

async def test1():
    print("1")
    print("2")

async def test2():
    print("3")
    print("4")

a = test1()
b = test2()

a.send(None)
b.send(None)
复制代码

运行以上代码获得如下结果:

1
2
Traceback (most recent call last):
  File "asyncio_python3_test.py", line 19, in <module>
    a.send(None)
StopIteration
sys:1: RuntimeWarning: coroutine 'test2' was never awaited
复制代码

   说明:程序先执行了test1协程函数,当test1执行完时报了StopIteration异常,这是协程函数执行完饭回的一个异常,咱们能够用try except捕捉,来用判断协程函数是否执行完毕。

async def test1():
    print("1")
    print("2")

async def test2():
    print("3")
    print("4")

a = test1()
b = test2()

try:
    a.send(None) # 能够经过调用 send 方法,执行协程函数
except StopIteration as e:
    print(e.value)
    # 协程函数执行结束时会抛出一个StopIteration 异常,标志着协程函数执行结束,返回值在value中
    pass
try:
    b.send(None) # 能够经过调用 send 方法,执行协程函数
except StopIteration:
    print(e.value)
    # 协程函数执行结束时会抛出一个StopIteration 异常,标志着协程函数执行结束,返回值在value中
    pass
复制代码

运行以上代码获得如下结果:

1
2
3
4
复制代码

   说明:程序先执行了test1函数,等到test1函数执行完后再执行test2函数。从执行过程上来看目前协程函数与普通函数没有区别,并无实现异步函数,那么如何交叉运行协程函数呢?

交叉执行协程函数(await)

   经过以上例子,咱们发现定义协程函数可使用async关键词,执行函数可使用send方法,那么如何实如今两个协程函数间来回切换执行呢?这里须要使用await关键词,修改一下代码:

import asyncio

async def test1():
    print("1")
    await asyncio.sleep(1) # asyncio.sleep(1)返回的也是一个协程对象
    print("2")

async def test2():
    print("3")
    print("4")

a = test1()
b = test2()

try:
    a.send(None) # 能够经过调用 send 方法,执行协程函数
except StopIteration:
    # 协程函数执行结束时会抛出一个StopIteration 异常,标志着协程函数执行结束
    pass

try:
    b.send(None) # 能够经过调用 send 方法,执行协程函数
except StopIteration:
    pas
复制代码

运行以上函数获得如下结果:

1
3
4
复制代码

   说明:程序先执行test1协程函数,在执行到await时,test1函数中止了执行(阻塞);接着开始执行test2协程函数,直到test2执行完毕。从结果中,咱们能够看到,直到程序运行完毕,test1函数也没有执行完(没有执行print("2")),那么如何使test1函数执行完毕呢?可使用asyncio自带的方法循环执行协程函数。

await与阻塞

   使用async能够定义协程对象,使用await能够针对耗时的操做进行挂起,就像生成器里的yield同样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其余的协程也挂起或者执行完毕,再进行下一个协程的执行,协程的目的也是让一些耗时的操做异步化。

注意点:await后面跟的必须是一个Awaitable对象,或者实现了相应协议的对象,查看Awaitable抽象类的代码,代表了只要一个类实现了__await__方法,那么经过它构造出来的实例就是一个Awaitable,而且Coroutine类也继承了Awaitable。

自动循环执行协程函数

   经过前面介绍咱们知道执行协程函数须要使用send方法,但一旦协程函数执行过程当中切换到其余函数了,那么这个函数就不在被继续运行了,而且使用sned方法不是很高效。那么如何在执行整个程序过程当中,自动得执行全部的协程函数呢,就如同多线程、多进程那样,隐式得执行而不是显示的经过send方法去执行函数。

事件循环方法

前面提到的问题就须要用到事件循环方法去解决,即asyncio.get_event_loop方法,修改以上代码以下:

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
loop.run_until_complete(test1())
复制代码

运行以上代码获得如下结果:

1
3
4
2
复制代码

说明:asyncio.get_event_loop方法能够建立一个事件循环,而后使用run_until_complete将协程注册到事件循环,并启动事件循环。

task任务

   因为协程对象不能直接运行,在注册事件循环的时候,实际上是run_until_complete方法将协程包装成为了一个任务(task)对象。所谓task对象是Future类的子类,保存了协程运行后的状态,用于将来获取协程的结果。咱们也能够手动将协程对象定义成task,修改以上代码以下:

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
task = loop.create_task(test1())
loop.run_until_complete(task)
复制代码

   说明:前面说到task对象保存了协程运行的状态,而且能够获取协程函数运行的返回值,那么具体该如何获取呢?这里能够分两种方式,一种须要绑定回调函数,另一种则直接在运行完task任务后输出。值得一提的是,若是使用send方法执行函数,则返回值能够经过捕捉StopIteration异常,利用StopIteration.value获取。

直接输出task结果

当协程函数运行结束后,咱们须要获得其返回值,第一种方式就是等到task状态为finish时,调用task的result方法获取返回值。

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")
    return "stop"

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
loop.run_until_complete(task)
print(task.result())
复制代码

运行以上代码获得如下结果:

1
3
4
2
stop
复制代码
回调函数

   获取返回值的第二种方法是能够经过绑定回调函数,在task执行完毕的时候能够获取执行的结果,回调的最后一个参数是future对象,经过该对象能够获取协程返回值。

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")
    return "stop"

async def test2():
    print("3")
    print("4")

def callback(future):
    print('Callback:',future.result()) # 经过future对象的result方法能够获取协程函数的返回值

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1()) # 建立task,test1()是一个协程对象
task.add_done_callback(callback) # 绑定回调函数
loop.run_until_complete(task)
复制代码

运行以上代码获得如下结果:

1
3
4
2
Callback: stop
复制代码

若是回调函数须要接受多个参数,能够经过偏函数导入,修改代码以下:

import asyncio
import functools

async def test1():
    print("1")
    await test2()
    print("2")
    return "stop"

async def test2():
    print("3")
    print("4")

def callback(param1,param2,future):
    print(param1,param2)
    print('Callback:',future.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
task.add_done_callback(functools.partial(callback,"param1","param2"))
loop.run_until_complete(task)
复制代码

说明:回调函数中的future对象就是建立的task对象。

future对象

   future对象有几个状态:Pending、Running、Done、Cancelled。建立future的时候,task为pending,事件循环调用执行的时候固然就是running,调用完毕天然就是done,若是须要中止事件循环,就须要先把task取消,可使用asyncio.Task获取事件循环的task。

协程中止

   前面介绍了使用事件循环执行协程函数,那么怎么中止执行呢?在中止执行协程前,须要先取消task,而后再中止loop事件循环。

import asyncio

async def test1():
    print("1")
    await asyncio.sleep(3)
    print("2")
    return "stop"

tasks = [
    asyncio.ensure_future(test1()),
    asyncio.ensure_future(test1()),
    asyncio.ensure_future(test1()),
]

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()
    loop.run_forever()
finally:
    loop.close()
复制代码

运行以上代码,按ctrl+c能够结束执行。

本文中用到的一些概念及方法

  • event_loop事件循环:程序开启一个无限的循环,当把一些函数注册到事件循环上时,知足事件发生条件即调用相应的函数。
  • coroutine协程对象:指一个使用async关键字定义的函数,它的调用不会当即执行函数,而是会返回一个协程对象,协程对象须要注册到事件循环,由事件循环调用。
  • task任务:一个协程对象就是一个原生能够挂起的函数,任务则是对协程进一步封装,其中包含任务的各类状态。
  • future:表明未来执行或没有执行的任务的结果,它和task上没有本质的区别
  • async/await关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

并发与并行

   并发一般指有多个任务须要同时进行,并行则是同一时刻有多个任务执行。用多线程、多进程、协程来讲,协程实现并发,多线程与多进程实现并行。

asyncio协程如何实现并发

   asyncio想要实现并发,就须要多个协程来完成任务,每当有任务阻塞的时候就await,而后其余协程继续工做,这须要建立多个协程的列表,而后将这些协程注册到事件循环中。这里指的多个协程,能够是多个协程函数,也能够是一个协程函数的多个协程对象。

import asyncio

async def test1():

    print("1")
    await asyncio.sleep(1)
    print("2")
    return "stop"

a = test1()
b = test1()
c = test1()

tasks = [
    asyncio.ensure_future(a),
    asyncio.ensure_future(b),
    asyncio.ensure_future(c),
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks)) # 注意asyncio.wait方法
for task in tasks:
    print("task result is ",task.result())
复制代码

运行以上代码获得如下结果:

1
1
1
2
2
2
task result is  stop
task result is  stop
task result is  stop
复制代码

说明:代码先是定义了三个协程对象,而后经过asyncio.ensure_future方法建立了三个task,而且将全部的task加入到了task列表,最终使用loop.run_until_complete将task列表添加到事件循环中。

协程爬虫

   前面介绍了如何使用async与await建立协程函数,使用asyncio.get_event_loop建立事件循环并执行协程函数。例子很好地展现了协程并发的高效,但在实际应用场景中该如何开发协程程序?好比说异步爬虫。我尝试用requests模块、urllib模块写异步爬虫,但实际操做发现并不支持asyncio异步,所以可使用aiohttp模块编写异步爬虫。

aiohttp实现

import asyncio
import aiohttp

async def run(url):
    print("start spider ",url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.url)

url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
复制代码

运行以上代码获得如下结果:

start spider  https://thief.one
start spider  https://home.nmask.cn
start spider  https://movie.nmask.cn
start spider  https://tool.nmask.cn
https://movie.nmask.cn
https://home.nmask.cn
https://tool.nmask.cn
https://thief.one
复制代码

说明:aiohttp基于asyncio实现,既能够用来写webserver,也能够当爬虫使用。

requests实现

   因为requests模块阻塞了客户代码与asycio事件循环的惟一线程,所以在执行调用时,整个应用程序都会冻结,但若是必定要用requests模块,可使用事件循环对象的run_in_executor方法,经过run_in_executor方法来新建一个线程来执行耗时函数,所以能够这样修改代码实现:

import asyncio
import requests

async def run(url):
    print("start ",url)
    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(None, requests.get, url)
    print(response.url)
    
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
复制代码

若是要给requests带上参数,可使用functools:

import asyncio
import requests
import functools

async def run(url):
    print("start ",url)
    loop = asyncio.get_event_loop()
    try:
        response = await loop.run_in_executor(None,functools.partial(requests.get,url=url,params="",timeout=1))
    except Exception as e:
        print(e)
    else:
        print(response.url)

url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
复制代码

asyncio中使用阻塞函数

   如同前面介绍如何在asyncio中使用requests模块同样,若是想在asyncio中使用其余阻塞函数,该怎么实现呢?虽然目前有异步函数支持asyncio,但实际问题是大部分IO模块还不支持asyncio。

阻塞函数在asyncio中使用的问题

   阻塞函数(例如io读写,requests网络请求)阻塞了客户代码与asycio事件循环的惟一线程,所以在执行调用时,整个应用程序都会冻结。

解决方案

   这个问题的解决方法是使用事件循环对象的run_in_executor方法。asyncio的事件循环在背后维护着一个ThreadPoolExecutor对象,咱们能够调用run_in_executor方法,把可调用对象发给它执行,便可以经过run_in_executor方法来新建一个线程来执行耗时函数。

run_in_executor方法

AbstractEventLoop.run_in_executor(executor, func, *args)
复制代码
  • executor 参数应该是一个 Executor 实例。若是为 None,则使用默认 executor。
  • func 就是要执行的函数
  • args 就是传递给 func 的参数

实际例子(使用time.sleep()):

import asyncio
import time

async def run(url):
    print("start ",url)
    loop = asyncio.get_event_loop()
    try:
        await loop.run_in_executor(None,time.sleep,1)
    except Exception as e:
        print(e)
    print("stop ",url)

url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]

tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
复制代码

运行以上代码获得如下函数:

start  https://thief.one
start  https://home.nmask.cn
start  https://movie.nmask.cn
start  https://tool.nmask.cn
stop  https://thief.one
stop  https://movie.nmask.cn
stop  https://home.nmask.cn
stop  https://tool.nmask.cn
复制代码

说明:有了run_in_executor方法,咱们就可使用以前熟悉的模块建立协程并发了,而不须要使用特定的模块进行IO异步开发。

参考

www.oschina.net/translate/p… www.jianshu.com/p/b5e347b3a… zhuanlan.zhihu.com/p/27258289 juejin.im/entry/5aabb…

本文来自我的博客:Python3.5协程学习研究 | nMask'Blog,转载请说明出处!

如需更多优质文章,请扫一扫关注微信公众号

或者访问我的博客:thief.one

相关文章
相关标签/搜索