Python并行编程(十四):异步编程

一、基本概念程序员

  除了顺序执行和并行执行的模型之外,还有异步模型,这是事件驱动模型的基础。异步活动的执行模型能够只有一个单一的主控制流,能在单核心系统和多核心系统中运行。编程

  在并发执行的异步模型中,许多任务被穿插在同一时间线上,全部的任务都由一个控制流执行(单一线程)。任务的执行可能被暂停或恢复,中间的这段时间线程将会执行其余任务。大体以下:多线程

  

  如上图所示,任务(不一样的颜色表示不一样的任务)可能被其余任务插入,可是都处在同一个线程下。这代表当某一个任务执行的时候,其余任务都暂停了。与多线程编程模型很大的一点不一样是,多线程的某个任务在时间线上何时挂起某个活动或恢复某个活动由系统决定,而在异步中,程序员必须假设线程可能在任什么时候间被挂起和替换。并发

  程序员能够将任务编写成许多能够间隔执行的小步骤,若是一个任务须要另外一个任务的输出,那么被依赖的任务必须接收它的输入。dom

二、使用Python的concurrent.futures模块异步

  这个模块具备线程池和进程池、管理并行编程任务、处理非肯定性的执行流程、进程/线程同步等功能。async

  此模块由一下部分组成:函数

  - concurrent.futures.Executor:这是一个虚拟基类,提供了异步执行的方法。oop

  - submit(function, argument):调度函数(可调用的对象)的执行,将argument做为参数传入。性能

  - map(function, argument):将argument做为参数执行函数,以异步的方式。

  - shutdown(Wait=True):发出让执行者释放全部资源的信号。

  - concurrent.futures.Future:其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。

  Executor是抽象类,能够经过子类访问,即线程或进程的ExecutorPools。由于线程或进程的实例是依赖于资源的任务,因此最好以池的形式将他们组织在一块儿,做为能够重用的launcher和executor。

  线程池和进程池是用于在程序中优化和简化线程/进程的使用。经过池能够提交任务给executor。池由两部分组成,一部分是内部的队列,存放着待执行的任务;另外一部分是一系列的进程或线程,用于执行这些任务。池的概念主要目的是为了重用:让线程或进程在生命周期内能够屡次使用。他减小了建立线程和进程的开销,提升了程序性能。重用不是必须的规则,但它是程序员在应用中使用池的主要缘由。

  

  current.Futures提供了两种Executor的子类,各自独立操做一个线程池和一个进程池。这两个子类分别是:

  - concurrent.futures.ThreadPoolExecutor(max_workers)

  - concurrent.futures.ProcessPoolExecutor(max_workers)

  max_workers参数表示最多有多少个worker并行执行任务

  代码测试:

import concurrent.futures
import time

number_list = [1,2,3,4,5,6,7,8,9,10]

def evaluate_item(x):
    #For time consuming
    result_item = count(x)
    return result_item

def count(number):
    for i in range(0, 10000000):
        i = i + 1
    return i * number

if __name__ == "__main__":
    # Sequential execution
    start_time = time.time()
    for item in number_list:
        print(evaluate_item(item))
    print("Sequential execution in %s seconds" %(str(time.time() - start_time)))
    # Thread pool execution
    start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in %s seconds"  %(str(time.time() - start_time_1)))

    # Process pool execution
    start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
    print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))

  运行结果:

10000000
20000000
30000000
40000000
50000000
60000000
70000000
80000000
90000000
100000000
Sequential execution in 8.975373029708862 seconds
10000000
20000000
30000000
40000000
60000000
70000000
50000000
80000000
90000000
100000000
Thread pool execution in 8.699156045913696 seconds
Process pool execution in 5.916198968887329 seconds

  建立一个list存放10个数字,而后使用一个循环计算从1加到10000000,打印出和与number_list的乘积。

number_list = [1,2,3,4,5,6,7,8,9,10]

def evaluate_item(x):
    #For time consuming
    result_item = count(x)
    return result_item

def count(number):
    for i in range(0, 10000000):
        i = i + 1
    return i * number

  在主程序中,首先顺序执行了一次程序并打印其执行时间:

start_time = time.time()
    for item in number_list:
        print(evaluate_item(item))
    print("Sequential execution in %s seconds" %(str(time.time() - start_time)))

  其次使用futures.ThreadPoolExecutor模块的线程池并打印其时间:

start_time_1 = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    print("Thread pool execution in %s seconds"  %(str(time.time() - start_time_1)))

  ThreadPoolExecutor使用线程池中的一个线程执行给定任务。池中一共有5个线程,每个线程从池中取得一个任务而后执行它,当任务执行完成,再从池中拿到另外一个任务。

  最后是使用进程池:

start_time_2 = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(evaluate_item, item) for item in number_list]
    print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))

  和ThreadPoolExecutor同样,ProcessPoolExecutor是一个executor,使用一个线程池来并行执行任务。由于ProcessPoolExecutor使用了多核处理的模块,让咱们能够不受GIL的限制,大大缩短执行时间。

  几乎全部须要处理多个客户端请求的服务应用都会使用池。也有应用要求须要当即执行,或者要求对任务的线程有更多的控制器,这种状况下,池不是一个最佳选择。

三、使用Asyncio管理事件循环

  先入为主:

import asyncio
import datetime
import time

def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()

def function_2(end_time, loop):
    print("function_2 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()

def function_3(end_time, loop):
    print("function_3 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_1, end_time, loop)
    else:
        loop.stop()

def function_4(end_time, loop):
    print("function_4 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_4, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()
print(loop.time())
end_loop = loop.time() + 9.0
print(end_loop)
loop.call_soon(function_1, end_loop, loop)
#loop.call_soon(function_4, end_loop, loop)
loop.run_forever()
loop.close()

  执行结果:

  

  上述例子定义了三个异步任务,相继执行,如图所示:

  

  首先,咱们要获得这个事件循环:

loop = asyncio.get_event_loop()

  而后咱们经过call_soon方法调用了function_1()函数。

end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)

  function_1:

def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()

  - end_time定义了function_1能够运行的最长时间,并经过call_later方法传入到function_2中做为参数

  - loop经过get_event_loop()方法获得的事件循环

  任务执行结束以后,它会比较loop.time() + 1s和设定的运行时间,若是没有超过,使用call_later在1秒以后执行function_2(),function_2和3做用相似

  若是运行时间超过了设定,时间循环终止。

  概念解释:

  Python的Asyncio模块提供了管理事件、协程、任务和线程方法,以及编写并发代码的原语。主要组件和概念包括:

  - 事件循环:在Asyncio模块中,每个进程都有一个事件循环。

  - 协程:这是子程序的泛化概念。协程能够在执行期间暂停,这样就能够等待外部的处理(例如IO)完成以后,从以前暂停的地方恢复执行。

  - Futures:定义了Future对象,和concurrent.futures模块同样,表示还没有完成的计算。

  - Tasks:这是Asyncio的子类,用于封装和管理并行模式下的协程。

  事件循环:

    在计算机系统中,能够产生事件的实体叫作事件源,能处理事件的实体叫作事件处理者,还有一些第三方实体叫作事件循环。它的做用是管理全部的事件,在整个程序运行过程当中不断循环执行,追踪事件发生的顺序将他们放到队列中,当主线程空闲的时候,调用相应的事件处理者处理事件。

   Asyncio管理事件循环的方法:

  - loop = get_event_loop():获得当前上下文的事件循环。

  - loop.call_later(time_delay, callback, argument):延后time_delay秒再执行callback方法。

  - loop.call_soon(callback, argument):尽量快的调用callback。call_soon()函数结束,主线程回到事件循环以后就会立刻调用callback。

  - loop.time():以float类型返回当前时间循环的内部时间。

  - asyncio.set_event_loop():为当前上下文设置时间循环。

  - asyncio.new_event_loop():根据此策略建立一个新的时间循环并返回。

  - loop.run_forever():在调用stop()以前将一直运行。run_forever真正开始执行函数。

 

四、使用Asyncio管理协程

  上述例子中一个程序变得很大并且复杂时,将其划分为子程序,每一部分实现特定的任务。子程序不能单独执行,只能在主程序的请求下执行,主程序负责协调使用各个子程序。协程是子程序的泛化,和子程序同样的是,协程只负责计算任务的一步;不一样的是协程没有主程序来进行调度。由于协程经过管道链接在一块儿,没有监视函数负责顺序调用他们。在协程中,执行点能够被挂起,能够被以前挂起的点恢复执行。经过协程池就能够插入到计算中:运行第一个任务,直到它返回yield执行权,而后运行下一个,这样顺着执行下去。

  这种插入的控制组件就是前文提到的事件循环,它持续追踪全部的协程并执行它们。

  协程的另一些重要特性以下:

  - 协程能够有多个入口点,并能够yield屡次

  - 协程能够将执行权交给其余协程

  yield表示协程在此暂停,而且将执行权交给其余协程,由于协程能够将值与控制权一块儿传递给另外一个协程,因此yield一个值就表示将值传给下一个执行的协程。

  测试用例:

import asyncio
import time
from random import randint

@asyncio.coroutine
def StartState():
    print("Start State called \n")
    input_value = randint(0,1)
    time.sleep(1)
    print("I am StartState.input_value is %s" %input_value)
    if (input_value == 0):
        result = yield from State2(input_value)
    else:
        result = yield from State1(input_value)
    print("Resume of the Transition : \nStart State calling %s" %result)

@asyncio.coroutine
def State1(transition_value):
    outputValue = str("State 1 with transition value = %s \n" %transition_value)
    input_value = randint(0,1)
    time.sleep(1)
    print("...Evaluation...")
    print("I am State1.input_value is %s" %input_value)
    if input_value == 0:
        result = yield from State3(input_value)
    else:
        result = yield from State2(input_value)
    result = "State 1 calling %s" %result
    return outputValue + str(result)

@asyncio.coroutine
def State2(transition_value):
    outputValue = str("State 2 with transition value = %s \n" %transition_value)
    input_value = randint(0,1)
    time.sleep(1)
    print("...Evaluation...")
    print("I am State2.input_value is %s" %input_value)
    if input_value == 0:
        result = yield from State1(input_value)
    else:
        result = yield from State3(input_value)
    result = "State 2 calling %s" %result
    return outputValue + str(result)

@asyncio.coroutine
def State3(transition_value):
    outputValue = str("State 3 with transition value = %s \n" %transition_value)
    input_value = randint(0,1)
    time.sleep(1)
    print("...Evaluation...")
    print("I am State3.input_value is %s" %input_value)
    if input_value == 0:
        result = yield from State1(input_value)
    else:
        result = yield from EndState(input_value)
    result = "State 1 calling %s" %result
    return outputValue + str(result)

@asyncio.coroutine
def EndState(transition_value):
    outputValue = str("End State with transition value = %s \n" %transition_value)
    print("I am EndState.outputValue is %s" %outputValue)
    print("...Stop Computation...")

    return outputValue

if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(StartState())

  上述代码为使用Asyncio的协程来模拟有限状态机(一个数学模型,不只在工程领域应用普遍,在科学领域也很著名)。模拟的状态机以下:

  

  系统有四个状态,0和1是状态机能够从一个状态到另外一个状态的值,这个过程叫转换。

  运行结果(结果不惟一):

  

  每个状态都由一个装饰器装饰:@asyncio.coroutine

  经过yield from命令调用下一个协程。

  启动事件循环:

if __name__ == "__main__":
    print("Finite State Machine simulation with Asyncio Coroutine")
    loop = asyncio.get_event_loop()
    loop.run_until_complete(StartState())

 

五、使用Asyncio控制任务

  Asyncio是用来处理事件循环中的异步进程和并发任务执行的。它还提供了asyncio.Task()类,能够在任务中使用协程。它的做用是在同一事件循环中,运行某一个任务的同时能够并发地运行多个任务。当协程被包在任务中,它会自动将任务和事件循环链接起来,当事件循环启动的时候,任务自动运行。这样就提供了一个能够自动驱动协程的机制。

  Asyncio模块为咱们提供了asyncio.Task(coroutine)方法来处理计算任务,它能够调度协程的执行。任务对协程对象在事件循环的执行负责。若是被包裹的协程要从future yield,那么任务会被挂起,等待future的计算结果。

  当future计算完成,被包裹的协程将会拿到future返回的结果或异常(exception)继续执行。另外,须要注意的是事件循环一次只能运行一个任务,除非还有其它事件循环在不一样的线程并行运行,此任务才有可能和其余任务并行。当一个任务在等待future执行的期间,事件循环会运行一个新的任务。

  测试用例:

import asyncio

@asyncio.coroutine
def factorial(number):
    f = 1
    for i in range(2, number + 1):
        print("Asyncio.Task: Compute factorial(%s)" %i)
        yield from asyncio.sleep(0.5)
        f *= i
    print("Asyncio.Task - factorial(%s) = %s" %(number, f))

@asyncio.coroutine
def fibonacci(number):
    a,b = 0,1
    for i in range(number):
        print("Asyncio.Task: Compute fibonacci(%s)" %i)
        yield from asyncio.sleep(0.5)
        a, b = b, a+b
    print("Asyncio.Task - fibonacci(%s) = %s" %(number, a))

@asyncio.coroutine
def binomialCoeff(n, k):
    result = 1
    for i in range(1, k+1):
        result = result * (n-i+1)/i
        print("Asyncio.Task:Compute binomialCoeff(%s)" %i)
        yield from asyncio.sleep(0.5)
    print("Asyncio.Task - binomialCoeff(%s, %s) = %s" %(n, k, result))

if __name__ == "__main__":
    tasks = [asyncio.Task(factorial(10)), asyncio.Task(fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

  执行结果:

Asyncio.Task: Compute factorial(2)
Asyncio.Task: Compute fibonacci(0)
Asyncio.Task:Compute binomialCoeff(1)
Asyncio.Task: Compute factorial(3)
Asyncio.Task: Compute fibonacci(1)
Asyncio.Task:Compute binomialCoeff(2)
Asyncio.Task: Compute factorial(4)
Asyncio.Task: Compute fibonacci(2)
Asyncio.Task:Compute binomialCoeff(3)
Asyncio.Task: Compute factorial(5)
Asyncio.Task: Compute fibonacci(3)
Asyncio.Task:Compute binomialCoeff(4)
Asyncio.Task: Compute factorial(6)
Asyncio.Task: Compute fibonacci(4)
Asyncio.Task:Compute binomialCoeff(5)
Asyncio.Task: Compute factorial(7)
Asyncio.Task: Compute fibonacci(5)
Asyncio.Task:Compute binomialCoeff(6)
Asyncio.Task: Compute factorial(8)
Asyncio.Task: Compute fibonacci(6)
Asyncio.Task:Compute binomialCoeff(7)
Asyncio.Task: Compute factorial(9)
Asyncio.Task: Compute fibonacci(7)
Asyncio.Task:Compute binomialCoeff(8)
Asyncio.Task: Compute factorial(10)
Asyncio.Task: Compute fibonacci(8)
Asyncio.Task:Compute binomialCoeff(9)
Asyncio.Task - factorial(10) = 3628800
Asyncio.Task: Compute fibonacci(9)
Asyncio.Task:Compute binomialCoeff(10)
Asyncio.Task - fibonacci(10) = 55
Asyncio.Task - binomialCoeff(20, 10) = 184756.0

  上述例子定义了三个线程,factorial,fibonacci,binomialCoeff,每个都带有asyncio.coroutine装饰器:

  将三个task放入到一个list中:

tasks = [asyncio.Task(factorial(10)), asyncio.Task(fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))]

  经过run_until_complete并行运行三个协程,asyncio.wait(tasks)表示运行直到全部给定的协程都完成。

  最后关闭事件循环:loop.close()

 

六、使用Asyncio和Futures

  Asyncio模块的另外一个重要的组件是Futures。它和concurrent.futures.Futures很像,可是针对Asyncio的事件循环作了不少定制。asyncio.Futures类表明还未完成的结果,有多是一个Exception,因此综合来讲,它是一种抽象的表明尚未作完的事情。

  实际上,必须处理一些结果的回调函数被加入到了这个类的实例中。

  基本方法:

  - cancel():取消future的执行,调度回调函数

  - result():返回future表明的结果

  - exception():返回future中的Exception

  - add_done_callback(fn):添加一个回调函数,当future执行的时候会调用这个回调函数

  - remove_done_callback(fn):从call when done列表中移除全部的callback的实例

  - set_result(result):将future标为执行完成,而且设置result的值

  - set_exception(exception):将future标为执行完成,并设置Exception

  测试用例:

# coding : utf-8
import asyncio
import sys

@asyncio.coroutine
def first_coroutine(future, n):
    # 计算前n个数的和
    count = 0
    for i in range(1, n+1):
        count = count + i
    print("first yield")
    yield from asyncio.sleep(2)
    print("first_coroutine finished")
    # 将future标记为已完成,并设置result的值
    future.set_result("first coroutine (sum of n integers) result = %s" %str(count))

@asyncio.coroutine
def second_coroutine(future, n):
    count = 1
    for i in range(2, n+1):
        count *= i
    print("second yield")
    yield from asyncio.sleep(1)
    print("second_coroutine finished")
    future.set_result("second coroutine (factorial) result = %s" %str(count))

def got_result(future):
    # 获取future的set_result结果
    print(future.result())

if __name__ == "__main__":
    N1 = int(sys.argv[1])
    N2 = int(sys.argv[2])
    loop = asyncio.get_event_loop()
    future1 = asyncio.Future()
    future2 = asyncio.Future()
    tasks = [first_coroutine(future1, N1), second_coroutine(future2, N2)]

    # 添加回调函数
    future1.add_done_callback(got_result)
    future2.add_done_callback(got_result)
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

  运行结果:

  

相关文章
相关标签/搜索