Java-协程

什么是协程
大多数的开发人员可能对进程,线程这两个名字比较熟悉。可是为了追求最大力度的发挥硬件的性能和提高软件的速度,出现了协程或者叫纤程(Fiber),或者绿色线程(GreenThread)。那咱们来聊下什么是协程,以及在java中是怎么体现和运用协程的。html

在说协程以前,咱们先来回想下,如今大多数的程序中,都是使用了多线程技术来解决一些须要长时间阻塞的场景。JAVA中每一个线程栈默认1024K,没有办法开成千上万个线程,并且就算经过JVM参数调小,CPU也没法分配时间片给每一个线程,大多数的线程仍是在等待中,因此咱们通常会使用
Runtime.getRuntime().availableProcessors()来配置线程数的大小(或者会根据实际状况调整,就不展开讨论了),可是就算是咱们开了新的线程,该线程也多是在等待系统IO的返回或者网络IO的返回,并且线程的切换有着大量的开销。java

为了解决上面说的问题,你们可能会想到回调。如今不少框架都是基于回调来解决那些耗时的操做。但层数嵌套多了反而会引发反人类的回调地狱,而且回调后就丢失原函数的上下文。其中的表明呢就好比说nodeJs。node

终于能够来聊聊协程。它的基本原理是:在某个点挂起当前的任务,而且保存栈信息,去执行另外一个任务;等完成或达到某个条件时,在还原原来的栈信息并继续执行。上面提到的几个点你们会想到JVM的结构,栈, 程序计数器等等,可是JVM原生是不支持这样的操做的(至少java是不支持的,kotlin是能够的)。所以若是要在纯java代码里须要使用协程的话须要引入第三方包,如kilim,Quasar。而kilim已经好久未更新了,那么咱们来看看Quasar。python

Quasar原理
一、利用字节码加强,将普通的java代码转换为支持协程的代码。
二、在调用pausable方法的时候,若是pause了就保存当前方法栈的State,中止执行当前协程,将控制权交给调度器
三、调度器负责调度就绪的协程
四、协程resume的时候,自动恢复State,根据协程的pc计数跳转到上次执行的位置,继续执行。
这些第三方的框架大部分实现是一致的。经过对字节码直接操做,在编译期把你写的代码变为支持协程的版本,并在运行时把你全部须要用到协程的部分由他来控制和调度,同时也支持在运行期这样作。
Quasar中使用了抛异常的方式来中断线程,可是 实际上若是咱们捕获了这个异常就会产生问题,因此通常是以这种方式来注册:
@Suspendable
public int f() {
try {
// do some stuff
return g() * 2;
} catch(SuspendExecution s) {
//这里不该该捕获到异常.
throw new AssertionError(s);
}
}
在调度方面,Quasar中默认使用了JDK7以上才有的ForkJoinPool,它的优点就在于空闲的线程会去从其余线程任务队列尾部”偷取”任务来本身处理,所以也叫work-stealing功能。这个功能能够大大的利用CPU资源,不让线程白白空闲着。程序员

Quasar模块golang

Fiber
Fiber能够认为是一个微线程,使用方式基本上和Thread相同,启动start:
new Fiber<V>() {
@Override
protected V run() throws SuspendExecution, InterruptedException {算法

// your code

}
}.start();
new Fiber<Void>(new SuspendableRunnable() {
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start(); 网络

其实它更相似于一个CallBack,是能够携带返回值的,而且能够抛异常SuspendExecution,InterruptedException。你也能够向其中传递SuspendableRunnable 或 SuspendableCallable 给Fiber的构造函数。你甚至能够像线程同样调用join(),或者get()来阻塞线程等待他完成。
当Fiber比较大的时候,Fiber能够在调用parkAndSerialize 方法时被序列化,在调用unparkSerialized时被反序列化。
从以上咱们能够看出Fiber与Thread很是相似,极大的减小了迁移的成本。多线程

FiberScheduler
FiberScheduler是Quasar框架中核心的任务调度器,负责管理任务的工做者线程WorkerThread,以前提到的他是一个FiberForkJoinScheduler。
ForkJoinPool的默认初始化个数为Runtime.getRuntime().availableProcessors()。框架

instrumentation
当一个类被加载时,Quasar的instrumentation模块 (使用 Java agent时) 搜索suspendable 方法。每个suspendable 方法 f经过下面的方式 instrument:

它搜索对其它suspendable方法的调用。对suspendable方法g的调用,一些代码会在这个调用g的先后被插入,它们会保存和恢复fiber栈本地变量的状态,记录这个暂停点。在这个“suspendable function chain”的最后,咱们会发现对Fiber.park的调用。park暂停这个fiber,扔出 SuspendExecution异常。

当g block的时候,SuspendExecution异常会被Fiber捕获。 当Fiber被唤醒(使用unpark), 方法f会被调用, 执行记录显示它被block在g的调用上,因此程序会当即跳到f调用g的那一行,而后调用它。最终咱们会到达暂停点,而后继续执行。当g返回时, f中插入的代码会恢复f的本地变量。

过程听起来很复杂,可是它只会带来3% ~ 5%的性能的损失。

下面看一个简单的例子, 方法m2声明抛出SuspendExecution异常,方法m1调用m2和m3,因此也声明抛出这个异常,最后这个异常会被Fiber所捕获:

public class Helloworld {
static void m1() throws SuspendExecution, InterruptedException {

String m = "m1";
   System.out.println("m1 begin");
   m = m2();
   m = m3();
   System.out.println("m1 end");
   System.out.println(m);

}
static String m2() throws SuspendExecution, InterruptedException {

return "m2";

}
static String m3() throws SuspendExecution, InterruptedException {

return "m3";

}
static public void main(String[] args) throws ExecutionException, InterruptedException {

new Fiber<Void>("Caller", new SuspendableRunnable() {
       @Override
       public void run() throws SuspendExecution, InterruptedException {
           m1();
       }
   }).start();

}
}

// 反编译后的代码
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
throws SuspendExecution, InterruptedException
{
// Byte code:
// 0: aconst_null
// 1: astore_3
// 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack;
// 5: dup
// 6: astore_1
// 7: ifnull +42 -> 49
// 10: aload_1
// 11: iconst_1
// 12: istore_2
// 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I
// 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
// 40: aload_1
// 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
// 44: ifne +5 -> 49
// 47: aconst_null
// 48: astore_1
// 49: iconst_0
// 50: istore_2
// 51: ldc 2
// 53: astore_0
// 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 57: ldc 4
// 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 62: aload_1
// 63: ifnull +26 -> 89
// 66: aload_1
// 67: iconst_1
// 68: iconst_1
// 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 72: aload_0
// 73: aload_1
// 74: iconst_0
// 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 78: iconst_0
// 79: istore_2
// 80: aload_1
// 81: iconst_0
// 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 85: checkcast 110 java/lang/String
// 88: astore_0
// 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String;
// 92: astore_0
// 93: aload_1
// 94: ifnull +26 -> 120
// 97: aload_1
// 98: iconst_2
// 99: iconst_1
// 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 103: aload_0
// 104: aload_1
// 105: iconst_0
// 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 109: iconst_0
// 110: istore_2
// 111: aload_1
// 112: iconst_0
// 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 116: checkcast 110 java/lang/String
// 119: astore_0
// 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String;
// 123: astore_0
// 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 127: ldc 8
// 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 135: aload_0
// 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 139: aload_1
// 140: ifnull +7 -> 147
// 143: aload_1
// 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 147: return
// 148: aload_1
// 149: ifnull +7 -> 156
// 152: aload_1
// 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 156: athrow
// Line number table:
// Java source line #13 -> byte code offset #51
// Java source line #15 -> byte code offset #54
// Java source line #16 -> byte code offset #62
// Java source line #17 -> byte code offset #93
// Java source line #18 -> byte code offset #124
// Java source line #19 -> byte code offset #132
// Java source line #21 -> byte code offset #139
// Local variable table:
// start length slot name signature
// 53 83 0 m String
// 6 147 1 localStack co.paralleluniverse.fibers.Stack
// 12 99 2 i int
// 1 1 3 localObject Object
// 156 1 4 localSuspendExecution SuspendExecution
// Exception table:
// from to target type
// 49 148 148 finally
// 49 148 156 co/paralleluniverse/fibers/SuspendExecution
// 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}
我并无更深刻的去了解Quasar的实现细节以及调度算法,有兴趣的读者能够翻翻它的代码。

实战
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.5.1</version>
<executions>
<execution>
<id>getClasspathFilenames</id>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>

public class Helloworld {

@Suspendable
static void m1() throws InterruptedException, SuspendExecution {

String m = "m1";
   //System.out.println("m1 begin");
   m = m2();
   //System.out.println("m1 end");
   //System.out.println(m);

}
static String m2() throws SuspendExecution, InterruptedException {

String m = m3();
   Strand.sleep(1000);
   return m;

}
//or define in META-INF/suspendables
@Suspendable
static String m3() {

List l = Stream.of(1,2,3).filter(i -> i%2 == 0).collect(Collectors.toList());
   return l.toString();

}
static public void main(String[] args) throws ExecutionException, InterruptedException {

int count = 10000;
   testThreadpool(count);
   testFiber(count);

}
static void testThreadpool(int count) throws InterruptedException {

final CountDownLatch latch = new CountDownLatch(count);
   ExecutorService es = Executors.newFixedThreadPool(200);
   LongAdder latency = new LongAdder();
   long t = System.currentTimeMillis();
   for (int i =0; i< count; i++) {
       es.submit(() -> {
           long start = System.currentTimeMillis();
           try {
               m1();
           } catch (InterruptedException e) {
               e.printStackTrace();
           } catch (SuspendExecution suspendExecution) {
               suspendExecution.printStackTrace();
           }
           start = System.currentTimeMillis() - start;
           latency.add(start);
           latch.countDown();
       });
   }
   latch.await();
   t = System.currentTimeMillis() - t;
   long l = latency.longValue() / count;
   System.out.println("thread pool took: " + t + ", latency: " + l + " ms");
   es.shutdownNow();

}
static void testFiber(int count) throws InterruptedException {

final CountDownLatch latch = new CountDownLatch(count);
   LongAdder latency = new LongAdder();
   long t = System.currentTimeMillis();
   for (int i =0; i< count; i++) {
       new Fiber<Void>("Caller", new SuspendableRunnable() {
           @Override
           public void run() throws SuspendExecution, InterruptedException {
               long start = System.currentTimeMillis();
               m1();
               start = System.currentTimeMillis() - start;
               latency.add(start);
               latch.countDown();
           }
       }).start();
   }
   latch.await();
   t = System.currentTimeMillis() - t;
   long l = latency.longValue() / count;
   System.out.println("fiber took: " + t  + ", latency: " + l + " ms");

}
}

OUTPUT:
1
2
thread pool took: 50341, latency: 1005 ms
fiber took: 1158, latency: 1000 ms

能够看到很明显的时间差距,存在多线程阻塞的状况下,协程的性能很是的好,可是。若是把sleep这段去掉,Fiber的性能反而更差:

这说明Fiber并不意味着它能够在全部的场景中均可以替换Thread。当fiber的代码常常会被等待其它fiber阻塞的时候,就应该使用fiber。

对于那些须要CPU长时间计算的代码,不多遇到阻塞的时候,就应该首选thread

扩展
其实协程这个概念在其余的语言中有原生的支持,如:
kotlin 1.30以后已经稳定
: https://www.kotlincn.net/docs...
golang : https://gobyexample.com/gorou...
python : http://www.gevent.org/content...
在这些语言中协程就看起来至少没这么奇怪或者难以理解了,并且开发起开也相比java简单不少。

总结
协程的概念也不算是很新了,可是在像Java这样的语言或者特定的领域并非很火,也并无彻底普及。不是很明白是它的学习成本高,仍是说应用场景是在过小了。可是当我听到这个概念的时候确实是挺好奇,也挺好奇的。也但愿以后会有更多的框架和特性来简化咱们苦逼程序员的开发~~

参考文献
http://docs.paralleluniverse....

相关文章
相关标签/搜索