Python 协程(Coroutine)体验

概述

本文经过运行一段Python小程序,模拟一个真实的任务。比较在多线程(Multi-thread)和在多协程(Coroutine)环境下的编程实现。发现和解释一些有趣的现象。以期为你们带来一些对协程的直观感觉,加深对这种新鲜事物的理解。python

Python中的协程

协程(coroutine)是一个有很长历史的概念,它是计算机程序的一类组件,推广了协做式多任务的子程序。其详细的概念和历史请参照维基百科中的条目:https://en.wikipedia.org/wiki...
Python天生支持的生成器(generator)其实就是协程的一种实现,生成器容许执行被挂起与被恢复。可是因为缺少更多语法上的支持,以及缺少利用生成器实现异步编程的成熟模式,限制了生成器做为协程参与协做式多任务编程的用途。不过如今状况发生的改变,Python自3.6版本开始添加了async/await的语法直接支持协程的异步编程,同时在asyncio库中提供了协程编程的接口以及必要的基础实现。社区也在不断努力为现有的IO库提供异步的版本以便用于协程开发环境,例如http client目前至少在aiohttp以及tornado中都提供了可用于协程的异步版本。咱们知道IO操做天生是异步的,为了适应普遍应用的同步编程模式,不少的IO库都采用阻塞调用者的方式来实现同步。这样虽然简化了编程,可也带来的并行度不高的问题。在一些有大量耗时IO操做的环境里,应用不得不忍受串行操做形成的漫长等待,或是转向多进程(Multi-Process)多线程编程以期提升并行程度。而多进程多线程编程又会引入争用、通信,同步、保护等棘手的问题。并且咱们知道即便是做为轻量级的线程也会对应一个独立的运行栈。线程的调度和切换不可避免地包括运行栈的切换和加载。若是在一个进程中有成百上千的线程,那么相应的调度开销会急剧上升到难以忍受的程度。并且线程之间的同步和互锁也将成为一个噩梦。除去boss级别的死锁问题,其余任何的bug或是缺陷在多线程环境下都难于重现和追踪,这是由于线程的调度有很大的随机性。编程

一个Python小程序

下面是一个Python的小程序,能够在Python3.8或者更新的版本上运行。小程序

import threading
import time
import asyncio


def gen():
    s = 0
    while s < 1000:
        yield s
        s += 1


def unsafe_thread_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            time.sleep(0.01)
            t += v
    except StopIteration:
        print(f" {t} ")


async def wrong_coroutine_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            time.sleep(0.01)
            t += v
    except StopIteration:
        print(f" {t} ")
        

async def starter_with_wrong_workers():
    tasks = []
    for _ in range(10):
        task = asyncio.create_task(wrong_coroutine_worker(g))
        tasks.append(task)
    await asyncio.gather(*tasks)
    

async def right_coroutine_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            await asyncio.sleep(0.01)
            t += v
    except StopIteration:
        print(f" {t} ")
        

async def starter_with_right_workers():
    tasks = []
    for _ in range(10):
        task = asyncio.create_task(right_coroutine_worker(g))
        tasks.append(task)
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    
    print('----------------- Sequence  -----------------')
    g = gen()
    started_at = time.monotonic()
    t = 0
    for v in g:
        time.sleep(0.01)
        t += v
    print(t)
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')
    
    print('----------------- Unsafe threading  -----------------')
    g = gen()
    started_at = time.monotonic()
    threads =[]
    for _ in range(10):
        w = threading.Thread(target=unsafe_thread_worker, args=[g])
        w.start()
        threads.append(w)
    for w in threads:
        w.join()
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')

    print('----------------- Async with wrong coroutine  -----------------')
    g = gen()
    started_at = time.monotonic()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(starter_with_wrong_workers())
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')
            
    print('----------------- Async with right coroutine  -----------------')
    g = gen()
    started_at = time.monotonic()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(starter_with_right_workers())
    total_time = time.monotonic() - started_at
    print(f'total time consumed: {total_time:.2f} seconds')

一个典型的运行输出看起来像是这个样子的:服务器

----------------- Sequence  -----------------
499500
total time consumed: 10.53 seconds
----------------- Unsafe threading  -----------------
 49804  49609 

 50033 
 49682 
 49574 
 50005 
 50143 
 50069  50219 

 50362 
total time consumed: 1.09 seconds
----------------- Async with wrong coroutine  -----------------
 499500 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
total time consumed: 10.55 seconds
----------------- Async with right coroutine  -----------------
 49500 
 49967 
 49973 
 50100 
 49965 
 50000 
 49968 
 49963 
 49964 
 50100 
total time consumed: 1.16 seconds

这个小程序实际上模拟了一个常见的真实任务。设想咱们经过一个http的数据API一页一页地获取一个比较大地数据集。每页数据经过一个带有页号或是起始位置的URL予以标识,而后经过向API服务器发送一个http request,并解析返回的http response中所包含的数据。其中的http访问显然是一个耗时的IO操做。返回数据的解析和处理是一个计算密集型的操做,相比IO等待,其消耗的时间不值一提。那个生成器gen能够看做是一个数据页面URL的生成器,也就是任务生成器。而后咱们使用sleep来模拟一个耗时的IO操做,使用加法来模拟数据的合并与分析。你也能够把这个小程序想象成为一个网络爬虫,咱们在一个全局的列表里保存了全部目标网站的地址,而后或串行或并行地访问全部地目标,取回咱们感兴趣的数据存储并合并分析。
总之,咱们有1000个比较独立的小任务。因为任务之间的没有依赖性,因此多个任务是能够并行执行的。每一个任务又分为领取并明确任务,获取数据(这是一个耗时0.01秒的IO操做),返回数据的存储和处理几个步骤。在一个任务内部,各个步骤间均有依赖,不能并行执行。
如今让咱们来看看主函数,其代码分为4段,分别对应了4种不一样得实现方法。方法一是最为传统的串行方式,经过一个简单的循环,一个一个地获取并完成任务,在一个任务完成后再领取下一个任务。不出所料,因为IO操做是主要的耗时操做,串行执行的时间等于每一个任务耗时的总和,0.01*1000 = 10秒。方法二使用了多线程,模拟了一个有10个线程的线程池,池中的每一个线程均独立地像方法一那样工做。因为全部的线程都是并行运行的,因此总的耗时几乎是串行方法的1/10。方法三使用了协程,也是模拟了一个有10个协程的协程池,但是因为使用了错误的IO操做,致使多个协程事实上不能并行执行,其总的耗时和方法一至关,咱们稍后会仔细分析比较。方法四修正了方法三的错误,使得协程可以并行运行,其总耗时与方法二至关。网络

相关文章
相关标签/搜索