本系列文章索引《响应式Spring的道法术器》
前情提要 响应式流 | Reactor 3快速上手
本文源码html
上一章在响应式编程库方面,本着“快速上手”的原则,介绍了响应式流的概念,以及Reactor 3的使用。这一章,咱们基于对Reactor 3源码的模仿,从《响应式流规范》入手,深刻了解响应式流开发库。java
现代软件对近乎实时地处理数据的需求愈来愈强烈,对不断变化的信息的即时响应,意味着更大的商业价值,流处理是一种快速将数据转换为有用信息的手段。react
数据流中的元素能够是一个一个的待计算的数据,也能够是一个一个待响应的事件。前者多用于大数据处理,好比Storm、Spark等产品,后者经常使用于响应式编程,好比Netflix在使用的RxJava、Scala编程语言的发明者Typesafe公司(已改名为Lightbend)的Akka Stream、Java开发者都熟悉的Pivotal公司的Project Reactor、走在技术前沿的Vert.x等。git
软件行业是一个很是注重分享和交流的行业。随着对响应式编程技术的讨论与沟通逐渐深刻,2013年底的时候,Netflix、Pivotal、Typesafe等公司的工程师们共同发起了关于制定“响应式流规范(Reactive Stream Specification)”的倡议和讨论,并在github上建立了reactive-streams-jvm项目。到2015年5月份,1.0版本的规范出炉,项目README就是规范正文。github
各个响应式开发库都要遵循这个规范,其好处也是显而易见的。之因此咱们编写的Java代码能够在Hotspot、J9和Zing等JVM运行,是由于它们都遵循Java虚拟机规范。相似的,因为各个响应式开发库都遵循响应式流规范,所以互相兼容,不一样的开发库之间能够进行交互,咱们甚至能够同时在项目中使用多个响应式开发库。对于Spring WebFlux来讲,也可使用RxJava做为响应式库。编程
<img height=300em src="https://leanote.com/api/file/getImage?fileId=5a9e3ed2ab644159cf00128e"/>;api
虽然响应式流规范是用来约束响应式开发库的,做为使用者的咱们若是可以了解这一规范对于咱们理解开发库的使用也是颇有帮助的,由于规范的内容都是对响应式编程思想的精髓的呈现。访问reactive-streams-jvm项目,能够浏览规范的细节,包括其中定义的响应式流的特色:数组
响应式流规范定义了四个接口,以下:安全
1.Publisher
是可以发出元素的发布者。并发
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
2.Subscriber
是接收元素并作出响应的订阅者。
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
当执行subscribe
方法时,发布者会回调订阅者的onSubscribe
方法,这个方法中,一般订阅者会借助传入的Subscription
向发布者请求n个数据。而后发布者经过不断调用订阅者的onNext
方法向订阅者发出最多n个数据。若是数据所有发完,则会调用onComplete
告知订阅者流已经发完;若是有错误发生,则经过onError
发出错误数据,一样也会终止流。
订阅后的回调用表达式表示就是onSubscribe onNext* (onError | onComplete)?
,即以一个onSubscribe
开始,中间有0个或多个onNext
,最后有0个或1个onError
或onComplete
事件。
Publisher
和Subscriber
融合了迭代器模式和观察者模式。
咱们常常用到的Iterable
和Iterator
就是迭代器模式的体现,能够知足上边第1和2个特色关于按需处理数据流的要求;而观察者模式基于事件的回调机制有助于知足第3个特色关于异步传递元素的要求。
3.Subscription
是Publisher
和Subscriber
的“中间人”。
public interface Subscription { public void request(long n); public void cancel(); }
当发布者调用subscribe
方法注册订阅者时,会经过订阅者的回调方法onSubscribe
传入Subscription
对象,以后订阅者就可使用这个Subscription
对象的request
方法向发布者“要”数据了。回压机制正是基于此来实现的,所以第4个特色也可以实现了。
4.Processor
集Publisher
和Subscriber
于一身。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
这四个接口在JEP 266跟随Java 9版本被引入了Java SDK。
这四个接口是实现各开发库之间互相兼容的桥梁,响应式流规范也仅仅聚焦于此,而对诸如转换、合并、分组等等的操做一律未作要求,所以是一个很是抽象且精简的接口规范。
若是这时候有人要造轮子,再写一套响应式开发库,如何基于这几个接口展开呢?
Reactor 3是遵循响应式流规范的实现,所以,小撸一把Reactor的源码有助于咱们理解规范中定义的接口的使用。
Reactor中,咱们最早接触的生成Publisher
的方法就是Flux.just()
,下面咱们来动手写代码模拟一下Reactor的实现方式。不过具有生产能力开发库会考虑性能、并发安全性等诸多因素,所谓“照虎画猫”,咱们的代码只是模拟出实现思路,代码少的多,但五脏俱全。
源码位于:https://github.com/get-set/get-reactive/tree/master/my-reactor
首先,引入响应式流规范的四个接口定义,基于Java 9的话能够直接使用java.util.concurrent.Flow:
<dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams</artifactId> <version>1.0.2</version> </dependency>
首先建立最最基础的类Flux
,它是一个Publisher
。
package reactor.core.publisher; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; public abstract class Flux<T> implements Publisher<T> { public abstract void subscribe(Subscriber<? super T> s); }
在Reactor中,Flux
既是一个发布者,又充当工具类的角色,当咱们用Flux.just()
、Flux.range()
或Flux.interval()
等工厂方法生成Flux
时,会new一个新的Flux
,好比Flux.just
会返回一个FluxArray
对象。
public static <T> Flux<T> just(T... data) { return new FluxArray<>(data); }
返回的FluxArray
对象是Flux.just
生成的Publisher
,它继承自Flux
,并实现了subscribe
方法。
public class FluxArray<T> extends Flux<T> { private T[] array; // 1 public FluxArray(T[] data) { this.array = data; } @Override public void subscribe(Subscriber<? super T> actual) { actual.onSubscribe(new ArraySubscription<>(actual, array)); // 2 } }
FluxArray
内部使用一个数组来保存数据;subscribe
方法一般会回调Subscriber
的onSubscribe
方法,该方法须要传入一个Subscription
对象,从而订阅者以后能够经过回调传回的Subscription
的request
方法跟FluxArray
请求数据,这也是回压的应有之义。继续编写ArraySubscription
:
public class FluxArray<T> extends Flux<T> { ... static class ArraySubscription<T> implements Subscription { // 1 final Subscriber<? super T> actual; final T[] array; // 2 int index; boolean canceled; public ArraySubscription(Subscriber<? super T> actual, T[] array) { this.actual = actual; this.array = array; } @Override public void request(long n) { if (canceled) { return; } long length = array.length; for (int i = 0; i < n && index < length; i++) { actual.onNext(array[index++]); // 3 } if (index == length) { actual.onComplete(); // 4 } } @Override public void cancel() { // 5 this.canceled = true; } } }
ArraySubscription
是一个静态内部类。静态内部类是最简单的一种内部类,你尽能够把它当成普通的类,只不过刚好定义在其余类的内部;Subscription
内也有一份数据;onNext
方法传递元素;onComplete
方法;Subscription
取消订阅。到此为止,发布者就开发完了。咱们测试一下:
@Test public void fluxArrayTest() { Flux.just(1, 2, 3, 4, 5).subscribe(new Subscriber<Integer>() { // 1 @Override public void onSubscribe(Subscription s) { System.out.println("onSubscribe"); s.request(6); // 2 } @Override public void onNext(Integer integer) { System.out.println("onNext:" + integer); } @Override public void onError(Throwable t) { } @Override public void onComplete() { System.out.println("onComplete"); } }); }
Subscriber
经过匿名内部类定义,其中须要实现接口的四个方法;测试方法运行以下:
1 2 3 4 5 Completed.
若是请求3个元素呢?输出以下:
1 2 3
没有完成事件,OK,一个简单的Flux.just
就完成了,经过这个例子咱们可以初步摸出Flux
工厂方法的一些“套路”:
Flux
子类的实例,如FluxArray
;FluxArray
的subscribe
方法会返回给订阅者一个Subscription
实现类的对象,这个ArraySubscription
是FluxArray
的静态内部类,定义了“如何发布元素”的逻辑;ArraySubscription
对象向发布者请求n个数据;发布者也能够借助这个ArraySubscription
对象向订阅者传递数据元素(onNext/onError/onComplete)。用图来表示以下(因为Subscription是静态内部类,能够看作普通类,就单独放一边了):
上图的这个过程基本适用于大多数的用于生成Flux
/Mono
的静态工厂方法,如Flux.just
、Flux.range
等。
首先,使用相似Flux.just
的方法建立发布者后,会建立一个具体的发布者(Publisher
),如FluxArray
。
.subscribe
订阅这个发布者时,首先会new一个具备相应逻辑的Subscription
(如ArraySubscription
,这个Subscription
定义了如何处理下游的request
,以及如何“发出数据”);Subscription
经过订阅者的.onSubscribe
方法传给订阅者;.onSubscribe
方法中,须要经过Subscription
发起第一次的请求.request
;Subscription
收到请求,就能够经过回调订阅者的onNext
方法发出元素了,有多少发多少,但不能超过请求的个数;onNext
中一般定义对元素的处理逻辑,处理完成以后,能够继续发起请求;onComplete
予以告知;固然序列发送过程当中若是有错误,则经过订阅者的onError
予以告知并传递错误信息;这两种状况都会致使序列终止,订阅过程结束。以上从1~7这些阶段称为订阅期(subscribe time)。
响应式开发库的一个很赞的特性就是能够像组装流水线同样将操做符串起来,用来声明复杂的处理逻辑。好比:
Flux ff = Flux.just(1, 2, 3, 4, 5) .map(i -> i * i) .filter(i -> (i % 2) == 0); ff.subscribe(...)
经过源码,咱们能够了解这种“流水线”的实现机制。下面咱们仍然是经过照虎画猫的方式模拟一下Reactor中Flux.map
的实现方式。
Flux.map
用于实现转换,转换后元素的类型可能会发生变化,转换的逻辑由参数Function
决定。方法自己返回的是一个转换后的Flux
,基于此,该方法实现以下:
public abstract class Flux<T> implements Publisher<T> { ... public <V> Flux<V> map(Function<? super T, ? extends V> mapper) { // 1 return new FluxMap<>(this, mapper); // 2 } }
FluxMap
就是新的Flux。既然FluxMap
是一个新的Flux,那么与2.1.2中FluxArray
相似,其内部定义有MapSubscription
,这是一个Subscription
,可以根据其订阅者的请求发出数据。
public class FluxMap<T, R> extends Flux<R> { private final Flux<? extends T> source; private final Function<? super T, ? extends R> mapper; public FluxMap(Flux<? extends T> source, Function<? super T, ? extends R> mapper) { this.source = source; this.mapper = mapper; } @Override public void subscribe(Subscriber<? super R> actual) { source.subscribe(new MapSubscriber<>(actual, mapper)); } static final class MapSubscription<T, R> implements Subscription { private final Subscriber<? super R> actual; private final Function<? super T, ? extends R> mapper; MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) { this.actual = actual; this.mapper = mapper; } @Override public void request(long n) { // 1 // TODO 收到请求,发出元素 } @Override public void cancel() { // TODO 取消订阅 } } }
map
操做符并不产生数据,只是数据的搬运工。收到request
后要发出的数据来自上游。因此MapSubscription
同时也应该是一个订阅者,它订阅上游的发布者,并将数据处理后传递给下游的订阅者(为了跟Reactor源码一致,将MapSubscription
更名为MapSubscriber
,其实没差)。
如图,对下游是做为发布者,传递上游的数据到下游;对上游是做为订阅者,传递下游的请求到上游。
static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription { // 1 ... }
Subscriber
和Subscription
。这样,总共有5个方法要实现:来自Subscriber
接口的onSubscribe
、onNext
、onError
、onComplete
,和来自Subscription
接口的request
和cancel
。下面咱们本着“搬运工”的角色实现这几个方法便可。
static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription { private final Subscriber<? super R> actual; private final Function<? super T, ? extends R> mapper; boolean done; Subscription subscriptionOfUpstream; MapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) { this.actual = actual; this.mapper = mapper; } @Override public void onSubscribe(Subscription s) { this.subscriptionOfUpstream = s; // 1 actual.onSubscribe(this); // 2 } @Override public void onNext(T t) { if (done) { return; } actual.onNext(mapper.apply(t)); // 3 } @Override public void onError(Throwable t) { if (done) { return; } done = true; actual.onError(t); // 4 } @Override public void onComplete() { if (done) { return; } done = true; actual.onComplete(); // 5 } @Override public void request(long n) { this.subscriptionOfUpstream.request(n); // 6 } @Override public void cancel() { this.subscriptionOfUpstream.cancel(); // 7 } }
onSubscribe
,将自身做为Subscription
传递过去;从这个对源码的模仿,能够体会到,当有多个操做符串成“操做链”的时候:
onSubscribe
、onNext
、onError
、onComplete
)是经过每个操做符向下传递的,传递的过程当中进行相应的操做处理,这一点并不难理解;request
,所以回压(backpressure)能够实现从下游向上游的传递。这一节最开头的那一段代码的执行过程以下图所示:
在1.3.2节的时候,介绍了.subscribe
的几个不一样方法签名的变种:
subscribe( Consumer<? super T> consumer) subscribe( @Nullable Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer) subscribe( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer) subscribe( @Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
用起来很是方便,可是响应式流规范中只定义了一个订阅方法subscribe(Subscriber subscriber)
。实际上,这几个方法最终都是调用的subscribe(LambdaSubscriber subscriber)
,并经过LambdaSubscriber
实现了对不一样个数参数的组装。以下图所示:
所以,
flux.subscribe(System.out::println, System.err::println);
是调用的:
flux.subscribe(new LambdaSubscriber(System.out::println, System.err::println, null, null));