简单谈谈协程

前提:html

  1. 默认你已经对协程有了基本了解。
  2. 写此文时,我自觉得我对协程了解仍是很不深刻,更多的是作个记录,依据我的经验,我相信再过段时间我会不忍看这篇文章的。
  3. 水平有限(太菜了),若是你发现文中有表述不当的地方,欢迎指正啦~

协程定义

协程的提出最先能够追溯到 Melvin E. Conway 于 1963 年发布的一篇论文《Design of a Separable Transition-Diagram Compiler》python

在这篇论文中,他提出将编译器组织为一组协程,这使得在调试中使用单独的通道,而后在生产中运行单传递编译器成为可能。文章中有这样一段话:c++

coroutines are subroutines all at the same level, each acting as if it were the master program when in fact there is no master program.git

这也许是协程最先的 描述了,不过,遗憾的是就目前来讲,协程这一律念彷佛仍是缺失清晰明确并被广泛承认的定义。所以你能看到各类语言/系统在具体实现时基于不一样的理解和预期要求,方案也是不尽相同。在这里,咱们结合 Marlin 博士在论文中总结的协程特征来定义它:程序员

  • 协程的本地数据在连续调用中能得以存留
  • 当发生控制转移时,原协程的执行被挂起,只有当控制权在稍后的某个阶段从新回到进入该协程时,才会从原先离开的地方继续执行。

固然了,这种基于表象的描述仍是没有明确其内在构造,留下了一些疑问,由这些疑问产生了对协程在不一样角度的分类。github

  1. 基于协程间控制转移机制的不一样,分为对称协程和非对称协程。
  2. 基于协程是否被实现为堆栈结构,分为堆栈协程 (stackful coroutine) 和无堆栈协程(stackless coroutine)。
  3. 另外还有基于其在语言中是做为 first-class 对象提供仍是做为受约束的结构来分类。

在这篇文章中,咱们主要关注于 stackful coroutine 和 stackless coroutine 的区别。编程

Why Coroutine?

咱们以生产者消费者模型来讲明协程的使用场景,要注意的是,系统层级的协程为了与语言中的实现加以区分,一般会叫另一个名字:纤程(fiber),这里咱们仍是统称协程了。promise

传统的生产者消费者模型是假设一个进程拥有两个线程,一个线程是消费者,一个线程是生产者,它们公用一份地址空间。咱们能够经过这种方式简单的实现生产者消费者模型。可是这种方式很是繁琐也消耗性能,由于你须要不断地进行生产者和消费者的上下文切换。而且这种线程的切换是由 OS 调度的,计算机上可能运行着不少的程序,调度器并不知道生产者和消费者之间的关系,极可能生产者早就完成了数据的生成,消费者却迟迟不能获得调度。浏览器

对于上述问题,一种解决方案是直接把它们放到一个线程上下文里,写一堆相互依赖检查的代码,但这反却是一种回退了:markdown

  1. 咱们用多线程模型去解决生产者消费者问题,就是为了实现生产者和消费者之间解耦和异步,但愿它们能尽量的独立工做而不考虑对方。
  2. 这种作法也并不天然,生产者和消费者更应该是一种相互协做的状态,而不是有明确的 caller 和 callee

协程和线程的概念很类似,实际上在 OS 中,协程也被叫作用户态线程,不过不一样于线程的抢占式调度执行,协程的主动让出式调度执行起来更为高效,由于从用户态转换到内核态的切换成本,须要维持的上下文也少得多。

import asyncio
import random

async def produce(queue, n):
    for item in range(n):
        # 生产一个项目,使用sleep模拟I/O操做
        print('producing item {} ->'.format(item)) 
        await asyncio.sleep(random.random())
        # 将项目放入队列
        await queue.put(item)
    # 指示生产完毕
    await queue.put(None)

async def consume(queue):
    while True:
        # 等待来自生产者的项目
        item = await queue.get()
        if item is None:
            break
        # 消费这个项目,使用sleep模拟I/O操做
        print('consuming item {} <-'.format(item))
        await asyncio.sleep(random.random()) 

async def main():
    queue = asyncio.Queue()
    task1 = asyncio.create_task(produce(queue, 10))
    task2 = asyncio.create_task(consume(queue))
    await task1
    await task2

asyncio.run(main())
复制代码

有栈协程和无栈协程

以咱们上面对协程的定义来讲,你会发现实现协程的关键点在于数据流和控制流如何在跳转中保持,咱们先不谈保持,如何实现控制流的转移呢?

  1. 调用一个新的函数

  2. 从当前函数返回

stackful

方案一就是有栈协程的思路,也是很是天然的一种想法,咱们让每一个协程都拥有本身的堆栈和控制块,在协程挂起以前,当前活动的协程的非易失性寄存器被存储在协程的控制块中。在恢复新激活的协程以前,从其关联的控制块中恢复,协程上下文切换经过栈空间的交换来实现。实际上你在 glibc/ucontext.h 中能看到 getcontextsetcontextswapcontextmakecontext 这几个函数声明,一些开源协程库就是基于此实现的。

这里牵扯到一个问题,栈空间如何分配?

一种方案是独立内存栈,即给每一个协程分配一块固定大小的栈空间,可是分配多少呢,少了容易栈溢出,多了容易浪费内存,交给程序员分配又增长了心智负担。为了防止爆栈,咱们每每要给每一个任务都分配知足上限的栈空间,任务切换时直接调整栈指针,可是一个线程同一时刻只有一个协程在运行呀。固然你也能够放到堆上,可是显然程序调用的开销也会显著增长。

另外一种方案是 共享内存栈,其作法是,咱们预先分配一块占用内存较大的栈空间做为共享栈内存,在suspend 或者说 transfer 时,咱们会基于当前协程实际所用空间对其进行备份,也就是动态分配。共享栈相比独立栈的劣势时任务切换时须要进行较为复杂的拷贝。

贴下:云风的协程库 coroutine 注释版: github.com/chenyahui/A…

stackless

无栈协程采用的就是第二种方案了,你能够将其理解基于对象模型实现的,这时协程的上下文就是对象的成员变量了。固然,你也能够将其理解为基于状态机模型去实现的,以下:

struct coroutine {
    int i;
    int value = 0;
    void next() {
        switch(value) {
        case 0:
            return frist();
        case 1:
            return second();
    }
    void frist() {
        i = 0;
        value = 1;
    }
    void second() {
        i++;
        value = 2;
    }
};
复制代码

因为不须要切换栈帧,无栈协程的性能相比有栈协程会更好一点,内存空间的占用也好得多,并且它执行的是传统的函数调用函数返回,不须要手动修改栈指针,不会破坏 return stack buffer 跳转预测,可是其实现须要编译器的支持,总体上来讲兼容性不若有栈协程。

误解、补充

栈协程和无栈协程的区别点容易被误解。

这里的有栈、无栈是指协程是否被实现为堆栈结构,换句话说当某个协程被挂起时,其是否存留在一个栈结构中。你能在不少文章以及论文中看到他们认为有栈协程和无栈协程的差别在于可否在嵌套中挂起,实际上这是不许确的。

如何理解呢?

设想一下,咱们调用了一个 coroutine c,当它调用了常规例程 f 时,f 的调用帧要去哪呢?若是对 f() 的调用致使另外一个协程挂起,那么f()自己的调用帧必须和 c() 的帧一块儿保存。

基于 stackful 的协程方案,咱们是能够作到这一点的,好比采用 Cactus Stack 的方案(以下图) ,可是仍是会带来不小的复杂性。 所以,很多语言的有栈协程也对其作了限制:要求每一个协程都放在词法嵌套的最外层声明。这个时候协程栈是彻底互不相交的,好比采用 stackful coroutine 的 Modula-2 就是如此。

仙人掌栈

而 stackless 的方案,协程 suspend 的时候根本就没有常规的调用栈结构,要实现嵌套调用怎么整呢。。。。很难

generator 和 async/await

回归到 js , 不少人会说 generator 和 async/await 就是无栈协程,其实这种说法不是太准确。

这点 python 的 pep 规范中也到了体现:www.python.org/dev/peps/pe…

引入 async/await 目的是实现一个心智模型简单的,易用的并尽量接近同步的编程模型,更好的实现并发编程,而且明确的和生成器区别开来,消除生成器与协程之间的歧义。

生成器就是生成器,或者你叫它半协程(由于你能够基于生成器去实现协程),咱们使用生成器更多的是侧重于它提供的暂停和恢复执行函数的能力,叫协程就不太恰当了。

至于 async/await 是否是 generator 语法糖的问题,彻底看编译器实现了。

咱们能够经过 d8 --print-bytecode [path].js 看看 v8 是如何处理的 js 中 generator 函数的。

好比下面这段代码:

function* testGenerator (){
  yield 1;
  yield 'a string'
}

function main(){
   let gen = testGenerator()
   gen.next()
   gen.next()
}

main()
复制代码

生成的字节码太长,这里只截出部分(函数 testGenerator 部分字节码 ),字节码命令相关解释,能够直接看 源码注释

image-20210512152432445

经过 invokeInstrinsic[_GreateJSGeneratorObject] 建立 JSGenerator 实例,类 JSGenerator 的定义 戳这里

咱们要关注的是 SuspendGeneratorResumeGenerator 这两个命令。

IGNITION_HANDLER(SuspendGenerator, InterpreterAssembler) {
  Node* generator = LoadRegisterAtOperandIndex(0);
  TNode<FixedArray> array = CAST(LoadObjectField(
      generator, JSGeneratorObject::kParametersAndRegistersOffset));
  Node* closure = LoadRegister(Register::function_closure());
  Node* context = GetContext();
  RegListNodePair registers = GetRegisterListAtOperandIndex(1);
  Node* suspend_id = BytecodeOperandUImmSmi(3);

  Node* shared =
      LoadObjectField(closure, JSFunction::kSharedFunctionInfoOffset);
  TNode<Int32T> formal_parameter_count = UncheckedCast<Int32T>(
      LoadObjectField(shared, SharedFunctionInfo::kFormalParameterCountOffset,
                      MachineType::Uint16()));

  ExportParametersAndRegisterFile(array, registers, formal_parameter_count);
  StoreObjectField(generator, JSGeneratorObject::kContextOffset, context);
  StoreObjectField(generator, JSGeneratorObject::kContinuationOffset,
                   suspend_id);

  Node* offset = SmiTag(BytecodeOffset());
  StoreObjectField(generator, JSGeneratorObject::kInputOrDebugPosOffset,
                   offset);

  UpdateInterruptBudgetOnReturn();
  Return(GetAccumulator());
}
复制代码

简单来讲,SuspendGenerator 处理函数会调用 LoadRegisterGetContext LoadObjectFieldStoreObjectField 等来保存状态,并记录字节码的偏移量,而后直接把累加器(V8 是经过模拟物理机器来执行字节码的,而且基于寄存器设计)的值给 return 了,也就是当前 generator 函数栈帧已经退出了,那暂停是怎么实现的呢?

很简单,你会发现字节码处理函数最后一行基本都会调用 Dispatch 函数,执行该函数会取出当前函数生成的下一条字节码来执行,SuspendGenerator 里没有执行 Dispatch,至于暂停位置,则是经过偏移量来记录的。

IGNITION_HANDLER(ResumeGenerator, InterpreterAssembler) {
  Node* generator = LoadRegisterAtOperandIndex(0);
  Node* closure = LoadRegister(Register::function_closure());
  RegListNodePair registers = GetRegisterListAtOperandIndex(1);

  Node* shared =
      LoadObjectField(closure, JSFunction::kSharedFunctionInfoOffset);
  TNode<Int32T> formal_parameter_count = UncheckedCast<Int32T>(
      LoadObjectField(shared, SharedFunctionInfo::kFormalParameterCountOffset,
                      MachineType::Uint16()));

  ImportRegisterFile(
      CAST(LoadObjectField(generator,
                           JSGeneratorObject::kParametersAndRegistersOffset)),
      registers, formal_parameter_count);

  SetAccumulator(
      LoadObjectField(generator, JSGeneratorObject::kInputOrDebugPosOffset));

  Dispatch();
}
} 
复制代码

ResumeGenerator 就基本是一个相反的过程了,恢复以前保存的状态,调用 Dispatch 取出下一条字节码继续执行 generator 函数。

v8 中 async/await 是在 Generator 基础上另外实现的。await 也会生成 SuspendGeneratorResumeGenerator 这两条字节码,

JSAsyncGeneratorObject 类 继承了 JSGeneratorObject 类。

除了 Generator 这个地方还和 microtask 有关,简单来讲(这个地方实在看不下去代码了,搜的),对于以下代码,编译器会建立一个 fulfilled 状态的 promise,加入一个 microtask 到队列中,保存执行状态,暂停,而后遍历微任务队列时,就会恢复状态并执行,放到浏览器中,其调度又会受到浏览器事件循环的影响。

async function testAsync(){
    const res = await 0
    return res
}

testAsync()
复制代码

参考

相关文章
相关标签/搜索