RxJava是一种为异步编程而实现的库,异步是其重要特点,合理地利用异步编程可以提升系统的处理速度。可是异步也会带来线程的安全问题,并且异步并不等于并发,与异步概念相对应的是同步。java
在默认状况下,RxJava只在当前线程中运行,它是单线程的。此时Observable用于发射数据流,Observer用于接收和响应数据流,各类操做符(Operators)用于加工数据流,它们都在同一个线程中运行,实现出来的是一个同步的函数响应式。然而,函数响应式的实际应用是大部分操做都在后台处理,前台响应的一个过程。因此须要对刚才的流程作一下修改,改为Observable生成发射数据流,Operators加工数据流在后台线程中进行,Observer在前台线程中接受并响应数据。 此时会涉及使用多线程来操做RxJava,可使用RxJava的调度器(Scheduler)来实现。数据库
Scheduler是RxJava对线程控制其的一个抽象,RxJava内置了多个Scheduler的实现,它们基本知足绝大多数使用场景,以下表:编程
Scheduler | 做用 |
---|---|
single | 使用定长为1的线程池(new Sheduled Thread Pool(1)),重复利用这个线程 |
newThread | 每次都启用新线程,并在新线程中执行操做 |
computation | 使用的固定的线程池(Fixed Scheduler Pool),大小为CPU核数,适用于CPU密集型计算 |
io | 适用I/O操做(读写文件,读写数据库,网络信息交互等)所使用的Scheduler。行为模式和newThread() 差很少,区别在于:io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,于是多数状况下,io()比newThread()更有效率 |
tranpoline | 直接在当前线程运行,若是当前线程有其余任务正在执行,则会先暂停其余任务 |
Schedulers.from | 将java.util.concurrent.Executor转换成一个调度器实例,便可以自定义一个Executor来做为调度器 |
RxJava的被观察者们在使用操做符时能够利用线程调度器——Scheduler来切换线程,例如:安全
public void SchedulersTest() {
Observable.just("aaa", "bbb")
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
System.out.println("Map对应的线程:" + Thread.currentThread().getName() + "\t" + Thread.currentThread().getId());
return s.toUpperCase();
}
})
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("subscribe对应的线程:" + Thread.currentThread().getName() + "\t" + Thread.currentThread().getId());
System.out.println(s);
}
});
}
复制代码
下图不一样的箭头颜色表示不一样的线程:bash
其中,蓝色表示主线程、橙色表示newThread、粉色表示I/O线程。网络
Schedulers是一个静态工厂类,经过分析Schedulers的源码能够看出他有多个不一样类型的Scheduler。下面是Schedulers的各个工厂方法:多线程
computation()用于CPU密集型的计算任务,但并不适合I/O操做。并发
/**
* Creates and returns a {@link Scheduler} intended for computational work.
*
* @return a {@link Scheduler} meant for computation-bound work
*/
@NonNull
public static Scheduler computation() {
return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}
复制代码
io()用于I/O密集型任务,支持异步阻塞I/O操做,这个调度器的线程池会根据须要增加。对于普通的计算任务,请使用Schedulers.computation()
app
/**
* Creates and returns a {@link Scheduler} intended for IO-bound work.
*
* @return a {@link Scheduler} meant for IO-bound work
*/
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
复制代码
在RxJava2中与在RxJava1中的做用不一样。在RxJava2中表示当即执行,若是当前线程有任务在执行,则会将其暂停,等插入进来的新任务执行完成以后,再接着执行原先未完成的任务。在RxJava1中,表示在当前线程中等待其余任务完成以后,再执行新的任务。异步
/**
* Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the
* current work completes.
*
* @return a {@link Scheduler} that queues work on the current thread
*/
@NonNull
public static Scheduler trampoline() {
return TRAMPOLINE;
}
复制代码
newThread()为每一个任务建立一个新线程
/**
* Creates and returns a {@link Scheduler} that creates a new {@link Thread} for each unit of work.
*
* @return a {@link Scheduler} that creates new threads
*/
@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
复制代码
single()拥有一个线程单例,全部的任务都在这一个线程中执行。当此线程中有任务执行时,它的任务将会按照先进先出的顺序依次执行。
/**
* Returns the common, single-thread backed Scheduler instance.
*
* @return a {@link Scheduler} that shares a single backing thread.
* @since 2.0
*/
@NonNull
public static Scheduler single() {
return RxJavaPlugins.onSingleScheduler(SINGLE);
}
复制代码
除此以外,还支持自定义的Executor来做为调度器。
/**
* Converts an {@link Executor} into a new Scheduler instance.
*
* @param executor
* the executor to wrap
* @return the new Scheduler wrapping the Executor
*/
@NonNull
public static Scheduler from(@NonNull Executor executor) {
return new ExecutorScheduler(executor);
}
复制代码
以下图,Scheduler是RxJava的线程任务调度器,Worker是线程任务的具体执行者。从Scheduler源码能够看出,Scheduler在schedulerDirect()
、schedulerPeriodicallyDirect()
方法中建立了Worker,而后会分别调用worker的schedule()
、schedulePeriodically()
来执行任务。
图片来源于:RxJava 线程模型分析
/**
* Schedules the execution of the given task with the given delay amount.
*
* <p>
* This method is safe to be called from multiple threads but there are no
* ordering guarantees between tasks.
*
* @param run the task to schedule
* @param delay the delay amount, non-positive values indicate non-delayed scheduling
* @param unit the unit of measure of the delay amount
* @return the Disposable that let us one cancel this particular delayed task.
* @since 2.0
*/
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
复制代码
/**
* Schedules a periodic execution of the given task with the given initial delay and period.
*
* <p>
* This method is safe to be called from multiple threads but there are no
* ordering guarantees between tasks.
*
* <p>
* The periodic execution is at a fixed rate, that is, the first execution will be after the initial
* delay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.
*
* @param run the task to schedule
* @param initialDelay the initial delay amount, non-positive values indicate non-delayed scheduling
* @param period the period at which the task should be re-executed
* @param unit the unit of measure of the delay amount
* @return the Disposable that let us one cancel this particular delayed task.
* @since 2.0
*/
@NonNull
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
return periodicTask;
}
复制代码
Worker也是一个抽象类,从图2中能够看到,每种Scheduler会对应一种具体的Worker。
...
/**
* Schedules a cancelable action to be executed periodically. This default implementation schedules
* recursively and waits for actions to complete (instead of potentially executing long-running actions
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
* <p>
* Note to implementors: non-positive {@code initialTime} and {@code period} should be regarded as
* non-delayed scheduling of the first and any subsequent executions.
*
* @param run
* the Runnable to execute periodically
* @param initialDelay
* time to wait before executing the action for the first time; non-positive values indicate
* an non-delayed schedule
* @param period
* the time interval to wait each time in between executing the action; non-positive values
* indicate no delay between repeated schedules
* @param unit
* the time unit of {@code period}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
final SequentialDisposable first = new SequentialDisposable();
final SequentialDisposable sd = new SequentialDisposable(first);
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
final long periodInNanoseconds = unit.toNanos(period);
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
periodInNanoseconds), initialDelay, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
first.replace(d);
return sd;
}
...
复制代码