在读 Hystrix 源码时,发现一些奇特的写法。稍做搜索,知道使用了最新流行的响应式编程库RxJava。那么响应式编程到底是怎样的呢? 本文对响应式编程及 RxJava 库做一个初步的探索。html
在学习新的编程模型时,我喜欢将其与原来的编程模型联系起来。由于新的编程模型每每是对原来编程模型的承袭和组合。响应式编程的两个基本要素是:java
函数式编程,在以前的文章 “彻底”函数式编程”、“Java8函数式编程探秘”、“精练代码:一次Java函数式编程的重构之旅” 等有较多探索,观察者模式在 “设计模式之观察者模式:实现配置更新实时推送” 有讲述过。咱们将在这二者的基础上探索响应式编程。
react
初次接触 RxJava ,很容易被一连串的 Observer, Observable, Disposable, subscribeOn, onSubscribe, onNext, onError, onComplete 等绕晕。不过软件里面无新鲜事。大多无非是用一种新的方式来组织逻辑罢了。基于观察者模式的事件驱动也不例外。咱们只要梳理清楚脉络,就能够容易地理解。观察者模式有三个基本参与者:git
基本流程是:被观察者 Observable 装备发射装置 Emitter,发射消息,建立事件;观察者 Observer 监听到事件,接收观察者发射的消息,调用对应的函数 onNext, onError 和 onComplete 进行处理。onError 和 OnComplete 只能有一个被触发。github
不妨写个基本 Demo 来模拟下基本流程。为了更好滴理解,我把三者都区分开了。编程
首先定义观察者 MyObserver,继承抽象类 DefaultObserver ,这样实现成本最小。json
package zzz.study.reactor; import com.alibaba.fastjson.JSON; import io.reactivex.observers.DefaultObserver; /** * @Description 观察者定义 * @Date 2021/1/23 4:13 下午 * @Created by qinshu */ public class MyObserver extends DefaultObserver { @Override public void onStart() { System.out.println("MyObserver: Start"); } @Override public void onNext(Object o) { System.out.println("Observed: " + JSON.toJSONString(o)); } @Override public void onError(Throwable e) { System.out.println("Observed: " + e.getMessage()); } @Override public void onComplete() { System.out.println("MyObserver: Complete"); } }
接着,定义发射装置(发射消息) MyEmitter:设计模式
package zzz.study.reactor; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import java.util.Random; import java.util.concurrent.TimeUnit; /** * @Description 发射装置 * @Date 2021/1/24 7:04 上午 * @Created by qinshu */ public class MyEmitter implements ObservableOnSubscribe { Random random = new Random(System.currentTimeMillis()); @Override public void subscribe(ObservableEmitter emitter) throws Exception { TimeUnit.SECONDS.sleep(1); emitter.onNext("next"); if (random.nextInt(3) == 0) { emitter.onError(new RuntimeException("A RuntimeException")); } else { emitter.onComplete(); } } }
最后,建立被观察者,并串起流程:并发
package zzz.study.reactor; import io.reactivex.Observable; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; /** * @Description RxJava基本Demo * @Date 2021/1/23 12:28 下午 * @Created by qinshu */ public class RxJavaBasic { public static void main(String[] args) { for (int i=0; i<5; i++) { ObservableOnSubscribe observableOnSubscribe = new MyEmitter(); Observable observable = Observable.create(observableOnSubscribe); Observer observer = new MyObserver(); observable.subscribe(observer); } } }
运行,可得结果:dom
MyObserver: Start Observed: "next" MyObserver: Complete MyObserver: Start Observed: "next" MyObserver: Complete MyObserver: Start Observed: "next" Observed: A RuntimeException MyObserver: Start Observed: "next" MyObserver: Complete MyObserver: Start Observed: "next" MyObserver: Complete
如何理解上述流程及结果呢?最好的办法就是单步调试。通过单步调试,能够知道整个过程以下:
步骤1: 整个过程由这一行触发 observable.subscribe(observer);
,会去调用 Observable.subscribeActual 方法,分派给具体实现类 ObservableCreate.subscribeActual ;单步调试的好处就是能肯定具体实现者;
步骤2: ObservableCreate.subscribeActual 所作的事情,调用 observer.onSubscribe ( MyObserver.onStart 方法 ),而后转发给 MyEmitter.subscribe 来发射消息。
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
步骤3:MyEmitter 执行 onNext ,分派给具体实现类 CreateEmitter.onNext ,进而调用 observer.onNext 方法;
步骤4:MyEmitter 执行 onError ,分派给具体实现类 CreateEmitter.onError ,进而 调用 observer.onError 方法;若是 MyEmitter 发射 onComplete ,那么就会分派给具体实现类 CreateEmitter.onComplete ,进而调用 observer.onComplete 方法。注意,onError 和 onComplete 二者只可能执行一个。
基本流程就是这样。
除了订阅自定义 Emitter 来发射消息,类 Observable 还提供了各类工具方法,更便捷滴作订阅和推送。好比:
public static void testDirectSubscribe() { Observable.fromArray("I", "Have", "a", "dream").subscribe(new MyObserver()); }
会输出:
MyObserver: Start Observed: "I" Observed: "Have" Observed: "a" Observed: "dream" MyObserver: Complete
具体实现是: fromArray 方法会建立一个 Observable 的具体类 ObservableFromArray,而这个类的 subscribeActual 方法会建立一个 FromArrayDisposable 来处理。FromArrayDisposable 的 run 方法被调用,依次遍历所指定列表,调用 observer.onNext ,最后调用 observer.onComplete。具体源码以下:
public final class ObservableFromArray<T> extends Observable<T> { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer<? super T> observer) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array); observer.onSubscribe(d); if (d.fusionMode) { return; } d.run(); } static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> downstream; final T[] array; int index; boolean fusionMode; volatile boolean disposed; FromArrayDisposable(Observer<? super T> actual, T[] array) { this.downstream = actual; this.array = array; } // other methods @Override public void dispose() { disposed = true; } @Override public boolean isDisposed() { return disposed; } void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { downstream.onError(new NullPointerException("The element at index " + i + " is null")); return; } downstream.onNext(value); } if (!isDisposed()) { downstream.onComplete(); } } } }
那么 Disposable 的意义何在呢 ? 个人理解是:它做为订阅完成的一个流程闭环。好比重复订阅同一个观察者,以下代码:
public static void testDirectSubscribe() { Observer observer = new MyObserver(); Observable.fromArray("I", "Have", "a", "dream").subscribe(observer); Observable.fromArray("changed").subscribe(observer); }
会抛出异常:
io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) zzz.study.reactor.MyObserver multiple times. Please create a fresh instance of zzz.study.reactor.MyObserver and subscribe that to the target source instead. at io.reactivex.internal.util.EndConsumerHelper.reportDoubleSubscription(EndConsumerHelper.java:148) at io.reactivex.internal.util.EndConsumerHelper.validate(EndConsumerHelper.java:57) at io.reactivex.observers.DefaultObserver.onSubscribe(DefaultObserver.java:70) at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:34) at io.reactivex.Observable.subscribe(Observable.java:12284) at zzz.study.reactor.RxJavaBasic.testDirectSubscribe(RxJavaBasic.java:34) at zzz.study.reactor.RxJavaBasic.main(RxJavaBasic.java:17)
这个异常是在调用 DefaultObserver.onSubscribe 抛出的:
@Override public final void onSubscribe(@NonNull Disposable d) { if (EndConsumerHelper.validate(this.upstream, d, getClass())) { this.upstream = d; onStart(); } } public static boolean validate(Disposable upstream, Disposable next, Class<?> observer) { ObjectHelper.requireNonNull(next, "next is null"); if (upstream != null) { next.dispose(); if (upstream != DisposableHelper.DISPOSED) { reportDoubleSubscription(observer); } return false; } return true; }
这就是说,若是同一个观察者,它的上一个 Disposable 订阅没有结束,那么再次订阅 Disposable 就会出错。怎么解决呢?能够在 MyObserver 的 onError 和 onComplete 添加 super.cancel 调用,能够结束上一次的订阅,再次订阅就不抛出异常了:
@Override public void onError(Throwable e) { System.out.println("Observed: " + e.getMessage()); super.cancel(); } @Override public void onComplete() { System.out.println("MyObserver: Complete"); super.cancel(); } /** * Cancels the upstream's disposable. */ protected final void cancel() { Disposable upstream = this.upstream; this.upstream = DisposableHelper.DISPOSED; upstream.dispose(); }
可是,即使这样,也没法发射咱们新的订阅消息。这是由于上一次的 upstream 不为 null,本次的订阅就没法发射。
咱们无法覆写 DefaultObserver.onSubscribe 方法,由于该方法声明为 final 的,且 upstream 声明为 private ,也没有公共方法能够设置 upstream。这明确代表了设计者的意图:这是 Observer 订阅 Disposable 的前置检测约定,不可被破坏,不然后果自负。
咱们能够绕过 DefaultObserver , 不继承它,而是直接实现 Observer 接口:
public static void testDirectSubscribe() { Observer observer = new RepeatedSubscribeMyObserver(); Observable.fromArray("I", "Have", "a", "dream").subscribe(observer); Observable.fromArray("changed").subscribe(observer); } /** * @Description 可重复订阅的观察者 * @Date 2021/1/24 10:11 上午 * @Created by qinshu */ public class RepeatedSubscribeMyObserver<T> implements Observer<T> { public Disposable upstream; @Override public void onSubscribe(@NonNull Disposable d){ System.out.println(getName() + ": Start"); this.upstream = d; } @Override public void onNext(T o) { System.out.println(getName() + ": " + JSON.toJSONString(o)); } @Override public void onError(Throwable e) { System.out.println(getName() + "RepeatedSubscribeMyObserver: " + e.getMessage()); cancel(); } @Override public void onComplete() { System.out.println(getName() + "RepeatedSubscribeMyObserver: Complete"); cancel(); } public String getName() { return this.getClass().getSimpleName(); } /** * Cancels the upstream's disposable. */ protected final void cancel() { Disposable upstream = this.upstream; this.upstream = DisposableHelper.DISPOSED; upstream.dispose(); } }
这样就能够实现屡次订阅同一个 Observer 了。运行结果:
RepeatedSubscribeMyObserver: Start RepeatedSubscribeMyObserver: "I" RepeatedSubscribeMyObserver: "Have" RepeatedSubscribeMyObserver: "a" RepeatedSubscribeMyObserver: "dream" RepeatedSubscribeMyObserver: Complete RepeatedSubscribeMyObserver: Start RepeatedSubscribeMyObserver: "changed" RepeatedSubscribeMyObserver: Complete
弄懂了 Observable.fromArray 的实现原理,就弄清楚了 Observable 中不少基本方法的基本套路。好比 just 方法有两个及以上参数时,实际上是 fromArray 的包装,而 range 方法则是建立一个 RangeDisposable 来处理。
Observable.just(1,2,3).subscribe(observer); Observable.range(1,4).subscribe(observer);
上文谈到了响应式编程的一大基本元素是函数式编程。函数式的优点是能够无限叠加组合,构建出灵活多变的函数和行为。这使得观察者的行为也能够定制得更加灵活。能够组合多个 Observable 的发射行为。
合并
简单的组合使用 merge 方法,构造一个 Observable 的列表,依次遍历合并后的每一个 Observable 的发射信息:
Iterable<? extends ObservableSource<? extends Integer>> observableSourceSet = Sets.newHashSet( Observable.fromArray(3,4,5), Observable.range(10,3) ); Observable.merge(observableSourceSet).subscribe(observer);
流式
Observable 能够经过 Stream 进行组合,这里就是函数式编程的用武之地了。以下代码所示:
Observable.range(1,10).filter(x -> x%2 ==0).subscribe(observer);
注意到,这里使用到了装饰器模式。filter 方法会建立一个 ObservableFilter 对象,而在这个对象里,subscribeActual 方法会建立一个 FilterObserver 将传入的 observer 装饰起来。downstream 便是传入的 observer。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable<T> filter(Predicate<? super T> predicate) { ObjectHelper.requireNonNull(predicate, "predicate is null"); return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate)); } public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> { final Predicate<? super T> predicate; public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) { super(source); this.predicate = predicate; } @Override public void subscribeActual(Observer<? super T> observer) { source.subscribe(new FilterObserver<T>(observer, predicate)); // FilterObserver 装饰了传入的自定义的 observer } static final class FilterObserver<T> extends BasicFuseableObserver<T, T> { final Predicate<? super T> filter; FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) { super(actual); this.filter = filter; } @Override public void onNext(T t) { // 这里对传入的 Observer.onNext 作了个装饰,仅当条件成立时才调用 if (sourceMode == NONE) { boolean b; try { b = filter.test(t); } catch (Throwable e) { fail(e); return; } if (b) { downstream.onNext(t); // downstream 便是咱们传入的自定义的 Observer } } else { downstream.onNext(null); } } }
正如 filter 对发射数据流进行过滤,map 或 flatMap 则对发射数据流进行映射变换,与 stream.map 或 stream.flatMap 的功能相似:
Observable.range(1,10).map(x -> x*x).subscribe(observer); Observable.range(1,10).flatMap(x -> Observable.just(x*x)).subscribe(observer);
map 方法将建立一个 ObservableMap 对象,在 subscribeActual 中用 MapObserver 将所传入的 observer 装饰起来;flatMap 将建立一个 ObservableFlatMap 对象,在 subscribeActual 中 MergeObserver 将传入的 observer 装饰起来。
还可使用 scan:对于生成的每一个值,使用累加器 (x,y) -> x*y 生成新的值并发射。
Observable.range(1, 10).scan(1, (x,y) -> x*y).subscribe(observer);
最后再给个分组的示例:
Observable.just(28,520,25,999).groupBy( i -> ( i > 100 ? "old": "new")).subscribe(new GroupedRepeatedSubscribeMyObserver()); /** * @Description 可重复订阅的分组观察者 * @Date 2021/1/24 10:11 上午 * @Created by qinshu */ public class GroupedRepeatedSubscribeMyObserver extends RepeatedSubscribeMyObserver<GroupedObservable> { @Override public void onNext(GroupedObservable o) { o.subscribe(new RepeatedSubscribeMyObserver() { @Override public void onNext(Object v) { String info = String.format("GroupedRepeatedSubscribeMyObserver: [group=%s][value=%s]", o.getKey(), JSON.toJSONString(v)); System.out.println(info); } }); } }
groupBy 方法生成的是一个 GroupedObservable ,所以要订阅一个 Observer
本文先写到这里。
项目代码见工程: “ALLIN” 的包 zzz.study.reactor 下。须要引入 Maven 依赖:
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.2.20</version> </dependency>
本文讲解了响应式编程及 RxJava 库的最基本概念:Observable , Observer 及 Emitter, Disposable ,也讲到了如何组合 Observable 来构建更灵活的消息发射机制。这些基本构成了响应式编程的基本骨架流程。
响应式编程的强大能力构建在事件驱动机制和函数式编程上,里面大量应用了装饰器模式。所以,熟悉这些基本编程思想,对掌握响应式编程模型亦大有裨益。