一、转换操做符:java
①、Map(做用:数据类型色转换)数组
//实现的功能:将Integer类型转换成String类型 Observable.create( //建立发射器 new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { //发射事件 e.onNext(0); e.onNext(1); e.onNext(2); } }) //转换 .map( //Integer类型转换成String类型 new Function<Integer, String>() { @Override public String apply(@NonNull Integer integer) throws Exception { return "Integer类型转换成String类型:"+integer; } }) //订阅 .subscribe( //建立简单的观察者 new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("接收到的事件:"+s); } } );运行结果:
②FlatMap(做用:将被观察者发送的事件序列进行 拆分 & 单独转换,在合并成一个新的事件序列,最后进行发送)缓存
原理:网络
一、为事件序列中每一个事件都建立一个 Observable(被观察者)app
二、将对每个 原始事件 转换后的 新的事件 都放入到对应的 Observableide
三、将新建的每个 Observable 都合并到一个 新建的、总的 Observablespa
四、新建的、总的 Observable 将合并的事件序列 发送给观察者 Observer线程
示意图:code
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
List<String> lists=new ArrayList<String>();
//把一个事件拆分红n个子事件
for (int i = 0; i < 5; i++) {
lists.add("我是事件:"+integer+",被拆分后的子事件:"+i);
}
//发射数组遍历后的数据
return Observable.fromIterable(lists);
}
})
//订阅
.subscribe(
//建立简单的观察者
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("接收到的事件:"+s);
}
}
);
运行结果:
注意:新合并生成的事件序列顺序是无序的,与旧序列发送的顺序无关。(上面的运行结果为有序纯属巧合)。server
③ConcatMap(做用:相似FlatMap,区别,它是有序的)
Observable.create( new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); } } ) //有序的转换符 .concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(@NonNull Integer integer) throws Exception { List<String> lists=new ArrayList<String>(); //把事件拆分红N份 for (int i = 0; i < 5; i++) { lists.add("我是事件:"+integer+",被拆分红:"+i); } return Observable.fromIterable(lists); } }) //订阅 .subscribe( //建立观测者 new Consumer<String>() { @Override public void accept(@NonNull String s) throws Exception { System.out.println("接收到的事件:"+s); } } );运行结果:
总结:ConcatMap,新合并生成的事件序列顺序是有序的,即严格按照旧序列发送事件的顺序。而 FlatMap,新合并的事件序列是无序的。
④Buffer
做用:按期从 被观测者须要发送的事件中 取出必定数量的事件 而且 放到缓存区中,最终发送
Observable.create( new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onNext(3); e.onNext(4); e.onNext(5); } } ) //参数:设置缓存区的大小和步长 //缓存区的大小 = 每次从被观察者中获取的事件数量 //步长 = 每次获取新事件的数量 .buffer(3,1) .subscribe( new Consumer<List<Integer>>() { @Override public void accept(@NonNull List<Integer> integers) throws Exception { System.out.println("缓存区里存放放的事件总数:"+integers.size()); for (Integer integer : integers) { System.out.println("从缓存区获取到的事件:"+integer); } } } );
二、开发实际应用:使用FlatMap嵌套网络请求(好比咱们实际开发中的,先注册,注册完成之后去作登陆操做。这样就是两步的操做,能够直接用FlatMap来实现)
//举例,为了模拟注册登陆,我只是模拟两次不一样的网络请求,而实际的注册登陆也是同一个道理:
final Observable<RegisterEntity> observableForRegister = RetrofitFactory.getRetrofit().create(NetApi.class).doRegister(); final Observable<LoginEntity> observableForLogin = RetrofitFactory.getRetrofit().create(NetApi.class).doLogin(); observableForRegister .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) //先发起注册请求 .doOnNext(new Consumer<RegisterEntity>() { @Override public void accept(@NonNull RegisterEntity registerEntity) throws Exception { System.out.println("第一次网络请求:" + "注册成功"); System.out.println("注册返回的结果:" + registerEntity.toString()); } }) //切换到io线程进行登陆请求 .subscribeOn(Schedulers.io()) //切换到主线程处理登陆请求返回的结果 .observeOn(AndroidSchedulers.mainThread()) .flatMap(new Function<RegisterEntity, ObservableSource<LoginEntity>>() { @Override public ObservableSource<LoginEntity> apply(@NonNull RegisterEntity registerEntity) throws Exception { //将注册请求转换成登陆请求,即发起第二次网络请求(即登陆请求) return observableForLogin; } }) .subscribe( //建立观察者 new Consumer<LoginEntity>() { @Override public void accept(@NonNull LoginEntity loginEntity) throws Exception { System.out.println("第二次网络请求:" + "登陆成功"); System.out.println("登陆成功还回的结果:" + loginEntity.toString()); } } , new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { System.out.println("登陆失败" + throwable); } } );