Rxjava 2.x 源码系列 - 线程切换 (下)bash
Rxjava 2.x 源码系列 - 变换操做符 Map(上)并发
在前几篇博客中,咱们介绍了 Rxjava Observable 与 Observer 之间是如何订阅与取消订阅的,以及 Rxjava 是如何控制 subsribe 线程和 observer 的回调线程的。app
今天,让咱们一块儿来看一下 Rxjava 中另一个比较重要的功能,操做符变化功能框架
操做符 | 做用 |
---|---|
map | 映射,将一种类型的数据流/Observable映射为另一种类型的数据流/Observable |
cast | 强转 传入一个class,对Observable的类型进行强转. |
flatMap | 平铺映射,从数据流的每一个数据元素中映射出多个数据,并将这些数据依次发射。(注意是无序的) |
concatMap | concatMap 与 flatMap 的功能很是相似,只不过发送的数据是有序的 |
buffer | 缓存/打包 按照必定规则从Observable收集一些数据到一个集合,而后把这些数据做为集合打包发射。 |
groupby | 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每个Observable发射一组不一样的数据 |
to... | 将数据流中的对象转换为List/SortedList/Map/MultiMap集合对象,并打包发射 |
timeInterval | 将每一个数据都换为包含本次数据和离上次发射数据时间间隔的对象并发射 |
timestamp | 将每一个数据都转换为包含本次数据和发射数据时的时间戳的对象并发射 |
接下来,咱们一块儿来看一下一个 demo,咱们经过 map 操做符将 Integer 转化为 String。ide
// 采用RxJava基于事件流的链式操做
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 被观察者发送事件 = 参数为整型 = 一、二、3
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
// 2. 使用Map变换操做符中的Function函数对被观察者发送的事件进行统一变换:整型变换成字符串类型
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "使用 Map变换操做符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer ;
}
// 3. 观察者接收事件时,是接收到变换后的事件 = 字符串类型
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
复制代码
输出结果函数
使用 Map变换操做符 将事件1的参数从 整型1 变换成 字符串类型1
使用 Map变换操做符 将事件2的参数从 整型2 变换成 字符串类型2
使用 Map变换操做符 将事件3的参数从 整型3 变换成 字符串类型3
复制代码
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
复制代码
接下来,咱们一块儿来看一下 ObservableMap。源码分析
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
}
复制代码
在前面博客中,咱们已经说到,当咱们调用 observable.subscribe(observer) 的时候,代码调用逻辑是这样的。ui
在 observable 的 subscribeActual 方法中
在 ObservableMap 的 subscribeActual 方法里面,MapObserver 类对 Observer 进行包装,又是这样的套路,装饰者模式。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
// 1 判断是否 done,若是已经 done ,直接返回
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
// 2 调用 mapper.apply(t) ,进行相应的转化
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 3 调用下游的 onNext 方法,并将 V 暴露出去
actual.onNext(v);
}
----
}
复制代码
首先看他的构造方法,有两个参数, actual,mapper。 actual 表明下游的 Observer,mapper 为传入的 Function。
接着咱们来看下 onNext 方法
这样就完成了操做符的操做功能
OK,咱们在回到上面的 demo,来整理一下他的流程
当咱们调用 observable.subscribe(observer) 的时候
扫一扫,欢迎关注个人公众号。若是你有好的文章,也欢迎你的投稿。