本文做者JasonChen,原文地址: chblog.me/2018/12/19/…html
ReactiveX 响应式编程库,这是一个程序库,经过使用可观察的事件序列来构成异步和事件驱动的程序。java
其简化了异步多线程编程,在之前多线程编程的世界中,锁、可重入锁、同步队列器、信号量、并发同步器、同步计数器、并行框架等都是具备必定的使用门槛,稍有不慎或者使用不成熟或对其源码理解不深刻都会形成相应的程序错误和程序性能的低下。android
24种设计模式的一种,观察者Observer和主题Subject之间创建组合关系:Subject类实例中包含观察者Observer的引用,增长引用的目的就是为了通知notify,重要点就是要在Subject的notify功能中调用Observer的接受处理函数receiveAndHandle。git
我的理解:观察者模型实际上是一种异步回调通知,将数据的处理者先注册到数据的输入者那边,这样经过数据输入者执行某个函数去调用数据处理者的某个处理方法。github
Rx有不少语言的实现库,目前比较出名的就是RxJava2。本文主要讲Rxjava2的部分源码解读,内部设计机制和内部执行的线程模型。编程
RxJava是近两年来愈来愈流行的一个异步开发框架,其使用起来十分简单方便,功能一应俱全,十分强大。设计模式
使用RxJava2大体分为四个操做:缓存
// 建立flowable
Flowable<Map<String, Map<String,Object>>> esFlowable = Flowable.create(new ElasticSearchAdapter(), BackpressureStrategy.BUFFER);
Disposable disposeable = esFlowable
// map操做 1.采集、2.清洗
.map(DataProcess::dataProcess)
.subscribeOn(Schedulers.single())
//计算任务调度器
.observeOn(Schedulers.computation())
// 订阅者 consumer 执行运算
.subscribe(keyMaps -> new PredictEntranceForkJoin().predictLogic(keyMaps));
复制代码
以上就是一个实际的例子,里面的ElasticSearchAdapter实际隐藏了一个用户自定义实现数据生产的subscribe接口:bash
FlowableOnSubscribe<T> source
复制代码
用户须要实现这个接口函数:微信
void subscribe(@NonNull FlowableEmitter<T> emitter) throws Exception
复制代码
这个接口主要用于内部回调,后面会有具体分析, emitter 英文翻译发射器,很形象,数据就是由它产生的,也是业务系统须要对接的地方,通常业务代码实现这个接口类而后发射出须要处理的原始数据。
map函数做为数据变换处理的功能函数将原来的数据输入变换为另外的数据集合,而后设置发布的线程池机制subscribeOn(Schedulers.single())
,订阅的线程池机制observeOn(Schedulers.computation())
,最后添加数据订阅函数,也就是业务系统须要实现另一个地方,从而实现数据的自定义处理消费。
有一篇博客详细介绍了rxjava的各类操做符,连接maxwell-nc.github.io/android/rxj…
阅读源码我的比较喜欢带着疑惑去看,这样与目标有方向。接下来的分析以Flowable为例,这里全部的例子都是按照Flowable为例,由于Flowable在实际项目中比Observable可能用的多,由于实际场景中数据生产速度和数据消费速度都会有必定的不一致甚至数据生产速度远大于数据消费速度。
首先从数据订阅者开始,点进源码看进一步解析,里面有不少subscribe重载接口:
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Subscription> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
//组装成FlowableSubscriber
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
//调用核心的订阅方法
subscribe(ls);
return ls;
}
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
//注册一些钩子这里对此不进行讲解,主要不是核心方法
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//核心订阅方法,从名字也能读出是指订阅实际调用处
//不一样的数据产生类也就是实现Flowable抽象类的类
//好比FlowableCreate,FlowSingle,FlowMap等等去实现本身的实际方法
subscribeActual(z);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
复制代码
下面选择FlowCreate的subscribeActual(Subscriber<? super T> t)方法进行剖析。
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
//根据不一样的回压模式选择不同的数据发射类
//神奇的回压模式其实本质上就是一个个数据发射-消费模式
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
//...
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
//回调注册的FlowableSubscriber的onSubscribe方法
//这里很是重要,由于这里涉及了rxjava特有的 request请求再消费数据的模式
//也就是说若是没有request数据,那么就不会调用数据发射(发布)者的onNext方法,
//那么数据订阅者也就不会消费到数据
t.onSubscribe(emitter);
try {
//回调注册的FlowableOnSubscribe<T> source的subscribe方法
//这个source其实就是在建立Flow流时注册的数据产生类,进一步验证了上文中
//说起的其须要实现FlowableOnSubscribe<T>接口
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
//重点分析BufferAsyncEmitter这个类,看字面意思这是一个switch的默认选择类,
//但其实它是回压策略为BUFFER时的数据发射类
//首先这个类的构造函数具备两个参数,很明显这是 actul就是前面的t这个变量,也就是
//注册的数据消费(订阅)者,capacityHint则是设置容量大小的,默认是128,若是须要扩大须要
//自行设置环境变量 rx2.buffer-size
BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
super(actual);
this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
this.wip = new AtomicInteger();
}
public void onNext(T t) {
if (done || isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// queue 是存储元素的队列,也就是buffer的核心存储。
// 当咱们开始向下游发送数据的时候首先存入队列,而后下面的drain则是进行核心的
queue.offer(t);
drain();
}
//核心的类
void drain() {
//关键的地方 解决生产速率和消费速率不一致的关键地方,也是咱们写并发程序值得借鉴的地方。
//当数据的产生者(发布)频繁调用onNext方法时,这里产生并发调用关系,wip变量是atomic变量,
//当第一次执行drain函数时,为0继续执行后面的流程,当快速的继续调用onNext方法时,wip不为0而后返回
//那么后面的流程咱们其实已经很大几率会猜想到应该是去取队列的数据而后作一些操做
if (wip.getAndIncrement() != 0) {
return;
}
int missed = 1;
//这里的downstream其实就是注册的数据订阅者,它是基类BaseEmitter的变量,前面初始化时调用了基类的构造函数
final Subscriber<? super T> a = downstream;
final SpscLinkedArrayQueue<T> q = queue;
for (;;) {
long r = get();
long e = 0L;
while (e != r) {
if (isCancelled()) {
q.clear();
return;
}
boolean d = done;
//取队列中的数据
T o = q.poll();
boolean empty = o == null;
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
if (empty) {
break;
}
//此处回调订阅者的onNext方法去真正的执行数据实例程序
//到此数据从产生到消费其生命周期已经走完
a.onNext(o);
e++;
}
if (e == r) {
if (isCancelled()) {
q.clear();
return;
}
boolean d = done;
boolean empty = q.isEmpty();
if (d && empty) {
Throwable ex = error;
if (ex != null) {
error(ex);
} else {
complete();
}
return;
}
}
if (e != 0) {
//标记已经消费的个数
BackpressureHelper.produced(this, e);
}
//前面说过wip会原子性的增长,并且是每调用一次onNext增长一次
//missed从其名解释是指错过的意思,我的理解是错过消费的数据个数,错过消费
//的意思其实就是指没有进行a.onNext数据消费处理的数据
missed = wip.addAndGet(-missed);
if (missed == 0) {
//若是没有错过的数据也就是所有都消费完那就跳出for循环
//此处for循环方式和JUC源码中Doug Lea的作法都有相似之处
break;
}
}
}
复制代码
首先在进行源码分析以前讲述一下一种模式:装饰者模式 24种模式中的一种,在java io源码包中普遍应用 简单的来讲是与被装饰者具备相同接口父类同时又对被装饰者进行一层封装(持有被装饰者的引用),以此用来加上自身的特性。
回归主题,当咱们使用操做符和线程池机制的时候作法都是在数据发布者后面进行相应的函数操做:
Disposable disposeable = scheduleObservable
.map(aLong -> dataAdapter.handlerDpti())
.map(DataProcess::dataProcess)
.subscribeOn(Schedulers.single())
复制代码
那么为什么这么作,接下来咱们进行源码分析:
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
}
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}
复制代码
这里是实例方法调用,传进了this对象这个很关键,这里其实就是咱们前面提到的装修者模式,持有上游对象也就是数据源source的引用。
以FlowableSubscribeOn为例进行分析,这个类常常会用到,由于其内部设置了线程池的机制因此在实际使用项目中会大量使用,那么是如何作到线程池方式的呢?进一步利用源码进行分析。
2.装饰者的内部代码分析
以subscribeOn 为例:
//很明显 实现的抽象类实际上是装修者抽象类
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> // 这个在前面咱们重点分析过这是实际订阅执行的类方法,其实也就是咱们说的装饰方法,里面实现了每一个类本身的特定“装修”方法 @Override public void subscribeActual(final Subscriber<? super T> s) {
// 获取订阅者,下一篇文章会重点讲述rxjava的线程池分配机制
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
// 跟前面同样调用数据订阅者的onSubscribe方法
s.onSubscribe(sos);
// 由分配的调度者进行订阅任务的执行
w.schedule(sos);
}
// 开始分析SubscribeOnSubscriber这个静态内部类的内部代码
// 实现了Runable用来异步执行
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread> implements FlowableSubscriber<T>, Subscription, Runnable // 下游订阅引用 final Subscriber<? super T> downstream;
// 上游发射类引用
final AtomicReference<Subscription> upstream;
// 上游数据源引用 跟上游引用有区别,简单的说每一个上游数据源引用有本身的上游发射类
Publisher<T> source;
// 这里是装饰的核心代码
@Override
public void run() {
lazySet(Thread.currentThread());
// source即为上游,表示其所装饰的源
Publisher<T> src = source;
source = null;
// 调用上游的自身的subscribe方法,在上面一开始咱们说这个方法内部会去调用自身实现的subscribeActual方法
// 从而实现上游本身的特定方法,好比假设source是FlowCreate那么此处就会调用前面一开始咱们所讲到的数据的发射
src.subscribe(this);
}
// 既然已经保证了数据的发射那么数据的处理是否是也要处理
// 很明显这是调用了下游订阅者的onNext方法
@Override
public void onNext(T t) {
downstream.onNext(t);
}
复制代码
笔者喜欢总结,总结意味着咱们反思和学习前面的知识点,应用点以及自身的不足。