asyncio协程与并发

并发编程

Python的并发实现有三种方法。python

  1. 多线程
  2. 多进程
  3. 协程(生成器)

基本概念

串行:同时只能执行单个任务
并行:同时执行多个任务数据库

在Python中,虽然严格说来多线程与协程都是串行的,但其效率高,在遇到阻塞时会将阻塞任务交给系统执行,经过合理调度任务,使得程序高效。编程

最高效的固然是多进程了,但因为多进程依赖硬件配置,而且当任务量超过CPU核心数时,多进程会有进程上下文切换开销,而这个开销很大,因此不是最佳解决方案。安全

常见耗时场景

  1. CPU计算密集型
  2. 磁盘IO密集型
  3. 网络IO密集型

CPU计算密集型

多线程对比单线程,因为GIL的存在,切换线程须要不断加锁、释放锁,效率反而更低;多进程至关于多个CPU同时工做,所以效率很高。网络

IO密集型

IO密集型能够是磁盘IO、网络IO、数据库IO等,都属于计算量小,IO等待浪费高。越是IO等待时间长,则多线程的优点相比单线程越明显,多进程效率高但依赖配置资源。多线程

结论

单线程老是最慢的,多线程适合在IO密集型场景使用,多进程适合CPU计算要求高的场景下使用,多进程虽然老是最快的,但须要CPU资源支持。并发

多线程

Python建立多线程有两种方法。app

  1. 函数

用函数建立多线程

from threading import Thread


def func():
    for i in range(2):
        print('Hello world!')
        sleep(1)


th1 = Thread(target=func)
th1.start()
th2 = Thread(target=func)
th2.start()

用类建立多线程

这个类必须继承Thread,必须重载run()方法框架

from threading import Thread


class MyThread(Thread):
    def __init__(self):
        super().__init__()
        self.name = 'Bob'
    
    def run(self):
        for i in range(2):
            print('Hello world!')
            sleep(1)

th1 = MyThread()
th2 = MyThread()

th1.start()
th2.start()

经常使用方法

  • threading.Thread(target=func, args=())
    • start() # 启动子线程
    • join() # 阻塞子线程
    • is_alive()/isAlive() # 判断线程执行状态,正在执行返回True,不然False
    • daemon # 设置线程是否随主线程退出而退出,默认False
    • name # 设置线程名

线程锁

import threading


lock = threading.Lock()    # 生成锁,全局惟一

lock.acquire()    # 加锁

lock.release()    # 释放锁

加锁与解锁必须成对出现,或者使用上下文管理器with来管理锁。异步

可重入锁

在Redis分布式锁中提到过,用于让非阻塞线程重复得到锁来发送或读取数据,这里的可重入锁仅指让同一线程能够屡次获取锁。

import threading


rlock = threading.RLock()    # 生成可重入锁

死锁

死锁一般有两种。

  1. 同一线程内嵌套获取同一把锁,形成死锁(解决方案是用可重入锁)
  2. 多个线程不按顺序同时得到多个锁,形成死锁(解决方案一是靠编程人员人工识别,二是对锁排序)

GIL全局锁

多进程是真正的并行,而多线程是伪并行,实际是多个线程交替执行。

遇到GIL影响性能的状况,要么考虑用多进程替代多线程,要么更换Python解释器。

线程通讯

经常使用线程通讯方法。

  1. threading.Event
  2. threading.Condition
  3. queue.Queue

Event事件

import threading


event = threading.Event()

event.clear()    # 重置event,使全部该event事件都处于待命状态

event.wait()    # 等待接收event指令,决定是否阻塞程序执行

evnet.set()    # 发送event指令,全部该event事件的线程开始执行
import time

import threading


class MyThread(threading.Thread):
    def __init__(self, name, event):
        super().__init__()
        self.name = name
        self.event = event
    
    def run(self):
        self.event.wait()    # 等待event.set()才能执行下去
        time.sleep(1)
        print('{} Done'.format(self.name))

threads = []
event = threading.Event()

for i in range(5):
    threads.append(MyThread(event))

event.clear()    # 重置event,使event.wait()生效

for t in threads:
    t.start()

print('Waiting 3s')
time.sleep(3)

print('Awake all threads')
event.set()    # 发送event指令,全部绑定了event的线程开始执行

全部线程在调用start()方法后并不会执行完,而是在event.wait()处停住了,须要发送event.set()指令才能继续执行。

Condition状态

import threading


cond = threading.Condition()

cond.acquire()

cond.release()

cond.wait()    # 等待指令触发,同时临时释放锁,直到调用notify才从新占有锁

cond.notify()    # 发送指令

Condition与Event很相似,不过因为wait()notify()能够反复调用,所以通常做为编程人员可控调用锁来使用,放在run()方法下。

Queue队列

队列是线程安全的,经过put()get()方法来操做队列。

from queue import Queue

q = Queue(maxsize=0)    # 设置0表示无限长队列

q.get(timeout=0.5)    # 阻塞程序,等待队列消息,能够设置超时时间

q.put()    # 发送消息

q.join()    # 等待全部消息被消费完

# 不经常使用但要了解的方法
q.qsize()    # 返回消息个数
q.empty()    # 返回bool值,队列是否空
q.full()    # 返回bool值,队列是否满

Queue是FIFO队列,还有queue.LifoQueuequeue.PriorityQueue

线程隔离

两个线程的变量不能被相互访问。

一般使用threading.local类来实现,该类的实例是一个字典型对象,直接经过key-value形式存入变量,如threading.local().name = 'bob'

若是想要实现一个线程内的全局变量或实现线程间的信息隔离,就使用local类。

线程池

多线程并非越多越好,由于在切换线程时会切换上下文环境(固然相比多进程的开销要小的多),在量大时依然会形成CPU的开销。

所以出现了线程池的概念,即预先建立好合适数量的线程,使任务能马上使用。

经过concurrent.futures库的ThreadPoolExecutor类来实现。

import time
import threading
from concurrent.futures import ThreadPoolExecutor


def target():
    for i in range(5):
        print('{}-{}\n'.format(threading.get_ident(), i)
        time.sleep(1)

pool = ThreadPoolExecutor(5)    # 线程池数量限制为5

for i in range(100):
    pool.submit(target)    # 往线程中提交并运行

协程

学习协程,要先理解生成器,由于Python的协程是从生成器中诞生并演变到如今这个样子的。

可迭代、迭代器、生成器

可迭代对象,其类或元类都实现了__iter__()方法,而该方法返回一个对象支持迭代,既能够是string/list/tuple/dict等内置类型的对象,也能够是本身写的对象(这个对象的类实现了遍历元素的__iter__方法)。

迭代器对象,可迭代对象是迭代器的基础,迭代器只是比可迭代对象多了一个__next__()方法,这个方法让咱们能够再也不用for循环来获取元素。

生成器对象,在迭代器的基础上,实现了yield,至关于函数中的return,在每次for循环遍历或调用next()时,都会返回一个值并阻塞等待下一次调用。

可迭代对象、迭代器都是将全部元素放在内存里,而生成器则是须要时临时生成元素,因此生成器节省时间、空间。

如何运行/激活生成器

两个方法。

  1. next()
  2. send(None)

这两个方法是等价的,但因为send方法能够传值进去,因此在协程中大有用处。

生成器的执行状态

经过inspect库的getgeneratorstate方法获取状态信息。

  1. GEN_CREATED 等待开始执行
  2. GEN_RUNNING 解释器正在执行(只有多线程中才能看到)
  3. GEN_SUSPENDED 在yield表达式处暂停
  4. GEN_CLOSED 执行结束

生成器的异常

StopIteration

从生成器过渡到协程:yield

生成器引入了函数暂停执行(yield)功能,后来又引入了向暂停的生成器发送信息的功能(send),并以此催生了协程。

协程是为非抢占式多任务产生子程序的计算机程序组件,协程容许不一样入口点在不一样位置暂停或开始执行程序。

协程和线程有类似点,多个协程之间与线程同样,只会交叉串行执行;也有不一样点,线程之间要频繁切换,加锁、解锁,协程不须要。

协程经过yield暂停生成器,将程序的执行流程交给其它子程序,从而实现不一样子程序之间的交替执行。

经过例子演示如何向生成器发送信息。

def func(n):
    index = 0
    while index < n:
        num = yield index    # 这里分红两部分,yield index将index return给外部程序, num = yield接受外部send的信息并赋值给num
        if num is None:
            num = 1
        index += num


f = func(5)
print(next(f))    # 0
print(f.send(2))    # 2
print(next(f))    # 3
print(f.send(-1))    # 2

yield from语法

从Python3.3才出现的语法。

yield from后面须要添加可迭代对象(迭代器、生成器固然知足要求)。

# 拼接一个可迭代对象
# 使用yield
astr = 'ABC'

alist = [1, 2, 3]

adict = dict(name='kct', age=18)

agen = (i for i in range(5))

def gen(*args):
    for item in args:
        for i in item:
            yield i

new_list = gen(astr, alist, adict, agen)

print("use yield:", list(new_list))

# 使用yield from

def gen(*args):
    for item in args:
        yield from item

new_flist = fgen(astr, alist, adict, agen)

print("use yield from:", list(new_flist))

能够看出,使用yield from能够直接从可迭代对象中yield全部元素,减小了一个for循环,代码更简洁,固然yield from不止作了这件事。

yield from后能够接生成器,以此造成生成器嵌套,yield from就帮咱们处理了各类异常,让咱们只需专心于业务代码便可。

具体讲解yield from前先了解几个概念:

  1. 调用函数:调用委托生成器的代码
  2. 委托生成器:包含yield from表达式的生成器函数
  3. 子生成器:yield from后接的生成器函数

举个例子,实时计算平均值

# 子生成器
def average_gen():
    total = 0
    count = 0
    average = 0
    while True:
        num = yield average
        count += 1
        total += num
        average = total/count

# 委托生成器
def proxy_gen():
    while True:
        yield from average_gen()

# 调用函数
def main():
    get_average = proxy_gen()
    next(get_average)    # 第一次调用不传值,让子生成器开始运行
    print(get_average.send(10))    # 10
    print(get_average.send(20))    # 15
    print(get_average.send(30))    # 20

委托生成器的做用是在调用函数与子生成器之间创建一个双向通讯通道,调用函数能够send消息给子生成器,子生成器yield值也是直接返回给调用函数。

有时会在yield from前做赋值操做,这是用于作结束操做,改造上面的例子。

# 子生成器
def average_gen():
    total = 0
    count = 0
    average = 0
    while True:
        num = yield average
        if num is None:
            break
        count += 1
        total += num
        average = total/count
    return total, count, average    # 当协程结束时,调用return

# 委托生成器
def proxy_gen():
    while True:
        total, count, average = yield from average_gen()    # 只有子生成器的协程结束了才会进行赋值,后面的语句才会执行
        print('Count for {} times, Total is {}, Average is {}'.format(count, total, average))

# 调用函数
def main():
    get_average = proxy_gen()
    next(get_average)    # 第一次调用不传值,让子生成器开始运行
    print(get_average.send(10))    # 10
    print(get_average.send(20))    # 15
    print(get_average.send(30))    # 20
    get_average.send(None)    # 结束协程,若是后面再调用send,将会另起一协程

为何不直接调用子生成器?

yield from作了全面的异常处理。直接调用子生成器,首先就要处理StopIteration异常,其次若子生成器不是协程生成器而是迭代器,则会有其它异常抛出,所以须要知道,委托生成器在这之中扮演着重要角色,不可忽略。

asyncio

asyncio是Python3.4引入的标准库,直接内置对异步IO的支持。

虽然学了yieldyield from,但仍是不知如何入手去作并发,asyncio则是为了提供这么个框架来精简复杂的代码操做。

如何定义建立协程

经过前面学习,咱们知道调用函数/委托生成器/子生成器这三剑客中,子生成器就是协程,那么asyncio如何来定义建立协程呢?

asyncio经过在函数定义前增长async关键字来定义协程对象,经过isinstance(obj, Coroutine)便可判断是不是协程,这个协程类从collections.abc包导入。

咱们也知道,生成器是协程的基础,那么有什么办法将生成器变成协程来使用?

经过@asyncio.coroutine装饰器能够标记生成器函数为协程对象,可是经过isinstance(obj, Generator)isinstance(obj, Coroutine)仍然能够看到,这个生成器函数只是被标记为协程了,但其本质依然是生成器。

重要概念

  1. event_loop事件循环,将协程注册到时间循环中,当知足事件发生时调用相应的协程函数;
  2. coroutine协程,一个使用async关键字定义的函数,调用它不会当即执行函数,而是返回一个协程对象,这个协程对象须要注册到事件循环中,由事件循环调用;
  3. future对象,表明未来执行或没有执行的任务的结果,和task没有本质区别;
  4. task任务,一个协程对象就是一个原生能够挂起的函数,任务则是对协程的进一步包装,其中包含任务的各类状态,task对象是future的子类,它将coroutine和future联系在一块儿,将coroutine封装成一个future对象;
  5. async/await关键字,async定义一个协程,await用于挂起阻塞的异步调用接口(做用相似yield但不彻底是)。

协程工做流程

  1. 定义/建立协程对象
  2. 将协程转换为task
  3. 定义事件循环容器
  4. 把task任务扔进事件循环中并触发
import asyncio

async def hello(name):
    print('Hello, ', name)

coroutine = hello('World')

# 建立事件循环
loop = asyncio.get_event_loop()

# 将协程转换为任务
task = loop.create_task(coroutine)

# 将任务放入事件循环对象中触发
loop.run_until_complete(task)

awaityield

这二者都能实现暂停的效果,但功能是不兼容的,在生成器中不能用await,在async定义的协程中不能用yield

而且,yield from后可接可迭代对象、迭代器、生成器、future对象、协程对象,await后只能接future对象、协程对象。

建立future对象

前面咱们知道经过async能够定义一个协程对象,那么如何建立一个future对象呢?

答案是经过task,只须要建立一个task对象便可。

# 在前一个例子中,咱们先建立了事件循环,而后经过事件循环建立了task,咱们来测试下
import asyncio
from asyncio.futures import Future


async def hello(name):
    print('Hello, ', name)

coroutine = hello('World')

# 建立事件循环
loop = asyncio.get_event_loop()

# 将协程转换为任务
task = loop.create_task(coroutine)

print(isinstance(task, Future))    # 结果是True

# 不创建事件循环的方法
task = asyncio.ensure_future(coroutine)

print(isinstance(task, Future))    # 结果也是True

知道了建立future对象(也便是建立task对象)的方法,那么咱们验证下awaityield后接coroutine和future对象。

import sys
import asyncio


async def f1():
    await asyncio.sleep(2)
    return 'Hello, {}'.format(sys._getframe().f_code.co_name)


@asyncio.coroutine
def f2():
    yield from asyncio.sleep(2)
    return 'Hello, {}'.format(sys._getframe().f_code.co_name)


async def f3():
    await asyncio.ensure_future(asyncio.sleep(2))
    return 'Hello, {}'.format(sys._getframe().f_code.co_name)


@asyncio.coroutine
def f4():
    yield from asyncio.ensure_future(asyncio.sleep(2))
    return 'Hello, {}'.format(sys._getframe().f_code.co_name)


tasks = [
    asyncio.ensure_future(f1()),
    asyncio.ensure_future(f2()),
    asyncio.ensure_future(f3()),
    asyncio.ensure_future(f4())
]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print(task.result())

loop.close()

绑定回调函数

异步IO都是在IO高的地方挂起,等IO操做结束后再继续执行,大多数时候,咱们后续的代码执行都是须要依赖IO的返回值的,此时就要用到回调了。

回调的实现有两种方式。

第一种,利用同步编程实现的回调

这种方法要求咱们可以取得协程的await的返回值。经过task对象的result()方法能够得到返回结果。

import time
import asyncio

async def _sleep(x):
    time.sleep(x)
    return 'Stopped {} seconds!'.format(x)

coroutine = _sleep(2)

loop = asyncio.get_event_loop()

task = asyncio.ensure_future(coroutine)

loop.run_until_complete(task)

# 直接经过task获取任务结果
print('Result: {}'.format(task.result()))

第二种,经过asyncio自带的添加回调函数功能实现

import time
import asyncio

async def _sleep(x):
    time.sleep(x)
    return 'Stopped {} seconds!'.format(x)

def callback(future):
    print('Result: {}'.format(future.result()))

coroutine = _sleep(2)

loop = asyncio.get_event_loop()

task = asyncio.ensure_future(coroutine)

# 添加回调函数
task.add_done_callback(callback)

loop.run_until_complete(task)

协程中的并发

asyncio实现并发,就须要多个协程来完成任务,前面作awaityield的验证时就用了并发。

每当有任务阻塞的时候就await,而后其余协程继续工做。

第一步,建立多个协程的列表

# 协程函数
async def worker(n):
    print('Waiting: {}'.format(n))
    await asyncio.sleep(n)
    return 'Done {}'.format(n)

# 协程对象
c1 = worker(1)
c2 = worker(2)
c3 = worker(4)

# 协程转换为task
tasks = [
    asyncio.ensure_future(c1),
    asyncio.ensure_future(c2),
    asyncio.ensure_future(c3)
    ]

loop = asyncio.get_event_loop()

第二步,将列表注册到事件循环中

有两种方法,这两种方法的区别后面说。

return的结果能够经过task.result()查看。

# asyncio.wait()
loop.run_until_complete(asyncio.wait(tasks))

# asyncio.gather()
loop.run_until_complete(asyncio.gather(*tasks))    # *不能省略

# 查看结果
for task in tasks:
    print('Result: {}'.format(task.result()))

协程中的嵌套

使用async能够定义协程,协程用于耗时的IO操做,咱们也能够封装更多的IO操做过程,实现一个协程中await另外一个协程,实现协程的嵌套。

# 内部协程函数
async def worker(n):
    print('Waiting: {}'.format(n))
    await asyncio.sleep(n)
    return 'Done {}'.format(n)

# 外部协程函数
async def main():
    c1 = worker(1)
    c2 = worker(2)
    c3 = worker(4)
    
    tasks = [
        asyncio.ensure_future(c1),
        asyncio.ensure_future(c2),
        asyncio.ensure_future(c3)
    ]
    
    dones, pendings = await asyncio.wait(tasks)
    
    for task in tasks:
        print('Result: {}'.format(task.result()))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

若是外部协程使用的asyncio.gather(),那么做以下替换。

results = await asyncio.gather(*tasks)

for result in results:
    print('Result: {}'.format(result))

协程中的状态

讲生成器时提到了四种状态,对协程咱们也了解一下其状态(准确地说是future/task对象的状态)。

  1. Pending:建立Future,还未执行
  2. Running:事件循环正在调用执行任务
  3. Done:任务执行完毕
  4. Cancelled:Task被取消后的状态

gather和wait

接收的参数不一样

wait接收的tasks,必须是一个list对象,该list对象中存放多个task,既能够经过asyncio.ensure_future转为task对象也能够不转。

gather也能够接收list对象,但*不能省,也能够直接将多个task做为可变长参数传入,参数能够是协程对象或future对象。

返回结果不一样

wait返回donespendings,前者表示已完成的任务,后者表示未完成的任务,须要经过task.result()手工获取结果。

gather直接将值返回。

协程控制功能

# FIRST_COMPLETED:完成第一个任务就返回
# FIRST_EXCEPTION:产生第一个异常就返回
# ALL_COMPLETED:全部任务完成再返回(默认选项)
dones, pendings = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))

# 控制运行时间:1秒后返回
dones, pendings = loop.run_until_complete(
    asyncio.wait(tasks, timeout=1))

动态添加协程

asyncio中如何动态添加协程到事件循环中?

两种方法,一种是同步的,一种是异步的。

import time
import asyncio
from queue import Queue
from threading import Thread

# 在后台永远运行的事件循环
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


def do_sleep(x, queue, msg=""):
    time.sleep(x)
    queue.put(msg)

queue = Queue()

new_loop = asyncio.new_event_loop()

t = Thread(target=start_loop, args=(new_loop,))
t.start()

print(time.ctime())

# 动态添加两个协程
# 这种方法在主线程是同步的
new_loop.call_soon_threadsafe(do_sleep, 6, queue, 'First')
new_loop.call_soon_threadsafe(do_sleep, 3, queue, 'Second')

while True:
    msg = queue.get()
    print('{} is done'.format(msg))
    print(time.ctime())
import time
import asyncio
from queue import Queue
from threading import Thread

# 在后台永远运行的事件循环
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_sleep(x, queue, msg=""):
    await asyncio.sleep(x)
    queue.put(msg)

queue = Queue()

new_loop = asyncio.new_event_loop()

t = Thread(target=start_loop, args=(new_loop,))
t.start()

print(time.ctime())

# 动态添加两个协程
# 这种方法在主线程是异步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, 'First'), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, 'Second'), new_loop)

while True:
    msg = queue.get()
    print('{} is done'.format(msg))
    print(time.ctime())
相关文章
相关标签/搜索