先贴上前面几篇的连接:
rxjava2源码解析(一)基本流程分析
rxjava2源码解析(二)线程切换分析
rxjava2源码解析(三)线程池原理分析java
上一篇说了rxjava2
的线程池原理,这篇咱们来讲说rxjava
的变换。缓存
变换和线程切换算是rxjava
最关键的两个功能。常见的变换有map()
,flatMap()
。咱们先从map
方法提及吧。bash
咱们先举一个简单的例子,来看看map
能作什么:并发
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Function<Student, String>() {
@Override
public String apply(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
复制代码
上面的例子是一个功能,打印一个班级里students的名字。很简单,经过from
方法对student进行遍历,一个map
方法将student变换成name,而后下游打印就完事了。咱们知道rxjava2
里面是有不少泛型设定的,若是类型错误是会直接标红。from
方法返回的下游数据类型是student,而subscriber
中接收的数据类型必须是String。很显然,这里map就将下游的数据类型进行了变换。
具体在源码中是如何实现的呢?咱们先看map
的源码:app
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
复制代码
仍是老样子,抛开判空代码和hock机制,直接看ObservableMap
类。不过在此以前,先看看map
方法里面设定的泛型。T是Observable里设定的上游数据类型,map方法会返回一个Observable,这里就将整个链条的数据类型进行了变换。异步
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
复制代码
看过前面的几篇就知道,这里仍是老套路,仍是装饰器模式,仍是建立一个内部处理器MapObserver
。内部处理器MapObserver
负责与上游绑定,因此它的处理数据类型仍为T。ObservableMap
与下游进行绑定订阅,因此ObservableMap
中数据的类型为R。咱们在看MapObserver
以前,先看看Function
是什么。ide
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
复制代码
OK,Function是一个接口,只有一个接口方法apply
。Function
规定了两个泛型:T、R。其中T是apply
的参数类型,R是返回值类型。咱们在使用过程当中,重写apply
方法进行数据类型变换,而后再用map
方法插入到整条流水线中,就达到了变换的目的。oop
下面看看MapObserver
中具体是怎么实现的:post
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
复制代码
很简单,MapObserver
的onNext
负责处理上游下来的数据,在onNext
方法中调用Function
的apply
方法,将T
变换为下游须要的U
(也就是前面的R
),而后再将数据传递下去,达到变换的目的。ui
map的使用和源码都很简单,咱们来看看flatMap
的。
仍是先用一个简单的例子来看flatMap
的用途:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Function<Student, Observable<Course>>() {
@Override
public Observable<Course> apply(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
复制代码
产品说功能要改一改,不是打印每一个student的名字,而是要打印每一个sutdent全部课程名称。正常状况下,咱们在subscriber
中获取到每一个student,而后用个for循环进行遍历打印就行,可是flatMap
能够直接一步搞定。
细心的已经发现,这里的Function
比较奇怪,它的返回值类型居然是Observable
。具体怎么回事,咱们看看源码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
//这里的delayErrors,maxConcurrency,bufferSize都是默认值。
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码
先解释一下,delayErrors
,maxConcurrency
,bufferSize
这几个参数的意义:
delayErrors
表示异常是否须要延迟到全部内部数据都传输完毕后抛出。默认值是false
。maxConcurrency
表示最大并发数,默认值为Integer.MAX_VALUE
。bufferSize
缓存的内部被观察者事件总数大小,默认值为128.老样子,咱们直接看ObservableFlatMap
:
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
复制代码
仍是原来的配方,仍是原来的味道。咱们来看看MergeObserver
的源码一探究竟:
@Override
public void onNext(T t) {
//调用apply方法,获取到转换的Observable
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
return;
}
//隐藏了一些判断代码
subscribeInner(p);
}
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
//这里会走到else
if (p instanceof Callable) {
...
} else {
//这里新建一个InnerObserver,调用addInner添加到队列中,而后用apply中生成的Observable与之订阅。
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
复制代码
如注释中所示,这里根据上游每个数据,生成一个Observable
,而后新建一个InnerObserver
,将这个InnerObserver
添加到内部处理器队列中,并将Observable
与这个InnerObserver
进行订阅。
咱们以Observable.from()
为例,看看这中间的流程是什么样的。
//from 方法返回一个ObservableFromArray装饰器
public static <T> Observable<T> fromArray(T... items) {
//省略部分判空代码
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
//ObservableFromArray源码
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
//订阅后,建立一个FromArrayDisposable内部类对象
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
//这个方法很关键,咱们待会能够看看InnerObserver的onSubscribe方法。
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
//FromArrayDisposable不是一个处理器,他只是一个带简单队列的Disposable
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> downstream;
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.downstream = actual;
this.array = array;
}
// 这里显然是返回同步
@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
//poll方法会逐个返回队列中的数据
@Nullable
@Override
public T poll() {
int i = index;
T[] a = array;
if (i != a.length) {
index = i + 1;
return ObjectHelper.requireNonNull(a[i], "The array element is null");
}
return null;
}
@Override
public boolean isEmpty() {
return index == array.length;
}
@Override
public void clear() {
index = array.length;
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
//在run方法中,开始向下游传递数据。不过这时候已经不重要了,由于在InnerObserver的onSubscribe方法中,已经经过poll方法将队列中的数据都传递出去了。固然这仅仅是在这个示例中是这样
void run() {
T[] a = array;
int n = a.length;
//开始向下游传递数据
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
}
}
复制代码
如上面注释所示,from
方法返回一个简单的ObservableFromArray
,ObservableFromArray
的subscribe
中,调用下游处理器的onSubscribe
方法,而后调用自身的run
方法。咱们看看InnerObserver
中是怎么处理的:
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
implements Observer<U> {
private static final long serialVersionUID = -4606175640614850599L;
final long id;
final MergeObserver<T, U> parent;
volatile boolean done;
volatile SimpleQueue<U> queue;
int fusionMode;
//这里会用一个独特的ID来给每一个InnerObserver作标记
InnerObserver(MergeObserver<T, U> parent, long id) {
this.id = id;
this.parent = parent;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
//FromArrayDisposable知足这个条件
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<U> qd = (QueueDisposable<U>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//由上面FromArrayDisposable的源码可知这里返回SYNC
if (m == QueueDisposable.SYNC) {
fusionMode = m;
queue = qd;
//这里直接将done设置为true,是由于下面的parent.drain()会直接取出全部数据并传递给下游
done = true;
//数据在这其中进行下发和传递
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
@Override
public void onNext(U t) {
if (fusionMode == QueueDisposable.NONE) {
parent.tryEmit(t, this);
} else {
//当上游执行到这里时,数据已经被传递完毕了。这里单指此次示例
parent.drain();
}
}
....
}
复制代码
具体的信息都写在上面的注释中,咱们直接来看MergeObserver
的drain()
方法。
void drain() {
//这里进行判断,确保drainLoop还在执行时不会被再次调用
if (getAndIncrement() == 0) {
drainLoop();
}
}
void drainLoop() {
//获取到下游Observer
final Observer<? super U> child = this.downstream;
int missed = 1;
for (;;) {
//判断是否有error
if (checkTerminate()) {
return;
}
...
boolean d = done;
svq = queue;
InnerObserver<?, ?>[] inner = observers.get();
int n = inner.length;
int nSources = 0;
...
int innerCompleted = 0;
if (n != 0) {
//初始lastId lastIndex都为0
long startId = lastId;
int index = lastIndex;
...
int j = index;
sourceLoop:
for (int i = 0; i < n; i++) {
//获取到当前InnerObserver
@SuppressWarnings("unchecked")
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
//q就是FromArrayDisposable。
SimpleQueue<U> q = is.queue;
if (q != null) {
for (;;) {
U o;
try {
//在这里循环调取FromArrayDisposable队列中数据,而后传递到下游
o = q.poll();
} catch (Throwable ex) {
....
}
if (o == null) {
break;
}
child.onNext(o);
...
}
}
//前面标记过,在onSubscribe中已经将done设置为true.
boolean innerDone = is.done;
SimpleQueue<U> innerQueue = is.queue;
//因为上面已经将数据处理完毕,这里innerQueue.isEmpty()返回为true。
if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
//将该InnerObserver从队列中移除
removeInner(is);
if (checkTerminate()) {
return;
}
innerCompleted++;
}
j++;
if (j == n) {
j = 0;
}
}
lastIndex = j;
lastId = inner[j].id;
}
...
//这里与开头getAndIncrement()相呼应,确保drainLoop在执行时不会被再次调用
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
复制代码
OK,整个流程就清晰了,划重点:
flatMap()
是基础装饰器Observable
的一个方法,参数是一个Function
,只不过这个Function
中apply()
方法返回类型为一个Observable
。flatMap()
返回一个ObservableFlatMap
装饰器对象。ObservableFlatMap
被订阅后会调用subscribeActual()
方法,在此方法中,会建立一个内部类MergeObserver
对象,并将上游装饰器与之订阅。MergeObserver
在接收到上游数据后,会调用Function
中apply()
方法,将数据转换为一个Observable
,并建立一个内部InnerObserver
,将这个InnerObserver
放入队列中,而后将生成的Observable
与之订阅。InnerObserver
的onSubscribe()
方法会直接调用MergeObserver
的drain()
方法,将数据所有都直接传递给下游。从而完成整个流程。观察代码会发现,同步仅仅是flatMap
的一个简单状况,更复杂的状况在于异步。具体的你们能够去源码里研究一下,毕竟这篇的篇幅已经够长了。下一篇预告一下,咱们来看看背压。