本篇文章已受权微信公众号 YYGeeker
独家发布转载请标明出处java
CSDN学院课程地址react
- RxJava2从入门到精通-初级篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-中级篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-进阶篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-源码分析篇:edu.csdn.net/course/deta…
自定义Operator属于RxJava的高级用法,能够本身自定义一些适用于常见应用场景的操做符。实现自定义Operator很简单,只须要实现RxJava提供的ObservableOperator
接口,实现对应的功能便可,同时,使用lift
操做符将自定义操做符应用到咱们的程序中。下面咱们使用自定义Operator,该操做符的做用是将List
集合转换成String
类型的输出编程
一、实现ObservableOperator,建立自定义Operatorbash
public class CustomOperator implements ObservableOperator<String, List<String>> {
@Override
public Observer<? super List<String>> apply(final Observer<? super String> observer) throws Exception {
return new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
observer.onSubscribe(d);
}
@Override
public void onNext(List<String> strings) {
observer.onNext(strings.toString());
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
}
复制代码
二、使用lift操做符添加自定义Operator微信
public class Main {
public static void main(String[] args) {
//建立被观察者
Observable.create(new ObservableOnSubscribe<List<String>>() {
@Override
//默认在主线程里执行该方法
public void subscribe(@NonNull ObservableEmitter<List<String>> e) throws Exception {
ArrayList<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
e.onNext(list);
e.onComplete();
}
})
.lift(new CustomOperator())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
}
复制代码
三、输出结果app
onNext=[1, 2, 3, 4]
onComplete
复制代码
自定义Transformer表示一个批量操做符的变换器,若是你在不少Observable中使用相同的一系列操做符,好比每次都要使用到map
+take
+doOnNext
等操做,那么就能够定义一个通用的Transformer对象,里面能够将须要重复用到的操做符打包成Transformer对象,使用compose操做符将Transformer对象应用到咱们的Observable上便可ide
实现自定义Transformer很简单,只须要实现RxJava提供的ObservableTransformer
接口,实现对应的功能便可,同时,使用compose
操做符将自定义Transformer应用到咱们的程序中。下面咱们使用自定义Transformer,该Transformer的做用是将发射的数据从Integer
转换成String
,并取2个数据项,同时在发射的时候监听发射事件,进行输出的打印函数
一、实现ObservableTransformer,建立自定义Transformer源码分析
public class CustomTransformer implements ObservableTransformer<Integer, String> {
@Override
public ObservableSource<String> apply(io.reactivex.Observable<Integer> upstream) {
return upstream.take(2).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "序号:" + integer + "发射成功";
}
}).doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s + ",准备发射");
}
});
}
}
复制代码
二、使用compose操做符添加自定义Transformerui
public class Main {
public static void main(String[] args) {
//建立被观察者
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
})
.compose(new CustomTransformer())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
复制代码
三、输出结果
序号:1发射成功,准备发射
序号:1发射成功
序号:2发射成功,准备发射
序号:2发射成功
复制代码
在安卓开发中,一般咱们也会自定义Transformer来实现咱们经常使用的线程切场景,具体以下
public static <T> ObservableTransformer<T, T> schedulersTransformer() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
public static <T> FlowableTransformer<T, T> schedulersTransformerForFlowable() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
复制代码
自定义Plugin表示自定义插件,自定义插件能够在RxJavaPlugins中提供的接口中去插入本身的一段代码操做,相似于面向切面编程,或者理解成Android的Hook。若是你须要在全部的订阅事件中去插入一段统一的操做,或者是监听全部订阅事件发生异常时的回调,均可以使用自定义插件。在实际应用中,目前并未发现有什么做用
实现自定义Plugin只须要调用RxJavaPlugins提供的set方法便可,下面咱们经过例子输出Observable和Observer的地址信息,来验证每次订阅的时候,回调自定义Plugin的方法中,插件对象和源对象是否为同一个对象
一、经过设置ObservableSubscribe,每次对Observable操做的时候回调
public class Main {
public static void main(String[] args) {
RxJavaPlugins.setOnObservableAssembly(new CustomObservableAssembly());//任意操做符都有回调
RxJavaPlugins.setOnObservableSubscribe(new CustomObservableSubscribe());//每次subscribe时候有回调
Observable observable = getObservable();
Observer<Integer> observer = getObserver();
System.out.println("main observable.toString:" + observable.toString());
System.out.println("main observer.toString:" + observer.toString());
observable.subscribe(observer);
}
public static Observable getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(5);
emitter.onNext(2);
emitter.onNext(3);
}
});
}
public static Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext=" + integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
}
};
}
}
复制代码
二、CustomObservableAssembly
public class CustomObservableAssembly implements Function<Observable, Observable> {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
observable.take(2);
return observable;
}
}
复制代码
三、CustomObservableSubscribe
public class CustomObservableSubscribe implements BiFunction<Observable, Observer, Observer> {
@Override
public Observer apply(Observable observable, Observer observer) throws Exception {
System.out.println("CustomObservableSubscribe observable.toString:" + observable.toString() + ",observer.toString:" + observer.toString());
return observer;
}
}
复制代码
四、输出结果
地址相同说明是同个对象,自定义插件Hook成功
CustomObservableAssembly observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca
main observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca
main observer.toString:com.hensen.rxjavalearning.Chapter7.Chapter7o3.Main$2@3d82c5f3
CustomObservableSubscribe observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca,observer.toString:com.hensen.rxjavalearning.Chapter7.Chapter7o3.Main$2@3d82c5f3
onNext=5
onNext=2
onNext=3
复制代码
补充:
能够经过设置ErrorHandler,发生异常时会回调
RxJavaPlugins.setErrorHandler();
复制代码
能够经过设置SchedulerHandler来Hook到对应的schedule
RxJavaPlugins.setIoSchedulerHandler();
RxJavaPlugins.setNewThreadSchedulerHandler();
RxJavaPlugins.setComputationSchedulerHandler();
RxJavaPlugins.setSingleSchedulerHandler();
复制代码
错误演示:
因为CustomObservableAssembly是在任意操做符操做的时候都会回调,因此在回调里面是不能够对observable再进行操做符的操做,不然回调里面observable的操做符仍是会回调CustomObservableAssembly自身,致使死循环,发生StackOverflowError
public class CustomObservableAssembly implements Function<Observable, Observable> {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
observable.take(2);
return observable;
}
}
复制代码
因为CustomObservableSubscribe是在subscribe以后进行的回调,若是在回调里面对observable进行操做符的操做,这个时候是不会生效的,由于在subscribe以后onNext的函数是不会再处理后面新添的操做符,原理与源码有关
public class CustomObservableSubscribe implements BiFunction<Observable, Observer, Observer> {
@Override
public Observer apply(Observable observable, Observer observer) throws Exception {
System.out.println("CustomObservableSubscribe observable.toString:" + observable.toString() + ",observer.toString:" + observer.toString());
observable.take(2);
return observer;
}
}
复制代码