本篇文章主要介绍Rxjava 2.x的一些经常使用的操做符,对Rxjava不熟悉的朋友能够先去看下我以前的两篇介绍java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
复制代码
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("This is Observer"); //经过 ObservableEmitter 发射器向观察者发送事件。
e.onComplete();
}
});
复制代码
public static <T> Observable<T> just(T item)
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
复制代码
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "-------onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "-------onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "-------onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "-------onComplete ");
}
});
复制代码
使用just()方法建立Observable对象,Observable会将事件逐个发送segmentfault
fromArray() 这个方法和 just() 相似,只不过 fromArray 能够传入一个数组数组
fromCallable() Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值bash
fromIterable() 直接发送一个 List 集合数据给观察者并发
public static <T> Observable<T> fromArray(T... items)
Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "--------------onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "--------------onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "--------------onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "--------------onComplete ");
}
});
复制代码
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
Observable.fromCallable(new Callable < Integer > () {
@Override
public Integer call() throws Exception {
return 1;
}
})
.subscribe(new Consumer < Integer > () {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "--------------accept " + integer);
}
});
复制代码
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "--------------onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "--------------onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "--------------onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "--------------onComplete ");
}
});
复制代码
Observable.empty()
.subscribe(new Observer < Object > () {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "---------------------onSubscribe");
}
@Override
public void onNext(Object o) {
Log.d(TAG, "---------------------onNext");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "---------------------onError " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "==================onComplete");
}
});
复制代码
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
//将 Integer 类型的数据转换成 String。
Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
@Override
public String apply(Integer integer) throws Exception {
return integer+"rxjava";
}
})
.subscribe(new Observer < String > () {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "----------------------onSubscribe");
}
@Override
public void onNext(String s) {
Log.e(TAG, "----------------------onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "---------------------onError " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "---------------------onComplete" );
}
});
复制代码
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
复制代码
flatMap() 其实与 map() 相似,可是 flatMap() 返回的是一个 Observerable。如今用一个map()的例子和flatMap()的例子来对比说明 flatMap() 的用法。app
需求:咱们如今须要经过学校拿到院系列表,而后在每一个院系中拿到学生的信息. 传统的实现方式有不少种,我就不举例了,直接使用Rxjava实现:ide
//学校
class School{
private String name;
private List<Department> departments;
public School(){}
public School(String name, List<Department> departments) {
this.name = name;
this.departments = departments;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<Department> getDepartments() {
return departments;
}
public void setDepartments(List<Department> departments) {
this.departments = departments;
}
}
复制代码
//院系
class Department{
private String name;
private List<Student> students;
public Department(){}
public Department(String name, List<Student> students) {
this.name = name;
this.students = students;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<Student> getStudents() {
return students;
}
public void setStudents(List<Student> students) {
this.students = students;
}
}
复制代码
//学生
class Student {
private String name;
private String school;
public Student(){}
public Student(String name, String school) {
this.name = name;
this.school = school;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getSchool() {
return school;
}
public void setSchool(String school) {
this.school = school;
}
}
复制代码
使用map()方法实现:post
//使用map()实现方式
Observable.fromIterable(departments)
.map(new Function<Department, List<Student>>() {
@Override
public List<Student> apply(Department department) throws Exception {
return department.getStudents();
}
})
.subscribe(new Observer<List<Student>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Student> students) {
for (Student student : students){
Log.d("----------", student.getName()+student.getSchool() );
//若是还须要获取学生全部课程以及成绩
......................
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
复制代码
//使用flatMap()实现
Observable.fromIterable(departments)
.flatMap(new Function<Department, ObservableSource<Student>>() {
@Override
public ObservableSource<Student> apply(Department department) throws Exception {
return Observable.fromIterable(department.getStudents());
}
})
.flatMap() //若是还须要获取学生全部课程以及成绩操做
.subscribe(new Observer<Student>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Student student) {
Log.d("---------",student.getName()+student.getSchool());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
复制代码
以上代码中map()方法实现中,能够看到咱们在onNext()方法中使用了for循环.若是代码逻辑在复杂一些,就可能须要嵌套for循环来实现,那就真的迷之缩进了,而使用flatMap()方法实现,只须要实现一个flatMap()转换一下就行了,随着代码逻辑增长,代码依然清晰,这就是flatMap()的强大之处,也是不少人喜欢使用Rxjava的缘由所在.fetch
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
Observable.fromIterable(departments)
.concatMap(new Function<Department, ObservableSource<Student>>() {
@Override
public ObservableSource<Student> apply(Department department) throws Exception {
return Observable.fromIterable(department.getStudents());
}
})
复制代码
至关于handler的延迟发送事件
handler.sendEmptyMessageDelayed(0,2000);
复制代码
public final Observable<T> delay(long delay, TimeUnit unit)
Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS) //延迟两秒再发送事件
.subscribe(new Observer < Integer > () {
@Override
public void onSubscribe(Disposable d) {
Log.d("------------onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d("------------"+integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "----------------onComplete");
}
});
复制代码
前方有坑,请集中注意力ui
Observable.doOnSubscribe()方法是在subscribe() 调用后并且在事件发送前执行。默认状况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而若是在 doOnSubscribe() 以后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io()) //在io执行上述操做
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
dialog.show(); //显示dialog
}
})
.subscribeOn(AndroidSchedulers.mainThread()) //在UI线程执行上述准备操做
.observeOn(AndroidSchedulers.mainThread())//在UI线程执行下面操做
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("----","开始了");
}
@Override
public void onNext(String s) {
Log.d("----", s);
dialog.dismiss();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d("----", "complete");
}
});
复制代码
public final Observable<T> subscribeOn(Scheduler scheduler)
复制代码
public final Observable<T> observeOn(Scheduler scheduler)
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
复制代码
以上就是Rxjava经常使用的一些操做符介绍和使用方法实例了
关于Rxjava系列二就到此结束啦,后面有时间我还会写写与retrofit2的结合使用,欢迎关注订阅!
欢迎关注做者darryrzhong,更多干货等你来拿哟.
更多精彩文章请关注