在Python中使用asyncio进行异步编程

对于来自JavaScript编码者来讲,异步编程不是什么新东西,但对于Python开发者来讲,async函数和future(相似JS的promise)可不是那么容易能理解的。python

Concurrency vs Parallelism

Concurrency和Parallelism听起来同样,但在实际编程里它们有着较大的不一样。编程

想象下你在作饭的时候写书,看起来好像你在同一时间作两件事情,实际你只是在两项事情中相互切换,当你在等水开的时候你就能够去写书,但你切菜时你要暂停写做。这就叫作concurrency。惟一一种使用parallel作这两项工做的办法就是得有两我的,一我的写做,一我的作饭,这就是多核CPU的工做方式了。promise

alt

为何asyncio

异步编程容许你在单个线程中并发执行代码。对比多线程处理方式,该方式由你来决定如何由一个任务切换到另外一个任务,tasks之间共享数据也更加容易和简单。session

def queue_push_back(x):
        if len(list) < max_size:
            list.append(x)

若是咱们在多线程执行上面的额代码,有可能第二行代码在同一时间被执行,那么同一时间就有两个元素被加入到列表中,那实际列表长度就会操做max_size多线程

另外一个异步编程的好处是内存使用。每次一个新的线程建立,也须要开辟一些新内存用来进行上下文切换。若是使用了异步编程,这在单线程中就不存在该问题。并发

如何在Python编程async代码

Asyncio包含三个主要组件:coroutine, event loop和futureapp

Coroutine

coroutine是异步函数,经过在函数定义def前使用async关键字。dom

async def my_task(args):
    pass

my_coroutine = my_task(args)

咱们使用了async关键字定义了一个函数,该函数并无执行,返回了一个coroutine对象。异步

有两种从一个coroutine中获取异步函数的结果async

第一种使用await关键字,仅只能在async函数中用来等待coroutine结束返回结果

result = await my_task(args)

第二种是将它加入到event loop中,接下来咱们作详尽讨论。

Event loop

event loop对象负责执行异步代码以及决定异步函数如何进行切换。在建立了event loop后,咱们就能够添加多个coroutines给它,coroutines将会调用了run_until_complete或者run_forever执行。

# create loop
loop = asyncio.new_event_loop()
# add coroutine to the loop
future = loop.create_task(my_coroutine)
# stop the program and execute all coroutine added
# to the loop concurrently
loop.run_until_complete(future)
loop.close()

Future

future相似一个占位对象用来存放异步函数结果,提供函数状态。当coroutine添加到event lop时建立future.有两种方式建立:

future1 = loop.create_task(my_coroutine)
# or
future2 = asyncio.ensure_future(my_coroutine)

第一个方法是增长一个coroutine到loop中,返回一个task,它是future的子类。第二种方法很是相似,它接收一个coroutine,并加入到了默认loop中,惟一的区别是,它也能够接收一个future参数,它将不会作任何事情,直接将futrue返回。

一个简单的程序

import asyncio

async def my_task(args):
    pass

def main():
    loop = asyncio.new_event_loop()
    coroutine1 = my_task()
    coroutine2 = my_task()
    task1 = loop.create_task(coroutine1)
    task2 = loop.create_task(coroutine2)
    loop.run_until_complete(asycnio.wait([task1, task2]))
    print('task1 result:', task1.result())
    print('task2 result:', task2.result())
    loop.close()

就让如你所看见的,咱们在执行异步函数前须要先创一个coroutine,而后咱们将建立future/task,把它添加到event loop。到如今病没有如何的异步函数被执行,只有当咱们调用loop.run_until_completed,event loop开始执行全部的经过loop.createt_task或者asyncio.ensure_future添加的coroutines。loop.run_until_completed将会阻塞应用程序,仅当全部的future执行完毕后。在本例中,咱们使用asyncio.wait()建立future,当传递的全部future执行完后咱们就获取到了future全部的结果。

异步函数

有一件事须要注意的是在Python中使用async声明的函数并不意味着函数会并发执行。若是使用一个普通函数,在前面加入async关键字,event loop并不会中断你的函数去执行另外一个coroutine。容许event loop进行切换coroutine至关简单,使用await关键字就会容许event loop能够切换其余注册到loop中的coroutine。

import asyncio

async def print_numbers_async1(n, prefix):
    for i in range(n):
        print(prefix, i)

async def print_numbers_async2(n, prefix):
    for i in range(n):
        print(prefix, i)
        if i % 5 == 0:
            await asyncio.sleep(0)
            
loop1 = asyncio.new_event_loop()
count1_1 = loop1.create_task(print_numbers_async1(10, 'c1_1'))
count2_1 = loop1.create_task(print_numbers_async1(10, 'c2_1'))
loop1.run_until_complete(asyncio.wait([count1_1, count2_1]))
loop1.close()

loop2 = asyncio.new_event_loop()
count1_2 = loop2.create_task(print_numbers_async2(10, 'c1_2'))
count2_2 = loop2.create_task(print_numbers_async2(10, 'c2_2'))
loop2.run_until_complete(asyncio.wait([count1_2, count2_2]))
loop2.close()

若是咱们执行该代码,咱们能够看到loop1将会在c1_1彻底执行完后才去执行c2_1。而在loop2每打印五个数值后就会进行切换。

真实案例

如今咱们Python中最进本的异步编程,如今让咱们写一个真实例子,咱们从互联网下载一系列页面,并打印出开头三行。

import aiohttp
import asyncio

async def print_preview(url):
    # connect to the server
    async with aiohttp.ClientSession() as session:
        # create get request
        async with session.get(url) as response:
            # wait for response
            response = await response.text()

            # print first 3 not empty lines
            count = 0
            lines = list(filter(lambda x: len(x) > 0, response.split('\n')))
            print('-'*80)
            for line in lines[:3]:
                print(line)
            print()

def print_all_pages():
    pages = [
        'http://textfiles.com/adventure/amforever.txt',
        'http://textfiles.com/adventure/ballyhoo.txt',
        'http://textfiles.com/adventure/bardstale.txt',
    ]

    tasks =  []
    loop = asyncio.new_event_loop()
    for page in pages:
        tasks.append(loop.create_task(print_preview(page)))

    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

if __name__ == "__main__":
    print_all_pages()

这里的代码也很容易理解,咱们使用异步函数下载URL,而且打印了前三行。而后咱们建立了一个函数用来构建一个页面了列表,交给print_preview去执行,将coroutine加入到loop,把future放到了一个列表中 ,咱们执行event loop,在全部coroutine执行完后程序结束。

异步生成器

最后我想谈谈的是异步生成器。要实现一个异步生成器至关简单:

import asyncio
import math
import random

async def is_prime(n):
    if n < 2:
        return True

    for i in range(2, n):
        await asyncio.sleep(0)
        if n % i == 0:
            return False

    return True


async def prime_generator(n_prime):
    counter = 0
    n = 0
    while counter < n_prime:
        n += 1
        prime = await is_prime(n)
        if prime:
            yield n
            counter += 1
    
async def check_email(limit):
    for i in range(limit):
        if random.random() > 0.8:
            print('1 new email')
        else:
            print('0 new email')
        await asyncio.sleep(2)

async def print_prime(n):
    async for prime in prime_generator(n):
        print('new prime number found:', prime)


def main():
    loop = asyncio.new_event_loop()
    prime = loop.create_task(print_prime(3000))
    email = loop.create_task(check_email(10))
    loop.run_until_complete(asyncio.wait([prime, email]))
    loop.close()

if __name__ == "__main__":
    main()

异常处理

在coroutine内部抛出异常时并不会中断应用程序,若是你没有处理异常的话你将看到相似以下错误:

Task exception was never retrieved

有两个方法来修正,在获取future结果时捕获异常,或者在future调用exception方法.

深刻了解

如今你已经了解如何使用asyncio编写并发代码,若是你想深刻了解的话,查看官方文档。

相关文章
相关标签/搜索