理解 Python 中的异步编程

让咱们来写一些 Python 代码

你能够在这个 GitHub 仓库 下载全部的示例代码。python

这篇文章中的全部例子都已经在 Python 3.6.1 环境下测试过,并且在代码示例中的这个 requirements.txt 文件包含了运行全部这些测试所须要的模块。react

我强烈建议建立一个 Python 虚拟环境来运行这些代码,这样就不会和系统级别的 Python 产生耦合。git

示例 1:同步编程

第一个例子展现的是一种有些刻意设计的方式,即有一个任务先从队列中拉取"工做"以后再执行这个工做。在这种状况下,这个工做的内容只是获取一个数字,而后任务会把这个数字叠加起来。在每一个计数步骤中,它还打印了字符串代表该任务正在运行,而且在循环的最后还打印出了总的计数。咱们设计的部分即这个程序为多任务处理在队列中的工做提供了很天然的基础。程序员

"""
example_1.py

Just a short example showing synchronous running of 'tasks'
"""

import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f'Task {name} nothing to do')
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            for x in range(count):
                print(f'Task {name} running')
                total += 1
            print(f'Task {name} total: {total}')


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # create some tasks
    tasks = [
        (task, 'One', work_queue),
        (task, 'Two', work_queue)
    ]

    # run the tasks
    for t, n, q in tasks:
        t(n, q)

if __name__ == '__main__':
    main()

该程序中的"任务"就是一个函数,该函数能够接收一个字符串和一个队列做为参数。在执行时,它会去看队列里是否有任何须要处理的工做,若是有,它就会把值从队列中取出来,开启一个 for 循环来叠加这个计数值而且在最后打印出总数。它会一直这样运行直到队列里什么都没剩了才会结束离开。github

当咱们在执行这个任务时,咱们会获得一个列表代表任务一(即代码中的 task One)作了全部的工做。它内部的循环消费了队列里的所有工做,而且执行这些工做。当退出任务一的循环后,任务二(即代码中的 task Two)有机会运行,可是它会发现队列是空的,由于这个影响,该任务会打印一段语句以后退出。代码中并无任何地方可让任务一和任务二协做的很好而且能够在它们之间切换。web

示例 2: 简单的协做并发

程序(example_2.py)的下个版本经过使用生成器增长了两个任务能够跟好相互协做的能力。在任务函数中添加 yield 语句意味着循环会在执行到这个语句时退出,可是仍然保留当时的上下文,这样以后就能够恢复先前的循环。在程序后面 "run the tasks" 的循坏中当 t.next() 被调用时就能够利用这个。这条语句会在以前生成(即调用 yield 的语句处)的地方从新开始以前的任务。编程

这是一种协做并发的方式。这个程序会让出对它当前上下文的控制,这样其它的任务就能够运行。在这种状况下,它容许咱们主要的 "run the tasks" 调度器能够运行任务函数的两个实例,每个实例都从相同的队列中消费工做。这种作法虽然聪明一些,可是为了和第一个示例达成一样结果的同时作了更多的工做。并发

"""
example_2.py

Just a short example demonstrating a simple state machine in Python
"""

import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        for x in range(count):
            print(f'Task {name} running')
            total += 1
            yield
        print(f'Task {name} total: {total}')

def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # create some tasks
    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]

    # run the tasks
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True


if __name__ == '__main__':
    main()

当程序运行时,输出代表任务一和任务二都在运行,它们都从队列里消耗工做而且处理它。这就是咱们想要的,两个任务都在处理工做,并且都是以处理从队列中的两个项目结束。可是再一次,须要作一点工做来实现这个结果。异步

这里的技巧在于使用 yield 语句,它将任务函数转变为生成器,来实现一个 "上下文切换"。这个程序使用这个上下文切换来运行任务的两个实例。async

示例 3:经过阻塞调用来协做并发

程序(example_3.py)的下个版本和上一个版本几乎彻底同样,除了在咱们任务循环体内添加了一个 time.sleep(1) 调用。这使任务循环中的每次迭代都添加了一秒的延迟。这个添加的延迟是为了模拟在咱们任务中出现缓慢 IO 操做的影响。

我还导入了一个简单的 Elapsed Time 类来处理报告中使用的开始时间/已用时间功能。

"""
example_3.py

Just a short example demonstraing a simple state machine in Python
However, this one has delays that affect it
"""

import time
import queue
from lib.elapsed_time import ET


def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        et = ET()
        for x in range(count):
            print(f'Task {name} running')
            time.sleep(1)
            total += 1
            yield
        print(f'Task {name} total: {total}')
        print(f'Task {name} total elapsed time: {et():.1f}')


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)


    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]
    # run the scheduler to run the tasks
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print()
    print('Total elapsed time: {}'.format(et()))


if __name__ == '__main__':
    main()

当该程序运行时,输出代表任务一和任务二都在运行,消费从队列里来的工做并像以前那样处理它们。随着增长的模拟 IO 操做延迟,咱们发现咱们协做式的并发并无为咱们作任何事,延迟会中止整个程序的运行,而 CPU 就只会等待这个 IO 延迟的结束。

这就是异步文档中 ”阻塞代码“的确切含义。注意运行整个程序所须要的时间,你会发现这就是全部 IO 延迟的累积时间。这再次意味着经过这种方式运行程序并非胜利了。

示例 4:使用非阻塞调用来协做并发

程序(example_4.py)的下一个版本已经修改了很多代码。它在程序一开始就使用了 gevent 异步编程模块。该 模块以及另外一个叫作 monkey 的模块被导入了。

以后 monkey 模块一个叫作 patch_all() 的方法被调用。这个方法是用来干吗的呢?简单来讲它配置了这个应用程序,使其它全部包含阻塞(同步)代码的模块都会被打上"补丁",这样这些同步代码就会变成异步的。

就像大多数简单的解释同样,这个解释对你并无很大的帮助。在咱们示例代码中与之相关的就是 time.sleep(1)(咱们模拟的 IO 延迟)不会再"阻塞"整个程序。取而代之的是它让出程序的控制返回给系统。请注意,"example_3.py" 中的 "yield" 语句再也不存在,它如今已是 time.sleep(1) 函数调用内的一部分。

因此,若是 time.sleep(1) 已经被 gevent 打补丁来让出控制,那么这个控制又到哪里去了?使用 gevent 的一个做用是它会在程序中运行一个事件循环的线程。对于咱们的目的来讲,这个事件循环就像在 example_3.py 中 "run the tasks" 的循环。当 time.sleep(1) 的延迟结束时,它就会把控制返回给 time.sleep(1) 语句的下一条可执行语句。这样作的优势是 CPU 不会由于延迟被阻塞,而是能够有空闲去执行其它代码。

咱们 "run the tasks" 的循环已经再也不存在了,取而代之的是咱们的任务队列包含了两个对 gevent.spawn(...) 的调用。这两个调用会启动两个 gevent 线程(叫作 greenlet),它们是相互协做进行上下文切换的轻量级微线程,而不是像普通线程同样由系统切换上下文。

注意在咱们任务生成以后的 gevent.joinall(tasks) 调用。这条语句会让咱们的程序会一直等待任务一和任务二都完成。若是没有这个的话,咱们的程序将会继续执行后面打印的语句,可是实际上没有作任何事。

"""
example_4.py

Just a short example demonstrating a simple state machine in Python
However, this one has delays that affect it
"""

import gevent
from gevent import monkey
monkey.patch_all()

import time
import queue
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        count = work_queue.get()
        total = 0
        et = ET()
        for x in range(count):
            print(f'Task {name} running')
            time.sleep(1)
            total += 1
        print(f'Task {name} total: {total}')
        print(f'Task {name} total elapsed time: {et():.1f}')


def main():
    """
    This is the main entry point for the programWhen
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # run the tasks
    et = ET()
    tasks = [
        gevent.spawn(task, 'One', work_queue),
        gevent.spawn(task, 'Two', work_queue)
    ]
    gevent.joinall(tasks)
    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

当这个程序运行的时候,请注意任务一和任务二都在一样的时间开始,而后等待模拟的 IO 调用结束。这代表 time.sleep(1) 调用已经再也不阻塞,其它的工做也正在被作。

在程序结束时,看下总的运行时间你就会发现它其实是 example_3.py 运行时间的一半。如今咱们开始看到异步程序的优点了。

在并发运行两个或者多个事件能够经过非阻塞的方式来执行 IO 操做。经过使用 gevent greenlets 和控制上下文切换,咱们就能够在多个任务之间实现多路复用,这个实现并不会遇到太多麻烦。

示例 5:异步(阻塞)HTTP 下载

程序(example_5.py)的下一个版本有一点进步也有一点退步。这个程序如今处理的是有真正 IO 操做的工做,即向一个 URL 列表发起 HTTP 请求来获取页面内容,可是它仍然是以阻塞(同步)的方式运行的。

咱们修改了这个程序导入了很是棒的 requests 模块 来建立真实的 HTTP 请求,并且咱们把一份 URL 列表加入到队列中,而不是像以前同样只是数字。在这个任务中,咱们也没有再用计数器,而是使用 requests 模块来获取从队列里获得 URL 页面的内容,而且咱们打印了执行这个操做的时间。

"""
example_5.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a queue
"""

import queue
import requests
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        url = work_queue.get()
        print(f'Task {name} getting URL: {url}')
        et = ET()
        requests.get(url)
        print(f'Task {name} got URL: {url}')
        print(f'Task {name} total elapsed time: {et():.1f}')
        yield


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://shutterfly.com",
        "http://mypublisher.com",
        "http://facebook.com"
    ]:
        work_queue.put(url)

    tasks = [
        task('One', work_queue),
        task('Two', work_queue)
    ]
    # run the scheduler to run the tasks
    et = ET()
    done = False
    while not done:
        for t in tasks:
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

和这个程序以前版本同样,咱们使用一个 yield 关键字来把咱们的任务函数转换成生成器,而且为了让其余任务实例能够执行,咱们执行了一次上下文切换。

每一个任务都会从工做队列中获取到一个 URL,获取这个 URL 指向页面的内容而且报告获取这些内容花了多长时间。

和以前同样,这个 yield 关键字让咱们两个任务都能运行,可是由于这个程序是以同步的方式运行的,每一个 requests.get() 调用在获取到页面以前都会阻塞 CPU。注意在最后运行整个程序的总时间,这对于下一个示例会颇有意义。

示例 6:使用 gevent 实现异步(非阻塞)HTTP 下载

这个程序(example_6.py)的版本修改了先前的版本再次使用了 gevent 模块。记得 gevent 模块的 monkey.patch_all() 调用会修改以后的全部模块,这样这些模块的同步代码就会变成异步的,其中也包括 requests 模块。

如今的任务已经改为移除了对 yield 的调用,由于 requests.get(url) 调用已经不会再阻塞了,反而是执行一次上下文切换让出控制给 gevent 的事件循环。在 “run the task” 部分咱们使用 gevent 来产生两个任务生成器,以后使用 joinall() 来等待它们完成。

"""
example_6.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a queue. It's also using gevent to get the
URL's in an asynchronous manner.
"""

import gevent
from gevent import monkey
monkey.patch_all()

import queue
import requests
from lib.elapsed_time import ET


def task(name, work_queue):
    while not work_queue.empty():
        url = work_queue.get()
        print(f'Task {name} getting URL: {url}')
        et = ET()
        requests.get(url)
        print(f'Task {name} got URL: {url}')
        print(f'Task {name} total elapsed time: {et():.1f}')

def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://shutterfly.com",
        "http://mypublisher.com",
        "http://facebook.com"
    ]:
        work_queue.put(url)

    # run the tasks
    et = ET()
    tasks = [
        gevent.spawn(task, 'One', work_queue),
        gevent.spawn(task, 'Two', work_queue)
    ]
    gevent.joinall(tasks)
    print()
    print(f'Total elapsed time: {et():.1f}')

if __name__ == '__main__':
    main()

在程序运行的最后,你能够看下总共的时间和获取每一个 URL 分别的时间。你将会看到总时间会少于 requests.get() 函数调用的累计时间。

这是由于这些函数调用是异步运行的,因此咱们能够同一时间发送多个请求,从而更好地发挥出 CPU的优点。

示例 7:使用 Twisted 实现异步(非阻塞)HTTP 下载

程序(example_7.py)的版本使用了 Twisted 模块 ,该模块本所作的质上和 gevent 模块同样,即以非阻塞的方式下载 URL 对应的内容。

Twisted是一个很是强大的系统,采用了和 gevent 根本上不同的方式来建立异步程序。gevent 模块是修改其模块使它们的同步代码变成异步,Twisted 提供了它本身的函数和方法来达到一样的结果。

以前在 example_6.py 中使用被打补丁的 requests.get(url) 调用来获取 URL 内容的位置,如今咱们使用 Twisted 函数 getPage(url)

在这个版本中,@defer.inlineCallbacks 函数装饰器和语句 yield getPage(url) 一块儿实现把上下文切换到 Twisted 的事件循环。

在 gevent 中这个事件循环是隐含的,可是在 Twisted 中,事件循环由位于程序底部的 reactor.run() 明确提供。

"""
example_7.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a work_queue. This version uses the Twisted
framework to provide the concurrency
"""

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor, task

import queue
from lib.elapsed_time import ET


@defer.inlineCallbacks
def my_task(name, work_queue):
    try:
        while not work_queue.empty():
            url = work_queue.get()
            print(f'Task {name} getting URL: {url}')
            et = ET()
            yield getPage(url)
            print(f'Task {name} got URL: {url}')
            print(f'Task {name} total elapsed time: {et():.1f}')
    except Exception as e:
        print(str(e))


def main():
    """
    This is the main entry point for the program
    """
    # create the work_queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the work_queue
    for url in [
        b"http://google.com",
        b"http://yahoo.com",
        b"http://linkedin.com",
        b"http://shutterfly.com",
        b"http://mypublisher.com",
        b"http://facebook.com"
    ]:
        work_queue.put(url)

    # run the tasks
    et = ET()
    defer.DeferredList([
        task.deferLater(reactor, 0, my_task, 'One', work_queue),
        task.deferLater(reactor, 0, my_task, 'Two', work_queue)
    ]).addCallback(lambda _: reactor.stop())

    # run the event loop
    reactor.run()

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

注意最后的结果和 gevent 版本同样,整个程序运行的时间会小于获取每一个 URL 内容的累计时间。

示例8:使用 Twisted 回调函数实现异步(非阻塞)HTTP 下载

程序 (example_8.py)的这个版本也是使用 Twisted 库,可是是以更传统的方式使用 Twisted。

这里个人意思是再也不使用 @defer.inlineCallbacks / yield 这种代码风格,这个版本会使用明确的回调函数。一个"回调函数"是一个被传递给系统的函数,该函数能够在以后的事件响应中被调用。在下面的例子中,success_callback() 被提供给 Twisted,用来在 getPage(url) 调用完成后被调用。

注意在这个程序中 @defer.inlineCallbacks 装饰器并无在 my_task() 函数中使用。除此以外,这个函数产出一个叫作 d 的变量,该变量是延后调用的缩写,是调用函数 getPage(url) 获得的返回值。

延后是 Twisted 处理异步编程的方式,回调函数就附加在其之上。当这个延后"触发"(即当 getPage(url) 完成时),会以回调函数被附加时定义的变量做为参数,来调用这个回调函数。

"""
example_8.py

Just a short example demonstrating a simple state machine in Python
This version is doing actual work, downloading the contents of
URL's it gets from a queue. This version uses the Twisted
framework to provide the concurrency
"""

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor, task

import queue
from lib.elapsed_time import ET


def success_callback(results, name, url, et):
    print(f'Task {name} got URL: {url}')
    print(f'Task {name} total elapsed time: {et():.1f}')


def my_task(name, queue):
    if not queue.empty():
        while not queue.empty():
            url = queue.get()
            print(f'Task {name} getting URL: {url}')
            et = ET()
            d = getPage(url)
            d.addCallback(success_callback, name, url, et)
            yield d


def main():
    """
    This is the main entry point for the program
    """
    # create the queue of 'work'
    work_queue = queue.Queue()

    # put some 'work' in the queue
    for url in [
        b"http://google.com",
        b"http://yahoo.com",
        b"http://linkedin.com",
        b"http://shutterfly.com",
        b"http://mypublisher.com",
        b"http://facebook.com"
    ]:
        work_queue.put(url)

    # run the tasks
    et = ET()

    # create cooperator
    coop = task.Cooperator()

    defer.DeferredList([
        coop.coiterate(my_task('One', work_queue)),
        coop.coiterate(my_task('Two', work_queue)),
    ]).addCallback(lambda _: reactor.stop())

    # run the event loop
    reactor.run()

    print()
    print(f'Total elapsed time: {et():.1f}')


if __name__ == '__main__':
    main()

运行这个程序的最终结果和先前的两个示例同样,运行程序的总时间小于获取 URLs 内容的总时间。

不管你使用 gevent 仍是 Twisted,这只是我的的喜爱和代码风格问题。这两个都是强大的库,提供了让程序员能够编写异步代码的机制

相关文章
相关标签/搜索