关于
Flowable
的源码解析能够看RxJava2 Flowable源码浅析java
关于
Subject
的源码解析能够看RxJava2 Subject源码浅析缓存
Observable
、Flowable
、Subject
Observer
、Subscrption
Observable#subscribe
才开始请求上游发送数据。当下游请求dispose()
中止通知上游中止发送。Rxjava1
开始就有人说Rxjava能够看做流水线,上游怎么加工对于下游来讲是无感知的,下游只要负责接收响应对应数据事件就行。对于rxajva的思考,能够参考一下:Rxjava沉思录系列和Rxjava主要负责人系列博客bash
通常cold Observable建立都是经过just
、create
、fromXX
、just
建立的。最简单粗暴的建立方式:多线程
Observable.create<String> { it.onNext("") }.subscribe()
复制代码
//[仅关注点相关代码]
//ObservableOnSubscribe仅一个subscribe方法
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}
public abstract class Observable<T> implements ObservableSource<T> {
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//RxJavaPlugins这是一个全局Hook,#onAssembly不实现默认直接返回
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public final void subscribe(Observer<? super T> observer) {
try {
........
//真正调用subscribe的实现
subscribeActual(observer);
}
......
}
//整个Observable惟一的抽象方法,由子Observable实现,经过这个方法将上游和下游关联起来
protected abstract void subscribeActual(Observer<? super T> observer);
}
复制代码
由Observable#create
真正返回的是ObservableCreate
,当调用Observable#subscribe
才真正通知上游Observable
开始发送数据。其实质是经过#subscribeActual
将上下游创建联系,并调用上游#subscribe
(在ObservableCreate
中就是ObservableOnSubscribe#subscribe
)方法通知上游,下游已订阅能够开始发送数据。app
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//source(上游)即Observable#create传入的ObservableOnSubscribe
//这里就将上下游真正的联系了起来。
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
.....
}
}
复制代码
因此实质就是下游通知上游,下游已产生订阅触发上游下发数据/事件,上游再经过下发数据/事件,最终下游经过指定方法响应上游下发的数据/事件。因此一开始说的流水线方式就能够理解了。异步
由于每次下游产生一次订阅都会通知到上游的
#subscribe
,因此若是上游只在#subscribe
中去建立初始数据源就能够每一个作到不一样下游的数据不关联ide
Observable.create<String> { it.onNext("") }.subscribe()
流程图以下:post
//Observable#map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//**ObservableMap将上游Observable和当前的转换Function创建联系
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
//ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//将下游包装成MapObserver,并将MapObserver和上游创建联系
//这样上游下发时,先经过MapObserver处理才下发给真正的Observer
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
....
U v;
try {
//经过Function获取到map后的数据
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch ....
//向下游下发数据
actual.onNext(v);
}
...
}
}
复制代码
能够看到map
操做符的做用就是经过将上游拦截返回ObservableMap
提供给下游订阅,并在map上游返回数据前经过mapper
将上游数据转化并下发给下游。ui
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//emmmm,是否是点眼熟
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
//ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//onSubscribe()方法执行在 订阅处所在的线程
s.onSubscribe(parent);
//将上游放入scheduler中调用,且当即执行
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
//scheduler#scheduleDirect中执行完后
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//该方法调用已经在scheduler中调用
source.subscribe(parent);
}
}
}
复制代码
由源码能够看出由scheduler.scheduleDirect
->SubscribeTask#run
->SubscribeOnObserver#subscribe(observer)
将整个调度切换到指定线程中。this
由于订阅是用下自上的,因此
subscribeOn
也老是离源最近的一个生效。由于触发源的subscribe
是离源最近一个。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
能够看出Rxjava的操做符套路基本是将源
Observable
经过装饰者模式封装一层再返回新的Observable
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;//默认false
final int bufferSize;//通常128
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//建立主线程调度器
Scheduler.Worker w = scheduler.createWorker();
//关联上下游,触发上游订阅过程
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
复制代码
这里能够看出ObservableObserveOn
仍是很简单的,上游订阅过程并不用关心,下游的触发则由ObserveOnObserver
处理。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
//上游数据的缓存队列
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
......
//建立对接缓存数据
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//回调下游onSubscribe
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {//执行过complete/error则done为true
return;
}
if (sourceMode != QueueDisposable.ASYNC) {//非异步数据,默认同步数据
queue.offer(t);//入队列
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;//标记已完成
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;//标记已完成
schedule();
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return cancelled;
}
void schedule() {
//自旋+1,!=0则表示worker.schedule已在执行无需在调度
if (getAndIncrement() == 0) {
worker.schedule(this);//经过调度器处理,将数据取出下发到下游
}
}
@Override
public void run() {
if (outputFused) {//默认false
drainFused();
} else {
drainNormal();//取出数据下发
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (; ; ) {
//检测是否不用再处理
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (; ; ) {
boolean d = done;
T v;
try {
v = q.poll();//取出一个数据
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {//可能已经提早disposed了
return;
}
if (empty) {//数据为空队列无数据,退出下发循环
break;
}
//下发
a.onNext(v);
}
//可能有错过的schedule,再次循环检测
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
//检测是否compelte/error/队列已空
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {//已经disposed
queue.clear();
return true;
}
if (d) {//是否已结束
Throwable e = error;
if (delayError) {//延迟error,等待队列清空
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
a.onError(e);//下发error
worker.dispose();
return true;
} else if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}
复制代码
ObserveOnObserver
继承于BasicIntQueueDisposable
继承于AtomicInteger
,经过自身的原子性(自旋/CAS)来消除多线程对#schedule
的调用。
能够看出#observeOn
只对下游有影响。
由于subscribeOn()
切换线程是在subscribeActual
中切换,经过切换上游订阅过程的整个线程,从而影响发射数据的下发所在线程。因此subscribeOn()
只有最靠近源的一次生效。
而observeOn
主动切换下发过程,对下发过程产生影响,·且屡次调用屡次生效。 PS:操做符的转换效果都是在onXXX
下发过程当中实现的,因此对操做符也有做用。