上篇文章咱们简单的介绍了Reactor的发展史和基本的Flux和Mono的使用,本文将会进一步挖掘Reactor的高级用法,一块儿来看看吧。java
以前的文章咱们提到了4个Flux的subscribe的方法:react
Disposable subscribe(); Disposable subscribe(Consumer<? super T> consumer); Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer); Disposable subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer);
这四个方法,须要咱们使用lambda表达式来自定义consumer,errorConsumer,completeSonsumer和subscriptionConsumer这四个Consumer。git
写起来比较复杂,看起来也不太方便,咱们考虑一下,这四个Consumer是否是和Subscriber接口中定义的4个方法是一一对应的呢?github
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
对的,因此咱们有一个更加简单点的subscribe方法:多线程
public final void subscribe(Subscriber<? super T> actual)
这个subscribe方法直接接收一个Subscriber类。从而实现了全部的功能。异步
本身写Subscriber太麻烦了,Reactor为咱们提供了一个BaseSubscriber的类,它实现了Subscriber中的全部功能,还附带了一些其余的方法。 ide
咱们看下BaseSubscriber的定义:fetch
public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription, Disposable
注意,BaseSubscriber是单次使用的,这就意味着,若是它首先subscription到Publisher1,而后subscription到Publisher2,那么将会取消对第一个Publisher的订阅。
由于BaseSubscriber是一个抽象类,因此咱们须要继承它,而且重写咱们须要本身实现的方法。this
下面看一个自定义的Subscriber:线程
public class CustSubscriber<T> extends BaseSubscriber<T> { public void hookOnSubscribe(Subscription subscription) { System.out.println("Subscribed"); request(1); } public void hookOnNext(T value) { System.out.println(value); request(1); } }
BaseSubscriber中有不少以hook开头的方法,这些方法都是咱们能够重写的,而Subscriber原生定义的on开头的方法,在BaseSubscriber中都是final的,都是不能重写的。
咱们看一个定义:
@Override public final void onSubscribe(Subscription s) { if (Operators.setOnce(S, this, s)) { try { hookOnSubscribe(s); } catch (Throwable throwable) { onError(Operators.onOperatorError(s, throwable, currentContext())); } } }
能够看到,它内部实际上调用了hook的方法。
上面的CustSubscriber中,咱们重写了两个方法,一个是hookOnSubscribe,在创建订阅的时候调用,一个是hookOnNext,在收到onNext信号的时候调用。
在这些方法中,给了咱们足够的自定义空间,上面的例子中咱们调用了request(1),表示再请求一个元素。
其余的hook方法还有: hookOnComplete, hookOnError, hookOnCancel 和 hookFinally。
咱们以前讲过了,reactive stream的最大特征就是能够处理Backpressure。
什么是Backpressure呢?就是当consumer处理过不来的时候,能够通知producer来减小生产速度。
咱们看下BaseSubscriber中默认的hookOnSubscribe实现:
protected void hookOnSubscribe(Subscription subscription){ subscription.request(Long.MAX_VALUE); }
能够看到默认是request无限数目的值。 也就是说默认状况下没有Backpressure。
经过重写hookOnSubscribe方法,咱们能够自定义处理速度。
除了request以外,咱们还能够在publisher中限制subscriber的速度。
public final Flux<T> limitRate(int prefetchRate) { return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate)); }
在Flux中,咱们有一个limitRate方法,能够设定publisher的速度。
好比subscriber request(100),而后咱们设置limitRate(10),那么最多producer一次只会产生10个元素。
接下来,咱们要讲解一下怎么建立Flux,一般来说有4种方法来建立Flux。
第一种方法就是最简单的同步建立的generate.
先看一个例子:
public void useGenerate(){ Flux<String> flux = Flux.generate( () -> 0, (state, sink) -> { sink.next("3 x " + state + " = " + 3*state); if (state == 10) sink.complete(); return state + 1; }); flux.subscribe(System.out::println); }
输出结果:
3 x 0 = 0 3 x 1 = 3 3 x 2 = 6 3 x 3 = 9 3 x 4 = 12 3 x 5 = 15 3 x 6 = 18 3 x 7 = 21 3 x 8 = 24 3 x 9 = 27 3 x 10 = 30
上面的例子中,咱们使用generate方法来同步的生成元素。
generate接收两个参数:
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
第一个参数是stateSupplier,用来指定初始化的状态。
第二个参数是一个generator,用来消费SynchronousSink,并生成新的状态。
上面的例子中,咱们每次将state+1,一直加到10。
而后使用subscribe来将全部的生成元素输出。
Flux也提供了一个create方法来建立Flux,create能够是同步也能够是异步的,而且支持多线程操做。
由于create没有初始的state状态,因此能够用在多线程中。
create的一个很是有用的地方就是能够将第三方的异步API和Flux关联起来,举个例子,咱们有一个自定义的EventProcessor,当处理相应的事件的时候,会去调用注册到Processor中的listener的一些方法。
interface MyEventListener<T> { void onDataChunk(List<T> chunk); void processComplete(); }
咱们怎么把这个Listener的响应行为和Flux关联起来呢?
public void useCreate(){ EventProcessor myEventProcessor = new EventProcessor(); Flux<String> bridge = Flux.create(sink -> { myEventProcessor.register( new MyEventListener<String>() { public void onDataChunk(List<String> chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } }); }); }
使用create就够了,create接收一个consumer参数:
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
这个consumer的本质是去消费FluxSink对象。
上面的例子在MyEventListener的事件中对FluxSink对象进行消费。
push和create同样,也支持异步操做,可是同时只能有一个线程来调用next, complete 或者 error方法,因此它是单线程的。
Handle和上面的三个方法不一样,它是一个实例方法。
它和generate很相似,也是消费SynchronousSink对象。
Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
不一样的是它的参数是一个BiConsumer,是没有返回值的。
看一个使用的例子:
public void useHandle(){ Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20) .handle((i, sink) -> { String letter = alphabet(i); if (letter != null) sink.next(letter); }); alphabet.subscribe(System.out::println); } public String alphabet(int letterNumber) { if (letterNumber < 1 || letterNumber > 26) { return null; } int letterIndexAscii = 'A' + letterNumber - 1; return "" + (char) letterIndexAscii; }
本文的例子learn-reactive
本文做者:flydean程序那些事本文连接:http://www.flydean.com/reactor-core-in-depth/
本文来源:flydean的博客
欢迎关注个人公众号:「程序那些事」最通俗的解读,最深入的干货,最简洁的教程,众多你不知道的小技巧等你来发现!