原文首发于微信公众号:jzman-blog,欢迎关注交流!java
RxJava 是 ReactiveX 在 Java 上的开源的实现,一个用于经过使用可观察序列来进行异步编程和基于事件的程序的库,这是官网的介绍,主要关注点是异步编程和链式调用以及事件序列。react
implementation "io.reactivex.rxjava2:rxjava:2.2.3"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
复制代码
RxJava 中的几个重要概念是:观察者(Observer) 、被观察者(Observable)和事件序列,事件序列彻底由被观察者者本身控制,那么被观察者若是在须要时通知观察者呢,这就须要被观察者与观察者之间创建订阅关系。创建订阅关系后,当被观察者发生变化,观察者就能在第一时间接收被观察者的变化。android
在 RxJava2 中观察者(Observer) 的事件回调方法有四个:编程
注意:数组
//观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//解除订阅
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
//发送事件时观察者回调
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
//发送事件时观察者回调(事件序列发生异常)
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
//发送事件时观察者回调(事件序列发送完毕)
Log.i(TAG, "onComplete--->");
}
};
复制代码
//被观察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Event1");
emitter.onNext("Event2");
emitter.onComplete();
emitter.onNext("Event3");
}
});
复制代码
//创建观察者与被观察者之间的订阅关系
observable.subscribe(observer);
复制代码
上述代码的输出结果参考以下:bash
onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->
复制代码
显然,因为在 发送完 Event2 以后就调用了 onComplete 方法,以后发送的事件 Event3 将不会被观察者收到。微信
上面代码还能够这样写,结果是同样的,具体参考以下:并发
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Event1");
emitter.onNext("Event2");
emitter.onComplete();
emitter.onNext("Event3");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
上面代码中使用了 Observable 的 create 方法来建立 Observable,并以此来进行相关事件的发送,为帮助理解来看一下官方的关于 create 操做符的示意图:异步
Observable 中还提供了不少的静态方法来建立 Observable,下文将会介绍这些经常使用方法。ide
使用 just 能够建立一个发送指定事件的 Observable,just 发送事件的上限 10,即最多发送 10 个事件,相较 create 在必定程度上简化了处理流程,just 重载的方法以下:
public static <T> Observable<T> just(T item) public static <T> Observable<T> just(T item1, T item2) public static <T> Observable<T> just(T item1, T item2, T item3) public static <T> Observable<T> just(T item1, T item2, T item3, T item4) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10) 复制代码
下面是 just 操做符的简单使用:
//just操做符的简单使用
Observable.just("Event1", "Event2", "Event3")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
上述代码的输出结果以下:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
复制代码
来看一下官方的关于 just 操做符的示意图,下面是 just 发送四个事件的示意图,具体以下:
使用 from 相关的操做符能够建立发送数组(array)、集合(Iterable) 以及异步任务(future)的 Observable,可将 from 相关的操做符分为以下几类:
//数组
public static <T> Observable<T> fromArray(T... items) //集合 public static <T> Observable<T> fromIterable(Iterable<? extends T> source) //异步任务 public static <T> Observable<T> fromFuture(Future<? extends T> future) //异步任务+超时时间 public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit) //异步任务+超时时间+线程调度器 public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) //异步任务+线程调度器 public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler) //Reactive Streams中的发布者,使用方式相似create操做符,事件的发送由发布者(被观察者)自行决定 public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher) 复制代码
下面是 fromArray 的使用方式,具体以下:
//fromArray操做符的简单使用
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
看一下 fromArray 的官方示意图,具体以下:
下面是 fromIterable 的使用方式,具体以下:
//fromIterable操做符的简单使用
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
看一下 fromIterable 的官方示意图,具体以下:
上述代码的输出参考以下:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
复制代码
Callable 位于 java.util.concurrent 包下,和 Runnable 相似,可是带有返回值,使用 fromCallable 发出的事件是从主线程发出的,若是不订阅则不会执行 call 里面的操做,使用 fromCallable 要注意如下几点:
下面是 fromCallable 的简单使用,参考以下:
//fromCallable操做符的简单使用
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
//其余操做...
return "call";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s+Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
上述到执行结果以下:
onSubscribe--->
onNext--->call
onComplete--->
复制代码
看一下 fromCallable 的官方示意图,具体以下:
从上面可知 fromFuture 有四个重载方法,参数中能够指定异步任务、任务超时时间、线程调度器等,先来认识一下 Future 接口,Future 接口位于 java.util.concurrent 包下,主要做用是对 Runnable 和 Callable 的异步任务执行进行任务是否执行的判断、任务结果的获取、具体任务的取消等,而 Runnable 和 Callable 伴随着线程的执行,这就意味着使用 fromFuture 发出的事件是从非 Main 线程发出,若是执行耗时任务要记得使用 subscribeOn 切换订阅线程,下面以 FutureTask 为例来讲明 fromFuture 的使用方式。
建立一个 Callable 用来执行异步任务,参考以下:
//异步任务
private class MCallable implements Callable<String> {
@Override
public String call() throws Exception {
Log.i(TAG, "任务执行开始--->");
Thread.sleep(5000);
Log.i(TAG, "任务执行结束--->");
return "MCallable";
}
}
复制代码
而后,建立一个 FutureTask ,参考以下:
//建立FutureTask
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);
复制代码
而后,使用 Thread 执行上面建立的 Future,参考以下:
//执行FutureTask
new Thread(mFutureTask).start();
复制代码
最后,使用 fromFuture 建立与之对应的 Observeable 并订阅,参考以下:
//fromFuture
Observable.fromFuture(mFutureTask)
.subscribeOn(Schedulers.io()) //切换订阅线程
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s+Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
上述代码的只想结果以下:
任务执行开始--->
onSubscribe--->
任务执行结束--->
onNext--->MCallable
onComplete--->
复制代码
看一下 fromFuture 的官方示意图,下面的示意图是 fromFuture 方法携带一个参数 Future 的示意图,具体以下:
上面的异步任务延时 5 秒,若是使用 fromFuture 的重载方法指定超时时间为 4 秒,参考以下:
//指定超时时间为4s
Observable.fromFuture(mFutureTask,4, TimeUnit.SECONDS,Schedulers.io())
//...
复制代码
此时,因为异步任务不能在 4 秒内完成,Observer 会相应的被触发 onError 方法,执行结果参考以下:
任务执行开始--->
onSubscribe--->
onError--->java.util.concurrent.TimeoutException
任务执行结束--->
复制代码
那么如何取消这个异步任务呢,这也正是 Future 的优势所在,能够随意的取消这个任务,具体参考以下:
//异步任务的取消
public void cancelTask(View view) {
if (mFutureTask.isDone()) {
Log.i(TAG, "任务已经完成--->");
} else {
Log.i(TAG, "任务正在执行--->");
boolean cancel = mFutureTask.cancel(true);
Log.i(TAG, "任务取消是否成功--cancel->" + cancel);
Log.i(TAG, "任务取消是否成功--isCancelled->" + mFutureTask.isCancelled());
}
}
复制代码
下面是在任务执行过程当中取消任务的执行结果,参考以下:
任务执行开始--->
onSubscribe--->
任务正在执行--->
任务取消是否成功--cancel->true
任务取消是否成功--isCancelled->true
onError--->java.util.concurrent.CancellationException
复制代码
这样就取消了正在执行的异步任务,这部份内容更多的是关于 Java Future 相关的知识。
使用 defer 建立 Observable 时,只有在订阅时去才会建立 Observable 并发送相关的事件,下面是 defer 操做符的使用,参考以下:
//defer
defer = "old";
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just(defer);
}
});
defer = "new";
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
上述代码的执行结果以下:
onSubscribe--->
onNext--->new
onComplete--->
复制代码
显然,最终在订阅以前 Observable 工厂又建立了最新的 Observable,onNext 中接收的数据也是最新的,为了理解 defer 操做符,来看一下官方 defer 操做符的示意图:
使用 empty 操做符能够建立一个不发生任何数据但正常终止的 Observable,参考以下:
//empty
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext--->"+o);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
上述代码的输出结果以下:
onSubscribe--->
onComplete--->
复制代码
为了方便理解 empty 操做符的使用,来看一些 empty 操做符的官方示意图:
使用 never 操做符能够建立一个不发生任何数据也不终止的 Observable,参考以下:
//never
Observable.never().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext--->"+o);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
上述代码的输出结果以下:
onSubscribe--->
复制代码
为了方便理解 never 操做符的使用,来看一些 never 操做符的官方示意图:
timer 操做符能够建立一个带延时的发送固定数值 0 的 Observable,还能够指定线程调度器,timer 重载方法以下:
//延时
public static Observable<Long> timer(long delay, TimeUnit unit) //延时+线程调度器 public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) 复制代码
下面是 timer 的使用方式:
//timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Long s) {
Log.i(TAG, "onNext--->"+s);
Log.i(TAG, "当前线程--->"+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
上述代码的执行结果以下:
onSubscribe--->
//延时3秒收到数据
onNext--->0
当前线程--->RxCachedThreadScheduler-1
onComplete--->
复制代码
为了方便理解 timer 操做符的使用,来看一些 timer 操做符的官方示意图,下面以 timer 指定延时器和线程调度器的方式为例,具体以下:
使用 interval 操做符能够建立一个能够以固定时间间隔发送整数值的一个 Observable,interval 能够指定初始延时时间、时间间隔、线程调度器等,interval 重载方法以下:
//初始延时+时间间隔
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) //初始延时+时间间隔+线程调度器 public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) //时间间隔 public static Observable<Long> interval(long period, TimeUnit unit) //时间间隔+线程调度器 public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler) 复制代码
下面是 interval 的使用方式:
//interval
Observable.interval(3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Long aLong) {
Log.i(TAG, "onNext--->"+aLong);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
上述代码执行后就会以每一个 3 秒持续发送值为整数的事件,执行结果以下:
onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...
复制代码
为了方便理解 interval 操做符的使用,来看一些 interval 操做符的官方示意图,下面以 interval 指定间隔时间和时间单位的方式为例,具体以下:
使用 range 操做符能够建立一个能够发送指定整数范围值的一个 Observable,range 相关的方法有两个,只是数值的范围表示不一样,两个方法声明以下:
// int
public static Observable<Integer> range(final int start, final int count) // long public static Observable<Long> rangeLong(long start, long count) 复制代码
下面是 range 的使用方式,具体参考以下:
//range
Observable.range(1,5).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext--->"+integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
复制代码
上述代码的执行结果以下:
onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->
复制代码
为了方便理解 range 操做符的使用,来看一些 range 操做符的官方示意图:
这篇文章主要介绍了 RxJava2 相关基础知识以及 RxJava2 中建立型操做符的理解和使用。能够选择关注我的微信公众号:jzman-blog 获取最新更新,一块儿交流学习!