深刻Asyncio(二)从线程到协程

线程的真相

多线程并非一无可取,在实际问题中,要权衡优劣势来选择多线程、多进程或是协程。协程为多线程的某些问题提供了一种解决方案,因此学习协程首先要对线程有必定了解。python


多线程优势

  1. 代码可读性
    多线程代码即便是并发执行的,但依然能够线性阅读,可读性高。
  2. 共享内存
    在多核CPU中仍然共享内存数据,这对解决某些问题很重要,避免了数据通讯。
  3. 很容易对现有代码进行改造
    有不少多线程编程的实例,也有不少阻塞程序依赖多线程的代码参考。

在Python中,因为GIL的存在,并行执行依然是不可能的(CPython解释器)。GIL带来的一个影响就是,全部的线程只能用到一个CPU核心,这几乎断绝了多线程并行的优点。编程

使用多线程的最佳方式就是使用concurrent.futures库的ThreadPoolExecutor(),将全部的数据丢到submit()方法中。bash

from concurrent.futures import ThreadPoolExecutor as Executor

def worker(data):
    pass

with Executor(max_workers=10) as exe:
    future = exe.submit(worker, data)

要关闭线程则执行Executor.shutdown(wait=True),它容许等待1-2秒来等线程执行完毕。对于上述例子,要尽量地在worker()函数中不使用全局变量。多线程


多线程的背景

为了文章的完整性,即便读者早就有所了解,在这里仍是要补充:
1. 多线程编程中出现的bug是最难修复的bug,根据经验,设计一款新软件可能不容易出现这些问题,但对于一些现存软件的维护,即便专家也没办法解决这些问题;
2. 线程是资源密集型的,须要额外的系统资源来建立,如为每一个线程预分配内存栈同时会提早消耗进程的虚拟内存,能够经过threading.stack_size([size])修改栈大小,但对于函数递归嵌套调用的栈深度有影响;
3. 在高并发环境中,因为上下文切换成本,会对吞吐量形成影响;
4. 多线程不够灵活,全部的线程共享CPU时间,而无论线程是否准备工做。并发

总之,多线程编程让代码bug难查,而且对于高并发场景并不高效。异步


例子:机器人与餐具

假设你开了一家餐馆,你的雇员都是机器人,如今就雇员与餐具构建一个简单的程序。async

# Robot
import threading
from queue import Queue

class ThreadBot(threading.Thread):  # 线程子类,继承start/join等方法
    def __init__(self):
        super().__init__(target=self.manage_table)  # 目标函数,下面定义
        self.cutlery = Cutlery(knives=0, forks=0)   # 每一个机器人携带的餐具
        self.tasks = Queue()    # 机器人接收的任务被添加到任务队列

    def manage_table(self): 
        while True:   # 机器人只接受三种工做
            task = self.tasks.get()
            if task == 'prepare table':
                kitchen.give(to=self.cutlery, knives=4, forks=4)
            elif task == 'clear table':
                self.cutlery.give(to=kitchen, knives=4, forks=4)
            elif task == 'shutdown':
                return
# Cutlery
from attr import attrs, attrib  # 开源库,不影响线程或协程,使实例属性的初始化更轻松

@attrs
class Cutlery:
    knives = attrib(default=0)
    forks = attrib(default=0)

    def give(self, to: 'Cutlery', knives=0, forks=0):   # 用于与其它实例交互
        self.change(-knives, -forks)
        to.change(knives, forks)

    def change(self, knives, forks):
        self.knives += knives
        self.forks += forks

kitchen = Cutlery(knives=100, forks=100)
bots = [ThreadBot() for i in range(10)]    # 建立了10个线程机器人

import sys

for bot in bots:
    for i in range(int(sys.argv[1])):   # 从命令行获取桌子的数量,而后给每一个机器人安排全部桌子的任务
        bot.tasks.put('prepare table')
        bot.tasks.put('clear table')
    bot.tasks.put('shutdown')

print(f'Kitchen inventory before service: {kitchen}')
for bot in bots: bot.start()
for bot in bots: bot.join()
print(f'Kitchen inventory after service: {kitchen}')

指望是通过程序运行后,全部的刀叉都应该回到厨房而且数量与初始同样。函数

λ  python test.py 100
Kitchen inventory before service: Cutlery(knives=100, forks=100)
Kitchen inventory after service: Cutlery(knives=100, forks=100)

λ  python test.py 10000
Kitchen inventory before service: Cutlery(knives=100, forks=100)
Kitchen inventory after service: Cutlery(knives=104, forks=80)

能够看到,在提供10000张桌子后,结果出现了严重的错误,实际上即便尝试屡次都不乐观。咱们知道这些机器人构造良好,也不会出现错误,那么是什么地方错了呢?高并发

回忆下场景:代码简单易读,逻辑没错,甚至用100张桌子进行了测试,但10000张桌子测试就失败了,而且错误每次都不同。oop

其实这是典型的竞态条件bug,错误出如今这一段代码中:

def change(self, knives, forks):
    self.knives += knives
    self.forks += forks

自加在C解释器中运行并非原子的,它被分为几步:
1. 读取原变量(self.knives)值到临时变量;
2. 将knives值加到临时变量;
3. 将终值赋值给原变量。

抢占式多任务(多线程)会打乱上述步骤,可经过加锁的方式修复bug:

def change(self, knives, forks):
    with self.lock:
        self.knives += knives
        self.forks += forks

但加锁须要了解多线程代码中,什么地方会发生数据共享,若是是我的写的程序比较好控制,但若是有第三方的代码夹杂进来,就会很头痛了。

光看源码很难找出有竞态条件,这主要是由于源码里没有指出什么时候何处切换线程,即便指出也没用,由于切换是由操做系统决定的,它可能发生在任什么时候间任何位置。

解决问题的一个办法就是让机器人不处理餐具,而是交由某一个单独的线程去处理。

不过在协程中,咱们能够显式地知道上下文在什么时候切换,由于await关键字很显眼。

import asyncio

class CoroBot:  # 单线程管理多个机器人实例
    def __init__(self):
        self.cutlery = Cutlery(knives=0, forks=0)
        self.tasks = asyncio.Queue()    # 使用异步队列

    async def manage_table(self):
        while True:
            task = await self.tasks.get()   # 关键点,协程惟一能够切换的位置
            if task == 'prepare table':
                kitchen.give(to=self.cutlery, knives=4, forks=4)
            elif task == 'clear table':
                self.cutlery.give(to=kitchen, knives=4, forks=4)
            elif task == 'shutdown':
                return

from attr import attrs, attrib

@attrs
class Cutlery:
    knives = attrib(default=0)
    forks = attrib(default=0)

    def give(self, to: 'Cutlery', knives=0, forks=0):
        self.change(-knives, -forks)
        to.change(knives, forks)

    def change(self, knives, forks):
        self.knives += knives
        self.forks += forks

kitchen = Cutlery(knives=100, forks=100)
bots = [CoroBot() for i in range(10)]    # 建立了10个协程机器人,但它们是由一个线程管理的

import sys

for bot in bots:
    for i in range(int(sys.argv[1])):
        bot.tasks.put_nowait('prepare table')   # 异步写入队列
        bot.tasks.put_nowait('clear table')
    bot.tasks.put_nowait('shutdown')

print(f'Kitchen inventory before service: {kitchen}')
loop = asyncio.get_event_loop()
tasks = [loop.create_task(bot.manage_table()) for bot in bots]
task_group = asyncio.gather(*tasks)
loop.run_until_complete(task_group)
print(f'Kitchen inventory after service: {kitchen}')

因为只有一个位置提供切换协程,所以在运行过程当中不存在竞态条件,结果也是明显的,不管多少测试都能经过。

相关文章
相关标签/搜索