Quasar 提供不少的功能(Go-like channels, Erlang-like actors),这篇博客主要介绍Quasar的核心Fiber和使用Fiber来处理异步IO(本博客给出的例子是发起Http 请求)。html
本博客所指的线程均指Linux下的线程,Linux下不区分线程和进程,特别的地方会再作说明。git
Fiber(中文翻译成纤程) 是JVM上实现的轻量级用户态线程,和go 语言的goroutine 相似。Fiber 有以下几个特色:github
先看一下在JVM 中用户态线程和内核态线程的对应关系:spring
Thread: 1:1 一个Java 线程对应一个内核线程.(能够被内核调度,消耗context switch) Fiber: M:N mapping to kernel threads. Strand: abstraction of thread or fiber(后面会有介绍).
传统作法:并发
public void messageFriend() { withModule(() -> { withConnection(richard -> { richard.dataHandler(data -> { assertEquals("bob>oh its you!", data.toString()); moduleTestComplete(); }); richard.write("richard\n"); withConnection(bob -> { bob.dataHandler(data -> { assertEquals("richard>hai",data.toString()); bob.write("richard<oh its you!"); }); bob.write("bob\n"); vertx.setTimer(6, id -> richard.write("bob<hai")); }); }); }); }
2.Monadsapp
In Java8 like:异步
CompletableFuture.supplyAsync().thenAccept()...
这个要优于回调,它去除回调金字塔,可是也有以下的缺点,手动的上下文管理(须要在CompletableFuture 里面执行),并发逻辑不清晰(你会有不少的 .then().then()),还须要改变接口(方法须要返回 CompletableFuture)ide
你能够在这里看到更多的内容Monads vs Scoped Continuations测试
Fiber 的作法: Just Block, 由于Fiber 是轻量的,能够Suspend 和 Resume,Fiber 的执行过程是这样的,你建立并启动一个Fiber(Fiber建立和使用和线程同样):编码
new Fiber<Void>(new SuspendableRunnable() { public void run() throws SuspendExecution, InterruptedException { // your code bar(); // call bar; } }).start();
而后由 Schedule(有默认提供) 调度,当Fiber须要block的时候,调用Fiber.park(),Schedule 能够执行其它的操做(效率就是在这个时候体现出来的),而后block完的时候 又经过 Fiber.unpark()继续执行。
1.怎么 instrument:
Quasar fibers 依赖 bytecode instrumentation. 能够经过Java Agent在类加载的时候实现, 也能够经过在编译期间经过 Ant task来是实现. 实现的效果就是在原来的代码中插入一些额外的代码(或者说字节码)。
2.为何Fiber是Suspendable 和 Resumeable 的,就是经过一个Stack来存储代码执行的相关信息:
class Stack{ int[] method; // PC(程序计数器), SP(栈指针) long[] dataLong; // stack premitives(基地址) Object[] dataObject; // stack refs (相关引用) }
3.Instrumentation 以后在JVM中的执行过程:
before :
if bar() run in a fiber, and it call foo(),so foo() should be Suspendable. bar(){ baz(); foo(); } foo(){ Fiber.park(); // throw Exception }
after:
bar(){ int pc = isFiber ? s.pc :0; switch(pc){ case 0: baz(): if(isFiber){ s.pc=1; // store locals -> s } case 1: if(isFiber) // load locals <-s foo(); // suspendable } } foo(){ int pc =isFiber ? s.pc : 0; switch(pc){ if(isFiber){ s.pc = 3; // store locals -> s } Fiber.park(); // throw Exception case 3: if(isFiber) // load locals <- s } }
- Interfaces/superclasses:iff have a suspendable implementation (dynamic proxies marked manually)
- Reflection,invoke dynamic:presumed suspendable(except lambdas)
- Which methods to instrument?(manual/graph analysis)
- Instrument "infection":if we`re conservative and automatic,a lot of methods must be instrumented(e.g. consider Runnable being suspendable - and all it`s callers...)
这些问题的意思是说在进行Instrument的过程当中,针对接口和动态代理方法,只有在运行时才能决定具体的实现类(决定具体的调用方法),因此须要手动管理列出这些类,这也是Fiber使用起来比较繁琐的一点,你们能够参考我最后给出的例子,例子中会给你们一个大概的说明(主要是为了新手少踩一些坑)。官方文档也给了详细的说明,须要耐心一点看完。
Fibers are not meant to replace threads in all circumstances. A fiber should be used when its body (the code it executes) blocks very often waiting on other fibers (e.g. waiting for messages sent by other fibers on a channel, or waiting for the value of a dataflow-variable). For long-running computations that rarely block, traditional threads are preferable. Fortunately, as we shall see, fibers and threads interoperate very well.
Fiber适用于阻塞频繁的代码(好比IO阻塞),并且若是须要消耗CPU的代码使用Thread,它们能够同时抽象为前面的Strand,这样的话优点就会高于Node(Node 经过单线程异步来高效实现IO请求处理,具体对比我会给出另一篇博客)。
建议感兴趣的朋友从官方文档开始Quasar
另外我作了一个使用Spring Boot 和 Fiber来作HttpClient的demo,放在GitHub上,以供你们参考 spring-quasar-demo。
下一篇我会介绍在使用Fiber来作HttpServer。